ops_utils.tdr_utils.tdr_schema_utils

Utility classes for TDR schema.

  1"""Utility classes for TDR schema."""
  2
  3import logging
  4import re
  5from datetime import time  # the time type (not the time module), so the mapping key below can match type(value)
  6import numpy as np
  7import pandas as pd
  8import math
  9from datetime import date, datetime
 10from typing import Any, Optional, Union
 11
 12
 13class InferTDRSchema:
 14    """A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata."""
 15
 16    PYTHON_TDR_DATA_TYPE_MAPPING = {
 17        str: "string",
 18        "fileref": "fileref",
 19        bool: "boolean",
 20        bytes: "bytes",
 21        date: "date",
 22        datetime: "datetime",
 23        float: "float64",
 24        np.float64: "float64",
 25        int: "int64",
 26        np.int64: "int64",
 27        time: "time",
 28    }
 29    """@private"""
 30
 31    def __init__(
 32            self,
 33            input_metadata: list[dict],
 34            table_name: str,
 35            all_fields_non_required: bool = False,
 36            allow_disparate_data_types_in_column: bool = False,
 37            primary_key: Optional[str] = None
 38    ):
 39        """
 40        Initialize the InferTDRSchema class.
 41
 42        **Args:**
 43        - input_metadata (list[dict]): The input metadata to infer the schema from.
 44        - table_name (str): The name of the table for which the schema is being inferred.
 45        - all_fields_non_required (bool): A boolean indicating whether all columns should be set to non-required
 46                except the primary key. Defaults to `False`.
 47        - allow_disparate_data_types_in_column (bool): A boolean indicating whether to force disparate data types
 48                in a column to type `str`. Defaults to `False`.
 49        - primary_key (str, optional): The name of the primary key column. Used to determine whether the column
 50            should be required.
 51        """
 52        self.input_metadata = input_metadata
 53        """@private"""
 54        self.table_name = table_name
 55        """@private"""
 56        self.all_fields_non_required = all_fields_non_required
 57        """@private"""
 58        self.primary_key = primary_key
 59        """@private"""
 60        self.allow_disparate_data_types_in_column = allow_disparate_data_types_in_column
 61        """@private"""
 62
 63    def _check_type_consistency(self, key_value_type_mappings: dict) -> list[dict]:
 64        """
 65        Check if all values for each header are of the same type.
 66
 67        **Args:**
 68        - key_value_type_mappings (dict): A dictionary where the key is the header,
 69                and the value is a list of values for the header.
 70
 71        Raises:
 72            Exception: If types do not match for any header.
 73        """
 74        matching = []
 75
 76        disparate_header_info = []
 77
 78        for header, values_for_header in key_value_type_mappings.items():
 79            # check if some values are lists while others are not (consider this a "mismatch" if so) while ignoring
 80            # "None" entries
 81            if (any(isinstance(item, list) for item in values_for_header if item is not None) and
 82                    not all(isinstance(item, list) for item in values_for_header if item is not None)):
 83                all_values_matching = False
 84            # if the row contains ONLY lists of items, check that all items in each list are of the same type (while
 85            # ignoring "None" entries)
 86            elif all(isinstance(item, list) for item in values_for_header if item is not None):
 87                # first get all substrings that have some values
 88                non_empty_substrings = [v for v in values_for_header if v]
 89                if non_empty_substrings:
 90                    # get one "type" from the list of values
 91                    first_match_type = type(non_empty_substrings[0][0])
 92                    all_values_matching = all(
 93                        all(isinstance(item, first_match_type) for item in sublist) for sublist in non_empty_substrings
 94                    )
 95                else:
 96                    # if all "sub-lists" are empty, assume that all types are matching (all empty lists are handled
 97                    # below)
 98                    all_values_matching = True
 99            else:
100                # find one value that's non-none to get the type to check against
101                # specifically check if not "None" since we can have all zeroes, for example
102                type_to_match_against = type([v for v in values_for_header if v is not None][0])
103                # check if all the values in the list that are non-none match the type of the first entry
104                all_values_matching = all(
105                    isinstance(v, type_to_match_against) for v in values_for_header if v is not None
106                )
107
108            # If every value for the header is falsy (e.g. all None or all empty lists), force the type to string
109            if all_values_matching and not any(values_for_header):
110                matching.append({header: all_values_matching})
111                disparate_header_info.append(
112                    {
113                        "header": header,
114                        "force_to_string": True,
115                    }
116                )
117            elif not all_values_matching and self.allow_disparate_data_types_in_column:
118                logging.info(
119                    f"Not all data types matched for header '{header}' but forcing them to strings as "
120                    f"'allow_disparate_data_types_in_column' setting is set to true"
121                )
122                matching.append({header: True})
123                disparate_header_info.append(
124                    {
125                        "header": header,
126                        "force_to_string": True,
127                    }
128                )
129            else:
130                matching.append({header: all_values_matching})  # type: ignore[dict-item]
131                disparate_header_info.append(
132                    {
133                        "header": header,
134                        "force_to_string": False,
135                    }
136                )
137
138        # Collect the headers whose values were not all determined to be "matching"
139        problematic_headers = [
140            list(d.keys())[0]
141            for d in matching
142            if not list(d.values())[0]
143        ]
144
145        if problematic_headers:
146            raise Exception(
147                f"Not all values for the following headers are of the same type: {problematic_headers}. To force all"
148                f" values in rows of a given column to be forced to the same type and bypass this error, re-run with "
149                f"the 'force_disparate_rows_to_string' option set to true"
150            )
151
152        return disparate_header_info
153
154    @staticmethod
155    def _interpret_number(x: Union[float, int]) -> Union[int, float]:
156        if isinstance(x, float) and x.is_integer():
157            return int(x)
158        return x
159
160    def _determine_if_float_or_int(self, interpreted_numbers: list[Union[int, float]]) -> str:
161        # Remove NaNs before type checks
162        non_nan_numbers = [x for x in interpreted_numbers if not (isinstance(x, float) and math.isnan(x))]
163
164        # If all values are int, return int type
165        if all(isinstance(row_value, int) for row_value in non_nan_numbers):
166            return self.PYTHON_TDR_DATA_TYPE_MAPPING[int]
167        # If ANY or ALL values are float, return float type
168        return self.PYTHON_TDR_DATA_TYPE_MAPPING[float]
169
170    def _python_type_to_tdr_type_conversion(self, values_for_header: list[Any]) -> str:
171        """
172        Convert Python data types to TDR data types.
173
174        Args:
175            values_for_header (list[Any]): All values for a column header.
176
177        Returns:
178            str: The TDR data type.
179        """
180        gcp_fileref_regex = "^gs://.*"
181
182        # Collect all the non-None values for the column
183        non_none_values = [v for v in values_for_header if v is not None]
184
185        # HANDLE SPECIAL CASES
186
187        # FILE REFS AND LISTS OF FILE REFS
188        # If ANY of the values for a header are of type "fileref", we assume that the column is a fileref
189        for row_value in non_none_values:
190            if isinstance(row_value, str) and re.search(pattern=gcp_fileref_regex, string=row_value):
191                return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]
192
193            if isinstance(row_value, list):
194                # Check for a potential array of filerefs - if ANY of the items in a list are
195                # of type "fileref" we assume that the whole column is a fileref
196                for item in row_value:
197                    if isinstance(item, str) and re.search(pattern=gcp_fileref_regex, string=item):
198                        return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]
199
200        # INTEGERS/FLOATS AND LISTS OF INTEGERS AND FLOATS
201        # Case 1: All values are plain numbers (int or float) - specifically excluding bools
202        if all(isinstance(x, (int, float)) and not isinstance(x, bool) for x in non_none_values):
203            interpreted_numbers = [self._interpret_number(row_value) for row_value in non_none_values]
204            return self._determine_if_float_or_int(interpreted_numbers)
205
206        # Case 2: Values are lists of numbers (e.g., [[1, 2], [3.1], [4]])
207        if all(isinstance(row_value, list) for row_value in non_none_values):
208            if all(
209                    all(isinstance(item, (int, float)) and not isinstance(item, bool) for item in row_value)
210                    for row_value in non_none_values
211            ):
212                # Flatten the list of lists and interpret all non-None elements
213                interpreted_numbers = [self._interpret_number(item)
214                                       for row_value in non_none_values for item in row_value if item is not None]
215
216                return self._determine_if_float_or_int(interpreted_numbers)
217
218        # If none of the above special cases apply, use the first of the non-null values to determine the
219        # TDR data type
220        first_value = non_none_values[0]
221        if isinstance(first_value, list):
222            return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(first_value[0])]
223        return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(first_value)]
224
225    def _format_column_metadata(self, key_value_type_mappings: dict, disparate_header_info: list[dict]) -> list[dict]:
226        """
227        Generate the metadata for each column's header name, data type, and whether it's an array of values.
228
229        Args:
230            key_value_type_mappings (dict): A dictionary where the key is the header,
231                and the value is a list of values for the header.
232
233        Returns:
234            list[dict]: A list of dictionaries containing column metadata.
235        """
236        columns = []
237
238        for header, values_for_header in key_value_type_mappings.items():
239            force_to_string = [h["force_to_string"] for h in disparate_header_info if h["header"] == header][0]
240
241            # if ANY of the values for a given header are lists, we assume the column contains arrays of values
242            array_of = any(isinstance(v, list) for v in values_for_header)
243
244            if force_to_string:
245                logging.info(f"Header '{header}' was forced to string due to mismatched datatypes in column")
246                data_type = self.PYTHON_TDR_DATA_TYPE_MAPPING[str]
247            else:
248                # Use all existing values for the header to determine the data type
249                data_type = self._python_type_to_tdr_type_conversion(values_for_header)
250
251            column_metadata = {
252                "name": header,
253                "datatype": data_type,
254                "array_of": array_of,
255            }
256            columns.append(column_metadata)
257
258        return columns
259
260    def _gather_required_and_non_required_headers(self, metadata_df: Any, dataframe_headers: list[str]) -> list[dict]:
261        """
262        Determine whether each header is required or not.
263
264        Args:
265            metadata_df (Any): The original dataframe.
266            dataframe_headers (list[str]): A list of dataframe headers.
267
268        Returns:
269            list[dict]: A list of dictionaries containing header requirements.
270        """
271        header_requirements = []
272
273        na_replaced = metadata_df.replace({None: np.nan})
274        for header in dataframe_headers:
275            all_none = na_replaced[header].isna().all()
276            some_none = na_replaced[header].isna().any()
277            contains_array = na_replaced[header].apply(lambda x: isinstance(x, (np.ndarray, list))).any()
278
279            # if the column contains any arrays, set it as optional since arrays cannot be required in TDR
280            if contains_array:
281                header_requirements.append({"name": header, "required": False})
282            # if all rows are none for a given column, we set the default type to "string" type in TDR
283            elif all_none:
284                header_requirements.append({"name": header, "required": False, "datatype": "string"})
285            # if some rows are None, or all_fields_non_required is set AND the header
286            # is not the primary key, set the column to non-required
287            elif some_none or (self.all_fields_non_required and header != self.primary_key):
288                header_requirements.append({"name": header, "required": False})
289            else:
290                header_requirements.append({"name": header, "required": True})
291
292        return header_requirements
293
294    @staticmethod
295    def _reformat_metadata(cleaned_metadata: list[dict]) -> dict:
296        """
297        Create a dictionary where the key is the header name, and the value is a list of all values for that header.
298
299        Args:
300            cleaned_metadata (list[dict]): The cleaned metadata.
301
302        Returns:
303            dict: A dictionary with header names as keys and lists of values as values.
304        """
305        key_value_type_mappings = {}
306        unique_headers = {key for row in cleaned_metadata for key in row}
307
308        for header in unique_headers:
309            for row in cleaned_metadata:
310                value = row[header]
311                if header not in key_value_type_mappings:
312                    key_value_type_mappings[header] = [value]
313                else:
314                    key_value_type_mappings[header].append(value)
315        return key_value_type_mappings
316
317    def infer_schema(self) -> dict:
318        """
319        Infer the schema for the table based on the input metadata.
320
321        **Returns:**
322        - dict: The inferred schema in the format expected by TDR.
323        """
324        logging.info(f"Inferring schema for table {self.table_name}")
325        # create the dataframe
326        metadata_df = pd.DataFrame(self.input_metadata)
327        # Replace all nan with None
328        metadata_df = metadata_df.where(pd.notnull(metadata_df), None)
329
330        # strip the "entity:" prefix from any headers that start with it and rename those headers
331        headers_to_be_renamed = {h: h.split(":")[1] for h in metadata_df.columns if h.startswith("entity")}
332        metadata_df = metadata_df.rename(columns=headers_to_be_renamed)
333
334        # start by gathering the column metadata by determining which headers are required or not
335        column_metadata = self._gather_required_and_non_required_headers(metadata_df, list(metadata_df.columns))
336
337        # drop columns where ALL values are None, but keep rows where some values are None
338        # we keep the rows where some values are none because if we happen to have a different column that's none in
339        # every row, we could end up with no data at the end
340        all_none_columns_dropped_df = metadata_df.dropna(axis=1, how="all")
341        cleaned_metadata = all_none_columns_dropped_df.to_dict(orient="records")
342        key_value_type_mappings = self._reformat_metadata(cleaned_metadata)
343
344        # check to see if all values corresponding to a header are of the same type
345        disparate_header_info = self._check_type_consistency(key_value_type_mappings)
346
347        columns = self._format_column_metadata(
348            key_value_type_mappings=key_value_type_mappings, disparate_header_info=disparate_header_info
349        )
350
351        # combine the information about required headers with the data types that were collected
352        for header_metadata in column_metadata:
353            matching_metadata = [d for d in columns if d["name"] == header_metadata["name"]]
354            if matching_metadata:
355                header_metadata.update(matching_metadata[0])
356
357        tdr_tables_json = {
358            "name": self.table_name,
359            "columns": column_metadata,
360        }
361
362        return tdr_tables_json
class InferTDRSchema:

A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata.
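
As a hedged end-to-end sketch (the sample metadata below is hypothetical): gs:// paths are detected as filerefs, integer-valued floats resolve to int64, and true floats resolve to float64.

    from ops_utils.tdr_utils.tdr_schema_utils import InferTDRSchema

    metadata = [
        {"sample_id": "S1", "age": 30.0, "weight": 70.5, "cram_path": "gs://bucket/s1.cram"},
        {"sample_id": "S2", "age": 31.0, "weight": 68.2, "cram_path": "gs://bucket/s2.cram"},
    ]

    schema = InferTDRSchema(
        input_metadata=metadata,
        table_name="sample",
        primary_key="sample_id",
    ).infer_schema()
    # columns: sample_id -> string, age -> int64 (both values are integer-valued),
    # weight -> float64, cram_path -> fileref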

InferTDRSchema(input_metadata: list[dict], table_name: str, all_fields_non_required: bool = False, allow_disparate_data_types_in_column: bool = False, primary_key: Optional[str] = None)

Initialize the InferTDRSchema class.

Args:

  • input_metadata (list[dict]): The input metadata to infer the schema from.
  • table_name (str): The name of the table for which the schema is being inferred.
  • all_fields_non_required (bool): A boolean indicating whether all columns should be set to non-required except the primary key. Defaults to False.
  • allow_disparate_data_types_in_column (bool): A boolean indicating whether to force disparate data types in a column to type str. Defaults to False.
  • primary_key (str, optional): The name of the primary key column. Used to determine whether the column should be required.
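
A short sketch of the optional flags, again with hypothetical rows: with all_fields_non_required=True, every column except the primary key is marked non-required.

    inferrer = InferTDRSchema(
        input_metadata=[{"sample_id": "S1", "age": 30}, {"sample_id": "S2", "age": 31}],
        table_name="sample",
        all_fields_non_required=True,
        primary_key="sample_id",
    )
    # inferrer.infer_schema() marks sample_id as required (it is the primary key)
    # and age as non-required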
def infer_schema(self) -> dict:

Infer the schema for the table based on the input metadata.

Returns:

  • dict: The inferred schema in the format expected by TDR.
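
Continuing the hypothetical sketch above, the returned dict mirrors a TDR table definition:

    schema = inferrer.infer_schema()
    # {
    #     "name": "sample",
    #     "columns": [
    #         {"name": "sample_id", "required": True, "datatype": "string", "array_of": False},
    #         {"name": "age", "required": False, "datatype": "int64", "array_of": False},
    #     ],
    # }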