ops_utils.tdr_utils.tdr_schema_utils
Utility classes for TDR schema.
1"""Utility classes for TDR schema.""" 2 3import logging 4import re 5import time 6import numpy as np 7import pandas as pd 8import math 9from datetime import date, datetime 10from typing import Any, Optional, Union 11 12 13class InferTDRSchema: 14 """A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata.""" 15 16 PYTHON_TDR_DATA_TYPE_MAPPING = { 17 str: "string", 18 "fileref": "fileref", 19 bool: "boolean", 20 bytes: "bytes", 21 date: "date", 22 datetime: "datetime", 23 float: "float64", 24 np.float64: "float64", 25 int: "int64", 26 np.int64: "int64", 27 time: "time", 28 } 29 """@private""" 30 31 def __init__( 32 self, 33 input_metadata: list[dict], 34 table_name: str, 35 all_fields_non_required: bool = False, 36 allow_disparate_data_types_in_column: bool = False, 37 primary_key: Optional[str] = None 38 ): 39 """ 40 Initialize the InferTDRSchema class. 41 42 **Args:** 43 - input_metadata (list[dict]): The input metadata to infer the schema from. 44 - table_name (str): The name of the table for which the schema is being inferred. 45 - all_fields_non_required (bool): A boolean indicating whether all columns should be set to non-required 46 besides for primary key. Defaults to `False` 47 - allow_disparate_data_types_in_column (bool): A boolean indicating whether force disparate data types in a 48 column to be of type `str` Defaults to `False`. 49 - primary_key (str, optional): The name of the primary key column. Used to determine if the column 50 should be required 51 """ 52 self.input_metadata = input_metadata 53 """@private""" 54 self.table_name = table_name 55 """@private""" 56 self.all_fields_non_required = all_fields_non_required 57 """@private""" 58 self.primary_key = primary_key 59 """@private""" 60 self.allow_disparate_data_types_in_column = allow_disparate_data_types_in_column 61 """@private""" 62 63 def _check_type_consistency(self, key_value_type_mappings: dict) -> list[dict]: 64 """ 65 Check if all values for each header are of the same type. 66 67 **Args:** 68 - key_value_type_mappings (dict): A dictionary where the key is the header, 69 and the value is a list of values for the header. 70 71 Raises: 72 Exception: If types do not match for any header. 
73 """ 74 matching = [] 75 76 disparate_header_info = [] 77 78 for header, values_for_header in key_value_type_mappings.items(): 79 # check if some values are lists while others are not (consider this a "mismatch" if so) while ignoring 80 # "None" entries 81 if (any(isinstance(item, list) for item in values_for_header if item is not None) and 82 not all(isinstance(item, list) for item in values_for_header if item is not None)): 83 all_values_matching = False 84 # if the row contains ONLY lists of items, check that all items in each list are of the same type (while 85 # ignoring "None" entries) 86 elif all(isinstance(item, list) for item in values_for_header if item is not None): 87 # first get all substrings that have some values 88 non_empty_substrings = [v for v in values_for_header if v] 89 if non_empty_substrings: 90 # get one "type" from the list of values 91 first_match_type = type([v[0] for v in non_empty_substrings][0]) 92 all_values_matching = all( 93 all(isinstance(item, first_match_type) for item in sublist) for sublist in non_empty_substrings 94 ) 95 else: 96 # if all "sub-lists" are empty, assume that all types are matching (all empty lists are handled 97 # below) 98 all_values_matching = True 99 else: 100 # find one value that's non-none to get the type to check against 101 # specifically check if not "None" since we can have all zeroes, for example 102 type_to_match_against = type([v for v in values_for_header if v is not None][0]) 103 # check if all the values in the list that are non-none match the type of the first entry 104 all_values_matching = all( 105 isinstance(v, type_to_match_against) for v in values_for_header if v is not None 106 ) 107 108 # If ALL rows for the header are none, force the type to be a string 109 if all_values_matching and not any(values_for_header): 110 matching.append({header: all_values_matching}) 111 disparate_header_info.append( 112 { 113 "header": header, 114 "force_to_string": True, 115 } 116 ) 117 if not all_values_matching and self.allow_disparate_data_types_in_column: 118 logging.info( 119 f"Not all data types matched for header '{header}' but forcing them to strings as " 120 f"'allow_disparate_data_types_in_column' setting is set to true" 121 ) 122 matching.append({header: True}) 123 disparate_header_info.append( 124 { 125 "header": header, 126 "force_to_string": True, 127 } 128 ) 129 else: 130 matching.append({header: all_values_matching}) # type: ignore[dict-item] 131 disparate_header_info.append( 132 { 133 "header": header, 134 "force_to_string": False, 135 } 136 ) 137 138 # Returns true if all headers are determined to be "matching" 139 problematic_headers = [ 140 d.keys() 141 for d in matching 142 if not list(d.values())[0] 143 ] 144 145 if problematic_headers: 146 raise Exception( 147 f"Not all values for the following headers are of the same type: {problematic_headers}. 
To force all" 148 f" values in rows of a given column to be forced to the same type and bypass this error, re-run with " 149 f"the 'force_disparate_rows_to_string' option set to true" 150 ) 151 152 return disparate_header_info 153 154 @staticmethod 155 def _interpret_number(x: Union[float, int]) -> Union[int, float]: 156 if isinstance(x, float) and x.is_integer(): 157 return int(x) 158 return x 159 160 def _determine_if_float_or_int(self, interpreted_numbers: list[Union[int, float]]) -> str: 161 # Remove NaNs before type checks 162 non_nan_numbers = [x for x in interpreted_numbers if not (isinstance(x, float) and math.isnan(x))] 163 164 # If all values are int, return int type 165 if all(isinstance(row_value, int) for row_value in non_nan_numbers): 166 return self.PYTHON_TDR_DATA_TYPE_MAPPING[int] 167 # If ANY or ALL values are float, return float type 168 return self.PYTHON_TDR_DATA_TYPE_MAPPING[float] 169 170 def _python_type_to_tdr_type_conversion(self, values_for_header: list[Any]) -> str: 171 """ 172 Convert Python data types to TDR data types. 173 174 Args: 175 values_for_header (Any): All values for a column header. 176 177 Returns: 178 str: The TDR data type. 179 """ 180 gcp_fileref_regex = "^gs://.*" 181 182 # Collect all the non-None values for the column 183 non_none_values = [v for v in values_for_header if v is not None] 184 185 # HANDLE SPECIAL CASES 186 187 # FILE REFS AND LISTS OF FILE REFS 188 # If ANY of the values for a header are of type "fileref", we assume that the column is a fileref 189 for row_value in non_none_values: 190 if isinstance(row_value, str) and re.search(pattern=gcp_fileref_regex, string=row_value): 191 return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"] 192 193 if isinstance(row_value, list): 194 # Check for a potential array of filerefs - if ANY of the items in a list are 195 # of type "fileref" we assume that the whole column is a fileref 196 for item in row_value: 197 if isinstance(item, str) and re.search(pattern=gcp_fileref_regex, string=item): 198 return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"] 199 200 # INTEGERS/FLOATS AND LISTS OF INTEGERS AND FLOATS 201 # Case 1: All values are plain numbers (int or float) - specifically excluding bools 202 if all(isinstance(x, (int, float)) and not isinstance(x, bool) for x in non_none_values): 203 interpreted_numbers = [self._interpret_number(row_value) for row_value in non_none_values] 204 return self._determine_if_float_or_int(interpreted_numbers) 205 206 # Case 2: Values are lists of numbers (e.g., [[1, 2], [3.1], [4]]) 207 if all(isinstance(row_value, list) for row_value in non_none_values): 208 if all( 209 all(isinstance(item, (int, float)) and not isinstance(item, bool) for item in row_value) 210 for row_value in non_none_values 211 ): 212 # Flatten the list of lists and interpret all non-None elements 213 interpreted_numbers = [self._interpret_number(item) 214 for row_value in non_none_values for item in row_value if item is not None] 215 216 return self._determine_if_float_or_int(interpreted_numbers) 217 218 # If none of the above special cases apply, use the first of the non-null values to determine the 219 # TDR data type 220 first_value = non_none_values[0] 221 if isinstance(first_value, list): 222 return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(first_value[0])] 223 return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(first_value)] 224 225 def _format_column_metadata(self, key_value_type_mappings: dict, disparate_header_info: list[dict]) -> list[dict]: 226 """ 227 Generate the metadata for each 
column's header name, data type, and whether it's an array of values. 228 229 Args: 230 key_value_type_mappings (dict): A dictionary where the key is the header, 231 and the value is a list of values for the header. 232 233 Returns: 234 list[dict]: A list of dictionaries containing column metadata. 235 """ 236 columns = [] 237 238 for header, values_for_header in key_value_type_mappings.items(): 239 force_to_string = [h["force_to_string"] for h in disparate_header_info if h["header"] == header][0] 240 241 # if the ANY of the values for a given header is a list, we assume that column contains arrays of values 242 array_of = True if any(isinstance(v, list) for v in values_for_header) else False 243 244 if force_to_string: 245 logging.info(f"Header '{header}' was forced to string to to mismatched datatypes in column") 246 data_type = self.PYTHON_TDR_DATA_TYPE_MAPPING[str] 247 else: 248 # Use all existing values for the header to determine the data type 249 data_type = self._python_type_to_tdr_type_conversion(values_for_header) 250 251 column_metadata = { 252 "name": header, 253 "datatype": data_type, 254 "array_of": array_of, 255 } 256 columns.append(column_metadata) 257 258 return columns 259 260 def _gather_required_and_non_required_headers(self, metadata_df: Any, dataframe_headers: list[str]) -> list[dict]: 261 """ 262 Determine whether each header is required or not. 263 264 Args: 265 metadata_df (Any): The original dataframe. 266 dataframe_headers (list[str]): A list of dataframe headers. 267 268 Returns: 269 list[dict]: A list of dictionaries containing header requirements. 270 """ 271 header_requirements = [] 272 273 na_replaced = metadata_df.replace({None: np.nan}) 274 for header in dataframe_headers: 275 all_none = na_replaced[header].isna().all() 276 some_none = na_replaced[header].isna().any() 277 contains_array = na_replaced[header].apply(lambda x: isinstance(x, (np.ndarray, list))).any() 278 279 # if the column contains any arrays, set it as optional since arrays cannot be required in tdr 280 if contains_array: 281 header_requirements.append({"name": header, "required": False}) 282 # if all rows are none for a given column, we set the default type to "string" type in TDR 283 elif all_none: 284 header_requirements.append({"name": header, "required": False, "data_type": "string"}) 285 # if some rows are none or all non required is set to true AND header 286 # is not primary key, we set the column to non-required 287 elif some_none or (self.all_fields_non_required and header != self.primary_key): 288 header_requirements.append({"name": header, "required": False}) 289 else: 290 header_requirements.append({"name": header, "required": True}) 291 292 return header_requirements 293 294 @staticmethod 295 def _reformat_metadata(cleaned_metadata: list[dict]) -> dict: 296 """ 297 Create a dictionary where the key is the header name, and the value is a list of all values for that header. 298 299 Args: 300 cleaned_metadata (list[dict]): The cleaned metadata. 301 302 Returns: 303 dict: A dictionary with header names as keys and lists of values as values. 
304 """ 305 key_value_type_mappings = {} 306 unique_headers = {key for row in cleaned_metadata for key in row} 307 308 for header in unique_headers: 309 for row in cleaned_metadata: 310 value = row[header] 311 if header not in key_value_type_mappings: 312 key_value_type_mappings[header] = [value] 313 else: 314 key_value_type_mappings[header].append(value) 315 return key_value_type_mappings 316 317 def infer_schema(self) -> dict: 318 """ 319 Infer the schema for the table based on the input metadata. 320 321 **Returns:** 322 - dict: The inferred schema in the format expected by TDR. 323 """ 324 logging.info(f"Inferring schema for table {self.table_name}") 325 # create the dataframe 326 metadata_df = pd.DataFrame(self.input_metadata) 327 # Replace all nan with None 328 metadata_df = metadata_df.where(pd.notnull(metadata_df), None) 329 330 # find all headers that need to be renamed if they have "entity" in them and rename the headers 331 headers_to_be_renamed = [{h: h.split(":")[1] for h in list(metadata_df.columns) if h.startswith("entity")}][0] 332 metadata_df = metadata_df.rename(columns=headers_to_be_renamed) 333 334 # start by gathering the column metadata by determining which headers are required or not 335 column_metadata = self._gather_required_and_non_required_headers(metadata_df, list(metadata_df.columns)) 336 337 # drop columns where ALL values are None, but keep rows where some values are None 338 # we keep the rows where some values are none because if we happen to have a different column that's none in 339 # every row, we could end up with no data at the end 340 all_none_columns_dropped_df = metadata_df.dropna(axis=1, how="all") 341 cleaned_metadata = all_none_columns_dropped_df.to_dict(orient="records") 342 key_value_type_mappings = self._reformat_metadata(cleaned_metadata) 343 344 # check to see if all values corresponding to a header are of the same type 345 disparate_header_info = self._check_type_consistency(key_value_type_mappings) 346 347 columns = self._format_column_metadata( 348 key_value_type_mappings=key_value_type_mappings, disparate_header_info=disparate_header_info 349 ) 350 351 # combine the information about required headers with the data types that were collected 352 for header_metadata in column_metadata: 353 matching_metadata = [d for d in columns if d["name"] == header_metadata["name"]] 354 if matching_metadata: 355 header_metadata.update(matching_metadata[0]) 356 357 tdr_tables_json = { 358 "name": self.table_name, 359 "columns": column_metadata, 360 } 361 362 return tdr_tables_json
class InferTDRSchema:
A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata.
InferTDRSchema(input_metadata: list[dict], table_name: str, all_fields_non_required: bool = False, allow_disparate_data_types_in_column: bool = False, primary_key: Optional[str] = None)
Initialize the InferTDRSchema class.
**Args:**
- input_metadata (list[dict]): The input metadata to infer the schema from.
- table_name (str): The name of the table for which the schema is being inferred.
- all_fields_non_required (bool): A boolean indicating whether all columns should be set to non-required except for the primary key. Defaults to `False`.
- allow_disparate_data_types_in_column (bool): A boolean indicating whether to force disparate data types in a column to be of type `str`. Defaults to `False`.
- primary_key (str, optional): The name of the primary key column. Used to determine whether the column should be required.
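
As a sketch of how the `allow_disparate_data_types_in_column` flag interacts with type checking (the rows below are hypothetical): a column that mixes ints and strings raises an `Exception` by default, while setting the flag coerces the column to TDR's `string` type instead.

```python
mixed_rows = [
    {"id": "a", "val": 1},
    {"id": "b", "val": "two"},  # mixes str into an otherwise-int column
]

# Default behavior: raises Exception because "val" holds both int and str
# InferTDRSchema(input_metadata=mixed_rows, table_name="t").infer_schema()

# With the flag set, "val" is forced to the TDR "string" type instead
schema = InferTDRSchema(
    input_metadata=mixed_rows,
    table_name="t",
    allow_disparate_data_types_in_column=True,
).infer_schema()
```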
def infer_schema(self) -> dict:
Infer the schema for the table based on the input metadata.
**Returns:**
- dict: The inferred schema in the format expected by TDR.
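
As a sketch of the inferred output (hypothetical rows): list-valued columns come back with `array_of: True` and are never marked required, and columns with any missing values are marked optional.

```python
rows = [
    {"donor_id": "D1", "aliases": ["a", "b"], "age": 42},
    {"donor_id": "D2", "aliases": [], "age": None},
]
schema = InferTDRSchema(input_metadata=rows, table_name="donor", primary_key="donor_id").infer_schema()

# Roughly:
# {"name": "donor",
#  "columns": [{"name": "donor_id", "required": True, "datatype": "string", "array_of": False},
#              {"name": "aliases", "required": False, "datatype": "string", "array_of": True},
#              {"name": "age", "required": False, "datatype": "int64", "array_of": False}]}
```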