ops_utils.tdr_utils.tdr_api_utils
Utility classes for interacting with TDR API.
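Before reading the source, here is a rough usage sketch (not part of the module) of how the `TDR` client defined below is typically driven. The construction of `RunRequest` is an assumption; its real constructor lives in `ops_utils.request_util` and is not shown on this page, and the dataset UUID is a placeholder.

# Hypothetical usage sketch only; the RunRequest setup is an assumption.
from ops_utils.request_util import RunRequest
from ops_utils.tdr_utils.tdr_api_utils import TDR

request_util: RunRequest = ...  # build per ops_utils.request_util (e.g. with auth/token handling)
tdr = TDR(request_util=request_util, env="prod")  # env must be "prod" or "dev"

# Fetch a dataset's schema and list its files (batched under the hood).
dataset_info = tdr.get_dataset_info(dataset_id="<dataset-uuid>", info_to_include=["SCHEMA"]).json()
dataset_files = tdr.get_dataset_files(dataset_id="<dataset-uuid>")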
"""Utility classes for interacting with TDR API."""

import json
import logging
import requests
from typing import Any, Optional, Union
from pydantic import ValidationError

from ..request_util import GET, POST, DELETE, PUT, RunRequest
from ..tdr_api_schema.create_dataset_schema import CreateDatasetSchema
from ..tdr_api_schema.update_dataset_schema import UpdateSchema
from .tdr_job_utils import MonitorTDRJob, SubmitAndMonitorMultipleJobs
from ..vars import ARG_DEFAULTS, GCP, APPLICATION_JSON


class TDR:
    """Class to interact with the Terra Data Repository (TDR) API."""

    PROD_LINK = "https://data.terra.bio/api/repository/v1"
    DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
    """(str): The base URL for the TDR API."""

    def __init__(self, request_util: RunRequest, env: str = 'prod'):
        """
        Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
        - env (str, optional): The TDR environment to target. Must be `prod` or `dev`. Defaults to `prod`.
        """
        self.request_util = request_util
        if env.lower() == 'prod':
            self.tdr_link = self.PROD_LINK
        elif env.lower() == 'dev':
            self.tdr_link = self.DEV_LINK
        else:
            raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.")
        """@private"""

    @staticmethod
    def _check_policy(policy: str) -> None:
        """
        Check if the policy is valid.

        **Args:**
        - policy (str): The role to check.

        **Raises:**
        - ValueError: If the policy is not one of the allowed options.
        """
        if policy not in ["steward", "custodian", "snapshot_creator"]:
            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")

    def get_dataset_files(
            self,
            dataset_id: str,
            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
    ) -> list[dict]:
        """
        Get all files in a dataset.

        Returns json like below

            {
                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
                "path": "/path/set/in/ingest.csv",
                "size": 1722,
                "checksums": [
                    {
                        "checksum": "82f7e79v",
                        "type": "crc32c"
                    },
                    {
                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
                        "type": "md5"
                    }
                ],
                "created": "2024-13-11T15:01:00.256Z",
                "description": null,
                "fileType": "file",
                "fileDetail": {
                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
                    "mimeType": null,
                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
                    "loadTag": "RP_3333-RP_3333"
                },
                "directoryDetail": null
            }

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/files"
        logging.info(f"Getting all files in dataset {dataset_id}")
        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

    def create_file_dict(
            self,
            dataset_id: str,
            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
    ) -> dict:
        """
        Create a dictionary of all files in a dataset where the key is the file UUID.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.

        **Returns:**
        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
        """
        return {
            file_dict["fileId"]: file_dict
            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
        }

    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
            self,
            dataset_id: str,
            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
    ) -> dict:
        """
        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.

        This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.

        This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.

        **Returns:**
        - dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
        """
        return {
            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
        }

    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
        """
        Delete a file from a dataset.

        **Args:**
        - file_id (str): The ID of the file to be deleted.
        - dataset_id (str): The ID of the dataset.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}"
        logging.info(f"Submitting delete job for file {file_id}")
        return self.request_util.run_request(uri=uri, method=DELETE)

    def delete_files(
            self,
            file_ids: list[str],
            dataset_id: str,
            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
            check_interval: int = 15) -> None:
        """
        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.

        **Args:**
        - file_ids (list[str]): A list of file IDs to be deleted.
        - dataset_id (str): The ID of the dataset.
        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
        """
        SubmitAndMonitorMultipleJobs(
            tdr=self,
            job_function=self.delete_file,
            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
            batch_size=batch_size_to_delete_files,
            check_interval=check_interval
        ).run()

    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
        """
        Add a user to a dataset with a specified policy.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - user (str): The email of the user to be added.
        - policy (str): The policy to be assigned to the user.
          Must be one of `steward`, `custodian`, or `snapshot_creator`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If the policy is not valid.
        """
        self._check_policy(policy)
        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
        member_dict = {"email": user}
        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
        return self.request_util.run_request(
            uri=uri,
            method=POST,
            data=json.dumps(member_dict),
            content_type=APPLICATION_JSON
        )

    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
        """
        Remove a user from a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - user (str): The email of the user to be removed.
        - policy (str): The policy to be removed from the user.
          Must be one of `steward`, `custodian`, or `snapshot_creator`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If the policy is not valid.
        """
        self._check_policy(policy)
        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
        return self.request_util.run_request(uri=uri, method=DELETE)

    def delete_dataset(self, dataset_id: str) -> None:
        """
        Delete a dataset and monitor the job until completion.

        **Args:**
        - dataset_id (str): The ID of the dataset to be deleted.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}"
        logging.info(f"Deleting dataset {dataset_id}")
        response = self.request_util.run_request(uri=uri, method=DELETE)
        job_id = response.json()['id']
        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()

    def make_snapshot_public(self, snapshot_id: str) -> requests.Response:
        """
        Make a snapshot public.

        **Args:**
        - snapshot_id (str): The ID of the snapshot to be made public.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public"
        logging.info(f"Making snapshot {snapshot_id} public")
        return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")

    def get_snapshot_info(
            self,
            snapshot_id: str,
            continue_not_found: bool = False,
            info_to_include: Optional[list[str]] = None
    ) -> Optional[requests.Response]:
        """
        Get information about a snapshot.

        **Args:**
        - snapshot_id (str): The ID of the snapshot.
        - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
          Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
          `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`

        **Returns:**
        - requests.Response (optional): The response from the request (returns None if the snapshot is not
          found or access is denied).
        """
        acceptable_return_code = [404, 403] if continue_not_found else []
        acceptable_include_info = [
            "SOURCES",
            "TABLES",
            "RELATIONSHIPS",
            "ACCESS_INFORMATION",
            "PROFILE",
            "PROPERTIES",
            "DATA_PROJECT",
            "CREATION_INFORMATION",
            "DUOS"
        ]
        if info_to_include:
            if not all(info in acceptable_include_info for info in info_to_include):
                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
            include_string = '&include='.join(info_to_include)
        else:
            include_string = ""
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
        response = self.request_util.run_request(
            uri=uri,
            method=GET,
            accept_return_codes=acceptable_return_code
        )
        if response.status_code == 404:
            logging.warning(f"Snapshot {snapshot_id} not found")
            return None
        if response.status_code == 403:
            logging.warning(f"Access denied for snapshot {snapshot_id}")
            return None
        return response

    def delete_snapshots(
            self,
            snapshot_ids: list[str],
            batch_size: int = 25,
            check_interval: int = 10,
            verbose: bool = False) -> None:
        """
        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.

        **Args:**
        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
        """
        SubmitAndMonitorMultipleJobs(
            tdr=self,
            job_function=self.delete_snapshot,
            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
            batch_size=batch_size,
            check_interval=check_interval,
            verbose=verbose
        ).run()

    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
        """
        Delete a snapshot.

        **Args:**
        - snapshot_id (str): The ID of the snapshot to be deleted.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
        logging.info(f"Deleting snapshot {snapshot_id}")
        return self.request_util.run_request(uri=uri, method=DELETE)

    def _yield_existing_datasets(
            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
    ) -> Any:
        """
        Get all datasets in TDR, optionally filtered by dataset name.

        **Args:**
        filter (Optional[str]): A filter string to match dataset names. Defaults to None.
        batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
        direction (str): The direction to sort the datasets by creation date. Defaults to "asc".

        Yields:
        Any: A generator yielding datasets.
        """
        offset = 0
        if filter:
            filter_str = f"&filter={filter}"
            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
        else:
            filter_str = ""
            log_message = f"Searching for all datasets in batches of {batch_size}"
        logging.info(log_message)
        while True:
            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
            response = self.request_util.run_request(uri=uri, method=GET)
            datasets = response.json()["items"]
            if not datasets:
                break
            for dataset in datasets:
                yield dataset
            offset += batch_size
            break

    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
        """
        Check if a dataset exists by name and optionally by billing profile.

        **Args:**
        - dataset_name (str): The name of the dataset to check.
        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.

        **Returns:**
        - list[dict]: A list of matching datasets.
        """
        matching_datasets = []
        for dataset in self._yield_existing_datasets(filter=dataset_name):
            # Search uses wildcard so could grab more datasets where dataset_name is substring
            if dataset_name == dataset["name"]:
                if billing_profile:
                    if dataset["defaultProfileId"] == billing_profile:
                        logging.info(
                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
                        dataset_id = dataset["id"]
                        logging.info(f"Dataset ID: {dataset_id}")
                        matching_datasets.append(dataset)
                    else:
                        logging.warning(
                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
                            f"and not under billing profile {billing_profile}"
                        )
                        # Dataset names need to be unique regardless of billing profile, so raise an error if
                        # a dataset with the same name is found but is not under the requested billing profile
                        raise ValueError(
                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
                else:
                    matching_datasets.append(dataset)
        return matching_datasets

    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
        """
        Get information about a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
          `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
          Defaults to None.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `info_to_include` contains invalid information types.
        """
        acceptable_include_info = [
            "SCHEMA",
            "ACCESS_INFORMATION",
            "PROFILE",
            "PROPERTIES",
            "DATA_PROJECT",
            "STORAGE",
            "SNAPSHOT_BUILDER_SETTING"
        ]
        if info_to_include:
            if not all(info in acceptable_include_info for info in info_to_include):
                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
            include_string = '&include='.join(info_to_include)
        else:
            include_string = ""
        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
        return self.request_util.run_request(uri=uri, method=GET)

    def get_table_schema_info(
            self,
            dataset_id: str,
            table_name: str,
            dataset_info: Optional[dict] = None
    ) -> Union[dict, None]:
        """
        Get schema information for a specific table within a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - table_name (str): The name of the table.
        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.

        **Returns:**
        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
        """
        if not dataset_info:
            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
            if table["name"] == table_name:
                return table
        return None

    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
        """
        Retrieve the result of a job.

        **Args:**
        - job_id (str): The ID of the job.
        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/jobs/{job_id}/result"
        # If job is expected to fail, accept any return code
        acceptable_return_code = list(range(100, 600)) if expect_failure else []
        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)

    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
        """
        Load data into a TDR dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - data (dict): The data to be ingested.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
        logging.info(
            "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
        return self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=data
        )

    def file_ingest_to_dataset(
            self,
            dataset_id: str,
            profile_id: str,
            file_list: list[dict],
            load_tag: str = "file_ingest_load_tag"
    ) -> dict:
        """
        Load files into a TDR dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - profile_id (str): The billing profile ID.
        - file_list (list[dict]): The list of files to be ingested.
        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.

        **Returns:**
        - dict: A dictionary containing the response from the ingest operation job monitoring.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
        data = {
            "profileId": profile_id,
            "loadTag": f"{load_tag}",
            "maxFailedFileLoads": 0,
            "loadArray": file_list
        }

        response = self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(data)
        )
        job_id = response.json()['id']
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        return job_results  # type: ignore[return-value]

    def get_dataset_table_metrics(
            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
    ) -> list[dict]:
        """
        Retrieve all metrics for a specific table within a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - target_table_name (str): The name of the target table.
        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metrics for the specified table.
        """
        return [
            metric
            for metric in self._yield_dataset_metrics(
                dataset_id=dataset_id,
                target_table_name=target_table_name,
                query_limit=query_limit
            )
        ]

    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
        """
        Yield all entity metrics from a dataset.

        **Args:**
        dataset_id (str): The ID of the dataset.
        target_table_name (str): The name of the target table.
        query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

        Yields:
        Any: A generator yielding dictionaries containing the metrics for the specified table.
        """
        search_request = {
            "offset": 0,
            "limit": query_limit,
            "sort": "datarepo_row_id"
        }
        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
        while True:
            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
            response = self.request_util.run_request(
                uri=uri,
                method=POST,
                content_type=APPLICATION_JSON,
                data=json.dumps(search_request)
            )
            if not response or not response.json()["result"]:
                break
            logging.info(
                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
                f"dataset {dataset_id}"
            )
            for record in response.json()["result"]:
                yield record
            search_request["offset"] += query_limit  # type: ignore[operator]

    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
        """
        Get existing IDs from a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - target_table_name (str): The name of the target table.
        - entity_id (str): The entity ID to retrieve.

        **Returns:**
        - list[str]: A list of entity IDs from the specified table.
        """
        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]

    def get_job_status(self, job_id: str) -> requests.Response:
        """
        Retrieve the status of a job.

        **Args:**
        - job_id (str): The ID of the job.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/jobs/{job_id}"
        return self.request_util.run_request(uri=uri, method=GET)

    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
        """
        Get all file UUIDs from the metadata of a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.

        **Returns:**
        - list[str]: A list of file UUIDs from the dataset metadata.
        """
        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
        all_metadata_file_uuids = []
        tables = 0
        for table in dataset_info["schema"]["tables"]:
            tables += 1
            table_name = table["name"]
            logging.info(f"Getting all file information for {table_name}")
            # Get just columns where datatype is fileref
            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
            # Get unique list of file uuids
            file_uuids = list(
                set(
                    [
                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
                    ]
                )
            )
            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
            all_metadata_file_uuids.extend(file_uuids)
        # Make full list unique
        all_metadata_file_uuids = list(set(all_metadata_file_uuids))
        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
        return all_metadata_file_uuids

    def soft_delete_entries(
            self,
            dataset_id: str,
            table_name: str,
            datarepo_row_ids: list[str],
            check_intervals: int = 15
    ) -> Optional[dict]:
        """
        Soft delete specific records from a table.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - table_name (str): The name of the target table.
        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.

        **Returns:**
        - dict (optional): A dictionary containing the response from the soft delete operation job
          monitoring. Returns None if no row IDs are provided.
        """
        if not datarepo_row_ids:
            logging.info(f"No records found to soft delete in table {table_name}")
            return None
        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
        payload = {
            "deleteType": "soft",
            "specType": "jsonArray",
            "tables": [
                {
                    "tableName": table_name,
                    "jsonArraySpec": {
                        "rowIds": datarepo_row_ids
                    }
                }
            ]
        }
        response = self.request_util.run_request(
            method=POST,
            uri=uri,
            data=json.dumps(payload),
            content_type=APPLICATION_JSON
        )
        job_id = response.json()["id"]
        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()

    def soft_delete_all_table_entries(
            self,
            dataset_id: str,
            table_name: str,
            query_limit: int = 1000,
            check_intervals: int = 15
    ) -> Optional[dict]:
        """
        Soft deletes all records in a table.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - table_name (str): The name of the target table.
        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.

        **Returns:**
        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
          None if no row IDs are provided.
        """
        dataset_metrics = self.get_dataset_table_metrics(
            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
        )
        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
        return self.soft_delete_entries(
            dataset_id=dataset_id,
            table_name=table_name,
            datarepo_row_ids=row_ids,
            check_intervals=check_intervals
        )

    def get_or_create_dataset(
            self,
            dataset_name: str,
            billing_profile: str,
            schema: dict,
            description: str,
            relationships: Optional[list[dict]] = None,
            delete_existing: bool = False,
            continue_if_exists: bool = False,
            additional_properties_dict: Optional[dict] = None
    ) -> str:
        """
        Get or create a dataset.

        **Args:**
        - dataset_name (str): The name of the dataset.
        - billing_profile (str): The billing profile ID.
        - schema (dict): The schema of the dataset.
        - description (str): The description of the dataset.
        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
          Defaults to None.
        - additional_properties_dict (Optional[dict], optional): Additional properties
          for the dataset. Defaults to None.
        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
          Defaults to `False`.
        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
          Defaults to `False`.

        **Returns:**
        - str: The ID of the dataset.

        **Raises:**
        - ValueError: If the dataset already exists and `continue_if_exists` is `False`, or if it exists under a
          different billing profile.
        """
        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
        if existing_datasets:
            if not continue_if_exists:
                raise ValueError(
                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
                )
            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
            if delete_existing:
                logging.info(f"Deleting existing dataset {dataset_name}")
                self.delete_dataset(existing_datasets[0]["id"])
                existing_datasets = []
            # If not delete_existing and continue_if_exists then grab existing dataset's id
            else:
                dataset_id = existing_datasets[0]["id"]
        if not existing_datasets:
            logging.info("Did not find existing dataset")
            # Create dataset
            dataset_id = self.create_dataset(
                schema=schema,
                dataset_name=dataset_name,
                description=description,
                profile_id=billing_profile,
                additional_dataset_properties=additional_properties_dict
            )
        return dataset_id

    def create_dataset(  # type: ignore[return]
            self,
            schema: dict,
            dataset_name: str,
            description: str,
            profile_id: str,
            additional_dataset_properties: Optional[dict] = None
    ) -> Optional[str]:
        """
        Create a new dataset.

        **Args:**
        - schema (dict): The schema of the dataset.
        - dataset_name (str): The name of the dataset.
        - description (str): The description of the dataset.
        - profile_id (str): The billing profile ID.
        - additional_dataset_properties (Optional[dict], optional): Additional
          properties for the dataset. Defaults to None.

        **Returns:**
        - Optional[str]: The ID of the created dataset, or None if creation failed.

        **Raises:**
        - ValueError: If the schema validation fails.
        """
        dataset_properties = {
            "name": dataset_name,
            "description": description,
            "defaultProfileId": profile_id,
            "region": "us-central1",
            "cloudPlatform": GCP,
            "schema": schema
        }

        if additional_dataset_properties:
            dataset_properties.update(additional_dataset_properties)
        try:
            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
        except ValidationError as e:
            raise ValueError(f"Schema validation error: {e}")
        uri = f"{self.tdr_link}/datasets"
        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
        response = self.request_util.run_request(
            method=POST,
            uri=uri,
            data=json.dumps(dataset_properties),
            content_type=APPLICATION_JSON
        )
        job_id = response.json()["id"]
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        dataset_id = job_results["id"]  # type: ignore[index]
        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
        return dataset_id

    def update_dataset_schema(  # type: ignore[return]
            self,
            dataset_id: str,
            update_note: str,
            tables_to_add: Optional[list[dict]] = None,
            relationships_to_add: Optional[list[dict]] = None,
            columns_to_add: Optional[list[dict]] = None
    ) -> Optional[str]:
        """
        Update the schema of a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - update_note (str): A note describing the update.
        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.

        **Returns:**
        - Optional[str]: The ID of the updated dataset, or None if the update failed.

        **Raises:**
        - ValueError: If the schema validation fails.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
        request_body: dict = {"description": f"{update_note}", "changes": {}}
        if tables_to_add:
            request_body["changes"]["addTables"] = tables_to_add
        if relationships_to_add:
            request_body["changes"]["addRelationships"] = relationships_to_add
        if columns_to_add:
            request_body["changes"]["addColumns"] = columns_to_add
        try:
            UpdateSchema(**request_body)
        except ValidationError as e:
            raise ValueError(f"Schema validation error: {e}")

        response = self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(request_body)
        )
        job_id = response.json()["id"]
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        dataset_id = job_results["id"]  # type: ignore[index]
        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
        return dataset_id

    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
        """
        Get response from a batched endpoint.

        Helper method for all GET endpoints that require batching.

        Given the URI and the limit (optional), will
        loop through batches until all metadata is retrieved.

        **Args:**
        - uri (str): The base URI for the endpoint (without query params for offset or limit).
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
        """
        batch = 1
        offset = 0
        metadata: list = []
        while True:
            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()

            # If no more files, break the loop
            if not response_json:
                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
                break

            metadata.extend(response_json)
            # Increment the offset by limit for the next page
            offset += limit
            batch += 1
        return metadata

    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
        """
        Return all the metadata about files in a given snapshot.

        Not all files can be returned at once, so the API
        is used repeatedly until all "batches" have been returned.

        **Args:**
        - snapshot_id (str): The ID of the snapshot.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
        """
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
        """
        Return snapshots belonging to specified dataset.

        **Args:**
        - dataset_id: uuid of dataset to query.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
        return self.request_util.run_request(
            uri=uri,
            method=GET
        )

    def create_snapshot(
            self,
            snapshot_name: str,
            description: str,
            dataset_name: str,
            snapshot_mode: str,  # byFullView is entire dataset
            profile_id: str,
            stewards: Optional[list[str]] = [],
            readers: Optional[list[str]] = [],
            consent_code: Optional[str] = None,
            duos_id: Optional[str] = None,
            data_access_control_groups: Optional[list[str]] = None,
    ) -> None:
        """
        Create a snapshot in TDR and monitor the job until completion.

        **Args:**
        - snapshot_name (str): The name of the snapshot to create.
        - description (str): The description of the snapshot.
        - dataset_name (str): The name of the source dataset.
        - snapshot_mode (str): The snapshot mode (e.g. `byFullView` for the entire dataset).
        - profile_id (str): The billing profile ID.
        - stewards (list[str], optional): Emails to add as snapshot stewards. Defaults to an empty list.
        - readers (list[str], optional): Emails to add as snapshot readers. Defaults to an empty list.
        - consent_code (str, optional): The consent code to set on the snapshot. Defaults to None.
        - duos_id (str, optional): The DUOS ID to set on the snapshot. Defaults to None.
        - data_access_control_groups (list[str], optional): Data access control groups to attach. Defaults to None.
        """
        uri = f"{self.tdr_link}/snapshots"
        payload = {
            "name": snapshot_name,
            "description": description,
            "contents": [
                {
                    "datasetName": dataset_name,
                    "mode": snapshot_mode,
                }
            ],
            "policies": {
                "stewards": stewards,
                "readers": readers,
            },
            "profileId": profile_id,
            "globalFileIds": True,
        }
        if consent_code:
            payload["consentCode"] = consent_code
        if duos_id:
            payload["duosId"] = duos_id
        if data_access_control_groups:
            payload["dataAccessControlGroups"] = data_access_control_groups
        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
        response = self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(payload)
        )
        job_id = response.json()["id"]
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        snapshot_id = job_results["id"]  # type: ignore[index]
        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")


class FilterOutSampleIdsAlreadyInDataset:
    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""

    def __init__(
            self,
            ingest_metrics: list[dict],
            dataset_id: str,
            tdr: TDR,
            target_table_name: str,
            filter_entity_id: str
    ):
        """
        Initialize the FilterOutSampleIdsAlreadyInDataset class.

        **Args:**
        - ingest_metrics (list[dict]): The metrics to be ingested.
        - dataset_id (str): The ID of the dataset.
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
        - target_table_name (str): The name of the target table.
        - filter_entity_id (str): The entity ID to filter on.
        """
        self.ingest_metrics = ingest_metrics
        """@private"""
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.filter_entity_id = filter_entity_id
        """@private"""

    def run(self) -> list[dict]:
        """
        Run the filter process to remove sample IDs that already exist in the dataset.

        **Returns:**
        - list[dict]: The filtered ingest metrics.
        """
        # Get all sample ids that already exist in dataset
        logging.info(
            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
            f"dataset {self.dataset_id}"
        )

        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
            dataset_id=self.dataset_id,
            target_table_name=self.target_table_name,
            entity_id=self.filter_entity_id
        )
        # Filter out rows that already exist in dataset
        filtered_ingest_metrics = [
            row
            for row in self.ingest_metrics
            if str(row[self.filter_entity_id]) not in dataset_sample_ids
        ]
        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
            logging.info(
                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
                f"dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
            )

            if filtered_ingest_metrics:
                return filtered_ingest_metrics
            else:
                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
                return []
        else:
            logging.info("No rows were filtered out because none of them already exist in the dataset")
            return filtered_ingest_metrics
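The two classes are designed to compose: `FilterOutSampleIdsAlreadyInDataset` uses a `TDR` instance to look up which entity IDs already exist before an ingest. A hedged sketch follows; the table name `sample`, the column `sample_id`, and the row contents are invented for illustration, and `tdr` is assumed to be a `TDR` instance constructed as sketched near the top of this page.

# Hypothetical example; "sample"/"sample_id" and the rows are made up.
rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
    ingest_metrics=[
        {"sample_id": "SM-1", "cram_path": "gs://bucket/SM-1.cram"},
        {"sample_id": "SM-2", "cram_path": "gs://bucket/SM-2.cram"},
    ],
    dataset_id="<dataset-uuid>",
    tdr=tdr,
    target_table_name="sample",
    filter_entity_id="sample_id",
).run()

if rows_to_ingest:
    # The surviving rows would then be wrapped in a TDR ingest request payload
    # (whose shape is defined by the TDR ingest API, not by this module) and
    # passed to tdr.ingest_to_dataset(dataset_id=..., data=...).
    pass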
Defaults to `15`. 733 734 **Returns:** 735 - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns 736 None if no row IDs are provided. 737 """ 738 dataset_metrics = self.get_dataset_table_metrics( 739 dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit 740 ) 741 row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics] 742 return self.soft_delete_entries( 743 dataset_id=dataset_id, 744 table_name=table_name, 745 datarepo_row_ids=row_ids, 746 check_intervals=check_intervals 747 ) 748 749 def get_or_create_dataset( 750 self, 751 dataset_name: str, 752 billing_profile: str, 753 schema: dict, 754 description: str, 755 relationships: Optional[list[dict]] = None, 756 delete_existing: bool = False, 757 continue_if_exists: bool = False, 758 additional_properties_dict: Optional[dict] = None 759 ) -> str: 760 """ 761 Get or create a dataset. 762 763 **Args:** 764 - dataset_name (str): The name of the dataset. 765 - billing_profile (str): The billing profile ID. 766 - schema (dict): The schema of the dataset. 767 - description (str): The description of the dataset. 768 - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. 769 Defaults to None. 770 - additional_properties_dict (Optional[dict], optional): Additional properties 771 for the dataset. Defaults to None. 772 - delete_existing (bool, optional): Whether to delete the existing dataset if found. 773 Defaults to `False`. 774 - continue_if_exists (bool, optional): Whether to continue if the dataset already exists. 775 Defaults to `False`. 776 777 **Returns:** 778 - str: The ID of the dataset. 779 780 **Raises:** 781 - ValueError: If multiple datasets with the same name are found under the billing profile. 782 """ 783 existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile) 784 if existing_datasets: 785 if not continue_if_exists: 786 raise ValueError( 787 f"Run with continue_if_exists=True to use the existing dataset {dataset_name}" 788 ) 789 # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list 790 if delete_existing: 791 logging.info(f"Deleting existing dataset {dataset_name}") 792 self.delete_dataset(existing_datasets[0]["id"]) 793 existing_datasets = [] 794 # If not delete_existing and continue_if_exists then grab existing datasets id 795 else: 796 dataset_id = existing_datasets[0]["id"] 797 if not existing_datasets: 798 logging.info("Did not find existing dataset") 799 # Create dataset 800 dataset_id = self.create_dataset( 801 schema=schema, 802 dataset_name=dataset_name, 803 description=description, 804 profile_id=billing_profile, 805 additional_dataset_properties=additional_properties_dict 806 ) 807 return dataset_id 808 809 def create_dataset( # type: ignore[return] 810 self, 811 schema: dict, 812 dataset_name: str, 813 description: str, 814 profile_id: str, 815 additional_dataset_properties: Optional[dict] = None 816 ) -> Optional[str]: 817 """ 818 Create a new dataset. 819 820 **Args:** 821 - schema (dict): The schema of the dataset. 822 - dataset_name (str): The name of the dataset. 823 - description (str): The description of the dataset. 824 - profile_id (str): The billing profile ID. 825 - additional_dataset_properties (Optional[dict], optional): Additional 826 properties for the dataset. Defaults to None. 827 828 **Returns:** 829 - Optional[str]: The ID of the created dataset, or None if creation failed. 
830 831 Raises: 832 - ValueError: If the schema validation fails. 833 """ 834 dataset_properties = { 835 "name": dataset_name, 836 "description": description, 837 "defaultProfileId": profile_id, 838 "region": "us-central1", 839 "cloudPlatform": GCP, 840 "schema": schema 841 } 842 843 if additional_dataset_properties: 844 dataset_properties.update(additional_dataset_properties) 845 try: 846 CreateDatasetSchema(**dataset_properties) # type: ignore[arg-type] 847 except ValidationError as e: 848 raise ValueError(f"Schema validation error: {e}") 849 uri = f"{self.tdr_link}/datasets" 850 logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}") 851 response = self.request_util.run_request( 852 method=POST, 853 uri=uri, 854 data=json.dumps(dataset_properties), 855 content_type=APPLICATION_JSON 856 ) 857 job_id = response.json()["id"] 858 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 859 dataset_id = job_results["id"] # type: ignore[index] 860 logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}") 861 return dataset_id 862 863 def update_dataset_schema( # type: ignore[return] 864 self, 865 dataset_id: str, 866 update_note: str, 867 tables_to_add: Optional[list[dict]] = None, 868 relationships_to_add: Optional[list[dict]] = None, 869 columns_to_add: Optional[list[dict]] = None 870 ) -> Optional[str]: 871 """ 872 Update the schema of a dataset. 873 874 **Args:** 875 - dataset_id (str): The ID of the dataset. 876 - update_note (str): A note describing the update. 877 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 878 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 879 - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None. 880 881 **Returns:** 882 - Optional[str]: The ID of the updated dataset, or None if the update failed. 883 884 **Raises:** 885 - ValueError: If the schema validation fails. 886 """ 887 uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema" 888 request_body: dict = {"description": f"{update_note}", "changes": {}} 889 if tables_to_add: 890 request_body["changes"]["addTables"] = tables_to_add 891 if relationships_to_add: 892 request_body["changes"]["addRelationships"] = relationships_to_add 893 if columns_to_add: 894 request_body["changes"]["addColumns"] = columns_to_add 895 try: 896 UpdateSchema(**request_body) 897 except ValidationError as e: 898 raise ValueError(f"Schema validation error: {e}") 899 900 response = self.request_util.run_request( 901 uri=uri, 902 method=POST, 903 content_type=APPLICATION_JSON, 904 data=json.dumps(request_body) 905 ) 906 job_id = response.json()["id"] 907 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 908 dataset_id = job_results["id"] # type: ignore[index] 909 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 910 return dataset_id 911 912 def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]: 913 """ 914 Get response from a batched endpoint. 915 916 Helper method for all GET endpoints that require batching. 917 918 Given the URI and the limit (optional), will 919 loop through batches until all metadata is retrieved. 920 921 **Args:** 922 - uri (str): The base URI for the endpoint (without query params for offset or limit). 923 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 
924 925 **Returns:** 926 - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint. 927 """ 928 batch = 1 929 offset = 0 930 metadata: list = [] 931 while True: 932 logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata") 933 response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json() 934 935 # If no more files, break the loop 936 if not response_json: 937 logging.info(f"No more results to retrieve, found {len(metadata)} total records") 938 break 939 940 metadata.extend(response_json) 941 # Increment the offset by limit for the next page 942 offset += limit 943 batch += 1 944 return metadata 945 946 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 947 """ 948 Return all the metadata about files in a given snapshot. 949 950 Not all files can be returned at once, so the API 951 is used repeatedly until all "batches" have been returned. 952 953 **Args:** 954 - snapshot_id (str): The ID of the snapshot. 955 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 956 957 **Returns:** 958 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 959 """ 960 uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files" 961 return self._get_response_from_batched_endpoint(uri=uri, limit=limit) 962 963 def get_dataset_snapshots(self, dataset_id: str) -> requests.Response: 964 """ 965 Return snapshots belonging to specified dataset. 966 967 **Args:** 968 - dataset_id: uuid of dataset to query. 969 970 **Returns:** 971 - requests.Response: The response from the request. 972 """ 973 uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}" 974 return self.request_util.run_request( 975 uri=uri, 976 method=GET 977 ) 978 979 def create_snapshot( 980 self, 981 snapshot_name: str, 982 description: str, 983 dataset_name: str, 984 snapshot_mode: str, # byFullView is entire dataset 985 profile_id: str, 986 stewards: Optional[list[str]] = [], 987 readers: Optional[list[str]] = [], 988 consent_code: Optional[str] = None, 989 duos_id: Optional[str] = None, 990 data_access_control_groups: Optional[list[str]] = None, 991 ) -> None: 992 """ 993 Create a snapshot in TDR. 994 995 **Returns:** 996 - requests.Response: The response from the request. 997 """ 998 uri = f"{self.tdr_link}/snapshots" 999 payload = { 1000 "name": snapshot_name, 1001 "description": description, 1002 "contents": [ 1003 { 1004 "datasetName": dataset_name, 1005 "mode": snapshot_mode, 1006 } 1007 ], 1008 "policies": { 1009 "stewards": stewards, 1010 "readers": readers, 1011 }, 1012 "profileId": profile_id, 1013 "globalFileIds": True, 1014 } 1015 if consent_code: 1016 payload["consentCode"] = consent_code 1017 if duos_id: 1018 payload["duosId"] = duos_id 1019 if data_access_control_groups: 1020 payload["dataAccessControlGroups"] = data_access_control_groups 1021 logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}") 1022 response = self.request_util.run_request( 1023 uri=uri, 1024 method=POST, 1025 content_type=APPLICATION_JSON, 1026 data=json.dumps(payload) 1027 ) 1028 job_id = response.json()["id"] 1029 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 1030 snapshot_id = job_results["id"] # type: ignore[index] 1031 logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
Class to interact with the Terra Data Repository (TDR) API.
24 def __init__(self, request_util: RunRequest, env: str = 'prod'): 25 """ 26 Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API). 27 28 **Args:** 29 - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests. 30 """ 31 self.request_util = request_util 32 if env.lower() == 'prod': 33 self.tdr_link = self.PROD_LINK 34 elif env.lower() == 'dev': 35 self.tdr_link = self.DEV_LINK 36 else: 37 raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.") 38 """@private"""
Initialize the TDR class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
- env (str, optional): The TDR environment to use. Must be `prod` or `dev`. Defaults to `prod`.
PROD_LINK / DEV_LINK (str): The base URLs for the TDR API in the production and development environments.
54 def get_dataset_files( 55 self, 56 dataset_id: str, 57 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 58 ) -> list[dict]: 59 """ 60 Get all files in a dataset. 61 62 Returns json like below 63 64 { 65 "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr", 66 "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7", 67 "path": "/path/set/in/ingest.csv", 68 "size": 1722, 69 "checksums": [ 70 { 71 "checksum": "82f7e79v", 72 "type": "crc32c" 73 }, 74 { 75 "checksum": "fff973507e30b74fa47a3d6830b84a90", 76 "type": "md5" 77 } 78 ], 79 "created": "2024-13-11T15:01:00.256Z", 80 "description": null, 81 "fileType": "file", 82 "fileDetail": { 83 "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444", 84 "mimeType": null, 85 "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv", 86 "loadTag": "RP_3333-RP_3333" 87 }, 88 "directoryDetail": null 89 } 90 91 **Args:** 92 - dataset_id (str): The ID of the dataset. 93 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 94 95 **Returns:** 96 - list[dict]: A list of dictionaries containing the metadata of the files in the dataset. 97 """ 98 uri = f"{self.tdr_link}/datasets/{dataset_id}/files" 99 logging.info(f"Getting all files in dataset {dataset_id}") 100 return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
Get all files in a dataset.
Returns JSON structured like the example below:
{
"fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
"collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
"path": "/path/set/in/ingest.csv",
"size": 1722,
"checksums": [
{
"checksum": "82f7e79v",
"type": "crc32c"
},
{
"checksum": "fff973507e30b74fa47a3d6830b84a90",
"type": "md5"
}
],
"created": "2024-13-11T15:01:00.256Z",
"description": null,
"fileType": "file",
"fileDetail": {
"datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
"mimeType": null,
"accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
"loadTag": "RP_3333-RP_3333"
},
"directoryDetail": null
}
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
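A minimal usage sketch, assuming `tdr` is an already-initialized `TDR` instance and the dataset UUID is a placeholder:

```python
# Hypothetical dataset UUID, used only for illustration
dataset_id = "11111111-2222-3333-4444-555555555555"

# Retrieve every file record in the dataset (batching is handled internally)
files = tdr.get_dataset_files(dataset_id=dataset_id)

# Each record mirrors the JSON shape shown above
for file_record in files[:5]:
    print(file_record["fileId"], file_record["fileDetail"]["accessUrl"])
```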
102 def create_file_dict( 103 self, 104 dataset_id: str, 105 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 106 ) -> dict: 107 """ 108 Create a dictionary of all files in a dataset where the key is the file UUID. 109 110 **Args:** 111 - dataset_id (str): The ID of the dataset. 112 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 113 114 **Returns:** 115 - dict: A dictionary where the key is the file UUID and the value is the file metadata. 116 """ 117 return { 118 file_dict["fileId"]: file_dict 119 for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit) 120 }
Create a dictionary of all files in a dataset where the key is the file UUID.
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- dict: A dictionary where the key is the file UUID and the value is the file metadata.
122 def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset( 123 self, 124 dataset_id: str, 125 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 126 ) -> dict: 127 """ 128 Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID. 129 130 This assumes that the TDR 'path' is original path of the file in the cloud storage with `gs://` stripped out. 131 132 This will ONLY work if dataset was created with `experimentalSelfHosted = True` 133 134 **Args:** 135 - dataset_id (str): The ID of the dataset. 136 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 137 138 **Returns:** 139 - dict: A dictionary where the key is the file UUID and the value is the file path. 140 """ 141 return { 142 file_dict['fileDetail']['accessUrl']: file_dict['fileId'] 143 for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit) 144 }
Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.
This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
146 def delete_file(self, file_id: str, dataset_id: str) -> requests.Response: 147 """ 148 Delete a file from a dataset. 149 150 **Args:** 151 - file_id (str): The ID of the file to be deleted. 152 - dataset_id (str): The ID of the dataset. 153 154 **Returns:** 155 - requests.Response: The response from the request. 156 """ 157 uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}" 158 logging.info(f"Submitting delete job for file {file_id}") 159 return self.request_util.run_request(uri=uri, method=DELETE)
Delete a file from a dataset.
Args:
- file_id (str): The ID of the file to be deleted.
- dataset_id (str): The ID of the dataset.
Returns:
- requests.Response: The response from the request.
161 def delete_files( 162 self, 163 file_ids: list[str], 164 dataset_id: str, 165 batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"], # type: ignore[assignment] 166 check_interval: int = 15) -> None: 167 """ 168 Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch. 169 170 **Args:** 171 - file_ids (list[str]): A list of file IDs to be deleted. 172 - dataset_id (str): The ID of the dataset. 173 - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`. 174 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 175 """ 176 SubmitAndMonitorMultipleJobs( 177 tdr=self, 178 job_function=self.delete_file, 179 job_args_list=[(file_id, dataset_id) for file_id in file_ids], 180 batch_size=batch_size_to_delete_files, 181 check_interval=check_interval 182 ).run()
Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
Args:
- file_ids (list[str]): A list of file IDs to be deleted.
- dataset_id (str): The ID of the dataset.
- batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
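A sketch of a batched delete, assuming `tdr` is an initialized `TDR` instance; the file UUIDs here come from the file-dictionary helper documented above, and the dataset UUID is a placeholder:

```python
dataset_id = "11111111-2222-3333-4444-555555555555"  # placeholder UUID

# Collect all file UUIDs currently registered in the dataset
file_ids = list(tdr.create_file_dict(dataset_id=dataset_id).keys())

# Submit delete jobs in batches of 100 and poll every 30 seconds
tdr.delete_files(
    file_ids=file_ids,
    dataset_id=dataset_id,
    batch_size_to_delete_files=100,
    check_interval=30,
)
```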
184 def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response: 185 """ 186 Add a user to a dataset with a specified policy. 187 188 **Args:** 189 - dataset_id (str): The ID of the dataset. 190 - user (str): The email of the user to be added. 191 - policy (str): The policy to be assigned to the user. 192 Must be one of `steward`, `custodian`, or `snapshot_creator`. 193 194 **Returns:** 195 - requests.Response: The response from the request. 196 197 **Raises:** 198 - ValueError: If the policy is not valid. 199 """ 200 self._check_policy(policy) 201 uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members" 202 member_dict = {"email": user} 203 logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}") 204 return self.request_util.run_request( 205 uri=uri, 206 method=POST, 207 data=json.dumps(member_dict), 208 content_type=APPLICATION_JSON 209 )
Add a user to a dataset with a specified policy.
Args:
- dataset_id (str): The ID of the dataset.
- user (str): The email of the user to be added.
- policy (str): The policy to be assigned to the user. Must be one of `steward`, `custodian`, or `snapshot_creator`.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If the policy is not valid.
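For example, granting a collaborator the `custodian` policy (a sketch; the dataset UUID and email are placeholders):

```python
response = tdr.add_user_to_dataset(
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset UUID
    user="collaborator@example.org",                    # placeholder email
    policy="custodian",                                 # steward, custodian, or snapshot_creator
)
response.raise_for_status()
```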
211 def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response: 212 """ 213 Remove a user from a dataset. 214 215 **Args:** 216 - dataset_id (str): The ID of the dataset. 217 - user (str): The email of the user to be removed. 218 - policy (str): The policy to be removed from the user. 219 Must be one of `steward`, `custodian`, or `snapshot_creator`. 220 221 **Returns:** 222 - requests.Response: The response from the request. 223 224 **Raises:** 225 - ValueError: If the policy is not valid. 226 """ 227 self._check_policy(policy) 228 uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}" 229 logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}") 230 return self.request_util.run_request(uri=uri, method=DELETE)
Remove a user from a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- user (str): The email of the user to be removed.
- policy (str): The policy to be removed from the user. Must be one of `steward`, `custodian`, or `snapshot_creator`.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If the policy is not valid.
232 def delete_dataset(self, dataset_id: str) -> None: 233 """ 234 Delete a dataset and monitors the job until completion. 235 236 **Args:** 237 dataset_id (str): The ID of the dataset to be deleted. 238 """ 239 uri = f"{self.tdr_link}/datasets/{dataset_id}" 240 logging.info(f"Deleting dataset {dataset_id}") 241 response = self.request_util.run_request(uri=uri, method=DELETE) 242 job_id = response.json()['id'] 243 MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
Delete a dataset and monitor the delete job until completion.
Args:
- dataset_id (str): The ID of the dataset to be deleted.
245 def make_snapshot_public(self, snapshot_id: str) -> requests.Response: 246 """ 247 Make a snapshot public. 248 249 **Args:** 250 - snapshot_id (str): The ID of the snapshot to be made public. 251 252 **Returns:** 253 - requests.Response: The response from the request. 254 """ 255 uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public" 256 logging.info(f"Making snapshot {snapshot_id} public") 257 return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")
Make a snapshot public.
Args:
- snapshot_id (str): The ID of the snapshot to be made public.
Returns:
- requests.Response: The response from the request.
259 def get_snapshot_info( 260 self, 261 snapshot_id: str, 262 continue_not_found: bool = False, 263 info_to_include: Optional[list[str]] = None 264 ) -> Optional[requests.Response]: 265 """ 266 Get information about a snapshot. 267 268 **Args:** 269 - snapshot_id (str): The ID of the snapshot. 270 - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`. 271 - info_to_include (list[str], optional): A list of additional information to include. Defaults to None. 272 Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, 273 `DATA_PROJECT`,`CREATION_INFORMATION`, `DUOS` 274 275 **Returns:** 276 - requests.Response (optional): The response from the request (returns None if the snapshot is not 277 found or access is denied). 278 """ 279 acceptable_return_code = [404, 403] if continue_not_found else [] 280 acceptable_include_info = [ 281 "SOURCES", 282 "TABLES", 283 "RELATIONSHIPS", 284 "ACCESS_INFORMATION", 285 "PROFILE", 286 "PROPERTIES", 287 "DATA_PROJECT", 288 "CREATION_INFORMATION", 289 "DUOS" 290 ] 291 if info_to_include: 292 if not all(info in acceptable_include_info for info in info_to_include): 293 raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}") 294 include_string = '&include='.join(info_to_include) 295 else: 296 include_string = "" 297 uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}" 298 response = self.request_util.run_request( 299 uri=uri, 300 method=GET, 301 accept_return_codes=acceptable_return_code 302 ) 303 if response.status_code == 404: 304 logging.warning(f"Snapshot {snapshot_id} not found") 305 return None 306 if response.status_code == 403: 307 logging.warning(f"Access denied for snapshot {snapshot_id}") 308 return None 309 return response
Get information about a snapshot.
Args:
- snapshot_id (str): The ID of the snapshot.
- continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
- info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`.
Returns:
- requests.Response (optional): The response from the request (returns None if the snapshot is not found or access is denied).
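A sketch of a tolerant snapshot lookup, assuming `tdr` is an initialized `TDR` instance and the snapshot UUID is a placeholder; because `continue_not_found=True` accepts `404`/`403` responses, the return value must be checked for None:

```python
snapshot_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"  # placeholder UUID

response = tdr.get_snapshot_info(
    snapshot_id=snapshot_id,
    continue_not_found=True,
    info_to_include=["TABLES", "SOURCES"],
)
if response is None:
    print(f"Snapshot {snapshot_id} is missing or inaccessible")
else:
    print(response.json()["name"])
```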
311 def delete_snapshots( 312 self, 313 snapshot_ids: list[str], 314 batch_size: int = 25, 315 check_interval: int = 10, 316 verbose: bool = False) -> None: 317 """ 318 Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch. 319 320 **Args:** 321 - snapshot_ids (list[str]): A list of snapshot IDs to be deleted. 322 - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`. 323 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`. 324 - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`. 325 """ 326 SubmitAndMonitorMultipleJobs( 327 tdr=self, 328 job_function=self.delete_snapshot, 329 job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids], 330 batch_size=batch_size, 331 check_interval=check_interval, 332 verbose=verbose 333 ).run()
Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
Args:
- snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
- batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
- verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
335 def delete_snapshot(self, snapshot_id: str) -> requests.Response: 336 """ 337 Delete a snapshot. 338 339 **Args:** 340 - snapshot_id (str): The ID of the snapshot to be deleted. 341 342 **Returns:** 343 - requests.Response: The response from the request. 344 """ 345 uri = f"{self.tdr_link}/snapshots/{snapshot_id}" 346 logging.info(f"Deleting snapshot {snapshot_id}") 347 return self.request_util.run_request(uri=uri, method=DELETE)
Delete a snapshot.
Args:
- snapshot_id (str): The ID of the snapshot to be deleted.
Returns:
- requests.Response: The response from the request.
382 def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]: 383 """ 384 Check if a dataset exists by name and optionally by billing profile. 385 386 **Args:** 387 - dataset_name (str): The name of the dataset to check. 388 - billing_profile (str, optional): The billing profile ID to match. Defaults to None. 389 390 **Returns:** 391 - list[dict]: A list of matching datasets. 392 """ 393 matching_datasets = [] 394 for dataset in self._yield_existing_datasets(filter=dataset_name): 395 # Search uses wildcard so could grab more datasets where dataset_name is substring 396 if dataset_name == dataset["name"]: 397 if billing_profile: 398 if dataset["defaultProfileId"] == billing_profile: 399 logging.info( 400 f"Dataset {dataset['name']} already exists under billing profile {billing_profile}") 401 dataset_id = dataset["id"] 402 logging.info(f"Dataset ID: {dataset_id}") 403 matching_datasets.append(dataset) 404 else: 405 logging.warning( 406 f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " + 407 f"and not under billing profile {billing_profile}" 408 ) 409 # Datasets names need to be unique regardless of billing profile, so raise an error if 410 # a dataset with the same name is found but is not under the requested billing profile 411 raise ValueError( 412 f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}") 413 else: 414 matching_datasets.append(dataset) 415 return matching_datasets
Check if a dataset exists by name and optionally by billing profile.
Args:
- dataset_name (str): The name of the dataset to check.
- billing_profile (str, optional): The billing profile ID to match. Defaults to None.
Returns:
- list[dict]: A list of matching datasets.
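A usage sketch with a placeholder name and billing profile ID; an empty list means no dataset with that exact name exists under the profile:

```python
matches = tdr.check_if_dataset_exists(
    dataset_name="my_example_dataset",                        # placeholder name
    billing_profile="99999999-8888-7777-6666-555555555555",   # placeholder billing profile
)
if matches:
    print(f"Found existing dataset: {matches[0]['id']}")
else:
    print("Dataset does not exist yet")
```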
417 def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response: 418 """ 419 Get information about a dataset. 420 421 **Args:** 422 - dataset_id (str): The ID of the dataset. 423 - info_to_include (list[str], optional): A list of additional information to include. Valid options include: 424 `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`. 425 Defaults to None. 426 427 **Returns:** 428 - requests.Response: The response from the request. 429 430 **Raises:** 431 - ValueError: If `info_to_include` contains invalid information types. 432 """ 433 acceptable_include_info = [ 434 "SCHEMA", 435 "ACCESS_INFORMATION", 436 "PROFILE", 437 "PROPERTIES", 438 "DATA_PROJECT", 439 "STORAGE", 440 "SNAPSHOT_BUILDER_SETTING" 441 ] 442 if info_to_include: 443 if not all(info in acceptable_include_info for info in info_to_include): 444 raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}") 445 include_string = '&include='.join(info_to_include) 446 else: 447 include_string = "" 448 uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}" 449 return self.request_util.run_request(uri=uri, method=GET)
Get information about a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- info_to_include (list[str], optional): A list of additional information to include. Valid options include:
`SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`. Defaults to None.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If `info_to_include` contains invalid information types.
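A sketch showing how the optional includes shape the response, assuming `tdr` is an initialized `TDR` instance and the dataset UUID is a placeholder:

```python
dataset_id = "11111111-2222-3333-4444-555555555555"  # placeholder

info = tdr.get_dataset_info(
    dataset_id=dataset_id,
    info_to_include=["SCHEMA", "ACCESS_INFORMATION"],
).json()

# The SCHEMA include exposes the table definitions
table_names = [table["name"] for table in info["schema"]["tables"]]
print(table_names)
```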
451 def get_table_schema_info( 452 self, 453 dataset_id: str, 454 table_name: str, 455 dataset_info: Optional[dict] = None 456 ) -> Union[dict, None]: 457 """ 458 Get schema information for a specific table within a dataset. 459 460 **Args:** 461 - dataset_id (str): The ID of the dataset. 462 - table_name (str): The name of the table. 463 - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None. 464 465 **Returns:** 466 - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found. 467 """ 468 if not dataset_info: 469 dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json() 470 for table in dataset_info["schema"]["tables"]: # type: ignore[index] 471 if table["name"] == table_name: 472 return table 473 return None
Get schema information for a specific table within a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the table.
- dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
Returns:
- Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
475 def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response: 476 """ 477 Retrieve the result of a job. 478 479 **Args:** 480 - job_id (str): The ID of the job. 481 - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`. 482 483 **Returns:** 484 - requests.Response: The response from the request. 485 """ 486 uri = f"{self.tdr_link}/jobs/{job_id}/result" 487 # If job is expected to fail, accept any return code 488 acceptable_return_code = list(range(100, 600)) if expect_failure else [] 489 return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
Retrieve the result of a job.
Args:
- job_id (str): The ID of the job.
- expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
491 def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response: 492 """ 493 Load data into a TDR dataset. 494 495 **Args:** 496 - dataset_id (str): The ID of the dataset. 497 - data (dict): The data to be ingested. 498 499 **Returns:** 500 - requests.Response: The response from the request. 501 """ 502 uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest" 503 logging.info( 504 "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " + 505 "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.") 506 return self.request_util.run_request( 507 uri=uri, 508 method=POST, 509 content_type=APPLICATION_JSON, 510 data=data 511 )
Load data into a TDR dataset.
Args:
- dataset_id (str): The ID of the dataset.
- data (dict): The data to be ingested.
Returns:
- requests.Response: The response from the request.
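A sketch of a JSON-array ingest request, assuming `tdr` is an initialized `TDR` instance. The payload keys follow the TDR ingest request model (`table`, `format`, `records`, `load_tag`, `updateStrategy`); treat the table name, record fields, and IDs as illustrative placeholders rather than an authoritative recipe:

```python
import json

dataset_id = "11111111-2222-3333-4444-555555555555"  # placeholder

ingest_request = {
    "table": "sample",              # target table name (illustrative)
    "format": "array",              # records supplied inline as a JSON array
    "updateStrategy": "replace",
    "load_tag": "example_load_tag",
    "records": [
        {"sample_id": "sample_1", "cram_path": "gs://bucket/sample_1.cram"},
    ],
}

# Serialized here; depending on the RunRequest implementation a plain dict may also be accepted
response = tdr.ingest_to_dataset(dataset_id=dataset_id, data=json.dumps(ingest_request))
print(f"Submitted ingest job {response.json()['id']}")
```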
513 def file_ingest_to_dataset( 514 self, 515 dataset_id: str, 516 profile_id: str, 517 file_list: list[dict], 518 load_tag: str = "file_ingest_load_tag" 519 ) -> dict: 520 """ 521 Load files into a TDR dataset. 522 523 **Args:** 524 - dataset_id (str): The ID of the dataset. 525 - profile_id (str): The billing profile ID. 526 - file_list (list[dict]): The list of files to be ingested. 527 - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`. 528 529 **Returns:** 530 - dict: A dictionary containing the response from the ingest operation job monitoring. 531 """ 532 uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array" 533 data = { 534 "profileId": profile_id, 535 "loadTag": f"{load_tag}", 536 "maxFailedFileLoads": 0, 537 "loadArray": file_list 538 } 539 540 response = self.request_util.run_request( 541 uri=uri, 542 method=POST, 543 content_type=APPLICATION_JSON, 544 data=json.dumps(data) 545 ) 546 job_id = response.json()['id'] 547 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 548 return job_results # type: ignore[return-value]
Load files into a TDR dataset.
Args:
- dataset_id (str): The ID of the dataset.
- profile_id (str): The billing profile ID.
- file_list (list[dict]): The list of files to be ingested.
- load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
Returns:
- dict: A dictionary containing the response from the ingest operation job monitoring.
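A sketch of the bulk file-ingest call, assuming `tdr` is an initialized `TDR` instance. Each entry in `file_list` follows the TDR bulk array load model (`sourcePath`/`targetPath`); all paths and UUIDs are placeholders:

```python
file_list = [
    {
        "sourcePath": "gs://source-bucket/sample_1.cram",  # placeholder source object
        "targetPath": "/sample_1/sample_1.cram",           # path to assign inside the dataset
    },
]

job_results = tdr.file_ingest_to_dataset(
    dataset_id="11111111-2222-3333-4444-555555555555",     # placeholder dataset
    profile_id="99999999-8888-7777-6666-555555555555",     # placeholder billing profile
    file_list=file_list,
    load_tag="example_file_ingest",
)
print(job_results)
```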
550 def get_dataset_table_metrics( 551 self, dataset_id: str, target_table_name: str, query_limit: int = 1000 552 ) -> list[dict]: 553 """ 554 Retrieve all metrics for a specific table within a dataset. 555 556 **Args:** 557 - dataset_id (str): The ID of the dataset. 558 - target_table_name (str): The name of the target table. 559 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 560 561 **Returns:** 562 - list[dict]: A list of dictionaries containing the metrics for the specified table. 563 """ 564 return [ 565 metric 566 for metric in self._yield_dataset_metrics( 567 dataset_id=dataset_id, 568 target_table_name=target_table_name, 569 query_limit=query_limit 570 ) 571 ]
Retrieve all metrics for a specific table within a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- target_table_name (str): The name of the target table.
- query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
Returns:
- list[dict]: A list of dictionaries containing the metrics for the specified table.
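A sketch of pulling a table's rows and inspecting them, assuming `tdr` is an initialized `TDR` instance and the identifiers are placeholders:

```python
rows = tdr.get_dataset_table_metrics(
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset
    target_table_name="sample",                         # placeholder table name
    query_limit=500,
)
print(f"Retrieved {len(rows)} rows")

# Every row carries a datarepo_row_id, which is what soft deletes operate on
row_ids = [row["datarepo_row_id"] for row in rows]
```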
609 def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]: 610 """ 611 Get existing IDs from a dataset. 612 613 **Args:** 614 - dataset_id (str): The ID of the dataset. 615 - target_table_name (str): The name of the target table. 616 - entity_id (str): The entity ID to retrieve. 617 618 **Returns:** 619 - list[str]: A list of entity IDs from the specified table. 620 """ 621 dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name) 622 return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
Get existing IDs from a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- target_table_name (str): The name of the target table.
- entity_id (str): The name of the entity ID column whose values should be returned.
Returns:
- list[str]: A list of entity IDs from the specified table.
624 def get_job_status(self, job_id: str) -> requests.Response: 625 """ 626 Retrieve the status of a job. 627 628 **Args:** 629 - job_id (str): The ID of the job. 630 631 **Returns:** 632 - requests.Response: The response from the request. 633 """ 634 uri = f"{self.tdr_link}/jobs/{job_id}" 635 return self.request_util.run_request(uri=uri, method=GET)
Retrieve the status of a job.
Args:
- job_id (str): The ID of the job.
Returns:
- requests.Response: The response from the request.
637 def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]: 638 """ 639 Get all file UUIDs from the metadata of a dataset. 640 641 **Args:** 642 - dataset_id (str): The ID of the dataset. 643 644 **Returns:** 645 - list[str]: A list of file UUIDs from the dataset metadata. 646 """ 647 dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json() 648 all_metadata_file_uuids = [] 649 tables = 0 650 for table in dataset_info["schema"]["tables"]: 651 tables += 1 652 table_name = table["name"] 653 logging.info(f"Getting all file information for {table_name}") 654 # Get just columns where datatype is fileref 655 file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"] 656 dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name) 657 # Get unique list of file uuids 658 file_uuids = list( 659 set( 660 [ 661 value for metric in dataset_metrics for key, value in metric.items() if key in file_columns 662 ] 663 ) 664 ) 665 logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'") 666 all_metadata_file_uuids.extend(file_uuids) 667 # Make full list unique 668 all_metadata_file_uuids = list(set(all_metadata_file_uuids)) 669 logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)") 670 return all_metadata_file_uuids
Get all file UUIDs from the metadata of a dataset.
Args:
- dataset_id (str): The ID of the dataset.
Returns:
- list[str]: A list of file UUIDs from the dataset metadata.
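As a sketch, the UUIDs referenced in metadata can be compared against the full file inventory to spot files that no table references anymore (assuming `tdr` is an initialized `TDR` instance and the dataset UUID is a placeholder):

```python
dataset_id = "11111111-2222-3333-4444-555555555555"  # placeholder

referenced_uuids = set(tdr.get_dataset_file_uuids_from_metadata(dataset_id=dataset_id))
all_file_uuids = set(tdr.create_file_dict(dataset_id=dataset_id).keys())

orphaned = all_file_uuids - referenced_uuids
print(f"{len(orphaned)} files exist in the dataset but are not referenced in metadata")
```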
672 def soft_delete_entries( 673 self, 674 dataset_id: str, 675 table_name: str, 676 datarepo_row_ids: list[str], 677 check_intervals: int = 15 678 ) -> Optional[dict]: 679 """ 680 Soft delete specific records from a table. 681 682 **Args:** 683 - dataset_id (str): The ID of the dataset. 684 - table_name (str): The name of the target table. 685 - datarepo_row_ids (list[str]): A list of row IDs to be deleted. 686 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 687 688 **Returns:** 689 - dict (optional): A dictionary containing the response from the soft delete operation job 690 monitoring. Returns None if no row IDs are provided. 691 """ 692 if not datarepo_row_ids: 693 logging.info(f"No records found to soft delete in table {table_name}") 694 return None 695 logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}") 696 uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes" 697 payload = { 698 "deleteType": "soft", 699 "specType": "jsonArray", 700 "tables": [ 701 { 702 "tableName": table_name, 703 "jsonArraySpec": { 704 "rowIds": datarepo_row_ids 705 } 706 } 707 ] 708 } 709 response = self.request_util.run_request( 710 method=POST, 711 uri=uri, 712 data=json.dumps(payload), 713 content_type=APPLICATION_JSON 714 ) 715 job_id = response.json()["id"] 716 return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
Soft delete specific records from a table.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the target table.
- datarepo_row_ids (list[str]): A list of row IDs to be deleted.
- check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
Returns:
- dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
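A sketch that soft deletes a handful of rows by `datarepo_row_id`, assuming `tdr` is an initialized `TDR` instance and all values are placeholders:

```python
row_ids_to_remove = [
    "d1e2a3d4-0000-0000-0000-000000000001",  # placeholder datarepo_row_id
    "d1e2a3d4-0000-0000-0000-000000000002",
]

tdr.soft_delete_entries(
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset
    table_name="sample",                                # placeholder table
    datarepo_row_ids=row_ids_to_remove,
    check_intervals=30,
)
```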
718 def soft_delete_all_table_entries( 719 self, 720 dataset_id: str, 721 table_name: str, 722 query_limit: int = 1000, 723 check_intervals: int = 15 724 ) -> Optional[dict]: 725 """ 726 Soft deletes all records in a table. 727 728 **Args:** 729 - dataset_id (str): The ID of the dataset. 730 - table_name (str): The name of the target table. 731 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 732 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 733 734 **Returns:** 735 - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns 736 None if no row IDs are provided. 737 """ 738 dataset_metrics = self.get_dataset_table_metrics( 739 dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit 740 ) 741 row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics] 742 return self.soft_delete_entries( 743 dataset_id=dataset_id, 744 table_name=table_name, 745 datarepo_row_ids=row_ids, 746 check_intervals=check_intervals 747 )
Soft deletes all records in a table.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the target table.
- query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
- check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
Returns:
- dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
749 def get_or_create_dataset( 750 self, 751 dataset_name: str, 752 billing_profile: str, 753 schema: dict, 754 description: str, 755 relationships: Optional[list[dict]] = None, 756 delete_existing: bool = False, 757 continue_if_exists: bool = False, 758 additional_properties_dict: Optional[dict] = None 759 ) -> str: 760 """ 761 Get or create a dataset. 762 763 **Args:** 764 - dataset_name (str): The name of the dataset. 765 - billing_profile (str): The billing profile ID. 766 - schema (dict): The schema of the dataset. 767 - description (str): The description of the dataset. 768 - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. 769 Defaults to None. 770 - additional_properties_dict (Optional[dict], optional): Additional properties 771 for the dataset. Defaults to None. 772 - delete_existing (bool, optional): Whether to delete the existing dataset if found. 773 Defaults to `False`. 774 - continue_if_exists (bool, optional): Whether to continue if the dataset already exists. 775 Defaults to `False`. 776 777 **Returns:** 778 - str: The ID of the dataset. 779 780 **Raises:** 781 - ValueError: If multiple datasets with the same name are found under the billing profile. 782 """ 783 existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile) 784 if existing_datasets: 785 if not continue_if_exists: 786 raise ValueError( 787 f"Run with continue_if_exists=True to use the existing dataset {dataset_name}" 788 ) 789 # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list 790 if delete_existing: 791 logging.info(f"Deleting existing dataset {dataset_name}") 792 self.delete_dataset(existing_datasets[0]["id"]) 793 existing_datasets = [] 794 # If not delete_existing and continue_if_exists then grab existing datasets id 795 else: 796 dataset_id = existing_datasets[0]["id"] 797 if not existing_datasets: 798 logging.info("Did not find existing dataset") 799 # Create dataset 800 dataset_id = self.create_dataset( 801 schema=schema, 802 dataset_name=dataset_name, 803 description=description, 804 profile_id=billing_profile, 805 additional_dataset_properties=additional_properties_dict 806 ) 807 return dataset_id
Get or create a dataset.
Args:
- dataset_name (str): The name of the dataset.
- billing_profile (str): The billing profile ID.
- schema (dict): The schema of the dataset.
- description (str): The description of the dataset.
- relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. Defaults to None.
- additional_properties_dict (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
- delete_existing (bool, optional): Whether to delete the existing dataset if found. Defaults to `False`.
- continue_if_exists (bool, optional): Whether to continue if the dataset already exists. Defaults to `False`.
Returns:
- str: The ID of the dataset.
Raises:
- ValueError: If the dataset already exists and `continue_if_exists` is `False`, or if a dataset with the same name exists under a different billing profile.
809 def create_dataset( # type: ignore[return] 810 self, 811 schema: dict, 812 dataset_name: str, 813 description: str, 814 profile_id: str, 815 additional_dataset_properties: Optional[dict] = None 816 ) -> Optional[str]: 817 """ 818 Create a new dataset. 819 820 **Args:** 821 - schema (dict): The schema of the dataset. 822 - dataset_name (str): The name of the dataset. 823 - description (str): The description of the dataset. 824 - profile_id (str): The billing profile ID. 825 - additional_dataset_properties (Optional[dict], optional): Additional 826 properties for the dataset. Defaults to None. 827 828 **Returns:** 829 - Optional[str]: The ID of the created dataset, or None if creation failed. 830 831 Raises: 832 - ValueError: If the schema validation fails. 833 """ 834 dataset_properties = { 835 "name": dataset_name, 836 "description": description, 837 "defaultProfileId": profile_id, 838 "region": "us-central1", 839 "cloudPlatform": GCP, 840 "schema": schema 841 } 842 843 if additional_dataset_properties: 844 dataset_properties.update(additional_dataset_properties) 845 try: 846 CreateDatasetSchema(**dataset_properties) # type: ignore[arg-type] 847 except ValidationError as e: 848 raise ValueError(f"Schema validation error: {e}") 849 uri = f"{self.tdr_link}/datasets" 850 logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}") 851 response = self.request_util.run_request( 852 method=POST, 853 uri=uri, 854 data=json.dumps(dataset_properties), 855 content_type=APPLICATION_JSON 856 ) 857 job_id = response.json()["id"] 858 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 859 dataset_id = job_results["id"] # type: ignore[index] 860 logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}") 861 return dataset_id
Create a new dataset.
Args:
- schema (dict): The schema of the dataset.
- dataset_name (str): The name of the dataset.
- description (str): The description of the dataset.
- profile_id (str): The billing profile ID.
- additional_dataset_properties (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
Returns:
- Optional[str]: The ID of the created dataset, or None if creation failed.
Raises:
- ValueError: If the schema validation fails.
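A sketch of a minimal schema and dataset creation, assuming `tdr` is an initialized `TDR` instance. The schema follows the TDR dataset schema model (tables, columns, datatypes, primary keys); the names and IDs are placeholders, and `CreateDatasetSchema` validation will reject anything malformed:

```python
schema = {
    "tables": [
        {
            "name": "sample",
            "columns": [
                {"name": "sample_id", "datatype": "string", "array_of": False, "required": True},
                {"name": "cram_path", "datatype": "fileref", "array_of": False, "required": False},
            ],
            "primaryKey": ["sample_id"],
        }
    ]
}

dataset_id = tdr.create_dataset(
    schema=schema,
    dataset_name="my_example_dataset",                      # placeholder name
    description="Example dataset created via ops_utils",
    profile_id="99999999-8888-7777-6666-555555555555",      # placeholder billing profile
)
```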
863 def update_dataset_schema( # type: ignore[return] 864 self, 865 dataset_id: str, 866 update_note: str, 867 tables_to_add: Optional[list[dict]] = None, 868 relationships_to_add: Optional[list[dict]] = None, 869 columns_to_add: Optional[list[dict]] = None 870 ) -> Optional[str]: 871 """ 872 Update the schema of a dataset. 873 874 **Args:** 875 - dataset_id (str): The ID of the dataset. 876 - update_note (str): A note describing the update. 877 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 878 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 879 - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None. 880 881 **Returns:** 882 - Optional[str]: The ID of the updated dataset, or None if the update failed. 883 884 **Raises:** 885 - ValueError: If the schema validation fails. 886 """ 887 uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema" 888 request_body: dict = {"description": f"{update_note}", "changes": {}} 889 if tables_to_add: 890 request_body["changes"]["addTables"] = tables_to_add 891 if relationships_to_add: 892 request_body["changes"]["addRelationships"] = relationships_to_add 893 if columns_to_add: 894 request_body["changes"]["addColumns"] = columns_to_add 895 try: 896 UpdateSchema(**request_body) 897 except ValidationError as e: 898 raise ValueError(f"Schema validation error: {e}") 899 900 response = self.request_util.run_request( 901 uri=uri, 902 method=POST, 903 content_type=APPLICATION_JSON, 904 data=json.dumps(request_body) 905 ) 906 job_id = response.json()["id"] 907 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 908 dataset_id = job_results["id"] # type: ignore[index] 909 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 910 return dataset_id
Update the schema of a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- update_note (str): A note describing the update.
- tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
- relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
- columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
Returns:
- Optional[str]: The ID of the updated dataset, or None if the update failed.
Raises:
- ValueError: If the schema validation fails.
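A sketch adding a single column to an existing table, assuming `tdr` is an initialized `TDR` instance. The `columns_to_add` entries are written against the TDR schema-update column model (`tableName` plus a list of column definitions) as understood here; table and column names are placeholders:

```python
new_columns = [
    {
        "tableName": "sample",  # table to modify (placeholder)
        "columns": [
            {"name": "collection_date", "datatype": "date", "array_of": False, "required": False},
        ],
    }
]

tdr.update_dataset_schema(
    dataset_id="11111111-2222-3333-4444-555555555555",  # placeholder dataset
    update_note="Add collection_date to sample table",
    columns_to_add=new_columns,
)
```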
946 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 947 """ 948 Return all the metadata about files in a given snapshot. 949 950 Not all files can be returned at once, so the API 951 is used repeatedly until all "batches" have been returned. 952 953 **Args:** 954 - snapshot_id (str): The ID of the snapshot. 955 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 956 957 **Returns:** 958 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 959 """ 960 uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files" 961 return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
Return all the metadata about files in a given snapshot.
Not all files can be returned at once, so the API is used repeatedly until all "batches" have been returned.
**Args:**
- snapshot_id (str): The ID of the snapshot.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

**Returns:**
- list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
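Since the return value is a list of file-metadata dicts, a common pattern is to index it by file ID. The sketch below assumes `tdr` is an initialized `TDR` instance and that each record exposes the same `fileId` and `fileDetail.accessUrl` fields used for dataset file listings in this module.

```python
# Sketch: map each snapshot file's ID to its gs:// access URL.
# `tdr` is assumed to be an initialized TDR instance; the snapshot ID is illustrative.
snapshot_files = tdr.get_files_from_snapshot(snapshot_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee")

# Each record is expected to carry "fileId" and "fileDetail.accessUrl";
# .get() guards against records missing either field.
file_id_to_url = {
    record["fileId"]: record.get("fileDetail", {}).get("accessUrl")
    for record in snapshot_files
    if "fileId" in record
}
print(f"Found {len(file_id_to_url)} files in snapshot")
```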
def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
    """
    Return snapshots belonging to specified dataset.

    **Args:**
    - dataset_id: uuid of dataset to query.

    **Returns:**
    - requests.Response: The response from the request.
    """
    uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
    return self.request_util.run_request(
        uri=uri,
        method=GET
    )
Return snapshots belonging to specified dataset.
**Args:**
- dataset_id: uuid of dataset to query.

**Returns:**
- requests.Response: The response from the request.
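Because this method returns the raw `requests.Response`, callers typically parse the JSON themselves. A minimal sketch, assuming `tdr` is an initialized `TDR` instance and that the payload follows TDR's usual enumerate shape with snapshot summaries under an `items` list:

```python
# Sketch: list the snapshots attached to a dataset.
# `tdr` is assumed to be an initialized TDR instance; the dataset ID is illustrative,
# and the "items" key is an assumption about the enumerate payload shape.
response = tdr.get_dataset_snapshots(dataset_id="11111111-2222-3333-4444-555555555555")
response.raise_for_status()

for snapshot in response.json().get("items", []):
    print(snapshot.get("id"), snapshot.get("name"))
```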
def create_snapshot(
        self,
        snapshot_name: str,
        description: str,
        dataset_name: str,
        snapshot_mode: str,  # byFullView is entire dataset
        profile_id: str,
        stewards: Optional[list[str]] = [],
        readers: Optional[list[str]] = [],
        consent_code: Optional[str] = None,
        duos_id: Optional[str] = None,
        data_access_control_groups: Optional[list[str]] = None,
) -> None:
    """
    Create a snapshot in TDR.

    Submits the snapshot-creation job, waits for it to complete, and logs the new snapshot ID.

    **Args:**
    - snapshot_name (str): The name of the snapshot to create.
    - description (str): The description of the snapshot.
    - dataset_name (str): The name of the dataset to snapshot.
    - snapshot_mode (str): The snapshot mode (`byFullView` snapshots the entire dataset).
    - profile_id (str): The billing profile ID.
    - stewards (list[str], optional): Users or groups to add as snapshot stewards. Defaults to an empty list.
    - readers (list[str], optional): Users or groups to add as snapshot readers. Defaults to an empty list.
    - consent_code (str, optional): Consent code for the snapshot. Defaults to None.
    - duos_id (str, optional): DUOS ID for the snapshot. Defaults to None.
    - data_access_control_groups (list[str], optional): Data access control groups for the snapshot. Defaults to None.

    **Returns:**
    - None. The new snapshot ID is logged once the job completes.
    """
    uri = f"{self.tdr_link}/snapshots"
    payload = {
        "name": snapshot_name,
        "description": description,
        "contents": [
            {
                "datasetName": dataset_name,
                "mode": snapshot_mode,
            }
        ],
        "policies": {
            "stewards": stewards,
            "readers": readers,
        },
        "profileId": profile_id,
        "globalFileIds": True,
    }
    if consent_code:
        payload["consentCode"] = consent_code
    if duos_id:
        payload["duosId"] = duos_id
    if data_access_control_groups:
        payload["dataAccessControlGroups"] = data_access_control_groups
    logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
    response = self.request_util.run_request(
        uri=uri,
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload)
    )
    job_id = response.json()["id"]
    job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
    snapshot_id = job_results["id"]  # type: ignore[index]
    logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
Create a snapshot in TDR.
**Returns:**
- None. The snapshot-creation job is submitted and monitored to completion, and the new snapshot ID is logged.
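A hedged usage sketch follows; `tdr` is assumed to be an initialized `TDR` instance, and all names, emails, and IDs are illustrative.

```python
# Sketch: snapshot an entire dataset ("byFullView") and share it with a reader group.
# `tdr` is assumed to be an initialized TDR instance; values below are placeholders.
tdr.create_snapshot(
    snapshot_name="my_example_dataset_snapshot_2024_01",
    description="Full-view snapshot for release",
    dataset_name="my_example_dataset",
    snapshot_mode="byFullView",  # byFullView snapshots the entire dataset
    profile_id="11111111-2222-3333-4444-555555555555",
    stewards=["data-owner@example.org"],
    readers=["collaborators@example.org"],
)
# The method blocks until the TDR job finishes and logs the new snapshot ID.
```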
class FilterOutSampleIdsAlreadyInDataset:
    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""

    def __init__(
            self,
            ingest_metrics: list[dict],
            dataset_id: str,
            tdr: TDR,
            target_table_name: str,
            filter_entity_id: str
    ):
        """
        Initialize the FilterOutSampleIdsAlreadyInDataset class.

        **Args:**
        - ingest_metrics (list[dict]): The metrics to be ingested.
        - dataset_id (str): The ID of the dataset.
        - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
        - target_table_name (str): The name of the target table.
        - filter_entity_id (str): The entity ID to filter on.
        """
        self.ingest_metrics = ingest_metrics
        """@private"""
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.filter_entity_id = filter_entity_id
        """@private"""

    def run(self) -> list[dict]:
        """
        Run the filter process to remove sample IDs that already exist in the dataset.

        **Returns:**
        - list[dict]: The filtered ingest metrics.
        """
        # Get all sample ids that already exist in dataset
        logging.info(
            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
            f"dataset {self.dataset_id}"
        )

        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
            dataset_id=self.dataset_id,
            target_table_name=self.target_table_name,
            entity_id=self.filter_entity_id
        )
        # Filter out rows that already exist in dataset
        filtered_ingest_metrics = [
            row
            for row in self.ingest_metrics
            if str(row[self.filter_entity_id]) not in dataset_sample_ids
        ]
        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
            logging.info(
                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
                f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest"
            )

            if filtered_ingest_metrics:
                return filtered_ingest_metrics
            else:
                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
                return []
        else:
            logging.info("No rows were filtered out as they all do not exist in dataset")
            return filtered_ingest_metrics
Class to filter ingest metrics to remove sample IDs that already exist in the dataset.
def __init__(
        self,
        ingest_metrics: list[dict],
        dataset_id: str,
        tdr: TDR,
        target_table_name: str,
        filter_entity_id: str
):
    """
    Initialize the FilterOutSampleIdsAlreadyInDataset class.

    **Args:**
    - ingest_metrics (list[dict]): The metrics to be ingested.
    - dataset_id (str): The ID of the dataset.
    - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
    - target_table_name (str): The name of the target table.
    - filter_entity_id (str): The entity ID to filter on.
    """
    self.ingest_metrics = ingest_metrics
    """@private"""
    self.tdr = tdr
    """@private"""
    self.dataset_id = dataset_id
    """@private"""
    self.target_table_name = target_table_name
    """@private"""
    self.filter_entity_id = filter_entity_id
    """@private"""
Initialize the FilterOutSampleIdsAlreadyInDataset class.
**Args:**
- ingest_metrics (list[dict]): The metrics to be ingested.
- dataset_id (str): The ID of the dataset.
- tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
- target_table_name (str): The name of the target table.
- filter_entity_id (str): The entity ID to filter on.
def run(self) -> list[dict]:
    """
    Run the filter process to remove sample IDs that already exist in the dataset.

    **Returns:**
    - list[dict]: The filtered ingest metrics.
    """
    # Get all sample ids that already exist in dataset
    logging.info(
        f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
        f"dataset {self.dataset_id}"
    )

    dataset_sample_ids = self.tdr.get_dataset_sample_ids(
        dataset_id=self.dataset_id,
        target_table_name=self.target_table_name,
        entity_id=self.filter_entity_id
    )
    # Filter out rows that already exist in dataset
    filtered_ingest_metrics = [
        row
        for row in self.ingest_metrics
        if str(row[self.filter_entity_id]) not in dataset_sample_ids
    ]
    if len(filtered_ingest_metrics) < len(self.ingest_metrics):
        logging.info(
            f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
            f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest"
        )

        if filtered_ingest_metrics:
            return filtered_ingest_metrics
        else:
            logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
            return []
    else:
        logging.info("No rows were filtered out as they all do not exist in dataset")
        return filtered_ingest_metrics
Run the filter process to remove sample IDs that already exist in the dataset.
**Returns:**
- list[dict]: The filtered ingest metrics.
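To close out, a small usage sketch of the class as a whole. `tdr` is assumed to be an initialized `TDR` instance; the metric rows, table name, and dataset ID are illustrative.

```python
# Sketch: drop rows whose sample_id already exists in the dataset's "sample" table
# before ingesting. `tdr` is assumed to be an initialized TDR instance.
from ops_utils.tdr_utils.tdr_api_utils import FilterOutSampleIdsAlreadyInDataset

metrics = [
    {"sample_id": "S-001", "crai_path": "gs://bucket/S-001.crai"},
    {"sample_id": "S-002", "crai_path": "gs://bucket/S-002.crai"},
]

rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
    ingest_metrics=metrics,
    dataset_id="11111111-2222-3333-4444-555555555555",
    tdr=tdr,
    target_table_name="sample",
    filter_entity_id="sample_id",
).run()

if rows_to_ingest:
    print(f"{len(rows_to_ingest)} rows still need to be ingested")
```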