ops_utils.bq_utils

Module for BigQuery operations.

  1"""Module for BigQuery operations."""
  2import logging
  3from google.cloud import bigquery
  4from google.api_core.exceptions import Forbidden
  5from typing import Optional
  6
  7
  8class BigQueryUtil:
  9    """Class to interact with Google BigQuery."""
 10
 11    def __init__(self, project_id: Optional[str] = None):
 12        """
 13        Initialize the BigQuery utility with user's authentication.
 14
 15        **Args:**
 16        - project_id (str, optional): The GCP project ID. If provided, the GCP client
 17         will be initialized using the project ID.
 18        """
 19        self.project_id = project_id
 20        """@private"""
 21        if project_id:
 22            self.client = bigquery.Client(project=self.project_id)
 23        else:
 24            self.client = bigquery.Client()
 25
 26    def _delete_existing_records(self, table_id: str) -> None:
 27        """
 28        Delete all records from a BigQuery table.
 29
 30        Args:
 31            table_id (str): BigQuery table ID in the format 'project.dataset.table'.
 32        """
 33        delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
 34        query_job = self.client.query(delete_query)
 35        results = query_job.result()
 36        n_rows_deleted = len([row for row in results])
 37        logging.info(f"Deleted {n_rows_deleted} records from table {table_id}")
 38
 39    def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
 40        """
 41        Upload data directly from the provided list of dictionaries to a BigQuery table.
 42
 43        **Args:**
 44
 45        - table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
 46        - rows (`list[dict]`): List of dictionaries, where each dictionary represents a
 47        row of data.
 48        - delete_existing_data (`bool`): If `True`, deletes existing data in the table before
 49         uploading. Default is `False`.
 50        """
 51        if delete_existing_data:
 52            self._delete_existing_records(table_id)
 53
 54        # Get the BigQuery table reference
 55        destination_table = self.client.get_table(table_id)
 56        previous_rows = destination_table.num_rows
 57        logging.info(f"Currently {previous_rows} rows in {table_id} before upload")
 58
 59        # Insert rows from the list of dictionaries
 60        errors = self.client.insert_rows_json(table_id, rows)
 61
 62        if errors:
 63            logging.error(f"Encountered errors while inserting rows: {errors}")
 64        else:
 65            logging.info(f"Successfully inserted {len(rows)} rows into {table_id}")
 66
 67        # Get new row count for confirmation
 68        destination_table = self.client.get_table(table_id)
 69        new_rows = destination_table.num_rows
 70        logging.info(f"Table now contains {new_rows} rows after upload")
 71
 72    def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
 73        """
 74        Execute a SQL query on a BigQuery table and returns the results.
 75
 76        **Args:**
 77        - query (str): The SQL query to execute.
 78        - to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.
 79
 80        **Returns:**
 81        - list[dict]: List of dictionaries, where each dictionary represents one row of query results.
 82        """
 83        query_job = self.client.query(query)
 84        if to_dataframe:
 85            return query_job.result().to_dataframe()
 86        return [row for row in query_job.result()]
 87
 88    def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
 89        """
 90        Check if the user has permission to access the project.
 91
 92        **Args:**
 93        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
 94
 95        **Returns:**
 96        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
 97        """
 98        return self._check_permissions("SELECT 1", raise_on_other_failure)
 99
100    def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
101        """
102        Check if the user has permission to run a specific query.
103
104        **Args:**
105        - query (str): SQL query to execute.
106        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
107
108        **Returns:**
109        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
110        """
111        return self._check_permissions(query, raise_on_other_failure)
112
113    def _check_permissions(self, qry: str, raise_on_other_failure: bool = True) -> bool:
114        """
115        Check if the user has permission to run queries and access the project.
116
117        Args:
118            raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
119
120        Returns:
121            bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
122        """
123        try:
124            # A simple query that should succeed if the user has permissions
125            query = qry
126            self.client.query(query).result()  # Run a lightweight query
127            return True
128        except Forbidden:
129            logging.warning("403 Permission Denied")
130            return False
131        except Exception as e:
132            logging.error(f"Unexpected error when trying to check permissions for project {self.project_id}. {e}")
133            if raise_on_other_failure:
134                logging.error("Raising error because raise_on_other_failure is set to True")
135                raise e
136            else:
137                logging.error("Continuing execution because raise_on_other_failure is set to False.")
138                return False
class BigQueryUtil:
  9class BigQueryUtil:
 10    """Class to interact with Google BigQuery."""
 11
 12    def __init__(self, project_id: Optional[str] = None):
 13        """
 14        Initialize the BigQuery utility with user's authentication.
 15
 16        **Args:**
 17        - project_id (str, optional): The GCP project ID. If provided, the GCP client
 18         will be initialized using the project ID.
 19        """
 20        self.project_id = project_id
 21        """@private"""
 22        if project_id:
 23            self.client = bigquery.Client(project=self.project_id)
 24        else:
 25            self.client = bigquery.Client()
 26
 27    def _delete_existing_records(self, table_id: str) -> None:
 28        """
 29        Delete all records from a BigQuery table.
 30
 31        Args:
 32            table_id (str): BigQuery table ID in the format 'project.dataset.table'.
 33        """
 34        delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
 35        query_job = self.client.query(delete_query)
 36        results = query_job.result()
 37        n_rows_deleted = len([row for row in results])
 38        logging.info(f"Deleted {n_rows_deleted} records from table {table_id}")
 39
 40    def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
 41        """
 42        Upload data directly from the provided list of dictionaries to a BigQuery table.
 43
 44        **Args:**
 45
 46        - table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
 47        - rows (`list[dict]`): List of dictionaries, where each dictionary represents a
 48        row of data.
 49        - delete_existing_data (`bool`): If `True`, deletes existing data in the table before
 50         uploading. Default is `False`.
 51        """
 52        if delete_existing_data:
 53            self._delete_existing_records(table_id)
 54
 55        # Get the BigQuery table reference
 56        destination_table = self.client.get_table(table_id)
 57        previous_rows = destination_table.num_rows
 58        logging.info(f"Currently {previous_rows} rows in {table_id} before upload")
 59
 60        # Insert rows from the list of dictionaries
 61        errors = self.client.insert_rows_json(table_id, rows)
 62
 63        if errors:
 64            logging.error(f"Encountered errors while inserting rows: {errors}")
 65        else:
 66            logging.info(f"Successfully inserted {len(rows)} rows into {table_id}")
 67
 68        # Get new row count for confirmation
 69        destination_table = self.client.get_table(table_id)
 70        new_rows = destination_table.num_rows
 71        logging.info(f"Table now contains {new_rows} rows after upload")
 72
 73    def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
 74        """
 75        Execute a SQL query on a BigQuery table and returns the results.
 76
 77        **Args:**
 78        - query (str): The SQL query to execute.
 79        - to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.
 80
 81        **Returns:**
 82        - list[dict]: List of dictionaries, where each dictionary represents one row of query results.
 83        """
 84        query_job = self.client.query(query)
 85        if to_dataframe:
 86            return query_job.result().to_dataframe()
 87        return [row for row in query_job.result()]
 88
 89    def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
 90        """
 91        Check if the user has permission to access the project.
 92
 93        **Args:**
 94        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
 95
 96        **Returns:**
 97        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
 98        """
 99        return self._check_permissions("SELECT 1", raise_on_other_failure)
100
101    def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
102        """
103        Check if the user has permission to run a specific query.
104
105        **Args:**
106        - query (str): SQL query to execute.
107        - raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
108
109        **Returns:**
110        - bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
111        """
112        return self._check_permissions(query, raise_on_other_failure)
113
114    def _check_permissions(self, qry: str, raise_on_other_failure: bool = True) -> bool:
115        """
116        Check if the user has permission to run queries and access the project.
117
118        Args:
119            raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.
120
121        Returns:
122            bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
123        """
124        try:
125            # A simple query that should succeed if the user has permissions
126            query = qry
127            self.client.query(query).result()  # Run a lightweight query
128            return True
129        except Forbidden:
130            logging.warning("403 Permission Denied")
131            return False
132        except Exception as e:
133            logging.error(f"Unexpected error when trying to check permissions for project {self.project_id}. {e}")
134            if raise_on_other_failure:
135                logging.error("Raising error because raise_on_other_failure is set to True")
136                raise e
137            else:
138                logging.error("Continuing execution because raise_on_other_failure is set to False.")
139                return False

Class to interact with Google BigQuery.

BigQueryUtil(project_id: Optional[str] = None)
def __init__(self, project_id: Optional[str] = None):
    """
    Set up the BigQuery client using the default credentials of the current environment.

    **Args:**
    - project_id (str, optional): GCP project to bind the client to. When it is falsy
     (None or empty), the client is created without an explicit project.
    """
    self.project_id = project_id
    """@private"""
    # Only pass `project` when one was actually supplied.
    self.client = bigquery.Client(project=self.project_id) if project_id else bigquery.Client()

Initialize the BigQuery utility using the default credentials available in the current environment.

Args:

  • project_id (str, optional): The GCP project ID. If provided, the GCP client will be initialized using the project ID.
def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
def upload_data_to_table(self, table_id: str, rows: list[dict], delete_existing_data: bool = False) -> None:
    """
    Upload data directly from the provided list of dictionaries to a BigQuery table.

    **Args:**

    - table_id (`str`): BigQuery table ID in the format `project.dataset.table`.
    - rows (`list[dict]`): List of dictionaries, where each dictionary represents a
    row of data. If empty, no insert request is sent (the deletion requested via
    `delete_existing_data` still happens).
    - delete_existing_data (`bool`): If `True`, deletes existing data in the table before
     uploading. Default is `False`.

    Insert errors reported by BigQuery are logged, not raised.
    """
    if delete_existing_data:
        self._delete_existing_records(table_id)

    # Nothing to stream: skip the insert call instead of sending an empty request.
    if not rows:
        logging.info(f"No rows provided; nothing to insert into {table_id}")
        return

    # Get the BigQuery table reference
    destination_table = self.client.get_table(table_id)
    previous_rows = destination_table.num_rows
    logging.info(f"Currently {previous_rows} rows in {table_id} before upload")

    # Insert rows from the list of dictionaries (streaming insert)
    errors = self.client.insert_rows_json(table_id, rows)

    if errors:
        logging.error(f"Encountered errors while inserting rows: {errors}")
    else:
        logging.info(f"Successfully inserted {len(rows)} rows into {table_id}")

    # Get new row count for confirmation.
    # NOTE(review): streamed rows may not be reflected in `num_rows` immediately — confirm.
    destination_table = self.client.get_table(table_id)
    new_rows = destination_table.num_rows
    logging.info(f"Table now contains {new_rows} rows after upload")

Upload data directly from the provided list of dictionaries to a BigQuery table.

Args:

  • table_id (str): BigQuery table ID in the format project.dataset.table.
  • rows (list[dict]): List of dictionaries, where each dictionary represents a row of data.
  • delete_existing_data (bool): If True, deletes existing data in the table before uploading. Default is False.
def query_table(self, query: str, to_dataframe: bool = False) -> list[dict]:
def query_table(self, query: str, to_dataframe: bool = False) -> "list[bigquery.Row] | pandas.DataFrame":
    """
    Execute a SQL query on a BigQuery table and return the results.

    **Args:**
    - query (str): The SQL query to execute.
    - to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.

    **Returns:**
    - list[bigquery.Row] | pandas.DataFrame: The result rows exactly as yielded by the client's
     row iterator (BigQuery `Row` objects, not plain dicts), or a DataFrame when `to_dataframe` is True.
    """
    query_job = self.client.query(query)
    result = query_job.result()
    if to_dataframe:
        return result.to_dataframe()
    return list(result)

Execute a SQL query on a BigQuery table and return the results.

Args:

  • query (str): The SQL query to execute.
  • to_dataframe (bool): If True, returns the query results as a Pandas DataFrame. Default is False.

Returns:

  • list[bigquery.Row] | pandas.DataFrame: The result rows as returned by the BigQuery client (Row objects, not plain dicts), or a DataFrame when to_dataframe is True.
def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
def check_permissions_to_project(self, raise_on_other_failure: bool = True) -> bool:
    """
    Verify that the current credentials may run queries in the project.

    **Args:**
    - raise_on_other_failure (bool): When True, any unexpected (non-403) error is re-raised. Default is True.

    **Returns:**
    - bool: True when the probe query succeeds, False when a 403 Forbidden error is encountered.
    """
    # The probe touches no tables, so it only exercises the ability to run a query job.
    probe = "SELECT 1"
    return self._check_permissions(qry=probe, raise_on_other_failure=raise_on_other_failure)

Check if the user has permission to access the project.

Args:

  • raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

Returns:

  • bool: True if the user has permissions, False if a 403 Forbidden error is encountered.
def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
def check_permissions_for_query(self, query: str, raise_on_other_failure: bool = True) -> bool:
    """
    Verify that the current credentials may run the given query.

    **Args:**
    - query (str): SQL query used as the permission probe.
    - raise_on_other_failure (bool): When True, any unexpected (non-403) error is re-raised. Default is True.

    **Returns:**
    - bool: True when the query succeeds, False when a 403 Forbidden error is encountered.
    """
    return self._check_permissions(
        qry=query,
        raise_on_other_failure=raise_on_other_failure,
    )

Check if the user has permission to run a specific query.

Args:

  • query (str): SQL query to execute.
  • raise_on_other_failure (bool): If True, raises an error if an unexpected error occurs. Default is True.

Returns:

  • bool: True if the user has permissions, False if a 403 Forbidden error is encountered.