ops_utils.token_util
Module for generating tokens for other module services.
"""Module for generating tokens for other module services."""
import httplib2
import pytz
import logging
import requests
import os
from typing import Optional, Union
from datetime import datetime, timedelta, timezone


class Token:
    """Class for generating tokens for other module services.

    A token comes from one of three places: a token file supplied by the
    caller, the metadata server (when the ``CLOUD_RUN_JOB`` environment
    variable is set), or oauth2client credentials built from a service account
    key file or, failing that, application default credentials.
    """

    # Refresh this long before the recorded expiry so that a returned token is
    # not about to lapse while the caller is still using it.
    _REFRESH_MARGIN = timedelta(minutes=5)

    def __init__(
        self,
        token_file: Optional[str] = None,
        extra_scopes: Optional[list[str]] = None,
        service_account_json: Optional[str] = None
    ) -> None:
        """Initialize the Token class.

        **Args:**
        - token_file (str, optional): The path to a file containing an existing token string.
        - extra_scopes (list[str], optional): Additional scopes to request for the token.
        - service_account_json (str, optional): Path to service account JSON key file.
        """
        self.expiry: Optional[datetime] = None
        """@private"""
        self.token_string: Optional[str] = ""
        """@private"""

        # If provided with a file just use the contents of file. No credentials
        # are created on this path; get_token() returns the file contents as-is.
        if token_file:
            self.token_file = token_file
            # Explicit encoding so the read does not depend on the locale default
            with open(self.token_file, encoding="utf-8") as f:
                self.token_string = f.read().rstrip()
            return

        self.token_file = ""

        # Default scopes
        scopes = [
            "https://www.googleapis.com/auth/userinfo.profile",
            "https://www.googleapis.com/auth/userinfo.email",
            "https://www.googleapis.com/auth/devstorage.full_control"
        ]

        # Add extra scopes if provided
        if extra_scopes:
            scopes.extend(extra_scopes)

        # Use service account if provided
        if service_account_json:
            from oauth2client.service_account import ServiceAccountCredentials
            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(
                service_account_json,
                scopes=scopes
            )
        else:
            # Fall back to application default credentials
            from oauth2client.client import GoogleCredentials
            self.credentials = GoogleCredentials.get_application_default()
            self.credentials = self.credentials.create_scoped(scopes)

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)

    def _needs_refresh(self) -> bool:
        """Return True when no token is cached or it expires within the refresh margin."""
        if not self.token_string or not self.expiry:
            return True
        return self.expiry < self._utc_now() + self._REFRESH_MARGIN

    def _get_gcp_token(self) -> Union[str, None]:
        """Return a token from the oauth2client credentials, refreshing it when needed."""
        # Refresh token if it has not been set or if it is expired or close to expiry
        if self._needs_refresh():
            http = httplib2.Http()
            self.credentials.refresh(http)
            self.token_string = self.credentials.get_access_token().access_token
            # Set expiry to use UTC since google uses that timezone
            self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC)  # type: ignore[union-attr]
            # Convert expiry time to EST for logging
            est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern"))  # type: ignore[union-attr]
            logging.info(f"New token expires at {est_expiry} EST")
        return self.token_string

    def _get_sa_token(self) -> Union[str, None]:
        """Return a token from the metadata server, reusing the cached one while it is fresh.

        The scopes requested here are fixed; scopes passed to the constructor
        are not used on this path.
        """
        if self._needs_refresh():
            SCOPES = ['https://www.googleapis.com/auth/userinfo.profile',
                      'https://www.googleapis.com/auth/userinfo.email']
            url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}"  # noqa: E501
            token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'})
            # Parse the body once and reuse it
            payload = token_response.json()
            self.token_string = payload['access_token']
            # Record when the token lapses so later calls can reuse it.
            # NOTE(review): 'expires_in' is taken to be the lifetime in seconds,
            # per the metadata server documentation. If it is missing or not
            # numeric the expiry stays unset and the next call fetches again.
            expires_in = payload.get('expires_in')
            if isinstance(expires_in, (int, float)):
                self.expiry = self._utc_now() + timedelta(seconds=expires_in)
            else:
                self.expiry = None
        # Return the stored token so the cached path has a value to return
        return self.token_string

    def get_token(self) -> str:
        """
        Return an access token, fetching or refreshing it first when required.

        **Returns:**
        - string: The token read from the token file, or otherwise the current access token
        """
        # If token file provided then always return contents
        if self.token_file:
            return self.token_string  # type: ignore[return-value]
        # detect if this is running as a cloud run job
        if os.getenv("CLOUD_RUN_JOB"):
            return self._get_sa_token()  # type: ignore[return-value]
        return self._get_gcp_token()  # type: ignore[return-value]
class
Token:
class Token:
    """Class for generating tokens for other module services.

    A token comes from one of three places: a token file supplied by the
    caller, the metadata server (when the ``CLOUD_RUN_JOB`` environment
    variable is set), or oauth2client credentials built from a service account
    key file or, failing that, application default credentials.
    """

    # Refresh this long before the recorded expiry so that a returned token is
    # not about to lapse while the caller is still using it.
    _REFRESH_MARGIN = timedelta(minutes=5)

    def __init__(
        self,
        token_file: Optional[str] = None,
        extra_scopes: Optional[list[str]] = None,
        service_account_json: Optional[str] = None
    ) -> None:
        """Initialize the Token class.

        **Args:**
        - token_file (str, optional): The path to a file containing an existing token string.
        - extra_scopes (list[str], optional): Additional scopes to request for the token.
        - service_account_json (str, optional): Path to service account JSON key file.
        """
        self.expiry: Optional[datetime] = None
        """@private"""
        self.token_string: Optional[str] = ""
        """@private"""

        # If provided with a file just use the contents of file. No credentials
        # are created on this path; get_token() returns the file contents as-is.
        if token_file:
            self.token_file = token_file
            # Explicit encoding so the read does not depend on the locale default
            with open(self.token_file, encoding="utf-8") as f:
                self.token_string = f.read().rstrip()
            return

        self.token_file = ""

        # Default scopes
        scopes = [
            "https://www.googleapis.com/auth/userinfo.profile",
            "https://www.googleapis.com/auth/userinfo.email",
            "https://www.googleapis.com/auth/devstorage.full_control"
        ]

        # Add extra scopes if provided
        if extra_scopes:
            scopes.extend(extra_scopes)

        # Use service account if provided
        if service_account_json:
            from oauth2client.service_account import ServiceAccountCredentials
            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(
                service_account_json,
                scopes=scopes
            )
        else:
            # Fall back to application default credentials
            from oauth2client.client import GoogleCredentials
            self.credentials = GoogleCredentials.get_application_default()
            self.credentials = self.credentials.create_scoped(scopes)

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # Imported here so this class does not depend on a change to the
        # module-level import list.
        from datetime import timezone
        return datetime.now(timezone.utc)

    def _needs_refresh(self) -> bool:
        """Return True when no token is cached or it expires within the refresh margin."""
        if not self.token_string or not self.expiry:
            return True
        return self.expiry < self._utc_now() + self._REFRESH_MARGIN

    def _get_gcp_token(self) -> Union[str, None]:
        """Return a token from the oauth2client credentials, refreshing it when needed."""
        # Refresh token if it has not been set or if it is expired or close to expiry
        if self._needs_refresh():
            http = httplib2.Http()
            self.credentials.refresh(http)
            self.token_string = self.credentials.get_access_token().access_token
            # Set expiry to use UTC since google uses that timezone
            self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC)  # type: ignore[union-attr]
            # Convert expiry time to EST for logging
            est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern"))  # type: ignore[union-attr]
            logging.info(f"New token expires at {est_expiry} EST")
        return self.token_string

    def _get_sa_token(self) -> Union[str, None]:
        """Return a token from the metadata server, reusing the cached one while it is fresh.

        The scopes requested here are fixed; scopes passed to the constructor
        are not used on this path.
        """
        if self._needs_refresh():
            SCOPES = ['https://www.googleapis.com/auth/userinfo.profile',
                      'https://www.googleapis.com/auth/userinfo.email']
            url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}"  # noqa: E501
            token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'})
            # Parse the body once and reuse it
            payload = token_response.json()
            self.token_string = payload['access_token']
            # Record when the token lapses so later calls can reuse it.
            # NOTE(review): 'expires_in' is taken to be the lifetime in seconds,
            # per the metadata server documentation. If it is missing or not
            # numeric the expiry stays unset and the next call fetches again.
            expires_in = payload.get('expires_in')
            if isinstance(expires_in, (int, float)):
                self.expiry = self._utc_now() + timedelta(seconds=expires_in)
            else:
                self.expiry = None
        # Return the stored token so the cached path has a value to return
        return self.token_string

    def get_token(self) -> str:
        """
        Return an access token, fetching or refreshing it first when required.

        **Returns:**
        - string: The token read from the token file, or otherwise the current access token
        """
        # If token file provided then always return contents
        if self.token_file:
            return self.token_string  # type: ignore[return-value]
        # detect if this is running as a cloud run job
        if os.getenv("CLOUD_RUN_JOB"):
            return self._get_sa_token()  # type: ignore[return-value]
        return self._get_gcp_token()  # type: ignore[return-value]
Class for generating tokens for other module services.
Token( token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None, service_account_json: Optional[str] = None)
15 def __init__( 16 self, 17 token_file: Optional[str] = None, 18 extra_scopes: Optional[list[str]] = None, 19 service_account_json: Optional[str] = None 20 ) -> None: 21 """Initialize the Token class. 22 23 **Args:** 24 - token_file (str, optional): The path to a file containing an existing token string. 25 - extra_scopes (list[str], optional): Additional scopes to request for the token. 26 - service_account_json (str, optional): Path to service account JSON key file. 27 """ 28 self.expiry: Optional[datetime] = None 29 """@private""" 30 self.token_string: Optional[str] = "" 31 """@private""" 32 33 # If provided with a file just use the contents of file 34 if token_file: 35 self.token_file = token_file 36 with open(self.token_file) as f: 37 self.token_string = f.read().rstrip() 38 return 39 40 self.token_file = "" 41 42 # Default scopes 43 scopes = [ 44 "https://www.googleapis.com/auth/userinfo.profile", 45 "https://www.googleapis.com/auth/userinfo.email", 46 "https://www.googleapis.com/auth/devstorage.full_control" 47 ] 48 49 # Add extra scopes if provided 50 if extra_scopes: 51 scopes.extend(extra_scopes) 52 53 # Use service account if provided 54 if service_account_json: 55 from oauth2client.service_account import ServiceAccountCredentials 56 self.credentials = ServiceAccountCredentials.from_json_keyfile_name( 57 service_account_json, 58 scopes=scopes 59 ) 60 else: 61 # Fall back to application default credentials 62 from oauth2client.client import GoogleCredentials 63 self.credentials = GoogleCredentials.get_application_default() 64 self.credentials = self.credentials.create_scoped(scopes)
Initialize the Token class.
Args:
- token_file (str, optional): The path to a file containing an existing token string.
- extra_scopes (list[str], optional): Additional scopes to request for the token.
- service_account_json (str, optional): Path to service account JSON key file.
def
get_token(self) -> str:
def get_token(self) -> str:
    """
    Return the token to use, preferring the token file when one was given.

    **Returns:**
    - string: The token file contents, or otherwise the token obtained for the current environment
    """
    # A configured token file always wins
    if self.token_file:
        return self.token_string  # type: ignore[return-value]

    # The CLOUD_RUN_JOB environment variable marks a cloud run job
    running_as_cloud_run_job = os.getenv("CLOUD_RUN_JOB")
    fetch = self._get_sa_token if running_as_cloud_run_job else self._get_gcp_token
    return fetch()  # type: ignore[return-value]
Return an access token: the contents of the token file when one was provided, otherwise a token fetched or refreshed as needed.
Returns:
- string: The generated token