ops_utils.token_util
Module for generating tokens for other module services.
1"""Module for generating tokens for other module services.""" 2import httplib2 3import pytz 4import logging 5import requests 6import os 7from typing import Optional, Union 8from datetime import datetime, timedelta 9 10 11class Token: 12 """Class for generating tokens for other module services.""" 13 14 def __init__(self, token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None) -> None: 15 """Initialize the Token class. 16 17 **Args:** 18 - token_file (str, optional): The path to a file containing an existing token string. 19 - extra_scopes (list[str], optional): Additional scopes to request for the token. 20 """ 21 self.expiry: Optional[datetime] = None 22 """@private""" 23 self.token_string: Optional[str] = "" 24 """@private""" 25 # If provided with a file just use the contents of file 26 if token_file: 27 self.token_file = token_file 28 with open(self.token_file) as f: 29 self.token_string = f.read().rstrip() 30 else: 31 self.token_file = "" 32 from oauth2client.client import GoogleCredentials 33 self.credentials = GoogleCredentials.get_application_default() 34 self.credentials = self.credentials.create_scoped( 35 [ 36 "https://www.googleapis.com/auth/userinfo.profile", 37 "https://www.googleapis.com/auth/userinfo.email", 38 "https://www.googleapis.com/auth/devstorage.full_control" 39 ] + (extra_scopes if extra_scopes else []) 40 ) 41 42 def _get_gcp_token(self) -> Union[str, None]: 43 # Refresh token if it has not been set or if it is expired or close to expiry 44 if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5): 45 http = httplib2.Http() 46 self.credentials.refresh(http) 47 self.token_string = self.credentials.get_access_token().access_token 48 # Set expiry to use UTC since google uses that timezone 49 self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC) # type: ignore[union-attr] 50 # Convert expiry time to EST for logging 51 est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern")) # type: ignore[union-attr] 52 logging.info(f"New token expires at {est_expiry} EST") 53 return self.token_string 54 55 def _get_sa_token(self) -> Union[str, None]: 56 if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5): 57 SCOPES = ['https://www.googleapis.com/auth/userinfo.profile', 58 'https://www.googleapis.com/auth/userinfo.email'] 59 url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}" # noqa: E501 60 token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'}) 61 self.token_string = token_response.json()['access_token'] 62 return token_response.json()['access_token'] 63 64 def get_token(self) -> str: 65 """ 66 Generate a token with a set expiration time. 67 68 **Returns:** 69 - string: The generated token 70 """ 71 # If token file provided then always return contents 72 if self.token_file: 73 return self.token_string # type: ignore[return-value] 74 else: 75 # detect if this is running as a cloud run job 76 if os.getenv("CLOUD_RUN_JOB"): 77 return self._get_sa_token() # type: ignore[return-value] 78 else: 79 return self._get_gcp_token() # type: ignore[return-value]
class
Token:
12class Token: 13 """Class for generating tokens for other module services.""" 14 15 def __init__(self, token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None) -> None: 16 """Initialize the Token class. 17 18 **Args:** 19 - token_file (str, optional): The path to a file containing an existing token string. 20 - extra_scopes (list[str], optional): Additional scopes to request for the token. 21 """ 22 self.expiry: Optional[datetime] = None 23 """@private""" 24 self.token_string: Optional[str] = "" 25 """@private""" 26 # If provided with a file just use the contents of file 27 if token_file: 28 self.token_file = token_file 29 with open(self.token_file) as f: 30 self.token_string = f.read().rstrip() 31 else: 32 self.token_file = "" 33 from oauth2client.client import GoogleCredentials 34 self.credentials = GoogleCredentials.get_application_default() 35 self.credentials = self.credentials.create_scoped( 36 [ 37 "https://www.googleapis.com/auth/userinfo.profile", 38 "https://www.googleapis.com/auth/userinfo.email", 39 "https://www.googleapis.com/auth/devstorage.full_control" 40 ] + (extra_scopes if extra_scopes else []) 41 ) 42 43 def _get_gcp_token(self) -> Union[str, None]: 44 # Refresh token if it has not been set or if it is expired or close to expiry 45 if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5): 46 http = httplib2.Http() 47 self.credentials.refresh(http) 48 self.token_string = self.credentials.get_access_token().access_token 49 # Set expiry to use UTC since google uses that timezone 50 self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC) # type: ignore[union-attr] 51 # Convert expiry time to EST for logging 52 est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern")) # type: ignore[union-attr] 53 logging.info(f"New token expires at {est_expiry} EST") 54 return self.token_string 55 56 def _get_sa_token(self) -> Union[str, None]: 57 if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5): 58 SCOPES = ['https://www.googleapis.com/auth/userinfo.profile', 59 'https://www.googleapis.com/auth/userinfo.email'] 60 url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}" # noqa: E501 61 token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'}) 62 self.token_string = token_response.json()['access_token'] 63 return token_response.json()['access_token'] 64 65 def get_token(self) -> str: 66 """ 67 Generate a token with a set expiration time. 68 69 **Returns:** 70 - string: The generated token 71 """ 72 # If token file provided then always return contents 73 if self.token_file: 74 return self.token_string # type: ignore[return-value] 75 else: 76 # detect if this is running as a cloud run job 77 if os.getenv("CLOUD_RUN_JOB"): 78 return self._get_sa_token() # type: ignore[return-value] 79 else: 80 return self._get_gcp_token() # type: ignore[return-value]
Class for generating tokens for other module services.
Token( token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None)
15 def __init__(self, token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None) -> None: 16 """Initialize the Token class. 17 18 **Args:** 19 - token_file (str, optional): The path to a file containing an existing token string. 20 - extra_scopes (list[str], optional): Additional scopes to request for the token. 21 """ 22 self.expiry: Optional[datetime] = None 23 """@private""" 24 self.token_string: Optional[str] = "" 25 """@private""" 26 # If provided with a file just use the contents of file 27 if token_file: 28 self.token_file = token_file 29 with open(self.token_file) as f: 30 self.token_string = f.read().rstrip() 31 else: 32 self.token_file = "" 33 from oauth2client.client import GoogleCredentials 34 self.credentials = GoogleCredentials.get_application_default() 35 self.credentials = self.credentials.create_scoped( 36 [ 37 "https://www.googleapis.com/auth/userinfo.profile", 38 "https://www.googleapis.com/auth/userinfo.email", 39 "https://www.googleapis.com/auth/devstorage.full_control" 40 ] + (extra_scopes if extra_scopes else []) 41 )
Initialize the Token class.
Args:
- token_file (str, optional): The path to a file containing an existing token string.
- extra_scopes (list[str], optional): Additional scopes to request for the token.
def
get_token(self) -> str:
65 def get_token(self) -> str: 66 """ 67 Generate a token with a set expiration time. 68 69 **Returns:** 70 - string: The generated token 71 """ 72 # If token file provided then always return contents 73 if self.token_file: 74 return self.token_string # type: ignore[return-value] 75 else: 76 # detect if this is running as a cloud run job 77 if os.getenv("CLOUD_RUN_JOB"): 78 return self._get_sa_token() # type: ignore[return-value] 79 else: 80 return self._get_gcp_token() # type: ignore[return-value]
Generate a token with a set expiration time.
Returns:
- string: The generated token