ops_utils.tdr_utils.tdr_api_utils
Utility classes for interacting with TDR API.
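The `TDR` client defined in this module is constructed from a `RunRequest` (see `ops_utils.request_util`). Below is a minimal construction sketch; the `Token` import and its arguments are assumptions about the wider ops_utils package, and only the `TDR(...)` call itself is defined in this module.

from ops_utils.request_util import RunRequest
from ops_utils.token import Token  # assumed helper from ops_utils; not part of this module
from ops_utils.tdr_utils.tdr_api_utils import TDR

token = Token()  # assumption: obtains credentials used for TDR API calls
request_util = RunRequest(token=token)  # assumption: RunRequest wraps the token for HTTP requests
tdr = TDR(request_util=request_util, env="prod", dry_run=False)  # constructor defined in this module

The full module source follows.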
"""Utility classes for interacting with TDR API."""

import json
import logging
import requests
from typing import Any, Optional, Union
from pydantic import ValidationError

from ..request_util import GET, POST, DELETE, PUT, RunRequest
from ..tdr_api_schema.create_dataset_schema import CreateDatasetSchema
from ..tdr_api_schema.update_dataset_schema import UpdateSchema
from .tdr_job_utils import MonitorTDRJob, SubmitAndMonitorMultipleJobs
from ..vars import ARG_DEFAULTS, GCP, APPLICATION_JSON


class TDR:
    """Class to interact with the Terra Data Repository (TDR) API."""

    PROD_LINK = "https://data.terra.bio/api/repository/v1"
    DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
    """(str): The base URL for the TDR API."""

    def __init__(self, request_util: RunRequest, env: str = 'prod', dry_run: bool = False):
        """
        Initialize the TDR class (a class to interact with the Terra Data Repository (TDR) API).

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
        - env (str, optional): The TDR environment to use, either `prod` or `dev`. Defaults to `prod`.
        - dry_run (bool, optional): Whether to skip destructive calls where supported
            (currently only in `delete_files_and_snapshots`). Defaults to `False`.
        """
        self.request_util = request_util
        # NOTE: dry_run is not fully implemented in this class, only in delete_files_and_snapshots
        self.dry_run = dry_run
        if env.lower() == 'prod':
            self.tdr_link = self.PROD_LINK
        elif env.lower() == 'dev':
            self.tdr_link = self.DEV_LINK
        else:
            raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.")
        """@private"""

    @staticmethod
    def _check_policy(policy: str) -> None:
        """
        Check if the policy is valid.

        **Args:**
        - policy (str): The role to check.

        **Raises:**
        - ValueError: If the policy is not one of the allowed options.
        """
        if policy not in ["steward", "custodian", "snapshot_creator"]:
            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")

    def get_dataset_files(
            self,
            dataset_id: str,
            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
    ) -> list[dict]:
        """
        Get all files in a dataset.

        Returns JSON like the example below

            {
              "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
              "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
              "path": "/path/set/in/ingest.csv",
              "size": 1722,
              "checksums": [
                {
                  "checksum": "82f7e79v",
                  "type": "crc32c"
                },
                {
                  "checksum": "fff973507e30b74fa47a3d6830b84a90",
                  "type": "md5"
                }
              ],
              "created": "2024-13-11T15:01:00.256Z",
              "description": null,
              "fileType": "file",
              "fileDetail": {
                "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
                "mimeType": null,
                "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
                "loadTag": "RP_3333-RP_3333"
              },
              "directoryDetail": null
            }

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/files"
        logging.info(f"Getting all files in dataset {dataset_id}")
        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

    def create_file_dict(
            self,
            dataset_id: str,
            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
    ) -> dict:
        """
        Create a dictionary of all files in a dataset where the key is the file UUID.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.

        **Returns:**
        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
        """
        return {
            file_dict["fileId"]: file_dict
            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
        }

    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
            self,
            dataset_id: str,
            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
    ) -> dict:
        """
        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.

        This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.

        This will ONLY work if the dataset was created with `experimentalSelfHosted = True`

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.

        **Returns:**
        - dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
        """
        return {
            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
        }

    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
        """
        Delete a file from a dataset.

        **Args:**
        - file_id (str): The ID of the file to be deleted.
        - dataset_id (str): The ID of the dataset.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}"
        logging.info(f"Submitting delete job for file {file_id}")
        return self.request_util.run_request(uri=uri, method=DELETE)

    def delete_files(
            self,
            file_ids: list[str],
            dataset_id: str,
            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
            check_interval: int = 15) -> None:
        """
        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.

        **Args:**
        - file_ids (list[str]): A list of file IDs to be deleted.
        - dataset_id (str): The ID of the dataset.
        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
        """
        SubmitAndMonitorMultipleJobs(
            tdr=self,
            job_function=self.delete_file,
            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
            batch_size=batch_size_to_delete_files,
            check_interval=check_interval
        ).run()

    def _delete_snapshots_for_files(self, dataset_id: str, file_ids: set[str]) -> None:
        """Delete snapshots that reference any of the provided file IDs."""
        snapshots_resp = self.get_dataset_snapshots(dataset_id=dataset_id)
        snapshot_items = snapshots_resp.json().get('items', [])
        snapshots_to_delete = []
        logging.info(
            "Checking %d snapshots for references",
            len(snapshot_items),
        )
        for snap in snapshot_items:
            snap_id = snap.get('id')
            if not snap_id:
                continue
            snap_files = self.get_files_from_snapshot(snapshot_id=snap_id)
            snap_file_ids = {
                fd.get('fileId') for fd in snap_files if fd.get('fileId')
            }
            # Use set intersection to check for any matching file IDs
            if snap_file_ids & file_ids:
                snapshots_to_delete.append(snap_id)
        if snapshots_to_delete:
            self.delete_snapshots(snapshot_ids=snapshots_to_delete)
        else:
            logging.info("No snapshots reference the provided file ids")

    def _dry_run_msg(self) -> str:
        return '[Dry run] ' if self.dry_run else ''

    def delete_files_and_snapshots(self, dataset_id: str, file_ids: set[str]) -> None:
        """Delete files from a dataset by their IDs, handling snapshots."""
        self._delete_snapshots_for_files(dataset_id=dataset_id, file_ids=file_ids)

        logging.info(
            f"{self._dry_run_msg()}Submitting delete request for {len(file_ids)} files in "
            f"dataset {dataset_id}")
        if not self.dry_run:
            self.delete_files(
                file_ids=list(file_ids),
                dataset_id=dataset_id
            )

    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
        """
        Add a user to a dataset with a specified policy.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - user (str): The email of the user to be added.
        - policy (str): The policy to be assigned to the user.
            Must be one of `steward`, `custodian`, or `snapshot_creator`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If the policy is not valid.
        """
        self._check_policy(policy)
        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
        member_dict = {"email": user}
        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
        return self.request_util.run_request(
            uri=uri,
            method=POST,
            data=json.dumps(member_dict),
            content_type=APPLICATION_JSON
        )

    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
        """
        Remove a user from a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - user (str): The email of the user to be removed.
        - policy (str): The policy to be removed from the user.
            Must be one of `steward`, `custodian`, or `snapshot_creator`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If the policy is not valid.
        """
        self._check_policy(policy)
        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
        return self.request_util.run_request(uri=uri, method=DELETE)

    def delete_dataset(self, dataset_id: str) -> None:
        """
        Delete a dataset and monitor the job until completion.

        **Args:**
        - dataset_id (str): The ID of the dataset to be deleted.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}"
        logging.info(f"Deleting dataset {dataset_id}")
        response = self.request_util.run_request(uri=uri, method=DELETE)
        job_id = response.json()['id']
        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()

    def make_snapshot_public(self, snapshot_id: str) -> requests.Response:
        """
        Make a snapshot public.

        **Args:**
        - snapshot_id (str): The ID of the snapshot to be made public.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public"
        logging.info(f"Making snapshot {snapshot_id} public")
        return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")

    def get_snapshot_info(
            self,
            snapshot_id: str,
            continue_not_found: bool = False,
            info_to_include: Optional[list[str]] = None
    ) -> Optional[requests.Response]:
        """
        Get information about a snapshot.

        **Args:**
        - snapshot_id (str): The ID of the snapshot.
        - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
            Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
            `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`

        **Returns:**
        - requests.Response (optional): The response from the request (returns None if the snapshot is not
            found or access is denied).
        """
        acceptable_return_code = [404, 403] if continue_not_found else []
        acceptable_include_info = [
            "SOURCES",
            "TABLES",
            "RELATIONSHIPS",
            "ACCESS_INFORMATION",
            "PROFILE",
            "PROPERTIES",
            "DATA_PROJECT",
            "CREATION_INFORMATION",
            "DUOS"
        ]
        if info_to_include:
            if not all(info in acceptable_include_info for info in info_to_include):
                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
            include_string = '&include='.join(info_to_include)
        else:
            include_string = ""
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
        response = self.request_util.run_request(
            uri=uri,
            method=GET,
            accept_return_codes=acceptable_return_code
        )
        if response.status_code == 404:
            logging.warning(f"Snapshot {snapshot_id} not found")
            return None
        if response.status_code == 403:
            logging.warning(f"Access denied for snapshot {snapshot_id}")
            return None
        return response

    def delete_snapshots(
            self,
            snapshot_ids: list[str],
            batch_size: int = 25,
            check_interval: int = 10,
            verbose: bool = False) -> None:
        """
        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.

        **Args:**
        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
        """
        logging.info(f"{self._dry_run_msg()}Deleting {len(snapshot_ids)} snapshots")
        if not self.dry_run:
            SubmitAndMonitorMultipleJobs(
                tdr=self,
                job_function=self.delete_snapshot,
                job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
                batch_size=batch_size,
                check_interval=check_interval,
                verbose=verbose
            ).run()

    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
        """
        Delete a snapshot.

        **Args:**
        - snapshot_id (str): The ID of the snapshot to be deleted.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
        logging.info(f"Deleting snapshot {snapshot_id}")
        return self.request_util.run_request(uri=uri, method=DELETE)

    def _yield_existing_datasets(
            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
    ) -> Any:
        """
        Get all datasets in TDR, optionally filtered by dataset name.

        **Args:**
            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".

        Yields:
            Any: A generator yielding datasets.
        """
        offset = 0
        if filter:
            filter_str = f"&filter={filter}"
            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
        else:
            filter_str = ""
            log_message = f"Searching for all datasets in batches of {batch_size}"
        logging.info(log_message)
        while True:
            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
            response = self.request_util.run_request(uri=uri, method=GET)
            datasets = response.json()["items"]
            if not datasets:
                break
            for dataset in datasets:
                yield dataset
            offset += batch_size
            break

    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
        """
        Check if a dataset exists by name and optionally by billing profile.

        **Args:**
        - dataset_name (str): The name of the dataset to check.
        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.

        **Returns:**
        - list[dict]: A list of matching datasets.
        """
        matching_datasets = []
        for dataset in self._yield_existing_datasets(filter=dataset_name):
            # Search uses wildcard so could grab more datasets where dataset_name is substring
            if dataset_name == dataset["name"]:
                if billing_profile:
                    if dataset["defaultProfileId"] == billing_profile:
                        logging.info(
                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
                        dataset_id = dataset["id"]
                        logging.info(f"Dataset ID: {dataset_id}")
                        matching_datasets.append(dataset)
                    else:
                        logging.warning(
                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
                            f"and not under billing profile {billing_profile}"
                        )
                        # Dataset names need to be unique regardless of billing profile, so raise an error if
                        # a dataset with the same name is found but is not under the requested billing profile
                        raise ValueError(
                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
                else:
                    matching_datasets.append(dataset)
        return matching_datasets

    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
        """
        Get information about a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
            `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
            Defaults to None.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `info_to_include` contains invalid information types.
        """
        acceptable_include_info = [
            "SCHEMA",
            "ACCESS_INFORMATION",
            "PROFILE",
            "PROPERTIES",
            "DATA_PROJECT",
            "STORAGE",
            "SNAPSHOT_BUILDER_SETTING"
        ]
        if info_to_include:
            if not all(info in acceptable_include_info for info in info_to_include):
                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
            include_string = '&include='.join(info_to_include)
        else:
            include_string = ""
        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
        return self.request_util.run_request(uri=uri, method=GET)

    def get_table_schema_info(
            self,
            dataset_id: str,
            table_name: str,
            dataset_info: Optional[dict] = None
    ) -> Union[dict, None]:
        """
        Get schema information for a specific table within a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - table_name (str): The name of the table.
        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.

        **Returns:**
        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
        """
        if not dataset_info:
            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
            if table["name"] == table_name:
                return table
        return None

    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
        """
        Retrieve the result of a job.

        **Args:**
        - job_id (str): The ID of the job.
        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/jobs/{job_id}/result"
        # If job is expected to fail, accept any return code
        acceptable_return_code = list(range(100, 600)) if expect_failure else []
        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)

    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
        """
        Load data into a TDR dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - data (dict): The data to be ingested.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
        logging.info(
            "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
        return self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=data
        )

    def file_ingest_to_dataset(
            self,
            dataset_id: str,
            profile_id: str,
            file_list: list[dict],
            load_tag: str = "file_ingest_load_tag"
    ) -> dict:
        """
        Load files into a TDR dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - profile_id (str): The billing profile ID.
        - file_list (list[dict]): The list of files to be ingested.
        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.

        **Returns:**
        - dict: A dictionary containing the response from the ingest operation job monitoring.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
        data = {
            "profileId": profile_id,
            "loadTag": f"{load_tag}",
            "maxFailedFileLoads": 0,
            "loadArray": file_list
        }

        response = self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(data)
        )
        job_id = response.json()['id']
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        return job_results  # type: ignore[return-value]

    def get_dataset_table_metrics(
            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
    ) -> list[dict]:
        """
        Retrieve all metrics for a specific table within a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - target_table_name (str): The name of the target table.
        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metrics for the specified table.
        """
        return [
            metric
            for metric in self._yield_dataset_metrics(
                dataset_id=dataset_id,
                target_table_name=target_table_name,
                query_limit=query_limit
            )
        ]

    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
        """
        Yield all entity metrics from a dataset.

        **Args:**
            dataset_id (str): The ID of the dataset.
            target_table_name (str): The name of the target table.
            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

        Yields:
            Any: A generator yielding dictionaries containing the metrics for the specified table.
        """
        search_request = {
            "offset": 0,
            "limit": query_limit,
            "sort": "datarepo_row_id"
        }
        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
        while True:
            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
            response = self.request_util.run_request(
                uri=uri,
                method=POST,
                content_type=APPLICATION_JSON,
                data=json.dumps(search_request)
            )
            if not response or not response.json()["result"]:
                break
            logging.info(
                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
                f"dataset {dataset_id}"
            )
            for record in response.json()["result"]:
                yield record
            search_request["offset"] += query_limit  # type: ignore[operator]

    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
        """
        Get existing IDs from a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - target_table_name (str): The name of the target table.
        - entity_id (str): The entity ID to retrieve.

        **Returns:**
        - list[str]: A list of entity IDs from the specified table.
        """
        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]

    def get_job_status(self, job_id: str) -> requests.Response:
        """
        Retrieve the status of a job.

        **Args:**
        - job_id (str): The ID of the job.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/jobs/{job_id}"
        return self.request_util.run_request(uri=uri, method=GET)

    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
        """
        Get all file UUIDs from the metadata of a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.

        **Returns:**
        - list[str]: A list of file UUIDs from the dataset metadata.
        """
        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
        all_metadata_file_uuids = []
        tables = 0
        for table in dataset_info["schema"]["tables"]:
            tables += 1
            table_name = table["name"]
            logging.info(f"Getting all file information for {table_name}")
            # Get just columns where datatype is fileref
            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
            # Get unique list of file uuids
            file_uuids = list(
                set(
                    [
                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
                    ]
                )
            )
            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
            all_metadata_file_uuids.extend(file_uuids)
        # Make full list unique
        all_metadata_file_uuids = list(set(all_metadata_file_uuids))
        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
        return all_metadata_file_uuids

    def soft_delete_entries(
            self,
            dataset_id: str,
            table_name: str,
            datarepo_row_ids: list[str],
            check_intervals: int = 15
    ) -> Optional[dict]:
        """
        Soft delete specific records from a table.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - table_name (str): The name of the target table.
        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.

        **Returns:**
        - dict (optional): A dictionary containing the response from the soft delete operation job
            monitoring. Returns None if no row IDs are provided.
        """
        if not datarepo_row_ids:
            logging.info(f"No records found to soft delete in table {table_name}")
            return None
        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
        payload = {
            "deleteType": "soft",
            "specType": "jsonArray",
            "tables": [
                {
                    "tableName": table_name,
                    "jsonArraySpec": {
                        "rowIds": datarepo_row_ids
                    }
                }
            ]
        }
        response = self.request_util.run_request(
            method=POST,
            uri=uri,
            data=json.dumps(payload),
            content_type=APPLICATION_JSON
        )
        job_id = response.json()["id"]
        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()

    def soft_delete_all_table_entries(
            self,
            dataset_id: str,
            table_name: str,
            query_limit: int = 1000,
            check_intervals: int = 15
    ) -> Optional[dict]:
        """
        Soft delete all records in a table.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - table_name (str): The name of the target table.
        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.

        **Returns:**
        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
            None if no row IDs are provided.
        """
        dataset_metrics = self.get_dataset_table_metrics(
            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
        )
        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
        return self.soft_delete_entries(
            dataset_id=dataset_id,
            table_name=table_name,
            datarepo_row_ids=row_ids,
            check_intervals=check_intervals
        )

    def get_or_create_dataset(
            self,
            dataset_name: str,
            billing_profile: str,
            schema: dict,
            description: str,
            relationships: Optional[list[dict]] = None,
            delete_existing: bool = False,
            continue_if_exists: bool = False,
            additional_properties_dict: Optional[dict] = None
    ) -> str:
        """
        Get or create a dataset.

        **Args:**
        - dataset_name (str): The name of the dataset.
        - billing_profile (str): The billing profile ID.
        - schema (dict): The schema of the dataset.
        - description (str): The description of the dataset.
        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
            Defaults to None.
        - additional_properties_dict (Optional[dict], optional): Additional properties
            for the dataset. Defaults to None.
        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
            Defaults to `False`.
        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
            Defaults to `False`.

        **Returns:**
        - str: The ID of the dataset.

        **Raises:**
        - ValueError: If multiple datasets with the same name are found under the billing profile.
        """
        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
        if existing_datasets:
            if not continue_if_exists:
                raise ValueError(
                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
                )
            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
            if delete_existing:
                logging.info(f"Deleting existing dataset {dataset_name}")
                self.delete_dataset(existing_datasets[0]["id"])
                existing_datasets = []
            # If not delete_existing and continue_if_exists then grab existing datasets id
            else:
                dataset_id = existing_datasets[0]["id"]
        if not existing_datasets:
            logging.info("Did not find existing dataset")
            # Create dataset
            dataset_id = self.create_dataset(
                schema=schema,
                dataset_name=dataset_name,
                description=description,
                profile_id=billing_profile,
                additional_dataset_properties=additional_properties_dict
            )
        return dataset_id

    def create_dataset(  # type: ignore[return]
            self,
            schema: dict,
            dataset_name: str,
            description: str,
            profile_id: str,
            additional_dataset_properties: Optional[dict] = None
    ) -> Optional[str]:
        """
        Create a new dataset.

        **Args:**
        - schema (dict): The schema of the dataset.
        - dataset_name (str): The name of the dataset.
        - description (str): The description of the dataset.
        - profile_id (str): The billing profile ID.
        - additional_dataset_properties (Optional[dict], optional): Additional
            properties for the dataset. Defaults to None.

        **Returns:**
        - Optional[str]: The ID of the created dataset, or None if creation failed.

        **Raises:**
        - ValueError: If the schema validation fails.
        """
        dataset_properties = {
            "name": dataset_name,
            "description": description,
            "defaultProfileId": profile_id,
            "region": "us-central1",
            "cloudPlatform": GCP,
            "schema": schema
        }

        if additional_dataset_properties:
            dataset_properties.update(additional_dataset_properties)
        try:
            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
        except ValidationError as e:
            raise ValueError(f"Schema validation error: {e}")
        uri = f"{self.tdr_link}/datasets"
        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
        response = self.request_util.run_request(
            method=POST,
            uri=uri,
            data=json.dumps(dataset_properties),
            content_type=APPLICATION_JSON
        )
        job_id = response.json()["id"]
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        dataset_id = job_results["id"]  # type: ignore[index]
        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
        return dataset_id

    def update_dataset_schema(  # type: ignore[return]
            self,
            dataset_id: str,
            update_note: str,
            tables_to_add: Optional[list[dict]] = None,
            relationships_to_add: Optional[list[dict]] = None,
            columns_to_add: Optional[list[dict]] = None
    ) -> Optional[str]:
        """
        Update the schema of a dataset.

        **Args:**
        - dataset_id (str): The ID of the dataset.
        - update_note (str): A note describing the update.
        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.

        **Returns:**
        - Optional[str]: The ID of the updated dataset, or None if the update failed.

        **Raises:**
        - ValueError: If the schema validation fails.
        """
        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
        request_body: dict = {"description": f"{update_note}", "changes": {}}
        if tables_to_add:
            request_body["changes"]["addTables"] = tables_to_add
        if relationships_to_add:
            request_body["changes"]["addRelationships"] = relationships_to_add
        if columns_to_add:
            request_body["changes"]["addColumns"] = columns_to_add
        try:
            UpdateSchema(**request_body)
        except ValidationError as e:
            raise ValueError(f"Schema validation error: {e}")

        response = self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(request_body)
        )
        job_id = response.json()["id"]
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        dataset_id = job_results["id"]  # type: ignore[index]
        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
        return dataset_id

    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
        """
        Get response from a batched endpoint.

        Helper method for all GET endpoints that require batching.

        Given the URI and the limit (optional), this will
        loop through batches until all metadata is retrieved.

        **Args:**
        - uri (str): The base URI for the endpoint (without query params for offset or limit).
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
        """
        batch = 1
        offset = 0
        metadata: list = []
        while True:
            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()

            # If no more files, break the loop
            if not response_json:
                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
                break

            metadata.extend(response_json)
            if len(response_json) < limit:
                logging.info(f"Retrieved final batch of results, found {len(metadata)} total records")
                break

            # Increment the offset by limit for the next page
            offset += limit
            batch += 1
        return metadata

    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
        """
        Return all the metadata about files in a given snapshot.

        Not all files can be returned at once, so the API
        is used repeatedly until all "batches" have been returned.

        **Args:**
        - snapshot_id (str): The ID of the snapshot.
        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.

        **Returns:**
        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
        """
        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
        """
        Return snapshots belonging to the specified dataset.

        **Args:**
        - dataset_id (str): The UUID of the dataset to query.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
        return self.request_util.run_request(
            uri=uri,
            method=GET
        )

    def create_snapshot(
            self,
            snapshot_name: str,
            description: str,
            dataset_name: str,
            snapshot_mode: str,  # byFullView is entire dataset
            profile_id: str,
            stewards: Optional[list[str]] = [],
            readers: Optional[list[str]] = [],
            consent_code: Optional[str] = None,
            duos_id: Optional[str] = None,
            data_access_control_groups: Optional[list[str]] = None,
    ) -> None:
        """
        Create a snapshot in TDR.

        **Args:**
        - snapshot_name (str): The name of the snapshot to create.
        - description (str): The description of the snapshot.
        - dataset_name (str): The name of the dataset to snapshot.
        - snapshot_mode (str): The snapshot mode (`byFullView` snapshots the entire dataset).
        - profile_id (str): The billing profile ID.
        - stewards (list[str], optional): Users to add to the snapshot `stewards` policy. Defaults to an empty list.
        - readers (list[str], optional): Users to add to the snapshot `readers` policy. Defaults to an empty list.
        - consent_code (str, optional): The consent code to attach to the snapshot. Defaults to None.
        - duos_id (str, optional): The DUOS ID to attach to the snapshot. Defaults to None.
        - data_access_control_groups (list[str], optional): Data access control groups for the snapshot.
            Defaults to None.

        **Returns:**
        - None: The snapshot creation job is monitored until completion; nothing is returned.
        """
        uri = f"{self.tdr_link}/snapshots"
        payload = {
            "name": snapshot_name,
            "description": description,
            "contents": [
                {
                    "datasetName": dataset_name,
                    "mode": snapshot_mode,
                }
            ],
            "policies": {
                "stewards": stewards,
                "readers": readers,
            },
            "profileId": profile_id,
            "globalFileIds": True,
        }
        if consent_code:
            payload["consentCode"] = consent_code
        if duos_id:
            payload["duosId"] = duos_id
        if data_access_control_groups:
            payload["dataAccessControlGroups"] = data_access_control_groups
        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
        response = self.request_util.run_request(
            uri=uri,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(payload)
        )
        job_id = response.json()["id"]
        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
        snapshot_id = job_results["id"]  # type: ignore[index]
        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")


class FilterOutSampleIdsAlreadyInDataset:
    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""

    def __init__(
            self,
            ingest_metrics: list[dict],
            dataset_id: str,
            tdr: TDR,
            target_table_name: str,
            filter_entity_id: str
    ):
        """
        Initialize the FilterOutSampleIdsAlreadyInDataset class.

        **Args:**
        - ingest_metrics (list[dict]): The metrics to be ingested.
        - dataset_id (str): The ID of the dataset.
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
        - target_table_name (str): The name of the target table.
        - filter_entity_id (str): The entity ID to filter on.
        """
        self.ingest_metrics = ingest_metrics
        """@private"""
        self.tdr = tdr
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.filter_entity_id = filter_entity_id
        """@private"""

    def run(self) -> list[dict]:
        """
        Run the filter process to remove sample IDs that already exist in the dataset.

        **Returns:**
        - list[dict]: The filtered ingest metrics.
        """
        # Get all sample ids that already exist in dataset
        logging.info(
            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
            f"dataset {self.dataset_id}"
        )

        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
            dataset_id=self.dataset_id,
            target_table_name=self.target_table_name,
            entity_id=self.filter_entity_id
        )
        # Filter out rows that already exist in dataset
        filtered_ingest_metrics = [
            row
            for row in self.ingest_metrics
            if str(row[self.filter_entity_id]) not in dataset_sample_ids
        ]
        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
            logging.info(
                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
                f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest"
            )

            if filtered_ingest_metrics:
                return filtered_ingest_metrics
            else:
                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
                return []
        else:
            logging.info("No rows were filtered out as they all do not exist in dataset")
            return filtered_ingest_metrics
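A short usage sketch for the classes above. The dataset name, billing profile, table name, and metrics rows are placeholders; it assumes a `tdr` client built as in the construction sketch near the top of this page.

# Usage sketch with placeholder values; assumes `tdr` was constructed as shown earlier.
existing = tdr.check_if_dataset_exists(dataset_name="my_dataset", billing_profile="billing-profile-uuid")
if existing:
    dataset_id = existing[0]["id"]
    # Map every file UUID in the dataset to its file metadata
    file_dict = tdr.create_file_dict(dataset_id=dataset_id)
    # Drop rows from a prospective ingest that already exist in the 'sample' table
    metrics_to_ingest = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=[{"sample_id": "S1"}, {"sample_id": "S2"}],
        dataset_id=dataset_id,
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id"
    ).run()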
730 - datarepo_row_ids (list[str]): A list of row IDs to be deleted. 731 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 732 733 **Returns:** 734 - dict (optional): A dictionary containing the response from the soft delete operation job 735 monitoring. Returns None if no row IDs are provided. 736 """ 737 if not datarepo_row_ids: 738 logging.info(f"No records found to soft delete in table {table_name}") 739 return None 740 logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}") 741 uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes" 742 payload = { 743 "deleteType": "soft", 744 "specType": "jsonArray", 745 "tables": [ 746 { 747 "tableName": table_name, 748 "jsonArraySpec": { 749 "rowIds": datarepo_row_ids 750 } 751 } 752 ] 753 } 754 response = self.request_util.run_request( 755 method=POST, 756 uri=uri, 757 data=json.dumps(payload), 758 content_type=APPLICATION_JSON 759 ) 760 job_id = response.json()["id"] 761 return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run() 762 763 def soft_delete_all_table_entries( 764 self, 765 dataset_id: str, 766 table_name: str, 767 query_limit: int = 1000, 768 check_intervals: int = 15 769 ) -> Optional[dict]: 770 """ 771 Soft deletes all records in a table. 772 773 **Args:** 774 - dataset_id (str): The ID of the dataset. 775 - table_name (str): The name of the target table. 776 - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 777 - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`. 778 779 **Returns:** 780 - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns 781 None if no row IDs are provided. 782 """ 783 dataset_metrics = self.get_dataset_table_metrics( 784 dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit 785 ) 786 row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics] 787 return self.soft_delete_entries( 788 dataset_id=dataset_id, 789 table_name=table_name, 790 datarepo_row_ids=row_ids, 791 check_intervals=check_intervals 792 ) 793 794 def get_or_create_dataset( 795 self, 796 dataset_name: str, 797 billing_profile: str, 798 schema: dict, 799 description: str, 800 relationships: Optional[list[dict]] = None, 801 delete_existing: bool = False, 802 continue_if_exists: bool = False, 803 additional_properties_dict: Optional[dict] = None 804 ) -> str: 805 """ 806 Get or create a dataset. 807 808 **Args:** 809 - dataset_name (str): The name of the dataset. 810 - billing_profile (str): The billing profile ID. 811 - schema (dict): The schema of the dataset. 812 - description (str): The description of the dataset. 813 - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. 814 Defaults to None. 815 - additional_properties_dict (Optional[dict], optional): Additional properties 816 for the dataset. Defaults to None. 817 - delete_existing (bool, optional): Whether to delete the existing dataset if found. 818 Defaults to `False`. 819 - continue_if_exists (bool, optional): Whether to continue if the dataset already exists. 820 Defaults to `False`. 821 822 **Returns:** 823 - str: The ID of the dataset. 824 825 **Raises:** 826 - ValueError: If multiple datasets with the same name are found under the billing profile. 
827 """ 828 existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile) 829 if existing_datasets: 830 if not continue_if_exists: 831 raise ValueError( 832 f"Run with continue_if_exists=True to use the existing dataset {dataset_name}" 833 ) 834 # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list 835 if delete_existing: 836 logging.info(f"Deleting existing dataset {dataset_name}") 837 self.delete_dataset(existing_datasets[0]["id"]) 838 existing_datasets = [] 839 # If not delete_existing and continue_if_exists then grab existing datasets id 840 else: 841 dataset_id = existing_datasets[0]["id"] 842 if not existing_datasets: 843 logging.info("Did not find existing dataset") 844 # Create dataset 845 dataset_id = self.create_dataset( 846 schema=schema, 847 dataset_name=dataset_name, 848 description=description, 849 profile_id=billing_profile, 850 additional_dataset_properties=additional_properties_dict 851 ) 852 return dataset_id 853 854 def create_dataset( # type: ignore[return] 855 self, 856 schema: dict, 857 dataset_name: str, 858 description: str, 859 profile_id: str, 860 additional_dataset_properties: Optional[dict] = None 861 ) -> Optional[str]: 862 """ 863 Create a new dataset. 864 865 **Args:** 866 - schema (dict): The schema of the dataset. 867 - dataset_name (str): The name of the dataset. 868 - description (str): The description of the dataset. 869 - profile_id (str): The billing profile ID. 870 - additional_dataset_properties (Optional[dict], optional): Additional 871 properties for the dataset. Defaults to None. 872 873 **Returns:** 874 - Optional[str]: The ID of the created dataset, or None if creation failed. 875 876 Raises: 877 - ValueError: If the schema validation fails. 878 """ 879 dataset_properties = { 880 "name": dataset_name, 881 "description": description, 882 "defaultProfileId": profile_id, 883 "region": "us-central1", 884 "cloudPlatform": GCP, 885 "schema": schema 886 } 887 888 if additional_dataset_properties: 889 dataset_properties.update(additional_dataset_properties) 890 try: 891 CreateDatasetSchema(**dataset_properties) # type: ignore[arg-type] 892 except ValidationError as e: 893 raise ValueError(f"Schema validation error: {e}") 894 uri = f"{self.tdr_link}/datasets" 895 logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}") 896 response = self.request_util.run_request( 897 method=POST, 898 uri=uri, 899 data=json.dumps(dataset_properties), 900 content_type=APPLICATION_JSON 901 ) 902 job_id = response.json()["id"] 903 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 904 dataset_id = job_results["id"] # type: ignore[index] 905 logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}") 906 return dataset_id 907 908 def update_dataset_schema( # type: ignore[return] 909 self, 910 dataset_id: str, 911 update_note: str, 912 tables_to_add: Optional[list[dict]] = None, 913 relationships_to_add: Optional[list[dict]] = None, 914 columns_to_add: Optional[list[dict]] = None 915 ) -> Optional[str]: 916 """ 917 Update the schema of a dataset. 918 919 **Args:** 920 - dataset_id (str): The ID of the dataset. 921 - update_note (str): A note describing the update. 922 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 923 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 924 - columns_to_add (list[dict], optional): A list of columns to add. 
Defaults to None. 925 926 **Returns:** 927 - Optional[str]: The ID of the updated dataset, or None if the update failed. 928 929 **Raises:** 930 - ValueError: If the schema validation fails. 931 """ 932 uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema" 933 request_body: dict = {"description": f"{update_note}", "changes": {}} 934 if tables_to_add: 935 request_body["changes"]["addTables"] = tables_to_add 936 if relationships_to_add: 937 request_body["changes"]["addRelationships"] = relationships_to_add 938 if columns_to_add: 939 request_body["changes"]["addColumns"] = columns_to_add 940 try: 941 UpdateSchema(**request_body) 942 except ValidationError as e: 943 raise ValueError(f"Schema validation error: {e}") 944 945 response = self.request_util.run_request( 946 uri=uri, 947 method=POST, 948 content_type=APPLICATION_JSON, 949 data=json.dumps(request_body) 950 ) 951 job_id = response.json()["id"] 952 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 953 dataset_id = job_results["id"] # type: ignore[index] 954 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 955 return dataset_id 956 957 def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]: 958 """ 959 Get response from a batched endpoint. 960 961 Helper method for all GET endpoints that require batching. 962 963 Given the URI and the limit (optional), will 964 loop through batches until all metadata is retrieved. 965 966 **Args:** 967 - uri (str): The base URI for the endpoint (without query params for offset or limit). 968 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 969 970 **Returns:** 971 - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint. 972 """ 973 batch = 1 974 offset = 0 975 metadata: list = [] 976 while True: 977 logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata") 978 response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json() 979 980 # If no more files, break the loop 981 if not response_json: 982 logging.info(f"No more results to retrieve, found {len(metadata)} total records") 983 break 984 985 metadata.extend(response_json) 986 if len(response_json) < limit: 987 logging.info(f"Retrieved final batch of results, found {len(metadata)} total records") 988 break 989 990 # Increment the offset by limit for the next page 991 offset += limit 992 batch += 1 993 return metadata 994 995 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 996 """ 997 Return all the metadata about files in a given snapshot. 998 999 Not all files can be returned at once, so the API 1000 is used repeatedly until all "batches" have been returned. 1001 1002 **Args:** 1003 - snapshot_id (str): The ID of the snapshot. 1004 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 1005 1006 **Returns:** 1007 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 1008 """ 1009 uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files" 1010 return self._get_response_from_batched_endpoint(uri=uri, limit=limit) 1011 1012 def get_dataset_snapshots(self, dataset_id: str) -> requests.Response: 1013 """ 1014 Return snapshots belonging to specified dataset. 1015 1016 **Args:** 1017 - dataset_id: uuid of dataset to query. 
1018 1019 **Returns:** 1020 - requests.Response: The response from the request. 1021 """ 1022 uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}" 1023 return self.request_util.run_request( 1024 uri=uri, 1025 method=GET 1026 ) 1027 1028 def create_snapshot( 1029 self, 1030 snapshot_name: str, 1031 description: str, 1032 dataset_name: str, 1033 snapshot_mode: str, # byFullView is entire dataset 1034 profile_id: str, 1035 stewards: Optional[list[str]] = [], 1036 readers: Optional[list[str]] = [], 1037 consent_code: Optional[str] = None, 1038 duos_id: Optional[str] = None, 1039 data_access_control_groups: Optional[list[str]] = None, 1040 ) -> None: 1041 """ 1042 Create a snapshot in TDR. 1043 1044 **Returns:** 1045 - requests.Response: The response from the request. 1046 """ 1047 uri = f"{self.tdr_link}/snapshots" 1048 payload = { 1049 "name": snapshot_name, 1050 "description": description, 1051 "contents": [ 1052 { 1053 "datasetName": dataset_name, 1054 "mode": snapshot_mode, 1055 } 1056 ], 1057 "policies": { 1058 "stewards": stewards, 1059 "readers": readers, 1060 }, 1061 "profileId": profile_id, 1062 "globalFileIds": True, 1063 } 1064 if consent_code: 1065 payload["consentCode"] = consent_code 1066 if duos_id: 1067 payload["duosId"] = duos_id 1068 if data_access_control_groups: 1069 payload["dataAccessControlGroups"] = data_access_control_groups 1070 logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}") 1071 response = self.request_util.run_request( 1072 uri=uri, 1073 method=POST, 1074 content_type=APPLICATION_JSON, 1075 data=json.dumps(payload) 1076 ) 1077 job_id = response.json()["id"] 1078 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 1079 snapshot_id = job_results["id"] # type: ignore[index] 1080 logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
Class to interact with the Terra Data Repository (TDR) API.
def __init__(self, request_util: RunRequest, env: str = 'prod', dry_run: bool = False)
Initialize the TDR class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
- env (str, optional): The TDR environment to target, either `prod` or `dev`. Defaults to `prod`; any other value raises a RuntimeError.
- dry_run (bool, optional): If `True`, deletion requests are logged but not submitted (currently only honored by `delete_files_and_snapshots` and `delete_snapshots`). Defaults to `False`.
PROD_LINK / DEV_LINK (str): The base URL for the TDR API (prod and dev, respectively).
def get_dataset_files(self, dataset_id: str, limit: int = ARG_DEFAULTS['batch_size_to_list_files']) -> list[dict]
Get all files in a dataset.
Returns JSON entries structured like the example below:
{
"fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
"collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
"path": "/path/set/in/ingest.csv",
"size": 1722,
"checksums": [
{
"checksum": "82f7e79v",
"type": "crc32c"
},
{
"checksum": "fff973507e30b74fa47a3d6830b84a90",
"type": "md5"
}
],
"created": "2024-13-11T15:01:00.256Z",
"description": null,
"fileType": "file",
"fileDetail": {
"datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
"mimeType": null,
"accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
"loadTag": "RP_3333-RP_3333"
},
"directoryDetail": null
}
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
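A minimal usage sketch, assuming a `RunRequest` has already been constructed (see `ops_utils.request_util`); the dataset ID is a placeholder:

```python
from ops_utils.tdr_utils.tdr_api_utils import TDR

# `request_util` is assumed to be an already-configured
# ops_utils.request_util.RunRequest instance; the dataset ID is a placeholder.
tdr = TDR(request_util=request_util, env="dev")
files = tdr.get_dataset_files(dataset_id="11111111-2222-3333-4444-555555555555")
print(f"Found {len(files)} files in the dataset")
```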
def create_file_dict(self, dataset_id: str, limit: int = ARG_DEFAULTS['batch_size_to_list_files']) -> dict
Create a dictionary of all files in a dataset where the key is the file UUID.
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- dict: A dictionary where the key is the file UUID and the value is the file metadata.
def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(self, dataset_id: str, limit: int = ARG_DEFAULTS['batch_size_to_list_files']) -> dict
Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.
This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.
Args:
- dataset_id (str): The ID of the dataset.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
Returns:
- dict: A dictionary where the key is the file's access URL (the TDR 'path') and the value is the file UUID.
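To make the mapping direction concrete, a short sketch (the `tdr` instance, paths, and IDs are placeholders):

```python
# `tdr` is an already-constructed TDR instance; IDs and paths are placeholders.
path_to_uuid = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
    dataset_id="11111111-2222-3333-4444-555555555555"
)
# Keys are access URLs (the TDR 'path'), values are TDR file UUIDs.
file_uuid = path_to_uuid.get("gs://source-bucket/sample1/sample1.cram")
```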
def delete_file(self, file_id: str, dataset_id: str) -> requests.Response
Delete a file from a dataset.
Args:
- file_id (str): The ID of the file to be deleted.
- dataset_id (str): The ID of the dataset.
Returns:
- requests.Response: The response from the request.
def delete_files(self, file_ids: list[str], dataset_id: str, batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"], check_interval: int = 15) -> None
Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
Args:
- file_ids (list[str]): A list of file IDs to be deleted.
- dataset_id (str): The ID of the dataset.
- batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
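A short sketch of a batched delete; the `tdr` instance and all IDs are placeholders:

```python
# `tdr` is an already-constructed TDR instance; the IDs are placeholders.
tdr.delete_files(
    file_ids=["file-uuid-1", "file-uuid-2", "file-uuid-3"],
    dataset_id="11111111-2222-3333-4444-555555555555",
    batch_size_to_delete_files=100,
    check_interval=30,
)
```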
def delete_files_and_snapshots(self, dataset_id: str, file_ids: set[str]) -> None
Delete files from a dataset by their IDs, first deleting any snapshots that reference those files (honors `dry_run`).
def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response
Add a user to a dataset with a specified policy.
Args:
- dataset_id (str): The ID of the dataset.
- user (str): The email of the user to be added.
- policy (str): The policy to be assigned to the user. Must be one of `steward`, `custodian`, or `snapshot_creator`.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If the policy is not valid.
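For example, granting a user snapshot-creation rights might look like the sketch below (the `tdr` instance, email, and dataset ID are placeholders):

```python
# `tdr` is an already-constructed TDR instance; values are placeholders.
response = tdr.add_user_to_dataset(
    dataset_id="11111111-2222-3333-4444-555555555555",
    user="collaborator@example.org",
    policy="snapshot_creator",
)
response.raise_for_status()
```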
def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response
Remove a user from a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- user (str): The email of the user to be removed.
- policy (str): The policy to be removed from the user. Must be one of `steward`, `custodian`, or `snapshot_creator`.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If the policy is not valid.
def delete_dataset(self, dataset_id: str) -> None
Delete a dataset and monitor the job until completion.
Args:
- dataset_id (str): The ID of the dataset to be deleted.
def make_snapshot_public(self, snapshot_id: str) -> requests.Response
Make a snapshot public.
Args:
- snapshot_id (str): The ID of the snapshot to be made public.
Returns:
- requests.Response: The response from the request.
def get_snapshot_info(self, snapshot_id: str, continue_not_found: bool = False, info_to_include: Optional[list[str]] = None) -> Optional[requests.Response]
Get information about a snapshot.
Args:
- snapshot_id (str): The ID of the snapshot.
- continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
- info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`.
Returns:
- requests.Response (optional): The response from the request (returns None if the snapshot is not found or access is denied).
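A sketch of a tolerant lookup for a snapshot that may not exist or may not be shared with the caller (the `tdr` instance and snapshot ID are placeholders; the `name` field in the response is an assumption):

```python
# `tdr` is an already-constructed TDR instance; the snapshot ID is a placeholder.
snapshot_response = tdr.get_snapshot_info(
    snapshot_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    continue_not_found=True,
    info_to_include=["SOURCES", "TABLES"],
)
if snapshot_response is not None:
    snapshot = snapshot_response.json()
    print(snapshot.get("name"))
```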
def delete_snapshots(self, snapshot_ids: list[str], batch_size: int = 25, check_interval: int = 10, verbose: bool = False) -> None
Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
Args:
- snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
- batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
- verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
def delete_snapshot(self, snapshot_id: str) -> requests.Response
Delete a snapshot.
Args:
- snapshot_id (str): The ID of the snapshot to be deleted.
Returns:
- requests.Response: The response from the request.
def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]
Check if a dataset exists by name and optionally by billing profile.
Args:
- dataset_name (str): The name of the dataset to check.
- billing_profile (str, optional): The billing profile ID to match. Defaults to None.
Returns:
- list[dict]: A list of matching datasets.
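A short sketch (the `tdr` instance, names, and IDs are placeholders); an empty list means no dataset with that exact name exists:

```python
# `tdr` is an already-constructed TDR instance; values are placeholders.
matches = tdr.check_if_dataset_exists(
    dataset_name="my_project_dataset",
    billing_profile="profile-uuid",
)
if matches:
    print(f"Existing dataset id: {matches[0]['id']}")
```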
def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response
Get information about a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- info_to_include (list[str], optional): A list of additional information to include. Valid options include: `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`. Defaults to None.
Returns:
- requests.Response: The response from the request.
Raises:
- ValueError: If `info_to_include` contains invalid information types.
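For example, pulling the schema and listing table names (the `tdr` instance and dataset ID are placeholders):

```python
# `tdr` is an already-constructed TDR instance; the dataset ID is a placeholder.
dataset = tdr.get_dataset_info(
    dataset_id="11111111-2222-3333-4444-555555555555",
    info_to_include=["SCHEMA", "ACCESS_INFORMATION"],
).json()
table_names = [table["name"] for table in dataset["schema"]["tables"]]
print(table_names)
```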
def get_table_schema_info(self, dataset_id: str, table_name: str, dataset_info: Optional[dict] = None) -> Union[dict, None]
Get schema information for a specific table within a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the table.
- dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
Returns:
- Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response
Retrieve the result of a job.
Args:
- job_id (str): The ID of the job.
- expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response
Load data into a TDR dataset.
Args:
- dataset_id (str): The ID of the dataset.
- data (dict): The data to be ingested.
Returns:
- requests.Response: The response from the request.
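The shape of `data` is defined by the TDR ingest API rather than by this wrapper; the sketch below is illustrative only (field names are assumptions, all values are placeholders), and it pre-serializes the payload with `json.dumps` because this method forwards `data` to the request unchanged:

```python
import json

# Illustrative ingest request; field names are assumptions drawn from the
# TDR ingest API, and all values are placeholders.
# `tdr` is an already-constructed TDR instance.
ingest_request = {
    "table": "sample",
    "format": "array",
    "records": [{"sample_id": "S1", "cram_path": "gs://bucket/S1.cram"}],
    "load_tag": "sample_ingest_2024_01_01",
}
response = tdr.ingest_to_dataset(
    dataset_id="11111111-2222-3333-4444-555555555555",
    data=json.dumps(ingest_request),
)
```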
def file_ingest_to_dataset(self, dataset_id: str, profile_id: str, file_list: list[dict], load_tag: str = "file_ingest_load_tag") -> dict
Load files into a TDR dataset.
Args:
- dataset_id (str): The ID of the dataset.
- profile_id (str): The billing profile ID.
- file_list (list[dict]): The list of files to be ingested.
- load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
Returns:
- dict: A dictionary containing the response from the ingest operation job monitoring.
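A sketch of a bulk file load; each `file_list` entry follows the TDR bulk-load file model as assumed here, and all paths and IDs are placeholders:

```python
# `tdr` is an already-constructed TDR instance; paths and IDs are placeholders.
file_list = [
    {
        "sourcePath": "gs://source-bucket/sample1/sample1.cram",
        "targetPath": "/sample1/sample1.cram",
    },
]
job_result = tdr.file_ingest_to_dataset(
    dataset_id="11111111-2222-3333-4444-555555555555",
    profile_id="profile-uuid",
    file_list=file_list,
    load_tag="sample1_file_load",
)
```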
def get_dataset_table_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> list[dict]
Retrieve all metrics for a specific table within a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- target_table_name (str): The name of the target table.
- query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
Returns:
- list[dict]: A list of dictionaries containing the metrics for the specified table.
def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]
Get existing IDs from a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- target_table_name (str): The name of the target table.
- entity_id (str): The name of the column whose values should be returned (e.g. the sample ID column).
Returns:
- list[str]: A list of entity IDs from the specified table.
def get_job_status(self, job_id: str) -> requests.Response
Retrieve the status of a job.
Args:
- job_id (str): The ID of the job.
Returns:
- requests.Response: The response from the request.
def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]
Get all file UUIDs from the metadata of a dataset.
Args:
- dataset_id (str): The ID of the dataset.
Returns:
- list[str]: A list of file UUIDs from the dataset metadata.
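One common use is comparing these referenced UUIDs against every file physically present in the dataset to find orphans; a sketch (the `tdr` instance and dataset ID are placeholders):

```python
# `tdr` is an already-constructed TDR instance; the dataset ID is a placeholder.
dataset_id = "11111111-2222-3333-4444-555555555555"
referenced = set(tdr.get_dataset_file_uuids_from_metadata(dataset_id=dataset_id))
all_files = set(tdr.create_file_dict(dataset_id=dataset_id))  # keys are file UUIDs
orphaned = all_files - referenced
print(f"{len(orphaned)} files are not referenced by any fileref column")
```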
def soft_delete_entries(self, dataset_id: str, table_name: str, datarepo_row_ids: list[str], check_intervals: int = 15) -> Optional[dict]
Soft delete specific records from a table.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the target table.
- datarepo_row_ids (list[str]): A list of row IDs to be deleted.
- check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
Returns:
- dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
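A sketch of deleting rows that match a condition, combining `get_dataset_table_metrics` with this method (the `sample` table and `sample_id` column are hypothetical; the `tdr` instance and IDs are placeholders):

```python
# `tdr` is an already-constructed TDR instance; table/column names and IDs are placeholders.
dataset_id = "11111111-2222-3333-4444-555555555555"
rows = tdr.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name="sample")
row_ids = [row["datarepo_row_id"] for row in rows if row["sample_id"] in {"S1", "S2"}]
tdr.soft_delete_entries(
    dataset_id=dataset_id,
    table_name="sample",
    datarepo_row_ids=row_ids,
)
```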
def soft_delete_all_table_entries(self, dataset_id: str, table_name: str, query_limit: int = 1000, check_intervals: int = 15) -> Optional[dict]
Soft delete all records in a table.
Args:
- dataset_id (str): The ID of the dataset.
- table_name (str): The name of the target table.
- query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
- check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
Returns:
- dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
def get_or_create_dataset(self, dataset_name: str, billing_profile: str, schema: dict, description: str, relationships: Optional[list[dict]] = None, delete_existing: bool = False, continue_if_exists: bool = False, additional_properties_dict: Optional[dict] = None) -> str
Get or create a dataset.
Args:
- dataset_name (str): The name of the dataset.
- billing_profile (str): The billing profile ID.
- schema (dict): The schema of the dataset.
- description (str): The description of the dataset.
- relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. Defaults to None.
- additional_properties_dict (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
- delete_existing (bool, optional): Whether to delete the existing dataset if found. Defaults to `False`.
- continue_if_exists (bool, optional): Whether to continue if the dataset already exists. Defaults to `False`.
Returns:
- str: The ID of the dataset.
Raises:
- ValueError: If multiple datasets with the same name are found under the billing profile.
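A sketch of creating (or reusing) a dataset with a minimal schema; the schema layout follows the TDR dataset schema model as assumed here, and all names and IDs are placeholders:

```python
# `tdr` is an already-constructed TDR instance; schema, names, and IDs are placeholders.
schema = {
    "tables": [
        {
            "name": "sample",
            "columns": [
                {"name": "sample_id", "datatype": "string", "array_of": False},
                {"name": "cram_path", "datatype": "fileref", "array_of": False},
            ],
            "primaryKey": ["sample_id"],
        }
    ]
}
dataset_id = tdr.get_or_create_dataset(
    dataset_name="my_project_dataset",
    billing_profile="profile-uuid",
    schema=schema,
    description="Example dataset",
    continue_if_exists=True,
)
```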
def create_dataset(self, schema: dict, dataset_name: str, description: str, profile_id: str, additional_dataset_properties: Optional[dict] = None) -> Optional[str]
Create a new dataset.
Args:
- schema (dict): The schema of the dataset.
- dataset_name (str): The name of the dataset.
- description (str): The description of the dataset.
- profile_id (str): The billing profile ID.
- additional_dataset_properties (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
Returns:
- Optional[str]: The ID of the created dataset, or None if creation failed.
Raises:
- ValueError: If the schema validation fails.
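For direct creation without the exists check, a short sketch, again assuming the `tdr` instance and `sample_schema` from the previous example and a placeholder billing profile:

```python
# Sketch: create a dataset directly; the call blocks while the TDR job is
# monitored and returns the new dataset's ID on success.
new_dataset_id = tdr.create_dataset(
    schema=sample_schema,
    dataset_name="my_other_dataset",
    description="Dataset created directly via create_dataset",
    profile_id="00000000-0000-0000-0000-000000000000",  # placeholder billing profile
)
```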
908 def update_dataset_schema( # type: ignore[return] 909 self, 910 dataset_id: str, 911 update_note: str, 912 tables_to_add: Optional[list[dict]] = None, 913 relationships_to_add: Optional[list[dict]] = None, 914 columns_to_add: Optional[list[dict]] = None 915 ) -> Optional[str]: 916 """ 917 Update the schema of a dataset. 918 919 **Args:** 920 - dataset_id (str): The ID of the dataset. 921 - update_note (str): A note describing the update. 922 - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None. 923 - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None. 924 - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None. 925 926 **Returns:** 927 - Optional[str]: The ID of the updated dataset, or None if the update failed. 928 929 **Raises:** 930 - ValueError: If the schema validation fails. 931 """ 932 uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema" 933 request_body: dict = {"description": f"{update_note}", "changes": {}} 934 if tables_to_add: 935 request_body["changes"]["addTables"] = tables_to_add 936 if relationships_to_add: 937 request_body["changes"]["addRelationships"] = relationships_to_add 938 if columns_to_add: 939 request_body["changes"]["addColumns"] = columns_to_add 940 try: 941 UpdateSchema(**request_body) 942 except ValidationError as e: 943 raise ValueError(f"Schema validation error: {e}") 944 945 response = self.request_util.run_request( 946 uri=uri, 947 method=POST, 948 content_type=APPLICATION_JSON, 949 data=json.dumps(request_body) 950 ) 951 job_id = response.json()["id"] 952 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 953 dataset_id = job_results["id"] # type: ignore[index] 954 logging.info(f"Successfully ran schema updates in dataset {dataset_id}") 955 return dataset_id
Update the schema of a dataset.
Args:
- dataset_id (str): The ID of the dataset.
- update_note (str): A note describing the update.
- tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
- relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
- columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
Returns:
- Optional[str]: The ID of the updated dataset, or None if the update failed.
Raises:
- ValueError: If the schema validation fails.
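A sketch of adding a column, assuming the `dataset_id` from the earlier example. The exact shape of the `addColumns` entries is dictated by the TDR updateSchema API and validated against `UpdateSchema`, so treat the payload below as illustrative rather than authoritative.

```python
# Sketch: add a column to an existing "sample" table (payload shape is illustrative).
updated_dataset_id = tdr.update_dataset_schema(
    dataset_id=dataset_id,
    update_note="Add collection_date column to sample table",
    columns_to_add=[
        {
            "tableName": "sample",
            "columns": [{"name": "collection_date", "datatype": "date"}],
        }
    ],
)
```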
995 def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]: 996 """ 997 Return all the metadata about files in a given snapshot. 998 999 Not all files can be returned at once, so the API 1000 is used repeatedly until all "batches" have been returned. 1001 1002 **Args:** 1003 - snapshot_id (str): The ID of the snapshot. 1004 - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`. 1005 1006 **Returns:** 1007 - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot. 1008 """ 1009 uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files" 1010 return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
Return all the metadata about files in a given snapshot.
Not all files can be returned at once, so the API is used repeatedly until all "batches" have been returned.
Args:
- snapshot_id (str): The ID of the snapshot.
- limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
Returns:
- list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
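A short sketch that prints the access URL for each file in a snapshot, assuming a placeholder snapshot ID; the record fields match the file metadata format shown for `get_dataset_files`.

```python
# Sketch: enumerate files in a snapshot and print their access URLs.
snapshot_files = tdr.get_files_from_snapshot(
    snapshot_id="11111111-1111-1111-1111-111111111111",  # placeholder snapshot ID
    limit=1000,
)
for file_record in snapshot_files:
    print(file_record["fileId"], file_record["fileDetail"]["accessUrl"])
```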
1012 def get_dataset_snapshots(self, dataset_id: str) -> requests.Response: 1013 """ 1014 Return snapshots belonging to specified dataset. 1015 1016 **Args:** 1017 - dataset_id: uuid of dataset to query. 1018 1019 **Returns:** 1020 - requests.Response: The response from the request. 1021 """ 1022 uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}" 1023 return self.request_util.run_request( 1024 uri=uri, 1025 method=GET 1026 )
Return snapshots belonging to specified dataset.
Args:
- dataset_id: uuid of dataset to query.
Returns:
- requests.Response: The response from the request.
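A sketch of listing a dataset's snapshots, assuming the standard TDR enumerate-snapshots response body with an `items` list:

```python
# Sketch: list snapshots belonging to a dataset (response shape assumed to be
# the standard TDR enumerate model with an "items" list).
response = tdr.get_dataset_snapshots(dataset_id=dataset_id)
for snapshot in response.json().get("items", []):
    print(snapshot["id"], snapshot["name"])
```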
1028 def create_snapshot( 1029 self, 1030 snapshot_name: str, 1031 description: str, 1032 dataset_name: str, 1033 snapshot_mode: str, # byFullView is entire dataset 1034 profile_id: str, 1035 stewards: Optional[list[str]] = [], 1036 readers: Optional[list[str]] = [], 1037 consent_code: Optional[str] = None, 1038 duos_id: Optional[str] = None, 1039 data_access_control_groups: Optional[list[str]] = None, 1040 ) -> None: 1041 """ 1042 Create a snapshot in TDR. 1043 1044 **Returns:** 1045 - requests.Response: The response from the request. 1046 """ 1047 uri = f"{self.tdr_link}/snapshots" 1048 payload = { 1049 "name": snapshot_name, 1050 "description": description, 1051 "contents": [ 1052 { 1053 "datasetName": dataset_name, 1054 "mode": snapshot_mode, 1055 } 1056 ], 1057 "policies": { 1058 "stewards": stewards, 1059 "readers": readers, 1060 }, 1061 "profileId": profile_id, 1062 "globalFileIds": True, 1063 } 1064 if consent_code: 1065 payload["consentCode"] = consent_code 1066 if duos_id: 1067 payload["duosId"] = duos_id 1068 if data_access_control_groups: 1069 payload["dataAccessControlGroups"] = data_access_control_groups 1070 logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}") 1071 response = self.request_util.run_request( 1072 uri=uri, 1073 method=POST, 1074 content_type=APPLICATION_JSON, 1075 data=json.dumps(payload) 1076 ) 1077 job_id = response.json()["id"] 1078 job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run() 1079 snapshot_id = job_results["id"] # type: ignore[index] 1080 logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
Create a snapshot in TDR.
Returns:
- None: The method waits for the snapshot creation job to complete and logs the resulting snapshot ID rather than returning the response.
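A usage sketch, assuming placeholder names, billing profile, and policy group emails; per the comment in the source, `byFullView` snapshots the entire dataset.

```python
# Sketch: snapshot an entire dataset (all IDs, names, and emails are placeholders).
tdr.create_snapshot(
    snapshot_name="my_dataset_snapshot_2024_01",
    description="Full-view snapshot of my_dataset",
    dataset_name="my_dataset",
    snapshot_mode="byFullView",  # byFullView captures the entire dataset
    profile_id="00000000-0000-0000-0000-000000000000",  # placeholder billing profile
    stewards=["dataset-stewards@example.org"],  # placeholder group email
    readers=["dataset-readers@example.org"],  # placeholder group email
)
```

The call blocks while the snapshot-creation job is monitored, then logs the new snapshot ID; it does not return the HTTP response.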
1083class FilterOutSampleIdsAlreadyInDataset: 1084 """Class to filter ingest metrics to remove sample IDs that already exist in the dataset.""" 1085 1086 def __init__( 1087 self, 1088 ingest_metrics: list[dict], 1089 dataset_id: str, 1090 tdr: TDR, 1091 target_table_name: str, 1092 filter_entity_id: str 1093 ): 1094 """ 1095 Initialize the FilterOutSampleIdsAlreadyInDataset class. 1096 1097 **Args:** 1098 - ingest_metrics (list[dict]): The metrics to be ingested. 1099 - dataset_id (str): The ID of the dataset. 1100 - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance 1101 - target_table_name (str): The name of the target table. 1102 - filter_entity_id (str): The entity ID to filter on. 1103 """ 1104 self.ingest_metrics = ingest_metrics 1105 """@private""" 1106 self.tdr = tdr 1107 """@private""" 1108 self.dataset_id = dataset_id 1109 """@private""" 1110 self.target_table_name = target_table_name 1111 """@private""" 1112 self.filter_entity_id = filter_entity_id 1113 """@private""" 1114 1115 def run(self) -> list[dict]: 1116 """ 1117 Run the filter process to remove sample IDs that already exist in the dataset. 1118 1119 **Returns:** 1120 - list[dict]: The filtered ingest metrics. 1121 """ 1122 # Get all sample ids that already exist in dataset 1123 logging.info( 1124 f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in " 1125 f"dataset {self.dataset_id}" 1126 ) 1127 1128 dataset_sample_ids = self.tdr.get_dataset_sample_ids( 1129 dataset_id=self.dataset_id, 1130 target_table_name=self.target_table_name, 1131 entity_id=self.filter_entity_id 1132 ) 1133 # Filter out rows that already exist in dataset 1134 filtered_ingest_metrics = [ 1135 row 1136 for row in self.ingest_metrics 1137 if str(row[self.filter_entity_id]) not in dataset_sample_ids 1138 ] 1139 if len(filtered_ingest_metrics) < len(self.ingest_metrics): 1140 logging.info( 1141 f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in " 1142 f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest" 1143 ) 1144 1145 if filtered_ingest_metrics: 1146 return filtered_ingest_metrics 1147 else: 1148 logging.info("All rows filtered out as they all exist in dataset, nothing to ingest") 1149 return [] 1150 else: 1151 logging.info("No rows were filtered out as they all do not exist in dataset") 1152 return filtered_ingest_metrics
Class to filter ingest metrics to remove sample IDs that already exist in the dataset.
1086 def __init__( 1087 self, 1088 ingest_metrics: list[dict], 1089 dataset_id: str, 1090 tdr: TDR, 1091 target_table_name: str, 1092 filter_entity_id: str 1093 ): 1094 """ 1095 Initialize the FilterOutSampleIdsAlreadyInDataset class. 1096 1097 **Args:** 1098 - ingest_metrics (list[dict]): The metrics to be ingested. 1099 - dataset_id (str): The ID of the dataset. 1100 - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance 1101 - target_table_name (str): The name of the target table. 1102 - filter_entity_id (str): The entity ID to filter on. 1103 """ 1104 self.ingest_metrics = ingest_metrics 1105 """@private""" 1106 self.tdr = tdr 1107 """@private""" 1108 self.dataset_id = dataset_id 1109 """@private""" 1110 self.target_table_name = target_table_name 1111 """@private""" 1112 self.filter_entity_id = filter_entity_id 1113 """@private"""
Initialize the FilterOutSampleIdsAlreadyInDataset class.
Args:
- ingest_metrics (list[dict]): The metrics to be ingested.
- dataset_id (str): The ID of the dataset.
- tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance.
- target_table_name (str): The name of the target table.
- filter_entity_id (str): The entity ID to filter on.
1115 def run(self) -> list[dict]: 1116 """ 1117 Run the filter process to remove sample IDs that already exist in the dataset. 1118 1119 **Returns:** 1120 - list[dict]: The filtered ingest metrics. 1121 """ 1122 # Get all sample ids that already exist in dataset 1123 logging.info( 1124 f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in " 1125 f"dataset {self.dataset_id}" 1126 ) 1127 1128 dataset_sample_ids = self.tdr.get_dataset_sample_ids( 1129 dataset_id=self.dataset_id, 1130 target_table_name=self.target_table_name, 1131 entity_id=self.filter_entity_id 1132 ) 1133 # Filter out rows that already exist in dataset 1134 filtered_ingest_metrics = [ 1135 row 1136 for row in self.ingest_metrics 1137 if str(row[self.filter_entity_id]) not in dataset_sample_ids 1138 ] 1139 if len(filtered_ingest_metrics) < len(self.ingest_metrics): 1140 logging.info( 1141 f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in " 1142 f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest" 1143 ) 1144 1145 if filtered_ingest_metrics: 1146 return filtered_ingest_metrics 1147 else: 1148 logging.info("All rows filtered out as they all exist in dataset, nothing to ingest") 1149 return [] 1150 else: 1151 logging.info("No rows were filtered out as they all do not exist in dataset") 1152 return filtered_ingest_metrics
Run the filter process to remove sample IDs that already exist in the dataset.
Returns:
- list[dict]: The filtered ingest metrics.
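A sketch tying the class together, assuming the `tdr` instance and `dataset_id` from the earlier examples and a hypothetical metrics list keyed by `sample_id`:

```python
# Sketch: drop rows whose sample_id already exists in the dataset's "sample"
# table before ingesting (metrics content is hypothetical).
ingest_metrics = [
    {"sample_id": "SM-001", "cram_path": "gs://example-bucket/SM-001.cram"},
    {"sample_id": "SM-002", "cram_path": "gs://example-bucket/SM-002.cram"},
]

rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
    ingest_metrics=ingest_metrics,
    dataset_id=dataset_id,
    tdr=tdr,
    target_table_name="sample",
    filter_entity_id="sample_id",
).run()
```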