ops_utils.tdr_utils.tdr_ingest_utils
Classes and functions for ingesting data into TDR.
"""Classes and functions for ingesting data into TDR."""

import json
import logging
import sys
import pytz
from datetime import datetime
import math
from typing import Optional, Any
from dateutil import parser

from ..vars import ARG_DEFAULTS

from .tdr_api_utils import TDR, FilterOutSampleIdsAlreadyInDataset
from .tdr_job_utils import MonitorTDRJob
from ..terra_util import TerraWorkspace


class BatchIngest:
    """A class to handle batch ingestion of metadata into TDR (Terra Data Repository)."""

    def __init__(
        self,
        ingest_metadata: list[dict],
        tdr: TDR,
        target_table_name: str,
        dataset_id: str,
        batch_size: int,
        bulk_mode: bool,
        update_strategy: str = "replace",
        waiting_time_to_poll: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
        test_ingest: bool = False,
        load_tag: Optional[str] = None,
        file_to_uuid_dict: Optional[dict] = None,
        schema_info: Optional[dict] = None,
        skip_reformat: bool = False
    ):
        """
        Initialize the BatchIngest class.

        **Args:**
        - ingest_metadata (list[dict]): The metadata to be ingested.
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - target_table_name (str): The name of the target table.
        - dataset_id (str): The ID of the dataset.
        - batch_size (int): The size of each batch for ingestion.
        - bulk_mode (bool): Flag indicating if bulk mode should be used.
        - update_strategy (str, optional): The strategy for updating existing records. Defaults to `replace`.
        - waiting_time_to_poll (int, optional): The time to wait between polling for job status. Defaults to `90`.
        - test_ingest (bool, optional): Flag indicating if only the first batch should be
            ingested for testing. Defaults to `False`.
        - load_tag (str, optional): A tag to identify the load. Used so future ingests
            can pick up where left off. Defaults to None.
        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
            source file paths to UUIDs. If used will make ingest much quicker since no ingest
            or look up of file needed. Defaults to None.
        - schema_info (dict, optional): Schema information for the tables. Validates ingest data matches up
            with schema info. Defaults to None.
        - skip_reformat (bool, optional): Flag indicating if reformatting should be skipped. Defaults to `False`.
        """
        self.ingest_metadata = self._reformat_for_type_consistency(ingest_metadata)
        """@private"""
        self.tdr = tdr
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.batch_size = int(batch_size)
        """@private"""
        self.update_strategy = update_strategy
        """@private"""
        self.bulk_mode = bulk_mode
        """@private"""
        self.waiting_time_to_poll = waiting_time_to_poll
        """@private"""
        # Used if you want to run first batch and then exit after success
        self.test_ingest = test_ingest
        """@private"""
        self.load_tag = load_tag
        """@private"""
        self.file_to_uuid_dict = file_to_uuid_dict
        """@private"""
        # Used if you want to provide schema info for tables to make sure values match.
        # Should be dict with key being column name and value being dict with datatype
        self.schema_info = schema_info
        """@private"""
        # Use if input is already formatted correctly for ingest
        self.skip_reformat = skip_reformat
        """@private"""

    @staticmethod
    def _reformat_for_type_consistency(ingest_metadata: list[dict]) -> list[dict]:
        """
        Reformats ingest metadata and finds headers where values are a mix of lists and non-lists.

        If there is mix of these types of values, it converts the non-array to a one-item list. The updated metadata
        is then returned to be used for everything downstream
        """
        unique_headers = sorted({key for item in ingest_metadata for key in item.keys()})

        headers_containing_mismatch = []
        for header in unique_headers:
            all_values_for_header = [r.get(header) for r in ingest_metadata]
            # Find headers where some values are lists and some are not (while filtering out None values)
            if any(isinstance(value, list) for value in all_values_for_header if value is not None) and not all(
                    isinstance(value, list) for value in all_values_for_header if value is not None):
                logging.info(
                    f"Header {header} contains lists and non-list items. Will convert the non-list items into a list"
                )
                headers_containing_mismatch.append(header)

        updated_metadata = []
        for record in ingest_metadata:
            new_record = {}
            for header, value in record.items():
                if header in headers_containing_mismatch:
                    updated_value = [value] if not isinstance(value, list) else value
                else:
                    updated_value = value
                new_record[header] = updated_value
            updated_metadata.append(new_record)

        return updated_metadata

    def run(self) -> None:
        """Run the batch ingestion process."""
        logging.info(
            f"Batching {len(self.ingest_metadata)} total rows into batches of {self.batch_size} for ingest")
        total_batches = math.ceil(len(self.ingest_metadata) / self.batch_size)
        for i in range(0, len(self.ingest_metadata), self.batch_size):
            batch_number = i // self.batch_size + 1
            logging.info(f"Starting ingest batch {batch_number} of {total_batches} into table {self.target_table_name}")
            metrics_batch = self.ingest_metadata[i:i + self.batch_size]
            if self.skip_reformat:
                reformatted_batch = metrics_batch
            else:
                reformatted_batch = ReformatMetricsForIngest(
                    ingest_metadata=metrics_batch,
                    file_to_uuid_dict=self.file_to_uuid_dict,
                    schema_info=self.schema_info
                ).run()

            if self.load_tag:
                load_tag = self.load_tag
            else:
                load_tag = f"{self.dataset_id}.{self.target_table_name}"
            # Start actual ingest
            if reformatted_batch:
                StartAndMonitorIngest(
                    tdr=self.tdr,
                    ingest_records=reformatted_batch,
                    target_table_name=self.target_table_name,
                    dataset_id=self.dataset_id,
                    load_tag=load_tag,
                    bulk_mode=self.bulk_mode,
                    update_strategy=self.update_strategy,
                    waiting_time_to_poll=self.waiting_time_to_poll
                ).run()
                logging.info(f"Completed batch ingest of {len(reformatted_batch)} rows")
                if self.test_ingest:
                    logging.info("First batch completed, exiting since test_ingest was used")
                    sys.exit(0)
            else:
                logging.info("No rows to ingest in this batch after reformatting")
        logging.info("Whole Ingest completed")


class StartAndMonitorIngest:
    """Class to start and monitor the ingestion of records into a TDR dataset."""

    def __init__(
        self, tdr: TDR,
        ingest_records: list[dict],
        target_table_name: str,
        dataset_id: str,
        load_tag: str,
        bulk_mode: bool,
        update_strategy: str,
        waiting_time_to_poll: int
    ):
        """
        Initialize the StartAndMonitorIngest.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - ingest_records (list[dict]): The records to be ingested.
        - target_table_name (str): The name of the target table.
        - dataset_id (str): The ID of the dataset.
        - load_tag (str): A tag to identify the load.
        - bulk_mode (bool): Flag indicating if bulk mode should be used.
        - update_strategy (str): The strategy for updating existing records.
        - waiting_time_to_poll (int): The time to wait between polling for job status.
        """
        self.tdr = tdr
        """@private"""
        self.ingest_records = ingest_records
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.load_tag = load_tag
        """@private"""
        self.bulk_mode = bulk_mode
        """@private"""
        self.update_strategy = update_strategy
        """@private"""
        self.waiting_time_to_poll = waiting_time_to_poll
        """@private"""

    def _create_ingest_dataset_request(self) -> Any:
        """
        Create the ingestDataset request body.

        Returns:
            Any: The request body for ingesting the dataset.
        """
        # https://support.terra.bio/hc/en-us/articles/23460453585819-How-to-ingest-and-update-TDR-data-with-APIs
        load_dict = {
            "format": "array",
            "records": self.ingest_records,
            "table": self.target_table_name,
            "resolve_existing_files": "true",
            "updateStrategy": self.update_strategy,
            "load_tag": self.load_tag,
            "bulkMode": "true" if self.bulk_mode else "false"
        }
        return json.dumps(load_dict)  # dict -> json

    def run(self) -> None:
        """Run the ingestion process and monitor the job until completion."""
        ingest_request = self._create_ingest_dataset_request()
        logging.info(f"Starting ingest to {self.dataset_id}")
        ingest_response = self.tdr.ingest_to_dataset(dataset_id=self.dataset_id, data=ingest_request).json()
        MonitorTDRJob(
            tdr=self.tdr,
            job_id=ingest_response["id"],
            check_interval=self.waiting_time_to_poll,
            return_json=False
        ).run()


class ReformatMetricsForIngest:
    """A class to reformat metrics for ingestion into TDR (Terra Data Repository)."""

    def __init__(
        self,
        ingest_metadata: list[dict],
        file_to_uuid_dict: Optional[dict] = None,
        schema_info: Optional[dict] = None
    ):
        """
        Initialize the ReformatMetricsForIngest class.

        This class is used to reformat metrics for ingest.
        Assumes input JSON will be in the following format for GCP:
        ```
        {
            "file_name": blob.name,
            "file_path": f"gs://{self.bucket_name}/{blob.name}",
            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
            "file_extension": os.path.splitext(blob.name)[1],
            "size_in_bytes": blob.size,
            "md5_hash": blob.md5_hash
        }
        ```

        **Args:**
        - ingest_metadata (list[dict]): The metadata to be ingested.
        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
            source file paths to UUIDs. If used will make ingest much quicker since no ingest
            or look up of file needed. Defaults to None.
        - schema_info (dict, optional): Schema information for the tables. Defaults to None.
        """
        self.ingest_metadata = ingest_metadata
        """@private"""
        self.file_prefix = "gs://"
        """@private"""
        self.file_to_uuid_dict = file_to_uuid_dict
        """@private"""
        self.schema_info = schema_info
        """@private"""

    def _add_file_ref(self, file_details: dict) -> None:
        """
        Create file ref for ingest.

        Args:
            file_details (dict): The details of the file to be ingested.
        """
        file_details["file_ref"] = {
            "sourcePath": file_details["path"],
            "targetPath": self._format_relative_tdr_path(file_details["path"]),
            "description": f"Ingest of {file_details['path']}",
            "mimeType": file_details["content_type"]
        }

    @staticmethod
    def _format_relative_tdr_path(cloud_path: str) -> str:
        """
        Format cloud path to TDR path.

        Args:
            cloud_path (str): The cloud path to be formatted.

        Returns:
            str: The formatted TDR path.
        """
        relative_path = "/".join(cloud_path.split("/")[3:])
        return f"/{relative_path}"

    def _check_and_format_file_path(self, column_value: str) -> tuple[Any, bool]:
        """
        Check if column value is a gs:// path and reformat to dict with ingest information.

        If file_to_uuid_dict is
        provided then it will add existing UUID. If file_to_uuid_dict provided and file not found then will warn and
        return None.

        Args:
            column_value (str): The column value to be checked and formatted.

        Returns:
            tuple[Any, bool]: The formatted column value and a validity flag.
        """
        valid = True
        if isinstance(column_value, str):
            if column_value.startswith(self.file_prefix):
                if self.file_to_uuid_dict:
                    uuid = self.file_to_uuid_dict.get(column_value)
                    if uuid:
                        column_value = uuid
                        return column_value, valid
                    else:
                        logging.warning(
                            f"File {column_value} not found in file_to_uuid_dict, will attempt "
                            "to ingest as regular file and not use UUID"
                        )
                source_dest_mapping = {
                    "sourcePath": column_value,
                    "targetPath": self._format_relative_tdr_path(column_value)
                }
                return source_dest_mapping, valid
        return column_value, valid

    def _validate_and_update_column_for_schema(self, column_name: str, column_value: Any) -> tuple[str, bool]:
        """
        Check if column matches what schema expects and attempt to update if not. Changes to string at the end.

        Args:
            column_name (str): The name of the column.
            column_value (Any): The value of the column.

        Returns:
            tuple[str, bool]: The validated and updated column value and a validity flag.
        """
        valid = True
        if self.schema_info:
            if column_name in self.schema_info.keys():
                expected_data_type = self.schema_info[column_name]["datatype"]
                if expected_data_type == "string" and not isinstance(column_value, str):
                    try:
                        column_value = str(column_value)
                    except ValueError:
                        logging.warning(f"Column {column_name} with value {column_value} is not a string")
                        valid = False
                if expected_data_type in ["int64", "integer"] and not isinstance(column_value, int):
                    try:
                        column_value = int(column_value)
                    except ValueError:
                        logging.warning(f"Column {column_name} with value {column_value} is not an integer")
                        valid = False
                if expected_data_type == "float64" and not isinstance(column_value, float):
                    try:
                        column_value = float(column_value)
                    except ValueError:
                        logging.warning(f"Column {column_name} with value {column_value} is not a float")
                        valid = False
                if expected_data_type == "boolean" and not isinstance(column_value, bool):
                    try:
                        column_value = bool(column_value)
                    except ValueError:
                        logging.warning(f"Column {column_name} with value {column_value} is not a boolean")
                        valid = False
                if expected_data_type in ["datetime", "date", "time"] and not isinstance(column_value, datetime):
                    try:
                        column_value = parser.parse(column_value)
                    except ValueError:
                        logging.warning(f"Column {column_name} with value {column_value} is not a datetime")
                        valid = False
                if expected_data_type == "array" and not isinstance(column_value, list):
                    valid = False
                    logging.warning(f"Column {column_name} with value {column_value} is not a list")
                if expected_data_type == "bytes" and not isinstance(column_value, bytes):
                    valid = False
                    logging.warning(f"Column {column_name} with value {column_value} is not bytes")
                if expected_data_type == "fileref" and not column_value.startswith(self.file_prefix):
                    valid = False
                    logging.warning(f"Column {column_name} with value {column_value} is not a file path")
        return str(column_value), valid

    def _reformat_metric(self, row_dict: dict) -> Optional[dict]:
        """
        Reformat metric for ingest.

        Args:
            row_dict (dict): The row dictionary to be reformatted.

        Returns:
            Optional[dict]: The reformatted row dictionary or None if invalid.
        """
        reformatted_dict = {}
        row_valid = True
        for key, value in row_dict.items():
            if value or value == 0:
                if self.schema_info:
                    value, valid = self._validate_and_update_column_for_schema(key, value)
                    if not valid:
                        row_valid = False
                if isinstance(value, list):
                    updated_value_list = []
                    for item in value:
                        update_value, valid = self._check_and_format_file_path(item)
                        if not valid:
                            row_valid = False
                        updated_value_list.append(update_value)
                    reformatted_dict[key] = updated_value_list
                else:
                    update_value, valid = self._check_and_format_file_path(value)
                    reformatted_dict[key] = update_value
                    if not valid:
                        row_valid = False
        reformatted_dict["last_modified_date"] = datetime.now(tz=pytz.UTC).strftime("%Y-%m-%dT%H:%M:%S")  # type: ignore[assignment] # noqa: E501
        if row_valid:
            return reformatted_dict
        else:
            logging.info(f"Row {json.dumps(row_dict, indent=4)} not valid and will not be included in ingest")
            return None

    def run(self) -> list[dict]:
        """
        Run the reformatting process for all metrics.

        **Returns:**
        - list[dict]: A list of reformatted metrics.
        """
        reformatted_metrics = []
        for row_dict in self.ingest_metadata:
            reformatted_row = self._reformat_metric(row_dict)
            if reformatted_row:
                reformatted_metrics.append(reformatted_row)
        return reformatted_metrics


class ConvertTerraTableInfoForIngest:
    """Converts each row of table metadata into a dictionary that can be ingested into TDR."""

    def __init__(
        self,
        table_metadata: list[dict],
        columns_to_ignore: list[str] = [],
        tdr_row_id: Optional[str] = None
    ):
        """
        Initialize the ConvertTerraTableInfoForIngest class.

        Input will look like this:
        ```
        [{
          "attributes": {
            "some_metric": 99.99,
            "some_file_path": "gs://path/to/file",
            "something_to_exclude": "exclude_me"
          },
          "entityType": "sample",
          "name": "SM-MVVVV"
        }]
        ```
        And be converted to this:
        ```
        [{
          "sample_id": "SM-MVVVV",
          "some_metric": 99.99,
          "some_file_path": "gs://path/to/file"
        }]
        ```
        **Args:**
        - table_metadata (list[dict]): The metadata of the table to be converted.
        - tdr_row_id (str): The row ID to be used in the TDR. Defaults to {entityType}_id.
        - columns_to_ignore (list[str]): List of columns to ignore during conversion. Defaults to an empty list.
        """
        self.table_metadata = table_metadata
        """@private"""
        if table_metadata:
            self.tdr_row_id = tdr_row_id if tdr_row_id else f'{table_metadata[0]["entityType"]}_id'
            """@private"""
        else:
            # Won't be used if table_metadata is empty but will be set to empty string
            self.tdr_row_id = ""
            """@private"""
        self.columns_to_ignore = columns_to_ignore
        """@private"""

    def run(self) -> list[dict]:
        """
        Convert the table metadata into a format suitable for TDR ingestion.

        **Returns:**
        - list[dict]: A list of dictionaries containing the converted table metadata.
        """
        return [
            {
                self.tdr_row_id: row["name"],
                **{k: v for k, v in row["attributes"].items()
                   # if columns_to_ignore is not provided or if the column is not in the columns_to_ignore list
                   if k not in self.columns_to_ignore}
            }
            for row in self.table_metadata
        ]


class FilterAndBatchIngest:
    """Filter and batch ingest process into TDR."""

    def __init__(
        self,
        tdr: TDR,
        filter_existing_ids: bool,
        unique_id_field: str,
        table_name: str,
        ingest_metadata: list[dict],
        dataset_id: str,
        ingest_waiting_time_poll: int,
        ingest_batch_size: int,
        bulk_mode: bool,
        update_strategy: str,
        load_tag: str,
        test_ingest: bool = False,
        file_to_uuid_dict: Optional[dict] = None,
        schema_info: Optional[dict] = None,
        skip_reformat: bool = False
    ):
        """
        Initialize the FilterAndBatchIngest class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
        - unique_id_field (str): The unique ID field to filter on.
        - table_name (str): The name of the table to ingest data into.
        - ingest_metadata (list[dict]): The metadata to ingest.
        - dataset_id (str): The ID of the dataset.
        - ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
        - ingest_batch_size (int): The batch size for ingest.
        - bulk_mode (bool): Whether to use bulk mode for ingest.
        - update_strategy (str): The update strategy to use.
        - load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
        - test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
            source file paths to UUIDs. If used will make ingest much quicker since no ingest
            or look up of file needed. Defaults to None.
        - schema_info (dict, optional): Schema information for the tables.
            Used to validate ingest metrics match. Defaults to None.
        - skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
        """
        self.tdr = tdr
        """@private"""
        self.filter_existing_ids = filter_existing_ids
        """@private"""
        self.unique_id_field = unique_id_field
        """@private"""
        self.table_name = table_name
        """@private"""
        self.ingest_metadata = ingest_metadata
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.ingest_waiting_time_poll = ingest_waiting_time_poll
        """@private"""
        self.ingest_batch_size = ingest_batch_size
        """@private"""
        self.bulk_mode = bulk_mode
        """@private"""
        self.update_strategy = update_strategy
        """@private"""
        self.load_tag = load_tag
        """@private"""
        self.test_ingest = test_ingest
        """@private"""
        self.file_to_uuid_dict = file_to_uuid_dict
        """@private"""
        self.schema_info = schema_info
        """@private"""
        self.skip_reformat = skip_reformat
        """@private"""

    def run(self) -> None:
        """
        Run the filter and batch ingest process.

        This method filters out sample IDs that already exist in the dataset (if specified),
        and then performs a batch ingest of the remaining metadata into the specified table.
        """
        if self.filter_existing_ids:
            # Filter out sample ids that are already in the dataset
            filtered_metrics = FilterOutSampleIdsAlreadyInDataset(
                ingest_metrics=self.ingest_metadata,
                dataset_id=self.dataset_id,
                tdr=self.tdr,
                target_table_name=self.table_name,
                filter_entity_id=self.unique_id_field
            ).run()
        else:
            filtered_metrics = self.ingest_metadata
        # If there are metrics to ingest then ingest them
        if filtered_metrics:
            # Batch ingest of table to table within dataset
            logging.info(f"Starting ingest into {self.table_name} in dataset {self.dataset_id}")
            BatchIngest(
                ingest_metadata=filtered_metrics,
                tdr=self.tdr,
                target_table_name=self.table_name,
                dataset_id=self.dataset_id,
                batch_size=self.ingest_batch_size,
                bulk_mode=self.bulk_mode,
                update_strategy=self.update_strategy,
                waiting_time_to_poll=self.ingest_waiting_time_poll,
                test_ingest=self.test_ingest,
                load_tag=self.load_tag,
                file_to_uuid_dict=self.file_to_uuid_dict,
                schema_info=self.schema_info,
                skip_reformat=self.skip_reformat
            ).run()


class GetPermissionsForWorkspaceIngest:
    """Obtain permissions necessary for workspace ingest."""

    def __init__(self, terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False):
        """
        Initialize the GetPermissionsForWorkspaceIngest class.

        **Args:**
        - terra_workspace (`ops_utils.terra_util.TerraWorkspace`): Instance of the TerraWorkspace class.
        - dataset_info (dict): Information about the dataset.
        - added_to_auth_domain (bool, optional): Flag indicating if the SA account
            has been added to the auth domain. Defaults to `False`.
        """
        self.terra_workspace = terra_workspace
        """@private"""
        self.dataset_info = dataset_info
        """@private"""
        self.added_to_auth_domain = added_to_auth_domain
        """@private"""

    def run(self) -> None:
        """
        Ensure the dataset SA account has the necessary permissions on the Terra workspace.

        This method updates the user ACL to make the SA account a reader on the Terra workspace.
        It also checks if the workspace has an authorization domain, and logs the
        necessary steps to add the SA account to the auth domain.
        """
        # Ensure dataset SA account is reader on Terra workspace.
        tdr_sa_account = self.dataset_info["ingestServiceAccount"]
        self.terra_workspace.update_user_acl(email=tdr_sa_account, access_level="READER")

        # Check if workspace has auth domain
        workspace_info = self.terra_workspace.get_workspace_info().json()
        auth_domain_list = workspace_info["workspace"]["authorizationDomain"]
        # Attempt to add tdr_sa_account to auth domain
        if auth_domain_list:
            for auth_domain_dict in auth_domain_list:
                auth_domain = auth_domain_dict["membersGroupName"]
                logging.info(f"TDR SA account {tdr_sa_account} needs to be added to auth domain group {auth_domain}")
            if self.added_to_auth_domain:
                logging.info("added_to_auth_domain has been set to true so assuming account has already been added")
            else:
                logging.info(
                    f"Please add TDR SA account {tdr_sa_account} to auth domain group(s) to allow "
                    "access to workspace and then rerun with added_to_auth_domain=True"
                )
                sys.exit(0)
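The classes above are typically composed rather than used one at a time. The following sketch shows one plausible end-to-end flow for the per-class reference below; the workspace rows, dataset UUID, table name, and the way the `TDR` client is constructed are illustrative assumptions, not values taken from this module.

```python
# Hedged end-to-end sketch: convert Terra rows, then filter out rows already in
# TDR and batch-ingest the remainder. All literal values here are made up.
from ops_utils.tdr_utils.tdr_api_utils import TDR
from ops_utils.tdr_utils.tdr_ingest_utils import (
    ConvertTerraTableInfoForIngest,
    FilterAndBatchIngest,
)

tdr = TDR(...)  # assumed: construct the TDR client however your environment requires

# Rows in Terra entity format ("attributes"/"entityType"/"name"); values invented.
terra_rows = [
    {
        "attributes": {"cram_path": "gs://example-bucket/crams/SM-MVVVV.cram"},
        "entityType": "sample",
        "name": "SM-MVVVV",
    }
]
ingest_metadata = ConvertTerraTableInfoForIngest(table_metadata=terra_rows).run()

FilterAndBatchIngest(
    tdr=tdr,
    filter_existing_ids=True,
    unique_id_field="sample_id",
    table_name="sample",
    ingest_metadata=ingest_metadata,
    dataset_id="00000000-0000-0000-0000-000000000000",  # illustrative dataset UUID
    ingest_waiting_time_poll=90,
    ingest_batch_size=500,
    bulk_mode=False,
    update_strategy="replace",
    load_tag="example-workspace.sample",
    skip_reformat=False,
).run()
```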
class BatchIngest:
A class to handle batch ingestion of metadata into TDR (Terra Data Repository).
def __init__(
    self,
    ingest_metadata: list[dict],
    tdr: TDR,
    target_table_name: str,
    dataset_id: str,
    batch_size: int,
    bulk_mode: bool,
    update_strategy: str = "replace",
    waiting_time_to_poll: int = ARG_DEFAULTS["waiting_time_to_poll"],
    test_ingest: bool = False,
    load_tag: Optional[str] = None,
    file_to_uuid_dict: Optional[dict] = None,
    schema_info: Optional[dict] = None,
    skip_reformat: bool = False
)
Initialize the BatchIngest class.
Args:
- ingest_metadata (list[dict]): The metadata to be ingested.
- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
- target_table_name (str): The name of the target table.
- dataset_id (str): The ID of the dataset.
- batch_size (int): The size of each batch for ingestion.
- bulk_mode (bool): Flag indicating if bulk mode should be used.
- update_strategy (str, optional): The strategy for updating existing records. Defaults to `replace`.
- waiting_time_to_poll (int, optional): The time to wait between polling for job status. Defaults to `90`.
- test_ingest (bool, optional): Flag indicating if only the first batch should be ingested for testing. Defaults to `False`.
- load_tag (str, optional): A tag to identify the load. Used so future ingests can pick up where left off. Defaults to None.
- file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping source file paths to UUIDs. If used will make ingest much quicker since no ingest or look up of file needed. Defaults to None.
- schema_info (dict, optional): Schema information for the tables. Validates ingest data matches up with schema info. Defaults to None.
- skip_reformat (bool, optional): Flag indicating if reformatting should be skipped. Defaults to `False`.
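A minimal construction sketch, assuming you already hold a configured `TDR` client and metadata shaped like the rows produced by `ConvertTerraTableInfoForIngest`; the dataset UUID and table name below are placeholders.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR
from ops_utils.tdr_utils.tdr_ingest_utils import BatchIngest

tdr = TDR(...)  # assumed: client construction depends on your ops_utils setup

BatchIngest(
    ingest_metadata=[
        {"sample_id": "SM-MVVVV", "cram_path": "gs://example-bucket/crams/SM-MVVVV.cram"},
    ],
    tdr=tdr,
    target_table_name="sample",  # placeholder table name
    dataset_id="00000000-0000-0000-0000-000000000000",  # placeholder dataset UUID
    batch_size=500,
    bulk_mode=False,
    update_strategy="replace",
    test_ingest=True,  # ingest only the first batch, then exit
).run()
```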
def run(self) -> None
Run the batch ingestion process.
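`run()` splits the metadata with plain list slicing, so the batch arithmetic can be illustrated standalone; the sample rows here are invented.

```python
import math

rows = [{"sample_id": f"SM-{i:04d}"} for i in range(7)]
batch_size = 3

total_batches = math.ceil(len(rows) / batch_size)  # 7 rows / 3 per batch -> 3 batches
for i in range(0, len(rows), batch_size):
    batch_number = i // batch_size + 1
    batch = rows[i:i + batch_size]  # the last batch may be smaller (here: 1 row)
    print(f"batch {batch_number} of {total_batches}: {len(batch)} rows")
```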
class StartAndMonitorIngest:
Class to start and monitor the ingestion of records into a TDR dataset.
def __init__(
    self,
    tdr: TDR,
    ingest_records: list[dict],
    target_table_name: str,
    dataset_id: str,
    load_tag: str,
    bulk_mode: bool,
    update_strategy: str,
    waiting_time_to_poll: int
)
Initialize the StartAndMonitorIngest.
Args:
- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
- ingest_records (list[dict]): The records to be ingested.
- target_table_name (str): The name of the target table.
- dataset_id (str): The ID of the dataset.
- load_tag (str): A tag to identify the load.
- bulk_mode (bool): Flag indicating if bulk mode should be used.
- update_strategy (str): The strategy for updating existing records.
- waiting_time_to_poll (int): The time to wait between polling for job status.
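For orientation, the request body that `_create_ingest_dataset_request` assembles is a JSON-serialized dict along these lines; the record content and load tag shown here are made-up examples.

```python
import json

# Shape of the ingestDataset payload built by StartAndMonitorIngest.
load_dict = {
    "format": "array",
    "records": [{"sample_id": "SM-MVVVV", "mean_coverage": 30.5}],  # illustrative record
    "table": "sample",
    "resolve_existing_files": "true",
    "updateStrategy": "replace",
    "load_tag": "00000000-0000-0000-0000-000000000000.sample",  # illustrative load tag
    "bulkMode": "false",
}
print(json.dumps(load_dict, indent=2))
```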
def run(self) -> None
Run the ingestion process and monitor the job until completion.
class ReformatMetricsForIngest:
A class to reformat metrics for ingestion into TDR (Terra Data Repository).
def __init__(
    self,
    ingest_metadata: list[dict],
    file_to_uuid_dict: Optional[dict] = None,
    schema_info: Optional[dict] = None
)
Initialize the ReformatMetricsForIngest class.
This class is used to reformat metrics for ingest. Assumes input JSON will be in the following format for GCP:
{
"file_name": blob.name,
"file_path": f"gs://{self.bucket_name}/{blob.name}",
"content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
"file_extension": os.path.splitext(blob.name)[1],
"size_in_bytes": blob.size,
"md5_hash": blob.md5_hash
}
Args:
- ingest_metadata (list[dict]): The metadata to be ingested.
- file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping source file paths to UUIDs. If used will make ingest much quicker since no ingest or look up of file needed. Defaults to None.
- schema_info (dict, optional): Schema information for the tables. Defaults to None.
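The expected shape of `schema_info` is only described loosely in the source comments (each key is a column name mapped to a dict carrying its datatype); a sketch of what that might look like, with invented column names and datatypes drawn from the validation logic above:

```python
# Assumed shape of schema_info, per the inline comment in BatchIngest.__init__:
# {<column name>: {"datatype": <TDR datatype>}, ...}
schema_info = {
    "sample_id": {"datatype": "string"},
    "mean_coverage": {"datatype": "float64"},
    "cram_path": {"datatype": "fileref"},
}
```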
def run(self) -> list[dict]
Run the reformatting process for all metrics.
Returns:
- list[dict]: A list of reformatted metrics.
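Because the reformatting itself needs no TDR connection, it can be exercised directly. The row below is invented; the output sketched in the comments follows from the logic above, where a `gs://` path becomes a source/target mapping and a `last_modified_date` stamp is appended.

```python
from ops_utils.tdr_utils.tdr_ingest_utils import ReformatMetricsForIngest

row = {
    "sample_id": "SM-MVVVV",
    "cram_path": "gs://example-bucket/crams/SM-MVVVV.cram",
    "mean_coverage": 30.5,
}
reformatted = ReformatMetricsForIngest(ingest_metadata=[row]).run()
# Expected shape (timestamp varies):
# [{
#     "sample_id": "SM-MVVVV",
#     "cram_path": {
#         "sourcePath": "gs://example-bucket/crams/SM-MVVVV.cram",
#         "targetPath": "/crams/SM-MVVVV.cram",
#     },
#     "mean_coverage": 30.5,
#     "last_modified_date": "2024-01-01T00:00:00",
# }]
print(reformatted)
```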
class ConvertTerraTableInfoForIngest:
Converts each row of table metadata into a dictionary that can be ingested into TDR.
def __init__(
    self,
    table_metadata: list[dict],
    columns_to_ignore: list[str] = [],
    tdr_row_id: Optional[str] = None
)
Initialize the ConvertTerraTableInfoForIngest class.
Input will look like this:

    [{
      "attributes": {
        "some_metric": 99.99,
        "some_file_path": "gs://path/to/file",
        "something_to_exclude": "exclude_me"
      },
      "entityType": "sample",
      "name": "SM-MVVVV"
    }]

And be converted to this:

    [{
      "sample_id": "SM-MVVVV",
      "some_metric": 99.99,
      "some_file_path": "gs://path/to/file"
    }]
Args:
- table_metadata (list[dict]): The metadata of the table to be converted.
- tdr_row_id (str): The row ID to be used in the TDR. Defaults to {entityType}_id.
- columns_to_ignore (list[str]): List of columns to ignore during conversion. Defaults to an empty list.
def run(self) -> list[dict]
Convert the table metadata into a format suitable for TDR ingestion.
Returns:
- list[dict]: A list of dictionaries containing the converted table metadata.
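As a quick orientation, here is a minimal usage sketch built from the docstring example above; the entity rows and the excluded column are the illustrative values from that example, not real workspace data.

```
from ops_utils.tdr_utils.tdr_ingest_utils import ConvertTerraTableInfoForIngest

# Terra entity rows shaped like the docstring example above
table_metadata = [
    {
        "attributes": {
            "some_metric": 99.99,
            "some_file_path": "gs://path/to/file",
            "something_to_exclude": "exclude_me",
        },
        "entityType": "sample",
        "name": "SM-MVVVV",
    }
]

ingest_rows = ConvertTerraTableInfoForIngest(
    table_metadata=table_metadata,
    columns_to_ignore=["something_to_exclude"],
    # tdr_row_id is omitted, so it defaults to f"{entityType}_id" -> "sample_id"
).run()

# ingest_rows == [
#     {"sample_id": "SM-MVVVV", "some_metric": 99.99, "some_file_path": "gs://path/to/file"}
# ]
```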
class FilterAndBatchIngest:
    """Filter and batch ingest process into TDR."""

    def __init__(
        self,
        tdr: TDR,
        filter_existing_ids: bool,
        unique_id_field: str,
        table_name: str,
        ingest_metadata: list[dict],
        dataset_id: str,
        ingest_waiting_time_poll: int,
        ingest_batch_size: int,
        bulk_mode: bool,
        update_strategy: str,
        load_tag: str,
        test_ingest: bool = False,
        file_to_uuid_dict: Optional[dict] = None,
        schema_info: Optional[dict] = None,
        skip_reformat: bool = False
    ):
        """
        Initialize the FilterAndBatchIngest class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
        - unique_id_field (str): The unique ID field to filter on.
        - table_name (str): The name of the table to ingest data into.
        - ingest_metadata (list[dict]): The metadata to ingest.
        - dataset_id (str): The ID of the dataset.
        - ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
        - ingest_batch_size (int): The batch size for ingest.
        - bulk_mode (bool): Whether to use bulk mode for ingest.
        - update_strategy (str): The update strategy to use.
        - load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
        - test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
        - file_to_uuid_dict (dict, optional): Only useful for self-hosted dataset. Can get from
            create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping
            source file paths to UUIDs. If used will make ingest much quicker since no ingest
            or look up of file needed. Defaults to None.
        - schema_info (dict, optional): Schema information for the tables.
            Used to validate ingest metrics match. Defaults to None.
        - skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
        """
        self.tdr = tdr
        """@private"""
        self.filter_existing_ids = filter_existing_ids
        """@private"""
        self.unique_id_field = unique_id_field
        """@private"""
        self.table_name = table_name
        """@private"""
        self.ingest_metadata = ingest_metadata
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.ingest_waiting_time_poll = ingest_waiting_time_poll
        """@private"""
        self.ingest_batch_size = ingest_batch_size
        """@private"""
        self.bulk_mode = bulk_mode
        """@private"""
        self.update_strategy = update_strategy
        """@private"""
        self.load_tag = load_tag
        """@private"""
        self.test_ingest = test_ingest
        """@private"""
        self.file_to_uuid_dict = file_to_uuid_dict
        """@private"""
        self.schema_info = schema_info
        """@private"""
        self.skip_reformat = skip_reformat
        """@private"""

    def run(self) -> None:
        """
        Run the filter and batch ingest process.

        This method filters out sample IDs that already exist in the dataset (if specified),
        and then performs a batch ingest of the remaining metadata into the specified table.
        """
        if self.filter_existing_ids:
            # Filter out sample ids that are already in the dataset
            filtered_metrics = FilterOutSampleIdsAlreadyInDataset(
                ingest_metrics=self.ingest_metadata,
                dataset_id=self.dataset_id,
                tdr=self.tdr,
                target_table_name=self.table_name,
                filter_entity_id=self.unique_id_field
            ).run()
        else:
            filtered_metrics = self.ingest_metadata
        # If there are metrics to ingest then ingest them
        if filtered_metrics:
            # Batch ingest of table to table within dataset
            logging.info(f"Starting ingest into {self.table_name} in dataset {self.dataset_id}")
            BatchIngest(
                ingest_metadata=filtered_metrics,
                tdr=self.tdr,
                target_table_name=self.table_name,
                dataset_id=self.dataset_id,
                batch_size=self.ingest_batch_size,
                bulk_mode=self.bulk_mode,
                update_strategy=self.update_strategy,
                waiting_time_to_poll=self.ingest_waiting_time_poll,
                test_ingest=self.test_ingest,
                load_tag=self.load_tag,
                file_to_uuid_dict=self.file_to_uuid_dict,
                schema_info=self.schema_info,
                skip_reformat=self.skip_reformat
            ).run()
Filter and batch ingest process into TDR.
def __init__(self, tdr: TDR, filter_existing_ids: bool, unique_id_field: str, table_name: str, ingest_metadata: list[dict], dataset_id: str, ingest_waiting_time_poll: int, ingest_batch_size: int, bulk_mode: bool, update_strategy: str, load_tag: str, test_ingest: bool = False, file_to_uuid_dict: Optional[dict] = None, schema_info: Optional[dict] = None, skip_reformat: bool = False)
Initialize the FilterAndBatchIngest class.
Args:
- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
- filter_existing_ids (bool): Whether to filter out sample IDs that already exist in the dataset.
- unique_id_field (str): The unique ID field to filter on.
- table_name (str): The name of the table to ingest data into.
- ingest_metadata (list[dict]): The metadata to ingest.
- dataset_id (str): The ID of the dataset.
- ingest_waiting_time_poll (int): The waiting time to poll for ingest status.
- ingest_batch_size (int): The batch size for ingest.
- bulk_mode (bool): Whether to use bulk mode for ingest.
- update_strategy (str): The update strategy to use.
- load_tag (str): The load tag for ingest. Used to make future ingests of the same files go faster.
- test_ingest (bool, optional): Whether to run a test ingest. Defaults to False.
- file_to_uuid_dict (dict, optional): Only useful for self-hosted datasets. Can be obtained from create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset. A dictionary mapping source file paths to UUIDs. If used, it makes the ingest much quicker since no file ingest or lookup is needed. Defaults to None. See the sketch after this list for the expected shape.
- schema_info (dict, optional): Schema information for the tables. Used to validate ingest metrics match. Defaults to None.
- skip_reformat (bool, optional): Whether to skip reformatting of metrics. Defaults to False.
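Since the expected shape of file_to_uuid_dict is easy to get wrong, here is an illustrative sketch; the paths and UUIDs are placeholders, and the helper named above that builds this mapping is only referenced, not shown, in this module.

```
# Illustrative only: maps source file paths to TDR file UUIDs that already exist
# in the dataset, so the ingest can reference them instead of re-ingesting files.
file_to_uuid_dict = {
    "gs://example-bucket/sample1.cram": "00000000-0000-0000-0000-000000000001",
    "gs://example-bucket/sample1.cram.crai": "00000000-0000-0000-0000-000000000002",
}
```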
def run(self) -> None
Run the filter and batch ingest process.
This method filters out sample IDs that already exist in the dataset (if specified), and then performs a batch ingest of the remaining metadata into the specified table.
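To show how the pieces fit together, here is a hedged sketch of a typical call. It assumes an already-authenticated TDR client built elsewhere; the wrapper function name, table name, unique ID field, batch size, and polling interval are placeholder values, not defaults taken from this module.

```
from ops_utils.tdr_utils.tdr_api_utils import TDR
from ops_utils.tdr_utils.tdr_ingest_utils import FilterAndBatchIngest


def ingest_new_samples(tdr: TDR, ingest_rows: list[dict], dataset_id: str) -> None:
    """Hypothetical wrapper: drop rows whose sample_id already exists, then batch ingest the rest."""
    FilterAndBatchIngest(
        tdr=tdr,
        filter_existing_ids=True,      # filter against what is already in the dataset
        unique_id_field="sample_id",
        table_name="sample",
        ingest_metadata=ingest_rows,   # e.g. output of ConvertTerraTableInfoForIngest above
        dataset_id=dataset_id,
        ingest_waiting_time_poll=90,
        ingest_batch_size=500,
        bulk_mode=False,
        update_strategy="replace",
        load_tag=f"{dataset_id}.sample",
    ).run()
```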
class GetPermissionsForWorkspaceIngest:
    """Obtain permissions necessary for workspace ingest."""

    def __init__(self, terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False):
        """
        Initialize the GetPermissionsForWorkspaceIngest class.

        **Args:**
        - terra_workspace (`ops_utils.terra_util.TerraWorkspace`): Instance of the TerraWorkspace class.
        - dataset_info (dict): Information about the dataset.
        - added_to_auth_domain (bool, optional): Flag indicating if the SA account
            has been added to the auth domain. Defaults to `False`.
        """
        self.terra_workspace = terra_workspace
        """@private"""
        self.dataset_info = dataset_info
        """@private"""
        self.added_to_auth_domain = added_to_auth_domain
        """@private"""

    def run(self) -> None:
        """
        Ensure the dataset SA account has the necessary permissions on the Terra workspace.

        This method updates the user ACL to make the SA account a reader on the Terra workspace.
        It also checks if the workspace has an authorization domain, and logs the
        necessary steps to add the SA account to the auth domain.
        """
        # Ensure dataset SA account is reader on Terra workspace.
        tdr_sa_account = self.dataset_info["ingestServiceAccount"]
        self.terra_workspace.update_user_acl(email=tdr_sa_account, access_level="READER")

        # Check if workspace has auth domain
        workspace_info = self.terra_workspace.get_workspace_info().json()
        auth_domain_list = workspace_info["workspace"]["authorizationDomain"]
        # Attempt to add tdr_sa_account to auth domain
        if auth_domain_list:
            for auth_domain_dict in auth_domain_list:
                auth_domain = auth_domain_dict["membersGroupName"]
                logging.info(f"TDR SA account {tdr_sa_account} needs to be added to auth domain group {auth_domain}")
            if self.added_to_auth_domain:
                logging.info("added_to_auth_domain has been set to true so assuming account has already been added")
            else:
                logging.info(
                    f"Please add TDR SA account {tdr_sa_account} to auth domain group(s) to allow "
                    "access to workspace and then rerun with added_to_auth_domain=True"
                )
                sys.exit(0)
Obtain permissions necessary for workspace ingest.
def __init__(self, terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False)
Initialize the GetPermissionsForWorkspaceIngest class.
Args:
- terra_workspace (`ops_utils.terra_util.TerraWorkspace`): Instance of the TerraWorkspace class.
- dataset_info (dict): Information about the dataset.
- added_to_auth_domain (bool, optional): Flag indicating if the SA account has been added to the auth domain. Defaults to `False`.
def run(self) -> None
Ensure the dataset SA account has the necessary permissions on the Terra workspace.
This method updates the user ACL to make the SA account a reader on the Terra workspace. It also checks if the workspace has an authorization domain, and logs the necessary steps to add the SA account to the auth domain.
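Finally, a minimal sketch of the two-pass pattern the log messages describe, assuming a TerraWorkspace client constructed elsewhere and a dataset_info payload that includes the ingestServiceAccount field; the wrapper function name is hypothetical.

```
from ops_utils.terra_util import TerraWorkspace
from ops_utils.tdr_utils.tdr_ingest_utils import GetPermissionsForWorkspaceIngest


def grant_ingest_permissions(
    terra_workspace: TerraWorkspace, dataset_info: dict, added_to_auth_domain: bool = False
) -> None:
    """Hypothetical wrapper: add the dataset's ingest SA as a workspace reader.

    If the workspace has an auth domain, the first run logs the group(s) the SA
    must be added to and exits; after adding the SA manually, rerun with
    added_to_auth_domain=True so the script proceeds instead of exiting.
    """
    GetPermissionsForWorkspaceIngest(
        terra_workspace=terra_workspace,
        dataset_info=dataset_info,   # expected to contain "ingestServiceAccount"
        added_to_auth_domain=added_to_auth_domain,
    ).run()
```

The class performs the ACL update itself, so a wrapper like this only decides whether it is the first pass or the rerun after the auth domain groups have been updated.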