ops_utils.tdr_utils.tdr_bq_utils

Module for handling TDR interactions with BigQuery.

  1"""Module for handling TDR interactions with BigQuery."""
  2
  3from ..bq_utils import BigQueryUtil
  4from .tdr_api_utils import TDR
  5
  6import logging
  7from typing import Optional, Any
  8
  9
 10class GetTdrAssetInfo:
 11    """Class to obtain TDR asset metadata (from dataset or snapshot) from BigQuery."""
 12
 13    def __init__(self, tdr: TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None):
 14        """
 15        Initialize the GetTdrAssetInfo class.
 16
 17        **Args:**
 18        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API.
 19        - dataset_id (str, optional): ID of the dataset.
 20        - snapshot_id (str, optional): ID of the snapshot.
 21        """
 22        if not dataset_id and not snapshot_id:
 23            raise ValueError("Either dataset_id or snapshot_id must be provided.")
 24        self.tdr = tdr
 25        """@private"""
 26        self.dataset_id = dataset_id
 27        """@private"""
 28        self.snapshot_id = snapshot_id
 29        """@private"""
 30
 31    def _get_dataset_info(self) -> dict:
 32        """
 33        Retrieve dataset information from TDR.
 34
 35        **Returns:**
 36        - dict: A dictionary containing BigQuery project ID, schema, tables, and relationships.
 37        """
 38        dataset_info = self.tdr.get_dataset_info(
 39            dataset_id=self.dataset_id,  # type: ignore[arg-type]
 40            info_to_include=["SCHEMA", "ACCESS_INFORMATION"]
 41        ).json()
 42        return {
 43            "bq_project": dataset_info["accessInformation"]["bigQuery"]["projectId"],
 44            "bq_schema": dataset_info["accessInformation"]["bigQuery"]["datasetName"],
 45            "tables": dataset_info["schema"]["tables"],
 46            "relationships": dataset_info["schema"]["relationships"]
 47        }
 48
 49    def _get_snapshot_info(self) -> Optional[dict]:
 50        """
 51        Retrieve snapshot information from TDR.
 52
 53        **Returns:**
 54        - dict: A dictionary containing BigQuery project ID, schema, tables, and relationships.
 55        Returns None if the snapshot is not found or access is denied.
 56        """
 57        response = self.tdr.get_snapshot_info(
 58            snapshot_id=self.snapshot_id,  # type: ignore[arg-type]
 59            info_to_include=["TABLES", "RELATIONSHIPS", "ACCESS_INFORMATION"]
 60        )
 61        if response:
 62            snapshot_info = response.json()
 63            return {
 64                "bq_project": snapshot_info["accessInformation"]["bigQuery"]["projectId"],
 65                "bq_schema": snapshot_info["accessInformation"]["bigQuery"]["datasetName"],
 66                "tables": snapshot_info["tables"],
 67                "relationships": snapshot_info["relationships"]
 68            }
 69        return None
 70
 71    def run(self) -> Optional[dict]:
 72        """
 73        Execute the process to retrieve either dataset or snapshot information.
 74
 75        **Returns:**
 76        - dict (optional): A dictionary containing the relevant information based on whether `dataset_id` or
 77            `snapshot_id` is provided. Returns None if the snapshot is not found or access is denied.
 78        """
 79        if self.dataset_id:
 80            return self._get_dataset_info()
 81        return self._get_snapshot_info()
 82
 83
 84class TdrBq:
 85    """Class to interact with TDR BigQuery tables."""
 86
 87    def __init__(self, project_id: str, bq_schema: str):
 88        """
 89        Initialize the TdrBq class.
 90
 91        **Args:**
 92        - project_id (str): The Google Cloud project ID.
 93        - bq_schema (str): The BigQuery schema name.
 94        """
 95        self.project_id = project_id
 96        """@private"""
 97        self.bq_schema = bq_schema
 98        """@private"""
 99        self.bq_util = BigQueryUtil(project_id)
100        """@private"""
101
102    def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
103        """
104        Check the permissions for accessing BigQuery for specific dataset.
105
106        **Args:**
107        - raise_on_other_failure (bool): Whether to raise an exception on other failures.
108
109        **Returns:**
110        - bool: `True` if permissions are sufficient, `False` otherwise.
111        """
112        query = f"""SELECT 1 FROM `{self.project_id}.{self.bq_schema}.INFORMATION_SCHEMA.TABLES`"""
113        return self.bq_util.check_permissions_for_query(
114            query=query,
115            raise_on_other_failure=raise_on_other_failure
116        )
117
118    def get_tdr_table_contents(self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
119        """
120        Retrieve the contents of a TDR table from BigQuery.
121
122        **Args:**
123        - exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
124        - to_dataframe (bool): Whether to return the results as a DataFrame.
125
126        **Returns:**
127        - list[dict]: The contents of the table
128        """
129        if exclude_datarepo_id:
130            exclude_str = "EXCEPT (datarepo_row_id)"
131        else:
132            exclude_str = ""
133        query = f"""SELECT * {exclude_str} FROM `{self.project_id}.{self.bq_schema}.{table_name}`"""
134        logging.info(f"Getting contents of table {table_name} from BQ")
135        return self.bq_util.query_table(query=query, to_dataframe=to_dataframe)
class GetTdrAssetInfo:
class GetTdrAssetInfo:
    """
    Look up the BigQuery location and schema metadata of a TDR dataset or snapshot.

    The metadata is requested through the supplied TDR client. When both a dataset ID and a
    snapshot ID are given, `run` uses the dataset ID.
    """

    def __init__(self, tdr: TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None):
        """
        Store the TDR client and the ID of the asset to look up.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API.
        - dataset_id (str, optional): ID of the dataset.
        - snapshot_id (str, optional): ID of the snapshot.

        **Raises:**
        - ValueError: If neither `dataset_id` nor `snapshot_id` is provided (or both are empty).
        """
        if not (dataset_id or snapshot_id):
            raise ValueError("Either dataset_id or snapshot_id must be provided.")
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.snapshot_id = snapshot_id
        """@private"""

    def _get_dataset_info(self) -> dict:
        """
        Request the dataset from TDR and pull out its BigQuery location and schema.

        **Returns:**
        - dict: A dictionary with the keys `bq_project`, `bq_schema`, `tables` and `relationships`.
        """
        response = self.tdr.get_dataset_info(
            dataset_id=self.dataset_id,  # type: ignore[arg-type]
            info_to_include=["SCHEMA", "ACCESS_INFORMATION"]
        )
        payload = response.json()
        bq_access = payload["accessInformation"]["bigQuery"]
        asset_info = {
            "bq_project": bq_access["projectId"],
            "bq_schema": bq_access["datasetName"],
        }
        # For datasets, tables and relationships are nested under the "schema" key.
        schema = payload["schema"]
        asset_info["tables"] = schema["tables"]
        asset_info["relationships"] = schema["relationships"]
        return asset_info

    def _get_snapshot_info(self) -> Optional[dict]:
        """
        Request the snapshot from TDR and pull out its BigQuery location and schema.

        **Returns:**
        - dict (optional): A dictionary with the keys `bq_project`, `bq_schema`, `tables` and
            `relationships`, or None when the TDR call yields a falsy response (presumably the
            snapshot was not found or access was denied - confirm against `TDR.get_snapshot_info`).
        """
        response = self.tdr.get_snapshot_info(
            snapshot_id=self.snapshot_id,  # type: ignore[arg-type]
            info_to_include=["TABLES", "RELATIONSHIPS", "ACCESS_INFORMATION"]
        )
        if not response:
            return None
        payload = response.json()
        bq_access = payload["accessInformation"]["bigQuery"]
        return {
            "bq_project": bq_access["projectId"],
            "bq_schema": bq_access["datasetName"],
            # For snapshots, tables and relationships are top-level keys of the payload.
            "tables": payload["tables"],
            "relationships": payload["relationships"],
        }

    def run(self) -> Optional[dict]:
        """
        Retrieve the asset information, using the dataset when `dataset_id` is set and the snapshot otherwise.

        **Returns:**
        - dict (optional): The dictionary built by the dataset or snapshot lookup. May be None for a
            snapshot lookup whose TDR response is falsy.
        """
        return self._get_dataset_info() if self.dataset_id else self._get_snapshot_info()

Class to obtain TDR asset metadata (from dataset or snapshot) from BigQuery.

GetTdrAssetInfo( tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None)
14    def __init__(self, tdr: TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None):
15        """
16        Initialize the GetTdrAssetInfo class.
17
18        **Args:**
19        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API.
20        - dataset_id (str, optional): ID of the dataset.
21        - snapshot_id (str, optional): ID of the snapshot.
22        """
23        if not dataset_id and not snapshot_id:
24            raise ValueError("Either dataset_id or snapshot_id must be provided.")
25        self.tdr = tdr
26        """@private"""
27        self.dataset_id = dataset_id
28        """@private"""
29        self.snapshot_id = snapshot_id
30        """@private"""

Initialize the GetTdrAssetInfo class.

Args:

  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): TDR instance for interacting with the TDR API.
  • dataset_id (str, optional): ID of the dataset.
  • snapshot_id (str, optional): ID of the snapshot.
def run(self) -> Optional[dict]:
72    def run(self) -> Optional[dict]:
73        """
74        Execute the process to retrieve either dataset or snapshot information.
75
76        **Returns:**
77        - dict (optional): A dictionary containing the relevant information based on whether `dataset_id` or
78            `snapshot_id` is provided. Returns None if the snapshot is not found or access is denied.
79        """
80        if self.dataset_id:
81            return self._get_dataset_info()
82        return self._get_snapshot_info()

Execute the process to retrieve either dataset or snapshot information.

Returns:

  • dict (optional): A dictionary containing the relevant information based on whether dataset_id or snapshot_id is provided. Returns None if the snapshot is not found or access is denied.
class TdrBq:
 85class TdrBq:
 86    """Class to interact with TDR BigQuery tables."""
 87
 88    def __init__(self, project_id: str, bq_schema: str):
 89        """
 90        Initialize the TdrBq class.
 91
 92        **Args:**
 93        - project_id (str): The Google Cloud project ID.
 94        - bq_schema (str): The BigQuery schema name.
 95        """
 96        self.project_id = project_id
 97        """@private"""
 98        self.bq_schema = bq_schema
 99        """@private"""
100        self.bq_util = BigQueryUtil(project_id)
101        """@private"""
102
103    def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
104        """
105        Check the permissions for accessing BigQuery for specific dataset.
106
107        **Args:**
108        - raise_on_other_failure (bool): Whether to raise an exception on other failures.
109
110        **Returns:**
111        - bool: `True` if permissions are sufficient, `False` otherwise.
112        """
113        query = f"""SELECT 1 FROM `{self.project_id}.{self.bq_schema}.INFORMATION_SCHEMA.TABLES`"""
114        return self.bq_util.check_permissions_for_query(
115            query=query,
116            raise_on_other_failure=raise_on_other_failure
117        )
118
119    def get_tdr_table_contents(self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
120        """
121        Retrieve the contents of a TDR table from BigQuery.
122
123        **Args:**
124        - exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
125        - to_dataframe (bool): Whether to return the results as a DataFrame.
126
127        **Returns:**
128        - list[dict]: The contents of the table
129        """
130        if exclude_datarepo_id:
131            exclude_str = "EXCEPT (datarepo_row_id)"
132        else:
133            exclude_str = ""
134        query = f"""SELECT * {exclude_str} FROM `{self.project_id}.{self.bq_schema}.{table_name}`"""
135        logging.info(f"Getting contents of table {table_name} from BQ")
136        return self.bq_util.query_table(query=query, to_dataframe=to_dataframe)

Class to interact with TDR BigQuery tables.

TdrBq(project_id: str, bq_schema: str)
 88    def __init__(self, project_id: str, bq_schema: str):
 89        """
 90        Initialize the TdrBq class.
 91
 92        **Args:**
 93        - project_id (str): The Google Cloud project ID.
 94        - bq_schema (str): The BigQuery schema name.
 95        """
 96        self.project_id = project_id
 97        """@private"""
 98        self.bq_schema = bq_schema
 99        """@private"""
100        self.bq_util = BigQueryUtil(project_id)
101        """@private"""

Initialize the TdrBq class.

Args:

  • project_id (str): The Google Cloud project ID.
  • bq_schema (str): The BigQuery schema name.
def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
103    def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
104        """
105        Check the permissions for accessing BigQuery for specific dataset.
106
107        **Args:**
108        - raise_on_other_failure (bool): Whether to raise an exception on other failures.
109
110        **Returns:**
111        - bool: `True` if permissions are sufficient, `False` otherwise.
112        """
113        query = f"""SELECT 1 FROM `{self.project_id}.{self.bq_schema}.INFORMATION_SCHEMA.TABLES`"""
114        return self.bq_util.check_permissions_for_query(
115            query=query,
116            raise_on_other_failure=raise_on_other_failure
117        )

Check the permissions for accessing BigQuery for a specific dataset.

Args:

  • raise_on_other_failure (bool): Whether to raise an exception on other failures.

Returns:

  • bool: True if permissions are sufficient, False otherwise.
def get_tdr_table_contents( self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
119    def get_tdr_table_contents(self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
120        """
121        Retrieve the contents of a TDR table from BigQuery.
122
123        **Args:**
124        - exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
125        - to_dataframe (bool): Whether to return the results as a DataFrame.
126
127        **Returns:**
128        - list[dict]: The contents of the table
129        """
130        if exclude_datarepo_id:
131            exclude_str = "EXCEPT (datarepo_row_id)"
132        else:
133            exclude_str = ""
134        query = f"""SELECT * {exclude_str} FROM `{self.project_id}.{self.bq_schema}.{table_name}`"""
135        logging.info(f"Getting contents of table {table_name} from BQ")
136        return self.bq_util.query_table(query=query, to_dataframe=to_dataframe)

Retrieve the contents of a TDR table from BigQuery.

Args:

  • exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
  • table_name (str): Name of the table to query.
  • to_dataframe (bool): Whether to return the results as a DataFrame.

Returns:

  • list[dict]: The contents of the table (returned as a DataFrame instead when to_dataframe is True).