ops_utils.tdr_utils.tdr_schema_utils
Utility classes for TDR schema.
"""Utility classes for TDR schema."""

import logging
import re
import numpy as np
import pandas as pd
from datetime import date, datetime, time
from typing import Any, Optional


class InferTDRSchema:
    """A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata."""

    PYTHON_TDR_DATA_TYPE_MAPPING = {
        str: "string",
        "fileref": "fileref",
        bool: "boolean",
        bytes: "bytes",
        date: "date",
        datetime: "datetime",
        float: "float64",
        np.float64: "float64",
        int: "int64",
        np.int64: "int64",
        time: "time",
    }
    """@private"""

    def __init__(
            self,
            input_metadata: list[dict],
            table_name: str,
            all_fields_non_required: bool = False,
            allow_disparate_data_types_in_column: bool = False,
            primary_key: Optional[str] = None
    ):
        """
        Initialize the InferTDRSchema class.

        **Args:**
        - input_metadata (list[dict]): The input metadata to infer the schema from.
        - table_name (str): The name of the table for which the schema is being inferred.
        - all_fields_non_required (bool): Whether all columns besides the primary key should be set to
            non-required. Defaults to `False`.
        - allow_disparate_data_types_in_column (bool): Whether to force disparate data types in a
            column to be of type `str`. Defaults to `False`.
        - primary_key (str, optional): The name of the primary key column. Used to determine if the column
            should be required.
        """
        self.input_metadata = input_metadata
        """@private"""
        self.table_name = table_name
        """@private"""
        self.all_fields_non_required = all_fields_non_required
        """@private"""
        self.primary_key = primary_key
        """@private"""
        self.allow_disparate_data_types_in_column = allow_disparate_data_types_in_column
        """@private"""

    def _check_type_consistency(self, key_value_type_mappings: dict) -> list[dict]:
        """
        Check if all values for each header are of the same type.

        **Args:**
        - key_value_type_mappings (dict): A dictionary where the key is the header,
            and the value is a list of values for the header.

        Raises:
            Exception: If types do not match for any header.

        Returns:
            list[dict]: Per-header info on whether the column should be forced to a string type.
        """
        matching = []

        disparate_header_info = []

        for header, values_for_header in key_value_type_mappings.items():
            # check if some values are lists while others are not (consider this a "mismatch" if so) while ignoring
            # "None" entries
            if (any(isinstance(item, list) for item in values_for_header if item is not None) and
                    not all(isinstance(item, list) for item in values_for_header if item is not None)):
                all_values_matching = False
            # if the column contains ONLY lists of items, check that all items in each list are of the same type
            # (while ignoring "None" entries)
            elif all(isinstance(item, list) for item in values_for_header if item is not None):
                # first get all sub-lists that have some values
                non_empty_sublists = [v for v in values_for_header if v]
                if non_empty_sublists:
                    # get one "type" from the list of values
                    first_match_type = type([v[0] for v in non_empty_sublists][0])
                    all_values_matching = all(
                        all(isinstance(item, first_match_type) for item in sublist) for sublist in non_empty_sublists
                    )
                else:
                    # if all "sub-lists" are empty, assume that all types are matching (all-empty lists are handled
                    # below)
                    all_values_matching = True
            else:
                # find one value that's non-None to get the type to check against
                # specifically check "is not None" since we could have all zeroes, for example
                type_to_match_against = type([v for v in values_for_header if v is not None][0])
                # check if all the non-None values in the list match the type of the first entry
                all_values_matching = all(
                    isinstance(v, type_to_match_against) for v in values_for_header if v is not None
                )

            # if ALL rows for the header are None, force the type to be a string
            if all_values_matching and not any(values_for_header):
                matching.append({header: all_values_matching})
                disparate_header_info.append(
                    {
                        "header": header,
                        "force_to_string": True,
                    }
                )
            elif not all_values_matching and self.allow_disparate_data_types_in_column:
                logging.info(
                    f"Not all data types matched for header '{header}' but forcing them to strings as "
                    f"'allow_disparate_data_types_in_column' setting is set to true"
                )
                matching.append({header: True})
                disparate_header_info.append(
                    {
                        "header": header,
                        "force_to_string": True,
                    }
                )
            else:
                matching.append({header: all_values_matching})  # type: ignore[dict-item]
                disparate_header_info.append(
                    {
                        "header": header,
                        "force_to_string": False,
                    }
                )

        # collect any headers whose values were NOT determined to be "matching"
        problematic_headers = [
            list(d.keys())[0]
            for d in matching
            if not list(d.values())[0]
        ]

        if problematic_headers:
            raise Exception(
                f"Not all values for the following headers are of the same type: {problematic_headers}. To force "
                f"all values in a given column to the same type and bypass this error, re-run with the "
                f"'allow_disparate_data_types_in_column' option set to True"
            )

        return disparate_header_info

    def _python_type_to_tdr_type_conversion(self, value_for_header: Any) -> str:
        """
        Convert Python data types to TDR data types.

        Args:
            value_for_header (Any): The value to determine the TDR type for.

        Returns:
            str: The TDR data type.
        """
        gcp_fileref_regex = "^gs://.*"

        # find potential file references
        if isinstance(value_for_header, str):
            gcp_match = re.search(
                pattern=gcp_fileref_regex, string=value_for_header)
            if gcp_match:
                return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]

        # Tried to use this to parse datetimes, but it was turning too many
        # regular ints into datetimes. Commenting out for now
        # try:
        #     date_or_time = parser.parse(value_for_header)
        #     return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(date_or_time)]
        # except (TypeError, ParserError):
        #     pass

        if isinstance(value_for_header, list):
            # check for a potential list of filerefs
            for v in value_for_header:
                if isinstance(v, str):
                    gcp_match = re.search(pattern=gcp_fileref_regex, string=v)
                    if gcp_match:
                        return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]
            # otherwise use the first non-None entry in the list to determine the TDR type
            non_none_entry_in_list = [a for a in value_for_header if a is not None][0]
            return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(non_none_entry_in_list)]

        # if none of the above special cases apply, just use the type of the value to determine the TDR type
        return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(value_for_header)]

    def _format_column_metadata(self, key_value_type_mappings: dict, disparate_header_info: list[dict]) -> list[dict]:
        """
        Generate the metadata for each column's header name, data type, and whether it's an array of values.

        Args:
            key_value_type_mappings (dict): A dictionary where the key is the header,
                and the value is a list of values for the header.
            disparate_header_info (list[dict]): Per-header flags indicating whether a column
                should be forced to a string type.

        Returns:
            list[dict]: A list of dictionaries containing column metadata.
        """
        columns = []

        for header, values_for_header in key_value_type_mappings.items():
            force_to_string = [h["force_to_string"] for h in disparate_header_info if h["header"] == header][0]

            # if ANY of the values for a given header is a list, we assume that column contains arrays of values
            array_of = any(isinstance(v, list) for v in values_for_header)

            if force_to_string:
                logging.info(f"Header '{header}' was forced to string due to mismatched datatypes in column")
                data_type = self.PYTHON_TDR_DATA_TYPE_MAPPING[str]
            else:
                # use the first non-None item (which may itself be a list) to determine the data type
                data_type = self._python_type_to_tdr_type_conversion([a for a in values_for_header if a is not None][0])

            column_metadata = {
                "name": header,
                "datatype": data_type,
                "array_of": array_of,
            }
            columns.append(column_metadata)

        return columns

    def _gather_required_and_non_required_headers(self, metadata_df: Any, dataframe_headers: list[str]) -> list[dict]:
        """
        Determine whether each header is required or not.

        Args:
            metadata_df (Any): The original dataframe.
            dataframe_headers (list[str]): A list of dataframe headers.

        Returns:
            list[dict]: A list of dictionaries containing header requirements.
        """
        header_requirements = []

        na_replaced = metadata_df.replace({None: np.nan})
        for header in dataframe_headers:
            all_none = na_replaced[header].isna().all()
            some_none = na_replaced[header].isna().any()
            contains_array = na_replaced[header].apply(lambda x: isinstance(x, (np.ndarray, list))).any()

            # if the column contains any arrays, set it as optional since arrays cannot be required in TDR
            if contains_array:
                header_requirements.append({"name": header, "required": False})
            # if all rows are None for a given column, default the type to "string" in TDR
            elif all_none:
                header_requirements.append({"name": header, "required": False, "data_type": "string"})
            # if some rows are None, or all_fields_non_required is set AND the header
            # is not the primary key, set the column to non-required
            elif some_none or (self.all_fields_non_required and header != self.primary_key):
                header_requirements.append({"name": header, "required": False})
            else:
                header_requirements.append({"name": header, "required": True})

        return header_requirements

    @staticmethod
    def _reformat_metadata(cleaned_metadata: list[dict]) -> dict:
        """
        Create a dictionary where the key is the header name, and the value is a list of all values for that header.

        Args:
            cleaned_metadata (list[dict]): The cleaned metadata.

        Returns:
            dict: A dictionary with header names as keys and lists of values as values.
        """
        key_value_type_mappings = {}
        unique_headers = {key for row in cleaned_metadata for key in row}

        for header in unique_headers:
            for row in cleaned_metadata:
                value = row[header]
                if header not in key_value_type_mappings:
                    key_value_type_mappings[header] = [value]
                else:
                    key_value_type_mappings[header].append(value)
        return key_value_type_mappings

    def infer_schema(self) -> dict:
        """
        Infer the schema for the table based on the input metadata.

        **Returns:**
        - dict: The inferred schema in the format expected by TDR.
        """
        logging.info(f"Inferring schema for table {self.table_name}")
        # create the dataframe
        metadata_df = pd.DataFrame(self.input_metadata)
        # replace all NaN with None
        metadata_df = metadata_df.where(pd.notnull(metadata_df), None)

        # rename any headers that start with "entity" to the portion after the colon
        headers_to_be_renamed = {h: h.split(":")[1] for h in metadata_df.columns if h.startswith("entity")}
        metadata_df = metadata_df.rename(columns=headers_to_be_renamed)

        # start by gathering the column metadata and determining which headers are required or not
        column_metadata = self._gather_required_and_non_required_headers(metadata_df, list(metadata_df.columns))

        # drop columns where ALL values are None, but keep rows where some values are None
        # we keep the rows where some values are None because if we happen to have a different column that's None in
        # every row, we could end up with no data at the end
        all_none_columns_dropped_df = metadata_df.dropna(axis=1, how="all")
        cleaned_metadata = all_none_columns_dropped_df.to_dict(orient="records")
        key_value_type_mappings = self._reformat_metadata(cleaned_metadata)

        # check to see if all values corresponding to a header are of the same type
        disparate_header_info = self._check_type_consistency(key_value_type_mappings)

        columns = self._format_column_metadata(
            key_value_type_mappings=key_value_type_mappings, disparate_header_info=disparate_header_info
        )

        # combine the information about required headers with the data types that were collected
        for header_metadata in column_metadata:
            matching_metadata = [d for d in columns if d["name"] == header_metadata["name"]]
            if matching_metadata:
                header_metadata.update(matching_metadata[0])

        tdr_tables_json = {
            "name": self.table_name,
            "columns": column_metadata,
        }

        return tdr_tables_json
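To make the data type mapping above concrete, the following sketch shows the TDR types the class infers for a few representative Python values. It calls the private _python_type_to_tdr_type_conversion helper directly, so treat it as an illustration rather than part of the public API:

from ops_utils.tdr_utils.tdr_schema_utils import InferTDRSchema

# Illustration only: exercise the private type-conversion helper directly
inferrer = InferTDRSchema(input_metadata=[], table_name="example")
print(inferrer._python_type_to_tdr_type_conversion("sample_1"))            # string
print(inferrer._python_type_to_tdr_type_conversion("gs://bucket/a.cram"))  # fileref (gs:// paths are detected)
print(inferrer._python_type_to_tdr_type_conversion(42))                    # int64
print(inferrer._python_type_to_tdr_type_conversion([1.5, 2.5]))            # float64 (type of the list's entries)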
class InferTDRSchema:
A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata.
InferTDRSchema(
    input_metadata: list[dict],
    table_name: str,
    all_fields_non_required: bool = False,
    allow_disparate_data_types_in_column: bool = False,
    primary_key: Optional[str] = None
)
Initialize the InferTDRSchema class.
Args:
- input_metadata (list[dict]): The input metadata to infer the schema from.
- table_name (str): The name of the table for which the schema is being inferred.
- all_fields_non_required (bool): Whether all columns besides the primary key should be set to non-required. Defaults to False.
- allow_disparate_data_types_in_column (bool): Whether to force disparate data types in a column to be of type str. Defaults to False.
- primary_key (str, optional): The name of the primary key column. Used to determine if the column should be required.
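For example, a minimal construction might look like the following sketch; the metadata rows are invented for illustration:

sample_metadata = [
    {"sample_id": "S1", "bam_path": "gs://bucket/S1.bam", "coverage": 30.5},
    {"sample_id": "S2", "bam_path": "gs://bucket/S2.bam", "coverage": None},
]

schema_inferrer = InferTDRSchema(
    input_metadata=sample_metadata,
    table_name="sample",
    primary_key="sample_id",
)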
def infer_schema(self) -> dict:
Infer the schema for the table based on the input metadata.
Returns:
- dict: The inferred schema in the format expected by TDR.
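Continuing the construction sketch from the constructor docs above, a call might look like this; the exact columns returned depend entirely on the input rows:

# Infer the schema for the invented rows from the earlier sketch
tdr_schema = schema_inferrer.infer_schema()

# For those rows, the result would look like:
# {
#     "name": "sample",
#     "columns": [
#         {"name": "sample_id", "required": True, "datatype": "string", "array_of": False},
#         {"name": "bam_path", "required": True, "datatype": "fileref", "array_of": False},
#         {"name": "coverage", "required": False, "datatype": "float64", "array_of": False},
#     ],
# }

Note that the required flag is driven by missing values: because one coverage entry is None, that column is marked non-required, while the same rows with coverage filled in would mark it required.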