ops_utils.tdr_utils.tdr_api_utils

Utility classes for interacting with the TDR API.

   1"""Utility classes for interacting with TDR API."""
   2
   3import json
   4import logging
   5import requests
   6from typing import Any, Optional, Union
   7from pydantic import ValidationError
   8
   9from ..request_util import GET, POST, DELETE, PUT, RunRequest
  10from ..tdr_api_schema.create_dataset_schema import CreateDatasetSchema
  11from ..tdr_api_schema.update_dataset_schema import UpdateSchema
  12from .tdr_job_utils import MonitorTDRJob, SubmitAndMonitorMultipleJobs
  13from ..vars import ARG_DEFAULTS, GCP, APPLICATION_JSON
  14
  15
  16class TDR:
  17    """Class to interact with the Terra Data Repository (TDR) API."""
  18
  19    PROD_LINK = "https://data.terra.bio/api/repository/v1"
  20    DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
  21    """(str): The base URL for the TDR API."""
  22
  23    def __init__(self, request_util: RunRequest, env: str = 'prod', dry_run: bool = False):
  24        """
   25        Initialize the TDR class, a client for the Terra Data Repository (TDR) API.
  26
  27        **Args:**
  28        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
  29        """
  30        self.request_util = request_util
  31        # NOTE: dry_run is not fully implemented in this class, only in delete_files_and_snapshots
  32        self.dry_run = dry_run
  33        if env.lower() == 'prod':
  34            self.tdr_link = self.PROD_LINK
  35        elif env.lower() == 'dev':
  36            self.tdr_link = self.DEV_LINK
  37        else:
  38            raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.")
  39        """@private"""
  40
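A minimal construction sketch (not part of the library source). The `Token`/`RunRequest` setup below is an assumption about other `ops_utils` modules and may differ between versions; `env` accepts `prod` or `dev`, and `dry_run` currently only affects the file/snapshot deletion helpers:

    from ops_utils.token_util import Token            # assumed auth helper
    from ops_utils.request_util import RunRequest
    from ops_utils.tdr_utils.tdr_api_utils import TDR

    # Assumed constructor signatures; adjust to your ops_utils version.
    request_util = RunRequest(token=Token())
    tdr = TDR(request_util=request_util, env="dev", dry_run=True)
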
  41    @staticmethod
  42    def _check_policy(policy: str) -> None:
  43        """
  44        Check if the policy is valid.
  45
  46        **Args:**
  47        - policy (str): The role to check.
  48
  49        **Raises:**
  50        - ValueError: If the policy is not one of the allowed options.
  51        """
  52        if policy not in ["steward", "custodian", "snapshot_creator"]:
  53            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")
  54
  55    def get_dataset_files(
  56            self,
  57            dataset_id: str,
  58            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
  59    ) -> list[dict]:
  60        """
  61        Get all files in a dataset.
  62
   63        Returns JSON like the example below:
  64
  65            {
  66                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
  67                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
  68                "path": "/path/set/in/ingest.csv",
  69                "size": 1722,
  70                "checksums": [
  71                    {
  72                        "checksum": "82f7e79v",
  73                        "type": "crc32c"
  74                    },
  75                    {
  76                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
  77                        "type": "md5"
  78                    }
  79                ],
   80                "created": "2024-11-13T15:01:00.256Z",
  81                "description": null,
  82                "fileType": "file",
  83                "fileDetail": {
  84                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
  85                    "mimeType": null,
  86                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
  87                    "loadTag": "RP_3333-RP_3333"
  88                },
  89                "directoryDetail": null
  90            }
  91
  92        **Args:**
  93        - dataset_id (str): The ID of the dataset.
  94        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
  95
  96        **Returns:**
  97        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
  98        """
  99        uri = f"{self.tdr_link}/datasets/{dataset_id}/files"
 100        logging.info(f"Getting all files in dataset {dataset_id}")
 101        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 102
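A usage sketch for the batched file listing (assumes a `tdr` client built as in the earlier sketch; the dataset UUID is a placeholder):

    files = tdr.get_dataset_files(dataset_id="11111111-2222-3333-4444-555555555555")
    for file_info in files:
        # Each entry follows the JSON structure shown in the docstring above
        print(file_info["fileId"], file_info["path"], file_info["size"])
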
 103    def create_file_dict(
 104            self,
 105            dataset_id: str,
 106            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 107    ) -> dict:
 108        """
 109        Create a dictionary of all files in a dataset where the key is the file UUID.
 110
 111        **Args:**
 112        - dataset_id (str): The ID of the dataset.
 113        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 114
 115        **Returns:**
 116        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
 117        """
 118        return {
 119            file_dict["fileId"]: file_dict
 120            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 121        }
 122
 123    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
 124            self,
 125            dataset_id: str,
 126            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 127    ) -> dict:
 128        """
 129        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
 130
  131        This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.
 132
  133        This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.
 134
 135        **Args:**
 136        - dataset_id (str): The ID of the dataset.
 137        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 138
 139        **Returns:**
  140        - dict: A dictionary where the key is the file's source path (access URL) and the value is the file UUID.
 141        """
 142        return {
 143            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
 144            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 145        }
 146
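A sketch of feeding the path-to-UUID mapping into an ingest row for a self-hosted dataset (assumes a `tdr` client and a dataset created with `experimentalSelfHosted=True`; bucket, table, and column names are placeholders):

    path_to_uuid = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
        dataset_id="11111111-2222-3333-4444-555555555555"
    )
    # Reference an already-ingested file by its UUID instead of re-ingesting it
    row = {"sample_id": "NA12878", "cram_file": path_to_uuid.get("gs://my-bucket/NA12878.cram")}
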
 147    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
 148        """
 149        Delete a file from a dataset.
 150
 151        **Args:**
 152        - file_id (str): The ID of the file to be deleted.
 153        - dataset_id (str): The ID of the dataset.
 154
 155        **Returns:**
 156        - requests.Response: The response from the request.
 157        """
 158        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}"
 159        logging.info(f"Submitting delete job for file {file_id}")
 160        return self.request_util.run_request(uri=uri, method=DELETE)
 161
 162    def delete_files(
 163            self,
 164            file_ids: list[str],
 165            dataset_id: str,
 166            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
 167            check_interval: int = 15) -> None:
 168        """
 169        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
 170
 171        **Args:**
 172        - file_ids (list[str]): A list of file IDs to be deleted.
 173        - dataset_id (str): The ID of the dataset.
 174        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
 175        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 176        """
 177        SubmitAndMonitorMultipleJobs(
 178            tdr=self,
 179            job_function=self.delete_file,
 180            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
 181            batch_size=batch_size_to_delete_files,
 182            check_interval=check_interval
 183        ).run()
 184
 185    def _delete_snapshots_for_files(self, dataset_id: str, file_ids: set[str]) -> None:
 186        """Delete snapshots that reference any of the provided file IDs."""
 187        snapshots_resp = self.get_dataset_snapshots(dataset_id=dataset_id)
 188        snapshot_items = snapshots_resp.json().get('items', [])
 189        snapshots_to_delete = []
 190        logging.info(
 191            "Checking %d snapshots for references",
 192            len(snapshot_items),
 193        )
 194        for snap in snapshot_items:
 195            snap_id = snap.get('id')
 196            if not snap_id:
 197                continue
 198            snap_files = self.get_files_from_snapshot(snapshot_id=snap_id)
 199            snap_file_ids = {
 200                fd.get('fileId') for fd in snap_files if fd.get('fileId')
 201            }
 202            # Use set intersection to check for any matching file IDs
 203            if snap_file_ids & file_ids:
 204                snapshots_to_delete.append(snap_id)
 205        if snapshots_to_delete:
 206            self.delete_snapshots(snapshot_ids=snapshots_to_delete)
 207        else:
 208            logging.info("No snapshots reference the provided file ids")
 209
 210    def _dry_run_msg(self) -> str:
 211        return '[Dry run] ' if self.dry_run else ''
 212
 213    def delete_files_and_snapshots(self, dataset_id: str, file_ids: set[str]) -> None:
 214        """Delete files from a dataset by their IDs, handling snapshots."""
 215        self._delete_snapshots_for_files(dataset_id=dataset_id, file_ids=file_ids)
 216
 217        logging.info(
 218            f"{self._dry_run_msg()}Submitting delete request for {len(file_ids)} files in "
 219            f"dataset {dataset_id}")
 220        if not self.dry_run:
 221            self.delete_files(
 222                file_ids=list(file_ids),
 223                dataset_id=dataset_id
 224            )
 225
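A sketch of a file clean-up run (assumes a `tdr` client, here constructed with `dry_run=True` so only the intended work is logged; the IDs are placeholders). Snapshots referencing any of the files are removed first, since files still referenced by a snapshot cannot be deleted:

    tdr.delete_files_and_snapshots(
        dataset_id="11111111-2222-3333-4444-555555555555",
        file_ids={"file-uuid-1", "file-uuid-2"},
    )
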
 226    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 227        """
 228        Add a user to a dataset with a specified policy.
 229
 230        **Args:**
 231        - dataset_id (str): The ID of the dataset.
 232        - user (str): The email of the user to be added.
 233        - policy (str): The policy to be assigned to the user.
 234                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 235
 236        **Returns:**
 237        - requests.Response: The response from the request.
 238
 239        **Raises:**
 240        - ValueError: If the policy is not valid.
 241        """
 242        self._check_policy(policy)
 243        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
 244        member_dict = {"email": user}
 245        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
 246        return self.request_util.run_request(
 247            uri=uri,
 248            method=POST,
 249            data=json.dumps(member_dict),
 250            content_type=APPLICATION_JSON
 251        )
 252
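A sketch of granting dataset access (assumes a `tdr` client; the email and dataset UUID are placeholders). The policy must be one of `steward`, `custodian`, or `snapshot_creator`:

    response = tdr.add_user_to_dataset(
        dataset_id="11111111-2222-3333-4444-555555555555",
        user="someone@example.org",
        policy="custodian",
    )
    print(response.status_code)
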
 253    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 254        """
 255        Remove a user from a dataset.
 256
 257        **Args:**
 258        - dataset_id (str): The ID of the dataset.
 259        - user (str): The email of the user to be removed.
 260        - policy (str): The policy to be removed from the user.
 261                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 262
 263        **Returns:**
 264        - requests.Response: The response from the request.
 265
 266        **Raises:**
 267        - ValueError: If the policy is not valid.
 268        """
 269        self._check_policy(policy)
 270        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
 271        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
 272        return self.request_util.run_request(uri=uri, method=DELETE)
 273
 274    def delete_dataset(self, dataset_id: str) -> None:
 275        """
  276        Delete a dataset and monitor the job until completion.
 277
 278        **Args:**
  279        - dataset_id (str): The ID of the dataset to be deleted.
 280        """
 281        uri = f"{self.tdr_link}/datasets/{dataset_id}"
 282        logging.info(f"Deleting dataset {dataset_id}")
 283        response = self.request_util.run_request(uri=uri, method=DELETE)
 284        job_id = response.json()['id']
 285        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
 286
 287    def make_snapshot_public(self, snapshot_id: str) -> requests.Response:
 288        """
 289        Make a snapshot public.
 290
 291        **Args:**
 292        - snapshot_id (str): The ID of the snapshot to be made public.
 293
 294        **Returns:**
 295        - requests.Response: The response from the request.
 296        """
 297        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public"
 298        logging.info(f"Making snapshot {snapshot_id} public")
 299        return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")
 300
 301    def get_snapshot_info(
 302            self,
 303            snapshot_id: str,
 304            continue_not_found: bool = False,
 305            info_to_include: Optional[list[str]] = None
 306    ) -> Optional[requests.Response]:
 307        """
 308        Get information about a snapshot.
 309
 310        **Args:**
 311        - snapshot_id (str): The ID of the snapshot.
  312        - continue_not_found (bool, optional): Whether to accept a `404` or `403` response. Defaults to `False`.
 313        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
 314                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
  315                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
 316
 317        **Returns:**
 318        - requests.Response (optional): The response from the request (returns None if the snapshot is not
 319         found or access is denied).
 320        """
 321        acceptable_return_code = [404, 403] if continue_not_found else []
 322        acceptable_include_info = [
 323            "SOURCES",
 324            "TABLES",
 325            "RELATIONSHIPS",
 326            "ACCESS_INFORMATION",
 327            "PROFILE",
 328            "PROPERTIES",
 329            "DATA_PROJECT",
 330            "CREATION_INFORMATION",
 331            "DUOS"
 332        ]
 333        if info_to_include:
 334            if not all(info in acceptable_include_info for info in info_to_include):
 335                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 336            include_string = '&include='.join(info_to_include)
 337        else:
 338            include_string = ""
 339        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
 340        response = self.request_util.run_request(
 341            uri=uri,
 342            method=GET,
 343            accept_return_codes=acceptable_return_code
 344        )
 345        if response.status_code == 404:
 346            logging.warning(f"Snapshot {snapshot_id} not found")
 347            return None
 348        if response.status_code == 403:
 349            logging.warning(f"Access denied for snapshot {snapshot_id}")
 350            return None
 351        return response
 352
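A sketch of fetching snapshot details with extra sections included (assumes a `tdr` client; the snapshot UUID is a placeholder). With `continue_not_found=True`, a missing or inaccessible snapshot yields `None` rather than an error:

    snapshot_response = tdr.get_snapshot_info(
        snapshot_id="22222222-3333-4444-5555-666666666666",
        continue_not_found=True,
        info_to_include=["TABLES", "ACCESS_INFORMATION"],
    )
    if snapshot_response:
        print(snapshot_response.json().get("name"))
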
 353    def delete_snapshots(
 354            self,
 355            snapshot_ids: list[str],
 356            batch_size: int = 25,
 357            check_interval: int = 10,
 358            verbose: bool = False) -> None:
 359        """
 360        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
 361
 362        **Args:**
 363        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
 364        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
 365        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
 366        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 367        """
 368        logging.info(f"{self._dry_run_msg()}Deleting {len(snapshot_ids)} snapshots")
 369        if not self.dry_run:
 370            SubmitAndMonitorMultipleJobs(
 371                tdr=self,
 372                job_function=self.delete_snapshot,
 373                job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
 374                batch_size=batch_size,
 375                check_interval=check_interval,
 376                verbose=verbose
 377            ).run()
 378
 379    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
 380        """
 381        Delete a snapshot.
 382
 383        **Args:**
 384        - snapshot_id (str): The ID of the snapshot to be deleted.
 385
 386        **Returns:**
 387        - requests.Response: The response from the request.
 388        """
 389        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
 390        logging.info(f"Deleting snapshot {snapshot_id}")
 391        return self.request_util.run_request(uri=uri, method=DELETE)
 392
 393    def _yield_existing_datasets(
 394            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
 395    ) -> Any:
 396        """
 397        Get all datasets in TDR, optionally filtered by dataset name.
 398
 399        **Args:**
 400            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
 401            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
 402            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".
 403
 404        Yields:
 405            Any: A generator yielding datasets.
 406        """
 407        offset = 0
 408        if filter:
 409            filter_str = f"&filter={filter}"
 410            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
 411        else:
 412            filter_str = ""
 413            log_message = f"Searching for all datasets in batches of {batch_size}"
 414        logging.info(log_message)
 415        while True:
 416            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
 417            response = self.request_util.run_request(uri=uri, method=GET)
 418            datasets = response.json()["items"]
 419            if not datasets:
 420                break
 421            for dataset in datasets:
 422                yield dataset
 423            offset += batch_size
  424            # keep paging; the loop exits above once an empty batch is returned
 425
 426    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
 427        """
 428        Check if a dataset exists by name and optionally by billing profile.
 429
 430        **Args:**
 431        - dataset_name (str): The name of the dataset to check.
 432        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
 433
 434        **Returns:**
 435        - list[dict]: A list of matching datasets.
 436        """
 437        matching_datasets = []
 438        for dataset in self._yield_existing_datasets(filter=dataset_name):
 439            # Search uses wildcard so could grab more datasets where dataset_name is substring
 440            if dataset_name == dataset["name"]:
 441                if billing_profile:
 442                    if dataset["defaultProfileId"] == billing_profile:
 443                        logging.info(
 444                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
 445                        dataset_id = dataset["id"]
 446                        logging.info(f"Dataset ID: {dataset_id}")
 447                        matching_datasets.append(dataset)
 448                    else:
 449                        logging.warning(
 450                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
 451                            f"and not under billing profile {billing_profile}"
 452                        )
 453                        # Datasets names need to be unique regardless of billing profile, so raise an error if
 454                        # a dataset with the same name is found but is not under the requested billing profile
 455                        raise ValueError(
 456                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
 457                else:
 458                    matching_datasets.append(dataset)
 459        return matching_datasets
 460
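A sketch of checking for an existing dataset before creating one (assumes a `tdr` client; the name and billing profile ID are placeholders):

    matches = tdr.check_if_dataset_exists(
        dataset_name="my_dataset",
        billing_profile="33333333-4444-5555-6666-777777777777",
    )
    existing_dataset_id = matches[0]["id"] if matches else None
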
 461    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
 462        """
 463        Get information about a dataset.
 464
 465        **Args:**
 466        - dataset_id (str): The ID of the dataset.
 467        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
 468        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
 469        Defaults to None.
 470
 471        **Returns:**
 472        - requests.Response: The response from the request.
 473
 474        **Raises:**
 475        - ValueError: If `info_to_include` contains invalid information types.
 476        """
 477        acceptable_include_info = [
 478            "SCHEMA",
 479            "ACCESS_INFORMATION",
 480            "PROFILE",
 481            "PROPERTIES",
 482            "DATA_PROJECT",
 483            "STORAGE",
 484            "SNAPSHOT_BUILDER_SETTING"
 485        ]
 486        if info_to_include:
 487            if not all(info in acceptable_include_info for info in info_to_include):
 488                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 489            include_string = '&include='.join(info_to_include)
 490        else:
 491            include_string = ""
 492        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
 493        return self.request_util.run_request(uri=uri, method=GET)
 494
 495    def get_table_schema_info(
 496            self,
 497            dataset_id: str,
 498            table_name: str,
 499            dataset_info: Optional[dict] = None
 500    ) -> Union[dict, None]:
 501        """
 502        Get schema information for a specific table within a dataset.
 503
 504        **Args:**
 505        - dataset_id (str): The ID of the dataset.
 506        - table_name (str): The name of the table.
 507        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
 508
 509        **Returns:**
 510        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
 511        """
 512        if not dataset_info:
 513            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 514        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
 515            if table["name"] == table_name:
 516                return table
 517        return None
 518
 519    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
 520        """
 521        Retrieve the result of a job.
 522
 523        **Args:**
 524        - job_id (str): The ID of the job.
 525        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
 526
 527        **Returns:**
 528        - requests.Response: The response from the request.
 529        """
 530        uri = f"{self.tdr_link}/jobs/{job_id}/result"
 531        # If job is expected to fail, accept any return code
 532        acceptable_return_code = list(range(100, 600)) if expect_failure else []
 533        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
 534
 535    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
 536        """
 537        Load data into a TDR dataset.
 538
 539        **Args:**
 540        - dataset_id (str): The ID of the dataset.
 541        - data (dict): The data to be ingested.
 542
 543        **Returns:**
 544        - requests.Response: The response from the request.
 545        """
 546        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
 547        logging.info(
  548            "If the TDR SA was recently added to the source bucket/dataset/workspace and a 400/403 error occurs, " +
 549            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
 550        return self.request_util.run_request(
 551            uri=uri,
 552            method=POST,
 553            content_type=APPLICATION_JSON,
 554            data=data
 555        )
 556
 557    def file_ingest_to_dataset(
 558            self,
 559            dataset_id: str,
 560            profile_id: str,
 561            file_list: list[dict],
 562            load_tag: str = "file_ingest_load_tag"
 563    ) -> dict:
 564        """
 565        Load files into a TDR dataset.
 566
 567        **Args:**
 568        - dataset_id (str): The ID of the dataset.
 569        - profile_id (str): The billing profile ID.
 570        - file_list (list[dict]): The list of files to be ingested.
 571        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
 572
 573        **Returns:**
 574        - dict: A dictionary containing the response from the ingest operation job monitoring.
 575        """
 576        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
 577        data = {
 578            "profileId": profile_id,
  579            "loadTag": load_tag,
 580            "maxFailedFileLoads": 0,
 581            "loadArray": file_list
 582        }
 583
 584        response = self.request_util.run_request(
 585            uri=uri,
 586            method=POST,
 587            content_type=APPLICATION_JSON,
 588            data=json.dumps(data)
 589        )
 590        job_id = response.json()['id']
 591        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 592        return job_results  # type: ignore[return-value]
 593
 594    def get_dataset_table_metrics(
 595            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
 596    ) -> list[dict]:
 597        """
 598        Retrieve all metrics for a specific table within a dataset.
 599
 600        **Args:**
 601        - dataset_id (str): The ID of the dataset.
 602        - target_table_name (str): The name of the target table.
 603        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 604
 605        **Returns:**
 606        - list[dict]: A list of dictionaries containing the metrics for the specified table.
 607        """
 608        return [
 609            metric
 610            for metric in self._yield_dataset_metrics(
 611                dataset_id=dataset_id,
 612                target_table_name=target_table_name,
 613                query_limit=query_limit
 614            )
 615        ]
 616
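A sketch of pulling every row from one table (assumes a `tdr` client; the table and column names are placeholders):

    rows = tdr.get_dataset_table_metrics(
        dataset_id="11111111-2222-3333-4444-555555555555",
        target_table_name="sample",
    )
    sample_ids = [row["sample_id"] for row in rows]
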
 617    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
 618        """
 619        Yield all entity metrics from a dataset.
 620
 621        **Args:**
 622            dataset_id (str): The ID of the dataset.
 623            target_table_name (str): The name of the target table.
 624            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
 625
 626        Yields:
 627            Any: A generator yielding dictionaries containing the metrics for the specified table.
 628        """
 629        search_request = {
 630            "offset": 0,
 631            "limit": query_limit,
 632            "sort": "datarepo_row_id"
 633        }
 634        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
 635        while True:
 636            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
 637            response = self.request_util.run_request(
 638                uri=uri,
 639                method=POST,
 640                content_type=APPLICATION_JSON,
 641                data=json.dumps(search_request)
 642            )
 643            if not response or not response.json()["result"]:
 644                break
 645            logging.info(
 646                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
  647            f"in dataset {dataset_id}"
 648            )
 649            for record in response.json()["result"]:
 650                yield record
 651            search_request["offset"] += query_limit  # type: ignore[operator]
 652
 653    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
 654        """
 655        Get existing IDs from a dataset.
 656
 657        **Args:**
 658        - dataset_id (str): The ID of the dataset.
 659        - target_table_name (str): The name of the target table.
  660        - entity_id (str): The name of the column containing the entity IDs to retrieve.
 661
 662        **Returns:**
 663        - list[str]: A list of entity IDs from the specified table.
 664        """
 665        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
 666        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
 667
 668    def get_job_status(self, job_id: str) -> requests.Response:
 669        """
 670        Retrieve the status of a job.
 671
 672        **Args:**
 673        - job_id (str): The ID of the job.
 674
 675        **Returns:**
 676        - requests.Response: The response from the request.
 677        """
 678        uri = f"{self.tdr_link}/jobs/{job_id}"
 679        return self.request_util.run_request(uri=uri, method=GET)
 680
 681    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
 682        """
 683        Get all file UUIDs from the metadata of a dataset.
 684
 685        **Args:**
 686        - dataset_id (str): The ID of the dataset.
 687
 688        **Returns:**
 689        - list[str]: A list of file UUIDs from the dataset metadata.
 690        """
 691        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 692        all_metadata_file_uuids = []
 693        tables = 0
 694        for table in dataset_info["schema"]["tables"]:
 695            tables += 1
 696            table_name = table["name"]
 697            logging.info(f"Getting all file information for {table_name}")
 698            # Get just columns where datatype is fileref
 699            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
 700            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
 701            # Get unique list of file uuids
 702            file_uuids = list(
 703                set(
 704                    [
 705                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
 706                    ]
 707                )
 708            )
 709            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
 710            all_metadata_file_uuids.extend(file_uuids)
 711            # Make full list unique
 712            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
 713        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
 714        return all_metadata_file_uuids
 715
 716    def soft_delete_entries(
 717            self,
 718            dataset_id: str,
 719            table_name: str,
 720            datarepo_row_ids: list[str],
 721            check_intervals: int = 15
 722    ) -> Optional[dict]:
 723        """
 724        Soft delete specific records from a table.
 725
 726        **Args:**
 727        - dataset_id (str): The ID of the dataset.
 728        - table_name (str): The name of the target table.
 729        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
 730        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 731
 732        **Returns:**
 733        - dict (optional): A dictionary containing the response from the soft delete operation job
 734        monitoring. Returns None if no row IDs are provided.
 735        """
 736        if not datarepo_row_ids:
 737            logging.info(f"No records found to soft delete in table {table_name}")
 738            return None
 739        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
 740        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
 741        payload = {
 742            "deleteType": "soft",
 743            "specType": "jsonArray",
 744            "tables": [
 745                {
 746                    "tableName": table_name,
 747                    "jsonArraySpec": {
 748                        "rowIds": datarepo_row_ids
 749                    }
 750                }
 751            ]
 752        }
 753        response = self.request_util.run_request(
 754            method=POST,
 755            uri=uri,
 756            data=json.dumps(payload),
 757            content_type=APPLICATION_JSON
 758        )
 759        job_id = response.json()["id"]
 760        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
 761
 762    def soft_delete_all_table_entries(
 763            self,
 764            dataset_id: str,
 765            table_name: str,
 766            query_limit: int = 1000,
 767            check_intervals: int = 15
 768    ) -> Optional[dict]:
 769        """
  770        Soft delete all records in a table.
 771
 772        **Args:**
 773        - dataset_id (str): The ID of the dataset.
 774        - table_name (str): The name of the target table.
 775        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 776        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 777
 778        **Returns:**
 779        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
 780        None if no row IDs are provided.
 781        """
 782        dataset_metrics = self.get_dataset_table_metrics(
 783            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
 784        )
 785        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
 786        return self.soft_delete_entries(
 787            dataset_id=dataset_id,
 788            table_name=table_name,
 789            datarepo_row_ids=row_ids,
 790            check_intervals=check_intervals
 791        )
 792
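A sketch of clearing a table's rows without touching the files they reference (assumes a `tdr` client; the table name is a placeholder):

    tdr.soft_delete_all_table_entries(
        dataset_id="11111111-2222-3333-4444-555555555555",
        table_name="sample",
    )
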
 793    def get_or_create_dataset(
 794            self,
 795            dataset_name: str,
 796            billing_profile: str,
 797            schema: dict,
 798            description: str,
 799            relationships: Optional[list[dict]] = None,
 800            delete_existing: bool = False,
 801            continue_if_exists: bool = False,
 802            additional_properties_dict: Optional[dict] = None
 803    ) -> str:
 804        """
 805        Get or create a dataset.
 806
 807        **Args:**
 808        - dataset_name (str): The name of the dataset.
 809        - billing_profile (str): The billing profile ID.
 810        - schema (dict): The schema of the dataset.
 811        - description (str): The description of the dataset.
 812        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
  813                Currently unused by this method. Defaults to None.
 814        - additional_properties_dict (Optional[dict], optional): Additional properties
 815                for the dataset. Defaults to None.
 816        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
 817                Defaults to `False`.
 818        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
 819                Defaults to `False`.
 820
 821        **Returns:**
 822        - str: The ID of the dataset.
 823
 824        **Raises:**
  825        - ValueError: If the dataset already exists and `continue_if_exists` is `False`.
 826        """
 827        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
 828        if existing_datasets:
 829            if not continue_if_exists:
 830                raise ValueError(
 831                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
 832                )
 833            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
 834            if delete_existing:
 835                logging.info(f"Deleting existing dataset {dataset_name}")
 836                self.delete_dataset(existing_datasets[0]["id"])
 837                existing_datasets = []
  838            # If not delete_existing and continue_if_exists, grab the existing dataset's ID
 839            else:
 840                dataset_id = existing_datasets[0]["id"]
 841        if not existing_datasets:
 842            logging.info("Did not find existing dataset")
 843            # Create dataset
 844            dataset_id = self.create_dataset(
 845                schema=schema,
 846                dataset_name=dataset_name,
 847                description=description,
 848                profile_id=billing_profile,
 849                additional_dataset_properties=additional_properties_dict
 850            )
 851        return dataset_id
 852
 853    def create_dataset(  # type: ignore[return]
 854            self,
 855            schema: dict,
 856            dataset_name: str,
 857            description: str,
 858            profile_id: str,
 859            additional_dataset_properties: Optional[dict] = None
 860    ) -> Optional[str]:
 861        """
 862        Create a new dataset.
 863
 864        **Args:**
 865        - schema (dict): The schema of the dataset.
 866        - dataset_name (str): The name of the dataset.
 867        - description (str): The description of the dataset.
 868        - profile_id (str): The billing profile ID.
 869        - additional_dataset_properties (Optional[dict], optional): Additional
 870                properties for the dataset. Defaults to None.
 871
 872        **Returns:**
 873        - Optional[str]: The ID of the created dataset, or None if creation failed.
 874
  875        **Raises:**
 876        - ValueError: If the schema validation fails.
 877        """
 878        dataset_properties = {
 879            "name": dataset_name,
 880            "description": description,
 881            "defaultProfileId": profile_id,
 882            "region": "us-central1",
 883            "cloudPlatform": GCP,
 884            "schema": schema
 885        }
 886
 887        if additional_dataset_properties:
 888            dataset_properties.update(additional_dataset_properties)
 889        try:
 890            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
 891        except ValidationError as e:
 892            raise ValueError(f"Schema validation error: {e}")
 893        uri = f"{self.tdr_link}/datasets"
 894        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
 895        response = self.request_util.run_request(
 896            method=POST,
 897            uri=uri,
 898            data=json.dumps(dataset_properties),
 899            content_type=APPLICATION_JSON
 900        )
 901        job_id = response.json()["id"]
 902        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 903        dataset_id = job_results["id"]  # type: ignore[index]
 904        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
 905        return dataset_id
 906
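A sketch of a minimal `create_dataset` call (assumes a `tdr` client; the schema is a simplified illustration of the TDR table/column layout and is validated against `CreateDatasetSchema` before submission, so consult that schema for the full set of fields):

    schema = {
        "tables": [
            {
                "name": "sample",
                "columns": [
                    {"name": "sample_id", "datatype": "string", "required": True},
                    {"name": "cram_file", "datatype": "fileref"},
                ],
            }
        ]
    }
    dataset_id = tdr.create_dataset(
        schema=schema,
        dataset_name="my_dataset",
        description="Example dataset",
        profile_id="33333333-4444-5555-6666-777777777777",
    )
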
 907    def update_dataset_schema(  # type: ignore[return]
 908            self,
 909            dataset_id: str,
 910            update_note: str,
 911            tables_to_add: Optional[list[dict]] = None,
 912            relationships_to_add: Optional[list[dict]] = None,
 913            columns_to_add: Optional[list[dict]] = None
 914    ) -> Optional[str]:
 915        """
 916        Update the schema of a dataset.
 917
 918        **Args:**
 919        - dataset_id (str): The ID of the dataset.
 920        - update_note (str): A note describing the update.
 921        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
 922        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
 923        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
 924
 925        **Returns:**
 926        - Optional[str]: The ID of the updated dataset, or None if the update failed.
 927
 928        **Raises:**
 929        - ValueError: If the schema validation fails.
 930        """
 931        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
 932        request_body: dict = {"description": f"{update_note}", "changes": {}}
 933        if tables_to_add:
 934            request_body["changes"]["addTables"] = tables_to_add
 935        if relationships_to_add:
 936            request_body["changes"]["addRelationships"] = relationships_to_add
 937        if columns_to_add:
 938            request_body["changes"]["addColumns"] = columns_to_add
 939        try:
 940            UpdateSchema(**request_body)
 941        except ValidationError as e:
 942            raise ValueError(f"Schema validation error: {e}")
 943
 944        response = self.request_util.run_request(
 945            uri=uri,
 946            method=POST,
 947            content_type=APPLICATION_JSON,
 948            data=json.dumps(request_body)
 949        )
 950        job_id = response.json()["id"]
 951        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 952        dataset_id = job_results["id"]  # type: ignore[index]
 953        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
 954        return dataset_id
 955
 956    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
 957        """
 958        Get response from a batched endpoint.
 959
 960        Helper method for all GET endpoints that require batching.
 961
 962        Given the URI and the limit (optional), will
 963        loop through batches until all metadata is retrieved.
 964
 965        **Args:**
 966        - uri (str): The base URI for the endpoint (without query params for offset or limit).
 967        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 968
 969        **Returns:**
 970        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
 971        """
 972        batch = 1
 973        offset = 0
 974        metadata: list = []
 975        while True:
 976            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
 977            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()
 978
 979            # If no more files, break the loop
 980            if not response_json:
 981                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
 982                break
 983
 984            metadata.extend(response_json)
 985            if len(response_json) < limit:
 986                logging.info(f"Retrieved final batch of results, found {len(metadata)} total records")
 987                break
 988
 989            # Increment the offset by limit for the next page
 990            offset += limit
 991            batch += 1
 992        return metadata
 993
 994    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 995        """
 996        Return all the metadata about files in a given snapshot.
 997
 998        Not all files can be returned at once, so the API
 999        is used repeatedly until all "batches" have been returned.
1000
1001        **Args:**
1002        - snapshot_id (str): The ID of the snapshot.
1003        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
1004
1005        **Returns:**
1006        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
1007        """
1008        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
1009        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
1010
1011    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
1012        """
 1013        Return snapshots belonging to the specified dataset.
1014
1015        **Args:**
 1016        - dataset_id (str): The UUID of the dataset to query.
1017
1018        **Returns:**
1019        - requests.Response: The response from the request.
1020        """
1021        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
1022        return self.request_util.run_request(
1023            uri=uri,
1024            method=GET
1025        )
1026
1027    def create_snapshot(
1028            self,
1029            snapshot_name: str,
1030            description: str,
1031            dataset_name: str,
1032            snapshot_mode: str,  # byFullView is entire dataset
1033            profile_id: str,
1034            stewards: Optional[list[str]] = [],
1035            readers: Optional[list[str]] = [],
1036            consent_code: Optional[str] = None,
1037            duos_id: Optional[str] = None,
1038            data_access_control_groups: Optional[list[str]] = None,
1039    ) -> None:
1040        """
 1041        Create a snapshot in TDR and monitor the creation job until completion.
1042
1043        **Returns:**
 1044        - None: This method waits for the snapshot creation job to complete and logs the new snapshot ID.
1045        """
1046        uri = f"{self.tdr_link}/snapshots"
1047        payload = {
1048            "name": snapshot_name,
1049            "description": description,
1050            "contents": [
1051                {
1052                    "datasetName": dataset_name,
1053                    "mode": snapshot_mode,
1054                }
1055            ],
1056            "policies": {
1057                "stewards": stewards,
1058                "readers": readers,
1059            },
1060            "profileId": profile_id,
1061            "globalFileIds": True,
1062        }
1063        if consent_code:
1064            payload["consentCode"] = consent_code
1065        if duos_id:
1066            payload["duosId"] = duos_id
1067        if data_access_control_groups:
1068            payload["dataAccessControlGroups"] = data_access_control_groups
1069        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
1070        response = self.request_util.run_request(
1071            uri=uri,
1072            method=POST,
1073            content_type=APPLICATION_JSON,
1074            data=json.dumps(payload)
1075        )
1076        job_id = response.json()["id"]
1077        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
1078        snapshot_id = job_results["id"]  # type: ignore[index]
1079        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
1080
1081
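A sketch of creating a full-view snapshot (assumes a `tdr` client; names, emails, and the billing profile are placeholders). A `snapshot_mode` of `byFullView` snapshots the entire dataset:

    tdr.create_snapshot(
        snapshot_name="my_dataset_snapshot_2024_01",
        description="Example snapshot",
        dataset_name="my_dataset",
        snapshot_mode="byFullView",
        profile_id="33333333-4444-5555-6666-777777777777",
        stewards=["data-owner@example.org"],
        readers=["collaborator@example.org"],
    )
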
1082class FilterOutSampleIdsAlreadyInDataset:
1083    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
1084
1085    def __init__(
1086            self,
1087            ingest_metrics: list[dict],
1088            dataset_id: str,
1089            tdr: TDR,
1090            target_table_name: str,
1091            filter_entity_id: str
1092    ):
1093        """
1094        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1095
1096        **Args:**
1097        - ingest_metrics (list[dict]): The metrics to be ingested.
1098        - dataset_id (str): The ID of the dataset.
 1099        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
1100        - target_table_name (str): The name of the target table.
1101        - filter_entity_id (str): The entity ID to filter on.
1102        """
1103        self.ingest_metrics = ingest_metrics
1104        """@private"""
1105        self.tdr = tdr
1106        """@private"""
1107        self.dataset_id = dataset_id
1108        """@private"""
1109        self.target_table_name = target_table_name
1110        """@private"""
1111        self.filter_entity_id = filter_entity_id
1112        """@private"""
1113
1114    def run(self) -> list[dict]:
1115        """
1116        Run the filter process to remove sample IDs that already exist in the dataset.
1117
1118        **Returns:**
1119        - list[dict]: The filtered ingest metrics.
1120        """
1121        # Get all sample ids that already exist in dataset
1122        logging.info(
1123            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1124            f"dataset {self.dataset_id}"
1125        )
1126
1127        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1128            dataset_id=self.dataset_id,
1129            target_table_name=self.target_table_name,
1130            entity_id=self.filter_entity_id
1131        )
1132        # Filter out rows that already exist in dataset
1133        filtered_ingest_metrics = [
1134            row
1135            for row in self.ingest_metrics
1136            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1137        ]
1138        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1139            logging.info(
1140                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
 1141                f"the dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1142            )
1143
1144            if filtered_ingest_metrics:
1145                return filtered_ingest_metrics
1146            else:
1147                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
1148                return []
1149        else:
 1150            logging.info("No rows were filtered out because none of them already exist in the dataset")
1151            return filtered_ingest_metrics
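A sketch of filtering ingest rows against rows already present in a dataset table (assumes a `tdr` client; the metrics, table, and column names are placeholders):

    ingest_metrics = [
        {"sample_id": "NA12878", "cram_file": "gs://my-bucket/NA12878.cram"},
        {"sample_id": "NA12891", "cram_file": "gs://my-bucket/NA12891.cram"},
    ]
    rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=ingest_metrics,
        dataset_id="11111111-2222-3333-4444-555555555555",
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id",
    ).run()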
 220            f"dataset {dataset_id}")
 221        if not self.dry_run:
 222            self.delete_files(
 223                file_ids=list(file_ids),
 224                dataset_id=dataset_id
 225            )
 226
 227    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 228        """
 229        Add a user to a dataset with a specified policy.
 230
 231        **Args:**
 232        - dataset_id (str): The ID of the dataset.
 233        - user (str): The email of the user to be added.
 234        - policy (str): The policy to be assigned to the user.
 235                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 236
 237        **Returns:**
 238        - requests.Response: The response from the request.
 239
 240        **Raises:**
 241        - ValueError: If the policy is not valid.
 242        """
 243        self._check_policy(policy)
 244        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
 245        member_dict = {"email": user}
 246        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
 247        return self.request_util.run_request(
 248            uri=uri,
 249            method=POST,
 250            data=json.dumps(member_dict),
 251            content_type=APPLICATION_JSON
 252        )
 253
 254    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 255        """
 256        Remove a user from a dataset.
 257
 258        **Args:**
 259        - dataset_id (str): The ID of the dataset.
 260        - user (str): The email of the user to be removed.
 261        - policy (str): The policy to be removed from the user.
 262                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 263
 264        **Returns:**
 265        - requests.Response: The response from the request.
 266
 267        **Raises:**
 268        - ValueError: If the policy is not valid.
 269        """
 270        self._check_policy(policy)
 271        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
 272        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
 273        return self.request_util.run_request(uri=uri, method=DELETE)
 274
 275    def delete_dataset(self, dataset_id: str) -> None:
 276        """
 277        Delete a dataset and monitor the job until completion.
 278
 279        **Args:**
 280        - dataset_id (str): The ID of the dataset to be deleted.
 281        """
 282        uri = f"{self.tdr_link}/datasets/{dataset_id}"
 283        logging.info(f"Deleting dataset {dataset_id}")
 284        response = self.request_util.run_request(uri=uri, method=DELETE)
 285        job_id = response.json()['id']
 286        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
 287
 288    def make_snapshot_public(self, snapshot_id: str) -> requests.Response:
 289        """
 290        Make a snapshot public.
 291
 292        **Args:**
 293        - snapshot_id (str): The ID of the snapshot to be made public.
 294
 295        **Returns:**
 296        - requests.Response: The response from the request.
 297        """
 298        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public"
 299        logging.info(f"Making snapshot {snapshot_id} public")
 300        return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")
 301
 302    def get_snapshot_info(
 303            self,
 304            snapshot_id: str,
 305            continue_not_found: bool = False,
 306            info_to_include: Optional[list[str]] = None
 307    ) -> Optional[requests.Response]:
 308        """
 309        Get information about a snapshot.
 310
 311        **Args:**
 312        - snapshot_id (str): The ID of the snapshot.
 313        - continue_not_found (bool, optional): Whether to accept a `404` or `403` response. Defaults to `False`.
 314        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
 315                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
 316                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
 317
 318        **Returns:**
 319        - requests.Response (optional): The response from the request (returns None if the snapshot is not
 320         found or access is denied).
 321        """
 322        acceptable_return_code = [404, 403] if continue_not_found else []
 323        acceptable_include_info = [
 324            "SOURCES",
 325            "TABLES",
 326            "RELATIONSHIPS",
 327            "ACCESS_INFORMATION",
 328            "PROFILE",
 329            "PROPERTIES",
 330            "DATA_PROJECT",
 331            "CREATION_INFORMATION",
 332            "DUOS"
 333        ]
 334        if info_to_include:
 335            if not all(info in acceptable_include_info for info in info_to_include):
 336                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 337            include_string = '&include='.join(info_to_include)
 338        else:
 339            include_string = ""
 340        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
 341        response = self.request_util.run_request(
 342            uri=uri,
 343            method=GET,
 344            accept_return_codes=acceptable_return_code
 345        )
 346        if response.status_code == 404:
 347            logging.warning(f"Snapshot {snapshot_id} not found")
 348            return None
 349        if response.status_code == 403:
 350            logging.warning(f"Access denied for snapshot {snapshot_id}")
 351            return None
 352        return response
 353
 354    def delete_snapshots(
 355            self,
 356            snapshot_ids: list[str],
 357            batch_size: int = 25,
 358            check_interval: int = 10,
 359            verbose: bool = False) -> None:
 360        """
 361        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
 362
 363        **Args:**
 364        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
 365        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
 366        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
 367        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 368        """
 369        logging.info(f"{self._dry_run_msg()}Deleting {len(snapshot_ids)} snapshots")
 370        if not self.dry_run:
 371            SubmitAndMonitorMultipleJobs(
 372                tdr=self,
 373                job_function=self.delete_snapshot,
 374                job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
 375                batch_size=batch_size,
 376                check_interval=check_interval,
 377                verbose=verbose
 378            ).run()
 379
 380    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
 381        """
 382        Delete a snapshot.
 383
 384        **Args:**
 385        - snapshot_id (str): The ID of the snapshot to be deleted.
 386
 387        **Returns:**
 388        - requests.Response: The response from the request.
 389        """
 390        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
 391        logging.info(f"Deleting snapshot {snapshot_id}")
 392        return self.request_util.run_request(uri=uri, method=DELETE)
 393
 394    def _yield_existing_datasets(
 395            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
 396    ) -> Any:
 397        """
 398        Get all datasets in TDR, optionally filtered by dataset name.
 399
 400        **Args:**
 401            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
 402            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
 403            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".
 404
 405        Yields:
 406            Any: A generator yielding datasets.
 407        """
 408        offset = 0
 409        if filter:
 410            filter_str = f"&filter={filter}"
 411            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
 412        else:
 413            filter_str = ""
 414            log_message = f"Searching for all datasets in batches of {batch_size}"
 415        logging.info(log_message)
 416        while True:
 417            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
 418            response = self.request_util.run_request(uri=uri, method=GET)
 419            datasets = response.json()["items"]
 420            if not datasets:
 421                break
 422            for dataset in datasets:
 423                yield dataset
 424            offset += batch_size
 426
 427    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
 428        """
 429        Check if a dataset exists by name and optionally by billing profile.
 430
 431        **Args:**
 432        - dataset_name (str): The name of the dataset to check.
 433        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
 434
 435        **Returns:**
 436        - list[dict]: A list of matching datasets.
 437        """
 438        matching_datasets = []
 439        for dataset in self._yield_existing_datasets(filter=dataset_name):
 440            # Search uses wildcard so could grab more datasets where dataset_name is substring
 441            if dataset_name == dataset["name"]:
 442                if billing_profile:
 443                    if dataset["defaultProfileId"] == billing_profile:
 444                        logging.info(
 445                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
 446                        dataset_id = dataset["id"]
 447                        logging.info(f"Dataset ID: {dataset_id}")
 448                        matching_datasets.append(dataset)
 449                    else:
 450                        logging.warning(
 451                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
 452                            f"and not under billing profile {billing_profile}"
 453                        )
 454                        # Datasets names need to be unique regardless of billing profile, so raise an error if
 455                        # a dataset with the same name is found but is not under the requested billing profile
 456                        raise ValueError(
 457                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
 458                else:
 459                    matching_datasets.append(dataset)
 460        return matching_datasets
 461
 462    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
 463        """
 464        Get information about a dataset.
 465
 466        **Args:**
 467        - dataset_id (str): The ID of the dataset.
 468        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
 469        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
 470        Defaults to None.
 471
 472        **Returns:**
 473        - requests.Response: The response from the request.
 474
 475        **Raises:**
 476        - ValueError: If `info_to_include` contains invalid information types.
 477        """
 478        acceptable_include_info = [
 479            "SCHEMA",
 480            "ACCESS_INFORMATION",
 481            "PROFILE",
 482            "PROPERTIES",
 483            "DATA_PROJECT",
 484            "STORAGE",
 485            "SNAPSHOT_BUILDER_SETTING"
 486        ]
 487        if info_to_include:
 488            if not all(info in acceptable_include_info for info in info_to_include):
 489                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 490            include_string = '&include='.join(info_to_include)
 491        else:
 492            include_string = ""
 493        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
 494        return self.request_util.run_request(uri=uri, method=GET)
 495
 496    def get_table_schema_info(
 497            self,
 498            dataset_id: str,
 499            table_name: str,
 500            dataset_info: Optional[dict] = None
 501    ) -> Union[dict, None]:
 502        """
 503        Get schema information for a specific table within a dataset.
 504
 505        **Args:**
 506        - dataset_id (str): The ID of the dataset.
 507        - table_name (str): The name of the table.
 508        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
 509
 510        **Returns:**
 511        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
 512        """
 513        if not dataset_info:
 514            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 515        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
 516            if table["name"] == table_name:
 517                return table
 518        return None
 519
 520    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
 521        """
 522        Retrieve the result of a job.
 523
 524        **Args:**
 525        - job_id (str): The ID of the job.
 526        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
 527
 528        **Returns:**
 529        - requests.Response: The response from the request.
 530        """
 531        uri = f"{self.tdr_link}/jobs/{job_id}/result"
 532        # If job is expected to fail, accept any return code
 533        acceptable_return_code = list(range(100, 600)) if expect_failure else []
 534        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
 535
 536    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
 537        """
 538        Load data into a TDR dataset.
 539
 540        **Args:**
 541        - dataset_id (str): The ID of the dataset.
 542        - data (dict): The data to be ingested.
 543
 544        **Returns:**
 545        - requests.Response: The response from the request.
 546        """
 547        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
 548        logging.info(
 549            "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
 550            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
 551        return self.request_util.run_request(
 552            uri=uri,
 553            method=POST,
 554            content_type=APPLICATION_JSON,
 555            data=data
 556        )
 557
 558    def file_ingest_to_dataset(
 559            self,
 560            dataset_id: str,
 561            profile_id: str,
 562            file_list: list[dict],
 563            load_tag: str = "file_ingest_load_tag"
 564    ) -> dict:
 565        """
 566        Load files into a TDR dataset.
 567
 568        **Args:**
 569        - dataset_id (str): The ID of the dataset.
 570        - profile_id (str): The billing profile ID.
 571        - file_list (list[dict]): The list of files to be ingested.
 572        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
 573
 574        **Returns:**
 575        - dict: A dictionary containing the response from the ingest operation job monitoring.
 576        """
 577        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
 578        data = {
 579            "profileId": profile_id,
 580            "loadTag": f"{load_tag}",
 581            "maxFailedFileLoads": 0,
 582            "loadArray": file_list
 583        }
 584
 585        response = self.request_util.run_request(
 586            uri=uri,
 587            method=POST,
 588            content_type=APPLICATION_JSON,
 589            data=json.dumps(data)
 590        )
 591        job_id = response.json()['id']
 592        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 593        return job_results  # type: ignore[return-value]
 594
 595    def get_dataset_table_metrics(
 596            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
 597    ) -> list[dict]:
 598        """
 599        Retrieve all metrics for a specific table within a dataset.
 600
 601        **Args:**
 602        - dataset_id (str): The ID of the dataset.
 603        - target_table_name (str): The name of the target table.
 604        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 605
 606        **Returns:**
 607        - list[dict]: A list of dictionaries containing the metrics for the specified table.
 608        """
 609        return [
 610            metric
 611            for metric in self._yield_dataset_metrics(
 612                dataset_id=dataset_id,
 613                target_table_name=target_table_name,
 614                query_limit=query_limit
 615            )
 616        ]
 617
 618    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
 619        """
 620        Yield all entity metrics from a dataset.
 621
 622        **Args:**
 623            dataset_id (str): The ID of the dataset.
 624            target_table_name (str): The name of the target table.
 625            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
 626
 627        Yields:
 628            Any: A generator yielding dictionaries containing the metrics for the specified table.
 629        """
 630        search_request = {
 631            "offset": 0,
 632            "limit": query_limit,
 633            "sort": "datarepo_row_id"
 634        }
 635        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
 636        while True:
 637            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
 638            response = self.request_util.run_request(
 639                uri=uri,
 640                method=POST,
 641                content_type=APPLICATION_JSON,
 642                data=json.dumps(search_request)
 643            )
 644            if not response or not response.json()["result"]:
 645                break
 646            logging.info(
 647                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
 648                f"dataset {dataset_id}"
 649            )
 650            for record in response.json()["result"]:
 651                yield record
 652            search_request["offset"] += query_limit  # type: ignore[operator]
 653
 654    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
 655        """
 656        Get existing IDs from a dataset.
 657
 658        **Args:**
 659        - dataset_id (str): The ID of the dataset.
 660        - target_table_name (str): The name of the target table.
 661        - entity_id (str): The entity ID to retrieve.
 662
 663        **Returns:**
 664        - list[str]: A list of entity IDs from the specified table.
 665        """
 666        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
 667        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
 668
 669    def get_job_status(self, job_id: str) -> requests.Response:
 670        """
 671        Retrieve the status of a job.
 672
 673        **Args:**
 674        - job_id (str): The ID of the job.
 675
 676        **Returns:**
 677        - requests.Response: The response from the request.
 678        """
 679        uri = f"{self.tdr_link}/jobs/{job_id}"
 680        return self.request_util.run_request(uri=uri, method=GET)
 681
 682    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
 683        """
 684        Get all file UUIDs from the metadata of a dataset.
 685
 686        **Args:**
 687        - dataset_id (str): The ID of the dataset.
 688
 689        **Returns:**
 690        - list[str]: A list of file UUIDs from the dataset metadata.
 691        """
 692        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 693        all_metadata_file_uuids = []
 694        tables = 0
 695        for table in dataset_info["schema"]["tables"]:
 696            tables += 1
 697            table_name = table["name"]
 698            logging.info(f"Getting all file information for {table_name}")
 699            # Get just columns where datatype is fileref
 700            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
 701            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
 702            # Get unique list of file uuids
 703            file_uuids = list(
 704                set(
 705                    [
 706                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
 707                    ]
 708                )
 709            )
 710            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
 711            all_metadata_file_uuids.extend(file_uuids)
 712            # Make full list unique
 713            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
 714        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
 715        return all_metadata_file_uuids
 716
 717    def soft_delete_entries(
 718            self,
 719            dataset_id: str,
 720            table_name: str,
 721            datarepo_row_ids: list[str],
 722            check_intervals: int = 15
 723    ) -> Optional[dict]:
 724        """
 725        Soft delete specific records from a table.
 726
 727        **Args:**
 728        - dataset_id (str): The ID of the dataset.
 729        - table_name (str): The name of the target table.
 730        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
 731        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 732
 733        **Returns:**
 734        - dict (optional): A dictionary containing the response from the soft delete operation job
 735        monitoring. Returns None if no row IDs are provided.
 736        """
 737        if not datarepo_row_ids:
 738            logging.info(f"No records found to soft delete in table {table_name}")
 739            return None
 740        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
 741        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
 742        payload = {
 743            "deleteType": "soft",
 744            "specType": "jsonArray",
 745            "tables": [
 746                {
 747                    "tableName": table_name,
 748                    "jsonArraySpec": {
 749                        "rowIds": datarepo_row_ids
 750                    }
 751                }
 752            ]
 753        }
 754        response = self.request_util.run_request(
 755            method=POST,
 756            uri=uri,
 757            data=json.dumps(payload),
 758            content_type=APPLICATION_JSON
 759        )
 760        job_id = response.json()["id"]
 761        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
 762
 763    def soft_delete_all_table_entries(
 764            self,
 765            dataset_id: str,
 766            table_name: str,
 767            query_limit: int = 1000,
 768            check_intervals: int = 15
 769    ) -> Optional[dict]:
 770        """
 771        Soft delete all records in a table.
 772
 773        **Args:**
 774        - dataset_id (str): The ID of the dataset.
 775        - table_name (str): The name of the target table.
 776        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 777        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 778
 779        **Returns:**
 780        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
 781        None if no row IDs are provided.
 782        """
 783        dataset_metrics = self.get_dataset_table_metrics(
 784            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
 785        )
 786        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
 787        return self.soft_delete_entries(
 788            dataset_id=dataset_id,
 789            table_name=table_name,
 790            datarepo_row_ids=row_ids,
 791            check_intervals=check_intervals
 792        )
 793
 794    def get_or_create_dataset(
 795            self,
 796            dataset_name: str,
 797            billing_profile: str,
 798            schema: dict,
 799            description: str,
 800            relationships: Optional[list[dict]] = None,
 801            delete_existing: bool = False,
 802            continue_if_exists: bool = False,
 803            additional_properties_dict: Optional[dict] = None
 804    ) -> str:
 805        """
 806        Get or create a dataset.
 807
 808        **Args:**
 809        - dataset_name (str): The name of the dataset.
 810        - billing_profile (str): The billing profile ID.
 811        - schema (dict): The schema of the dataset.
 812        - description (str): The description of the dataset.
 813        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
 814                Defaults to None.
 815        - additional_properties_dict (Optional[dict], optional): Additional properties
 816                for the dataset. Defaults to None.
 817        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
 818                Defaults to `False`.
 819        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
 820                Defaults to `False`.
 821
 822        **Returns:**
 823        - str: The ID of the dataset.
 824
 825        **Raises:**
 826        - ValueError: If multiple datasets with the same name are found under the billing profile.
 827        """
 828        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
 829        if existing_datasets:
 830            if not continue_if_exists:
 831                raise ValueError(
 832                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
 833                )
 834            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
 835            if delete_existing:
 836                logging.info(f"Deleting existing dataset {dataset_name}")
 837                self.delete_dataset(existing_datasets[0]["id"])
 838                existing_datasets = []
 839            # If not delete_existing and continue_if_exists then grab existing datasets id
 840            else:
 841                dataset_id = existing_datasets[0]["id"]
 842        if not existing_datasets:
 843            logging.info("Did not find existing dataset")
 844            # Create dataset
 845            dataset_id = self.create_dataset(
 846                schema=schema,
 847                dataset_name=dataset_name,
 848                description=description,
 849                profile_id=billing_profile,
 850                additional_dataset_properties=additional_properties_dict
 851            )
 852        return dataset_id
 853
 854    def create_dataset(  # type: ignore[return]
 855            self,
 856            schema: dict,
 857            dataset_name: str,
 858            description: str,
 859            profile_id: str,
 860            additional_dataset_properties: Optional[dict] = None
 861    ) -> Optional[str]:
 862        """
 863        Create a new dataset.
 864
 865        **Args:**
 866        - schema (dict): The schema of the dataset.
 867        - dataset_name (str): The name of the dataset.
 868        - description (str): The description of the dataset.
 869        - profile_id (str): The billing profile ID.
 870        - additional_dataset_properties (Optional[dict], optional): Additional
 871                properties for the dataset. Defaults to None.
 872
 873        **Returns:**
 874        - Optional[str]: The ID of the created dataset, or None if creation failed.
 875
 876        **Raises:**
 877        - ValueError: If the schema validation fails.
 878        """
 879        dataset_properties = {
 880            "name": dataset_name,
 881            "description": description,
 882            "defaultProfileId": profile_id,
 883            "region": "us-central1",
 884            "cloudPlatform": GCP,
 885            "schema": schema
 886        }
 887
 888        if additional_dataset_properties:
 889            dataset_properties.update(additional_dataset_properties)
 890        try:
 891            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
 892        except ValidationError as e:
 893            raise ValueError(f"Schema validation error: {e}")
 894        uri = f"{self.tdr_link}/datasets"
 895        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
 896        response = self.request_util.run_request(
 897            method=POST,
 898            uri=uri,
 899            data=json.dumps(dataset_properties),
 900            content_type=APPLICATION_JSON
 901        )
 902        job_id = response.json()["id"]
 903        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 904        dataset_id = job_results["id"]  # type: ignore[index]
 905        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
 906        return dataset_id
 907
 908    def update_dataset_schema(  # type: ignore[return]
 909            self,
 910            dataset_id: str,
 911            update_note: str,
 912            tables_to_add: Optional[list[dict]] = None,
 913            relationships_to_add: Optional[list[dict]] = None,
 914            columns_to_add: Optional[list[dict]] = None
 915    ) -> Optional[str]:
 916        """
 917        Update the schema of a dataset.
 918
 919        **Args:**
 920        - dataset_id (str): The ID of the dataset.
 921        - update_note (str): A note describing the update.
 922        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
 923        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
 924        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
 925
 926        **Returns:**
 927        - Optional[str]: The ID of the updated dataset, or None if the update failed.
 928
 929        **Raises:**
 930        - ValueError: If the schema validation fails.
 931        """
 932        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
 933        request_body: dict = {"description": f"{update_note}", "changes": {}}
 934        if tables_to_add:
 935            request_body["changes"]["addTables"] = tables_to_add
 936        if relationships_to_add:
 937            request_body["changes"]["addRelationships"] = relationships_to_add
 938        if columns_to_add:
 939            request_body["changes"]["addColumns"] = columns_to_add
 940        try:
 941            UpdateSchema(**request_body)
 942        except ValidationError as e:
 943            raise ValueError(f"Schema validation error: {e}")
 944
 945        response = self.request_util.run_request(
 946            uri=uri,
 947            method=POST,
 948            content_type=APPLICATION_JSON,
 949            data=json.dumps(request_body)
 950        )
 951        job_id = response.json()["id"]
 952        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 953        dataset_id = job_results["id"]  # type: ignore[index]
 954        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
 955        return dataset_id
 956
 957    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
 958        """
 959        Get response from a batched endpoint.
 960
 961        Helper method for all GET endpoints that require batching.
 962
 963        Given the URI and an optional limit, this method loops
 964        through batches until all metadata is retrieved.
 965
 966        **Args:**
 967        - uri (str): The base URI for the endpoint (without query params for offset or limit).
 968        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 969
 970        **Returns:**
 971        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
 972        """
 973        batch = 1
 974        offset = 0
 975        metadata: list = []
 976        while True:
 977            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
 978            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()
 979
 980            # If no more files, break the loop
 981            if not response_json:
 982                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
 983                break
 984
 985            metadata.extend(response_json)
 986            if len(response_json) < limit:
 987                logging.info(f"Retrieved final batch of results, found {len(metadata)} total records")
 988                break
 989
 990            # Increment the offset by limit for the next page
 991            offset += limit
 992            batch += 1
 993        return metadata
 994
 995    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 996        """
 997        Return all the metadata about files in a given snapshot.
 998
 999        Not all files can be returned at once, so the API
1000        is used repeatedly until all "batches" have been returned.
1001
1002        **Args:**
1003        - snapshot_id (str): The ID of the snapshot.
1004        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
1005
1006        **Returns:**
1007        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
1008        """
1009        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
1010        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
1011
1012    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
1013        """
1014        Return snapshots belonging to specified dataset.
1015
1016        **Args:**
1017        - dataset_id (str): The UUID of the dataset to query.
1018
1019        **Returns:**
1020        - requests.Response: The response from the request.
1021        """
1022        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
1023        return self.request_util.run_request(
1024            uri=uri,
1025            method=GET
1026        )
1027
1028    def create_snapshot(
1029            self,
1030            snapshot_name: str,
1031            description: str,
1032            dataset_name: str,
1033            snapshot_mode: str,  # byFullView is entire dataset
1034            profile_id: str,
1035            stewards: Optional[list[str]] = [],
1036            readers: Optional[list[str]] = [],
1037            consent_code: Optional[str] = None,
1038            duos_id: Optional[str] = None,
1039            data_access_control_groups: Optional[list[str]] = None,
1040    ) -> None:
1041        """
1042        Create a snapshot in TDR.
1043
1044        **Returns:**
1045        - None: The snapshot creation job is monitored until completion and the new snapshot ID is logged.
1046        """
1047        uri = f"{self.tdr_link}/snapshots"
1048        payload = {
1049            "name": snapshot_name,
1050            "description": description,
1051            "contents": [
1052                {
1053                    "datasetName": dataset_name,
1054                    "mode": snapshot_mode,
1055                }
1056            ],
1057            "policies": {
1058                "stewards": stewards,
1059                "readers": readers,
1060            },
1061            "profileId": profile_id,
1062            "globalFileIds": True,
1063        }
1064        if consent_code:
1065            payload["consentCode"] = consent_code
1066        if duos_id:
1067            payload["duosId"] = duos_id
1068        if data_access_control_groups:
1069            payload["dataAccessControlGroups"] = data_access_control_groups
1070        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
1071        response = self.request_util.run_request(
1072            uri=uri,
1073            method=POST,
1074            content_type=APPLICATION_JSON,
1075            data=json.dumps(payload)
1076        )
1077        job_id = response.json()["id"]
1078        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
1079        snapshot_id = job_results["id"]  # type: ignore[index]
1080        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")

Class to interact with the Terra Data Repository (TDR) API.

TDR( request_util: ops_utils.request_util.RunRequest, env: str = 'prod', dry_run: bool = False)

Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).

Args:

  • request_util (ops_utils.request_util.RunRequest): Utility for making HTTP requests.
  • env (str, optional): The TDR environment to target, either 'prod' or 'dev'. Defaults to 'prod'.
  • dry_run (bool, optional): If True, only log what would be deleted in delete_files_and_snapshots. Defaults to False.
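
A minimal construction sketch (not part of the library documentation); it assumes a RunRequest has already been configured elsewhere, since its constructor arguments are not shown in this module:

    from ops_utils.request_util import RunRequest
    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def build_dev_tdr_client(request_util: RunRequest) -> TDR:
        # Hypothetical helper: target the dev environment and enable dry_run,
        # which currently only affects delete_files_and_snapshots.
        return TDR(request_util=request_util, env="dev", dry_run=True)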
def get_dataset_files(self, dataset_id: str, limit: int = 20000) -> list[dict]:

Get all files in a dataset.

Returns json like below

{
    "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
    "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
    "path": "/path/set/in/ingest.csv",
    "size": 1722,
    "checksums": [
        {
            "checksum": "82f7e79v",
            "type": "crc32c"
        },
        {
            "checksum": "fff973507e30b74fa47a3d6830b84a90",
            "type": "md5"
        }
    ],
    "created": "2024-13-11T15:01:00.256Z",
    "description": null,
    "fileType": "file",
    "fileDetail": {
        "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
        "mimeType": null,
        "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
        "loadTag": "RP_3333-RP_3333"
    },
    "directoryDetail": null
}

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
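
For illustration only (not from the library docs), a sketch that lists a dataset's files and totals their sizes; `tdr` is assumed to be an initialized TDR client and `dataset_id` a placeholder:

    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def summarize_dataset_files(tdr: TDR, dataset_id: str) -> None:
        # Each entry follows the JSON shape shown above (fileId, path, size, ...).
        files = tdr.get_dataset_files(dataset_id=dataset_id)
        total_bytes = sum(f.get("size", 0) for f in files)
        print(f"{len(files)} files, {total_bytes} bytes total")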
def create_file_dict(self, dataset_id: str, limit: int = 20000) -> dict:

Create a dictionary of all files in a dataset where the key is the file UUID.

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • dict: A dictionary where the key is the file UUID and the value is the file metadata.
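
A small sketch under the same assumptions, using the UUID-keyed dictionary for a single lookup:

    from typing import Optional

    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def lookup_file_path(tdr: TDR, dataset_id: str, file_uuid: str) -> Optional[str]:
        # Build the UUID -> metadata map once, then read one file's TDR path.
        file_dict = tdr.create_file_dict(dataset_id=dataset_id)
        entry = file_dict.get(file_uuid)
        return entry["path"] if entry else None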
def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(self, dataset_id: str, limit: int = 20000) -> dict:

Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.

This assumes that the TDR 'path' is the original path of the file in cloud storage with the gs:// prefix stripped out.

This will ONLY work if dataset was created with experimentalSelfHosted = True

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
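
A sketch (not part of the library docs) that resolves source paths to existing file UUIDs; the gs:// paths passed in are placeholders:

    from typing import Optional

    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def resolve_existing_file_uuids(tdr: TDR, dataset_id: str, gs_paths: list[str]) -> dict[str, Optional[str]]:
        # Keys of path_to_uuid are the files' accessUrl values as returned by TDR;
        # values are the corresponding file UUIDs.
        path_to_uuid = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
            dataset_id=dataset_id
        )
        return {path: path_to_uuid.get(path) for path in gs_paths}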
def delete_file(self, file_id: str, dataset_id: str) -> requests.models.Response:

Delete a file from a dataset.

Args:

  • file_id (str): The ID of the file to be deleted.
  • dataset_id (str): The ID of the dataset.

Returns:

  • requests.Response: The response from the request.
def delete_files( self, file_ids: list[str], dataset_id: str, batch_size_to_delete_files: int = 200, check_interval: int = 15) -> None:

Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.

Args:

  • file_ids (list[str]): A list of file IDs to be deleted.
  • dataset_id (str): The ID of the dataset.
  • batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to 200.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 15.
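
A sketch (assuming an initialized client) that deletes every file currently recorded in a dataset, in batches:

    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def delete_all_dataset_files(tdr: TDR, dataset_id: str) -> None:
        # Collect every file UUID, then submit batched delete jobs and wait on each batch.
        file_ids = [f["fileId"] for f in tdr.get_dataset_files(dataset_id=dataset_id)]
        if file_ids:
            tdr.delete_files(file_ids=file_ids, dataset_id=dataset_id)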
def delete_files_and_snapshots(self, dataset_id: str, file_ids: set[str]) -> None:

Delete files from a dataset by their IDs, handling snapshots.
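
A sketch illustrating the dry-run behaviour: with dry_run=True the client logs the snapshots and files it would delete without deleting anything. The identifiers are placeholders:

    from ops_utils.request_util import RunRequest
    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def preview_file_deletion(request_util: RunRequest, dataset_id: str, file_ids: set[str]) -> None:
        # With dry_run=True, snapshots referencing the files are reported but not deleted,
        # and the file delete request is logged but never submitted.
        tdr = TDR(request_util=request_util, dry_run=True)
        tdr.delete_files_and_snapshots(dataset_id=dataset_id, file_ids=file_ids)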

def add_user_to_dataset( self, dataset_id: str, user: str, policy: str) -> requests.models.Response:

Add a user to a dataset with a specified policy.

Args:

  • dataset_id (str): The ID of the dataset.
  • user (str): The email of the user to be added.
  • policy (str): The policy to be assigned to the user. Must be one of steward, custodian, or snapshot_creator.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If the policy is not valid.
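
A short sketch (placeholder email and dataset ID) that grants a user the custodian policy and checks the HTTP status:

    from ops_utils.tdr_utils.tdr_api_utils import TDR

    def grant_custodian(tdr: TDR, dataset_id: str, email: str) -> bool:
        # An invalid policy name raises ValueError before any request is made.
        response = tdr.add_user_to_dataset(dataset_id=dataset_id, user=email, policy="custodian")
        return response.ok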
def remove_user_from_dataset( self, dataset_id: str, user: str, policy: str) -> requests.models.Response:

Remove a user from a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • user (str): The email of the user to be removed.
  • policy (str): The policy to be removed from the user. Must be one of steward, custodian, or snapshot_creator.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If the policy is not valid.
def delete_dataset(self, dataset_id: str) -> None:

Delete a dataset and monitor the job until completion.

Args:

  • dataset_id (str): The ID of the dataset to be deleted.

def make_snapshot_public(self, snapshot_id: str) -> requests.models.Response:

Make a snapshot public.

Args:

  • snapshot_id (str): The ID of the snapshot to be made public.

Returns:

  • requests.Response: The response from the request.
def get_snapshot_info( self, snapshot_id: str, continue_not_found: bool = False, info_to_include: Optional[list[str]] = None) -> Optional[requests.models.Response]:
302    def get_snapshot_info(
303            self,
304            snapshot_id: str,
305            continue_not_found: bool = False,
306            info_to_include: Optional[list[str]] = None
307    ) -> Optional[requests.Response]:
308        """
309        Get information about a snapshot.
310
311        **Args:**
312        - snapshot_id (str): The ID of the snapshot.
313        - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
314        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
315                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
316                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
317
318        **Returns:**
319        - requests.Response (optional): The response from the request (returns None if the snapshot is not
320         found or access is denied).
321        """
322        acceptable_return_code = [404, 403] if continue_not_found else []
323        acceptable_include_info = [
324            "SOURCES",
325            "TABLES",
326            "RELATIONSHIPS",
327            "ACCESS_INFORMATION",
328            "PROFILE",
329            "PROPERTIES",
330            "DATA_PROJECT",
331            "CREATION_INFORMATION",
332            "DUOS"
333        ]
334        if info_to_include:
335            if not all(info in acceptable_include_info for info in info_to_include):
336                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
337            include_string = '&include='.join(info_to_include)
338        else:
339            include_string = ""
340        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
341        response = self.request_util.run_request(
342            uri=uri,
343            method=GET,
344            accept_return_codes=acceptable_return_code
345        )
346        if response.status_code == 404:
347            logging.warning(f"Snapshot {snapshot_id} not found")
348            return None
349        if response.status_code == 403:
350            logging.warning(f"Access denied for snapshot {snapshot_id}")
351            return None
352        return response

Get information about a snapshot.

Args:

  • snapshot_id (str): The ID of the snapshot.
  • continue_not_found (bool, optional): Whether to accept a 404 response. Defaults to False.
  • info_to_include (list[str], optional): A list of additional information to include. Defaults to None. Options are: SOURCES, TABLES, RELATIONSHIPS, ACCESS_INFORMATION, PROFILE, PROPERTIES, DATA_PROJECT, CREATION_INFORMATION, DUOS

Returns:

  • requests.Response (optional): The response from the request (returns None if the snapshot is not found or access is denied).
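
A short sketch, assuming tdr is an already-initialized TDR client and the snapshot UUID is a placeholder. With continue_not_found=True the method returns None instead of raising on a 404 or 403:

    # Fetch snapshot details plus table and access information; tolerate a missing snapshot.
    response = tdr.get_snapshot_info(
        snapshot_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",  # hypothetical snapshot UUID
        continue_not_found=True,
        info_to_include=["TABLES", "ACCESS_INFORMATION"],
    )
    if response is None:
        print("Snapshot not found or access denied")
    else:
        snapshot = response.json()
        print(snapshot["name"])  # assumes the snapshot payload includes a "name" field
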
def delete_snapshots( self, snapshot_ids: list[str], batch_size: int = 25, check_interval: int = 10, verbose: bool = False) -> None:
354    def delete_snapshots(
355            self,
356            snapshot_ids: list[str],
357            batch_size: int = 25,
358            check_interval: int = 10,
359            verbose: bool = False) -> None:
360        """
361        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
362
363        **Args:**
364        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
365        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
366        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
367        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
368        """
369        logging.info(f"{self._dry_run_msg()}Deleting {len(snapshot_ids)} snapshots")
370        if not self.dry_run:
371            SubmitAndMonitorMultipleJobs(
372                tdr=self,
373                job_function=self.delete_snapshot,
374                job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
375                batch_size=batch_size,
376                check_interval=check_interval,
377                verbose=verbose
378            ).run()

Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.

Args:

  • snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
  • batch_size (int, optional): The number of snapshots to delete per batch. Defaults to 25.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 10.
  • verbose (bool, optional): Whether to log detailed information about each job. Defaults to False.
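
For example, deleting a handful of snapshots two at a time, assuming tdr is an already-initialized TDR client and the snapshot UUIDs are placeholders:

    # Submits delete jobs in batches of 2 and waits for each batch to finish.
    tdr.delete_snapshots(
        snapshot_ids=[
            "aaaaaaaa-0000-0000-0000-000000000001",  # hypothetical snapshot UUIDs
            "aaaaaaaa-0000-0000-0000-000000000002",
            "aaaaaaaa-0000-0000-0000-000000000003",
        ],
        batch_size=2,
        check_interval=10,
        verbose=True,
    )
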
def delete_snapshot(self, snapshot_id: str) -> requests.models.Response:
380    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
381        """
382        Delete a snapshot.
383
384        **Args:**
385        - snapshot_id (str): The ID of the snapshot to be deleted.
386
387        **Returns:**
388        - requests.Response: The response from the request.
389        """
390        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
391        logging.info(f"Deleting snapshot {snapshot_id}")
392        return self.request_util.run_request(uri=uri, method=DELETE)

Delete a snapshot.

Args:

  • snapshot_id (str): The ID of the snapshot to be deleted.

Returns:

  • requests.Response: The response from the request.
def check_if_dataset_exists( self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
427    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
428        """
429        Check if a dataset exists by name and optionally by billing profile.
430
431        **Args:**
432        - dataset_name (str): The name of the dataset to check.
433        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
434
435        **Returns:**
436        - list[dict]: A list of matching datasets.
437        """
438        matching_datasets = []
439        for dataset in self._yield_existing_datasets(filter=dataset_name):
440            # Search uses wildcard so could grab more datasets where dataset_name is substring
441            if dataset_name == dataset["name"]:
442                if billing_profile:
443                    if dataset["defaultProfileId"] == billing_profile:
444                        logging.info(
445                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
446                        dataset_id = dataset["id"]
447                        logging.info(f"Dataset ID: {dataset_id}")
448                        matching_datasets.append(dataset)
449                    else:
450                        logging.warning(
451                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
452                            f"and not under billing profile {billing_profile}"
453                        )
454                        # Datasets names need to be unique regardless of billing profile, so raise an error if
455                        # a dataset with the same name is found but is not under the requested billing profile
456                        raise ValueError(
457                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
458                else:
459                    matching_datasets.append(dataset)
460        return matching_datasets

Check if a dataset exists by name and optionally by billing profile.

Args:

  • dataset_name (str): The name of the dataset to check.
  • billing_profile (str, optional): The billing profile ID to match. Defaults to None.

Returns:

  • list[dict]: A list of matching datasets.
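
A minimal sketch, assuming tdr is an already-initialized TDR client and the name and billing profile ID are placeholders. Note that if a dataset with the same name exists under a different billing profile, the method raises ValueError:

    matches = tdr.check_if_dataset_exists(
        dataset_name="my_dataset",                               # hypothetical dataset name
        billing_profile="99999999-8888-7777-6666-555555555555",  # hypothetical billing profile ID
    )
    if matches:
        print(f"Found existing dataset: {matches[0]['id']}")
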
def get_dataset_info( self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.models.Response:
462    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
463        """
464        Get information about a dataset.
465
466        **Args:**
467        - dataset_id (str): The ID of the dataset.
468        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
469        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
470        Defaults to None.
471
472        **Returns:**
473        - requests.Response: The response from the request.
474
475        **Raises:**
476        - ValueError: If `info_to_include` contains invalid information types.
477        """
478        acceptable_include_info = [
479            "SCHEMA",
480            "ACCESS_INFORMATION",
481            "PROFILE",
482            "PROPERTIES",
483            "DATA_PROJECT",
484            "STORAGE",
485            "SNAPSHOT_BUILDER_SETTING"
486        ]
487        if info_to_include:
488            if not all(info in acceptable_include_info for info in info_to_include):
489                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
490            include_string = '&include='.join(info_to_include)
491        else:
492            include_string = ""
493        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
494        return self.request_util.run_request(uri=uri, method=GET)

Get information about a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • info_to_include (list[str], optional): A list of additional information to include. Valid options include: SCHEMA, ACCESS_INFORMATION, PROFILE, PROPERTIES, DATA_PROJECT, STORAGE, SNAPSHOT_BUILDER_SETTING. Defaults to None.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If info_to_include contains invalid information types.
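
For example, pulling the dataset record together with its schema, assuming tdr is an already-initialized TDR client and the UUID is a placeholder:

    dataset = tdr.get_dataset_info(
        dataset_id="11111111-2222-3333-4444-555555555555",  # hypothetical dataset UUID
        info_to_include=["SCHEMA", "ACCESS_INFORMATION"],
    ).json()
    # The schema block lists each table and its columns.
    table_names = [table["name"] for table in dataset["schema"]["tables"]]
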
def get_table_schema_info( self, dataset_id: str, table_name: str, dataset_info: Optional[dict] = None) -> Optional[dict]:
496    def get_table_schema_info(
497            self,
498            dataset_id: str,
499            table_name: str,
500            dataset_info: Optional[dict] = None
501    ) -> Union[dict, None]:
502        """
503        Get schema information for a specific table within a dataset.
504
505        **Args:**
506        - dataset_id (str): The ID of the dataset.
507        - table_name (str): The name of the table.
508        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
509
510        **Returns:**
511        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
512        """
513        if not dataset_info:
514            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
515        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
516            if table["name"] == table_name:
517                return table
518        return None

Get schema information for a specific table within a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the table.
  • dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.

Returns:

  • Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
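
A short sketch, assuming tdr is an already-initialized TDR client and the dataset UUID and table name are placeholders:

    table_schema = tdr.get_table_schema_info(
        dataset_id="11111111-2222-3333-4444-555555555555",  # hypothetical dataset UUID
        table_name="sample",                                 # hypothetical table name
    )
    if table_schema:
        # Columns carry a name and a datatype, e.g. "fileref" for file references.
        fileref_columns = [c["name"] for c in table_schema["columns"] if c["datatype"] == "fileref"]
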
def get_job_result( self, job_id: str, expect_failure: bool = False) -> requests.models.Response:
520    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
521        """
522        Retrieve the result of a job.
523
524        **Args:**
525        - job_id (str): The ID of the job.
526        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
527
528        **Returns:**
529        - requests.Response: The response from the request.
530        """
531        uri = f"{self.tdr_link}/jobs/{job_id}/result"
532        # If job is expected to fail, accept any return code
533        acceptable_return_code = list(range(100, 600)) if expect_failure else []
534        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)

Retrieve the result of a job.

Args:

  • job_id (str): The ID of the job.
  • expect_failure (bool, optional): Whether the job is expected to fail. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.models.Response:
536    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
537        """
538        Load data into a TDR dataset.
539
540        **Args:**
541        - dataset_id (str): The ID of the dataset.
542        - data (dict): The data to be ingested.
543
544        **Returns:**
545        - requests.Response: The response from the request.
546        """
547        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
548        logging.info(
549            "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
550            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
551        return self.request_util.run_request(
552            uri=uri,
553            method=POST,
554            content_type=APPLICATION_JSON,
555            data=data
556        )

Load data into a TDR dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • data (dict): The data to be ingested.

Returns:

  • requests.Response: The response from the request.
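
A sketch of a metadata ingest, assuming tdr is an already-initialized TDR client. The payload field names below follow the TDR ingest request model as an assumption; confirm them against the TDR API documentation. The response carries a job ID that can be monitored with MonitorTDRJob from this package:

    from ops_utils.tdr_utils.tdr_job_utils import MonitorTDRJob

    ingest_request = {
        "table": "sample",                     # hypothetical table name; payload keys are assumptions
        "format": "array",
        "records": [{"sample_id": "SM-1", "collection_date": "2024-01-01"}],
        "load_tag": "example_load_tag",
    }
    response = tdr.ingest_to_dataset(
        dataset_id="11111111-2222-3333-4444-555555555555",  # hypothetical dataset UUID
        data=ingest_request,
    )
    job_id = response.json()["id"]
    MonitorTDRJob(tdr=tdr, job_id=job_id, check_interval=30, return_json=False).run()
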
def file_ingest_to_dataset( self, dataset_id: str, profile_id: str, file_list: list[dict], load_tag: str = 'file_ingest_load_tag') -> dict:
558    def file_ingest_to_dataset(
559            self,
560            dataset_id: str,
561            profile_id: str,
562            file_list: list[dict],
563            load_tag: str = "file_ingest_load_tag"
564    ) -> dict:
565        """
566        Load files into a TDR dataset.
567
568        **Args:**
569        - dataset_id (str): The ID of the dataset.
570        - profile_id (str): The billing profile ID.
571        - file_list (list[dict]): The list of files to be ingested.
572        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
573
574        **Returns:**
575        - dict: A dictionary containing the response from the ingest operation job monitoring.
576        """
577        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
578        data = {
579            "profileId": profile_id,
580            "loadTag": f"{load_tag}",
581            "maxFailedFileLoads": 0,
582            "loadArray": file_list
583        }
584
585        response = self.request_util.run_request(
586            uri=uri,
587            method=POST,
588            content_type=APPLICATION_JSON,
589            data=json.dumps(data)
590        )
591        job_id = response.json()['id']
592        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
593        return job_results  # type: ignore[return-value]

Load files into a TDR dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • profile_id (str): The billing profile ID.
  • file_list (list[dict]): The list of files to be ingested.
  • load_tag (str): The tag to be used in the ingest job. Defaults to file_ingest_load_tag.

Returns:

  • dict: A dictionary containing the response from the ingest operation job monitoring.
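
A sketch of a bulk file load, assuming tdr is an already-initialized TDR client. The sourcePath/targetPath keys follow the TDR bulk-load file model as an assumption; verify them against the TDR API documentation. The method itself submits the job and waits for it, returning the job results:

    files_to_load = [
        {
            "sourcePath": "gs://source-bucket/data/sample1.cram",  # hypothetical source object
            "targetPath": "/data/sample1.cram",                    # hypothetical path inside the dataset
        },
    ]
    job_results = tdr.file_ingest_to_dataset(
        dataset_id="11111111-2222-3333-4444-555555555555",         # hypothetical dataset UUID
        profile_id="99999999-8888-7777-6666-555555555555",         # hypothetical billing profile ID
        file_list=files_to_load,
        load_tag="example_file_load",
    )
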
def get_dataset_table_metrics( self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> list[dict]:
595    def get_dataset_table_metrics(
596            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
597    ) -> list[dict]:
598        """
599        Retrieve all metrics for a specific table within a dataset.
600
601        **Args:**
602        - dataset_id (str): The ID of the dataset.
603        - target_table_name (str): The name of the target table.
604        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
605
606        **Returns:**
607        - list[dict]: A list of dictionaries containing the metrics for the specified table.
608        """
609        return [
610            metric
611            for metric in self._yield_dataset_metrics(
612                dataset_id=dataset_id,
613                target_table_name=target_table_name,
614                query_limit=query_limit
615            )
616        ]

Retrieve all metrics for a specific table within a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • target_table_name (str): The name of the target table.
  • query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

Returns:

  • list[dict]: A list of dictionaries containing the metrics for the specified table.
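
For example, pulling every row of one table, assuming tdr is an already-initialized TDR client and the UUID and table name are placeholders:

    rows = tdr.get_dataset_table_metrics(
        dataset_id="11111111-2222-3333-4444-555555555555",  # hypothetical dataset UUID
        target_table_name="sample",                          # hypothetical table name
        query_limit=500,                                     # page size per request
    )
    # Each row is a plain dict that includes TDR's datarepo_row_id column.
    print(f"Retrieved {len(rows)} rows")
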
def get_dataset_sample_ids( self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
654    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
655        """
656        Get existing IDs from a dataset.
657
658        **Args:**
659        - dataset_id (str): The ID of the dataset.
660        - target_table_name (str): The name of the target table.
661        - entity_id (str): The entity ID to retrieve.
662
663        **Returns:**
664        - list[str]: A list of entity IDs from the specified table.
665        """
666        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
667        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]

Get existing IDs from a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • target_table_name (str): The name of the target table.
  • entity_id (str): The entity ID to retrieve.

Returns:

  • list[str]: A list of entity IDs from the specified table.
def get_job_status(self, job_id: str) -> requests.models.Response:
669    def get_job_status(self, job_id: str) -> requests.Response:
670        """
671        Retrieve the status of a job.
672
673        **Args:**
674        - job_id (str): The ID of the job.
675
676        **Returns:**
677        - requests.Response: The response from the request.
678        """
679        uri = f"{self.tdr_link}/jobs/{job_id}"
680        return self.request_util.run_request(uri=uri, method=GET)

Retrieve the status of a job.

Args:

  • job_id (str): The ID of the job.

Returns:

  • requests.Response: The response from the request.
def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
682    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
683        """
684        Get all file UUIDs from the metadata of a dataset.
685
686        **Args:**
687        - dataset_id (str): The ID of the dataset.
688
689        **Returns:**
690        - list[str]: A list of file UUIDs from the dataset metadata.
691        """
692        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
693        all_metadata_file_uuids = []
694        tables = 0
695        for table in dataset_info["schema"]["tables"]:
696            tables += 1
697            table_name = table["name"]
698            logging.info(f"Getting all file information for {table_name}")
699            # Get just columns where datatype is fileref
700            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
701            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
702            # Get unique list of file uuids
703            file_uuids = list(
704                set(
705                    [
706                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
707                    ]
708                )
709            )
710            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
711            all_metadata_file_uuids.extend(file_uuids)
712            # Make full list unique
713            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
714        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
715        return all_metadata_file_uuids

Get all file UUIDs from the metadata of a dataset.

Args:

  • dataset_id (str): The ID of the dataset.

Returns:

  • list[str]: A list of file UUIDs from the dataset metadata.
def soft_delete_entries( self, dataset_id: str, table_name: str, datarepo_row_ids: list[str], check_intervals: int = 15) -> Optional[dict]:
717    def soft_delete_entries(
718            self,
719            dataset_id: str,
720            table_name: str,
721            datarepo_row_ids: list[str],
722            check_intervals: int = 15
723    ) -> Optional[dict]:
724        """
725        Soft delete specific records from a table.
726
727        **Args:**
728        - dataset_id (str): The ID of the dataset.
729        - table_name (str): The name of the target table.
730        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
731        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
732
733        **Returns:**
734        - dict (optional): A dictionary containing the response from the soft delete operation job
735        monitoring. Returns None if no row IDs are provided.
736        """
737        if not datarepo_row_ids:
738            logging.info(f"No records found to soft delete in table {table_name}")
739            return None
740        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
741        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
742        payload = {
743            "deleteType": "soft",
744            "specType": "jsonArray",
745            "tables": [
746                {
747                    "tableName": table_name,
748                    "jsonArraySpec": {
749                        "rowIds": datarepo_row_ids
750                    }
751                }
752            ]
753        }
754        response = self.request_util.run_request(
755            method=POST,
756            uri=uri,
757            data=json.dumps(payload),
758            content_type=APPLICATION_JSON
759        )
760        job_id = response.json()["id"]
761        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()

Soft delete specific records from a table.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the target table.
  • datarepo_row_ids (list[str]): A list of row IDs to be deleted.
  • check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to 15.

Returns:

  • dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
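
A minimal sketch, assuming tdr is an already-initialized TDR client and the UUIDs are placeholders. The row IDs are the datarepo_row_id values returned by get_dataset_table_metrics:

    tdr.soft_delete_entries(
        dataset_id="11111111-2222-3333-4444-555555555555",   # hypothetical dataset UUID
        table_name="sample",                                 # hypothetical table name
        datarepo_row_ids=[
            "bbbbbbbb-0000-0000-0000-000000000001",          # hypothetical datarepo_row_id values
            "bbbbbbbb-0000-0000-0000-000000000002",
        ],
        check_intervals=15,
    )
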
def soft_delete_all_table_entries( self, dataset_id: str, table_name: str, query_limit: int = 1000, check_intervals: int = 15) -> Optional[dict]:
763    def soft_delete_all_table_entries(
764            self,
765            dataset_id: str,
766            table_name: str,
767            query_limit: int = 1000,
768            check_intervals: int = 15
769    ) -> Optional[dict]:
770        """
771        Soft delete all records in a table.
772
773        **Args:**
774        - dataset_id (str): The ID of the dataset.
775        - table_name (str): The name of the target table.
776        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
777        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
778
779        **Returns:**
780        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
781        None if the table contains no rows to delete.
782        """
783        dataset_metrics = self.get_dataset_table_metrics(
784            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
785        )
786        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
787        return self.soft_delete_entries(
788            dataset_id=dataset_id,
789            table_name=table_name,
790            datarepo_row_ids=row_ids,
791            check_intervals=check_intervals
792        )

Soft delete all records in a table.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the target table.
  • query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
  • check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to 15.

Returns:

  • dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if the table contains no rows to delete.
def get_or_create_dataset( self, dataset_name: str, billing_profile: str, schema: dict, description: str, relationships: Optional[list[dict]] = None, delete_existing: bool = False, continue_if_exists: bool = False, additional_properties_dict: Optional[dict] = None) -> str:
794    def get_or_create_dataset(
795            self,
796            dataset_name: str,
797            billing_profile: str,
798            schema: dict,
799            description: str,
800            relationships: Optional[list[dict]] = None,
801            delete_existing: bool = False,
802            continue_if_exists: bool = False,
803            additional_properties_dict: Optional[dict] = None
804    ) -> str:
805        """
806        Get or create a dataset.
807
808        **Args:**
809        - dataset_name (str): The name of the dataset.
810        - billing_profile (str): The billing profile ID.
811        - schema (dict): The schema of the dataset.
812        - description (str): The description of the dataset.
813        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
814                Defaults to None.
815        - additional_properties_dict (Optional[dict], optional): Additional properties
816                for the dataset. Defaults to None.
817        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
818                Defaults to `False`.
819        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
820                Defaults to `False`.
821
822        **Returns:**
823        - str: The ID of the dataset.
824
825        **Raises:**
826        - ValueError: If the dataset already exists and `continue_if_exists` is `False`, or exists under a different billing profile.
827        """
828        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
829        if existing_datasets:
830            if not continue_if_exists:
831                raise ValueError(
832                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
833                )
834            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
835            if delete_existing:
836                logging.info(f"Deleting existing dataset {dataset_name}")
837                self.delete_dataset(existing_datasets[0]["id"])
838                existing_datasets = []
839            # If not delete_existing and continue_if_exists then grab existing datasets id
840            else:
841                dataset_id = existing_datasets[0]["id"]
842        if not existing_datasets:
843            logging.info("Did not find existing dataset")
844            # Create dataset
845            dataset_id = self.create_dataset(
846                schema=schema,
847                dataset_name=dataset_name,
848                description=description,
849                profile_id=billing_profile,
850                additional_dataset_properties=additional_properties_dict
851            )
852        return dataset_id

Get or create a dataset.

Args:

  • dataset_name (str): The name of the dataset.
  • billing_profile (str): The billing profile ID.
  • schema (dict): The schema of the dataset.
  • description (str): The description of the dataset.
  • relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. Defaults to None.
  • additional_properties_dict (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
  • delete_existing (bool, optional): Whether to delete the existing dataset if found. Defaults to False.
  • continue_if_exists (bool, optional): Whether to continue if the dataset already exists. Defaults to False.

Returns:

  • str: The ID of the dataset.

Raises:

  • ValueError: If the dataset already exists and continue_if_exists is False, or exists under a different billing profile.
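
A sketch of creating (or reusing) a dataset with a deliberately tiny schema, assuming tdr is an already-initialized TDR client and the name and billing profile ID are placeholders. A real schema will typically carry more column attributes, and the payload is validated against the CreateDatasetSchema model before submission:

    schema = {
        "tables": [
            {
                "name": "sample",
                "columns": [
                    {"name": "sample_id", "datatype": "string"},
                    {"name": "cram_path", "datatype": "fileref"},
                ],
            }
        ]
    }
    dataset_id = tdr.get_or_create_dataset(
        dataset_name="my_dataset",                               # hypothetical dataset name
        billing_profile="99999999-8888-7777-6666-555555555555",  # hypothetical billing profile ID
        schema=schema,
        description="Example dataset",
        continue_if_exists=True,  # reuse the dataset if it already exists
    )
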
def create_dataset( self, schema: dict, dataset_name: str, description: str, profile_id: str, additional_dataset_properties: Optional[dict] = None) -> Optional[str]:
854    def create_dataset(  # type: ignore[return]
855            self,
856            schema: dict,
857            dataset_name: str,
858            description: str,
859            profile_id: str,
860            additional_dataset_properties: Optional[dict] = None
861    ) -> Optional[str]:
862        """
863        Create a new dataset.
864
865        **Args:**
866        - schema (dict): The schema of the dataset.
867        - dataset_name (str): The name of the dataset.
868        - description (str): The description of the dataset.
869        - profile_id (str): The billing profile ID.
870        - additional_dataset_properties (Optional[dict], optional): Additional
871                properties for the dataset. Defaults to None.
872
873        **Returns:**
874        - Optional[str]: The ID of the created dataset, or None if creation failed.
875
876        **Raises:**
877        - ValueError: If the schema validation fails.
878        """
879        dataset_properties = {
880            "name": dataset_name,
881            "description": description,
882            "defaultProfileId": profile_id,
883            "region": "us-central1",
884            "cloudPlatform": GCP,
885            "schema": schema
886        }
887
888        if additional_dataset_properties:
889            dataset_properties.update(additional_dataset_properties)
890        try:
891            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
892        except ValidationError as e:
893            raise ValueError(f"Schema validation error: {e}")
894        uri = f"{self.tdr_link}/datasets"
895        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
896        response = self.request_util.run_request(
897            method=POST,
898            uri=uri,
899            data=json.dumps(dataset_properties),
900            content_type=APPLICATION_JSON
901        )
902        job_id = response.json()["id"]
903        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
904        dataset_id = job_results["id"]  # type: ignore[index]
905        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
906        return dataset_id

Create a new dataset.

Args:

  • schema (dict): The schema of the dataset.
  • dataset_name (str): The name of the dataset.
  • description (str): The description of the dataset.
  • profile_id (str): The billing profile ID.
  • additional_dataset_properties (Optional[dict], optional): Additional properties for the dataset. Defaults to None.

Returns:

  • Optional[str]: The ID of the created dataset, or None if creation failed.

Raises:

  • ValueError: If the schema validation fails.
def update_dataset_schema( self, dataset_id: str, update_note: str, tables_to_add: Optional[list[dict]] = None, relationships_to_add: Optional[list[dict]] = None, columns_to_add: Optional[list[dict]] = None) -> Optional[str]:
908    def update_dataset_schema(  # type: ignore[return]
909            self,
910            dataset_id: str,
911            update_note: str,
912            tables_to_add: Optional[list[dict]] = None,
913            relationships_to_add: Optional[list[dict]] = None,
914            columns_to_add: Optional[list[dict]] = None
915    ) -> Optional[str]:
916        """
917        Update the schema of a dataset.
918
919        **Args:**
920        - dataset_id (str): The ID of the dataset.
921        - update_note (str): A note describing the update.
922        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
923        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
924        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
925
926        **Returns:**
927        - Optional[str]: The ID of the updated dataset, or None if the update failed.
928
929        **Raises:**
930        - ValueError: If the schema validation fails.
931        """
932        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
933        request_body: dict = {"description": f"{update_note}", "changes": {}}
934        if tables_to_add:
935            request_body["changes"]["addTables"] = tables_to_add
936        if relationships_to_add:
937            request_body["changes"]["addRelationships"] = relationships_to_add
938        if columns_to_add:
939            request_body["changes"]["addColumns"] = columns_to_add
940        try:
941            UpdateSchema(**request_body)
942        except ValidationError as e:
943            raise ValueError(f"Schema validation error: {e}")
944
945        response = self.request_util.run_request(
946            uri=uri,
947            method=POST,
948            content_type=APPLICATION_JSON,
949            data=json.dumps(request_body)
950        )
951        job_id = response.json()["id"]
952        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
953        dataset_id = job_results["id"]  # type: ignore[index]
954        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
955        return dataset_id

Update the schema of a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • update_note (str): A note describing the update.
  • tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
  • relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
  • columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.

Returns:

  • Optional[str]: The ID of the updated dataset, or None if the update failed.

Raises:

  • ValueError: If the schema validation fails.
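
A sketch of adding one column to an existing table, assuming tdr is an already-initialized TDR client. The shape of the columns_to_add entries follows the TDR updateSchema change model as an assumption; the request is validated against the UpdateSchema model before submission:

    updated_dataset_id = tdr.update_dataset_schema(
        dataset_id="11111111-2222-3333-4444-555555555555",  # hypothetical dataset UUID
        update_note="Add collection_date to sample table",
        columns_to_add=[
            {
                "tableName": "sample",  # hypothetical table; key names are assumptions
                "columns": [
                    {"name": "collection_date", "datatype": "date"},
                ],
            }
        ],
    )
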
def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 995    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 996        """
 997        Return all the metadata about files in a given snapshot.
 998
 999        Not all files can be returned at once, so the API
1000        is used repeatedly until all "batches" have been returned.
1001
1002        **Args:**
1003        - snapshot_id (str): The ID of the snapshot.
1004        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
1005
1006        **Returns:**
1007        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
1008        """
1009        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
1010        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

Return all the metadata about files in a given snapshot.

Not all files can be returned at once, so the API is used repeatedly until all "batches" have been returned.

Args:

  • snapshot_id (str): The ID of the snapshot.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

Returns:

  • list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
def get_dataset_snapshots(self, dataset_id: str) -> requests.models.Response:
1012    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
1013        """
1014        Return snapshots belonging to specified dataset.
1015
1016        **Args:**
1017        - dataset_id: uuid of dataset to query.
1018
1019        **Returns:**
1020        - requests.Response: The response from the request.
1021        """
1022        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
1023        return self.request_util.run_request(
1024            uri=uri,
1025            method=GET
1026        )

Return snapshots belonging to specified dataset.

Args:

  • dataset_id: uuid of dataset to query.

Returns:

  • requests.Response: The response from the request.
def create_snapshot( self, snapshot_name: str, description: str, dataset_name: str, snapshot_mode: str, profile_id: str, stewards: Optional[list[str]] = [], readers: Optional[list[str]] = [], consent_code: Optional[str] = None, duos_id: Optional[str] = None, data_access_control_groups: Optional[list[str]] = None) -> None:
1028    def create_snapshot(
1029            self,
1030            snapshot_name: str,
1031            description: str,
1032            dataset_name: str,
1033            snapshot_mode: str,  # byFullView is entire dataset
1034            profile_id: str,
1035            stewards: Optional[list[str]] = [],
1036            readers: Optional[list[str]] = [],
1037            consent_code: Optional[str] = None,
1038            duos_id: Optional[str] = None,
1039            data_access_control_groups: Optional[list[str]] = None,
1040    ) -> None:
1041        """
1042        Create a snapshot in TDR.
1043
1044        **Returns:**
1045        - None: The snapshot creation job is monitored until completion.
1046        """
1047        uri = f"{self.tdr_link}/snapshots"
1048        payload = {
1049            "name": snapshot_name,
1050            "description": description,
1051            "contents": [
1052                {
1053                    "datasetName": dataset_name,
1054                    "mode": snapshot_mode,
1055                }
1056            ],
1057            "policies": {
1058                "stewards": stewards,
1059                "readers": readers,
1060            },
1061            "profileId": profile_id,
1062            "globalFileIds": True,
1063        }
1064        if consent_code:
1065            payload["consentCode"] = consent_code
1066        if duos_id:
1067            payload["duosId"] = duos_id
1068        if data_access_control_groups:
1069            payload["dataAccessControlGroups"] = data_access_control_groups
1070        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
1071        response = self.request_util.run_request(
1072            uri=uri,
1073            method=POST,
1074            content_type=APPLICATION_JSON,
1075            data=json.dumps(payload)
1076        )
1077        job_id = response.json()["id"]
1078        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
1079        snapshot_id = job_results["id"]  # type: ignore[index]
1080        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")

Create a snapshot in TDR.

Returns:

  • None: The snapshot creation job is monitored until completion.
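
A sketch of creating a full-view snapshot, assuming tdr is an already-initialized TDR client and the names, group, and profile ID are placeholders. The snapshot_mode value byFullView (snapshot the entire dataset) comes from the inline comment in the source:

    tdr.create_snapshot(
        snapshot_name="my_dataset_snapshot_2024_01_01",      # hypothetical snapshot name
        description="Example full-view snapshot",
        dataset_name="my_dataset",                           # hypothetical dataset name
        snapshot_mode="byFullView",                          # entire dataset
        profile_id="99999999-8888-7777-6666-555555555555",   # hypothetical billing profile ID
        readers=["my-readers-group@firecloud.org"],          # hypothetical reader group
    )
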
class FilterOutSampleIdsAlreadyInDataset:
1083class FilterOutSampleIdsAlreadyInDataset:
1084    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
1085
1086    def __init__(
1087            self,
1088            ingest_metrics: list[dict],
1089            dataset_id: str,
1090            tdr: TDR,
1091            target_table_name: str,
1092            filter_entity_id: str
1093    ):
1094        """
1095        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1096
1097        **Args:**
1098        - ingest_metrics (list[dict]): The metrics to be ingested.
1099        - dataset_id (str): The ID of the dataset.
1100        - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
1101        - target_table_name (str): The name of the target table.
1102        - filter_entity_id (str): The entity ID to filter on.
1103        """
1104        self.ingest_metrics = ingest_metrics
1105        """@private"""
1106        self.tdr = tdr
1107        """@private"""
1108        self.dataset_id = dataset_id
1109        """@private"""
1110        self.target_table_name = target_table_name
1111        """@private"""
1112        self.filter_entity_id = filter_entity_id
1113        """@private"""
1114
1115    def run(self) -> list[dict]:
1116        """
1117        Run the filter process to remove sample IDs that already exist in the dataset.
1118
1119        **Returns:**
1120        - list[dict]: The filtered ingest metrics.
1121        """
1122        # Get all sample ids that already exist in dataset
1123        logging.info(
1124            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1125            f"dataset {self.dataset_id}"
1126        )
1127
1128        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1129            dataset_id=self.dataset_id,
1130            target_table_name=self.target_table_name,
1131            entity_id=self.filter_entity_id
1132        )
1133        # Filter out rows that already exist in dataset
1134        filtered_ingest_metrics = [
1135            row
1136            for row in self.ingest_metrics
1137            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1138        ]
1139        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1140            logging.info(
1141                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1142                f"dataset. There is {len(filtered_ingest_metrics)} rows left to ingest"
1143            )
1144
1145            if filtered_ingest_metrics:
1146                return filtered_ingest_metrics
1147            else:
1148                logging.info("All rows were filtered out because they already exist in the dataset; nothing to ingest")
1149                return []
1150        else:
1151            logging.info("No rows were filtered out because none of them already exist in the dataset")
1152            return filtered_ingest_metrics

Class to filter ingest metrics to remove sample IDs that already exist in the dataset.

FilterOutSampleIdsAlreadyInDataset( ingest_metrics: list[dict], dataset_id: str, tdr: TDR, target_table_name: str, filter_entity_id: str)
1086    def __init__(
1087            self,
1088            ingest_metrics: list[dict],
1089            dataset_id: str,
1090            tdr: TDR,
1091            target_table_name: str,
1092            filter_entity_id: str
1093    ):
1094        """
1095        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1096
1097        **Args:**
1098        - ingest_metrics (list[dict]): The metrics to be ingested.
1099        - dataset_id (str): The ID of the dataset.
1100        - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
1101        - target_table_name (str): The name of the target table.
1102        - filter_entity_id (str): The entity ID to filter on.
1103        """
1104        self.ingest_metrics = ingest_metrics
1105        """@private"""
1106        self.tdr = tdr
1107        """@private"""
1108        self.dataset_id = dataset_id
1109        """@private"""
1110        self.target_table_name = target_table_name
1111        """@private"""
1112        self.filter_entity_id = filter_entity_id
1113        """@private"""

Initialize the FilterOutSampleIdsAlreadyInDataset class.

Args:

  • ingest_metrics (list[dict]): The metrics to be ingested.
  • dataset_id (str): The ID of the dataset.
  • tdr (ops_utils.tdr_utils.tdr_utils.TDR): The TDR instance
  • target_table_name (str): The name of the target table.
  • filter_entity_id (str): The entity ID to filter on.
def run(self) -> list[dict]:
1115    def run(self) -> list[dict]:
1116        """
1117        Run the filter process to remove sample IDs that already exist in the dataset.
1118
1119        **Returns:**
1120        - list[dict]: The filtered ingest metrics.
1121        """
1122        # Get all sample ids that already exist in dataset
1123        logging.info(
1124            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1125            f"dataset {self.dataset_id}"
1126        )
1127
1128        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1129            dataset_id=self.dataset_id,
1130            target_table_name=self.target_table_name,
1131            entity_id=self.filter_entity_id
1132        )
1133        # Filter out rows that already exist in dataset
1134        filtered_ingest_metrics = [
1135            row
1136            for row in self.ingest_metrics
1137            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1138        ]
1139        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1140            logging.info(
1141                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1142                f"the dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1143            )
1144
1145            if filtered_ingest_metrics:
1146                return filtered_ingest_metrics
1147            else:
1148                logging.info("All rows were filtered out because they already exist in the dataset; nothing to ingest")
1149                return []
1150        else:
1151            logging.info("No rows were filtered out because none of them already exist in the dataset")
1152            return filtered_ingest_metrics

Run the filter process to remove sample IDs that already exist in the dataset.

Returns:

  • list[dict]: The filtered ingest metrics.
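
For example, filtering a small batch of rows before an ingest, assuming tdr is an already-initialized TDR client and the metrics, dataset UUID, and table/column names are placeholders:

    new_rows = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=[{"sample_id": "SM-1"}, {"sample_id": "SM-2"}],  # hypothetical rows to ingest
        dataset_id="11111111-2222-3333-4444-555555555555",              # hypothetical dataset UUID
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id",
    ).run()
    # new_rows contains only the rows whose sample_id is not already in the dataset.
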