ops_utils.tdr_utils.tdr_table_utils
Module for interacting with TDR tables.
"""Module for interacting with TDR tables."""
import json
import logging
import sys

from ..tdr_utils.tdr_api_utils import TDR
from ..tdr_utils.tdr_schema_utils import InferTDRSchema


class SetUpTDRTables:
    """A class to set up TDR tables by comparing and updating schemas."""

    def __init__(
            self,
            tdr: TDR,
            dataset_id: str,
            table_info_dict: dict,
            all_fields_non_required: bool = False,
            force_disparate_rows_to_string: bool = False,
            ignore_existing_schema_mismatch: bool = False
    ):
        """
        Initialize the SetUpTDRTables class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - dataset_id (str): The ID of the dataset.
        - table_info_dict (dict): A dictionary containing table information.
        - all_fields_non_required (bool): A boolean indicating whether all columns are non-required.
            Defaults to `False`.
        - force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to
            string. Defaults to `False`.
        - ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not
            matching existing schema. Defaults to `False`.
        """
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.table_info_dict = table_info_dict
        """@private"""
        self.all_fields_non_required = all_fields_non_required
        """@private"""
        self.force_disparate_rows_to_string = force_disparate_rows_to_string
        """@private"""
        self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch
        """@private"""

    @staticmethod
    def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]:
        """
        Compare tables between two datasets.

        Args:
            reference_dataset_table (dict): The reference dataset table schema.
            target_dataset_table (list[dict]): The target dataset table schema.
            table_name (str): The name of the table being compared.

        Returns:
            list[dict]: A list of columns that need to be updated. Each returned column
                dict is tagged in place with an "action" key of "add" or "modify".
        """
        logging.info(f"Comparing table {reference_dataset_table['name']} to existing target table")
        columns_to_update = []
        # Convert target table columns to a dict keyed by column name for O(1) lookups
        target_dataset_table_dict = {col["name"]: col for col in target_dataset_table}
        # Go through each column in reference table and see if it exists and, if so,
        # whether it is set up the same in the target table
        for column_dict in reference_dataset_table["columns"]:
            if column_dict["name"] not in target_dataset_table_dict:
                # Column does not exist in the target table at all
                column_dict["action"] = "add"
                columns_to_update.append(column_dict)
            elif column_dict != target_dataset_table_dict[column_dict["name"]]:
                # Column exists but is not set up the same
                column_dict["action"] = "modify"
                logging.warning(
                    f'Column {column_dict["name"]} in table {table_name} does not match. Expected column info:\n'
                    f'{json.dumps(column_dict, indent=4)}\nexisting column info:\n'
                    f'{json.dumps(target_dataset_table_dict[column_dict["name"]], indent=4)}'
                )
                columns_to_update.append(column_dict)
        return columns_to_update

    def run(self) -> dict:
        """
        Run the setup process to ensure tables are created or updated as needed.

        Exits the process (via `sys.exit(1)`) if an existing table's schema does not
        match the expected schema and `ignore_existing_schema_mismatch` is not set.

        **Returns:**
        - dict: A dictionary with table names as keys and column information as values.
        """
        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
        existing_tdr_table_schema_info = {
            table_dict["name"]: table_dict["columns"]
            for table_dict in dataset_info["schema"]["tables"]
        }
        tables_to_create: list[dict] = []
        valid = True
        # Loop through all expected tables to see if they exist and match the schema.
        # If a table does not exist it is queued for creation.
        for ingest_table_name, ingest_table_dict in self.table_info_dict.items():
            primary_key = ingest_table_dict.get("primary_key")

            # Infer the TDR schema for the table to ingest
            expected_tdr_schema_dict = InferTDRSchema(
                input_metadata=ingest_table_dict["ingest_metadata"],
                table_name=ingest_table_name,
                all_fields_non_required=self.all_fields_non_required,
                primary_key=primary_key,
                allow_disparate_data_types_in_column=self.force_disparate_rows_to_string,
            ).infer_schema()

            # If a unique id is configured, record it in the table json
            if primary_key:
                expected_tdr_schema_dict["primaryKey"] = [primary_key]

            if ingest_table_name not in existing_tdr_table_schema_info:
                # Ensure there are columns in the table before queueing it for creation
                if expected_tdr_schema_dict['columns']:
                    tables_to_create.append(expected_tdr_schema_dict)
            else:
                # Table already exists; compare columns against the existing schema
                columns_to_update = self._compare_table(
                    reference_dataset_table=expected_tdr_schema_dict,
                    target_dataset_table=existing_tdr_table_schema_info[ingest_table_name],
                    table_name=ingest_table_name
                )
                if columns_to_update:
                    # If any updates are needed nothing is done for the whole ingest
                    valid = False
                    for column_to_update_dict in columns_to_update:
                        logging.warning(f"Column {column_to_update_dict['name']} needs updates in {ingest_table_name}")
                else:
                    logging.info(f"Table {ingest_table_name} exists and is up to date")
        if valid:
            # Does nothing with relationships for now
            if tables_to_create:
                tables_string = ", ".join(table["name"] for table in tables_to_create)
                logging.info(f"Table(s) {tables_string} do not exist in dataset. Will attempt to create")
                self.tdr.update_dataset_schema(
                    dataset_id=self.dataset_id,
                    update_note=f"Creating tables in dataset {self.dataset_id}",
                    tables_to_add=tables_to_create
                )
            else:
                logging.info("All tables in dataset exist and are up to date")
        else:
            logging.warning("Tables do not appear to be valid")
            if self.ignore_existing_schema_mismatch:
                logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
            else:
                logging.error(
                    "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
                )
                sys.exit(1)
        # Re-fetch schema info so tables created above are included in the result
        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
        # Return dict with key being table name and value being dict of columns with key
        # being column name and value being column info
        return {
            table_dict["name"]: {
                column_dict["name"]: column_dict
                for column_dict in table_dict["columns"]
            }
            for table_dict in dataset_info["schema"]["tables"]
        }
class SetUpTDRTables:
    """Ensure a TDR dataset contains the expected tables, creating or flagging them as needed."""

    def __init__(
            self,
            tdr: TDR,
            dataset_id: str,
            table_info_dict: dict,
            all_fields_non_required: bool = False,
            force_disparate_rows_to_string: bool = False,
            ignore_existing_schema_mismatch: bool = False
    ):
        """
        Store the configuration used to create and validate TDR tables.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR client used for all dataset calls.
        - dataset_id (str): ID of the dataset whose tables are managed.
        - table_info_dict (dict): Table configuration, keyed by table name.
        - all_fields_non_required (bool): Mark every column as non-required when `True`.
            Defaults to `False`.
        - force_disparate_rows_to_string (bool): Force columns with mixed data types to
            string when `True`. Defaults to `False`.
        - ignore_existing_schema_mismatch (bool): Do not fail when data types do not match
            the existing schema when `True`. Defaults to `False`.
        """
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.table_info_dict = table_info_dict
        """@private"""
        self.all_fields_non_required = all_fields_non_required
        """@private"""
        self.force_disparate_rows_to_string = force_disparate_rows_to_string
        """@private"""
        self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch
        """@private"""

    @staticmethod
    def _compare_table(reference_dataset_table: dict, target_dataset_table: list[dict], table_name: str) -> list[dict]:
        """
        Diff an expected table schema against the schema already present in the dataset.

        Args:
            reference_dataset_table (dict): The expected (reference) table schema.
            target_dataset_table (list[dict]): Columns of the table as it currently exists.
            table_name (str): Name of the table being compared.

        Returns:
            list[dict]: Columns that are missing or mismatched, each tagged with an "action" key.
        """
        logging.info(f"Comparing table {reference_dataset_table['name']} to existing target table")
        # Index the existing columns by name so each lookup is constant time.
        existing_columns = {column["name"]: column for column in target_dataset_table}
        differing_columns = []
        for expected_column in reference_dataset_table["columns"]:
            existing_column = existing_columns.get(expected_column["name"])
            if existing_column is None:
                # Column is absent from the dataset entirely: it must be added.
                expected_column["action"] = "add"
                differing_columns.append(expected_column)
                continue
            if expected_column == existing_column:
                continue
            # Column is present but configured differently: it must be modified.
            expected_column["action"] = "modify"
            logging.warning(
                f'Column {expected_column["name"]} in table {table_name} does not match. Expected column info:\n'
                f'{json.dumps(expected_column, indent=4)}\nexisting column info:\n'
                f'{json.dumps(existing_columns[expected_column["name"]], indent=4)}'
            )
            differing_columns.append(expected_column)
        return differing_columns

    def run(self) -> dict:
        """
        Create any missing tables and verify existing ones match the expected schemas.

        **Returns:**
        - dict: Mapping of table name to a dict of column name -> column info for the
            dataset's final schema.
        """
        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
        current_tables = {
            table["name"]: table["columns"] for table in dataset_info["schema"]["tables"]
        }
        pending_tables = []
        schema_matches = True
        # Walk every expected table and check it against what the dataset already has.
        for table_name, table_config in self.table_info_dict.items():
            key_column = table_config.get("primary_key")

            # Infer the TDR schema for the metadata about to be ingested.
            expected_schema = InferTDRSchema(
                input_metadata=table_config["ingest_metadata"],
                table_name=table_name,
                all_fields_non_required=self.all_fields_non_required,
                primary_key=key_column,
                allow_disparate_data_types_in_column=self.force_disparate_rows_to_string,
            ).infer_schema()

            # Record the unique id in the table json when one is configured.
            if key_column:
                expected_schema["primaryKey"] = [table_config["primary_key"]]

            if table_name not in current_tables:
                # Only queue brand-new tables that actually have columns.
                if expected_schema['columns']:
                    pending_tables.append(expected_schema)
                continue

            mismatched_columns = self._compare_table(
                reference_dataset_table=expected_schema,
                target_dataset_table=current_tables[table_name],
                table_name=table_name
            )
            if not mismatched_columns:
                logging.info(f"Table {table_name} exists and is up to date")
                continue
            # A single mismatch blocks the entire ingest.
            schema_matches = False
            for mismatched in mismatched_columns:
                logging.warning(f"Column {mismatched['name']} needs updates in {table_name}")

        if not schema_matches:
            logging.warning("Tables do not appear to be valid")
            if self.ignore_existing_schema_mismatch:
                logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
            else:
                logging.error(
                    "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
                )
                sys.exit(1)
        elif pending_tables:
            # Relationships are intentionally not handled yet.
            tables_string = ", ".join([table["name"] for table in pending_tables])
            logging.info(f"Table(s) {tables_string} do not exist in dataset. Will attempt to create")
            self.tdr.update_dataset_schema(
                dataset_id=self.dataset_id,
                update_note=f"Creating tables in dataset {self.dataset_id}",
                tables_to_add=pending_tables
            )
        else:
            logging.info("All tables in dataset exist and are up to date")

        # Refresh the schema so tables created above are reflected in the result.
        dataset_info = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
        return {
            table["name"]: {column["name"]: column for column in table["columns"]}
            for table in dataset_info["schema"]["tables"]
        }
A class to set up TDR tables by comparing and updating schemas.
SetUpTDRTables( tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, dataset_id: str, table_info_dict: dict, all_fields_non_required: bool = False, force_disparate_rows_to_string: bool = False, ignore_existing_schema_mismatch: bool = False)
14 def __init__( 15 self, 16 tdr: TDR, 17 dataset_id: str, 18 table_info_dict: dict, 19 all_fields_non_required: bool = False, 20 force_disparate_rows_to_string: bool = False, 21 ignore_existing_schema_mismatch: bool = False 22 ): 23 """ 24 Initialize the SetUpTDRTables class. 25 26 **Args:** 27 - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class. 28 - dataset_id (str): The ID of the dataset. 29 - table_info_dict (dict): A dictionary containing table information. 30 - all_fields_non_required (bool): A boolean indicating whether all columns are non-required. 31 Defaults to `False`. 32 - force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to 33 string. Defaults to `False`. 34 - ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not 35 matching existing schema. Defaults to `False`. 36 """ 37 self.tdr = tdr 38 """@private""" 39 self.dataset_id = dataset_id 40 """@private""" 41 self.table_info_dict = table_info_dict 42 """@private""" 43 self.all_fields_non_required = all_fields_non_required 44 """@private""" 45 self.force_disparate_rows_to_string = force_disparate_rows_to_string 46 """@private""" 47 self.ignore_existing_schema_mismatch = ignore_existing_schema_mismatch 48 """@private"""
Initialize the SetUpTDRTables class.
**Args:**

- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
- dataset_id (str): The ID of the dataset.
- table_info_dict (dict): A dictionary containing table information.
- all_fields_non_required (bool): A boolean indicating whether all columns are non-required. Defaults to `False`.
- force_disparate_rows_to_string (bool): A boolean indicating whether disparate rows should be forced to string. Defaults to `False`.
- ignore_existing_schema_mismatch (bool): A boolean indicating whether to not fail on data type not matching existing schema. Defaults to `False`.
def
run(self) -> dict:
def run(self) -> dict:
    """
    Run the setup process to ensure tables are created or updated as needed.

    **Returns:**
    - dict: A dictionary with table names as keys and column information as values.
    """
    schema_json = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
    existing_columns_by_table = {
        entry["name"]: entry["columns"] for entry in schema_json["schema"]["tables"]
    }
    creation_queue = []
    all_tables_ok = True
    # Check every expected table for existence and schema agreement.
    for name, config in self.table_info_dict.items():
        pk = config.get("primary_key")

        # Infer the TDR schema for the metadata being ingested into this table.
        inferred = InferTDRSchema(
            input_metadata=config["ingest_metadata"],
            table_name=name,
            all_fields_non_required=self.all_fields_non_required,
            primary_key=pk,
            allow_disparate_data_types_in_column=self.force_disparate_rows_to_string,
        ).infer_schema()

        # Attach the unique id to the table json when one is configured.
        if pk:
            inferred["primaryKey"] = [config["primary_key"]]

        if name in existing_columns_by_table:
            changes = self._compare_table(
                reference_dataset_table=inferred,
                target_dataset_table=existing_columns_by_table[name],
                table_name=name
            )
            if changes:
                # A single mismatched column invalidates the whole ingest.
                all_tables_ok = False
                for change in changes:
                    logging.warning(f"Column {change['name']} needs updates in {name}")
            else:
                logging.info(f"Table {name} exists and is up to date")
        elif inferred['columns']:
            # Brand-new table with at least one column: queue it for creation.
            creation_queue.append(inferred)

    if all_tables_ok:
        # Relationships are not handled for now.
        if creation_queue:
            tables_string = ", ".join([entry["name"] for entry in creation_queue])
            logging.info(f"Table(s) {tables_string} do not exist in dataset. Will attempt to create")
            self.tdr.update_dataset_schema(
                dataset_id=self.dataset_id,
                update_note=f"Creating tables in dataset {self.dataset_id}",
                tables_to_add=creation_queue
            )
        else:
            logging.info("All tables in dataset exist and are up to date")
    else:
        logging.warning("Tables do not appear to be valid")
        if self.ignore_existing_schema_mismatch:
            logging.warning("Ignoring schema mismatch because ignore_existing_schema_mismatch was used")
        else:
            logging.error(
                "Tables need manual updating. If want to force through use ignore_existing_schema_mismatch."
            )
            sys.exit(1)

    # Fetch the schema again so newly created tables appear in the returned mapping.
    schema_json = self.tdr.get_dataset_info(dataset_id=self.dataset_id, info_to_include=["SCHEMA"]).json()
    return {
        entry["name"]: {col["name"]: col for col in entry["columns"]}
        for entry in schema_json["schema"]["tables"]
    }
Run the setup process to ensure tables are created or updated as needed.
Returns:
- dict: A dictionary with table names as keys and column information as values.