ops_utils.token_util

Module for generating tokens for other module services.

 1"""Module for generating tokens for other module services."""
 2import httplib2
 3import pytz
 4import logging
 5import requests
 6import os
 7from typing import Optional, Union
 8from datetime import datetime, timedelta
 9
10
11class Token:
12    """Class for generating tokens for other module services."""
13
14    def __init__(self, token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None) -> None:
15        """Initialize the Token class.
16
17        **Args:**
18        - token_file (str, optional): The path to a file containing an existing token string.
19        - extra_scopes (list[str], optional): Additional scopes to request for the token.
20        """
21        self.expiry: Optional[datetime] = None
22        """@private"""
23        self.token_string: Optional[str] = ""
24        """@private"""
25        # If provided with a file just use the contents of file
26        if token_file:
27            self.token_file = token_file
28            with open(self.token_file) as f:
29                self.token_string = f.read().rstrip()
30        else:
31            self.token_file = ""
32            from oauth2client.client import GoogleCredentials
33            self.credentials = GoogleCredentials.get_application_default()
34            self.credentials = self.credentials.create_scoped(
35                [
36                    "https://www.googleapis.com/auth/userinfo.profile",
37                    "https://www.googleapis.com/auth/userinfo.email",
38                    "https://www.googleapis.com/auth/devstorage.full_control"
39                ] + (extra_scopes if extra_scopes else [])
40            )
41
42    def _get_gcp_token(self) -> Union[str, None]:
43        # Refresh token if it has not been set or if it is expired or close to expiry
44        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
45            http = httplib2.Http()
46            self.credentials.refresh(http)
47            self.token_string = self.credentials.get_access_token().access_token
48            # Set expiry to use UTC since google uses that timezone
49            self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC)  # type: ignore[union-attr]
50            # Convert expiry time to EST for logging
51            est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern"))  # type: ignore[union-attr]
52            logging.info(f"New token expires at {est_expiry} EST")
53        return self.token_string
54
55    def _get_sa_token(self) -> Union[str, None]:
56        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
57            SCOPES = ['https://www.googleapis.com/auth/userinfo.profile',
58                      'https://www.googleapis.com/auth/userinfo.email']
59            url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}"  # noqa: E501
60            token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'})
61            self.token_string = token_response.json()['access_token']
62        return token_response.json()['access_token']
63
64    def get_token(self) -> str:
65        """
66        Generate a token with a set expiration time.
67
68        **Returns:**
69        - string: The generated token
70        """
71        # If token file provided then always return contents
72        if self.token_file:
73            return self.token_string  # type: ignore[return-value]
74        else:
75            # detect if this is running as a cloud run job
76            if os.getenv("CLOUD_RUN_JOB"):
77                return self._get_sa_token()  # type: ignore[return-value]
78            else:
79                return self._get_gcp_token()  # type: ignore[return-value]
class Token:
12class Token:
13    """Class for generating tokens for other module services."""
14
15    def __init__(self, token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None) -> None:
16        """Initialize the Token class.
17
18        **Args:**
19        - token_file (str, optional): The path to a file containing an existing token string.
20        - extra_scopes (list[str], optional): Additional scopes to request for the token.
21        """
22        self.expiry: Optional[datetime] = None
23        """@private"""
24        self.token_string: Optional[str] = ""
25        """@private"""
26        # If provided with a file just use the contents of file
27        if token_file:
28            self.token_file = token_file
29            with open(self.token_file) as f:
30                self.token_string = f.read().rstrip()
31        else:
32            self.token_file = ""
33            from oauth2client.client import GoogleCredentials
34            self.credentials = GoogleCredentials.get_application_default()
35            self.credentials = self.credentials.create_scoped(
36                [
37                    "https://www.googleapis.com/auth/userinfo.profile",
38                    "https://www.googleapis.com/auth/userinfo.email",
39                    "https://www.googleapis.com/auth/devstorage.full_control"
40                ] + (extra_scopes if extra_scopes else [])
41            )
42
43    def _get_gcp_token(self) -> Union[str, None]:
44        # Refresh token if it has not been set or if it is expired or close to expiry
45        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
46            http = httplib2.Http()
47            self.credentials.refresh(http)
48            self.token_string = self.credentials.get_access_token().access_token
49            # Set expiry to use UTC since google uses that timezone
50            self.expiry = self.credentials.token_expiry.replace(tzinfo=pytz.UTC)  # type: ignore[union-attr]
51            # Convert expiry time to EST for logging
52            est_expiry = self.expiry.astimezone(pytz.timezone("US/Eastern"))  # type: ignore[union-attr]
53            logging.info(f"New token expires at {est_expiry} EST")
54        return self.token_string
55
56    def _get_sa_token(self) -> Union[str, None]:
57        if not self.token_string or not self.expiry or self.expiry < datetime.now(pytz.UTC) + timedelta(minutes=5):
58            SCOPES = ['https://www.googleapis.com/auth/userinfo.profile',
59                      'https://www.googleapis.com/auth/userinfo.email']
60            url = f"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token?scopes={','.join(SCOPES)}"  # noqa: E501
61            token_response = requests.get(url, headers={'Metadata-Flavor': 'Google'})
62            self.token_string = token_response.json()['access_token']
63        return token_response.json()['access_token']
64
65    def get_token(self) -> str:
66        """
67        Generate a token with a set expiration time.
68
69        **Returns:**
70        - string: The generated token
71        """
72        # If token file provided then always return contents
73        if self.token_file:
74            return self.token_string  # type: ignore[return-value]
75        else:
76            # detect if this is running as a cloud run job
77            if os.getenv("CLOUD_RUN_JOB"):
78                return self._get_sa_token()  # type: ignore[return-value]
79            else:
80                return self._get_gcp_token()  # type: ignore[return-value]

Class for generating tokens for other module services.

Token( token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None)
15    def __init__(self, token_file: Optional[str] = None, extra_scopes: Optional[list[str]] = None) -> None:
16        """Initialize the Token class.
17
18        **Args:**
19        - token_file (str, optional): The path to a file containing an existing token string.
20        - extra_scopes (list[str], optional): Additional scopes to request for the token.
21        """
22        self.expiry: Optional[datetime] = None
23        """@private"""
24        self.token_string: Optional[str] = ""
25        """@private"""
26        # If provided with a file just use the contents of file
27        if token_file:
28            self.token_file = token_file
29            with open(self.token_file) as f:
30                self.token_string = f.read().rstrip()
31        else:
32            self.token_file = ""
33            from oauth2client.client import GoogleCredentials
34            self.credentials = GoogleCredentials.get_application_default()
35            self.credentials = self.credentials.create_scoped(
36                [
37                    "https://www.googleapis.com/auth/userinfo.profile",
38                    "https://www.googleapis.com/auth/userinfo.email",
39                    "https://www.googleapis.com/auth/devstorage.full_control"
40                ] + (extra_scopes if extra_scopes else [])
41            )

Initialize the Token class.

Args:

  • token_file (str, optional): The path to a file containing an existing token string.
  • extra_scopes (list[str], optional): Additional scopes to request for the token.
def get_token(self) -> str:
65    def get_token(self) -> str:
66        """
67        Generate a token with a set expiration time.
68
69        **Returns:**
70        - string: The generated token
71        """
72        # If token file provided then always return contents
73        if self.token_file:
74            return self.token_string  # type: ignore[return-value]
75        else:
76            # detect if this is running as a cloud run job
77            if os.getenv("CLOUD_RUN_JOB"):
78                return self._get_sa_token()  # type: ignore[return-value]
79            else:
80                return self._get_gcp_token()  # type: ignore[return-value]

Generate a token with a set expiration time.

Returns:

  • string: The generated token