ops_utils.tdr_utils.tdr_ingest_utils

Classes and functions for ingesting data into TDR.
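
A typical flow converts Terra table metadata into flat rows and then batch-ingests them into a TDR dataset table. Below is a minimal sketch, assuming `tdr` is an already-constructed `ops_utils.tdr_utils.tdr_api_utils.TDR` client and `terra_sample_rows` holds entity metadata pulled from a Terra workspace (both names are illustrative, as is the dataset UUID):

```
from ops_utils.tdr_utils.tdr_ingest_utils import ConvertTerraTableInfoForIngest, BatchIngest

# Flatten Terra entity rows (entityType "sample") into dictionaries keyed by sample_id
ingest_rows = ConvertTerraTableInfoForIngest(
    table_metadata=terra_sample_rows,
    columns_to_ignore=["something_to_exclude"]
).run()

# Ingest in batches of 500 rows into the "sample" table of the target dataset
BatchIngest(
    ingest_metadata=ingest_rows,
    tdr=tdr,
    target_table_name="sample",
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset UUID
    batch_size=500,
    bulk_mode=False
).run()
```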

  1"""Classes and functions for ingesting data into TDR."""
  2
  3import json
  4import logging
  5import sys
  6import pytz
  7from datetime import datetime
  8import math
  9from typing import Optional, Any
 10from dateutil import parser
 11
 12from ..vars import ARG_DEFAULTS
 13
 14from .tdr_api_utils import TDR, FilterOutSampleIdsAlreadyInDataset
 15from .tdr_job_utils import MonitorTDRJob
 16from ..terra_util import TerraWorkspace
 17
 18
 19class BatchIngest:
 20    """A class to handle batch ingestion of metadata into TDR (Terra Data Repository)."""
 21
 22    def __init__(
 23            self,
 24            ingest_metadata: list[dict],
 25            tdr: TDR,
 26            target_table_name: str,
 27            dataset_id: str,
 28            batch_size: int,
 29            bulk_mode: bool,
 30            update_strategy: str = "replace",
 31            waiting_time_to_poll: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
 32            test_ingest: bool = False,
 33            load_tag: Optional[str] = None,
 34            file_to_uuid_dict: Optional[dict] = None,
 35            schema_info: Optional[dict] = None,
 36            skip_reformat: bool = False
 37    ):
 38        """
 39        Initialize the BatchIngest class.
 40
 41        **Args:**
 42        - ingest_metadata (list[dict]): The metadata to be ingested.
 43        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
 44        - target_table_name (str): The name of the target table.
 45        - dataset_id (str): The ID of the dataset.
 46        - batch_size (int): The size of each batch for ingestion.
 47        - bulk_mode (bool): Flag indicating if bulk mode should be used.
 48        - update_strategy (str, optional): The strategy for updating existing records. Defaults to `replace`.
 49        - waiting_time_to_poll (int, optional): The time to wait between polling for job status. Defaults to `90`.
 50        - test_ingest (bool, optional): Flag indicating if only the first batch should be
 51                ingested for testing. Defaults to `False`.
 52        - load_tag (str, optional): A tag to identify the load. Used so future ingests
  53                can pick up where they left off. Defaults to None.
  54        - file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be obtained from
  55            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
  56            source file paths to UUIDs. If provided, ingest is much quicker since no file ingest
  57            or lookup is needed. Defaults to None.
  58        - schema_info (dict, optional): Schema information for the tables. Used to validate that ingest data
  59                matches the schema. Defaults to None.
 60        - skip_reformat (bool, optional): Flag indicating if reformatting should be skipped. Defaults to `False`.
 61        """
 62        self.ingest_metadata = self._reformat_for_type_consistency(ingest_metadata)
 63        """@private"""
 64        self.tdr = tdr
 65        """@private"""
 66        self.target_table_name = target_table_name
 67        """@private"""
 68        self.dataset_id = dataset_id
 69        """@private"""
 70        self.batch_size = int(batch_size)
 71        """@private"""
 72        self.update_strategy = update_strategy
 73        """@private"""
 74        self.bulk_mode = bulk_mode
 75        """@private"""
 76        self.waiting_time_to_poll = waiting_time_to_poll
 77        """@private"""
 78        # Used if you want to run first batch and then exit after success
 79        self.test_ingest = test_ingest
 80        """@private"""
 81        self.load_tag = load_tag
 82        """@private"""
 83        self.file_to_uuid_dict = file_to_uuid_dict
 84        """@private"""
 85        # Used if you want to provide schema info for tables to make sure values match.
 86        # Should be dict with key being column name and value being dict with datatype
 87        self.schema_info = schema_info
 88        """@private"""
 89        # Use if input is already formatted correctly for ingest
 90        self.skip_reformat = skip_reformat
 91        """@private"""
 92
 93    @staticmethod
 94    def _reformat_for_type_consistency(ingest_metadata: list[dict]) -> list[dict]:
 95        """
 96        Reformats ingest metadata and finds headers where values are a mix of lists and non-lists.
 97
  98        If there is a mix of these types of values, it converts each non-list value to a one-item list. The updated
  99        metadata is then returned and used for everything downstream.
100        """
101        unique_headers = sorted({key for item in ingest_metadata for key in item.keys()})
102
103        headers_containing_mismatch = []
104        for header in unique_headers:
105            all_values_for_header = [r.get(header) for r in ingest_metadata]
106            # Find headers where some values are lists and some are not (while filtering out None values)
107            if any(isinstance(value, list) for value in all_values_for_header if value is not None) and not all(
108                    isinstance(value, list) for value in all_values_for_header if value is not None):
109                logging.info(
110                    f"Header {header} contains lists and non-list items. Will convert the non-list items into a list"
111                )
112                headers_containing_mismatch.append(header)
113
114        updated_metadata = []
115        for record in ingest_metadata:
116            new_record = {}
117            for header, value in record.items():
118                if header in headers_containing_mismatch:
119                    updated_value = [value] if not isinstance(value, list) else value
120                else:
121                    updated_value = value
122                new_record[header] = updated_value
123            updated_metadata.append(new_record)
124
125        return updated_metadata
126
127    def run(self) -> None:
128        """Run the batch ingestion process."""
129        logging.info(
130            f"Batching {len(self.ingest_metadata)} total rows into batches of {self.batch_size} for ingest")
131        total_batches = math.ceil(len(self.ingest_metadata) / self.batch_size)
132        for i in range(0, len(self.ingest_metadata), self.batch_size):
133            batch_number = i // self.batch_size + 1
134            logging.info(f"Starting ingest batch {batch_number} of {total_batches} into table {self.target_table_name}")
135            metrics_batch = self.ingest_metadata[i:i + self.batch_size]
136            if self.skip_reformat:
137                reformatted_batch = metrics_batch
138            else:
139                reformatted_batch = ReformatMetricsForIngest(
140                    ingest_metadata=metrics_batch,
141                    file_to_uuid_dict=self.file_to_uuid_dict,
142                    schema_info=self.schema_info
143                ).run()
144
145            if self.load_tag:
146                load_tag = self.load_tag
147            else:
148                load_tag = f"{self.dataset_id}.{self.target_table_name}"
149            # Start actual ingest
150            if reformatted_batch:
151                StartAndMonitorIngest(
152                    tdr=self.tdr,
153                    ingest_records=reformatted_batch,
154                    target_table_name=self.target_table_name,
155                    dataset_id=self.dataset_id,
156                    load_tag=load_tag,
157                    bulk_mode=self.bulk_mode,
158                    update_strategy=self.update_strategy,
159                    waiting_time_to_poll=self.waiting_time_to_poll
160                ).run()
161                logging.info(f"Completed batch ingest of {len(reformatted_batch)} rows")
162                if self.test_ingest:
163                    logging.info("First batch completed, exiting since test_ingest was used")
164                    sys.exit(0)
165            else:
166                logging.info("No rows to ingest in this batch after reformatting")
 167        logging.info("Whole ingest completed")
168
169
170class StartAndMonitorIngest:
 171    """Class to start and monitor the ingestion of records into a TDR dataset."""
172
173    def __init__(
174            self, tdr: TDR,
175            ingest_records: list[dict],
176            target_table_name: str,
177            dataset_id: str,
178            load_tag: str,
179            bulk_mode: bool,
180            update_strategy: str,
181            waiting_time_to_poll: int
182    ):
183        """
184        Initialize the StartAndMonitorIngest.
185
186        **Args:**
187        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
188        - ingest_records (list[dict]): The records to be ingested.
189        - target_table_name (str): The name of the target table.
190        - dataset_id (str): The ID of the dataset.
191        - load_tag (str): A tag to identify the load.
192        - bulk_mode (bool): Flag indicating if bulk mode should be used.
193        - update_strategy (str): The strategy for updating existing records.
194        - waiting_time_to_poll (int): The time to wait between polling for job status.
195        """
196        self.tdr = tdr
197        """@private"""
198        self.ingest_records = ingest_records
199        """@private"""
200        self.target_table_name = target_table_name
201        """@private"""
202        self.dataset_id = dataset_id
203        """@private"""
204        self.load_tag = load_tag
205        """@private"""
206        self.bulk_mode = bulk_mode
207        """@private"""
208        self.update_strategy = update_strategy
209        """@private"""
210        self.waiting_time_to_poll = waiting_time_to_poll
211        """@private"""
212
213    def _create_ingest_dataset_request(self) -> Any:
214        """
215        Create the ingestDataset request body.
216
217        Returns:
218            Any: The request body for ingesting the dataset.
219        """
220        # https://support.terra.bio/hc/en-us/articles/23460453585819-How-to-ingest-and-update-TDR-data-with-APIs
221        load_dict = {
222            "format": "array",
223            "records": self.ingest_records,
224            "table": self.target_table_name,
225            "resolve_existing_files": "true",
226            "updateStrategy": self.update_strategy,
227            "load_tag": self.load_tag,
228            "bulkMode": "true" if self.bulk_mode else "false"
229        }
230        return json.dumps(load_dict)  # dict -> json
231
232    def run(self) -> None:
233        """Run the ingestion process and monitor the job until completion."""
234        ingest_request = self._create_ingest_dataset_request()
235        logging.info(f"Starting ingest to {self.dataset_id}")
236        ingest_response = self.tdr.ingest_to_dataset(dataset_id=self.dataset_id, data=ingest_request).json()
237        MonitorTDRJob(
238            tdr=self.tdr,
239            job_id=ingest_response["id"],
240            check_interval=self.waiting_time_to_poll,
241            return_json=False
242        ).run()
243
244
245class ReformatMetricsForIngest:
246    """A class to reformat metrics for ingestion into TDR (Terra Data Repository)."""
247
248    def __init__(
249            self,
250            ingest_metadata: list[dict],
251            file_to_uuid_dict: Optional[dict] = None,
252            schema_info: Optional[dict] = None
253    ):
254        """
255        Initialize the ReformatMetricsForIngest class.
256
257        This class is used to reformat metrics for ingest.
258        Assumes input JSON will be in the following format for GCP:
259        ```
260        {
261            "file_name": blob.name,
262            "file_path": f"gs://{self.bucket_name}/{blob.name}",
263            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
264            "file_extension": os.path.splitext(blob.name)[1],
265            "size_in_bytes": blob.size,
266            "md5_hash": blob.md5_hash
267        }
268        ```
269
270        **Args:**
271        - ingest_metadata (list[dict]): The metadata to be ingested.
 272        - file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be obtained from
 273            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
 274            source file paths to UUIDs. If provided, ingest is much quicker since no file ingest
 275            or lookup is needed. Defaults to None.
276        - schema_info (dict, optional): Schema information for the tables. Defaults to None.
277        """
278        self.ingest_metadata = ingest_metadata
279        """@private"""
280        self.file_prefix = "gs://"
281        """@private"""
282        self.file_to_uuid_dict = file_to_uuid_dict
283        """@private"""
284        self.schema_info = schema_info
285        """@private"""
286
287    def _add_file_ref(self, file_details: dict) -> None:
288        """
289        Create file ref for ingest.
290
291        Args:
292            file_details (dict): The details of the file to be ingested.
293        """
294        file_details["file_ref"] = {
295            "sourcePath": file_details["path"],
296            "targetPath": self._format_relative_tdr_path(file_details["path"]),
297            "description": f"Ingest of {file_details['path']}",
298            "mimeType": file_details["content_type"]
299        }
300
301    @staticmethod
302    def _format_relative_tdr_path(cloud_path: str) -> str:
303        """
304        Format cloud path to TDR path.
305
306        Args:
307            cloud_path (str): The cloud path to be formatted.
308
309        Returns:
310            str: The formatted TDR path.
311        """
312        relative_path = "/".join(cloud_path.split("/")[3:])
313        return f"/{relative_path}"
314
315    def _check_and_format_file_path(self, column_value: str) -> tuple[Any, bool]:
316        """
317        Check if column value is a gs:// path and reformat to dict with ingest information.
318
 319        If file_to_uuid_dict is provided and the path is found there, the existing file UUID is used instead.
 320        If the path is not found, a warning is logged and the value is still formatted for ingest as a
 321        regular file.
322
323        Args:
324            column_value (str): The column value to be checked and formatted.
325
326        Returns:
327            tuple[Any, bool]: The formatted column value and a validity flag.
328        """
329        valid = True
330        if isinstance(column_value, str):
331            if column_value.startswith(self.file_prefix):
332                if self.file_to_uuid_dict:
333                    uuid = self.file_to_uuid_dict.get(column_value)
334                    if uuid:
335                        column_value = uuid
336                        return column_value, valid
337                    else:
338                        logging.warning(
339                            f"File {column_value} not found in file_to_uuid_dict, will attempt "
340                            "to ingest as regular file and not use UUID"
341                        )
342                source_dest_mapping = {
343                    "sourcePath": column_value,
344                    "targetPath": self._format_relative_tdr_path(column_value)
345                }
346                return source_dest_mapping, valid
347        return column_value, valid
348
349    def _validate_and_update_column_for_schema(self, column_name: str, column_value: Any) -> tuple[str, bool]:
350        """
 351        Check if the column value matches the datatype the schema expects and convert it if not. Returns the value as a string.
352
353        Args:
354            column_name (str): The name of the column.
355            column_value (Any): The value of the column.
356
357        Returns:
358            tuple[str, bool]: The validated and updated column value and a validity flag.
359        """
360        valid = True
361        if self.schema_info:
362            if column_name in self.schema_info.keys():
363                expected_data_type = self.schema_info[column_name]["datatype"]
364                if expected_data_type == "string" and not isinstance(column_value, str):
365                    try:
366                        column_value = str(column_value)
367                    except ValueError:
368                        logging.warning(f"Column {column_name} with value {column_value} is not a string")
369                        valid = False
370                if expected_data_type in ["int64", "integer"] and not isinstance(column_value, int):
371                    try:
372                        column_value = int(column_value)
373                    except ValueError:
374                        logging.warning(f"Column {column_name} with value {column_value} is not an integer")
375                        valid = False
376                if expected_data_type == "float64" and not isinstance(column_value, float):
377                    try:
378                        column_value = float(column_value)
379                    except ValueError:
380                        logging.warning(f"Column {column_name} with value {column_value} is not a float")
381                        valid = False
382                if expected_data_type == "boolean" and not isinstance(column_value, bool):
383                    try:
384                        column_value = bool(column_value)
385                    except ValueError:
386                        logging.warning(f"Column {column_name} with value {column_value} is not a boolean")
387                        valid = False
388                if expected_data_type in ["datetime", "date", "time"] and not isinstance(column_value, datetime):
389                    try:
390                        column_value = parser.parse(column_value)
391                    except ValueError:
392                        logging.warning(f"Column {column_name} with value {column_value} is not a datetime")
393                        valid = False
394                if expected_data_type == "array" and not isinstance(column_value, list):
395                    valid = False
396                    logging.warning(f"Column {column_name} with value {column_value} is not a list")
397                if expected_data_type == "bytes" and not isinstance(column_value, bytes):
398                    valid = False
399                    logging.warning(f"Column {column_name} with value {column_value} is not bytes")
400                if expected_data_type == "fileref" and not column_value.startswith(self.file_prefix):
401                    valid = False
402                    logging.warning(f"Column {column_name} with value {column_value} is not a file path")
403        return str(column_value), valid
404
405    def _reformat_metric(self, row_dict: dict) -> Optional[dict]:
406        """
407        Reformat metric for ingest.
408
409        Args:
410            row_dict (dict): The row dictionary to be reformatted.
411
412        Returns:
413            Optional[dict]: The reformatted row dictionary or None if invalid.
414        """
415        reformatted_dict = {}
416        row_valid = True
417        for key, value in row_dict.items():
418            if value or value == 0:
419                if self.schema_info:
420                    value, valid = self._validate_and_update_column_for_schema(key, value)
421                    if not valid:
422                        row_valid = False
423                if isinstance(value, list):
424                    updated_value_list = []
425                    for item in value:
426                        update_value, valid = self._check_and_format_file_path(item)
427                        if not valid:
428                            row_valid = False
429                        updated_value_list.append(update_value)
430                    reformatted_dict[key] = updated_value_list
431                else:
432                    update_value, valid = self._check_and_format_file_path(value)
433                    reformatted_dict[key] = update_value
434                    if not valid:
435                        row_valid = False
436        reformatted_dict["last_modified_date"] = datetime.now(tz=pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S")  # type: ignore[assignment] # noqa: E501
437        if row_valid:
438            return reformatted_dict
439        else:
440            logging.info(f"Row {json.dumps(row_dict, indent=4)} not valid and will not be included in ingest")
441            return None
442
443    def run(self) -> list[dict]:
444        """
445        Run the reformatting process for all metrics.
446
447        **Returns:**
448        - list[dict]: A list of reformatted metrics.
449        """
450        reformatted_metrics = []
451        for row_dict in self.ingest_metadata:
452            reformatted_row = self._reformat_metric(row_dict)
453            if reformatted_row:
454                reformatted_metrics.append(reformatted_row)
455        return reformatted_metrics
456
457
458class ConvertTerraTableInfoForIngest:
459    """Converts each row of table metadata into a dictionary that can be ingested into TDR."""
460
461    def __init__(
462            self,
463            table_metadata: list[dict],
464            columns_to_ignore: list[str] = [],
465            tdr_row_id: Optional[str] = None
466    ):
467        """
468        Initialize the ConvertTerraTableInfoForIngest class.
469
470        Input will look like this:
471        ```
472            [{
473              "attributes": {
474                "some_metric": 99.99,
475                "some_file_path": "gs://path/to/file",
476                "something_to_exclude": "exclude_me"
477              },
478              "entityType": "sample",
479              "name": "SM-MVVVV"
480            }]
481        ```
482        And be converted to this:
483        ```
484            [{
485              "sample_id": "SM-MVVVV",
486              "some_metric": 99.99,
487              "some_file_path": "gs://path/to/file"
488            }]
489        ```
490        **Args:**
491        - table_metadata (list[dict]): The metadata of the table to be converted.
492        - tdr_row_id (str): The row ID to be used in the TDR. Defaults to {entityType}_id.
493        - columns_to_ignore (list[str]): List of columns to ignore during conversion. Defaults to an empty list.
494        """
495        self.table_metadata = table_metadata
496        """@private"""
497        if table_metadata:
498            self.tdr_row_id = tdr_row_id if tdr_row_id else f'{table_metadata[0]["entityType"]}_id'
499            """@private"""
500        else:
501            # Won't be used if table_metadata is empty but will be set to empty string
502            self.tdr_row_id = ""
503            """@private"""
504        self.columns_to_ignore = columns_to_ignore
505        """@private"""
506
507    def run(self) -> list[dict]:
508        """
509        Convert the table metadata into a format suitable for TDR ingestion.
510
511        **Returns:**
512        - list[dict]: A list of dictionaries containing the converted table metadata.
513        """
514        return [
515            {
516                self.tdr_row_id: row["name"],
517                **{k: v for k, v in row["attributes"].items()
518                   # if columns_to_ignore is not provided or if the column is not in the columns_to_ignore list
519                   if k not in self.columns_to_ignore}
520            }
521            for row in self.table_metadata
522        ]
523
524
525class FilterAndBatchIngest:
526    """Filter and batch ingest process into TDR."""
527
528    def __init__(
529            self,
530            tdr: TDR,
531            filter_existing_ids: bool,
532            unique_id_field: str,
533            table_name: str,
534            ingest_metadata: list[dict],
535            dataset_id: str,
536            ingest_waiting_time_poll: int,
537            ingest_batch_size: int,
538            bulk_mode: bool,
539            update_strategy: str,
540            load_tag: str,
541            test_ingest: bool = False,
542            file_to_uuid_dict: Optional[dict] = None,
543            schema_info: Optional[dict] = None,
544            skip_reformat: bool = False
545    ):
546        """
547        Initialize the FilterAndBatchIngest class.
548
549        **Args:**
550        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
551        - filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
552        - unique_id_field (str): The unique ID field to filter on.
553        - table_name (str): The name of the table to ingest data into.
554        - ingest_metadata (list[dict]): The metadata to ingest.
555        - dataset_id (str): The ID of the dataset.
556        - ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
557        - ingest_batch_size (int): The batch size for ingest.
558        - bulk_mode (bool): Whether to use bulk mode for ingest.
559        - update_strategy (str): The update strategy to use.
560        - load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
561        - test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
 562        - file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be obtained from
 563            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
 564            source file paths to UUIDs. If provided, ingest is much quicker since no file ingest
 565            or lookup is needed. Defaults to None.
566        - schema_info (dict, optional): Schema information for the tables.
567                Used to validate ingest metrics match. Defaults to None.
568        - skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
569        """
570        self.tdr = tdr
571        """@private"""
572        self.filter_existing_ids = filter_existing_ids
573        """@private"""
574        self.unique_id_field = unique_id_field
575        """@private"""
576        self.table_name = table_name
577        """@private"""
578        self.ingest_metadata = ingest_metadata
579        """@private"""
580        self.dataset_id = dataset_id
581        """@private"""
582        self.ingest_waiting_time_poll = ingest_waiting_time_poll
583        """@private"""
584        self.ingest_batch_size = ingest_batch_size
585        """@private"""
586        self.bulk_mode = bulk_mode
587        """@private"""
588        self.update_strategy = update_strategy
589        """@private"""
590        self.load_tag = load_tag
591        """@private"""
592        self.test_ingest = test_ingest
593        """@private"""
594        self.file_to_uuid_dict = file_to_uuid_dict
595        """@private"""
596        self.schema_info = schema_info
597        """@private"""
598        self.skip_reformat = skip_reformat
599        """@private"""
600
601    def run(self) -> None:
602        """
603        Run the filter and batch ingest process.
604
605        This method filters out sample IDs that already exist in the dataset (if specified),
606        and then performs a batch ingest of the remaining metadata into the specified table.
607        """
608        if self.filter_existing_ids:
609            # Filter out sample ids that are already in the dataset
610            filtered_metrics = FilterOutSampleIdsAlreadyInDataset(
611                ingest_metrics=self.ingest_metadata,
612                dataset_id=self.dataset_id,
613                tdr=self.tdr,
614                target_table_name=self.table_name,
615                filter_entity_id=self.unique_id_field
616            ).run()
617        else:
618            filtered_metrics = self.ingest_metadata
619        # If there are metrics to ingest then ingest them
620        if filtered_metrics:
621            # Batch ingest of table to table within dataset
622            logging.info(f"Starting ingest into {self.table_name} in dataset {self.dataset_id}")
623            BatchIngest(
624                ingest_metadata=filtered_metrics,
625                tdr=self.tdr,
626                target_table_name=self.table_name,
627                dataset_id=self.dataset_id,
628                batch_size=self.ingest_batch_size,
629                bulk_mode=self.bulk_mode,
630                update_strategy=self.update_strategy,
631                waiting_time_to_poll=self.ingest_waiting_time_poll,
632                test_ingest=self.test_ingest,
633                load_tag=self.load_tag,
634                file_to_uuid_dict=self.file_to_uuid_dict,
635                schema_info=self.schema_info,
636                skip_reformat=self.skip_reformat
637            ).run()
638
639
640class GetPermissionsForWorkspaceIngest:
641    """Obtain permissions necessary for workspace ingest."""
642
643    def __init__(self, terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False):
644        """
645        Initialize the GetPermissionsForWorkspaceIngest class.
646
647        **Args:**
648        - terra_workspace (`ops_utils.terra_util.TerraWorkspace`): Instance of the TerraWorkspace class.
649        - dataset_info (dict): Information about the dataset.
650        - added_to_auth_domain (bool, optional): Flag indicating if the SA account
651                has been added to the auth domain. Defaults to `False`.
652        """
653        self.terra_workspace = terra_workspace
654        """@private"""
655        self.dataset_info = dataset_info
656        """@private"""
657        self.added_to_auth_domain = added_to_auth_domain
658        """@private"""
659
660    def run(self) -> None:
661        """
662        Ensure the dataset SA account has the necessary permissions on the Terra workspace.
663
664        This method updates the user ACL to make the SA account a reader on the Terra workspace.
665        It also checks if the workspace has an authorization domain, and logs the
666        necessary steps to add the SA account to the auth domain.
667        """
668        # Ensure dataset SA account is reader on Terra workspace.
669        tdr_sa_account = self.dataset_info["ingestServiceAccount"]
670        self.terra_workspace.update_user_acl(email=tdr_sa_account, access_level="READER")
671
672        # Check if workspace has auth domain
673        workspace_info = self.terra_workspace.get_workspace_info().json()
674        auth_domain_list = workspace_info["workspace"]["authorizationDomain"]
675        # Attempt to add tdr_sa_account to auth domain
676        if auth_domain_list:
677            for auth_domain_dict in auth_domain_list:
678                auth_domain = auth_domain_dict["membersGroupName"]
679                logging.info(f"TDR SA account {tdr_sa_account} needs to be added to auth domain group {auth_domain}")
680            if self.added_to_auth_domain:
681                logging.info("added_to_auth_domain has been set to true so assuming account has already been added")
682            else:
683                logging.info(
 684                    f"Please add TDR SA account {tdr_sa_account} to auth domain group(s) to allow "
 685                    "access to the workspace and then rerun with added_to_auth_domain=True"
686                )
687                sys.exit(0)
class BatchIngest:

A class to handle batch ingestion of metadata into TDR (Terra Data Repository).

BatchIngest(ingest_metadata: list[dict], tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, target_table_name: str, dataset_id: str, batch_size: int, bulk_mode: bool, update_strategy: str = 'replace', waiting_time_to_poll: int = 90, test_ingest: bool = False, load_tag: Optional[str] = None, file_to_uuid_dict: Optional[dict] = None, schema_info: Optional[dict] = None, skip_reformat: bool = False)

Initialize the BatchIngest class.

Args:

  • ingest_metadata (list[dict]): The metadata to be ingested.
  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
  • target_table_name (str): The name of the target table.
  • dataset_id (str): The ID of the dataset.
  • batch_size (int): The size of each batch for ingestion.
  • bulk_mode (bool): Flag indicating if bulk mode should be used.
  • update_strategy (str, optional): The strategy for updating existing records. Defaults to replace.
  • waiting_time_to_poll (int, optional): The time to wait between polling for job status. Defaults to 90.
  • test_ingest (bool, optional): Flag indicating if only the first batch should be ingested for testing. Defaults to False.
  • load_tag (str, optional): A tag to identify the load. Used so future ingests can pick up where they left off. Defaults to None.
  • file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be obtained from create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping source file paths to UUIDs. If provided, ingest is much quicker since no file ingest or lookup is needed. Defaults to None.
  • schema_info (dict, optional): Schema information for the tables. Used to validate that ingest data matches the schema. Defaults to None.
  • skip_reformat (bool, optional): Flag indicating if reformatting should be skipped. Defaults to False.
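
Where column types should be checked, schema_info can be passed alongside rows that contain gs:// paths (which are rewritten into TDR file references during reformatting). A minimal sketch with placeholder column names, a placeholder dataset UUID, and an assumed existing `tdr` client:

```
# Rows to ingest; gs:// paths are converted to TDR file references during reformatting
ingest_metadata = [
    {
        "sample_id": "SM-AAAA",
        "cram_path": "gs://my-bucket/crams/SM-AAAA.cram",
        "mean_coverage": 31.4,
    },
]

# schema_info maps each column name to a dict holding its TDR datatype
schema_info = {
    "sample_id": {"datatype": "string"},
    "cram_path": {"datatype": "fileref"},
    "mean_coverage": {"datatype": "float64"},
}

BatchIngest(
    ingest_metadata=ingest_metadata,
    tdr=tdr,  # assumed: an existing TDR client
    target_table_name="sample",
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset UUID
    batch_size=500,
    bulk_mode=False,
    schema_info=schema_info,
).run()
```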
def run(self) -> None:

Run the batch ingestion process.

class StartAndMonitorIngest:

Class to start and monitor the ingestion of records into a TDR dataset.

StartAndMonitorIngest(tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, ingest_records: list[dict], target_table_name: str, dataset_id: str, load_tag: str, bulk_mode: bool, update_strategy: str, waiting_time_to_poll: int)

Initialize the StartAndMonitorIngest.

Args:

  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
  • ingest_records (list[dict]): The records to be ingested.
  • target_table_name (str): The name of the target table.
  • dataset_id (str): The ID of the dataset.
  • load_tag (str): A tag to identify the load.
  • bulk_mode (bool): Flag indicating if bulk mode should be used.
  • update_strategy (str): The strategy for updating existing records.
  • waiting_time_to_poll (int): The time to wait between polling for job status.
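
BatchIngest constructs this class internally for each batch, but it can also be driven directly with a single set of already-reformatted records. A minimal sketch with placeholder values, assuming an existing `tdr` client:

```
records = [
    {"sample_id": "SM-AAAA", "mean_coverage": "31.4", "last_modified_date": "2024-01-01T00:00:00"},
]

StartAndMonitorIngest(
    tdr=tdr,  # assumed: an existing TDR client
    ingest_records=records,
    target_table_name="sample",
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset UUID
    load_tag="11111111-2222-3333-4444-555555555555.sample",
    bulk_mode=False,
    update_strategy="replace",
    waiting_time_to_poll=90,
).run()
```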
def run(self) -> None:

Run the ingestion process and monitor the job until completion.
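
Internally, run() builds the ingestDataset request body from the constructor arguments before submitting it and polling the resulting job. As a rough illustration, a request built with update_strategy="replace", bulk_mode=False, and a single placeholder record would be serialized from a dictionary like this:

```
{
    "format": "array",
    "records": [{"sample_id": "SM-AAAA", "mean_coverage": "31.4"}],
    "table": "sample",
    "resolve_existing_files": "true",
    "updateStrategy": "replace",
    "load_tag": "11111111-2222-3333-4444-555555555555.sample",
    "bulkMode": "false"
}
```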

class ReformatMetricsForIngest:
246class ReformatMetricsForIngest:
247    """A class to reformat metrics for ingestion into TDR (Terra Data Repository)."""
248
249    def __init__(
250            self,
251            ingest_metadata: list[dict],
252            file_to_uuid_dict: Optional[dict] = None,
253            schema_info: Optional[dict] = None
254    ):
255        """
256        Initialize the ReformatMetricsForIngest class.
257
258        This class is used to reformat metrics for ingest.
259        Assumes input JSON will be in the following format for GCP:
260        ```
261        {
262            "file_name": blob.name,
263            "file_path": f"gs://{self.bucket_name}/{blob.name}",
264            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
265            "file_extension": os.path.splitext(blob.name)[1],
266            "size_in_bytes": blob.size,
267            "md5_hash": blob.md5_hash
268        }
269        ```
270
271        **Args:**
272        - ingest_metadata (list[dict]): The metadata to be ingested.
273        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
274            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
275            source file paths to UUIDs. If used will make ingest much quicker since no ingest
276            or look up of file needed. Defaults to None.
277        - schema_info (dict, optional): Schema information for the tables. Defaults to None.
278        """
279        self.ingest_metadata = ingest_metadata
280        """@private"""
281        self.file_prefix = "gs://"
282        """@private"""
283        self.file_to_uuid_dict = file_to_uuid_dict
284        """@private"""
285        self.schema_info = schema_info
286        """@private"""
287
288    def _add_file_ref(self, file_details: dict) -> None:
289        """
290        Create file ref for ingest.
291
292        Args:
293            file_details (dict): The details of the file to be ingested.
294        """
295        file_details["file_ref"] = {
296            "sourcePath": file_details["path"],
297            "targetPath": self._format_relative_tdr_path(file_details["path"]),
298            "description": f"Ingest of {file_details['path']}",
299            "mimeType": file_details["content_type"]
300        }
301
302    @staticmethod
303    def _format_relative_tdr_path(cloud_path: str) -> str:
304        """
305        Format cloud path to TDR path.
306
307        Args:
308            cloud_path (str): The cloud path to be formatted.
309
310        Returns:
311            str: The formatted TDR path.
312        """
313        relative_path = "/".join(cloud_path.split("/")[3:])
314        return f"/{relative_path}"
315
316    def _check_and_format_file_path(self, column_value: str) -> tuple[Any, bool]:
317        """
318        Check if column value is a gs:// path and reformat to dict with ingest information.
319
320        If file_to_uuid_dict is
321        provided then it will add existing UUID. If file_to_uuid_dict provided and file not found then will warn and
322        return None.
323
324        Args:
325            column_value (str): The column value to be checked and formatted.
326
327        Returns:
328            tuple[Any, bool]: The formatted column value and a validity flag.
329        """
330        valid = True
331        if isinstance(column_value, str):
332            if column_value.startswith(self.file_prefix):
333                if self.file_to_uuid_dict:
334                    uuid = self.file_to_uuid_dict.get(column_value)
335                    if uuid:
336                        column_value = uuid
337                        return column_value, valid
338                    else:
339                        logging.warning(
340                            f"File {column_value} not found in file_to_uuid_dict, will attempt "
341                            "to ingest as regular file and not use UUID"
342                        )
343                source_dest_mapping = {
344                    "sourcePath": column_value,
345                    "targetPath": self._format_relative_tdr_path(column_value)
346                }
347                return source_dest_mapping, valid
348        return column_value, valid
349
350    def _validate_and_update_column_for_schema(self, column_name: str, column_value: Any) -> tuple[str, bool]:
351        """
352        Check whether the column value matches the type the schema expects, converting it when possible; the value is cast to a string before returning.
353
354        Args:
355            column_name (str): The name of the column.
356            column_value (Any): The value of the column.
357
358        Returns:
359            tuple[str, bool]: The validated and updated column value and a validity flag.
360        """
361        valid = True
362        if self.schema_info:
363            if column_name in self.schema_info.keys():
364                expected_data_type = self.schema_info[column_name]["datatype"]
365                if expected_data_type == "string" and not isinstance(column_value, str):
366                    try:
367                        column_value = str(column_value)
368                    except ValueError:
369                        logging.warning(f"Column {column_name} with value {column_value} is not a string")
370                        valid = False
371                if expected_data_type in ["int64", "integer"] and not isinstance(column_value, int):
372                    try:
373                        column_value = int(column_value)
374                    except ValueError:
375                        logging.warning(f"Column {column_name} with value {column_value} is not an integer")
376                        valid = False
377                if expected_data_type == "float64" and not isinstance(column_value, float):
378                    try:
379                        column_value = float(column_value)
380                    except ValueError:
381                        logging.warning(f"Column {column_name} with value {column_value} is not a float")
382                        valid = False
383                if expected_data_type == "boolean" and not isinstance(column_value, bool):
384                    try:
385                        column_value = bool(column_value)
386                    except ValueError:
387                        logging.warning(f"Column {column_name} with value {column_value} is not a boolean")
388                        valid = False
389                if expected_data_type in ["datetime", "date", "time"] and not isinstance(column_value, datetime):
390                    try:
391                        column_value = parser.parse(column_value)
392                    except ValueError:
393                        logging.warning(f"Column {column_name} with value {column_value} is not a datetime")
394                        valid = False
395                if expected_data_type == "array" and not isinstance(column_value, list):
396                    valid = False
397                    logging.warning(f"Column {column_name} with value {column_value} is not a list")
398                if expected_data_type == "bytes" and not isinstance(column_value, bytes):
399                    valid = False
400                    logging.warning(f"Column {column_name} with value {column_value} is not bytes")
401                if expected_data_type == "fileref" and not column_value.startswith(self.file_prefix):
402                    valid = False
403                    logging.warning(f"Column {column_name} with value {column_value} is not a file path")
404        return str(column_value), valid
405
406    def _reformat_metric(self, row_dict: dict) -> Optional[dict]:
407        """
408        Reformat metric for ingest.
409
410        Args:
411            row_dict (dict): The row dictionary to be reformatted.
412
413        Returns:
414            Optional[dict]: The reformatted row dictionary or None if invalid.
415        """
416        reformatted_dict = {}
417        row_valid = True
418        for key, value in row_dict.items():
419            if value or value == 0:
420                if self.schema_info:
421                    value, valid = self._validate_and_update_column_for_schema(key, value)
422                    if not valid:
423                        row_valid = False
424                if isinstance(value, list):
425                    updated_value_list = []
426                    for item in value:
427                        update_value, valid = self._check_and_format_file_path(item)
428                        if not valid:
429                            row_valid = False
430                        updated_value_list.append(update_value)
431                    reformatted_dict[key] = updated_value_list
432                else:
433                    update_value, valid = self._check_and_format_file_path(value)
434                    reformatted_dict[key] = update_value
435                    if not valid:
436                        row_valid = False
437        reformatted_dict["last_modified_date"] = datetime.now(tz=pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S")  # type: ignore[assignment] # noqa: E501
438        if row_valid:
439            return reformatted_dict
440        else:
441            logging.info(f"Row {json.dumps(row_dict, indent=4)} not valid and will not be included in ingest")
442            return None
443
444    def run(self) -> list[dict]:
445        """
446        Run the reformatting process for all metrics.
447
448        **Returns:**
449        - list[dict]: A list of reformatted metrics.
450        """
451        reformatted_metrics = []
452        for row_dict in self.ingest_metadata:
453            reformatted_row = self._reformat_metric(row_dict)
454            if reformatted_row:
455                reformatted_metrics.append(reformatted_row)
456        return reformatted_metrics

A class to reformat metrics for ingestion into TDR (Terra Data Repository).

ReformatMetricsForIngest( ingest_metadata: list[dict], file_to_uuid_dict: Optional[dict] = None, schema_info: Optional[dict] = None)
249    def __init__(
250            self,
251            ingest_metadata: list[dict],
252            file_to_uuid_dict: Optional[dict] = None,
253            schema_info: Optional[dict] = None
254    ):
255        """
256        Initialize the ReformatMetricsForIngest class.
257
258        This class is used to reformat metrics for ingest.
259        Assumes input JSON will be in the following format for GCP:
260        ```
261        {
262            "file_name": blob.name,
263            "file_path": f"gs://{self.bucket_name}/{blob.name}",
264            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
265            "file_extension": os.path.splitext(blob.name)[1],
266            "size_in_bytes": blob.size,
267            "md5_hash": blob.md5_hash
268        }
269        ```
270
271        **Args:**
272        - ingest_metadata (list[dict]): The metadata to be ingested.
273        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
274            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
275            source file paths to UUIDs. If used will make ingest much quicker since no ingest
276            or look up of file needed. Defaults to None.
277        - schema_info (dict, optional): Schema information for the tables. Defaults to None.
278        """
279        self.ingest_metadata = ingest_metadata
280        """@private"""
281        self.file_prefix = "gs://"
282        """@private"""
283        self.file_to_uuid_dict = file_to_uuid_dict
284        """@private"""
285        self.schema_info = schema_info
286        """@private"""

Initialize the ReformatMetricsForIngest class.

This class is used to reformat metrics for ingest. Assumes input JSON will be in the following format for GCP:

{
    "file_name": blob.name,
    "file_path": f"gs://{self.bucket_name}/{blob.name}",
    "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
    "file_extension": os.path.splitext(blob.name)[1],
    "size_in_bytes": blob.size,
    "md5_hash": blob.md5_hash
}

Args:

  • ingest_metadata (list[dict]): The metadata to be ingested.
  • file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be generated with create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping source file paths to existing file UUIDs; when provided, ingest is much faster because no file ingest or lookup is needed. Defaults to None.
  • schema_info (dict, optional): Schema information for the tables. Defaults to None.
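
The snippet below is one way to build records in that shape with the google-cloud-storage client; the bucket name is a placeholder and the keys simply mirror the GCP format shown above.

    import os
    from mimetypes import guess_type

    from google.cloud import storage

    bucket_name = "my-example-bucket"  # placeholder
    client = storage.Client()

    ingest_metadata = []
    for blob in client.list_blobs(bucket_name):
        ingest_metadata.append(
            {
                "file_name": blob.name,
                "file_path": f"gs://{bucket_name}/{blob.name}",
                "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
                "file_extension": os.path.splitext(blob.name)[1],
                "size_in_bytes": blob.size,
                "md5_hash": blob.md5_hash,
            }
        )
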
def run(self) -> list[dict]:
444    def run(self) -> list[dict]:
445        """
446        Run the reformatting process for all metrics.
447
448        **Returns:**
449        - list[dict]: A list of reformatted metrics.
450        """
451        reformatted_metrics = []
452        for row_dict in self.ingest_metadata:
453            reformatted_row = self._reformat_metric(row_dict)
454            if reformatted_row:
455                reformatted_metrics.append(reformatted_row)
456        return reformatted_metrics

Run the reformatting process for all metrics.

Returns:

  • list[dict]: A list of reformatted metrics.
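
A hedged usage sketch, assuming ingest_metadata was built as in the example above; the schema_info shape (column name mapped to a dict with a "datatype" key) is inferred from how this class reads it.

    from ops_utils.tdr_utils.tdr_ingest_utils import ReformatMetricsForIngest

    # Illustrative schema info: column name -> {"datatype": ...}
    schema_info = {
        "size_in_bytes": {"datatype": "int64"},
        "file_path": {"datatype": "fileref"},
    }

    reformatted_rows = ReformatMetricsForIngest(
        ingest_metadata=ingest_metadata,
        schema_info=schema_info,
    ).run()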
class ConvertTerraTableInfoForIngest:
459class ConvertTerraTableInfoForIngest:
460    """Converts each row of table metadata into a dictionary that can be ingested into TDR."""
461
462    def __init__(
463            self,
464            table_metadata: list[dict],
465            columns_to_ignore: list[str] = [],
466            tdr_row_id: Optional[str] = None
467    ):
468        """
469        Initialize the ConvertTerraTableInfoForIngest class.
470
471        Input will look like this:
472        ```
473            [{
474              "attributes": {
475                "some_metric": 99.99,
476                "some_file_path": "gs://path/to/file",
477                "something_to_exclude": "exclude_me"
478              },
479              "entityType": "sample",
480              "name": "SM-MVVVV"
481            }]
482        ```
483        And be converted to this:
484        ```
485            [{
486              "sample_id": "SM-MVVVV",
487              "some_metric": 99.99,
488              "some_file_path": "gs://path/to/file"
489            }]
490        ```
491        **Args:**
492        - table_metadata (list[dict]): The metadata of the table to be converted.
493        - tdr_row_id (str): The row ID to be used in the TDR. Defaults to {entityType}_id.
494        - columns_to_ignore (list[str]): List of columns to ignore during conversion. Defaults to an empty list.
495        """
496        self.table_metadata = table_metadata
497        """@private"""
498        if table_metadata:
499            self.tdr_row_id = tdr_row_id if tdr_row_id else f'{table_metadata[0]["entityType"]}_id'
500            """@private"""
501        else:
502            # Won't be used if table_metadata is empty but will be set to empty string
503            self.tdr_row_id = ""
504            """@private"""
505        self.columns_to_ignore = columns_to_ignore
506        """@private"""
507
508    def run(self) -> list[dict]:
509        """
510        Convert the table metadata into a format suitable for TDR ingestion.
511
512        **Returns:**
513        - list[dict]: A list of dictionaries containing the converted table metadata.
514        """
515        return [
516            {
517                self.tdr_row_id: row["name"],
518                **{k: v for k, v in row["attributes"].items()
519                   # if columns_to_ignore is not provided or if the column is not in the columns_to_ignore list
520                   if k not in self.columns_to_ignore}
521            }
522            for row in self.table_metadata
523        ]

Converts each row of table metadata into a dictionary that can be ingested into TDR.

ConvertTerraTableInfoForIngest( table_metadata: list[dict], columns_to_ignore: list[str] = [], tdr_row_id: Optional[str] = None)
462    def __init__(
463            self,
464            table_metadata: list[dict],
465            columns_to_ignore: list[str] = [],
466            tdr_row_id: Optional[str] = None
467    ):
468        """
469        Initialize the ConvertTerraTableInfoForIngest class.
470
471        Input will look like this:
472        ```
473            [{
474              "attributes": {
475                "some_metric": 99.99,
476                "some_file_path": "gs://path/to/file",
477                "something_to_exclude": "exclude_me"
478              },
479              "entityType": "sample",
480              "name": "SM-MVVVV"
481            }]
482        ```
483        And be converted to this:
484        ```
485            [{
486              "sample_id": "SM-MVVVV",
487              "some_metric": 99.99,
488              "some_file_path": "gs://path/to/file"
489            }]
490        ```
491        **Args:**
492        - table_metadata (list[dict]): The metadata of the table to be converted.
493        - tdr_row_id (str): The row ID to be used in the TDR. Defaults to {entityType}_id.
494        - columns_to_ignore (list[str]): List of columns to ignore during conversion. Defaults to an empty list.
495        """
496        self.table_metadata = table_metadata
497        """@private"""
498        if table_metadata:
499            self.tdr_row_id = tdr_row_id if tdr_row_id else f'{table_metadata[0]["entityType"]}_id'
500            """@private"""
501        else:
502            # Won't be used if table_metadata is empty but will be set to empty string
503            self.tdr_row_id = ""
504            """@private"""
505        self.columns_to_ignore = columns_to_ignore
506        """@private"""

Initialize the ConvertTerraTableInfoForIngest class.

Input will look like this:

    [{
      "attributes": {
        "some_metric": 99.99,
        "some_file_path": "gs://path/to/file",
        "something_to_exclude": "exclude_me"
      },
      "entityType": "sample",
      "name": "SM-MVVVV"
    }]

And be converted to this:

    [{
      "sample_id": "SM-MVVVV",
      "some_metric": 99.99,
      "some_file_path": "gs://path/to/file"
    }]

Args:

  • table_metadata (list[dict]): The metadata of the table to be converted.
  • tdr_row_id (str, optional): The column name to use as the row ID in TDR (e.g. sample_id). Defaults to {entityType}_id.
  • columns_to_ignore (list[str]): List of columns to ignore during conversion. Defaults to an empty list.
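
Putting the example above into code (a minimal sketch; the entity values are the same placeholders used in the docstring):

    from ops_utils.tdr_utils.tdr_ingest_utils import ConvertTerraTableInfoForIngest

    table_metadata = [
        {
            "attributes": {
                "some_metric": 99.99,
                "some_file_path": "gs://path/to/file",
                "something_to_exclude": "exclude_me",
            },
            "entityType": "sample",
            "name": "SM-MVVVV",
        }
    ]

    rows_for_tdr = ConvertTerraTableInfoForIngest(
        table_metadata=table_metadata,
        columns_to_ignore=["something_to_exclude"],
    ).run()
    # rows_for_tdr == [{"sample_id": "SM-MVVVV", "some_metric": 99.99,
    #                   "some_file_path": "gs://path/to/file"}]
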
def run(self) -> list[dict]:
508    def run(self) -> list[dict]:
509        """
510        Convert the table metadata into a format suitable for TDR ingestion.
511
512        **Returns:**
513        - list[dict]: A list of dictionaries containing the converted table metadata.
514        """
515        return [
516            {
517                self.tdr_row_id: row["name"],
518                **{k: v for k, v in row["attributes"].items()
519                   # if columns_to_ignore is not provided or if the column is not in the columns_to_ignore list
520                   if k not in self.columns_to_ignore}
521            }
522            for row in self.table_metadata
523        ]

Convert the table metadata into a format suitable for TDR ingestion.

Returns:

  • list[dict]: A list of dictionaries containing the converted table metadata.
class FilterAndBatchIngest:
526class FilterAndBatchIngest:
527    """Filter and batch ingest process into TDR."""
528
529    def __init__(
530            self,
531            tdr: TDR,
532            filter_existing_ids: bool,
533            unique_id_field: str,
534            table_name: str,
535            ingest_metadata: list[dict],
536            dataset_id: str,
537            ingest_waiting_time_poll: int,
538            ingest_batch_size: int,
539            bulk_mode: bool,
540            update_strategy: str,
541            load_tag: str,
542            test_ingest: bool = False,
543            file_to_uuid_dict: Optional[dict] = None,
544            schema_info: Optional[dict] = None,
545            skip_reformat: bool = False
546    ):
547        """
548        Initialize the FilterAndBatchIngest class.
549
550        **Args:**
551        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
552        - filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
553        - unique_id_field (str): The unique ID field to filter on.
554        - table_name (str): The name of the table to ingest data into.
555        - ingest_metadata (list[dict]): The metadata to ingest.
556        - dataset_id (str): The ID of the dataset.
557        - ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
558        - ingest_batch_size (int): The batch size for ingest.
559        - bulk_mode (bool): Whether to use bulk mode for ingest.
560        - update_strategy (str): The update strategy to use.
561        - load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
562        - test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
563        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
564            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
565            source file paths to UUIDs. If used will make ingest much quicker since no ingest
566            or look up of file needed. Defaults to None.
567        - schema_info (dict, optional): Schema information for the tables.
568                Used to validate ingest metrics match. Defaults to None.
569        - skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
570        """
571        self.tdr = tdr
572        """@private"""
573        self.filter_existing_ids = filter_existing_ids
574        """@private"""
575        self.unique_id_field = unique_id_field
576        """@private"""
577        self.table_name = table_name
578        """@private"""
579        self.ingest_metadata = ingest_metadata
580        """@private"""
581        self.dataset_id = dataset_id
582        """@private"""
583        self.ingest_waiting_time_poll = ingest_waiting_time_poll
584        """@private"""
585        self.ingest_batch_size = ingest_batch_size
586        """@private"""
587        self.bulk_mode = bulk_mode
588        """@private"""
589        self.update_strategy = update_strategy
590        """@private"""
591        self.load_tag = load_tag
592        """@private"""
593        self.test_ingest = test_ingest
594        """@private"""
595        self.file_to_uuid_dict = file_to_uuid_dict
596        """@private"""
597        self.schema_info = schema_info
598        """@private"""
599        self.skip_reformat = skip_reformat
600        """@private"""
601
602    def run(self) -> None:
603        """
604        Run the filter and batch ingest process.
605
606        This method filters out sample IDs that already exist in the dataset (if specified),
607        and then performs a batch ingest of the remaining metadata into the specified table.
608        """
609        if self.filter_existing_ids:
610            # Filter out sample ids that are already in the dataset
611            filtered_metrics = FilterOutSampleIdsAlreadyInDataset(
612                ingest_metrics=self.ingest_metadata,
613                dataset_id=self.dataset_id,
614                tdr=self.tdr,
615                target_table_name=self.table_name,
616                filter_entity_id=self.unique_id_field
617            ).run()
618        else:
619            filtered_metrics = self.ingest_metadata
620        # If there are metrics to ingest then ingest them
621        if filtered_metrics:
622            # Batch ingest of table to table within dataset
623            logging.info(f"Starting ingest into {self.table_name} in dataset {self.dataset_id}")
624            BatchIngest(
625                ingest_metadata=filtered_metrics,
626                tdr=self.tdr,
627                target_table_name=self.table_name,
628                dataset_id=self.dataset_id,
629                batch_size=self.ingest_batch_size,
630                bulk_mode=self.bulk_mode,
631                update_strategy=self.update_strategy,
632                waiting_time_to_poll=self.ingest_waiting_time_poll,
633                test_ingest=self.test_ingest,
634                load_tag=self.load_tag,
635                file_to_uuid_dict=self.file_to_uuid_dict,
636                schema_info=self.schema_info,
637                skip_reformat=self.skip_reformat
638            ).run()

Filter and batch ingest process into TDR.

FilterAndBatchIngest( tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, filter_existing_ids: bool, unique_id_field: str, table_name: str, ingest_metadata: list[dict], dataset_id: str, ingest_waiting_time_poll: int, ingest_batch_size: int, bulk_mode: bool, update_strategy: str, load_tag: str, test_ingest: bool = False, file_to_uuid_dict: Optional[dict] = None, schema_info: Optional[dict] = None, skip_reformat: bool = False)
529    def __init__(
530            self,
531            tdr: TDR,
532            filter_existing_ids: bool,
533            unique_id_field: str,
534            table_name: str,
535            ingest_metadata: list[dict],
536            dataset_id: str,
537            ingest_waiting_time_poll: int,
538            ingest_batch_size: int,
539            bulk_mode: bool,
540            update_strategy: str,
541            load_tag: str,
542            test_ingest: bool = False,
543            file_to_uuid_dict: Optional[dict] = None,
544            schema_info: Optional[dict] = None,
545            skip_reformat: bool = False
546    ):
547        """
548        Initialize the FilterAndBatchIngest class.
549
550        **Args:**
551        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
552        - filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
553        - unique_id_field (str): The unique ID field to filter on.
554        - table_name (str): The name of the table to ingest data into.
555        - ingest_metadata (list[dict]): The metadata to ingest.
556        - dataset_id (str): The ID of the dataset.
557        - ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
558        - ingest_batch_size (int): The batch size for ingest.
559        - bulk_mode (bool): Whether to use bulk mode for ingest.
560        - update_strategy (str): The update strategy to use.
561        - load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
562        - test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
563        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
564            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
565            source file paths to UUIDs. If used will make ingest much quicker since no ingest
566            or look up of file needed. Defaults to None.
567        - schema_info (dict, optional): Schema information for the tables.
568                Used to validate ingest metrics match. Defaults to None.
569        - skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
570        """
571        self.tdr = tdr
572        """@private"""
573        self.filter_existing_ids = filter_existing_ids
574        """@private"""
575        self.unique_id_field = unique_id_field
576        """@private"""
577        self.table_name = table_name
578        """@private"""
579        self.ingest_metadata = ingest_metadata
580        """@private"""
581        self.dataset_id = dataset_id
582        """@private"""
583        self.ingest_waiting_time_poll = ingest_waiting_time_poll
584        """@private"""
585        self.ingest_batch_size = ingest_batch_size
586        """@private"""
587        self.bulk_mode = bulk_mode
588        """@private"""
589        self.update_strategy = update_strategy
590        """@private"""
591        self.load_tag = load_tag
592        """@private"""
593        self.test_ingest = test_ingest
594        """@private"""
595        self.file_to_uuid_dict = file_to_uuid_dict
596        """@private"""
597        self.schema_info = schema_info
598        """@private"""
599        self.skip_reformat = skip_reformat
600        """@private"""

Initialize the FilterAndBatchIngest class.

Args:

  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
  • filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
  • unique_id_field (str): The unique ID field to filter on.
  • table_name (str): The name of the table to ingest data into.
  • ingest_metadata (list[dict]): The metadata to ingest.
  • dataset_id (str): The ID of the dataset.
  • ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
  • ingest_batch_size (int): The batch size for ingest.
  • bulk_mode (bool): Whether to use bulk mode for ingest.
  • update_strategy (str): The update strategy to use.
  • load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
  • test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
  • file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be generated with create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping source file paths to existing file UUIDs; when provided, ingest is much faster because no file ingest or lookup is needed. Defaults to None.
  • schema_info (dict, optional): Schema information for the tables. Used to validate ingest metrics match. Defaults to None.
  • skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
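
A minimal usage sketch, assuming an already-initialized TDR client and rows produced by ConvertTerraTableInfoForIngest; the dataset ID and load tag are placeholders.

    from ops_utils.tdr_utils.tdr_ingest_utils import FilterAndBatchIngest

    FilterAndBatchIngest(
        tdr=tdr,                                  # an initialized TDR client
        filter_existing_ids=True,                 # skip rows whose sample_id already exists in TDR
        unique_id_field="sample_id",
        table_name="sample",
        ingest_metadata=rows_for_tdr,             # e.g. output of ConvertTerraTableInfoForIngest
        dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset UUID
        ingest_waiting_time_poll=90,
        ingest_batch_size=500,
        bulk_mode=False,
        update_strategy="replace",
        load_tag="example-load-tag",              # placeholder load tag
    ).run()
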
def run(self) -> None:
602    def run(self) -> None:
603        """
604        Run the filter and batch ingest process.
605
606        This method filters out sample IDs that already exist in the dataset (if specified),
607        and then performs a batch ingest of the remaining metadata into the specified table.
608        """
609        if self.filter_existing_ids:
610            # Filter out sample ids that are already in the dataset
611            filtered_metrics = FilterOutSampleIdsAlreadyInDataset(
612                ingest_metrics=self.ingest_metadata,
613                dataset_id=self.dataset_id,
614                tdr=self.tdr,
615                target_table_name=self.table_name,
616                filter_entity_id=self.unique_id_field
617            ).run()
618        else:
619            filtered_metrics = self.ingest_metadata
620        # If there are metrics to ingest then ingest them
621        if filtered_metrics:
622            # Batch ingest of table to table within dataset
623            logging.info(f"Starting ingest into {self.table_name} in dataset {self.dataset_id}")
624            BatchIngest(
625                ingest_metadata=filtered_metrics,
626                tdr=self.tdr,
627                target_table_name=self.table_name,
628                dataset_id=self.dataset_id,
629                batch_size=self.ingest_batch_size,
630                bulk_mode=self.bulk_mode,
631                update_strategy=self.update_strategy,
632                waiting_time_to_poll=self.ingest_waiting_time_poll,
633                test_ingest=self.test_ingest,
634                load_tag=self.load_tag,
635                file_to_uuid_dict=self.file_to_uuid_dict,
636                schema_info=self.schema_info,
637                skip_reformat=self.skip_reformat
638            ).run()

Run the filter and batch ingest process.

This method filters out sample IDs that already exist in the dataset (if specified), and then performs a batch ingest of the remaining metadata into the specified table.

class GetPermissionsForWorkspaceIngest:
641class GetPermissionsForWorkspaceIngest:
642    """Obtain permissions necessary for workspace ingest."""
643
644    def __init__(self, terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False):
645        """
646        Initialize the GetPermissionsForWorkspaceIngest class.
647
648        **Args:**
649        - terra_workspace (`ops_utils.terra_util.TerraWorkspace`): Instance of the TerraWorkspace class.
650        - dataset_info (dict): Information about the dataset.
651        - added_to_auth_domain (bool, optional): Flag indicating if the SA account
652                has been added to the auth domain. Defaults to `False`.
653        """
654        self.terra_workspace = terra_workspace
655        """@private"""
656        self.dataset_info = dataset_info
657        """@private"""
658        self.added_to_auth_domain = added_to_auth_domain
659        """@private"""
660
661    def run(self) -> None:
662        """
663        Ensure the dataset SA account has the necessary permissions on the Terra workspace.
664
665        This method updates the user ACL to make the SA account a reader on the Terra workspace.
666        It also checks if the workspace has an authorization domain, and logs the
667        necessary steps to add the SA account to the auth domain.
668        """
669        # Ensure dataset SA account is reader on Terra workspace.
670        tdr_sa_account = self.dataset_info["ingestServiceAccount"]
671        self.terra_workspace.update_user_acl(email=tdr_sa_account, access_level="READER")
672
673        # Check if workspace has auth domain
674        workspace_info = self.terra_workspace.get_workspace_info().json()
675        auth_domain_list = workspace_info["workspace"]["authorizationDomain"]
676        # Attempt to add tdr_sa_account to auth domain
677        if auth_domain_list:
678            for auth_domain_dict in auth_domain_list:
679                auth_domain = auth_domain_dict["membersGroupName"]
680                logging.info(f"TDR SA account {tdr_sa_account} needs to be added to auth domain group {auth_domain}")
681            if self.added_to_auth_domain:
682                logging.info("added_to_auth_domain has been set to true so assuming account has already been added")
683            else:
684                logging.info(
685                    f"Please add TDR SA account {tdr_sa_account} to auth domain group(s) to allow  "
686                    "access to workspace and then rerun with added_to_auth_domain=True"
687                )
688                sys.exit(0)

Obtain permissions necessary for workspace ingest.

GetPermissionsForWorkspaceIngest( terra_workspace: ops_utils.terra_util.TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False)
644    def __init__(self, terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False):
645        """
646        Initialize the GetPermissionsForWorkspaceIngest class.
647
648        **Args:**
649        - terra_workspace (`ops_utils.terra_util.TerraWorkspace`): Instance of the TerraWorkspace class.
650        - dataset_info (dict): Information about the dataset.
651        - added_to_auth_domain (bool, optional): Flag indicating if the SA account
652                has been added to the auth domain. Defaults to `False`.
653        """
654        self.terra_workspace = terra_workspace
655        """@private"""
656        self.dataset_info = dataset_info
657        """@private"""
658        self.added_to_auth_domain = added_to_auth_domain
659        """@private"""

Initialize the GetPermissionsForWorkspaceIngest class.

Args:

  • terra_workspace (ops_utils.terra_util.TerraWorkspace): Instance of the TerraWorkspace class.
  • dataset_info (dict): Information about the dataset.
  • added_to_auth_domain (bool, optional): Flag indicating if the SA account has been added to the auth domain. Defaults to False.
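
A hedged usage sketch; construction of the TerraWorkspace client is not shown here, and dataset_info stands in for whatever dataset metadata you have retrieved from TDR (it must contain at least an "ingestServiceAccount" entry).

    from ops_utils.terra_util import TerraWorkspace
    from ops_utils.tdr_utils.tdr_ingest_utils import GetPermissionsForWorkspaceIngest

    terra_workspace: TerraWorkspace = ...  # an initialized TerraWorkspace client
    dataset_info = {"ingestServiceAccount": "datarepo-sa@example.iam.gserviceaccount.com"}  # placeholder

    GetPermissionsForWorkspaceIngest(
        terra_workspace=terra_workspace,
        dataset_info=dataset_info,
    ).run()
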
def run(self) -> None:
661    def run(self) -> None:
662        """
663        Ensure the dataset SA account has the necessary permissions on the Terra workspace.
664
665        This method updates the user ACL to make the SA account a reader on the Terra workspace.
666        It also checks if the workspace has an authorization domain, and logs the
667        necessary steps to add the SA account to the auth domain.
668        """
669        # Ensure dataset SA account is reader on Terra workspace.
670        tdr_sa_account = self.dataset_info["ingestServiceAccount"]
671        self.terra_workspace.update_user_acl(email=tdr_sa_account, access_level="READER")
672
673        # Check if workspace has auth domain
674        workspace_info = self.terra_workspace.get_workspace_info().json()
675        auth_domain_list = workspace_info["workspace"]["authorizationDomain"]
676        # Attempt to add tdr_sa_account to auth domain
677        if auth_domain_list:
678            for auth_domain_dict in auth_domain_list:
679                auth_domain = auth_domain_dict["membersGroupName"]
680                logging.info(f"TDR SA account {tdr_sa_account} needs to be added to auth domain group {auth_domain}")
681            if self.added_to_auth_domain:
682                logging.info("added_to_auth_domain has been set to true so assuming account has already been added")
683            else:
684                logging.info(
685                    f"Please add TDR SA account {tdr_sa_account} to auth domain group(s) to allow  "
686                    "access to workspace and then rerun with added_to_auth_domain=True"
687                )
688                sys.exit(0)

Ensure the dataset SA account has the necessary permissions on the Terra workspace.

This method updates the user ACL to make the SA account a reader on the Terra workspace. It also checks if the workspace has an authorization domain, and logs the necessary steps to add the SA account to the auth domain.
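
If the workspace has an authorization domain, the first run logs the group(s) the TDR service account must be added to and then exits. Once the account has been added, the same call can be repeated with the flag set (a sketch under the same placeholder assumptions as above):

    GetPermissionsForWorkspaceIngest(
        terra_workspace=terra_workspace,
        dataset_info=dataset_info,
        added_to_auth_domain=True,  # confirm the SA has been added to the auth domain group(s)
    ).run()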