ops_utils.tdr_utils.tdr_table_utils

Module for interacting with TDR tables.

  1"""Module for interacting with TDR tables."""
  2import json
  3import logging
  4import sys
  5
  6from ..tdr_utils.tdr_api_utils import TDR
  7from ..tdr_utils.tdr_schema_utils import InferTDRSchema
  8
  9
 10class SetUpTDRTables:
 11    """A class to set up TDR tables by comparing and updating schemas."""
 12
 13    def __init__(
 14            self,
 15            tdr: TDR,
 16            dataset_id: str,
 17            table_info_dict: dict,
 18            all_fields_non_required: bool = False,
 19            force_disparate_rows_to_string: bool = False,
 20            ignore_existing_schema_mismatch: bool = False
 21    ):
 22        """
 23        Initialize the SetUpTDRTables class.
 24
 25        **Args:**
 26        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
 27        - dataset_id (str): The ID of the dataset.
 28        - table_info_dict (dict): A dictionary containing table information.
 29        - all_fields_non_required (bool): A boolean indicating whether all columns are non-required.
 30        Defaults to `False`.
 31        - force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to
 32                string. Defaults to `False`.
 33        - ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not
 34                matching existing schema. Defaults to `False`.
 35        """
 36        self.tdr = tdr
 37        """@private"""
 38        self.dataset_id = dataset_id
 39        """@private"""
 40        self.table_info_dict = table_info_dict
 41        """@private"""
 42        self.all_fields_non_required = all_fields_non_required
 43        """@private"""
 44        self.force_disparate_rows_to_string = force_disparate_rows_to_string
 45        """@private"""
 46        self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch
 47        """@private"""
 48
 49    @staticmethod
 50    def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]:
 51        """
 52        Compare tables between two datasets.
 53
 54        Args:
 55            reference_dataset_table (dict): The reference dataset table schema.
 56            target_dataset_table (list[dict]): The target dataset table schema.
 57            table_name (str): The name of the table being compared.
 58
 59        Returns:
 60            list[dict]: A list of columns that need to be updated.
 61        """
 62        logging.info(f"Comparing table {reference_dataset_table['name']} to existing target table")
 63        columns_to_update = []
 64        # Convert target table to dict for easier comparison
 65        target_dataset_table_dict = {col["name"]: col for col in target_dataset_table}
 66        # Go through each column in reference table and see if it exists and if so, is it the same in target table
 67        for column_dict in reference_dataset_table["columns"]:
 68            # Check if column exists in target table already
 69            if column_dict["name"] not in target_dataset_table_dict.keys():
 70                column_dict["action"] = "add"
 71                columns_to_update.append(column_dict)
 72            else:
 73                # Check if column exists but is not set up the same
 74                if column_dict != target_dataset_table_dict[column_dict["name"]]:
 75                    column_dict["action"] = "modify"
 76                    logging.warning(
 77                        f'Column {column_dict["name"]} in table {table_name} does not match. Expected column info:\n'
 78                        f'{json.dumps(column_dict, indent=4)}\nexisting column info:\n'
 79                        f'{json.dumps(target_dataset_table_dict[column_dict["name"]], indent=4)}'
 80                    )
 81                    columns_to_update.append(column_dict)
 82        return columns_to_update
 83
 84    def run(self) -> dict:
 85        """
 86        Run the setup process to ensure tables are created or updated as needed.
 87
 88        **Returns:**
 89        - dict: A dictionary with table names as keys and column information as values.
 90        """
 91        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
 92        existing_tdr_table_schema_info = {
 93            table_dict["name"]: table_dict["columns"]
 94            for table_dict in dataset_info["schema"]["tables"]
 95        }
 96        tables_to_create = []
 97        valid = True
 98        # Loop through all expected tables to see if exist and match schema. If not then create one.
 99        for ingest_table_name, ingest_table_dict in self.table_info_dict.items():
100            primary_key = ingest_table_dict.get("primary_key")
101
102            # Get TDR schema info for tables to ingest
103            expected_tdr_schema_dict = InferTDRSchema(
104                input_metadata=ingest_table_dict["ingest_metadata"],
105                table_name=ingest_table_name,
106                all_fields_non_required=self.all_fields_non_required,
107                primary_key=primary_key,
108                allow_disparate_data_types_in_column=self.force_disparate_rows_to_string,
109            ).infer_schema()
110
111            # If unique id then add to table json
112            if primary_key:
113                expected_tdr_schema_dict["primaryKey"] = [ingest_table_dict["primary_key"]]
114
115            # add table to ones to create if it does not exist
116            if ingest_table_name not in existing_tdr_table_schema_info:
117                # Ensure there is columns in table before adding to list
118                if expected_tdr_schema_dict['columns']:
119                    tables_to_create.append(expected_tdr_schema_dict)
120            else:
121                # Compare columns
122                columns_to_update = self._compare_table(
123                    reference_dataset_table=expected_tdr_schema_dict,
124                    target_dataset_table=existing_tdr_table_schema_info[ingest_table_name],
125                    table_name=ingest_table_name
126                )
127                if columns_to_update:
128                    # If any updates needed nothing is done for whole ingest
129                    valid = False
130                    for column_to_update_dict in columns_to_update:
131                        logging.warning(f"Column {column_to_update_dict['name']} needs updates in {ingest_table_name}")
132                else:
133                    logging.info(f"Table {ingest_table_name} exists and is up to date")
134        if valid:
135            #  Does nothing with relationships for now
136            if tables_to_create:
137                tables_string = ", ".join(
138                    [table["name"] for table in tables_to_create]
139                )
140                logging.info(f"Table(s) {tables_string} do not exist in dataset. Will attempt to create")
141                self.tdr.update_dataset_schema(
142                    dataset_id=self.dataset_id,
143                    update_note=f"Creating tables in dataset {self.dataset_id}",
144                    tables_to_add=tables_to_create
145                )
146            else:
147                logging.info("All tables in dataset exist and are up to date")
148        else:
149            logging.warning("Tables do not appear to be valid")
150            if self.ignore_existing_schema_mismatch:
151                logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
152            else:
153                logging.error(
154                    "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
155                )
156                sys.exit(1)
157        # Return schema info for all existing tables after creation
158        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
159        # Return dict with key being table name and value being dict of columns with key being
160        # column name and value being column info
161        return {
162            table_dict["name"]: {
163                column_dict["name"]: column_dict
164                for column_dict in table_dict["columns"]
165            }
166            for table_dict in dataset_info["schema"]["tables"]
167        }
class SetUpTDRTables:
    """A class to set up TDR tables by comparing and updating schemas."""

    def __init__(
            self,
            tdr: TDR,
            dataset_id: str,
            table_info_dict: dict,
            all_fields_non_required: bool = False,
            force_disparate_rows_to_string: bool = False,
            ignore_existing_schema_mismatch: bool = False
    ):
        """
        Initialize the SetUpTDRTables class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - dataset_id (str): The ID of the dataset.
        - table_info_dict (dict): A dictionary containing table information. Keyed by table name; each
                value holds at least `ingest_metadata` and optionally `primary_key`.
        - all_fields_non_required (bool): A boolean indicating whether all columns are non-required.
        Defaults to `False`.
        - force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to
                string. Defaults to `False`.
        - ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not
                matching existing schema. Defaults to `False`.
        """
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.table_info_dict = table_info_dict
        """@private"""
        self.all_fields_non_required = all_fields_non_required
        """@private"""
        self.force_disparate_rows_to_string = force_disparate_rows_to_string
        """@private"""
        self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch
        """@private"""

    @staticmethod
    def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]:
        """
        Compare tables between two datasets.

        Args:
            reference_dataset_table (dict): The reference dataset table schema.
            target_dataset_table (list[dict]): The target dataset table schema.
            table_name (str): The name of the table being compared.

        Returns:
            list[dict]: A list of columns that need to be updated. Each returned column dict is annotated
                with an "action" key: "add" if missing from the target, "modify" if present but different.
        """
        logging.info(f"Comparing table {reference_dataset_table['name']} to existing target table")
        columns_to_update = []
        # Convert target table to dict for easier comparison
        target_dataset_table_dict = {col["name"]: col for col in target_dataset_table}
        # Go through each column in reference table and see if it exists and if so, is it the same in target table
        for column_dict in reference_dataset_table["columns"]:
            # Check if column exists in target table already
            if column_dict["name"] not in target_dataset_table_dict.keys():
                column_dict["action"] = "add"
                columns_to_update.append(column_dict)
            else:
                # Check if column exists but is not set up the same.
                # NOTE: strict dict equality — any extra, missing, or differing key flags a mismatch.
                if column_dict != target_dataset_table_dict[column_dict["name"]]:
                    column_dict["action"] = "modify"
                    logging.warning(
                        f'Column {column_dict["name"]} in table {table_name} does not match. Expected column info:\n'
                        f'{json.dumps(column_dict, indent=4)}\nexisting column info:\n'
                        f'{json.dumps(target_dataset_table_dict[column_dict["name"]], indent=4)}'
                    )
                    columns_to_update.append(column_dict)
        return columns_to_update

    def run(self) -> dict:
        """
        Run the setup process to ensure tables are created or updated as needed.

        **Returns:**
        - dict: A dictionary with table names as keys and column information as values.
        """
        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
        existing_tdr_table_schema_info = {
            table_dict["name"]: table_dict["columns"]
            for table_dict in dataset_info["schema"]["tables"]
        }
        tables_to_create = []
        valid = True
        # Loop through all expected tables to see if exist and match schema. If not then create one.
        for ingest_table_name, ingest_table_dict in self.table_info_dict.items():
            primary_key = ingest_table_dict.get("primary_key")

            # Get TDR schema info for tables to ingest
            expected_tdr_schema_dict = InferTDRSchema(
                input_metadata=ingest_table_dict["ingest_metadata"],
                table_name=ingest_table_name,
                all_fields_non_required=self.all_fields_non_required,
                primary_key=primary_key,
                allow_disparate_data_types_in_column=self.force_disparate_rows_to_string,
            ).infer_schema()

            # If unique id then add to table json
            if primary_key:
                expected_tdr_schema_dict["primaryKey"] = [ingest_table_dict["primary_key"]]

            # add table to ones to create if it does not exist
            if ingest_table_name not in existing_tdr_table_schema_info:
                # Ensure there are columns in the table before adding to list
                if expected_tdr_schema_dict['columns']:
                    tables_to_create.append(expected_tdr_schema_dict)
            else:
                # Compare columns
                columns_to_update = self._compare_table(
                    reference_dataset_table=expected_tdr_schema_dict,
                    target_dataset_table=existing_tdr_table_schema_info[ingest_table_name],
                    table_name=ingest_table_name
                )
                if columns_to_update:
                    # If any updates needed nothing is done for whole ingest
                    valid = False
                    for column_to_update_dict in columns_to_update:
                        logging.warning(f"Column {column_to_update_dict['name']} needs updates in {ingest_table_name}")
                else:
                    logging.info(f"Table {ingest_table_name} exists and is up to date")
        if valid:
            #  Does nothing with relationships for now
            if tables_to_create:
                tables_string = ", ".join(
                    [table["name"] for table in tables_to_create]
                )
                logging.info(f"Table(s) {tables_string} do not exist in dataset. Will attempt to create")
                self.tdr.update_dataset_schema(
                    dataset_id=self.dataset_id,
                    update_note=f"Creating tables in dataset {self.dataset_id}",
                    tables_to_add=tables_to_create
                )
            else:
                logging.info("All tables in dataset exist and are up to date")
        else:
            logging.warning("Tables do not appear to be valid")
            if self.ignore_existing_schema_mismatch:
                logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
            else:
                logging.error(
                    "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
                )
                sys.exit(1)
        # Return schema info for all existing tables after creation
        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
        # Return dict with key being table name and value being dict of columns with key being
        # column name and value being column info
        return {
            table_dict["name"]: {
                column_dict["name"]: column_dict
                for column_dict in table_dict["columns"]
            }
            for table_dict in dataset_info["schema"]["tables"]
        }

A class to set up TDR tables by comparing and updating schemas.

SetUpTDRTables( tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, dataset_id: str, table_info_dict: dict, all_fields_non_required: bool = False, force_disparate_rows_to_string: bool = False, ignore_existing_schema_mismatch: bool = False)
14    def __init__(
15            self,
16            tdr: TDR,
17            dataset_id: str,
18            table_info_dict: dict,
19            all_fields_non_required: bool = False,
20            force_disparate_rows_to_string: bool = False,
21            ignore_existing_schema_mismatch: bool = False
22    ):
23        """
24        Initialize the SetUpTDRTables class.
25
26        **Args:**
27        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
28        - dataset_id (str): The ID of the dataset.
29        - table_info_dict (dict): A dictionary containing table information.
30        - all_fields_non_required (bool): A boolean indicating whether all columns are non-required.
31        Defaults to `False`.
32        - force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to
33                string. Defaults to `False`.
34        - ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not
35                matching existing schema. Defaults to `False`.
36        """
37        self.tdr = tdr
38        """@private"""
39        self.dataset_id = dataset_id
40        """@private"""
41        self.table_info_dict = table_info_dict
42        """@private"""
43        self.all_fields_non_required = all_fields_non_required
44        """@private"""
45        self.force_disparate_rows_to_string = force_disparate_rows_to_string
46        """@private"""
47        self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch
48        """@private"""

Initialize the SetUpTDRTables class.

Args:

  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
  • dataset_id (str): The ID of the dataset.
  • table_info_dict (dict): A dictionary containing table information.
  • all_fields_non_required (bool): A boolean indicating whether all columns are non-required. Defaults to False.
  • force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to string. Defaults to False.
  • ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not matching existing schema. Defaults to False.
def run(self) -> dict:
 85    def run(self) -> dict:
 86        """
 87        Run the setup process to ensure tables are created or updated as needed.
 88
 89        **Returns:**
 90        - dict: A dictionary with table names as keys and column information as values.
 91        """
 92        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
 93        existing_tdr_table_schema_info = {
 94            table_dict["name"]: table_dict["columns"]
 95            for table_dict in dataset_info["schema"]["tables"]
 96        }
 97        tables_to_create = []
 98        valid = True
 99        # Loop through all expected tables to see if exist and match schema. If not then create one.
100        for ingest_table_name, ingest_table_dict in self.table_info_dict.items():
101            primary_key = ingest_table_dict.get("primary_key")
102
103            # Get TDR schema info for tables to ingest
104            expected_tdr_schema_dict = InferTDRSchema(
105                input_metadata=ingest_table_dict["ingest_metadata"],
106                table_name=ingest_table_name,
107                all_fields_non_required=self.all_fields_non_required,
108                primary_key=primary_key,
109                allow_disparate_data_types_in_column=self.force_disparate_rows_to_string,
110            ).infer_schema()
111
112            # If unique id then add to table json
113            if primary_key:
114                expected_tdr_schema_dict["primaryKey"] = [ingest_table_dict["primary_key"]]
115
116            # add table to ones to create if it does not exist
117            if ingest_table_name not in existing_tdr_table_schema_info:
118                # Ensure there is columns in table before adding to list
119                if expected_tdr_schema_dict['columns']:
120                    tables_to_create.append(expected_tdr_schema_dict)
121            else:
122                # Compare columns
123                columns_to_update = self._compare_table(
124                    reference_dataset_table=expected_tdr_schema_dict,
125                    target_dataset_table=existing_tdr_table_schema_info[ingest_table_name],
126                    table_name=ingest_table_name
127                )
128                if columns_to_update:
129                    # If any updates needed nothing is done for whole ingest
130                    valid = False
131                    for column_to_update_dict in columns_to_update:
132                        logging.warning(f"Column {column_to_update_dict['name']} needs updates in {ingest_table_name}")
133                else:
134                    logging.info(f"Table {ingest_table_name} exists and is up to date")
135        if valid:
136            #  Does nothing with relationships for now
137            if tables_to_create:
138                tables_string = ", ".join(
139                    [table["name"] for table in tables_to_create]
140                )
141                logging.info(f"Table(s) {tables_string} do not exist in dataset. Will attempt to create")
142                self.tdr.update_dataset_schema(
143                    dataset_id=self.dataset_id,
144                    update_note=f"Creating tables in dataset {self.dataset_id}",
145                    tables_to_add=tables_to_create
146                )
147            else:
148                logging.info("All tables in dataset exist and are up to date")
149        else:
150            logging.warning("Tables do not appear to be valid")
151            if self.ignore_existing_schema_mismatch:
152                logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
153            else:
154                logging.error(
155                    "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
156                )
157                sys.exit(1)
158        # Return schema info for all existing tables after creation
159        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
160        # Return dict with key being table name and value being dict of columns with key being
161        # column name and value being column info
162        return {
163            table_dict["name"]: {
164                column_dict["name"]: column_dict
165                for column_dict in table_dict["columns"]
166            }
167            for table_dict in dataset_info["schema"]["tables"]
168        }

Run the setup process to ensure tables are created or updated as needed.

Returns:

  • dict: A dictionary with table names as keys and column information as values.