ops_utils.token_util

Module for generating tokens for other module services.

  1"""Module for generating tokens for other module services."""
  2import httplib2
  3import pytz
  4import logging
  5import requests
  6import os
  7from typing import Optional, Union
  8from datetime import datetime, timedelta
  9
 10
 11class Token:
 12    """Class for generating tokens for other module services."""
 13
 14    def __init__(
 15            self,
 16            token_file: Optional[str] = None,
 17            extra_scopes: Optional[list[str]] = None,
 18            service_account_json: Optional[str] = None
 19    ) -> None:
 20        """Initialize the Token class.
 21
 22        **Args:**
 23        - token_file (str, optional): The path to a file containing an existing token string.
 24        - extra_scopes (list[str], optional): Additional scopes to request for the token.
 25        - service_account_json (str, optional): Path to service account JSON key file.
 26        """
 27        self.expiry: Optional[datetime] = None
 28        """@private"""
 29        self.token_string: Optional[str] = ""
 30        """@private"""
 31
 32        # If provided with a file just use the contents of file
 33        if token_file:
 34            self.token_file = token_file
 35            with open(self.token_file) as f:
 36                self.token_string = f.read().rstrip()
 37            return
 38
 39        self.token_file = ""
 40
 41        # Default scopes
 42        scopes = [
 43            "https://www.googleapis.com/auth/userinfo.profile",
 44            "https://www.googleapis.com/auth/userinfo.email",
 45            "https://www.googleapis.com/auth/devstorage.full_control"
 46        ]
 47
 48        # Add extra scopes if provided
 49        if extra_scopes:
 50            scopes.extend(extra_scopes)
 51
 52        # Use service account if provided
 53        if service_account_json:
 54            from oauth2client.service_account import ServiceAccountCredentials
 55            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(
 56                service_account_json,
 57                scopes=scopes
 58            )
 59        else:
 60            # Fall back to application default credentials
 61            from oauth2client.client import GoogleCredentials
 62            self.credentials = GoogleCredentials.get_application_default()
 63            self.credentials = self.credentials.create_scoped(scopes)
 64
 65    def _get_gcp_token(self) -> Union[str, None]:
 66        # Refresh token if it has not been set or if it is expired or close to expiry
 67        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
 68            http = httplib2.Http()
 69            self.credentials.refresh(http)
 70            self.token_string = self.credentials.get_access_token().access_token
 71            # Set expiry to use UTC since google uses that timezone
 72            self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC)  # type: ignore[union-attr]
 73            # Convert expiry time to EST for logging
 74            est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern"))  # type: ignore[union-attr]
 75            logging.info(f"New token expires at {est_expiry} EST")
 76        return self.token_string
 77
 78    def _get_sa_token(self) -> Union[str, None]:
 79        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
 80            SCOPES = ['https://www.googleapis.com/auth/userinfo.profile',
 81                      'https://www.googleapis.com/auth/userinfo.email']
 82            url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}"  # noqa: E501
 83            token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'})
 84            self.token_string = token_response.json()['access_token']
 85        return token_response.json()['access_token']
 86
 87    def get_token(self) -> str:
 88        """
 89        Generate a token with a set expiration time.
 90
 91        **Returns:**
 92        - string: The generated token
 93        """
 94        # If token file provided then always return contents
 95        if self.token_file:
 96            return self.token_string  # type: ignore[return-value]
 97        else:
 98            # detect if this is running as a cloud run job
 99            if os.getenv("CLOUD_RUN_JOB"):
100                return self._get_sa_token()  # type: ignore[return-value]
101            else:
102                return self._get_gcp_token()  # type: ignore[return-value]
class Token:
 12class Token:
 13    """Class for generating tokens for other module services."""
 14
 15    def __init__(
 16            self,
 17            token_file: Optional[str] = None,
 18            extra_scopes: Optional[list[str]] = None,
 19            service_account_json: Optional[str] = None
 20    ) -> None:
 21        """Initialize the Token class.
 22
 23        **Args:**
 24        - token_file (str, optional): The path to a file containing an existing token string.
 25        - extra_scopes (list[str], optional): Additional scopes to request for the token.
 26        - service_account_json (str, optional): Path to service account JSON key file.
 27        """
 28        self.expiry: Optional[datetime] = None
 29        """@private"""
 30        self.token_string: Optional[str] = ""
 31        """@private"""
 32
 33        # If provided with a file just use the contents of file
 34        if token_file:
 35            self.token_file = token_file
 36            with open(self.token_file) as f:
 37                self.token_string = f.read().rstrip()
 38            return
 39
 40        self.token_file = ""
 41
 42        # Default scopes
 43        scopes = [
 44            "https://www.googleapis.com/auth/userinfo.profile",
 45            "https://www.googleapis.com/auth/userinfo.email",
 46            "https://www.googleapis.com/auth/devstorage.full_control"
 47        ]
 48
 49        # Add extra scopes if provided
 50        if extra_scopes:
 51            scopes.extend(extra_scopes)
 52
 53        # Use service account if provided
 54        if service_account_json:
 55            from oauth2client.service_account import ServiceAccountCredentials
 56            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(
 57                service_account_json,
 58                scopes=scopes
 59            )
 60        else:
 61            # Fall back to application default credentials
 62            from oauth2client.client import GoogleCredentials
 63            self.credentials = GoogleCredentials.get_application_default()
 64            self.credentials = self.credentials.create_scoped(scopes)
 65
 66    def _get_gcp_token(self) -> Union[str, None]:
 67        # Refresh token if it has not been set or if it is expired or close to expiry
 68        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
 69            http = httplib2.Http()
 70            self.credentials.refresh(http)
 71            self.token_string = self.credentials.get_access_token().access_token
 72            # Set expiry to use UTC since google uses that timezone
 73            self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC)  # type: ignore[union-attr]
 74            # Convert expiry time to EST for logging
 75            est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern"))  # type: ignore[union-attr]
 76            logging.info(f"New token expires at {est_expiry} EST")
 77        return self.token_string
 78
 79    def _get_sa_token(self) -> Union[str, None]:
 80        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
 81            SCOPES = ['https://www.googleapis.com/auth/userinfo.profile',
 82                      'https://www.googleapis.com/auth/userinfo.email']
 83            url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}"  # noqa: E501
 84            token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'})
 85            self.token_string = token_response.json()['access_token']
 86        return token_response.json()['access_token']
 87
 88    def get_token(self) -> str:
 89        """
 90        Generate a token with a set expiration time.
 91
 92        **Returns:**
 93        - string: The generated token
 94        """
 95        # If token file provided then always return contents
 96        if self.token_file:
 97            return self.token_string  # type: ignore[return-value]
 98        else:
 99            # detect if this is running as a cloud run job
100            if os.getenv("CLOUD_RUN_JOB"):
101                return self._get_sa_token()  # type: ignore[return-value]
102            else:
103                return self._get_gcp_token()  # type: ignore[return-value]

Class for generating tokens for other module services.

Token( token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None, service_account_json: Optional[str] = None)
15    def __init__(
16            self,
17            token_file: Optional[str] = None,
18            extra_scopes: Optional[list[str]] = None,
19            service_account_json: Optional[str] = None
20    ) -> None:
21        """Initialize the Token class.
22
23        **Args:**
24        - token_file (str, optional): The path to a file containing an existing token string.
25        - extra_scopes (list[str], optional): Additional scopes to request for the token.
26        - service_account_json (str, optional): Path to service account JSON key file.
27        """
28        self.expiry: Optional[datetime] = None
29        """@private"""
30        self.token_string: Optional[str] = ""
31        """@private"""
32
33        # If provided with a file just use the contents of file
34        if token_file:
35            self.token_file = token_file
36            with open(self.token_file) as f:
37                self.token_string = f.read().rstrip()
38            return
39
40        self.token_file = ""
41
42        # Default scopes
43        scopes = [
44            "https://www.googleapis.com/auth/userinfo.profile",
45            "https://www.googleapis.com/auth/userinfo.email",
46            "https://www.googleapis.com/auth/devstorage.full_control"
47        ]
48
49        # Add extra scopes if provided
50        if extra_scopes:
51            scopes.extend(extra_scopes)
52
53        # Use service account if provided
54        if service_account_json:
55            from oauth2client.service_account import ServiceAccountCredentials
56            self.credentials = ServiceAccountCredentials.from_json_keyfile_name(
57                service_account_json,
58                scopes=scopes
59            )
60        else:
61            # Fall back to application default credentials
62            from oauth2client.client import GoogleCredentials
63            self.credentials = GoogleCredentials.get_application_default()
64            self.credentials = self.credentials.create_scoped(scopes)

Initialize the Token class.

Args:

  • token_file (str, optional): The path to a file containing an existing token string.
  • extra_scopes (list[str], optional): Additional scopes to request for the token.
  • service_account_json (str, optional): Path to service account JSON key file.
token_file
def get_token(self) -> str:
 88    def get_token(self) -> str:
 89        """
 90        Generate a token with a set expiration time.
 91
 92        **Returns:**
 93        - string: The generated token
 94        """
 95        # If token file provided then always return contents
 96        if self.token_file:
 97            return self.token_string  # type: ignore[return-value]
 98        else:
 99            # detect if this is running as a cloud run job
100            if os.getenv("CLOUD_RUN_JOB"):
101                return self._get_sa_token()  # type: ignore[return-value]
102            else:
103                return self._get_gcp_token()  # type: ignore[return-value]

Generate a token with a set expiration time.

Returns:

  • string: The generated token