ops_utils.tdr_utils.tdr_schema_utils

Utility classes for TDR schema.
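The class below is importable from this module path (a minimal usage sketch; the class and its parameters are documented further down):

from ops_utils.tdr_utils.tdr_schema_utils import InferTDRSchema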

  1"""Utility classes for TDR schema."""
  2
  3import logging
  4import re
  5import time
  6import numpy as np
  7import pandas as pd
  8from datetime import date, datetime
  9from typing import Any, Optional
 10


class InferTDRSchema:
    """A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata."""

    PYTHON_TDR_DATA_TYPE_MAPPING = {
        str: "string",
        "fileref": "fileref",
        bool: "boolean",
        bytes: "bytes",
        date: "date",
        datetime: "datetime",
        float: "float64",
        np.float64: "float64",
        int: "int64",
        np.int64: "int64",
        time: "time",
    }
    """@private"""

    def __init__(
            self,
            input_metadata: list[dict],
            table_name: str,
            all_fields_non_required: bool = False,
            allow_disparate_data_types_in_column: bool = False,
            primary_key: Optional[str] = None
    ):
        """
        Initialize the InferTDRSchema class.

        **Args:**
        - input_metadata (list[dict]): The input metadata to infer the schema from.
        - table_name (str): The name of the table for which the schema is being inferred.
        - all_fields_non_required (bool): Whether all columns other than the primary key should be set to
                non-required. Defaults to `False`.
        - allow_disparate_data_types_in_column (bool): Whether to force disparate data types in a column
                to type `str`. Defaults to `False`.
        - primary_key (str, optional): The name of the primary key column. Used to determine if the column
            should be required.
        """
        self.input_metadata = input_metadata
        """@private"""
        self.table_name = table_name
        """@private"""
        self.all_fields_non_required = all_fields_non_required
        """@private"""
        self.primary_key = primary_key
        """@private"""
        self.allow_disparate_data_types_in_column = allow_disparate_data_types_in_column
        """@private"""

    def _check_type_consistency(self, key_value_type_mappings: dict) -> list[dict]:
        """
        Check if all values for each header are of the same type.

        **Args:**
        - key_value_type_mappings (dict): A dictionary where the key is the header,
                and the value is a list of values for the header.

        Raises:
            Exception: If types do not match for any header.
        """
        matching = []

        disparate_header_info = []

        for header, values_for_header in key_value_type_mappings.items():
            # check if some values are lists while others are not (consider this a "mismatch" if so) while ignoring
            # "None" entries
            if (any(isinstance(item, list) for item in values_for_header if item is not None) and
                    not all(isinstance(item, list) for item in values_for_header if item is not None)):
                all_values_matching = False
            # if the column contains ONLY lists of items, check that all items in each list are of the same type
            # (while ignoring "None" entries)
            elif all(isinstance(item, list) for item in values_for_header if item is not None):
                # first get all sub-lists that have some values
                non_empty_sublists = [v for v in values_for_header if v]
                if non_empty_sublists:
                    # get one "type" from the list of values
                    first_match_type = type([v[0] for v in non_empty_sublists][0])
                    all_values_matching = all(
                        all(isinstance(item, first_match_type) for item in sublist) for sublist in non_empty_sublists
                    )
                else:
                    # if all "sub-lists" are empty, assume that all types are matching (all-empty columns are
                    # handled below)
                    all_values_matching = True
            else:
                # find one value that's non-None to get the type to check against
                # specifically check "is not None" since a column could contain all zeroes, for example
                type_to_match_against = type([v for v in values_for_header if v is not None][0])
                # check if all the non-None values in the list match the type of the first entry
                all_values_matching = all(
                    isinstance(v, type_to_match_against) for v in values_for_header if v is not None
                )

            # if every value for the header is empty (all None, or all empty lists), there is nothing to infer
            # a type from, so force the column to a string
            if all_values_matching and not any(values_for_header):
                matching.append({header: all_values_matching})
                disparate_header_info.append(
                    {
                        "header": header,
                        "force_to_string": True,
                    }
                )
            elif not all_values_matching and self.allow_disparate_data_types_in_column:
                logging.info(
                    f"Not all data types matched for header '{header}' but forcing them to strings as "
                    f"'allow_disparate_data_types_in_column' setting is set to true"
                )
                matching.append({header: True})
                disparate_header_info.append(
                    {
                        "header": header,
                        "force_to_string": True,
                    }
                )
            else:
                matching.append({header: all_values_matching})
                disparate_header_info.append(
                    {
                        "header": header,
                        "force_to_string": False,
                    }
                )

        # collect the headers whose values were NOT all of the same type
        problematic_headers = [
            list(d.keys())[0]
            for d in matching
            if not list(d.values())[0]
        ]

        if problematic_headers:
            raise Exception(
                f"Not all values for the following headers are of the same type: {problematic_headers}. To force "
                f"all values in a given column to the same type and bypass this error, re-run with the "
                f"'allow_disparate_data_types_in_column' option set to true"
            )

        return disparate_header_info

    def _python_type_to_tdr_type_conversion(self, value_for_header: Any) -> str:
        """
        Convert Python data types to TDR data types.

        Args:
            value_for_header (Any): The value to determine the TDR type for.

        Returns:
            str: The TDR data type.
        """
        gcp_fileref_regex = "^gs://.*"

        # find potential file references
        if isinstance(value_for_header, str):
            gcp_match = re.search(pattern=gcp_fileref_regex, string=value_for_header)
            if gcp_match:
                return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]

        # Tried to use dateutil's parser here to detect datetimes, but it was turning too many
        # regular ints into datetimes. Commenting out for now:
        # try:
        #    date_or_time = parser.parse(value_for_header)
        #    return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(date_or_time)]
        # except (TypeError, ParserError):
        #    pass

        if isinstance(value_for_header, list):
            # check for a potential list of filerefs
            for v in value_for_header:
                if isinstance(v, str):
                    gcp_match = re.search(pattern=gcp_fileref_regex, string=v)
                    if gcp_match:
                        return self.PYTHON_TDR_DATA_TYPE_MAPPING["fileref"]
            # otherwise use the first non-None entry in the list to determine the element type
            non_none_entry_in_list = [a for a in value_for_header if a is not None][0]
            return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(non_none_entry_in_list)]

        # if none of the above special cases apply, use the type of the value to determine the TDR type
        return self.PYTHON_TDR_DATA_TYPE_MAPPING[type(value_for_header)]

    def _format_column_metadata(self, key_value_type_mappings: dict, disparate_header_info: list[dict]) -> list[dict]:
        """
        Generate the metadata for each column's header name, data type, and whether it's an array of values.

        Args:
            key_value_type_mappings (dict): A dictionary where the key is the header,
                and the value is a list of values for the header.
            disparate_header_info (list[dict]): Per-header flags indicating whether the column should be
                forced to a string type.

        Returns:
            list[dict]: A list of dictionaries containing column metadata.
        """
        columns = []

        for header, values_for_header in key_value_type_mappings.items():
            force_to_string = [h["force_to_string"] for h in disparate_header_info if h["header"] == header][0]

            # if ANY of the values for a given header is a list, we assume that column contains arrays of values
            array_of = any(isinstance(v, list) for v in values_for_header)

            if force_to_string:
                logging.info(f"Header '{header}' was forced to string due to mismatched datatypes in column")
                data_type = self.PYTHON_TDR_DATA_TYPE_MAPPING[str]
            else:
                # use the first non-None value (or first non-empty list) to determine the column's data type
                data_type = self._python_type_to_tdr_type_conversion(
                    [a for a in values_for_header if a is not None][0]
                )

            column_metadata = {
                "name": header,
                "datatype": data_type,
                "array_of": array_of,
            }
            columns.append(column_metadata)

        return columns

    def _gather_required_and_non_required_headers(self, metadata_df: Any, dataframe_headers: list[str]) -> list[dict]:
        """
        Determine whether each header is required or not.

        Args:
            metadata_df (Any): The original dataframe.
            dataframe_headers (list[str]): A list of dataframe headers.

        Returns:
            list[dict]: A list of dictionaries containing header requirements.
        """
        header_requirements = []

        na_replaced = metadata_df.replace({None: np.nan})
        for header in dataframe_headers:
            all_none = na_replaced[header].isna().all()
            some_none = na_replaced[header].isna().any()
            contains_array = na_replaced[header].apply(lambda x: isinstance(x, (np.ndarray, list))).any()

            # if the column contains any arrays, set it as optional since arrays cannot be required in TDR
            if contains_array:
                header_requirements.append({"name": header, "required": False})
            # if all rows are None for a given column, default the type to "string" in TDR
            elif all_none:
                header_requirements.append({"name": header, "required": False, "data_type": "string"})
            # if some rows are None, or all_fields_non_required is set to true AND the header
            # is not the primary key, set the column to non-required
            elif some_none or (self.all_fields_non_required and header != self.primary_key):
                header_requirements.append({"name": header, "required": False})
            else:
                header_requirements.append({"name": header, "required": True})

        return header_requirements

    @staticmethod
    def _reformat_metadata(cleaned_metadata: list[dict]) -> dict:
        """
        Create a dictionary where the key is the header name, and the value is a list of all values for that header.

        Args:
            cleaned_metadata (list[dict]): The cleaned metadata.

        Returns:
            dict: A dictionary with header names as keys and lists of values as values.
        """
        key_value_type_mappings = {}
        unique_headers = {key for row in cleaned_metadata for key in row}

        for header in unique_headers:
            for row in cleaned_metadata:
                value = row[header]
                if header not in key_value_type_mappings:
                    key_value_type_mappings[header] = [value]
                else:
                    key_value_type_mappings[header].append(value)
        return key_value_type_mappings

    def infer_schema(self) -> dict:
        """
        Infer the schema for the table based on the input metadata.

        **Returns:**
        - dict: The inferred schema in the format expected by TDR.
        """
        logging.info(f"Inferring schema for table {self.table_name}")
        # create the dataframe
        metadata_df = pd.DataFrame(self.input_metadata)
        # replace all NaN with None
        metadata_df = metadata_df.where(pd.notnull(metadata_df), None)

        # rename any headers that start with "entity" (e.g. "entity:sample_id" becomes "sample_id")
        headers_to_be_renamed = {h: h.split(":")[1] for h in list(metadata_df.columns) if h.startswith("entity")}
        metadata_df = metadata_df.rename(columns=headers_to_be_renamed)

        # start by gathering the column metadata and determining which headers are required or not
        column_metadata = self._gather_required_and_non_required_headers(metadata_df, list(metadata_df.columns))

        # drop columns where ALL values are None, but keep rows where some values are None
        # we keep the rows where some values are None because if we happen to have a different column that's None in
        # every row, we could end up with no data at the end
        all_none_columns_dropped_df = metadata_df.dropna(axis=1, how="all")
        cleaned_metadata = all_none_columns_dropped_df.to_dict(orient="records")
        key_value_type_mappings = self._reformat_metadata(cleaned_metadata)

        # check whether all values corresponding to a header are of the same type
        disparate_header_info = self._check_type_consistency(key_value_type_mappings)

        columns = self._format_column_metadata(
            key_value_type_mappings=key_value_type_mappings, disparate_header_info=disparate_header_info
        )

        # combine the information about required headers with the data types that were collected
        for header_metadata in column_metadata:
            matching_metadata = [d for d in columns if d["name"] == header_metadata["name"]]
            if matching_metadata:
                header_metadata.update(matching_metadata[0])

        tdr_tables_json = {
            "name": self.table_name,
            "columns": column_metadata,
        }

        return tdr_tables_json
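
As an illustrative sketch of the type-conversion rules above (the values are hypothetical, and `_python_type_to_tdr_type_conversion` is a private helper normally invoked via `infer_schema`), a few representative inputs would map to TDR types as follows:

inferrer = InferTDRSchema(input_metadata=[], table_name="demo")

# a "gs://" path is detected as a file reference before the generic str mapping applies
assert inferrer._python_type_to_tdr_type_conversion("gs://bucket/sample.cram") == "fileref"
assert inferrer._python_type_to_tdr_type_conversion("NA12878") == "string"
assert inferrer._python_type_to_tdr_type_conversion(42) == "int64"
assert inferrer._python_type_to_tdr_type_conversion(3.14) == "float64"
# a list reports its element type; whether a column is an array is tracked separately
# ("array_of" in _format_column_metadata)
assert inferrer._python_type_to_tdr_type_conversion([1, 2, 3]) == "int64"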
class InferTDRSchema:

A class to infer the schema for a table in TDR (Terra Data Repository) based on input metadata.

InferTDRSchema(input_metadata: list[dict], table_name: str, all_fields_non_required: bool = False, allow_disparate_data_types_in_column: bool = False, primary_key: Optional[str] = None)

Initialize the InferTDRSchema class.

Args:

  • input_metadata (list[dict]): The input metadata to infer the schema from.
  • table_name (str): The name of the table for which the schema is being inferred.
  • all_fields_non_required (bool): Whether all columns other than the primary key should be set to non-required. Defaults to False.
  • allow_disparate_data_types_in_column (bool): Whether to force disparate data types in a column to type str. Defaults to False.
  • primary_key (str, optional): The name of the primary key column. Used to determine if the column should be required.
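
For example, a constructor call using these parameters might look like the following sketch (the metadata rows, table name, and column names are hypothetical):

# hypothetical metadata rows, e.g. as exported from a Terra workspace table
sample_metadata = [
    {"sample_id": "S1", "cram_path": "gs://bucket/s1.cram", "read_depth": 30},
    {"sample_id": "S2", "cram_path": "gs://bucket/s2.cram", "read_depth": 28},
]

schema_inferrer = InferTDRSchema(
    input_metadata=sample_metadata,
    table_name="sample",
    all_fields_non_required=True,  # every column except the primary key becomes optional
    primary_key="sample_id",
)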
def infer_schema(self) -> dict:

Infer the schema for the table based on the input metadata.

Returns:

  • dict: The inferred schema in the format expected by TDR.
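
Continuing the hypothetical constructor sketch above, the call would return a dictionary of roughly this shape (the exact required flags depend on the input data and constructor options):

schema = schema_inferrer.infer_schema()
# roughly:
# {
#     "name": "sample",
#     "columns": [
#         {"name": "sample_id", "required": True, "datatype": "string", "array_of": False},
#         {"name": "cram_path", "required": False, "datatype": "fileref", "array_of": False},
#         {"name": "read_depth", "required": False, "datatype": "int64", "array_of": False},
#     ],
# }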