ops_utils.tdr_utils.tdr_bq_utils
Module for handling TDR interactions with BigQuery.
"""Module for handling TDR interactions with BigQuery."""

from ..bq_utils import BigQueryUtil
from .tdr_api_utils import TDR

import logging
from typing import Optional, Any


class GetTdrAssetInfo:
    """Class to obtain TDR asset metadata (from dataset or snapshot) via the TDR API, including the BigQuery project
    and dataset that back the asset."""

    def __init__(self, tdr: TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None):
        """
        Initialize the GetTdrAssetInfo class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API.
        - dataset_id (str, optional): ID of the dataset. If both IDs are given, `run` uses the dataset.
        - snapshot_id (str, optional): ID of the snapshot.

        **Raises:**
        - ValueError: If neither `dataset_id` nor `snapshot_id` is provided.
        """
        if not dataset_id and not snapshot_id:
            raise ValueError("Either dataset_id or snapshot_id must be provided.")
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.snapshot_id = snapshot_id
        """@private"""

    def _get_dataset_info(self) -> dict:
        """
        Retrieve dataset information from TDR (requests the SCHEMA and ACCESS_INFORMATION sections).

        **Returns:**
        - dict: A dictionary containing BigQuery project ID, schema, tables, and relationships.
        """
        dataset_info = self.tdr.get_dataset_info(
            dataset_id=self.dataset_id,  # type: ignore[arg-type]
            info_to_include=["SCHEMA", "ACCESS_INFORMATION"]
        ).json()
        return {
            "bq_project": dataset_info["accessInformation"]["bigQuery"]["projectId"],
            "bq_schema": dataset_info["accessInformation"]["bigQuery"]["datasetName"],
            # For datasets, tables and relationships are nested under "schema".
            "tables": dataset_info["schema"]["tables"],
            "relationships": dataset_info["schema"]["relationships"]
        }

    def _get_snapshot_info(self) -> Optional[dict]:
        """
        Retrieve snapshot information from TDR (requests the TABLES, RELATIONSHIPS and ACCESS_INFORMATION sections).

        **Returns:**
        - dict: A dictionary containing BigQuery project ID, schema, tables, and relationships.
            Returns None if the snapshot is not found or access is denied.
        """
        response = self.tdr.get_snapshot_info(
            snapshot_id=self.snapshot_id,  # type: ignore[arg-type]
            info_to_include=["TABLES", "RELATIONSHIPS", "ACCESS_INFORMATION"]
        )
        # NOTE(review): relies on the response object's truthiness to detect "not found / no access";
        # presumably get_snapshot_info returns a falsy value in those cases — confirm against TDR.get_snapshot_info.
        if response:
            snapshot_info = response.json()
            return {
                "bq_project": snapshot_info["accessInformation"]["bigQuery"]["projectId"],
                "bq_schema": snapshot_info["accessInformation"]["bigQuery"]["datasetName"],
                # For snapshots, tables and relationships are top-level keys.
                "tables": snapshot_info["tables"],
                "relationships": snapshot_info["relationships"]
            }
        return None

    def run(self) -> Optional[dict]:
        """
        Execute the process to retrieve either dataset or snapshot information.

        **Returns:**
        - dict (optional): A dictionary containing the relevant information based on whether `dataset_id` or
            `snapshot_id` is provided (`dataset_id` wins if both are set). Returns None if the snapshot is not
            found or access is denied.
        """
        if self.dataset_id:
            return self._get_dataset_info()
        return self._get_snapshot_info()


class TdrBq:
    """Class to interact with TDR BigQuery tables."""

    def __init__(self, project_id: str, bq_schema: str):
        """
        Initialize the TdrBq class.

        **Args:**
        - project_id (str): The Google Cloud project ID.
        - bq_schema (str): The BigQuery schema name.
        """
        self.project_id = project_id
        """@private"""
        self.bq_schema = bq_schema
        """@private"""
        self.bq_util = BigQueryUtil(project_id)
        """@private"""

    def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
        """
        Check the permissions for accessing BigQuery for a specific dataset.

        Runs a trivial query against the schema's `INFORMATION_SCHEMA.TABLES` view.

        **Args:**
        - raise_on_other_failure (bool): Whether to raise an exception on other failures.

        **Returns:**
        - bool: `True` if permissions are sufficient, `False` otherwise.
        """
        query = f"""SELECT 1 FROM `{self.project_id}.{self.bq_schema}.INFORMATION_SCHEMA.TABLES`"""
        return self.bq_util.check_permissions_for_query(
            query=query,
            raise_on_other_failure=raise_on_other_failure
        )

    def get_tdr_table_contents(self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
        """
        Retrieve the contents of a TDR table from BigQuery.

        **Args:**
        - exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
        - table_name (str): Name of the table within `bq_schema` to read.
        - to_dataframe (bool): Whether to return the results as a DataFrame.

        **Returns:**
        - list[dict] or DataFrame: The contents of the table (a DataFrame when `to_dataframe` is True), as
            returned by `BigQueryUtil.query_table`.
        """
        # EXCEPT drops the datarepo_row_id column from the SELECT * projection.
        exclude_str = "EXCEPT (datarepo_row_id)" if exclude_datarepo_id else ""
        # NOTE(review): table_name is interpolated into the SQL unescaped; it is assumed to be a trusted TDR table name.
        query = f"""SELECT * {exclude_str} FROM `{self.project_id}.{self.bq_schema}.{table_name}`"""
        logging.info("Getting contents of table %s from BQ", table_name)
        return self.bq_util.query_table(query=query, to_dataframe=to_dataframe)
class
GetTdrAssetInfo:
class GetTdrAssetInfo:
    """Look up TDR dataset or snapshot metadata through the TDR API, including where the asset lives in BigQuery."""

    def __init__(self, tdr: TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None):
        """
        Store the TDR client and the ID of the asset to describe.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): Client used to call the TDR API.
        - dataset_id (str, optional): Dataset to describe; preferred by `run` when both IDs are set.
        - snapshot_id (str, optional): Snapshot to describe.

        **Raises:**
        - ValueError: When neither ID is supplied.
        """
        if not (dataset_id or snapshot_id):
            raise ValueError("Either dataset_id or snapshot_id must be provided.")
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.snapshot_id = snapshot_id
        """@private"""

    def _get_dataset_info(self) -> dict:
        """
        Fetch the dataset's schema and BigQuery access details from TDR.

        **Returns:**
        - dict: Keys `bq_project`, `bq_schema`, `tables` and `relationships`.
        """
        response = self.tdr.get_dataset_info(
            dataset_id=self.dataset_id,  # type: ignore[arg-type]
            info_to_include=["SCHEMA", "ACCESS_INFORMATION"],
        )
        payload = response.json()
        big_query = payload["accessInformation"]["bigQuery"]
        project = big_query["projectId"]
        dataset_name = big_query["datasetName"]
        schema = payload["schema"]
        return {
            "bq_project": project,
            "bq_schema": dataset_name,
            "tables": schema["tables"],
            "relationships": schema["relationships"],
        }

    def _get_snapshot_info(self) -> Optional[dict]:
        """
        Fetch the snapshot's tables, relationships and BigQuery access details from TDR.

        **Returns:**
        - dict: Keys `bq_project`, `bq_schema`, `tables` and `relationships`.
            None when the snapshot is not found or access is denied.
        """
        response = self.tdr.get_snapshot_info(
            snapshot_id=self.snapshot_id,  # type: ignore[arg-type]
            info_to_include=["TABLES", "RELATIONSHIPS", "ACCESS_INFORMATION"],
        )
        # A falsy response signals that nothing usable came back.
        if not response:
            return None
        payload = response.json()
        big_query = payload["accessInformation"]["bigQuery"]
        project = big_query["projectId"]
        dataset_name = big_query["datasetName"]
        return {
            "bq_project": project,
            "bq_schema": dataset_name,
            "tables": payload["tables"],
            "relationships": payload["relationships"],
        }

    def run(self) -> Optional[dict]:
        """
        Describe whichever asset this instance was created for.

        **Returns:**
        - dict (optional): Dataset info if `dataset_id` is set, otherwise snapshot info. None if the snapshot is
            not found or access is denied.
        """
        return self._get_dataset_info() if self.dataset_id else self._get_snapshot_info()
Class to obtain TDR asset metadata (from dataset or snapshot) via the TDR API, including the BigQuery project and dataset that back the asset.
GetTdrAssetInfo( tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None)
def __init__(self, tdr: TDR, dataset_id: Optional[str] = None, snapshot_id: Optional[str] = None):
    """
    Store the TDR client and the ID of the asset to describe.

    **Args:**
    - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): Client used to call the TDR API.
    - dataset_id (str, optional): Dataset to describe.
    - snapshot_id (str, optional): Snapshot to describe.

    **Raises:**
    - ValueError: When neither ID is supplied.
    """
    # At least one identifier is required; both being empty/None is an error.
    if not (dataset_id or snapshot_id):
        raise ValueError("Either dataset_id or snapshot_id must be provided.")
    self.tdr = tdr
    """@private"""
    self.dataset_id = dataset_id
    """@private"""
    self.snapshot_id = snapshot_id
    """@private"""
Initialize the GetTdrAssetInfo class.
Args:
- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API.
- dataset_id (str, optional): ID of the dataset.
- snapshot_id (str, optional): ID of the snapshot.
def
run(self) -> Optional[dict]:
def run(self) -> Optional[dict]:
    """
    Describe whichever asset this instance was created for.

    **Returns:**
    - dict (optional): Dataset info when `dataset_id` is set (it takes precedence), otherwise snapshot info.
        None if the snapshot is not found or access is denied.
    """
    return self._get_dataset_info() if self.dataset_id else self._get_snapshot_info()
Execute the process to retrieve either dataset or snapshot information.
Returns:
- dict (optional): A dictionary containing the relevant information based on whether `dataset_id` or `snapshot_id` is provided. Returns None if the snapshot is not found or access is denied.
class
TdrBq:
class TdrBq:
    """Class to interact with TDR BigQuery tables."""

    def __init__(self, project_id: str, bq_schema: str):
        """
        Initialize the TdrBq class.

        **Args:**
        - project_id (str): The Google Cloud project ID.
        - bq_schema (str): The BigQuery schema name.
        """
        self.project_id = project_id
        """@private"""
        self.bq_schema = bq_schema
        """@private"""
        self.bq_util = BigQueryUtil(project_id)
        """@private"""

    def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
        """
        Check the permissions for accessing BigQuery for a specific dataset.

        Runs a trivial query against the schema's `INFORMATION_SCHEMA.TABLES` view.

        **Args:**
        - raise_on_other_failure (bool): Whether to raise an exception on other failures.

        **Returns:**
        - bool: `True` if permissions are sufficient, `False` otherwise.
        """
        query = f"""SELECT 1 FROM `{self.project_id}.{self.bq_schema}.INFORMATION_SCHEMA.TABLES`"""
        return self.bq_util.check_permissions_for_query(
            query=query,
            raise_on_other_failure=raise_on_other_failure
        )

    def get_tdr_table_contents(self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
        """
        Retrieve the contents of a TDR table from BigQuery.

        **Args:**
        - exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
        - table_name (str): Name of the table within `bq_schema` to read.
        - to_dataframe (bool): Whether to return the results as a DataFrame.

        **Returns:**
        - list[dict] or DataFrame: The contents of the table (a DataFrame when `to_dataframe` is True), as
            returned by `BigQueryUtil.query_table`.
        """
        # EXCEPT drops the datarepo_row_id column from the SELECT * projection.
        exclude_str = "EXCEPT (datarepo_row_id)" if exclude_datarepo_id else ""
        # NOTE(review): table_name is interpolated into the SQL unescaped; it is assumed to be a trusted TDR table name.
        query = f"""SELECT * {exclude_str} FROM `{self.project_id}.{self.bq_schema}.{table_name}`"""
        logging.info("Getting contents of table %s from BQ", table_name)
        return self.bq_util.query_table(query=query, to_dataframe=to_dataframe)
Class to interact with TDR BigQuery tables.
TdrBq(project_id: str, bq_schema: str)
def __init__(self, project_id: str, bq_schema: str):
    """
    Bind this helper to one BigQuery project and schema.

    **Args:**
    - project_id (str): Google Cloud project that hosts the TDR tables.
    - bq_schema (str): BigQuery schema (dataset) holding the TDR tables.
    """
    self.project_id = project_id
    """@private"""
    self.bq_schema = bq_schema
    """@private"""
    # The BigQuery helper is built for the same project the queries target.
    self.bq_util = BigQueryUtil(self.project_id)
    """@private"""
Initialize the TdrBq class.
Args:
- project_id (str): The Google Cloud project ID.
- bq_schema (str): The BigQuery schema name.
def
check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
def check_permissions_for_dataset(self, raise_on_other_failure: bool) -> bool:
    """
    Verify that the caller can query this schema in BigQuery.

    A trivial `SELECT 1` against the schema's `INFORMATION_SCHEMA.TABLES` view is used as the probe.

    **Args:**
    - raise_on_other_failure (bool): Whether failures other than permission errors should raise.

    **Returns:**
    - bool: `True` if permissions are sufficient, `False` otherwise.
    """
    tables_view = f"{self.project_id}.{self.bq_schema}.INFORMATION_SCHEMA.TABLES"
    return self.bq_util.check_permissions_for_query(
        query=f"SELECT 1 FROM `{tables_view}`",
        raise_on_other_failure=raise_on_other_failure,
    )
Check the permissions for accessing BigQuery for a specific dataset.
Args:
- raise_on_other_failure (bool): Whether to raise an exception on other failures.
Returns:
- bool: `True` if permissions are sufficient, `False` otherwise.
def
get_tdr_table_contents( self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
def get_tdr_table_contents(self, exclude_datarepo_id: bool, table_name: str, to_dataframe: bool) -> Any:
    """
    Retrieve the contents of a TDR table from BigQuery.

    **Args:**
    - exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
    - table_name (str): Name of the table within `bq_schema` to read.
    - to_dataframe (bool): Whether to return the results as a DataFrame.

    **Returns:**
    - list[dict] or DataFrame: The contents of the table (a DataFrame when `to_dataframe` is True), as
        returned by `BigQueryUtil.query_table`.
    """
    # EXCEPT drops the datarepo_row_id column from the SELECT * projection.
    exclude_str = "EXCEPT (datarepo_row_id)" if exclude_datarepo_id else ""
    # NOTE(review): table_name is interpolated into the SQL unescaped; it is assumed to be a trusted TDR table name.
    query = f"""SELECT * {exclude_str} FROM `{self.project_id}.{self.bq_schema}.{table_name}`"""
    logging.info("Getting contents of table %s from BQ", table_name)
    return self.bq_util.query_table(query=query, to_dataframe=to_dataframe)
Retrieve the contents of a TDR table from BigQuery.
Args:
- exclude_datarepo_id (bool): Whether to exclude the datarepo_row_id column.
- table_name (str): Name of the table within the BigQuery schema to read.
- to_dataframe (bool): Whether to return the results as a DataFrame.
Returns:
- list[dict] or DataFrame: The contents of the table (a DataFrame when `to_dataframe` is True).