ops_utils.tdr_utils.tdr_api_utils
Utility classes for interacting with TDR API.
1"""Utility classes for interacting with TDR API.""" 2 3import json 4import logging 5import requests 6import re 7from typing import Any, Optional, Union 8from urllib.parse import unquote 9from pydantic import ValidationError 10 11from ..request_util import GET, POST, DELETE, RunRequest 12from ..tdr_api_schema.create_dataset_schema import CreateDatasetSchema 13from ..tdr_api_schema.update_dataset_schema import UpdateSchema 14from .tdr_job_utils import MonitorTDRJob, SubmitAndMonitorMultipleJobs 15from ..vars import ARG_DEFAULTS, GCP 16 17 18class TDR: 19 """Class to interact with the Terra Data Repository (TDR) API.""" 20 21 TDR_LINK = "https://data.terra.bio/api/repository/v1" 22 """(str): The base URL for the TDR API.""" 23 24 def __init__(self, request_util: RunRequest): 25 """ 26 Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API). 27 28 **Args:** 29 - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests. 30 """ 31 self.request_util = request_util 32 """@private""" 33 34 @staticmethod 35 def _check_policy(policy: str) -> None: 36 """ 37 Check if the policy is valid. 38 39 **Args:** 40 - policy (str): The role to check. 41 42 **Raises:** 43 - ValueError: If the policy is not one of the allowed options. 44 """ 45 if policy not in ["steward", "custodian", "snapshot_creator"]: 46 raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator") 47 48 def get_dataset_files( 49 self, 50 dataset_id: str, 51 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 52 ) -> list[dict]: 53 """ 54 Get all files in a dataset. 55 56 Returns json like below 57 58 { 59 "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr", 60 "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7", 61 "path": "/path/set/in/ingest.csv", 62 "size": 1722, 63 "checksums": [ 64 { 65 "checksum": "82f7e79v", 66 "type": "crc32c" 67 }, 68 { 69 "checksum": "fff973507e30b74fa47a3d6830b84a90", 70 "type": "md5" 71 } 72 ], 73 "created": "2024-13-11T15:01:00.256Z", 74 "description": null, 75 "fileType": "file", 76 "fileDetail": { 77 "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444", 78 "mimeType": null, 79 "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv", 80 "loadTag": "RP_3333-RP_3333" 81 }, 82 "directoryDetail": null 83 } 84 85 **Args:** 86 - dataset_id (str): The ID of the dataset. 87 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 88 89 **Returns:** 90 - list[dict]: A list of dictionaries containing the metadata of the files in the dataset. 91 """ 92 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files" 93 logging.info(f"Getting all files in dataset {dataset_id}") 94 return self._get_response_from_batched_endpoint(uri=uri, limit=limit) 95 96 def create_file_dict( 97 self, 98 dataset_id: str, 99 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 100 ) -> dict: 101 """ 102 Create a dictionary of all files in a dataset where the key is the file UUID. 103 104 **Args:** 105 - dataset_id (str): The ID of the dataset. 106 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 107 108 **Returns:** 109 - dict: A dictionary where the key is the file UUID and the value is the file metadata. 
110 """ 111 return { 112 file_dict["fileId"]: file_dict 113 for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit) 114 } 115 116 def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset( 117 self, 118 dataset_id: str, 119 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 120 ) -> dict: 121 """ 122 Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID. 123 124 This assumes that the TDR 'path' is original path of the file in the cloud storage with `gs://` stripped out. 125 126 This will ONLY work if dataset was created with `experimentalSelfHosted = True` 127 128 **Args:** 129 - dataset_id (str): The ID of the dataset. 130 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 131 132 **Returns:** 133 - dict: A dictionary where the key is the file UUID and the value is the file path. 134 """ 135 return { 136 file_dict['fileDetail']['accessUrl']: file_dict['fileId'] 137 for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit) 138 } 139 140 def get_sas_token(self, snapshot_id: str = "", dataset_id: str = "") -> dict: 141 """ 142 Get the SAS token for a snapshot OR dataset. Only one should be provided. 143 144 **Args:** 145 - snapshot_id (str, optional): The ID of the snapshot. Defaults to "". 146 - dataset_id (str, optional): The ID of the dataset. Defaults to "". 147 148 **Returns:** 149 - dict: A dictionary containing the SAS token and its expiry time. Expiry 150 time is a string like `2025-02-13T19:31:47Z`. 151 152 **Raises:** 153 - ValueError: If neither `snapshot_id` nor `dataset_id` is provided. 154 """ 155 if snapshot_id: 156 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION" 157 response = self.request_util.run_request(uri=uri, method=GET) 158 snapshot_info = json.loads(response.text) 159 sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"] 160 elif dataset_id: 161 uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include=ACCESS_INFORMATION" 162 response = self.request_util.run_request(uri=uri, method=GET) 163 snapshot_info = json.loads(response.text) 164 sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"] 165 else: 166 raise ValueError("Must provide either snapshot_id or dataset_id") 167 168 sas_expiry_time_pattern = re.compile(r"se.+?(?=\&sp)") 169 expiry_time_str = sas_expiry_time_pattern.search(sas_token) 170 time_str = unquote(expiry_time_str.group()).replace("se=", "") # type: ignore[union-attr] 171 172 return {"sas_token": sas_token, "expiry_time": time_str} 173 174 def delete_file(self, file_id: str, dataset_id: str) -> requests.Response: 175 """ 176 Delete a file from a dataset. 177 178 **Args:** 179 - file_id (str): The ID of the file to be deleted. 180 - dataset_id (str): The ID of the dataset. 181 182 **Returns:** 183 - requests.Response: The response from the request. 184 """ 185 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/{file_id}" 186 logging.info(f"Submitting delete job for file {file_id}") 187 return self.request_util.run_request(uri=uri, method=DELETE) 188 189 def delete_files( 190 self, 191 file_ids: list[str], 192 dataset_id: str, 193 batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"], # type: ignore[assignment] 194 check_interval: int = 15) -> None: 195 """ 196 Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch. 
197 198 **Args:** 199 - file_ids (list[str]): A list of file IDs to be deleted. 200 - dataset_id (str): The ID of the dataset. 201 - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`. 202 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 203 """ 204 SubmitAndMonitorMultipleJobs( 205 tdr=self, 206 job_function=self.delete_file, 207 job_args_list=[(file_id, dataset_id) for file_id in file_ids], 208 batch_size=batch_size_to_delete_files, 209 check_interval=check_interval 210 ).run() 211 212 def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response: 213 """ 214 Add a user to a dataset with a specified policy. 215 216 **Args:** 217 - dataset_id (str): The ID of the dataset. 218 - user (str): The email of the user to be added. 219 - policy (str): The policy to be assigned to the user. 220 Must be one of `steward`, `custodian`, or `snapshot_creator`. 221 222 **Returns:** 223 - requests.Response: The response from the request. 224 225 **Raises:** 226 - ValueError: If the policy is not valid. 227 """ 228 self._check_policy(policy) 229 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members" 230 member_dict = {"email": user} 231 logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}") 232 return self.request_util.run_request( 233 uri=uri, 234 method=POST, 235 data=json.dumps(member_dict), 236 content_type="application/json" 237 ) 238 239 def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response: 240 """ 241 Remove a user from a dataset. 242 243 **Args:** 244 - dataset_id (str): The ID of the dataset. 245 - user (str): The email of the user to be removed. 246 - policy (str): The policy to be removed from the user. 247 Must be one of `steward`, `custodian`, or `snapshot_creator`. 248 249 **Returns:** 250 - requests.Response: The response from the request. 251 252 **Raises:** 253 - ValueError: If the policy is not valid. 254 """ 255 self._check_policy(policy) 256 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members/{user}" 257 logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}") 258 return self.request_util.run_request(uri=uri, method=DELETE) 259 260 def delete_dataset(self, dataset_id: str) -> None: 261 """ 262 Delete a dataset and monitors the job until completion. 263 264 **Args:** 265 dataset_id (str): The ID of the dataset to be deleted. 266 """ 267 uri = f"{self.TDR_LINK}/datasets/{dataset_id}" 268 logging.info(f"Deleting dataset {dataset_id}") 269 response = self.request_util.run_request(uri=uri, method=DELETE) 270 job_id = response.json()['id'] 271 MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run() 272 273 def get_snapshot_info( 274 self, 275 snapshot_id: str, 276 continue_not_found: bool = False, 277 info_to_include: Optional[list[str]] = None 278 ) -> Optional[requests.Response]: 279 """ 280 Get information about a snapshot. 281 282 **Args:** 283 - snapshot_id (str): The ID of the snapshot. 284 - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`. 285 - info_to_include (list[str], optional): A list of additional information to include. Defaults to None. 
286 Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, 287 `DATA_PROJECT`,`CREATION_INFORMATION`, `DUOS` 288 289 **Returns:** 290 - requests.Response (optional): The response from the request (returns None if the snapshot is not 291 found or access is denied). 292 """ 293 acceptable_return_code = [404, 403] if continue_not_found else [] 294 acceptable_include_info = [ 295 "SOURCES", 296 "TABLES", 297 "RELATIONSHIPS", 298 "ACCESS_INFORMATION", 299 "PROFILE", 300 "PROPERTIES", 301 "DATA_PROJECT", 302 "CREATION_INFORMATION", 303 "DUOS" 304 ] 305 if info_to_include: 306 if not all(info in acceptable_include_info for info in info_to_include): 307 raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}") 308 include_string = '&include='.join(info_to_include) 309 else: 310 include_string = "" 311 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include={include_string}" 312 response = self.request_util.run_request( 313 uri=uri, 314 method=GET, 315 accept_return_codes=acceptable_return_code 316 ) 317 if response.status_code == 404: 318 logging.warning(f"Snapshot {snapshot_id} not found") 319 return None 320 if response.status_code == 403: 321 logging.warning(f"Access denied for snapshot {snapshot_id}") 322 return None 323 return response 324 325 def delete_snapshots( 326 self, 327 snapshot_ids: list[str], 328 batch_size: int = 25, 329 check_interval: int = 10, 330 verbose: bool = False) -> None: 331 """ 332 Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch. 333 334 **Args:** 335 - snapshot_ids (list[str]): A list of snapshot IDs to be deleted. 336 - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`. 337 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`. 338 - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`. 339 """ 340 SubmitAndMonitorMultipleJobs( 341 tdr=self, 342 job_function=self.delete_snapshot, 343 job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids], 344 batch_size=batch_size, 345 check_interval=check_interval, 346 verbose=verbose 347 ).run() 348 349 def delete_snapshot(self, snapshot_id: str) -> requests.Response: 350 """ 351 Delete a snapshot. 352 353 **Args:** 354 - snapshot_id (str): The ID of the snapshot to be deleted. 355 356 **Returns:** 357 - requests.Response: The response from the request. 358 """ 359 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}" 360 logging.info(f"Deleting snapshot {snapshot_id}") 361 return self.request_util.run_request(uri=uri, method=DELETE) 362 363 def _yield_existing_datasets( 364 self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc" 365 ) -> Any: 366 """ 367 Get all datasets in TDR, optionally filtered by dataset name. 368 369 **Args:** 370 filter (Optional[str]): A filter string to match dataset names. Defaults to None. 371 batch_size (int): The number of datasets to retrieve per batch. Defaults to 100. 372 direction (str): The direction to sort the datasets by creation date. Defaults to "asc". 373 374 Yields: 375 Any: A generator yielding datasets. 
376 """ 377 offset = 0 378 if filter: 379 filter_str = f"&filter={filter}" 380 log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}" 381 else: 382 filter_str = "" 383 log_message = f"Searching for all datasets in batches of {batch_size}" 384 logging.info(log_message) 385 while True: 386 uri = f"{self.TDR_LINK}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}" # noqa: E501 387 response = self.request_util.run_request(uri=uri, method=GET) 388 datasets = response.json()["items"] 389 if not datasets: 390 break 391 for dataset in datasets: 392 yield dataset 393 offset += batch_size 394 break 395 396 def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]: 397 """ 398 Check if a dataset exists by name and optionally by billing profile. 399 400 **Args:** 401 - dataset_name (str): The name of the dataset to check. 402 - billing_profile (str, optional): The billing profile ID to match. Defaults to None. 403 404 **Returns:** 405 - list[dict]: A list of matching datasets. 406 """ 407 matching_datasets = [] 408 for dataset in self._yield_existing_datasets(filter=dataset_name): 409 # Search uses wildcard so could grab more datasets where dataset_name is substring 410 if dataset_name == dataset["name"]: 411 if billing_profile: 412 if dataset["defaultProfileId"] == billing_profile: 413 logging.info( 414 f"Dataset {dataset['name']} already exists under billing profile {billing_profile}") 415 dataset_id = dataset["id"] 416 logging.info(f"Dataset ID: {dataset_id}") 417 matching_datasets.append(dataset) 418 else: 419 logging.warning( 420 f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " + 421 f"and not under billing profile {billing_profile}" 422 ) 423 # Datasets names need to be unique regardless of billing profile, so raise an error if 424 # a dataset with the same name is found but is not under the requested billing profile 425 raise ValueError( 426 f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}") 427 else: 428 matching_datasets.append(dataset) 429 return matching_datasets 430 431 def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response: 432 """ 433 Get information about a dataset. 434 435 **Args:** 436 - dataset_id (str): The ID of the dataset. 437 - info_to_include (list[str], optional): A list of additional information to include. Valid options include: 438 `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`. 439 Defaults to None. 440 441 **Returns:** 442 - requests.Response: The response from the request. 443 444 **Raises:** 445 - ValueError: If `info_to_include` contains invalid information types. 
446 """ 447 acceptable_include_info = [ 448 "SCHEMA", 449 "ACCESS_INFORMATION", 450 "PROFILE", 451 "PROPERTIES", 452 "DATA_PROJECT", 453 "STORAGE", 454 "SNAPSHOT_BUILDER_SETTING" 455 ] 456 if info_to_include: 457 if not all(info in acceptable_include_info for info in info_to_include): 458 raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}") 459 include_string = '&include='.join(info_to_include) 460 else: 461 include_string = "" 462 uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include={include_string}" 463 return self.request_util.run_request(uri=uri, method=GET) 464 465 def get_table_schema_info( 466 self, 467 dataset_id: str, 468 table_name: str, 469 dataset_info: Optional[dict] = None 470 ) -> Union[dict, None]: 471 """ 472 Get schema information for a specific table within a dataset. 473 474 **Args:** 475 - dataset_id (str): The ID of the dataset. 476 - table_name (str): The name of the table. 477 - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None. 478 479 **Returns:** 480 - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found. 481 """ 482 if not dataset_info: 483 dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json() 484 for table in dataset_info["schema"]["tables"]: # type: ignore[index] 485 if table["name"] == table_name: 486 return table 487 return None 488 489 def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response: 490 """ 491 Retrieve the result of a job. 492 493 **Args:** 494 - job_id (str): The ID of the job. 495 - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`. 496 497 **Returns:** 498 - requests.Response: The response from the request. 499 """ 500 uri = f"{self.TDR_LINK}/jobs/{job_id}/result" 501 # If job is expected to fail, accept any return code 502 acceptable_return_code = list(range(100, 600)) if expect_failure else [] 503 return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code) 504 505 def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response: 506 """ 507 Load data into a TDR dataset. 508 509 **Args:** 510 - dataset_id (str): The ID of the dataset. 511 - data (dict): The data to be ingested. 512 513 **Returns:** 514 - requests.Response: The response from the request. 515 """ 516 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/ingest" 517 logging.info( 518 "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " + 519 "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.") 520 return self.request_util.run_request( 521 uri=uri, 522 method=POST, 523 content_type="application/json", 524 data=data 525 ) 526 527 def file_ingest_to_dataset( 528 self, 529 dataset_id: str, 530 profile_id: str, 531 file_list: list[dict], 532 load_tag: str = "file_ingest_load_tag" 533 ) -> dict: 534 """ 535 Load files into a TDR dataset. 536 537 **Args:** 538 - dataset_id (str): The ID of the dataset. 539 - profile_id (str): The billing profile ID. 540 - file_list (list[dict]): The list of files to be ingested. 541 - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`. 542 543 **Returns:** 544 - dict: A dictionary containing the response from the ingest operation job monitoring. 
545 """ 546 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/bulk/array" 547 data = { 548 "profileId": profile_id, 549 "loadTag": f"{load_tag}", 550 "maxFailedFileLoads": 0, 551 "loadArray": file_list 552 } 553 554 response = self.request_util.run_request( 555 uri=uri, 556 method=POST, 557 content_type="application/json", 558 data=json.dumps(data) 559 ) 560 job_id = response.json()['id'] 561 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 562 return job_results # type: ignore[return-value] 563 564 def get_dataset_table_metrics( 565 self, dataset_id: str, target_table_name: str, query_limit: int = 1000 566 ) -> list[dict]: 567 """ 568 Retrieve all metrics for a specific table within a dataset. 569 570 **Args:** 571 - dataset_id (str): The ID of the dataset. 572 - target_table_name (str): The name of the target table. 573 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 574 575 **Returns:** 576 - list[dict]: A list of dictionaries containing the metrics for the specified table. 577 """ 578 return [ 579 metric 580 for metric in self._yield_dataset_metrics( 581 dataset_id=dataset_id, 582 target_table_name=target_table_name, 583 query_limit=query_limit 584 ) 585 ] 586 587 def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any: 588 """ 589 Yield all entity metrics from a dataset. 590 591 **Args:** 592 dataset_id (str): The ID of the dataset. 593 target_table_name (str): The name of the target table. 594 query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000. 595 596 Yields: 597 Any: A generator yielding dictionaries containing the metrics for the specified table. 598 """ 599 search_request = { 600 "offset": 0, 601 "limit": query_limit, 602 "sort": "datarepo_row_id" 603 } 604 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/data/{target_table_name}" 605 while True: 606 batch_number = int((search_request["offset"] / query_limit)) + 1 # type: ignore[operator] 607 response = self.request_util.run_request( 608 uri=uri, 609 method=POST, 610 content_type="application/json", 611 data=json.dumps(search_request) 612 ) 613 if not response or not response.json()["result"]: 614 break 615 logging.info( 616 f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " + 617 f"dataset {dataset_id}" 618 ) 619 for record in response.json()["result"]: 620 yield record 621 search_request["offset"] += query_limit # type: ignore[operator] 622 623 def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]: 624 """ 625 Get existing IDs from a dataset. 626 627 **Args:** 628 - dataset_id (str): The ID of the dataset. 629 - target_table_name (str): The name of the target table. 630 - entity_id (str): The entity ID to retrieve. 631 632 **Returns:** 633 - list[str]: A list of entity IDs from the specified table. 634 """ 635 dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name) 636 return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata] 637 638 def get_job_status(self, job_id: str) -> requests.Response: 639 """ 640 Retrieve the status of a job. 641 642 **Args:** 643 - job_id (str): The ID of the job. 644 645 **Returns:** 646 - requests.Response: The response from the request. 
647 """ 648 uri = f"{self.TDR_LINK}/jobs/{job_id}" 649 return self.request_util.run_request(uri=uri, method=GET) 650 651 def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]: 652 """ 653 Get all file UUIDs from the metadata of a dataset. 654 655 **Args:** 656 - dataset_id (str): The ID of the dataset. 657 658 **Returns:** 659 - list[str]: A list of file UUIDs from the dataset metadata. 660 """ 661 dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json() 662 all_metadata_file_uuids = [] 663 tables = 0 664 for table in dataset_info["schema"]["tables"]: 665 tables += 1 666 table_name = table["name"] 667 logging.info(f"Getting all file information for {table_name}") 668 # Get just columns where datatype is fileref 669 file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"] 670 dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name) 671 # Get unique list of file uuids 672 file_uuids = list( 673 set( 674 [ 675 value for metric in dataset_metrics for key, value in metric.items() if key in file_columns 676 ] 677 ) 678 ) 679 logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'") 680 all_metadata_file_uuids.extend(file_uuids) 681 # Make full list unique 682 all_metadata_file_uuids = list(set(all_metadata_file_uuids)) 683 logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)") 684 return all_metadata_file_uuids 685 686 def soft_delete_entries( 687 self, 688 dataset_id: str, 689 table_name: str, 690 datarepo_row_ids: list[str], 691 check_intervals: int = 15 692 ) -> Optional[dict]: 693 """ 694 Soft delete specific records from a table. 695 696 **Args:** 697 - dataset_id (str): The ID of the dataset. 698 - table_name (str): The name of the target table. 699 - datarepo_row_ids (list[str]): A list of row IDs to be deleted. 700 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 701 702 **Returns:** 703 - dict (optional): A dictionary containing the response from the soft delete operation job 704 monitoring. Returns None if no row IDs are provided. 705 """ 706 if not datarepo_row_ids: 707 logging.info(f"No records found to soft delete in table {table_name}") 708 return None 709 logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}") 710 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/deletes" 711 payload = { 712 "deleteType": "soft", 713 "specType": "jsonArray", 714 "tables": [ 715 { 716 "tableName": table_name, 717 "jsonArraySpec": { 718 "rowIds": datarepo_row_ids 719 } 720 } 721 ] 722 } 723 response = self.request_util.run_request( 724 method=POST, 725 uri=uri, 726 data=json.dumps(payload), 727 content_type="application/json" 728 ) 729 job_id = response.json()["id"] 730 return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run() 731 732 def soft_delete_all_table_entries( 733 self, 734 dataset_id: str, 735 table_name: str, 736 query_limit: int = 1000, 737 check_intervals: int = 15 738 ) -> Optional[dict]: 739 """ 740 Soft deletes all records in a table. 741 742 **Args:** 743 - dataset_id (str): The ID of the dataset. 744 - table_name (str): The name of the target table. 745 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 746 - check_intervals (int, optional): The interval in seconds to wait between status checks. 
Defaults to `15`. 747 748 **Returns:** 749 - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns 750 None if no row IDs are provided. 751 """ 752 dataset_metrics = self.get_dataset_table_metrics( 753 dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit 754 ) 755 row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics] 756 return self.soft_delete_entries( 757 dataset_id=dataset_id, 758 table_name=table_name, 759 datarepo_row_ids=row_ids, 760 check_intervals=check_intervals 761 ) 762 763 def get_or_create_dataset( 764 self, 765 dataset_name: str, 766 billing_profile: str, 767 schema: dict, 768 description: str, 769 delete_existing: bool = False, 770 continue_if_exists: bool = False, 771 additional_properties_dict: Optional[dict] = None 772 ) -> str: 773 """ 774 Get or create a dataset. 775 776 **Args:** 777 - dataset_name (str): The name of the dataset. 778 - billing_profile (str): The billing profile ID. 779 - schema (dict): The schema of the dataset. 780 - description (str): The description of the dataset. 781 - additional_properties_dict (Optional[dict], optional): Additional properties 782 for the dataset. Defaults to None. 783 - delete_existing (bool, optional): Whether to delete the existing dataset if found. 784 Defaults to `False`. 785 - continue_if_exists (bool, optional): Whether to continue if the dataset already exists. 786 Defaults to `False`. 787 788 **Returns:** 789 - str: The ID of the dataset. 790 791 **Raises:** 792 - ValueError: If multiple datasets with the same name are found under the billing profile. 793 """ 794 existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile) 795 if existing_datasets: 796 if not continue_if_exists: 797 raise ValueError( 798 f"Run with continue_if_exists=True to use the existing dataset {dataset_name}" 799 ) 800 # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list 801 if delete_existing: 802 logging.info(f"Deleting existing dataset {dataset_name}") 803 self.delete_dataset(existing_datasets[0]["id"]) 804 existing_datasets = [] 805 # If not delete_existing and continue_if_exists then grab existing datasets id 806 else: 807 dataset_id = existing_datasets[0]["id"] 808 if not existing_datasets: 809 logging.info("Did not find existing dataset") 810 # Create dataset 811 dataset_id = self.create_dataset( 812 schema=schema, 813 dataset_name=dataset_name, 814 description=description, 815 profile_id=billing_profile, 816 additional_dataset_properties=additional_properties_dict 817 ) 818 return dataset_id 819 820 def create_dataset( # type: ignore[return] 821 self, 822 schema: dict, 823 dataset_name: str, 824 description: str, 825 profile_id: str, 826 additional_dataset_properties: Optional[dict] = None 827 ) -> Optional[str]: 828 """ 829 Create a new dataset. 830 831 **Args:** 832 - schema (dict): The schema of the dataset. 833 - dataset_name (str): The name of the dataset. 834 - description (str): The description of the dataset. 835 - profile_id (str): The billing profile ID. 836 - additional_dataset_properties (Optional[dict], optional): Additional 837 properties for the dataset. Defaults to None. 838 839 **Returns:** 840 - Optional[str]: The ID of the created dataset, or None if creation failed. 841 842 Raises: 843 - ValueError: If the schema validation fails. 
844 """ 845 dataset_properties = { 846 "name": dataset_name, 847 "description": description, 848 "defaultProfileId": profile_id, 849 "region": "us-central1", 850 "cloudPlatform": GCP, 851 "schema": schema 852 } 853 854 if additional_dataset_properties: 855 dataset_properties.update(additional_dataset_properties) 856 try: 857 CreateDatasetSchema(**dataset_properties) # type: ignore[arg-type] 858 except ValidationError as e: 859 raise ValueError(f"Schema validation error: {e}") 860 uri = f"{self.TDR_LINK}/datasets" 861 logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}") 862 response = self.request_util.run_request( 863 method=POST, 864 uri=uri, 865 data=json.dumps(dataset_properties), 866 content_type="application/json" 867 ) 868 job_id = response.json()["id"] 869 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 870 dataset_id = job_results["id"] # type: ignore[index] 871 logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}") 872 return dataset_id 873 874 def update_dataset_schema( # type: ignore[return] 875 self, 876 dataset_id: str, 877 update_note: str, 878 tables_to_add: Optional[list[dict]] = None, 879 relationships_to_add: Optional[list[dict]] = None, 880 columns_to_add: Optional[list[dict]] = None 881 ) -> Optional[str]: 882 """ 883 Update the schema of a dataset. 884 885 **Args:** 886 - dataset_id (str): The ID of the dataset. 887 - update_note (str): A note describing the update. 888 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 889 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 890 - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None. 891 892 **Returns:** 893 - Optional[str]: The ID of the updated dataset, or None if the update failed. 894 895 **Raises:** 896 - ValueError: If the schema validation fails. 897 """ 898 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema" 899 request_body: dict = {"description": f"{update_note}", "changes": {}} 900 if tables_to_add: 901 request_body["changes"]["addTables"] = tables_to_add 902 if relationships_to_add: 903 request_body["changes"]["addRelationships"] = relationships_to_add 904 if columns_to_add: 905 request_body["changes"]["addColumns"] = columns_to_add 906 try: 907 UpdateSchema(**request_body) 908 except ValidationError as e: 909 raise ValueError(f"Schema validation error: {e}") 910 911 response = self.request_util.run_request( 912 uri=uri, 913 method=POST, 914 content_type="application/json", 915 data=json.dumps(request_body) 916 ) 917 job_id = response.json()["id"] 918 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 919 dataset_id = job_results["id"] # type: ignore[index] 920 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 921 return dataset_id 922 923 def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]: 924 """ 925 Get response from a batched endpoint. 926 927 Helper method for all GET endpoints that require batching. 928 929 Given the URI and the limit (optional), will 930 loop through batches until all metadata is retrieved. 931 932 **Args:** 933 - uri (str): The base URI for the endpoint (without query params for offset or limit). 934 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 
935 936 **Returns:** 937 - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint. 938 """ 939 batch = 1 940 offset = 0 941 metadata: list = [] 942 while True: 943 logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata") 944 response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json() 945 946 # If no more files, break the loop 947 if not response_json: 948 logging.info(f"No more results to retrieve, found {len(metadata)} total records") 949 break 950 951 metadata.extend(response_json) 952 # Increment the offset by limit for the next page 953 offset += limit 954 batch += 1 955 return metadata 956 957 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 958 """ 959 Return all the metadata about files in a given snapshot. 960 961 Not all files can be returned at once, so the API 962 is used repeatedly until all "batches" have been returned. 963 964 **Args:** 965 - snapshot_id (str): The ID of the snapshot. 966 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 967 968 **Returns:** 969 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 970 """ 971 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files" 972 return self._get_response_from_batched_endpoint(uri=uri, limit=limit) 973 974 def get_dataset_snapshots(self, dataset_id: str) -> requests.Response: 975 """ 976 Return snapshots belonging to specified dataset. 977 978 **Args:** 979 - dataset_id: uuid of dataset to query. 980 981 **Returns:** 982 - requests.Response: The response from the request. 983 """ 984 uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}" 985 return self.request_util.run_request( 986 uri=uri, 987 method=GET 988 ) 989 990 991class FilterOutSampleIdsAlreadyInDataset: 992 """Class to filter ingest metrics to remove sample IDs that already exist in the dataset.""" 993 994 def __init__( 995 self, 996 ingest_metrics: list[dict], 997 dataset_id: str, 998 tdr: TDR, 999 target_table_name: str, 1000 filter_entity_id: str 1001 ): 1002 """ 1003 Initialize the FilterOutSampleIdsAlreadyInDataset class. 1004 1005 **Args:** 1006 - ingest_metrics (list[dict]): The metrics to be ingested. 1007 - dataset_id (str): The ID of the dataset. 1008 - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance 1009 - target_table_name (str): The name of the target table. 1010 - filter_entity_id (str): The entity ID to filter on. 1011 """ 1012 self.ingest_metrics = ingest_metrics 1013 """@private""" 1014 self.tdr = tdr 1015 """@private""" 1016 self.dataset_id = dataset_id 1017 """@private""" 1018 self.target_table_name = target_table_name 1019 """@private""" 1020 self.filter_entity_id = filter_entity_id 1021 """@private""" 1022 1023 def run(self) -> list[dict]: 1024 """ 1025 Run the filter process to remove sample IDs that already exist in the dataset. 1026 1027 **Returns:** 1028 - list[dict]: The filtered ingest metrics. 
1029 """ 1030 # Get all sample ids that already exist in dataset 1031 logging.info( 1032 f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in " 1033 f"dataset {self.dataset_id}" 1034 ) 1035 1036 dataset_sample_ids = self.tdr.get_dataset_sample_ids( 1037 dataset_id=self.dataset_id, 1038 target_table_name=self.target_table_name, 1039 entity_id=self.filter_entity_id 1040 ) 1041 # Filter out rows that already exist in dataset 1042 filtered_ingest_metrics = [ 1043 row 1044 for row in self.ingest_metrics 1045 if str(row[self.filter_entity_id]) not in dataset_sample_ids 1046 ] 1047 if len(filtered_ingest_metrics) < len(self.ingest_metrics): 1048 logging.info( 1049 f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in " 1050 f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest" 1051 ) 1052 1053 if filtered_ingest_metrics: 1054 return filtered_ingest_metrics 1055 else: 1056 logging.info("All rows filtered out as they all exist in dataset, nothing to ingest") 1057 return [] 1058 else: 1059 logging.info("No rows were filtered out as they all do not exist in dataset") 1060 return filtered_ingest_metrics
Defaults to `15`. 748 749 **Returns:** 750 - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns 751 None if no row IDs are provided. 752 """ 753 dataset_metrics = self.get_dataset_table_metrics( 754 dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit 755 ) 756 row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics] 757 return self.soft_delete_entries( 758 dataset_id=dataset_id, 759 table_name=table_name, 760 datarepo_row_ids=row_ids, 761 check_intervals=check_intervals 762 ) 763 764 def get_or_create_dataset( 765 self, 766 dataset_name: str, 767 billing_profile: str, 768 schema: dict, 769 description: str, 770 delete_existing: bool = False, 771 continue_if_exists: bool = False, 772 additional_properties_dict: Optional[dict] = None 773 ) -> str: 774 """ 775 Get or create a dataset. 776 777 **Args:** 778 - dataset_name (str): The name of the dataset. 779 - billing_profile (str): The billing profile ID. 780 - schema (dict): The schema of the dataset. 781 - description (str): The description of the dataset. 782 - additional_properties_dict (Optional[dict], optional): Additional properties 783 for the dataset. Defaults to None. 784 - delete_existing (bool, optional): Whether to delete the existing dataset if found. 785 Defaults to `False`. 786 - continue_if_exists (bool, optional): Whether to continue if the dataset already exists. 787 Defaults to `False`. 788 789 **Returns:** 790 - str: The ID of the dataset. 791 792 **Raises:** 793 - ValueError: If multiple datasets with the same name are found under the billing profile. 794 """ 795 existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile) 796 if existing_datasets: 797 if not continue_if_exists: 798 raise ValueError( 799 f"Run with continue_if_exists=True to use the existing dataset {dataset_name}" 800 ) 801 # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list 802 if delete_existing: 803 logging.info(f"Deleting existing dataset {dataset_name}") 804 self.delete_dataset(existing_datasets[0]["id"]) 805 existing_datasets = [] 806 # If not delete_existing and continue_if_exists then grab existing datasets id 807 else: 808 dataset_id = existing_datasets[0]["id"] 809 if not existing_datasets: 810 logging.info("Did not find existing dataset") 811 # Create dataset 812 dataset_id = self.create_dataset( 813 schema=schema, 814 dataset_name=dataset_name, 815 description=description, 816 profile_id=billing_profile, 817 additional_dataset_properties=additional_properties_dict 818 ) 819 return dataset_id 820 821 def create_dataset( # type: ignore[return] 822 self, 823 schema: dict, 824 dataset_name: str, 825 description: str, 826 profile_id: str, 827 additional_dataset_properties: Optional[dict] = None 828 ) -> Optional[str]: 829 """ 830 Create a new dataset. 831 832 **Args:** 833 - schema (dict): The schema of the dataset. 834 - dataset_name (str): The name of the dataset. 835 - description (str): The description of the dataset. 836 - profile_id (str): The billing profile ID. 837 - additional_dataset_properties (Optional[dict], optional): Additional 838 properties for the dataset. Defaults to None. 839 840 **Returns:** 841 - Optional[str]: The ID of the created dataset, or None if creation failed. 842 843 Raises: 844 - ValueError: If the schema validation fails. 
845 """ 846 dataset_properties = { 847 "name": dataset_name, 848 "description": description, 849 "defaultProfileId": profile_id, 850 "region": "us-central1", 851 "cloudPlatform": GCP, 852 "schema": schema 853 } 854 855 if additional_dataset_properties: 856 dataset_properties.update(additional_dataset_properties) 857 try: 858 CreateDatasetSchema(**dataset_properties) # type: ignore[arg-type] 859 except ValidationError as e: 860 raise ValueError(f"Schema validation error: {e}") 861 uri = f"{self.TDR_LINK}/datasets" 862 logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}") 863 response = self.request_util.run_request( 864 method=POST, 865 uri=uri, 866 data=json.dumps(dataset_properties), 867 content_type="application/json" 868 ) 869 job_id = response.json()["id"] 870 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 871 dataset_id = job_results["id"] # type: ignore[index] 872 logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}") 873 return dataset_id 874 875 def update_dataset_schema( # type: ignore[return] 876 self, 877 dataset_id: str, 878 update_note: str, 879 tables_to_add: Optional[list[dict]] = None, 880 relationships_to_add: Optional[list[dict]] = None, 881 columns_to_add: Optional[list[dict]] = None 882 ) -> Optional[str]: 883 """ 884 Update the schema of a dataset. 885 886 **Args:** 887 - dataset_id (str): The ID of the dataset. 888 - update_note (str): A note describing the update. 889 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 890 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 891 - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None. 892 893 **Returns:** 894 - Optional[str]: The ID of the updated dataset, or None if the update failed. 895 896 **Raises:** 897 - ValueError: If the schema validation fails. 898 """ 899 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema" 900 request_body: dict = {"description": f"{update_note}", "changes": {}} 901 if tables_to_add: 902 request_body["changes"]["addTables"] = tables_to_add 903 if relationships_to_add: 904 request_body["changes"]["addRelationships"] = relationships_to_add 905 if columns_to_add: 906 request_body["changes"]["addColumns"] = columns_to_add 907 try: 908 UpdateSchema(**request_body) 909 except ValidationError as e: 910 raise ValueError(f"Schema validation error: {e}") 911 912 response = self.request_util.run_request( 913 uri=uri, 914 method=POST, 915 content_type="application/json", 916 data=json.dumps(request_body) 917 ) 918 job_id = response.json()["id"] 919 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 920 dataset_id = job_results["id"] # type: ignore[index] 921 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 922 return dataset_id 923 924 def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]: 925 """ 926 Get response from a batched endpoint. 927 928 Helper method for all GET endpoints that require batching. 929 930 Given the URI and the limit (optional), will 931 loop through batches until all metadata is retrieved. 932 933 **Args:** 934 - uri (str): The base URI for the endpoint (without query params for offset or limit). 935 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 
936 937 **Returns:** 938 - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint. 939 """ 940 batch = 1 941 offset = 0 942 metadata: list = [] 943 while True: 944 logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata") 945 response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json() 946 947 # If no more files, break the loop 948 if not response_json: 949 logging.info(f"No more results to retrieve, found {len(metadata)} total records") 950 break 951 952 metadata.extend(response_json) 953 # Increment the offset by limit for the next page 954 offset += limit 955 batch += 1 956 return metadata 957 958 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 959 """ 960 Return all the metadata about files in a given snapshot. 961 962 Not all files can be returned at once, so the API 963 is used repeatedly until all "batches" have been returned. 964 965 **Args:** 966 - snapshot_id (str): The ID of the snapshot. 967 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 968 969 **Returns:** 970 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 971 """ 972 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files" 973 return self._get_response_from_batched_endpoint(uri=uri, limit=limit) 974 975 def get_dataset_snapshots(self, dataset_id: str) -> requests.Response: 976 """ 977 Return snapshots belonging to specified dataset. 978 979 **Args:** 980 - dataset_id: uuid of dataset to query. 981 982 **Returns:** 983 - requests.Response: The response from the request. 984 """ 985 uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}" 986 return self.request_util.run_request( 987 uri=uri, 988 method=GET 989 )
Class to interact with the Terra Data Repository (TDR) API.
25 def __init__(self, request_util: RunRequest): 26 """ 27 Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API). 28 29 **Args:** 30 - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests. 31 """ 32 self.request_util = request_util 33 """@private"""
Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).
Args:
- request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
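The snippet below is a minimal construction sketch (not taken from the module itself): it only shows how an already-authenticated `RunRequest` would be wrapped in a `TDR` client. How `RunRequest` is built and authenticated is documented in `ops_utils.request_util`, not here.

```python
from ops_utils.request_util import RunRequest
from ops_utils.tdr_utils.tdr_api_utils import TDR


def build_tdr_client(request_util: RunRequest) -> TDR:
    """Wrap an already-authenticated RunRequest in a TDR client."""
    return TDR(request_util=request_util)
```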
49 def get_dataset_files( 50 self, 51 dataset_id: str, 52 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 53 ) -> list[dict]: 54 """ 55 Get all files in a dataset. 56 57 Returns json like below 58 59 { 60 "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr", 61 "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7", 62 "path": "/path/set/in/ingest.csv", 63 "size": 1722, 64 "checksums": [ 65 { 66 "checksum": "82f7e79v", 67 "type": "crc32c" 68 }, 69 { 70 "checksum": "fff973507e30b74fa47a3d6830b84a90", 71 "type": "md5" 72 } 73 ], 74 "created": "2024-13-11T15:01:00.256Z", 75 "description": null, 76 "fileType": "file", 77 "fileDetail": { 78 "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444", 79 "mimeType": null, 80 "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv", 81 "loadTag": "RP_3333-RP_3333" 82 }, 83 "directoryDetail": null 84 } 85 86 **Args:** 87 - dataset_id (str): The ID of the dataset. 88 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 89 90 **Returns:** 91 - list[dict]: A list of dictionaries containing the metadata of the files in the dataset. 92 """ 93 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files" 94 logging.info(f"Getting all files in dataset {dataset_id}") 95 return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
Get all files in a dataset.
Returns JSON structured like the example below:
{
"fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
"collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
"path": "/path/set/in/ingest.csv",
"size": 1722,
"checksums": [
{
"checksum": "82f7e79v",
"type": "crc32c"
},
{
"checksum": "fff973507e30b74fa47a3d6830b84a90",
"type": "md5"
}
],
"created": "2024-13-11T15:01:00.256Z",
"description": null,
"fileType": "file",
"fileDetail": {
"datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
"mimeType": null,
"accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
"loadTag": "RP_3333-RP_3333"
},
"directoryDetail": null
}
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
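A usage sketch, assuming `tdr` is an initialized `TDR` client and the dataset ID is supplied by the caller; it counts files per load tag purely to illustrate the shape of the returned metadata.

```python
from collections import Counter

from ops_utils.tdr_utils.tdr_api_utils import TDR


def count_files_per_load_tag(tdr: TDR, dataset_id: str) -> Counter:
    """Count dataset files grouped by the loadTag recorded in fileDetail."""
    files = tdr.get_dataset_files(dataset_id=dataset_id)
    return Counter(f["fileDetail"]["loadTag"] for f in files)
```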
97 def create_file_dict( 98 self, 99 dataset_id: str, 100 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 101 ) -> dict: 102 """ 103 Create a dictionary of all files in a dataset where the key is the file UUID. 104 105 **Args:** 106 - dataset_id (str): The ID of the dataset. 107 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 108 109 **Returns:** 110 - dict: A dictionary where the key is the file UUID and the value is the file metadata. 111 """ 112 return { 113 file_dict["fileId"]: file_dict 114 for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit) 115 }
Create a dictionary of all files in a dataset where the key is the file UUID.
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- dict: A dictionary where the key is the file UUID and the value is the file metadata.
117 def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset( 118 self, 119 dataset_id: str, 120 limit: int = ARG_DEFAULTS['batch_size_to_list_files'] # type: ignore[assignment] 121 ) -> dict: 122 """ 123 Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID. 124 125 This assumes that the TDR 'path' is original path of the file in the cloud storage with `gs://` stripped out. 126 127 This will ONLY work if dataset was created with `experimentalSelfHosted = True` 128 129 **Args:** 130 - dataset_id (str): The ID of the dataset. 131 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`. 132 133 **Returns:** 134 - dict: A dictionary where the key is the file UUID and the value is the file path. 135 """ 136 return { 137 file_dict['fileDetail']['accessUrl']: file_dict['fileId'] 138 for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit) 139 }
Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.
This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- dict: A dictionary where the key is the file path and the value is the file UUID.
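A sketch of how the path-to-UUID mapping might be used when preparing metadata for re-ingest into a self-hosted dataset; `tdr` is assumed to be an initialized client and the `cram_path` column is hypothetical.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def swap_paths_for_file_uuids(tdr: TDR, dataset_id: str, rows: list[dict]) -> list[dict]:
    """Replace the hypothetical 'cram_path' value with the matching TDR file UUID, if known."""
    path_to_uuid = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
        dataset_id=dataset_id
    )
    return [
        {**row, "cram_path": path_to_uuid.get(row["cram_path"], row["cram_path"])}
        for row in rows
    ]
```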
141 def get_sas_token(self, snapshot_id: str = "", dataset_id: str = "") -> dict: 142 """ 143 Get the SAS token for a snapshot OR dataset. Only one should be provided. 144 145 **Args:** 146 - snapshot_id (str, optional): The ID of the snapshot. Defaults to "". 147 - dataset_id (str, optional): The ID of the dataset. Defaults to "". 148 149 **Returns:** 150 - dict: A dictionary containing the SAS token and its expiry time. Expiry 151 time is a string like `2025-02-13T19:31:47Z`. 152 153 **Raises:** 154 - ValueError: If neither `snapshot_id` nor `dataset_id` is provided. 155 """ 156 if snapshot_id: 157 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION" 158 response = self.request_util.run_request(uri=uri, method=GET) 159 snapshot_info = json.loads(response.text) 160 sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"] 161 elif dataset_id: 162 uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include=ACCESS_INFORMATION" 163 response = self.request_util.run_request(uri=uri, method=GET) 164 snapshot_info = json.loads(response.text) 165 sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"] 166 else: 167 raise ValueError("Must provide either snapshot_id or dataset_id") 168 169 sas_expiry_time_pattern = re.compile(r"se.+?(?=\&sp)") 170 expiry_time_str = sas_expiry_time_pattern.search(sas_token) 171 time_str = unquote(expiry_time_str.group()).replace("se=", "") # type: ignore[union-attr] 172 173 return {"sas_token": sas_token, "expiry_time": time_str}
Get the SAS token for a snapshot OR dataset. Only one should be provided.
Args:
- snapshot_id (str, optional): The ID of the snapshot. Defaults to "".
- dataset_id (str, optional): The ID of the dataset. Defaults to "".
Returns:
- dict: A dictionary containing the SAS token and its expiry time. Expiry time is a string like `2025-02-13T19:31:47Z`.
Raises:
- ValueError: If neither `snapshot_id` nor `dataset_id` is provided.
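A sketch showing how the returned expiry string could be checked before reusing a token; the timestamp format follows the `2025-02-13T19:31:47Z` example above, and `tdr` is assumed to be an initialized client.

```python
from datetime import datetime, timezone

from ops_utils.tdr_utils.tdr_api_utils import TDR


def sas_seconds_remaining(tdr: TDR, snapshot_id: str) -> float:
    """Return the number of seconds before the snapshot's SAS token expires."""
    token_info = tdr.get_sas_token(snapshot_id=snapshot_id)
    expiry = datetime.strptime(token_info["expiry_time"], "%Y-%m-%dT%H:%M:%SZ")
    return (expiry.replace(tzinfo=timezone.utc) - datetime.now(timezone.utc)).total_seconds()
```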
175 def delete_file(self, file_id: str, dataset_id: str) -> requests.Response: 176 """ 177 Delete a file from a dataset. 178 179 **Args:** 180 - file_id (str): The ID of the file to be deleted. 181 - dataset_id (str): The ID of the dataset. 182 183 **Returns:** 184 - requests.Response: The response from the request. 185 """ 186 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/{file_id}" 187 logging.info(f"Submitting delete job for file {file_id}") 188 return self.request_util.run_request(uri=uri, method=DELETE)
Delete a file from a dataset.
Args:
- file_id (str): The ID of the file to be deleted.
- dataset_id (str): The ID of the dataset.
Returns:
- requests.Response: The response from the request.
190 def delete_files( 191 self, 192 file_ids: list[str], 193 dataset_id: str, 194 batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"], # type: ignore[assignment] 195 check_interval: int = 15) -> None: 196 """ 197 Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch. 198 199 **Args:** 200 - file_ids (list[str]): A list of file IDs to be deleted. 201 - dataset_id (str): The ID of the dataset. 202 - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`. 203 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 204 """ 205 SubmitAndMonitorMultipleJobs( 206 tdr=self, 207 job_function=self.delete_file, 208 job_args_list=[(file_id, dataset_id) for file_id in file_ids], 209 batch_size=batch_size_to_delete_files, 210 check_interval=check_interval 211 ).run()
Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
Args:
- file_ids (list[str]): A list of file IDs to be deleted.
- dataset_id (str): The ID of the dataset.
- batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
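A sketch that combines `get_dataset_files` with `delete_files`, for example to remove everything ingested under a single load tag; the load tag is a placeholder and `tdr` is assumed to be an initialized client.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def delete_files_by_load_tag(tdr: TDR, dataset_id: str, load_tag: str) -> None:
    """Delete every file in the dataset whose fileDetail.loadTag matches load_tag."""
    file_ids = [
        f["fileId"]
        for f in tdr.get_dataset_files(dataset_id=dataset_id)
        if f["fileDetail"]["loadTag"] == load_tag
    ]
    tdr.delete_files(file_ids=file_ids, dataset_id=dataset_id)
```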
213 def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response: 214 """ 215 Add a user to a dataset with a specified policy. 216 217 **Args:** 218 - dataset_id (str): The ID of the dataset. 219 - user (str): The email of the user to be added. 220 - policy (str): The policy to be assigned to the user. 221 Must be one of `steward`, `custodian`, or `snapshot_creator`. 222 223 **Returns:** 224 - requests.Response: The response from the request. 225 226 **Raises:** 227 - ValueError: If the policy is not valid. 228 """ 229 self._check_policy(policy) 230 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members" 231 member_dict = {"email": user} 232 logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}") 233 return self.request_util.run_request( 234 uri=uri, 235 method=POST, 236 data=json.dumps(member_dict), 237 content_type="application/json" 238 )
Add a user to a dataset with a specified policy.
Args:
- dataset_id (str): The ID of the dataset.
- user (str): The email of the user to be added.
- policy (str): The policy to be assigned to the user.
Must be one of `steward`, `custodian`, or `snapshot_creator`.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If the policy is not valid.
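A short sketch granting custodian access; the email address would be a real user or group in practice.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def grant_custodian(tdr: TDR, dataset_id: str, email: str) -> None:
    """Add a user (or group) email to the dataset's custodian policy."""
    tdr.add_user_to_dataset(dataset_id=dataset_id, user=email, policy="custodian")
```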
240 def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response: 241 """ 242 Remove a user from a dataset. 243 244 **Args:** 245 - dataset_id (str): The ID of the dataset. 246 - user (str): The email of the user to be removed. 247 - policy (str): The policy to be removed from the user. 248 Must be one of `steward`, `custodian`, or `snapshot_creator`. 249 250 **Returns:** 251 - requests.Response: The response from the request. 252 253 **Raises:** 254 - ValueError: If the policy is not valid. 255 """ 256 self._check_policy(policy) 257 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members/{user}" 258 logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}") 259 return self.request_util.run_request(uri=uri, method=DELETE)
Remove a user from a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- user (str): The email of the user to be removed.
- policy (str): The policy to be removed from the user.
Must be one of `steward`, `custodian`, or `snapshot_creator`.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If the policy is not valid.
261 def delete_dataset(self, dataset_id: str) -> None: 262 """ 263 Delete a dataset and monitors the job until completion. 264 265 **Args:** 266 dataset_id (str): The ID of the dataset to be deleted. 267 """ 268 uri = f"{self.TDR_LINK}/datasets/{dataset_id}" 269 logging.info(f"Deleting dataset {dataset_id}") 270 response = self.request_util.run_request(uri=uri, method=DELETE) 271 job_id = response.json()['id'] 272 MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
Delete a dataset and monitor the job until completion.
Args:
- dataset_id (str): The ID of the dataset to be deleted.
274 def get_snapshot_info( 275 self, 276 snapshot_id: str, 277 continue_not_found: bool = False, 278 info_to_include: Optional[list[str]] = None 279 ) -> Optional[requests.Response]: 280 """ 281 Get information about a snapshot. 282 283 **Args:** 284 - snapshot_id (str): The ID of the snapshot. 285 - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`. 286 - info_to_include (list[str], optional): A list of additional information to include. Defaults to None. 287 Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, 288 `DATA_PROJECT`,`CREATION_INFORMATION`, `DUOS` 289 290 **Returns:** 291 - requests.Response (optional): The response from the request (returns None if the snapshot is not 292 found or access is denied). 293 """ 294 acceptable_return_code = [404, 403] if continue_not_found else [] 295 acceptable_include_info = [ 296 "SOURCES", 297 "TABLES", 298 "RELATIONSHIPS", 299 "ACCESS_INFORMATION", 300 "PROFILE", 301 "PROPERTIES", 302 "DATA_PROJECT", 303 "CREATION_INFORMATION", 304 "DUOS" 305 ] 306 if info_to_include: 307 if not all(info in acceptable_include_info for info in info_to_include): 308 raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}") 309 include_string = '&include='.join(info_to_include) 310 else: 311 include_string = "" 312 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include={include_string}" 313 response = self.request_util.run_request( 314 uri=uri, 315 method=GET, 316 accept_return_codes=acceptable_return_code 317 ) 318 if response.status_code == 404: 319 logging.warning(f"Snapshot {snapshot_id} not found") 320 return None 321 if response.status_code == 403: 322 logging.warning(f"Access denied for snapshot {snapshot_id}") 323 return None 324 return response
Get information about a snapshot.
Args:
- snapshot_id (str): The ID of the snapshot.
- continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
- info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`.
Returns:
- requests.Response (optional): The response from the request (returns None if the snapshot is not found or access is denied).
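A sketch that tolerates missing or inaccessible snapshots by using `continue_not_found`; it assumes the snapshot payload exposes a `name` field.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def snapshot_names(tdr: TDR, snapshot_ids: list[str]) -> dict[str, str]:
    """Map snapshot ID to snapshot name, skipping snapshots that return 404/403."""
    names: dict[str, str] = {}
    for snapshot_id in snapshot_ids:
        response = tdr.get_snapshot_info(snapshot_id=snapshot_id, continue_not_found=True)
        if response is not None:
            names[snapshot_id] = response.json()["name"]
    return names
```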
326 def delete_snapshots( 327 self, 328 snapshot_ids: list[str], 329 batch_size: int = 25, 330 check_interval: int = 10, 331 verbose: bool = False) -> None: 332 """ 333 Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch. 334 335 **Args:** 336 - snapshot_ids (list[str]): A list of snapshot IDs to be deleted. 337 - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`. 338 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`. 339 - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`. 340 """ 341 SubmitAndMonitorMultipleJobs( 342 tdr=self, 343 job_function=self.delete_snapshot, 344 job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids], 345 batch_size=batch_size, 346 check_interval=check_interval, 347 verbose=verbose 348 ).run()
Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
Args:
- snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
- batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
- verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
350 def delete_snapshot(self, snapshot_id: str) -> requests.Response: 351 """ 352 Delete a snapshot. 353 354 **Args:** 355 - snapshot_id (str): The ID of the snapshot to be deleted. 356 357 **Returns:** 358 - requests.Response: The response from the request. 359 """ 360 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}" 361 logging.info(f"Deleting snapshot {snapshot_id}") 362 return self.request_util.run_request(uri=uri, method=DELETE)
Delete a snapshot.
Args:
- snapshot_id (str): The ID of the snapshot to be deleted.
Returns:
- requests.Response: The response from the request.
397 def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]: 398 """ 399 Check if a dataset exists by name and optionally by billing profile. 400 401 **Args:** 402 - dataset_name (str): The name of the dataset to check. 403 - billing_profile (str, optional): The billing profile ID to match. Defaults to None. 404 405 **Returns:** 406 - list[dict]: A list of matching datasets. 407 """ 408 matching_datasets = [] 409 for dataset in self._yield_existing_datasets(filter=dataset_name): 410 # Search uses wildcard so could grab more datasets where dataset_name is substring 411 if dataset_name == dataset["name"]: 412 if billing_profile: 413 if dataset["defaultProfileId"] == billing_profile: 414 logging.info( 415 f"Dataset {dataset['name']} already exists under billing profile {billing_profile}") 416 dataset_id = dataset["id"] 417 logging.info(f"Dataset ID: {dataset_id}") 418 matching_datasets.append(dataset) 419 else: 420 logging.warning( 421 f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " + 422 f"and not under billing profile {billing_profile}" 423 ) 424 # Datasets names need to be unique regardless of billing profile, so raise an error if 425 # a dataset with the same name is found but is not under the requested billing profile 426 raise ValueError( 427 f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}") 428 else: 429 matching_datasets.append(dataset) 430 return matching_datasets
Check if a dataset exists by name and optionally by billing profile.
Args:
- dataset_name (str): The name of the dataset to check.
- billing_profile (str, optional): The billing profile ID to match. Defaults to None.
Returns:
- list[dict]: A list of matching datasets.
432 def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response: 433 """ 434 Get information about a dataset. 435 436 **Args:** 437 - dataset_id (str): The ID of the dataset. 438 - info_to_include (list[str], optional): A list of additional information to include. Valid options include: 439 `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`. 440 Defaults to None. 441 442 **Returns:** 443 - requests.Response: The response from the request. 444 445 **Raises:** 446 - ValueError: If `info_to_include` contains invalid information types. 447 """ 448 acceptable_include_info = [ 449 "SCHEMA", 450 "ACCESS_INFORMATION", 451 "PROFILE", 452 "PROPERTIES", 453 "DATA_PROJECT", 454 "STORAGE", 455 "SNAPSHOT_BUILDER_SETTING" 456 ] 457 if info_to_include: 458 if not all(info in acceptable_include_info for info in info_to_include): 459 raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}") 460 include_string = '&include='.join(info_to_include) 461 else: 462 include_string = "" 463 uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include={include_string}" 464 return self.request_util.run_request(uri=uri, method=GET)
Get information about a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- info_to_include (list[str], optional): A list of additional information to include. Valid options include:
`SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`. Defaults to None.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If `info_to_include` contains invalid information types.
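A sketch that requests only the schema and lists the table names, mirroring how `get_table_schema_info` walks the same response; `tdr` is assumed to be an initialized client.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def list_dataset_tables(tdr: TDR, dataset_id: str) -> list[str]:
    """Return the names of all tables defined in the dataset schema."""
    dataset = tdr.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
    return [table["name"] for table in dataset["schema"]["tables"]]
```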
466 def get_table_schema_info( 467 self, 468 dataset_id: str, 469 table_name: str, 470 dataset_info: Optional[dict] = None 471 ) -> Union[dict, None]: 472 """ 473 Get schema information for a specific table within a dataset. 474 475 **Args:** 476 - dataset_id (str): The ID of the dataset. 477 - table_name (str): The name of the table. 478 - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None. 479 480 **Returns:** 481 - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found. 482 """ 483 if not dataset_info: 484 dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json() 485 for table in dataset_info["schema"]["tables"]: # type: ignore[index] 486 if table["name"] == table_name: 487 return table 488 return None
Get schema information for a specific table within a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the table.
- dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
Returns:
- Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
490 def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response: 491 """ 492 Retrieve the result of a job. 493 494 **Args:** 495 - job_id (str): The ID of the job. 496 - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`. 497 498 **Returns:** 499 - requests.Response: The response from the request. 500 """ 501 uri = f"{self.TDR_LINK}/jobs/{job_id}/result" 502 # If job is expected to fail, accept any return code 503 acceptable_return_code = list(range(100, 600)) if expect_failure else [] 504 return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
Retrieve the result of a job.
Args:
- job_id (str): The ID of the job.
- expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
506 def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response: 507 """ 508 Load data into a TDR dataset. 509 510 **Args:** 511 - dataset_id (str): The ID of the dataset. 512 - data (dict): The data to be ingested. 513 514 **Returns:** 515 - requests.Response: The response from the request. 516 """ 517 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/ingest" 518 logging.info( 519 "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " + 520 "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.") 521 return self.request_util.run_request( 522 uri=uri, 523 method=POST, 524 content_type="application/json", 525 data=data 526 )
Load data into a TDR dataset.
Args:
- dataset_id (str): The ID of the dataset.
- data (dict): The data to be ingested.
Returns:
- requests.Response: The response from the request.
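A sketch of an array-mode metadata ingest. The payload field names (`table`, `format`, `records`, `updateStrategy`) follow the TDR ingest API and are not defined in this module, so treat them as assumptions to verify against the TDR documentation.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def ingest_rows(tdr: TDR, dataset_id: str, table_name: str, records: list[dict]) -> None:
    """Submit an array-style ingest request for a list of row dictionaries."""
    payload = {
        "table": table_name,
        "format": "array",           # assumption: array-mode ingest
        "records": records,
        "updateStrategy": "append",  # assumption: append rather than replace/merge
    }
    # The documented signature takes `data` as a dict; if your RunRequest expects a
    # pre-serialized body, wrap the payload in json.dumps() instead.
    tdr.ingest_to_dataset(dataset_id=dataset_id, data=payload)
```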
528 def file_ingest_to_dataset( 529 self, 530 dataset_id: str, 531 profile_id: str, 532 file_list: list[dict], 533 load_tag: str = "file_ingest_load_tag" 534 ) -> dict: 535 """ 536 Load files into a TDR dataset. 537 538 **Args:** 539 - dataset_id (str): The ID of the dataset. 540 - profile_id (str): The billing profile ID. 541 - file_list (list[dict]): The list of files to be ingested. 542 - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`. 543 544 **Returns:** 545 - dict: A dictionary containing the response from the ingest operation job monitoring. 546 """ 547 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/bulk/array" 548 data = { 549 "profileId": profile_id, 550 "loadTag": f"{load_tag}", 551 "maxFailedFileLoads": 0, 552 "loadArray": file_list 553 } 554 555 response = self.request_util.run_request( 556 uri=uri, 557 method=POST, 558 content_type="application/json", 559 data=json.dumps(data) 560 ) 561 job_id = response.json()['id'] 562 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 563 return job_results # type: ignore[return-value]
Load files into a TDR dataset.
Args:
- dataset_id (str): The ID of the dataset.
- profile_id (str): The billing profile ID.
- file_list (list[dict]): The list of files to be ingested.
- load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
Returns:
- dict: A dictionary containing the response from the ingest operation job monitoring.
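A sketch loading a single file. The `sourcePath`/`targetPath` entry shape follows the TDR bulk file load model rather than anything defined in this module, so verify the field names before relying on them.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def ingest_one_file(tdr: TDR, dataset_id: str, profile_id: str, gcs_path: str) -> dict:
    """Load one GCS object into the dataset and return the job monitoring result."""
    file_entry = {
        "sourcePath": gcs_path,                              # e.g. gs://bucket/sample.cram
        "targetPath": f"/ingest/{gcs_path.split('/')[-1]}",  # virtual TDR path (placeholder)
    }
    return tdr.file_ingest_to_dataset(
        dataset_id=dataset_id,
        profile_id=profile_id,
        file_list=[file_entry],
        load_tag="example_load_tag",
    )
```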
565 def get_dataset_table_metrics( 566 self, dataset_id: str, target_table_name: str, query_limit: int = 1000 567 ) -> list[dict]: 568 """ 569 Retrieve all metrics for a specific table within a dataset. 570 571 **Args:** 572 - dataset_id (str): The ID of the dataset. 573 - target_table_name (str): The name of the target table. 574 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 575 576 **Returns:** 577 - list[dict]: A list of dictionaries containing the metrics for the specified table. 578 """ 579 return [ 580 metric 581 for metric in self._yield_dataset_metrics( 582 dataset_id=dataset_id, 583 target_table_name=target_table_name, 584 query_limit=query_limit 585 ) 586 ]
Retrieve all metrics for a specific table within a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- target_table_name (str): The name of the target table.
- query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
Returns:
- list[dict]: A list of dictionaries containing the metrics for the specified table.
624 def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]: 625 """ 626 Get existing IDs from a dataset. 627 628 **Args:** 629 - dataset_id (str): The ID of the dataset. 630 - target_table_name (str): The name of the target table. 631 - entity_id (str): The entity ID to retrieve. 632 633 **Returns:** 634 - list[str]: A list of entity IDs from the specified table. 635 """ 636 dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name) 637 return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
Get existing IDs from a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- target_table_name (str): The name of the target table.
- entity_id (str): The entity ID to retrieve.
Returns:
- list[str]: A list of entity IDs from the specified table.
639 def get_job_status(self, job_id: str) -> requests.Response: 640 """ 641 Retrieve the status of a job. 642 643 **Args:** 644 - job_id (str): The ID of the job. 645 646 **Returns:** 647 - requests.Response: The response from the request. 648 """ 649 uri = f"{self.TDR_LINK}/jobs/{job_id}" 650 return self.request_util.run_request(uri=uri, method=GET)
Retrieve the status of a job.
Args:
- job_id (str): The ID of the job.
Returns:
- requests.Response: The response from the request.
652 def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]: 653 """ 654 Get all file UUIDs from the metadata of a dataset. 655 656 **Args:** 657 - dataset_id (str): The ID of the dataset. 658 659 **Returns:** 660 - list[str]: A list of file UUIDs from the dataset metadata. 661 """ 662 dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json() 663 all_metadata_file_uuids = [] 664 tables = 0 665 for table in dataset_info["schema"]["tables"]: 666 tables += 1 667 table_name = table["name"] 668 logging.info(f"Getting all file information for {table_name}") 669 # Get just columns where datatype is fileref 670 file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"] 671 dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name) 672 # Get unique list of file uuids 673 file_uuids = list( 674 set( 675 [ 676 value for metric in dataset_metrics for key, value in metric.items() if key in file_columns 677 ] 678 ) 679 ) 680 logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'") 681 all_metadata_file_uuids.extend(file_uuids) 682 # Make full list unique 683 all_metadata_file_uuids = list(set(all_metadata_file_uuids)) 684 logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)") 685 return all_metadata_file_uuids
Get all file UUIDs from the metadata of a dataset.
Args:
- dataset_id (str): The ID of the dataset.
Returns:
- list[str]: A list of file UUIDs from the dataset metadata.
687 def soft_delete_entries( 688 self, 689 dataset_id: str, 690 table_name: str, 691 datarepo_row_ids: list[str], 692 check_intervals: int = 15 693 ) -> Optional[dict]: 694 """ 695 Soft delete specific records from a table. 696 697 **Args:** 698 - dataset_id (str): The ID of the dataset. 699 - table_name (str): The name of the target table. 700 - datarepo_row_ids (list[str]): A list of row IDs to be deleted. 701 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 702 703 **Returns:** 704 - dict (optional): A dictionary containing the response from the soft delete operation job 705 monitoring. Returns None if no row IDs are provided. 706 """ 707 if not datarepo_row_ids: 708 logging.info(f"No records found to soft delete in table {table_name}") 709 return None 710 logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}") 711 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/deletes" 712 payload = { 713 "deleteType": "soft", 714 "specType": "jsonArray", 715 "tables": [ 716 { 717 "tableName": table_name, 718 "jsonArraySpec": { 719 "rowIds": datarepo_row_ids 720 } 721 } 722 ] 723 } 724 response = self.request_util.run_request( 725 method=POST, 726 uri=uri, 727 data=json.dumps(payload), 728 content_type="application/json" 729 ) 730 job_id = response.json()["id"] 731 return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
Soft delete specific records from a table.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the target table.
- datarepo_row_ids (list[str]): A list of row IDs to be deleted.
- check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
Returns:
- dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
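A sketch that pairs `get_dataset_table_metrics` with `soft_delete_entries` to remove rows matching a predicate; the `exclude` column is hypothetical, while `datarepo_row_id` is the row identifier TDR returns with every record.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR


def soft_delete_flagged_rows(tdr: TDR, dataset_id: str, table_name: str) -> None:
    """Soft delete all rows whose hypothetical 'exclude' column is truthy."""
    rows = tdr.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
    row_ids = [row["datarepo_row_id"] for row in rows if row.get("exclude")]
    tdr.soft_delete_entries(dataset_id=dataset_id, table_name=table_name, datarepo_row_ids=row_ids)
```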
733 def soft_delete_all_table_entries( 734 self, 735 dataset_id: str, 736 table_name: str, 737 query_limit: int = 1000, 738 check_intervals: int = 15 739 ) -> Optional[dict]: 740 """ 741 Soft deletes all records in a table. 742 743 **Args:** 744 - dataset_id (str): The ID of the dataset. 745 - table_name (str): The name of the target table. 746 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 747 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 748 749 **Returns:** 750 - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns 751 None if no row IDs are provided. 752 """ 753 dataset_metrics = self.get_dataset_table_metrics( 754 dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit 755 ) 756 row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics] 757 return self.soft_delete_entries( 758 dataset_id=dataset_id, 759 table_name=table_name, 760 datarepo_row_ids=row_ids, 761 check_intervals=check_intervals 762 )
Soft delete all records in a table.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the target table.
- query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
- check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
Returns:
- dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
764 def get_or_create_dataset( 765 self, 766 dataset_name: str, 767 billing_profile: str, 768 schema: dict, 769 description: str, 770 delete_existing: bool = False, 771 continue_if_exists: bool = False, 772 additional_properties_dict: Optional[dict] = None 773 ) -> str: 774 """ 775 Get or create a dataset. 776 777 **Args:** 778 - dataset_name (str): The name of the dataset. 779 - billing_profile (str): The billing profile ID. 780 - schema (dict): The schema of the dataset. 781 - description (str): The description of the dataset. 782 - additional_properties_dict (Optional[dict], optional): Additional properties 783 for the dataset. Defaults to None. 784 - delete_existing (bool, optional): Whether to delete the existing dataset if found. 785 Defaults to `False`. 786 - continue_if_exists (bool, optional): Whether to continue if the dataset already exists. 787 Defaults to `False`. 788 789 **Returns:** 790 - str: The ID of the dataset. 791 792 **Raises:** 793 - ValueError: If multiple datasets with the same name are found under the billing profile. 794 """ 795 existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile) 796 if existing_datasets: 797 if not continue_if_exists: 798 raise ValueError( 799 f"Run with continue_if_exists=True to use the existing dataset {dataset_name}" 800 ) 801 # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list 802 if delete_existing: 803 logging.info(f"Deleting existing dataset {dataset_name}") 804 self.delete_dataset(existing_datasets[0]["id"]) 805 existing_datasets = [] 806 # If not delete_existing and continue_if_exists then grab existing datasets id 807 else: 808 dataset_id = existing_datasets[0]["id"] 809 if not existing_datasets: 810 logging.info("Did not find existing dataset") 811 # Create dataset 812 dataset_id = self.create_dataset( 813 schema=schema, 814 dataset_name=dataset_name, 815 description=description, 816 profile_id=billing_profile, 817 additional_dataset_properties=additional_properties_dict 818 ) 819 return dataset_id
Get or create a dataset.
Args:
- dataset_name (str): The name of the dataset.
- billing_profile (str): The billing profile ID.
- schema (dict): The schema of the dataset.
- description (str): The description of the dataset.
- additional_properties_dict (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
- delete_existing (bool, optional): Whether to delete the existing dataset if found. Defaults to `False`.
- continue_if_exists (bool, optional): Whether to continue if the dataset already exists. Defaults to `False`.
Returns:
- str: The ID of the dataset.
Raises:
- ValueError: If multiple datasets with the same name are found under the billing profile.
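A sketch of idempotent dataset setup. The schema dictionary follows the TDR dataset schema model (tables with typed columns); the table, column names, and flags here are illustrative, not taken from this module.

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR

# Illustrative schema in the TDR dataset schema shape: tables -> columns.
EXAMPLE_SCHEMA = {
    "tables": [
        {
            "name": "sample",
            "columns": [
                {"name": "sample_id", "datatype": "string", "array_of": False, "required": True},
                {"name": "cram_path", "datatype": "fileref", "array_of": False, "required": False},
            ],
        }
    ]
}


def ensure_dataset(tdr: TDR, billing_profile: str) -> str:
    """Create the example dataset if missing, otherwise reuse the existing one."""
    return tdr.get_or_create_dataset(
        dataset_name="example_dataset",
        billing_profile=billing_profile,
        schema=EXAMPLE_SCHEMA,
        description="Example dataset created via ops_utils",
        continue_if_exists=True,
    )
```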
821 def create_dataset( # type: ignore[return] 822 self, 823 schema: dict, 824 dataset_name: str, 825 description: str, 826 profile_id: str, 827 additional_dataset_properties: Optional[dict] = None 828 ) -> Optional[str]: 829 """ 830 Create a new dataset. 831 832 **Args:** 833 - schema (dict): The schema of the dataset. 834 - dataset_name (str): The name of the dataset. 835 - description (str): The description of the dataset. 836 - profile_id (str): The billing profile ID. 837 - additional_dataset_properties (Optional[dict], optional): Additional 838 properties for the dataset. Defaults to None. 839 840 **Returns:** 841 - Optional[str]: The ID of the created dataset, or None if creation failed. 842 843 Raises: 844 - ValueError: If the schema validation fails. 845 """ 846 dataset_properties = { 847 "name": dataset_name, 848 "description": description, 849 "defaultProfileId": profile_id, 850 "region": "us-central1", 851 "cloudPlatform": GCP, 852 "schema": schema 853 } 854 855 if additional_dataset_properties: 856 dataset_properties.update(additional_dataset_properties) 857 try: 858 CreateDatasetSchema(**dataset_properties) # type: ignore[arg-type] 859 except ValidationError as e: 860 raise ValueError(f"Schema validation error: {e}") 861 uri = f"{self.TDR_LINK}/datasets" 862 logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}") 863 response = self.request_util.run_request( 864 method=POST, 865 uri=uri, 866 data=json.dumps(dataset_properties), 867 content_type="application/json" 868 ) 869 job_id = response.json()["id"] 870 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 871 dataset_id = job_results["id"] # type: ignore[index] 872 logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}") 873 return dataset_id
Create a new dataset.
Args:
- schema (dict): The schema of the dataset.
- dataset_name (str): The name of the dataset.
- description (str): The description of the dataset.
- profile_id (str): The billing profile ID.
- additional_dataset_properties (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
Returns:
- Optional[str]: The ID of the created dataset, or None if creation failed.
Raises:
- ValueError: If the schema validation fails.
875 def update_dataset_schema( # type: ignore[return] 876 self, 877 dataset_id: str, 878 update_note: str, 879 tables_to_add: Optional[list[dict]] = None, 880 relationships_to_add: Optional[list[dict]] = None, 881 columns_to_add: Optional[list[dict]] = None 882 ) -> Optional[str]: 883 """ 884 Update the schema of a dataset. 885 886 **Args:** 887 - dataset_id (str): The ID of the dataset. 888 - update_note (str): A note describing the update. 889 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 890 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 891 - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None. 892 893 **Returns:** 894 - Optional[str]: The ID of the updated dataset, or None if the update failed. 895 896 **Raises:** 897 - ValueError: If the schema validation fails. 898 """ 899 uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema" 900 request_body: dict = {"description": f"{update_note}", "changes": {}} 901 if tables_to_add: 902 request_body["changes"]["addTables"] = tables_to_add 903 if relationships_to_add: 904 request_body["changes"]["addRelationships"] = relationships_to_add 905 if columns_to_add: 906 request_body["changes"]["addColumns"] = columns_to_add 907 try: 908 UpdateSchema(**request_body) 909 except ValidationError as e: 910 raise ValueError(f"Schema validation error: {e}") 911 912 response = self.request_util.run_request( 913 uri=uri, 914 method=POST, 915 content_type="application/json", 916 data=json.dumps(request_body) 917 ) 918 job_id = response.json()["id"] 919 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 920 dataset_id = job_results["id"] # type: ignore[index] 921 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 922 return dataset_id
Update the schema of a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- update_note (str): A note describing the update.
- tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
- relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
- columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
Returns:
- Optional[str]: The ID of the updated dataset, or None if the update failed.
Raises:
- ValueError: If the schema validation fails.
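A sketch that adds one column to an existing table. The `addColumns` entry shape (`tableName` plus a list of column definitions) follows the TDR updateSchema request model and should be confirmed against the TDR documentation; the table and column names are placeholders.

```python
from typing import Optional

from ops_utils.tdr_utils.tdr_api_utils import TDR


def add_notes_column(tdr: TDR, dataset_id: str) -> Optional[str]:
    """Add a 'notes' string column to the 'sample' table (illustrative column spec)."""
    columns_to_add = [
        {
            "tableName": "sample",
            "columns": [
                {"name": "notes", "datatype": "string", "array_of": False, "required": False}
            ],
        }
    ]
    return tdr.update_dataset_schema(
        dataset_id=dataset_id,
        update_note="Add notes column to sample table",
        columns_to_add=columns_to_add,
    )
```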
958 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 959 """ 960 Return all the metadata about files in a given snapshot. 961 962 Not all files can be returned at once, so the API 963 is used repeatedly until all "batches" have been returned. 964 965 **Args:** 966 - snapshot_id (str): The ID of the snapshot. 967 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 968 969 **Returns:** 970 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 971 """ 972 uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files" 973 return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
Return all the metadata about files in a given snapshot.
Not all files can be returned at once, so the API is used repeatedly until all "batches" have been returned.
Args:
- snapshot_id (str): The ID of the snapshot.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
Returns:
- list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
975 def get_dataset_snapshots(self, dataset_id: str) -> requests.Response: 976 """ 977 Return snapshots belonging to specified dataset. 978 979 **Args:** 980 - dataset_id: uuid of dataset to query. 981 982 **Returns:** 983 - requests.Response: The response from the request. 984 """ 985 uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}" 986 return self.request_util.run_request( 987 uri=uri, 988 method=GET 989 )
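Because this method returns the raw `requests.Response`, the caller parses the body. A minimal sketch, assuming `tdr` is an existing `TDR` instance, the dataset UUID is a placeholder, and the enumerate endpoint returns its matches under an "items" key with "id" and "name" fields:

response = tdr.get_dataset_snapshots(dataset_id="your-dataset-uuid")
snapshot_listing = response.json()

# "items" / "id" / "name" are assumptions about the enumerate response shape.
for snapshot in snapshot_listing.get("items", []):
    logging.info(f"Found snapshot {snapshot['name']} ({snapshot['id']})")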
class FilterOutSampleIdsAlreadyInDataset:
    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""

    def __init__(
            self,
            ingest_metrics: list[dict],
            dataset_id: str,
            tdr: TDR,
            target_table_name: str,
            filter_entity_id: str
    ):
        """
        Initialize the FilterOutSampleIdsAlreadyInDataset class.

        **Args:**
        - ingest_metrics (list[dict]): The metrics to be ingested.
        - dataset_id (str): The ID of the dataset.
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
        - target_table_name (str): The name of the target table.
        - filter_entity_id (str): The entity ID to filter on.
        """
        self.ingest_metrics = ingest_metrics
        """@private"""
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.filter_entity_id = filter_entity_id
        """@private"""

    def run(self) -> list[dict]:
        """
        Run the filter process to remove sample IDs that already exist in the dataset.

        **Returns:**
        - list[dict]: The filtered ingest metrics.
        """
        # Get all sample IDs that already exist in the dataset
        logging.info(
            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
            f"dataset {self.dataset_id}"
        )

        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
            dataset_id=self.dataset_id,
            target_table_name=self.target_table_name,
            entity_id=self.filter_entity_id
        )
        # Filter out rows whose entity ID already exists in the dataset
        filtered_ingest_metrics = [
            row
            for row in self.ingest_metrics
            if str(row[self.filter_entity_id]) not in dataset_sample_ids
        ]
        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
            logging.info(
                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
                f"the dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
            )

            if filtered_ingest_metrics:
                return filtered_ingest_metrics
            else:
                logging.info("All rows were filtered out because they already exist in the dataset; nothing to ingest")
                return []
        else:
            logging.info("No rows were filtered out because none of them already exist in the dataset")
            return filtered_ingest_metrics
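A minimal usage sketch, assuming `tdr` is an existing `TDR` instance, the dataset has a table named "sample" keyed on "sample_id", and the metrics and UUID below are placeholders:

# Hypothetical metrics produced upstream; filter_entity_id must name a column
# present both in these rows and in the target TDR table.
ingest_metrics = [
    {"sample_id": "SM-001", "cram_path": "gs://bucket/SM-001.cram"},
    {"sample_id": "SM-002", "cram_path": "gs://bucket/SM-002.cram"},
]

rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
    ingest_metrics=ingest_metrics,
    dataset_id="your-dataset-uuid",
    tdr=tdr,
    target_table_name="sample",
    filter_entity_id="sample_id"
).run()

# rows_to_ingest now contains only rows whose sample_id is not yet in the dataset;
# hand it off to whatever ingest workflow runs downstream.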