ops_utils.bq_utils
Module for BigQuery operations.
"""Module for BigQuery operations."""
import logging
from google.cloud import bigquery
from google.api_core.exceptions import Forbidden
from typing import Optional


class BigQueryUtil:
    """Class to interact with Google BigQuery."""

    def __init__(self, project_id: Optional[str] = None):
        """
        Initialize the BigQuery utility with user's authentication.

        **Args:**
        - project_id (str, optional): The GCP project ID. If provided, the GCP client
        will be initialized using the project ID.
        """
        self.project_id = project_id
        """@private"""
        if project_id:
            self.client = bigquery.Client(project=self.project_id)
        else:
            self.client = bigquery.Client()

    def _delete_existing_records(self, table_id: str) -> None:
        """
        Delete all records from a BigQuery table.

        Runs `DELETE FROM <table_id> WHERE TRUE`, waits for the job to finish and
        logs the number of rows the job reports as affected.

        Args:
            table_id (str): BigQuery table ID in the format 'project.dataset.table'.
        """
        delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
        query_job = self.client.query(delete_query)
        # Block until the DML statement has completed.
        query_job.result()
        # A DELETE statement produces no result rows, so counting the rows of the
        # result always reported 0; the affected-row count lives on the job itself.
        n_rows_deleted = query_job.num_dml_affected_rows
        logging.info(f"Deleted {n_rows_deleted} records from table {table_id}")

    def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
        """
        Upload data directly from the provided list of dictionaries to a BigQuery table.

        Insert errors returned by the client are logged, not raised. The table's row
        count is logged before and after the insert.

        **Args:**

        - table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
        - rows (`list[dict]`): List of dictionaries, where each dictionary represents a
        row of data.
        - delete_existing_data (`bool`): If `True`, deletes existing data in the table before
        uploading. Default is `False`.
        """
        if delete_existing_data:
            self._delete_existing_records(table_id)

        # Get the BigQuery table reference
        destination_table = self.client.get_table(table_id)
        previous_rows = destination_table.num_rows
        logging.info(f"Currently {previous_rows} rows in {table_id} before upload")

        # Insert rows from the list of dictionaries
        errors = self.client.insert_rows_json(table_id, rows)

        if errors:
            logging.error(f"Encountered errors while inserting rows: {errors}")
        else:
            logging.info(f"Successfully inserted {len(rows)} rows into {table_id}")

        # Get new row count for confirmation
        # NOTE(review): this count is read from table metadata right after the insert;
        # confirm whether it already reflects freshly inserted rows.
        destination_table = self.client.get_table(table_id)
        new_rows = destination_table.num_rows
        logging.info(f"Table now contains {new_rows} rows after upload")

    def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
        """
        Execute a SQL query on a BigQuery table and return the results.

        **Args:**
        - query (str): The SQL query to execute.
        - to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.

        **Returns:**
        - list[dict]: List with one entry per row of query results. When `to_dataframe`
        is True, the value produced by the result's `to_dataframe()` is returned instead.
        """
        query_job = self.client.query(query)
        if to_dataframe:
            return query_job.result().to_dataframe()
        return list(query_job.result())

    def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
        """
        Check if the user has permission to access the project.

        **Args:**
        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

        **Returns:**
        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
        """
        return self._check_permissions("SELECT 1", raise_on_other_failure)

    def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
        """
        Check if the user has permission to run a specific query.

        **Args:**
        - query (str): SQL query to execute.
        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

        **Returns:**
        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
        """
        return self._check_permissions(query, raise_on_other_failure)

    def _check_permissions(self, qry: str, raise_on_other_failure: bool = True) -> bool:
        """
        Check if the user has permission to run queries and access the project.

        Args:
            qry (str): SQL query that is executed as the permission probe.
            raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

        Returns:
            bool: True if the query ran, False if a 403 Forbidden error is encountered, or if
                another error occurred and raise_on_other_failure is False.

        Raises:
            Exception: Any non-Forbidden error from the query, when raise_on_other_failure is True.
        """
        try:
            # Running the query to completion is the permission probe itself.
            self.client.query(qry).result()
            return True
        except Forbidden:
            logging.warning("403 Permission Denied")
            return False
        except Exception as e:
            logging.error(f"Unexpected error when trying to check permissions for project {self.project_id}. {e}")
            if raise_on_other_failure:
                logging.error("Raising error because raise_on_other_failure is set to True")
                # Bare raise re-raises the active exception without adding this frame again.
                raise
            logging.error("Continuing execution because raise_on_other_failure is set to False.")
            return False
class
BigQueryUtil:
class BigQueryUtil:
    """Class to interact with Google BigQuery."""

    def __init__(self, project_id: Optional[str] = None):
        """
        Initialize the BigQuery utility with user's authentication.

        **Args:**
        - project_id (str, optional): The GCP project ID. If provided, the GCP client
        will be initialized using the project ID.
        """
        self.project_id = project_id
        """@private"""
        if project_id:
            self.client = bigquery.Client(project=self.project_id)
        else:
            self.client = bigquery.Client()

    def _delete_existing_records(self, table_id: str) -> None:
        """
        Delete all records from a BigQuery table.

        Runs `DELETE FROM <table_id> WHERE TRUE`, waits for the job to finish and
        logs the number of rows the job reports as affected.

        Args:
            table_id (str): BigQuery table ID in the format 'project.dataset.table'.
        """
        delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
        query_job = self.client.query(delete_query)
        # Block until the DML statement has completed.
        query_job.result()
        # A DELETE statement produces no result rows, so counting the rows of the
        # result always reported 0; the affected-row count lives on the job itself.
        n_rows_deleted = query_job.num_dml_affected_rows
        logging.info(f"Deleted {n_rows_deleted} records from table {table_id}")

    def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
        """
        Upload data directly from the provided list of dictionaries to a BigQuery table.

        Insert errors returned by the client are logged, not raised. The table's row
        count is logged before and after the insert.

        **Args:**

        - table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
        - rows (`list[dict]`): List of dictionaries, where each dictionary represents a
        row of data.
        - delete_existing_data (`bool`): If `True`, deletes existing data in the table before
        uploading. Default is `False`.
        """
        if delete_existing_data:
            self._delete_existing_records(table_id)

        # Get the BigQuery table reference
        destination_table = self.client.get_table(table_id)
        previous_rows = destination_table.num_rows
        logging.info(f"Currently {previous_rows} rows in {table_id} before upload")

        # Insert rows from the list of dictionaries
        errors = self.client.insert_rows_json(table_id, rows)

        if errors:
            logging.error(f"Encountered errors while inserting rows: {errors}")
        else:
            logging.info(f"Successfully inserted {len(rows)} rows into {table_id}")

        # Get new row count for confirmation
        # NOTE(review): this count is read from table metadata right after the insert;
        # confirm whether it already reflects freshly inserted rows.
        destination_table = self.client.get_table(table_id)
        new_rows = destination_table.num_rows
        logging.info(f"Table now contains {new_rows} rows after upload")

    def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
        """
        Execute a SQL query on a BigQuery table and return the results.

        **Args:**
        - query (str): The SQL query to execute.
        - to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.

        **Returns:**
        - list[dict]: List with one entry per row of query results. When `to_dataframe`
        is True, the value produced by the result's `to_dataframe()` is returned instead.
        """
        query_job = self.client.query(query)
        if to_dataframe:
            return query_job.result().to_dataframe()
        return list(query_job.result())

    def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
        """
        Check if the user has permission to access the project.

        **Args:**
        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

        **Returns:**
        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
        """
        return self._check_permissions("SELECT 1", raise_on_other_failure)

    def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
        """
        Check if the user has permission to run a specific query.

        **Args:**
        - query (str): SQL query to execute.
        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

        **Returns:**
        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
        """
        return self._check_permissions(query, raise_on_other_failure)

    def _check_permissions(self, qry: str, raise_on_other_failure: bool = True) -> bool:
        """
        Check if the user has permission to run queries and access the project.

        Args:
            qry (str): SQL query that is executed as the permission probe.
            raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

        Returns:
            bool: True if the query ran, False if a 403 Forbidden error is encountered, or if
                another error occurred and raise_on_other_failure is False.

        Raises:
            Exception: Any non-Forbidden error from the query, when raise_on_other_failure is True.
        """
        try:
            # Running the query to completion is the permission probe itself.
            self.client.query(qry).result()
            return True
        except Forbidden:
            logging.warning("403 Permission Denied")
            return False
        except Exception as e:
            logging.error(f"Unexpected error when trying to check permissions for project {self.project_id}. {e}")
            if raise_on_other_failure:
                logging.error("Raising error because raise_on_other_failure is set to True")
                # Bare raise re-raises the active exception without adding this frame again.
                raise
            logging.error("Continuing execution because raise_on_other_failure is set to False.")
            return False
Class to interact with Google BigQuery.
BigQueryUtil(project_id: Optional[str] = None)
def __init__(self, project_id: Optional[str] = None):
    """
    Set up the BigQuery client used by the other methods of this class.

    **Args:**
    - project_id (str, optional): The GCP project ID. When it is truthy, the client is
    created for that project; otherwise the client is created without an explicit project.
    """
    self.project_id = project_id
    """@private"""
    # Forward the project only when one was actually supplied.
    client_options = {"project": self.project_id} if project_id else {}
    self.client = bigquery.Client(**client_options)
Initialize the BigQuery utility with user's authentication.
Args:
- project_id (str, optional): The GCP project ID. If provided, the GCP client will be initialized using the project ID.
def
upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
    """
    Insert the given row dictionaries into a BigQuery table.

    The table's row count is logged before and after the insert. Errors returned by
    the insert call are logged rather than raised.

    **Args:**

    - table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
    - rows (`list[dict]`): List of dictionaries, where each dictionary represents a
    row of data.
    - delete_existing_data (`bool`): If `True`, deletes existing data in the table before
    uploading. Default is `False`.
    """
    if delete_existing_data:
        self._delete_existing_records(table_id)

    # Row count reported by the table before anything is inserted
    rows_before = self.client.get_table(table_id).num_rows
    logging.info(f"Currently {rows_before} rows in {table_id} before upload")

    # An empty/falsy return value from the insert call means no errors were reported
    insert_errors = self.client.insert_rows_json(table_id, rows)
    if not insert_errors:
        logging.info(f"Successfully inserted {len(rows)} rows into {table_id}")
    else:
        logging.error(f"Encountered errors while inserting rows: {insert_errors}")

    # Fetch the table again so the logged count comes from a fresh lookup
    rows_after = self.client.get_table(table_id).num_rows
    logging.info(f"Table now contains {rows_after} rows after upload")
Upload data directly from the provided list of dictionaries to a BigQuery table.
Args:
- table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
- rows (`list[dict]`): List of dictionaries, where each dictionary represents a row of data.
- delete_existing_data (`bool`): If `True`, deletes existing data in the table before uploading. Default is `False`.
def
query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
    """
    Run a SQL query through the BigQuery client and hand back its results.

    **Args:**
    - query (str): The SQL query to execute.
    - to_dataframe (bool): If True, the results are converted with the result's
    `to_dataframe()` and that value is returned. Default is False.

    **Returns:**
    - list[dict]: A list with one entry per row of query results (or the converted
    results when `to_dataframe` is True).
    """
    job_result = self.client.query(query).result()
    if not to_dataframe:
        # Materialise the result iterator into a plain list
        return list(job_result)
    return job_result.to_dataframe()
Execute a SQL query on a BigQuery table and return the results.
Args:
- query (str): The SQL query to execute.
- to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.
Returns:
- list[dict]: List of dictionaries, where each dictionary represents one row of query results.
def
check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
    """
    Report whether the user is able to access the project.

    The check is delegated to `_check_permissions` with the trivial query `SELECT 1`.

    **Args:**
    - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

    **Returns:**
    - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
    """
    # Minimal statement that touches no table; only the ability to run a query is probed
    probe_query = "SELECT 1"
    has_access = self._check_permissions(probe_query, raise_on_other_failure)
    return has_access
Check if the user has permission to access the project.
Args:
- raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
Returns:
- bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
def
check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
    """
    Report whether the user is able to run the given query.

    The check is delegated to `_check_permissions`, which receives `query` unchanged.

    **Args:**
    - query (str): SQL query to execute.
    - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

    **Returns:**
    - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
    """
    is_permitted = self._check_permissions(query, raise_on_other_failure)
    return is_permitted
Check if the user has permission to run a specific query.
Args:
- query (str): SQL query to execute.
- raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
Returns:
- bool: True if the user has permissions, False if a 403 Forbidden error is encountered.