ops_utils.tdr_utils.tdr_api_utils

Utility classes for interacting with the TDR API.
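
The examples on this page are minimal usage sketches rather than definitive recipes: the setup below assumes a `RunRequest` built from an `ops_utils` token helper, and every dataset ID, snapshot ID, email, and billing profile shown is a placeholder.

    # Minimal sketch of wiring up the TDR client (setup details are assumptions).
    from ops_utils.request_util import RunRequest
    from ops_utils.token import Token  # assumed token helper from this package
    from ops_utils.tdr_utils.tdr_api_utils import TDR

    token = Token()
    request_util = RunRequest(token=token)
    tdr = TDR(request_util=request_util)  # env defaults to 'prod'

    # Placeholder dataset ID reused by the sketches further down the page
    dataset_id = "11111111-2222-3333-4444-555555555555"
    dataset_files = tdr.get_dataset_files(dataset_id=dataset_id)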

   1"""Utility classes for interacting with the TDR API."""
   2
   3import json
   4import logging
   5import requests
   6from typing import Any, Optional, Union
   7from pydantic import ValidationError
   8
   9from ..request_util import GET, POST, DELETE, PUT, RunRequest
  10from ..tdr_api_schema.create_dataset_schema import CreateDatasetSchema
  11from ..tdr_api_schema.update_dataset_schema import UpdateSchema
  12from .tdr_job_utils import MonitorTDRJob, SubmitAndMonitorMultipleJobs
  13from ..vars import ARG_DEFAULTS, GCP, APPLICATION_JSON
  14
  15
  16class TDR:
  17    """Class to interact with the Terra Data Repository (TDR) API."""
  18
  19    PROD_LINK = "https://data.terra.bio/api/repository/v1"
  20    DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
  21    """(str): The base URL for the TDR API."""
  22
  23    def __init__(self, request_util: RunRequest, env: str = 'prod'):
  24        """
  25        Initialize the TDR class for interacting with the Terra Data Repository (TDR) API.
  26
  27        **Args:**
  28        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
            - env (str, optional): The TDR environment to target, either `prod` or `dev`. Defaults to `prod`.
  29        """
  30        self.request_util = request_util
  31        if env.lower() == 'prod':
  32            self.tdr_link = self.PROD_LINK
  33        elif env.lower() == 'dev':
  34            self.tdr_link = self.DEV_LINK
  35        else:
  36            raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.")
  37        """@private"""
  38
  39    @staticmethod
  40    def _check_policy(policy: str) -> None:
  41        """
  42        Check if the policy is valid.
  43
  44        **Args:**
  45        - policy (str): The role to check.
  46
  47        **Raises:**
  48        - ValueError: If the policy is not one of the allowed options.
  49        """
  50        if policy not in ["steward", "custodian", "snapshot_creator"]:
  51            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")
  52
  53    def get_dataset_files(
  54            self,
  55            dataset_id: str,
  56            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
  57    ) -> list[dict]:
  58        """
  59        Get all files in a dataset.
  60
  61        Returns JSON like the example below:
  62
  63            {
  64                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
  65                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
  66                "path": "/path/set/in/ingest.csv",
  67                "size": 1722,
  68                "checksums": [
  69                    {
  70                        "checksum": "82f7e79v",
  71                        "type": "crc32c"
  72                    },
  73                    {
  74                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
  75                        "type": "md5"
  76                    }
  77                ],
  78                "created": "2024-13-11T15:01:00.256Z",
  79                "description": null,
  80                "fileType": "file",
  81                "fileDetail": {
  82                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
  83                    "mimeType": null,
  84                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
  85                    "loadTag": "RP_3333-RP_3333"
  86                },
  87                "directoryDetail": null
  88            }
  89
  90        **Args:**
  91        - dataset_id (str): The ID of the dataset.
  92        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
  93
  94        **Returns:**
  95        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
  96        """
  97        uri = f"{self.tdr_link}/datasets/{dataset_id}/files"
  98        logging.info(f"Getting all files in dataset {dataset_id}")
  99        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 100
 101    def create_file_dict(
 102            self,
 103            dataset_id: str,
 104            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 105    ) -> dict:
 106        """
 107        Create a dictionary of all files in a dataset where the key is the file UUID.
 108
 109        **Args:**
 110        - dataset_id (str): The ID of the dataset.
 111        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 112
 113        **Returns:**
 114        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
 115        """
 116        return {
 117            file_dict["fileId"]: file_dict
 118            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 119        }
 120
 121    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
 122            self,
 123            dataset_id: str,
 124            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 125    ) -> dict:
 126        """
 127        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
 128
 129        This assumes that the TDR 'path' is the original path of the file in cloud storage, with `gs://` stripped off.
 130
 131        This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.
 132
 133        **Args:**
 134        - dataset_id (str): The ID of the dataset.
 135        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 136
 137        **Returns:**
 138        - dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
 139        """
 140        return {
 141            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
 142            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 143        }
 144
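        # Example usage sketch (assumes the `tdr` client and placeholder `dataset_id`
        # from the module-level example above):
        #
        #   files_by_uuid = tdr.create_file_dict(dataset_id=dataset_id)
        #   uuid_by_path = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
        #       dataset_id=dataset_id
        #   )
        #   # Keyed on each file's accessUrl, so a lookup uses the original gs:// path
        #   file_uuid = uuid_by_path.get("gs://source-bucket/path/to/file.csv")
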
 145    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
 146        """
 147        Delete a file from a dataset.
 148
 149        **Args:**
 150        - file_id (str): The ID of the file to be deleted.
 151        - dataset_id (str): The ID of the dataset.
 152
 153        **Returns:**
 154        - requests.Response: The response from the request.
 155        """
 156        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}"
 157        logging.info(f"Submitting delete job for file {file_id}")
 158        return self.request_util.run_request(uri=uri, method=DELETE)
 159
 160    def delete_files(
 161            self,
 162            file_ids: list[str],
 163            dataset_id: str,
 164            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
 165            check_interval: int = 15) -> None:
 166        """
 167        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
 168
 169        **Args:**
 170        - file_ids (list[str]): A list of file IDs to be deleted.
 171        - dataset_id (str): The ID of the dataset.
 172        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
 173        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 174        """
 175        SubmitAndMonitorMultipleJobs(
 176            tdr=self,
 177            job_function=self.delete_file,
 178            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
 179            batch_size=batch_size_to_delete_files,
 180            check_interval=check_interval
 181        ).run()
 182
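        # Example usage sketch (assumes `tdr` and `dataset_id` from the module-level
        # example; this deletes every file in the dataset):
        #
        #   file_ids_to_delete = [f["fileId"] for f in tdr.get_dataset_files(dataset_id=dataset_id)]
        #   tdr.delete_files(file_ids=file_ids_to_delete, dataset_id=dataset_id)
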
 183    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 184        """
 185        Add a user to a dataset with a specified policy.
 186
 187        **Args:**
 188        - dataset_id (str): The ID of the dataset.
 189        - user (str): The email of the user to be added.
 190        - policy (str): The policy to be assigned to the user.
 191                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 192
 193        **Returns:**
 194        - requests.Response: The response from the request.
 195
 196        **Raises:**
 197        - ValueError: If the policy is not valid.
 198        """
 199        self._check_policy(policy)
 200        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
 201        member_dict = {"email": user}
 202        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
 203        return self.request_util.run_request(
 204            uri=uri,
 205            method=POST,
 206            data=json.dumps(member_dict),
 207            content_type=APPLICATION_JSON
 208        )
 209
 210    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 211        """
 212        Remove a user from a dataset.
 213
 214        **Args:**
 215        - dataset_id (str): The ID of the dataset.
 216        - user (str): The email of the user to be removed.
 217        - policy (str): The policy to be removed from the user.
 218                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 219
 220        **Returns:**
 221        - requests.Response: The response from the request.
 222
 223        **Raises:**
 224        - ValueError: If the policy is not valid.
 225        """
 226        self._check_policy(policy)
 227        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
 228        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
 229        return self.request_util.run_request(uri=uri, method=DELETE)
 230
 231    def delete_dataset(self, dataset_id: str) -> None:
 232        """
 233        Delete a dataset and monitor the job until completion.
 234
 235        **Args:**
 236        - dataset_id (str): The ID of the dataset to be deleted.
 237        """
 238        uri = f"{self.tdr_link}/datasets/{dataset_id}"
 239        logging.info(f"Deleting dataset {dataset_id}")
 240        response = self.request_util.run_request(uri=uri, method=DELETE)
 241        job_id = response.json()['id']
 242        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
 243
 244    def make_snapshot_public(self, snapshot_id: str) -> requests.Response:
 245        """
 246        Make a snapshot public.
 247
 248        **Args:**
 249        - snapshot_id (str): The ID of the snapshot to be made public.
 250
 251        **Returns:**
 252        - requests.Response: The response from the request.
 253        """
 254        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public"
 255        logging.info(f"Making snapshot {snapshot_id} public")
 256        return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")
 257
 258    def get_snapshot_info(
 259            self,
 260            snapshot_id: str,
 261            continue_not_found: bool = False,
 262            info_to_include: Optional[list[str]] = None
 263    ) -> Optional[requests.Response]:
 264        """
 265        Get information about a snapshot.
 266
 267        **Args:**
 268        - snapshot_id (str): The ID of the snapshot.
 269        - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
 270        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
 271                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
 272                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
 273
 274        **Returns:**
 275        - requests.Response (optional): The response from the request (returns None if the snapshot is not
 276         found or access is denied).
 277        """
 278        acceptable_return_code = [404, 403] if continue_not_found else []
 279        acceptable_include_info = [
 280            "SOURCES",
 281            "TABLES",
 282            "RELATIONSHIPS",
 283            "ACCESS_INFORMATION",
 284            "PROFILE",
 285            "PROPERTIES",
 286            "DATA_PROJECT",
 287            "CREATION_INFORMATION",
 288            "DUOS"
 289        ]
 290        if info_to_include:
 291            if not all(info in acceptable_include_info for info in info_to_include):
 292                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 293            include_string = '&include='.join(info_to_include)
 294        else:
 295            include_string = ""
 296        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
 297        response = self.request_util.run_request(
 298            uri=uri,
 299            method=GET,
 300            accept_return_codes=acceptable_return_code
 301        )
 302        if response.status_code == 404:
 303            logging.warning(f"Snapshot {snapshot_id} not found")
 304            return None
 305        if response.status_code == 403:
 306            logging.warning(f"Access denied for snapshot {snapshot_id}")
 307            return None
 308        return response
 309
 310    def delete_snapshots(
 311            self,
 312            snapshot_ids: list[str],
 313            batch_size: int = 25,
 314            check_interval: int = 10,
 315            verbose: bool = False) -> None:
 316        """
 317        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
 318
 319        **Args:**
 320        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
 321        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
 322        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
 323        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 324        """
 325        SubmitAndMonitorMultipleJobs(
 326            tdr=self,
 327            job_function=self.delete_snapshot,
 328            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
 329            batch_size=batch_size,
 330            check_interval=check_interval,
 331            verbose=verbose
 332        ).run()
 333
 334    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
 335        """
 336        Delete a snapshot.
 337
 338        **Args:**
 339        - snapshot_id (str): The ID of the snapshot to be deleted.
 340
 341        **Returns:**
 342        - requests.Response: The response from the request.
 343        """
 344        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
 345        logging.info(f"Deleting snapshot {snapshot_id}")
 346        return self.request_util.run_request(uri=uri, method=DELETE)
 347
 348    def _yield_existing_datasets(
 349            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
 350    ) -> Any:
 351        """
 352        Get all datasets in TDR, optionally filtered by dataset name.
 353
 354        **Args:**
 355            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
 356            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
 357            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".
 358
 359        Yields:
 360            Any: A generator yielding datasets.
 361        """
 362        offset = 0
 363        if filter:
 364            filter_str = f"&filter={filter}"
 365            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
 366        else:
 367            filter_str = ""
 368            log_message = f"Searching for all datasets in batches of {batch_size}"
 369        logging.info(log_message)
 370        while True:
 371            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
 372            response = self.request_util.run_request(uri=uri, method=GET)
 373            datasets = response.json()["items"]
 374            if not datasets:
 375                break
 376            for dataset in datasets:
 377                yield dataset
 378            offset += batch_size
 380
 381    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
 382        """
 383        Check if a dataset exists by name and optionally by billing profile.
 384
 385        **Args:**
 386        - dataset_name (str): The name of the dataset to check.
 387        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
 388
 389        **Returns:**
 390        - list[dict]: A list of matching datasets.
 391        """
 392        matching_datasets = []
 393        for dataset in self._yield_existing_datasets(filter=dataset_name):
 394            # Search uses wildcard so could grab more datasets where dataset_name is substring
 395            if dataset_name == dataset["name"]:
 396                if billing_profile:
 397                    if dataset["defaultProfileId"] == billing_profile:
 398                        logging.info(
 399                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
 400                        dataset_id = dataset["id"]
 401                        logging.info(f"Dataset ID: {dataset_id}")
 402                        matching_datasets.append(dataset)
 403                    else:
 404                        logging.warning(
 405                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
 406                            f"and not under billing profile {billing_profile}"
 407                        )
 408                        # Datasets names need to be unique regardless of billing profile, so raise an error if
 409                        # a dataset with the same name is found but is not under the requested billing profile
 410                        raise ValueError(
 411                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
 412                else:
 413                    matching_datasets.append(dataset)
 414        return matching_datasets
 415
 416    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
 417        """
 418        Get information about a dataset.
 419
 420        **Args:**
 421        - dataset_id (str): The ID of the dataset.
 422        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
 423        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
 424        Defaults to None.
 425
 426        **Returns:**
 427        - requests.Response: The response from the request.
 428
 429        **Raises:**
 430        - ValueError: If `info_to_include` contains invalid information types.
 431        """
 432        acceptable_include_info = [
 433            "SCHEMA",
 434            "ACCESS_INFORMATION",
 435            "PROFILE",
 436            "PROPERTIES",
 437            "DATA_PROJECT",
 438            "STORAGE",
 439            "SNAPSHOT_BUILDER_SETTING"
 440        ]
 441        if info_to_include:
 442            if not all(info in acceptable_include_info for info in info_to_include):
 443                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 444            include_string = '&include='.join(info_to_include)
 445        else:
 446            include_string = ""
 447        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
 448        return self.request_util.run_request(uri=uri, method=GET)
 449
 450    def get_table_schema_info(
 451            self,
 452            dataset_id: str,
 453            table_name: str,
 454            dataset_info: Optional[dict] = None
 455    ) -> Union[dict, None]:
 456        """
 457        Get schema information for a specific table within a dataset.
 458
 459        **Args:**
 460        - dataset_id (str): The ID of the dataset.
 461        - table_name (str): The name of the table.
 462        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
 463
 464        **Returns:**
 465        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
 466        """
 467        if not dataset_info:
 468            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 469        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
 470            if table["name"] == table_name:
 471                return table
 472        return None
 473
 474    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
 475        """
 476        Retrieve the result of a job.
 477
 478        **Args:**
 479        - job_id (str): The ID of the job.
 480        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
 481
 482        **Returns:**
 483        - requests.Response: The response from the request.
 484        """
 485        uri = f"{self.tdr_link}/jobs/{job_id}/result"
 486        # If job is expected to fail, accept any return code
 487        acceptable_return_code = list(range(100, 600)) if expect_failure else []
 488        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
 489
 490    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
 491        """
 492        Load data into a TDR dataset.
 493
 494        **Args:**
 495        - dataset_id (str): The ID of the dataset.
 496        - data (dict): The data to be ingested.
 497
 498        **Returns:**
 499        - requests.Response: The response from the request.
 500        """
 501        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
 502        logging.info(
 503            "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
 504            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
 505        return self.request_util.run_request(
 506            uri=uri,
 507            method=POST,
 508            content_type=APPLICATION_JSON,
 509            data=data
 510        )
 511
 512    def file_ingest_to_dataset(
 513            self,
 514            dataset_id: str,
 515            profile_id: str,
 516            file_list: list[dict],
 517            load_tag: str = "file_ingest_load_tag"
 518    ) -> dict:
 519        """
 520        Load files into a TDR dataset.
 521
 522        **Args:**
 523        - dataset_id (str): The ID of the dataset.
 524        - profile_id (str): The billing profile ID.
 525        - file_list (list[dict]): The list of files to be ingested.
 526        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
 527
 528        **Returns:**
 529        - dict: A dictionary containing the response from the ingest operation job monitoring.
 530        """
 531        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
 532        data = {
 533            "profileId": profile_id,
 534            "loadTag": f"{load_tag}",
 535            "maxFailedFileLoads": 0,
 536            "loadArray": file_list
 537        }
 538
 539        response = self.request_util.run_request(
 540            uri=uri,
 541            method=POST,
 542            content_type=APPLICATION_JSON,
 543            data=json.dumps(data)
 544        )
 545        job_id = response.json()['id']
 546        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 547        return job_results  # type: ignore[return-value]
 548
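        # Example usage sketch. The loadArray item shape (sourcePath/targetPath)
        # follows the TDR bulk file load model as an assumption; bucket, paths,
        # and profile ID are placeholders.
        #
        #   file_list = [
        #       {
        #           "sourcePath": "gs://source-bucket/sample1.cram",
        #           "targetPath": "/sample1/sample1.cram",
        #       }
        #   ]
        #   job_results = tdr.file_ingest_to_dataset(
        #       dataset_id=dataset_id,
        #       profile_id="billing-profile-uuid",
        #       file_list=file_list,
        #   )
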
 549    def get_dataset_table_metrics(
 550            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
 551    ) -> list[dict]:
 552        """
 553        Retrieve all metrics for a specific table within a dataset.
 554
 555        **Args:**
 556        - dataset_id (str): The ID of the dataset.
 557        - target_table_name (str): The name of the target table.
 558        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 559
 560        **Returns:**
 561        - list[dict]: A list of dictionaries containing the metrics for the specified table.
 562        """
 563        return [
 564            metric
 565            for metric in self._yield_dataset_metrics(
 566                dataset_id=dataset_id,
 567                target_table_name=target_table_name,
 568                query_limit=query_limit
 569            )
 570        ]
 571
 572    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
 573        """
 574        Yield all entity metrics from a dataset.
 575
 576        **Args:**
 577            dataset_id (str): The ID of the dataset.
 578            target_table_name (str): The name of the target table.
 579            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
 580
 581        Yields:
 582            Any: A generator yielding dictionaries containing the metrics for the specified table.
 583        """
 584        search_request = {
 585            "offset": 0,
 586            "limit": query_limit,
 587            "sort": "datarepo_row_id"
 588        }
 589        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
 590        while True:
 591            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
 592            response = self.request_util.run_request(
 593                uri=uri,
 594                method=POST,
 595                content_type=APPLICATION_JSON,
 596                data=json.dumps(search_request)
 597            )
 598            if not response or not response.json()["result"]:
 599                break
 600            logging.info(
 601                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
 602                f"dataset {dataset_id}"
 603            )
 604            for record in response.json()["result"]:
 605                yield record
 606            search_request["offset"] += query_limit  # type: ignore[operator]
 607
 608    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
 609        """
 610        Get existing IDs from a dataset.
 611
 612        **Args:**
 613        - dataset_id (str): The ID of the dataset.
 614        - target_table_name (str): The name of the target table.
 615        - entity_id (str): The entity ID to retrieve.
 616
 617        **Returns:**
 618        - list[str]: A list of entity IDs from the specified table.
 619        """
 620        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
 621        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
 622
 623    def get_job_status(self, job_id: str) -> requests.Response:
 624        """
 625        Retrieve the status of a job.
 626
 627        **Args:**
 628        - job_id (str): The ID of the job.
 629
 630        **Returns:**
 631        - requests.Response: The response from the request.
 632        """
 633        uri = f"{self.tdr_link}/jobs/{job_id}"
 634        return self.request_util.run_request(uri=uri, method=GET)
 635
 636    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
 637        """
 638        Get all file UUIDs from the metadata of a dataset.
 639
 640        **Args:**
 641        - dataset_id (str): The ID of the dataset.
 642
 643        **Returns:**
 644        - list[str]: A list of file UUIDs from the dataset metadata.
 645        """
 646        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 647        all_metadata_file_uuids = []
 648        tables = 0
 649        for table in dataset_info["schema"]["tables"]:
 650            tables += 1
 651            table_name = table["name"]
 652            logging.info(f"Getting all file information for {table_name}")
 653            # Get just columns where datatype is fileref
 654            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
 655            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
 656            # Get unique list of file uuids
 657            file_uuids = list(
 658                set(
 659                    [
 660                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
 661                    ]
 662                )
 663            )
 664            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
 665            all_metadata_file_uuids.extend(file_uuids)
 666            # Make full list unique
 667            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
 668        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
 669        return all_metadata_file_uuids
 670
 671    def soft_delete_entries(
 672            self,
 673            dataset_id: str,
 674            table_name: str,
 675            datarepo_row_ids: list[str],
 676            check_intervals: int = 15
 677    ) -> Optional[dict]:
 678        """
 679        Soft delete specific records from a table.
 680
 681        **Args:**
 682        - dataset_id (str): The ID of the dataset.
 683        - table_name (str): The name of the target table.
 684        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
 685        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 686
 687        **Returns:**
 688        - dict (optional): A dictionary containing the response from the soft delete operation job
 689        monitoring. Returns None if no row IDs are provided.
 690        """
 691        if not datarepo_row_ids:
 692            logging.info(f"No records found to soft delete in table {table_name}")
 693            return None
 694        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
 695        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
 696        payload = {
 697            "deleteType": "soft",
 698            "specType": "jsonArray",
 699            "tables": [
 700                {
 701                    "tableName": table_name,
 702                    "jsonArraySpec": {
 703                        "rowIds": datarepo_row_ids
 704                    }
 705                }
 706            ]
 707        }
 708        response = self.request_util.run_request(
 709            method=POST,
 710            uri=uri,
 711            data=json.dumps(payload),
 712            content_type=APPLICATION_JSON
 713        )
 714        job_id = response.json()["id"]
 715        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
 716
 717    def soft_delete_all_table_entries(
 718            self,
 719            dataset_id: str,
 720            table_name: str,
 721            query_limit: int = 1000,
 722            check_intervals: int = 15
 723    ) -> Optional[dict]:
 724        """
 725        Soft delete all records in a table.
 726
 727        **Args:**
 728        - dataset_id (str): The ID of the dataset.
 729        - table_name (str): The name of the target table.
 730        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 731        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 732
 733        **Returns:**
 734        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
 735        None if no row IDs are provided.
 736        """
 737        dataset_metrics = self.get_dataset_table_metrics(
 738            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
 739        )
 740        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
 741        return self.soft_delete_entries(
 742            dataset_id=dataset_id,
 743            table_name=table_name,
 744            datarepo_row_ids=row_ids,
 745            check_intervals=check_intervals
 746        )
 747
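        # Example usage sketch (table name and row IDs are placeholders; assumes
        # `tdr` and `dataset_id` from the module-level example):
        #
        #   tdr.soft_delete_entries(
        #       dataset_id=dataset_id,
        #       table_name="sample",
        #       datarepo_row_ids=["datarepo-row-uuid-1", "datarepo-row-uuid-2"],
        #   )
        #   # Or clear out the whole table
        #   tdr.soft_delete_all_table_entries(dataset_id=dataset_id, table_name="sample")
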
 748    def get_or_create_dataset(
 749            self,
 750            dataset_name: str,
 751            billing_profile: str,
 752            schema: dict,
 753            description: str,
 754            relationships: Optional[list[dict]] = None,
 755            delete_existing: bool = False,
 756            continue_if_exists: bool = False,
 757            additional_properties_dict: Optional[dict] = None
 758    ) -> str:
 759        """
 760        Get or create a dataset.
 761
 762        **Args:**
 763        - dataset_name (str): The name of the dataset.
 764        - billing_profile (str): The billing profile ID.
 765        - schema (dict): The schema of the dataset.
 766        - description (str): The description of the dataset.
 767        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
 768                Defaults to None.
 769        - additional_properties_dict (Optional[dict], optional): Additional properties
 770                for the dataset. Defaults to None.
 771        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
 772                Defaults to `False`.
 773        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
 774                Defaults to `False`.
 775
 776        **Returns:**
 777        - str: The ID of the dataset.
 778
 779        **Raises:**
 780        - ValueError: If the dataset exists and `continue_if_exists` is `False`, or is under a different billing profile.
 781        """
 782        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
 783        if existing_datasets:
 784            if not continue_if_exists:
 785                raise ValueError(
 786                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
 787                )
 788            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
 789            if delete_existing:
 790                logging.info(f"Deleting existing dataset {dataset_name}")
 791                self.delete_dataset(existing_datasets[0]["id"])
 792                existing_datasets = []
 793            # If not delete_existing and continue_if_exists then grab existing datasets id
 794            else:
 795                dataset_id = existing_datasets[0]["id"]
 796        if not existing_datasets:
 797            logging.info("Did not find existing dataset")
 798            # Create dataset
 799            dataset_id = self.create_dataset(
 800                schema=schema,
 801                dataset_name=dataset_name,
 802                description=description,
 803                profile_id=billing_profile,
 804                additional_dataset_properties=additional_properties_dict
 805            )
 806        return dataset_id
 807
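        # Example usage sketch. The schema dict is illustrative of the TDR dataset
        # schema model (it is validated by CreateDatasetSchema when the dataset is
        # created); names and the billing profile ID are placeholders.
        #
        #   schema = {
        #       "tables": [
        #           {
        #               "name": "sample",
        #               "columns": [
        #                   {"name": "sample_id", "datatype": "string", "array_of": False, "required": True},
        #                   {"name": "cram_path", "datatype": "fileref", "array_of": False, "required": False},
        #               ],
        #               "primaryKey": ["sample_id"],
        #           }
        #       ]
        #   }
        #   dataset_id = tdr.get_or_create_dataset(
        #       dataset_name="my_dataset",
        #       billing_profile="billing-profile-uuid",
        #       schema=schema,
        #       description="Example dataset",
        #       continue_if_exists=True,
        #   )
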
 808    def create_dataset(  # type: ignore[return]
 809            self,
 810            schema: dict,
 811            dataset_name: str,
 812            description: str,
 813            profile_id: str,
 814            additional_dataset_properties: Optional[dict] = None
 815    ) -> Optional[str]:
 816        """
 817        Create a new dataset.
 818
 819        **Args:**
 820        - schema (dict): The schema of the dataset.
 821        - dataset_name (str): The name of the dataset.
 822        - description (str): The description of the dataset.
 823        - profile_id (str): The billing profile ID.
 824        - additional_dataset_properties (Optional[dict], optional): Additional
 825                properties for the dataset. Defaults to None.
 826
 827        **Returns:**
 828        - Optional[str]: The ID of the created dataset, or None if creation failed.
 829
 830        **Raises:**
 831        - ValueError: If the schema validation fails.
 832        """
 833        dataset_properties = {
 834            "name": dataset_name,
 835            "description": description,
 836            "defaultProfileId": profile_id,
 837            "region": "us-central1",
 838            "cloudPlatform": GCP,
 839            "schema": schema
 840        }
 841
 842        if additional_dataset_properties:
 843            dataset_properties.update(additional_dataset_properties)
 844        try:
 845            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
 846        except ValidationError as e:
 847            raise ValueError(f"Schema validation error: {e}")
 848        uri = f"{self.tdr_link}/datasets"
 849        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
 850        response = self.request_util.run_request(
 851            method=POST,
 852            uri=uri,
 853            data=json.dumps(dataset_properties),
 854            content_type=APPLICATION_JSON
 855        )
 856        job_id = response.json()["id"]
 857        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 858        dataset_id = job_results["id"]  # type: ignore[index]
 859        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
 860        return dataset_id
 861
 862    def update_dataset_schema(  # type: ignore[return]
 863            self,
 864            dataset_id: str,
 865            update_note: str,
 866            tables_to_add: Optional[list[dict]] = None,
 867            relationships_to_add: Optional[list[dict]] = None,
 868            columns_to_add: Optional[list[dict]] = None
 869    ) -> Optional[str]:
 870        """
 871        Update the schema of a dataset.
 872
 873        **Args:**
 874        - dataset_id (str): The ID of the dataset.
 875        - update_note (str): A note describing the update.
 876        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
 877        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
 878        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
 879
 880        **Returns:**
 881        - Optional[str]: The ID of the updated dataset, or None if the update failed.
 882
 883        **Raises:**
 884        - ValueError: If the schema validation fails.
 885        """
 886        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
 887        request_body: dict = {"description": f"{update_note}", "changes": {}}
 888        if tables_to_add:
 889            request_body["changes"]["addTables"] = tables_to_add
 890        if relationships_to_add:
 891            request_body["changes"]["addRelationships"] = relationships_to_add
 892        if columns_to_add:
 893            request_body["changes"]["addColumns"] = columns_to_add
 894        try:
 895            UpdateSchema(**request_body)
 896        except ValidationError as e:
 897            raise ValueError(f"Schema validation error: {e}")
 898
 899        response = self.request_util.run_request(
 900            uri=uri,
 901            method=POST,
 902            content_type=APPLICATION_JSON,
 903            data=json.dumps(request_body)
 904        )
 905        job_id = response.json()["id"]
 906        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 907        dataset_id = job_results["id"]  # type: ignore[index]
 908        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
 909        return dataset_id
 910
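        # Example usage sketch. The change shape below is an assumption about the
        # updateSchema request (it is validated by UpdateSchema above); table and
        # column names are placeholders.
        #
        #   tdr.update_dataset_schema(
        #       dataset_id=dataset_id,
        #       update_note="Add collection_date to the sample table",
        #       columns_to_add=[
        #           {
        #               "tableName": "sample",
        #               "columns": [
        #                   {"name": "collection_date", "datatype": "date", "array_of": False, "required": False}
        #               ],
        #           }
        #       ],
        #   )
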
 911    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
 912        """
 913        Get response from a batched endpoint.
 914
 915        Helper method for all GET endpoints that require batching.
 916
 917        Given the URI and the limit (optional), will
 918        loop through batches until all metadata is retrieved.
 919
 920        **Args:**
 921        - uri (str): The base URI for the endpoint (without query params for offset or limit).
 922        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 923
 924        **Returns:**
 925        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
 926        """
 927        batch = 1
 928        offset = 0
 929        metadata: list = []
 930        while True:
 931            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
 932            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()
 933
 934            # If no more files, break the loop
 935            if not response_json:
 936                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
 937                break
 938
 939            metadata.extend(response_json)
 940            # Increment the offset by limit for the next page
 941            offset += limit
 942            batch += 1
 943        return metadata
 944
 945    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 946        """
 947        Return all the metadata about files in a given snapshot.
 948
 949        Not all files can be returned at once, so the API
 950        is used repeatedly until all "batches" have been returned.
 951
 952        **Args:**
 953        - snapshot_id (str): The ID of the snapshot.
 954        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 955
 956        **Returns:**
 957        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
 958        """
 959        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
 960        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 961
 962    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
 963        """
 964        Return snapshots belonging to the specified dataset.
 965
 966        **Args:**
 967        - dataset_id (str): The UUID of the dataset to query.
 968
 969        **Returns:**
 970        - requests.Response: The response from the request.
 971        """
 972        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
 973        return self.request_util.run_request(
 974            uri=uri,
 975            method=GET
 976        )
 977
 978    def create_snapshot(
 979            self,
 980            snapshot_name: str,
 981            description: str,
 982            dataset_name: str,
 983            snapshot_mode: str,  # byFullView is entire dataset
 984            profile_id: str,
 985            stewards: Optional[list[str]] = [],
 986            readers: Optional[list[str]] = [],
 987            consent_code: Optional[str] = None,
 988            duos_id: Optional[str] = None,
 989            data_access_control_groups: Optional[list[str]] = None,
 990    ) -> None:
 991        """
 992        Create a snapshot in TDR and monitor the creation job until completion.
 993
 994        **Args:**
 995        - snapshot_name (str): The name of the snapshot to create.
            - description (str): The description of the snapshot.
            - dataset_name (str): The name of the source dataset.
            - snapshot_mode (str): The snapshot mode (e.g. `byFullView` to snapshot the entire dataset).
            - profile_id (str): The billing profile ID.
            - stewards (list[str], optional): Emails to add to the snapshot's `stewards` policy. Defaults to an empty list.
            - readers (list[str], optional): Emails to add to the snapshot's `readers` policy. Defaults to an empty list.
            - consent_code (str, optional): The consent code to attach to the snapshot. Defaults to None.
            - duos_id (str, optional): The DUOS ID to attach to the snapshot. Defaults to None.
            - data_access_control_groups (list[str], optional): Data access control groups to apply to the snapshot.
                    Defaults to None.
 996        """
 997        uri = f"{self.tdr_link}/snapshots"
 998        payload = {
 999            "name": snapshot_name,
1000            "description": description,
1001            "contents": [
1002                {
1003                    "datasetName": dataset_name,
1004                    "mode": snapshot_mode,
1005                }
1006            ],
1007            "policies": {
1008                "stewards": stewards,
1009                "readers": readers,
1010            },
1011            "profileId": profile_id,
1012            "globalFileIds": True,
1013        }
1014        if consent_code:
1015            payload["consentCode"] = consent_code
1016        if duos_id:
1017            payload["duosId"] = duos_id
1018        if data_access_control_groups:
1019            payload["dataAccessControlGroups"] = data_access_control_groups
1020        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
1021        response = self.request_util.run_request(
1022            uri=uri,
1023            method=POST,
1024            content_type=APPLICATION_JSON,
1025            data=json.dumps(payload)
1026        )
1027        job_id = response.json()["id"]
1028        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
1029        snapshot_id = job_results["id"]  # type: ignore[index]
1030        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
1031
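        # Example usage sketch (names, emails, and profile ID are placeholders):
        #
        #   tdr.create_snapshot(
        #       snapshot_name="my_dataset_snapshot_2024_01",
        #       description="Full-view snapshot of my_dataset",
        #       dataset_name="my_dataset",
        #       snapshot_mode="byFullView",
        #       profile_id="billing-profile-uuid",
        #       stewards=["steward@example.org"],
        #       readers=["reader@example.org"],
        #   )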
1032
1033class FilterOutSampleIdsAlreadyInDataset:
1034    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
1035
1036    def __init__(
1037            self,
1038            ingest_metrics: list[dict],
1039            dataset_id: str,
1040            tdr: TDR,
1041            target_table_name: str,
1042            filter_entity_id: str
1043    ):
1044        """
1045        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1046
1047        **Args:**
1048        - ingest_metrics (list[dict]): The metrics to be ingested.
1049        - dataset_id (str): The ID of the dataset.
1050        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
1051        - target_table_name (str): The name of the target table.
1052        - filter_entity_id (str): The entity ID to filter on.
1053        """
1054        self.ingest_metrics = ingest_metrics
1055        """@private"""
1056        self.tdr = tdr
1057        """@private"""
1058        self.dataset_id = dataset_id
1059        """@private"""
1060        self.target_table_name = target_table_name
1061        """@private"""
1062        self.filter_entity_id = filter_entity_id
1063        """@private"""
1064
1065    def run(self) -> list[dict]:
1066        """
1067        Run the filter process to remove sample IDs that already exist in the dataset.
1068
1069        **Returns:**
1070        - list[dict]: The filtered ingest metrics.
1071        """
1072        # Get all sample ids that already exist in dataset
1073        logging.info(
1074            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1075            f"dataset {self.dataset_id}"
1076        )
1077
1078        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1079            dataset_id=self.dataset_id,
1080            target_table_name=self.target_table_name,
1081            entity_id=self.filter_entity_id
1082        )
1083        # Filter out rows that already exist in dataset
1084        filtered_ingest_metrics = [
1085            row
1086            for row in self.ingest_metrics
1087            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1088        ]
1089        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1090            logging.info(
1091                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1092                f"dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1093            )
1094
1095            if filtered_ingest_metrics:
1096                return filtered_ingest_metrics
1097            else:
1098                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
1099                return []
1100        else:
1101            logging.info("No rows were filtered out because none of them already exist in the dataset")
1102            return filtered_ingest_metrics
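
A typical ingest flow pairs `FilterOutSampleIdsAlreadyInDataset` with `TDR.ingest_to_dataset`. The sketch below reuses the `tdr` client and placeholder `dataset_id` from the opening example; the rows are placeholders, and the ingest payload shape (`table`, `format`, `records`, `updateStrategy`) is an assumption about the TDR ingest request rather than something defined in this module.

    # Hedged sketch: drop rows whose sample_id already exists, then ingest the rest.
    ingest_metrics = [{"sample_id": "sample_1"}, {"sample_id": "sample_2"}]  # placeholder rows

    rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=ingest_metrics,
        dataset_id=dataset_id,
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id",
    ).run()

    if rows_to_ingest:
        # Assumed payload for an in-request ("array" format) ingest; passed as a
        # dict, matching the documented `data` parameter of ingest_to_dataset.
        ingest_payload = {
            "table": "sample",
            "format": "array",
            "records": rows_to_ingest,
            "updateStrategy": "append",
        }
        tdr.ingest_to_dataset(dataset_id=dataset_id, data=ingest_payload)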
class TDR:
  17class TDR:
  18    """Class to interact with the Terra Data Repository (TDR) API."""
  19
  20    PROD_LINK = "https://data.terra.bio/api/repository/v1"
  21    DEV_LINK = "https://jade.datarepo-dev.broadinstitute.org/api/repository/v1"
  22    """(str): The base URL for the TDR API."""
  23
  24    def __init__(self, request_util: RunRequest, env: str = 'prod'):
  25        """
  26        Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).
  27
  28        **Args:**
  29        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
  30        """
  31        self.request_util = request_util
  32        if env.lower() == 'prod':
  33            self.tdr_link = self.PROD_LINK
  34        elif env.lower() == 'dev':
  35            self.tdr_link = self.DEV_LINK
  36        else:
  37            raise RuntimeError(f"Unsupported environment: {env}. Must be 'prod' or 'dev'.")
  38        """@private"""
  39
  40    @staticmethod
  41    def _check_policy(policy: str) -> None:
  42        """
  43        Check if the policy is valid.
  44
  45        **Args:**
  46        - policy (str): The role to check.
  47
  48        **Raises:**
  49        - ValueError: If the policy is not one of the allowed options.
  50        """
  51        if policy not in ["steward", "custodian", "snapshot_creator"]:
  52            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")
  53
  54    def get_dataset_files(
  55            self,
  56            dataset_id: str,
  57            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
  58    ) -> list[dict]:
  59        """
  60        Get all files in a dataset.
  61
  62        Returns json like below
  63
  64            {
  65                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
  66                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
  67                "path": "/path/set/in/ingest.csv",
  68                "size": 1722,
  69                "checksums": [
  70                    {
  71                        "checksum": "82f7e79v",
  72                        "type": "crc32c"
  73                    },
  74                    {
  75                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
  76                        "type": "md5"
  77                    }
  78                ],
  79                "created": "2024-13-11T15:01:00.256Z",
  80                "description": null,
  81                "fileType": "file",
  82                "fileDetail": {
  83                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
  84                    "mimeType": null,
  85                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
  86                    "loadTag": "RP_3333-RP_3333"
  87                },
  88                "directoryDetail": null
  89            }
  90
  91        **Args:**
  92        - dataset_id (str): The ID of the dataset.
  93        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
  94
  95        **Returns:**
  96        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
  97        """
  98        uri = f"{self.tdr_link}/datasets/{dataset_id}/files"
  99        logging.info(f"Getting all files in dataset {dataset_id}")
 100        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 101
 102    def create_file_dict(
 103            self,
 104            dataset_id: str,
 105            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 106    ) -> dict:
 107        """
 108        Create a dictionary of all files in a dataset where the key is the file UUID.
 109
 110        **Args:**
 111        - dataset_id (str): The ID of the dataset.
 112        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 113
 114        **Returns:**
 115        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
 116        """
 117        return {
 118            file_dict["fileId"]: file_dict
 119            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 120        }
 121
 122    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
 123            self,
 124            dataset_id: str,
 125            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 126    ) -> dict:
 127        """
 128        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
 129
 130        This assumes that the TDR 'path' is original path of the file in the cloud storage with `gs://` stripped out.
 131
 132        This will ONLY work if dataset was created with `experimentalSelfHosted = True`
 133
 134        **Args:**
 135        - dataset_id (str): The ID of the dataset.
 136        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 137
 138        **Returns:**
 139        - dict: A dictionary where the key is the file UUID and the value is the file path.
 140        """
 141        return {
 142            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
 143            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 144        }
 145
 146    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
 147        """
 148        Delete a file from a dataset.
 149
 150        **Args:**
 151        - file_id (str): The ID of the file to be deleted.
 152        - dataset_id (str): The ID of the dataset.
 153
 154        **Returns:**
 155        - requests.Response: The response from the request.
 156        """
 157        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/{file_id}"
 158        logging.info(f"Submitting delete job for file {file_id}")
 159        return self.request_util.run_request(uri=uri, method=DELETE)
 160
 161    def delete_files(
 162            self,
 163            file_ids: list[str],
 164            dataset_id: str,
 165            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
 166            check_interval: int = 15) -> None:
 167        """
 168        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
 169
 170        **Args:**
 171        - file_ids (list[str]): A list of file IDs to be deleted.
 172        - dataset_id (str): The ID of the dataset.
 173        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
 174        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 175        """
 176        SubmitAndMonitorMultipleJobs(
 177            tdr=self,
 178            job_function=self.delete_file,
 179            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
 180            batch_size=batch_size_to_delete_files,
 181            check_interval=check_interval
 182        ).run()
 183
 184    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 185        """
 186        Add a user to a dataset with a specified policy.
 187
 188        **Args:**
 189        - dataset_id (str): The ID of the dataset.
 190        - user (str): The email of the user to be added.
 191        - policy (str): The policy to be assigned to the user.
 192                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 193
 194        **Returns:**
 195        - requests.Response: The response from the request.
 196
 197        **Raises:**
 198        - ValueError: If the policy is not valid.
 199        """
 200        self._check_policy(policy)
 201        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members"
 202        member_dict = {"email": user}
 203        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
 204        return self.request_util.run_request(
 205            uri=uri,
 206            method=POST,
 207            data=json.dumps(member_dict),
 208            content_type=APPLICATION_JSON
 209        )
 210
 211    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 212        """
 213        Remove a user from a dataset.
 214
 215        **Args:**
 216        - dataset_id (str): The ID of the dataset.
 217        - user (str): The email of the user to be removed.
 218        - policy (str): The policy to be removed from the user.
 219                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 220
 221        **Returns:**
 222        - requests.Response: The response from the request.
 223
 224        **Raises:**
 225        - ValueError: If the policy is not valid.
 226        """
 227        self._check_policy(policy)
 228        uri = f"{self.tdr_link}/datasets/{dataset_id}/policies/{policy}/members/{user}"
 229        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
 230        return self.request_util.run_request(uri=uri, method=DELETE)
 231
 232    def delete_dataset(self, dataset_id: str) -> None:
 233        """
 234        Delete a dataset and monitor the delete job until completion.
 235
 236        **Args:**
 237        - dataset_id (str): The ID of the dataset to be deleted.
 238        """
 239        uri = f"{self.tdr_link}/datasets/{dataset_id}"
 240        logging.info(f"Deleting dataset {dataset_id}")
 241        response = self.request_util.run_request(uri=uri, method=DELETE)
 242        job_id = response.json()['id']
 243        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
 244
 245    def make_snapshot_public(self, snapshot_id: str) -> requests.Response:
 246        """
 247        Make a snapshot public.
 248
 249        **Args:**
 250        - snapshot_id (str): The ID of the snapshot to be made public.
 251
 252        **Returns:**
 253        - requests.Response: The response from the request.
 254        """
 255        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/public"
 256        logging.info(f"Making snapshot {snapshot_id} public")
 257        return self.request_util.run_request(uri=uri, method=PUT, content_type=APPLICATION_JSON, data="true")
 258
 259    def get_snapshot_info(
 260            self,
 261            snapshot_id: str,
 262            continue_not_found: bool = False,
 263            info_to_include: Optional[list[str]] = None
 264    ) -> Optional[requests.Response]:
 265        """
 266        Get information about a snapshot.
 267
 268        **Args:**
 269        - snapshot_id (str): The ID of the snapshot.
 270        - continue_not_found (bool, optional): Whether to accept a `404` (or `403`) response. Defaults to `False`.
 271        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
 272                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
 273                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
 274
 275        **Returns:**
 276        - requests.Response (optional): The response from the request (returns None if the snapshot is not
 277         found or access is denied).
 278        """
 279        acceptable_return_code = [404, 403] if continue_not_found else []
 280        acceptable_include_info = [
 281            "SOURCES",
 282            "TABLES",
 283            "RELATIONSHIPS",
 284            "ACCESS_INFORMATION",
 285            "PROFILE",
 286            "PROPERTIES",
 287            "DATA_PROJECT",
 288            "CREATION_INFORMATION",
 289            "DUOS"
 290        ]
 291        if info_to_include:
 292            if not all(info in acceptable_include_info for info in info_to_include):
 293                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 294            include_string = '&include='.join(info_to_include)
 295        else:
 296            include_string = ""
 297        uri = f"{self.tdr_link}/snapshots/{snapshot_id}?include={include_string}"
 298        response = self.request_util.run_request(
 299            uri=uri,
 300            method=GET,
 301            accept_return_codes=acceptable_return_code
 302        )
 303        if response.status_code == 404:
 304            logging.warning(f"Snapshot {snapshot_id} not found")
 305            return None
 306        if response.status_code == 403:
 307            logging.warning(f"Access denied for snapshot {snapshot_id}")
 308            return None
 309        return response
 310
 311    def delete_snapshots(
 312            self,
 313            snapshot_ids: list[str],
 314            batch_size: int = 25,
 315            check_interval: int = 10,
 316            verbose: bool = False) -> None:
 317        """
 318        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
 319
 320        **Args:**
 321        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
 322        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
 323        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
 324        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 325        """
 326        SubmitAndMonitorMultipleJobs(
 327            tdr=self,
 328            job_function=self.delete_snapshot,
 329            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
 330            batch_size=batch_size,
 331            check_interval=check_interval,
 332            verbose=verbose
 333        ).run()
 334
 335    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
 336        """
 337        Delete a snapshot.
 338
 339        **Args:**
 340        - snapshot_id (str): The ID of the snapshot to be deleted.
 341
 342        **Returns:**
 343        - requests.Response: The response from the request.
 344        """
 345        uri = f"{self.tdr_link}/snapshots/{snapshot_id}"
 346        logging.info(f"Deleting snapshot {snapshot_id}")
 347        return self.request_util.run_request(uri=uri, method=DELETE)
 348
 349    def _yield_existing_datasets(
 350            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
 351    ) -> Any:
 352        """
 353        Get all datasets in TDR, optionally filtered by dataset name.
 354
 355        **Args:**
 356            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
 357            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
 358            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".
 359
 360        Yields:
 361            Any: A generator yielding datasets.
 362        """
 363        offset = 0
 364        if filter:
 365            filter_str = f"&filter={filter}"
 366            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
 367        else:
 368            filter_str = ""
 369            log_message = f"Searching for all datasets in batches of {batch_size}"
 370        logging.info(log_message)
 371        while True:
 372            uri = f"{self.tdr_link}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
 373            response = self.request_util.run_request(uri=uri, method=GET)
 374            datasets = response.json()["items"]
 375            if not datasets:
 376                break
 377            for dataset in datasets:
 378                yield dataset
 379            offset += batch_size
 381
 382    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
 383        """
 384        Check if a dataset exists by name and optionally by billing profile.
 385
 386        **Args:**
 387        - dataset_name (str): The name of the dataset to check.
 388        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
 389
 390        **Returns:**
 391        - list[dict]: A list of matching datasets.
 392        """
 393        matching_datasets = []
 394        for dataset in self._yield_existing_datasets(filter=dataset_name):
 395            # Search uses wildcard so could grab more datasets where dataset_name is substring
 396            if dataset_name == dataset["name"]:
 397                if billing_profile:
 398                    if dataset["defaultProfileId"] == billing_profile:
 399                        logging.info(
 400                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
 401                        dataset_id = dataset["id"]
 402                        logging.info(f"Dataset ID: {dataset_id}")
 403                        matching_datasets.append(dataset)
 404                    else:
 405                        logging.warning(
 406                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
 407                            f"and not under billing profile {billing_profile}"
 408                        )
 409                        # Dataset names need to be unique regardless of billing profile, so raise an error if
 410                        # a dataset with the same name is found but is not under the requested billing profile
 411                        raise ValueError(
 412                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
 413                else:
 414                    matching_datasets.append(dataset)
 415        return matching_datasets
 416
 417    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
 418        """
 419        Get information about a dataset.
 420
 421        **Args:**
 422        - dataset_id (str): The ID of the dataset.
 423        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
 424        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
 425        Defaults to None.
 426
 427        **Returns:**
 428        - requests.Response: The response from the request.
 429
 430        **Raises:**
 431        - ValueError: If `info_to_include` contains invalid information types.
 432        """
 433        acceptable_include_info = [
 434            "SCHEMA",
 435            "ACCESS_INFORMATION",
 436            "PROFILE",
 437            "PROPERTIES",
 438            "DATA_PROJECT",
 439            "STORAGE",
 440            "SNAPSHOT_BUILDER_SETTING"
 441        ]
 442        if info_to_include:
 443            if not all(info in acceptable_include_info for info in info_to_include):
 444                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 445            include_string = '&include='.join(info_to_include)
 446        else:
 447            include_string = ""
 448        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
 449        return self.request_util.run_request(uri=uri, method=GET)
 450
 451    def get_table_schema_info(
 452            self,
 453            dataset_id: str,
 454            table_name: str,
 455            dataset_info: Optional[dict] = None
 456    ) -> Union[dict, None]:
 457        """
 458        Get schema information for a specific table within a dataset.
 459
 460        **Args:**
 461        - dataset_id (str): The ID of the dataset.
 462        - table_name (str): The name of the table.
 463        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
 464
 465        **Returns:**
 466        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
 467        """
 468        if not dataset_info:
 469            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 470        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
 471            if table["name"] == table_name:
 472                return table
 473        return None
 474
 475    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
 476        """
 477        Retrieve the result of a job.
 478
 479        **Args:**
 480        - job_id (str): The ID of the job.
 481        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
 482
 483        **Returns:**
 484        - requests.Response: The response from the request.
 485        """
 486        uri = f"{self.tdr_link}/jobs/{job_id}/result"
 487        # If job is expected to fail, accept any return code
 488        acceptable_return_code = list(range(100, 600)) if expect_failure else []
 489        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
 490
 491    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
 492        """
 493        Load data into a TDR dataset.
 494
 495        **Args:**
 496        - dataset_id (str): The ID of the dataset.
 497        - data (dict): The data to be ingested.
 498
 499        **Returns:**
 500        - requests.Response: The response from the request.
 501        """
 502        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
 503        logging.info(
 504            "If the TDR SA was recently added to the source bucket/dataset/workspace and you receive a 400/403 error, " +
 505            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
 506        return self.request_util.run_request(
 507            uri=uri,
 508            method=POST,
 509            content_type=APPLICATION_JSON,
 510            data=data
 511        )
 512
 513    def file_ingest_to_dataset(
 514            self,
 515            dataset_id: str,
 516            profile_id: str,
 517            file_list: list[dict],
 518            load_tag: str = "file_ingest_load_tag"
 519    ) -> dict:
 520        """
 521        Load files into a TDR dataset.
 522
 523        **Args:**
 524        - dataset_id (str): The ID of the dataset.
 525        - profile_id (str): The billing profile ID.
 526        - file_list (list[dict]): The list of files to be ingested.
 527        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
 528
 529        **Returns:**
 530        - dict: A dictionary containing the response from the ingest operation job monitoring.
 531        """
 532        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
 533        data = {
 534            "profileId": profile_id,
 535            "loadTag": f"{load_tag}",
 536            "maxFailedFileLoads": 0,
 537            "loadArray": file_list
 538        }
 539
 540        response = self.request_util.run_request(
 541            uri=uri,
 542            method=POST,
 543            content_type=APPLICATION_JSON,
 544            data=json.dumps(data)
 545        )
 546        job_id = response.json()['id']
 547        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 548        return job_results  # type: ignore[return-value]
 549
 550    def get_dataset_table_metrics(
 551            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
 552    ) -> list[dict]:
 553        """
 554        Retrieve all metrics for a specific table within a dataset.
 555
 556        **Args:**
 557        - dataset_id (str): The ID of the dataset.
 558        - target_table_name (str): The name of the target table.
 559        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 560
 561        **Returns:**
 562        - list[dict]: A list of dictionaries containing the metrics for the specified table.
 563        """
 564        return [
 565            metric
 566            for metric in self._yield_dataset_metrics(
 567                dataset_id=dataset_id,
 568                target_table_name=target_table_name,
 569                query_limit=query_limit
 570            )
 571        ]
 572
 573    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
 574        """
 575        Yield all entity metrics from a dataset.
 576
 577        **Args:**
 578            dataset_id (str): The ID of the dataset.
 579            target_table_name (str): The name of the target table.
 580            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
 581
 582        Yields:
 583            Any: A generator yielding dictionaries containing the metrics for the specified table.
 584        """
 585        search_request = {
 586            "offset": 0,
 587            "limit": query_limit,
 588            "sort": "datarepo_row_id"
 589        }
 590        uri = f"{self.tdr_link}/datasets/{dataset_id}/data/{target_table_name}"
 591        while True:
 592            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
 593            response = self.request_util.run_request(
 594                uri=uri,
 595                method=POST,
 596                content_type=APPLICATION_JSON,
 597                data=json.dumps(search_request)
 598            )
 599            if not response or not response.json()["result"]:
 600                break
 601            logging.info(
 602                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
 603                f"in dataset {dataset_id}"
 604            )
 605            for record in response.json()["result"]:
 606                yield record
 607            search_request["offset"] += query_limit  # type: ignore[operator]
 608
 609    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
 610        """
 611        Get existing IDs from a dataset.
 612
 613        **Args:**
 614        - dataset_id (str): The ID of the dataset.
 615        - target_table_name (str): The name of the target table.
 616        - entity_id (str): The name of the column containing the entity IDs to retrieve.
 617
 618        **Returns:**
 619        - list[str]: A list of entity IDs from the specified table.
 620        """
 621        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
 622        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
 623
 624    def get_job_status(self, job_id: str) -> requests.Response:
 625        """
 626        Retrieve the status of a job.
 627
 628        **Args:**
 629        - job_id (str): The ID of the job.
 630
 631        **Returns:**
 632        - requests.Response: The response from the request.
 633        """
 634        uri = f"{self.tdr_link}/jobs/{job_id}"
 635        return self.request_util.run_request(uri=uri, method=GET)
 636
 637    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
 638        """
 639        Get all file UUIDs from the metadata of a dataset.
 640
 641        **Args:**
 642        - dataset_id (str): The ID of the dataset.
 643
 644        **Returns:**
 645        - list[str]: A list of file UUIDs from the dataset metadata.
 646        """
 647        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 648        all_metadata_file_uuids = []
 649        tables = 0
 650        for table in dataset_info["schema"]["tables"]:
 651            tables += 1
 652            table_name = table["name"]
 653            logging.info(f"Getting all file information for {table_name}")
 654            # Get just columns where datatype is fileref
 655            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
 656            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
 657            # Get unique list of file uuids
 658            file_uuids = list(
 659                set(
 660                    [
 661                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
 662                    ]
 663                )
 664            )
 665            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
 666            all_metadata_file_uuids.extend(file_uuids)
 667            # Make full list unique
 668            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
 669        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
 670        return all_metadata_file_uuids
 671
 672    def soft_delete_entries(
 673            self,
 674            dataset_id: str,
 675            table_name: str,
 676            datarepo_row_ids: list[str],
 677            check_intervals: int = 15
 678    ) -> Optional[dict]:
 679        """
 680        Soft delete specific records from a table.
 681
 682        **Args:**
 683        - dataset_id (str): The ID of the dataset.
 684        - table_name (str): The name of the target table.
 685        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
 686        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 687
 688        **Returns:**
 689        - dict (optional): A dictionary containing the response from the soft delete operation job
 690        monitoring. Returns None if no row IDs are provided.
 691        """
 692        if not datarepo_row_ids:
 693            logging.info(f"No records found to soft delete in table {table_name}")
 694            return None
 695        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
 696        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
 697        payload = {
 698            "deleteType": "soft",
 699            "specType": "jsonArray",
 700            "tables": [
 701                {
 702                    "tableName": table_name,
 703                    "jsonArraySpec": {
 704                        "rowIds": datarepo_row_ids
 705                    }
 706                }
 707            ]
 708        }
 709        response = self.request_util.run_request(
 710            method=POST,
 711            uri=uri,
 712            data=json.dumps(payload),
 713            content_type=APPLICATION_JSON
 714        )
 715        job_id = response.json()["id"]
 716        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
 717
 718    def soft_delete_all_table_entries(
 719            self,
 720            dataset_id: str,
 721            table_name: str,
 722            query_limit: int = 1000,
 723            check_intervals: int = 15
 724    ) -> Optional[dict]:
 725        """
 726        Soft delete all records in a table.
 727
 728        **Args:**
 729        - dataset_id (str): The ID of the dataset.
 730        - table_name (str): The name of the target table.
 731        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 732        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 733
 734        **Returns:**
 735        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
 736        None if the table has no records to delete.
 737        """
 738        dataset_metrics = self.get_dataset_table_metrics(
 739            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
 740        )
 741        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
 742        return self.soft_delete_entries(
 743            dataset_id=dataset_id,
 744            table_name=table_name,
 745            datarepo_row_ids=row_ids,
 746            check_intervals=check_intervals
 747        )
 748
 749    def get_or_create_dataset(
 750            self,
 751            dataset_name: str,
 752            billing_profile: str,
 753            schema: dict,
 754            description: str,
 755            relationships: Optional[list[dict]] = None,
 756            delete_existing: bool = False,
 757            continue_if_exists: bool = False,
 758            additional_properties_dict: Optional[dict] = None
 759    ) -> str:
 760        """
 761        Get or create a dataset.
 762
 763        **Args:**
 764        - dataset_name (str): The name of the dataset.
 765        - billing_profile (str): The billing profile ID.
 766        - schema (dict): The schema of the dataset.
 767        - description (str): The description of the dataset.
 768        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
 769                Defaults to None.
 770        - additional_properties_dict (Optional[dict], optional): Additional properties
 771                for the dataset. Defaults to None.
 772        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
 773                Defaults to `False`.
 774        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
 775                Defaults to `False`.
 776
 777        **Returns:**
 778        - str: The ID of the dataset.
 779
 780        **Raises:**
 781        - ValueError: If the dataset already exists and `continue_if_exists` is `False`, or exists under a different billing profile.
 782        """
 783        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
 784        if existing_datasets:
 785            if not continue_if_exists:
 786                raise ValueError(
 787                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
 788                )
 789            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
 790            if delete_existing:
 791                logging.info(f"Deleting existing dataset {dataset_name}")
 792                self.delete_dataset(existing_datasets[0]["id"])
 793                existing_datasets = []
 794            # If not delete_existing and continue_if_exists then grab existing datasets id
 795            else:
 796                dataset_id = existing_datasets[0]["id"]
 797        if not existing_datasets:
 798            logging.info("Did not find existing dataset")
 799            # Create dataset
 800            dataset_id = self.create_dataset(
 801                schema=schema,
 802                dataset_name=dataset_name,
 803                description=description,
 804                profile_id=billing_profile,
 805                additional_dataset_properties=additional_properties_dict
 806            )
 807        return dataset_id
 808
 809    def create_dataset(  # type: ignore[return]
 810            self,
 811            schema: dict,
 812            dataset_name: str,
 813            description: str,
 814            profile_id: str,
 815            additional_dataset_properties: Optional[dict] = None
 816    ) -> Optional[str]:
 817        """
 818        Create a new dataset.
 819
 820        **Args:**
 821        - schema (dict): The schema of the dataset.
 822        - dataset_name (str): The name of the dataset.
 823        - description (str): The description of the dataset.
 824        - profile_id (str): The billing profile ID.
 825        - additional_dataset_properties (Optional[dict], optional): Additional
 826                properties for the dataset. Defaults to None.
 827
 828        **Returns:**
 829        - Optional[str]: The ID of the created dataset, or None if creation failed.
 830
 831        **Raises:**
 832        - ValueError: If the schema validation fails.
 833        """
 834        dataset_properties = {
 835            "name": dataset_name,
 836            "description": description,
 837            "defaultProfileId": profile_id,
 838            "region": "us-central1",
 839            "cloudPlatform": GCP,
 840            "schema": schema
 841        }
 842
 843        if additional_dataset_properties:
 844            dataset_properties.update(additional_dataset_properties)
 845        try:
 846            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
 847        except ValidationError as e:
 848            raise ValueError(f"Schema validation error: {e}")
 849        uri = f"{self.tdr_link}/datasets"
 850        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
 851        response = self.request_util.run_request(
 852            method=POST,
 853            uri=uri,
 854            data=json.dumps(dataset_properties),
 855            content_type=APPLICATION_JSON
 856        )
 857        job_id = response.json()["id"]
 858        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 859        dataset_id = job_results["id"]  # type: ignore[index]
 860        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
 861        return dataset_id
 862
 863    def update_dataset_schema(  # type: ignore[return]
 864            self,
 865            dataset_id: str,
 866            update_note: str,
 867            tables_to_add: Optional[list[dict]] = None,
 868            relationships_to_add: Optional[list[dict]] = None,
 869            columns_to_add: Optional[list[dict]] = None
 870    ) -> Optional[str]:
 871        """
 872        Update the schema of a dataset.
 873
 874        **Args:**
 875        - dataset_id (str): The ID of the dataset.
 876        - update_note (str): A note describing the update.
 877        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
 878        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
 879        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
 880
 881        **Returns:**
 882        - Optional[str]: The ID of the updated dataset, or None if the update failed.
 883
 884        **Raises:**
 885        - ValueError: If the schema validation fails.
 886        """
 887        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
 888        request_body: dict = {"description": f"{update_note}", "changes": {}}
 889        if tables_to_add:
 890            request_body["changes"]["addTables"] = tables_to_add
 891        if relationships_to_add:
 892            request_body["changes"]["addRelationships"] = relationships_to_add
 893        if columns_to_add:
 894            request_body["changes"]["addColumns"] = columns_to_add
 895        try:
 896            UpdateSchema(**request_body)
 897        except ValidationError as e:
 898            raise ValueError(f"Schema validation error: {e}")
 899
 900        response = self.request_util.run_request(
 901            uri=uri,
 902            method=POST,
 903            content_type=APPLICATION_JSON,
 904            data=json.dumps(request_body)
 905        )
 906        job_id = response.json()["id"]
 907        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 908        dataset_id = job_results["id"]  # type: ignore[index]
 909        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
 910        return dataset_id
 911
 912    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
 913        """
 914        Get response from a batched endpoint.
 915
 916        Helper method for all GET endpoints that require batching.
 917
 918        Given the URI and the limit (optional), will
 919        loop through batches until all metadata is retrieved.
 920
 921        **Args:**
 922        - uri (str): The base URI for the endpoint (without query params for offset or limit).
 923        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 924
 925        **Returns:**
 926        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
 927        """
 928        batch = 1
 929        offset = 0
 930        metadata: list = []
 931        while True:
 932            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
 933            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()
 934
 935            # If no more files, break the loop
 936            if not response_json:
 937                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
 938                break
 939
 940            metadata.extend(response_json)
 941            # Increment the offset by limit for the next page
 942            offset += limit
 943            batch += 1
 944        return metadata
 945
 946    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 947        """
 948        Return all the metadata about files in a given snapshot.
 949
 950        Not all files can be returned at once, so the API
 951        is used repeatedly until all "batches" have been returned.
 952
 953        **Args:**
 954        - snapshot_id (str): The ID of the snapshot.
 955        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 956
 957        **Returns:**
 958        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
 959        """
 960        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
 961        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 962
 963    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
 964        """
 965        Return snapshots belonging to specified dataset.
 966
 967        **Args:**
 968        - dataset_id: uuid of dataset to query.
 969
 970        **Returns:**
 971        - requests.Response: The response from the request.
 972        """
 973        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
 974        return self.request_util.run_request(
 975            uri=uri,
 976            method=GET
 977        )
 978
 979    def create_snapshot(
 980            self,
 981            snapshot_name: str,
 982            description: str,
 983            dataset_name: str,
 984            snapshot_mode: str,  # byFullView is entire dataset
 985            profile_id: str,
 986            stewards: Optional[list[str]] = [],
 987            readers: Optional[list[str]] = [],
 988            consent_code: Optional[str] = None,
 989            duos_id: Optional[str] = None,
 990            data_access_control_groups: Optional[list[str]] = None,
 991    ) -> None:
 992        """
 993        Create a snapshot in TDR and monitor the creation job until completion.
 994
 995        The snapshot is built from the given dataset, mode (e.g. `byFullView` covers the entire dataset),
 996        policies, and optional consent/DUOS settings. The new snapshot ID is logged; nothing is returned.
 997        """
 998        uri = f"{self.tdr_link}/snapshots"
 999        payload = {
1000            "name": snapshot_name,
1001            "description": description,
1002            "contents": [
1003                {
1004                    "datasetName": dataset_name,
1005                    "mode": snapshot_mode,
1006                }
1007            ],
1008            "policies": {
1009                "stewards": stewards,
1010                "readers": readers,
1011            },
1012            "profileId": profile_id,
1013            "globalFileIds": True,
1014        }
1015        if consent_code:
1016            payload["consentCode"] = consent_code
1017        if duos_id:
1018            payload["duosId"] = duos_id
1019        if data_access_control_groups:
1020            payload["dataAccessControlGroups"] = data_access_control_groups
1021        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
1022        response = self.request_util.run_request(
1023            uri=uri,
1024            method=POST,
1025            content_type=APPLICATION_JSON,
1026            data=json.dumps(payload)
1027        )
1028        job_id = response.json()["id"]
1029        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
1030        snapshot_id = job_results["id"]  # type: ignore[index]
1031        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")
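
The write-style operations above (create_dataset, update_dataset_schema, soft_delete_entries, file_ingest_to_dataset, delete_dataset, create_snapshot) all submit an asynchronous TDR job and block on MonitorTDRJob until it finishes. The following is a minimal end-to-end sketch, not a definitive recipe: the billing profile and schema are placeholders, and request_util is assumed to be an already-authenticated ops_utils.request_util.RunRequest whose construction is outside this module.

from ops_utils.request_util import RunRequest
from ops_utils.tdr_utils.tdr_api_utils import TDR

# Placeholder values -- substitute real ones for your project.
BILLING_PROFILE = "00000000-0000-0000-0000-000000000000"
SCHEMA = {
    "tables": [
        {
            "name": "sample",
            "columns": [
                {"name": "sample_id", "datatype": "string"},
                {"name": "cram_path", "datatype": "fileref"},
            ],
        }
    ]
}

request_util: RunRequest = ...  # build with the library's auth helpers (not covered in this module)
tdr = TDR(request_util=request_util)

# Reuse the dataset if it already exists under this billing profile, otherwise create it.
dataset_id = tdr.get_or_create_dataset(
    dataset_name="example_dataset",
    billing_profile=BILLING_PROFILE,
    schema=SCHEMA,
    description="Example dataset created via ops_utils",
    continue_if_exists=True,
)

# Snapshot the whole dataset once data has been ingested; byFullView covers the entire dataset.
tdr.create_snapshot(
    snapshot_name="example_dataset_snapshot",
    description="Full-view snapshot of example_dataset",
    dataset_name="example_dataset",
    snapshot_mode="byFullView",
    profile_id=BILLING_PROFILE,
)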

Class to interact with the Terra Data Repository (TDR) API.

TDR(request_util: ops_utils.request_util.RunRequest, env: str = 'prod')

Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).

Args:

  • request_util (ops_utils.request_util.RunRequest): Utility for making HTTP requests.
  • env (str, optional): Which TDR environment to target; must be 'prod' or 'dev'. Defaults to 'prod'.
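
As a brief illustration (reusing the request_util assumed in the sketch above), the env argument switches between the production and development TDR instances:

from ops_utils.tdr_utils.tdr_api_utils import TDR

prod_tdr = TDR(request_util=request_util)            # defaults to the production API at data.terra.bio
dev_tdr = TDR(request_util=request_util, env="dev")  # targets the Jade dev data repo
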
def get_dataset_files(self, dataset_id: str, limit: int = 20000) -> list[dict]:

Get all files in a dataset.

Returns JSON like the example below:

{
    "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
    "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
    "path": "/path/set/in/ingest.csv",
    "size": 1722,
    "checksums": [
        {
            "checksum": "82f7e79v",
            "type": "crc32c"
        },
        {
            "checksum": "fff973507e30b74fa47a3d6830b84a90",
            "type": "md5"
        }
    ],
    "created": "2024-13-11T15:01:00.256Z",
    "description": null,
    "fileType": "file",
    "fileDetail": {
        "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
        "mimeType": null,
        "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
        "loadTag": "RP_3333-RP_3333"
    },
    "directoryDetail": null
}

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
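
For example, the returned metadata can be used to summarize a dataset's files. A small sketch, reusing tdr and dataset_id from the earlier examples:

files = tdr.get_dataset_files(dataset_id=dataset_id)
total_bytes = sum(f["size"] for f in files)
crams = [f for f in files if f["path"].endswith(".cram")]
print(f"{len(files)} files, {total_bytes} bytes total, {len(crams)} CRAM files")
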
def create_file_dict(self, dataset_id: str, limit: int = 20000) -> dict:

Create a dictionary of all files in a dataset where the key is the file UUID.

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • dict: A dictionary where the key is the file UUID and the value is the file metadata.
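
A sketch of looking up one file's metadata by UUID with this dictionary (tdr and dataset_id come from the earlier examples; the UUID is a placeholder):

file_by_uuid = tdr.create_file_dict(dataset_id=dataset_id)

file_id = "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr"  # placeholder UUID
file_info = file_by_uuid.get(file_id)
if file_info:
    print(file_info["fileDetail"]["accessUrl"], file_info["size"])
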
def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(self, dataset_id: str, limit: int = 20000) -> dict:

Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.

This assumes that the TDR 'path' is the original path of the file in cloud storage with gs:// stripped out.

This will ONLY work if the dataset was created with experimentalSelfHosted = True.

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • dict: A dictionary where the key is the file's access URL (its original gs:// path) and the value is the file UUID.
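
For a self-hosted dataset the access URL is the file's original gs:// location, so this mapping can translate cloud paths already present in your metadata into the TDR file UUIDs expected by the ingest endpoint. A sketch with placeholder column names and paths:

path_to_uuid = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
    dataset_id=dataset_id
)

rows = [{"sample_id": "s1", "cram_path": "gs://my-bucket/crams/s1.cram"}]  # placeholder metadata
for row in rows:
    # Swap the original cloud path for the TDR file UUID before ingesting the row.
    row["cram_path"] = path_to_uuid[row["cram_path"]]
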
def delete_file(self, file_id: str, dataset_id: str) -> requests.models.Response:

Delete a file from a dataset.

Args:

  • file_id (str): The ID of the file to be deleted.
  • dataset_id (str): The ID of the dataset.

Returns:

  • requests.Response: The response from the request.
def delete_files( self, file_ids: list[str], dataset_id: str, batch_size_to_delete_files: int = 200, check_interval: int = 15) -> None:

Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.

Args:

  • file_ids (list[str]): A list of file IDs to be deleted.
  • dataset_id (str): The ID of the dataset.
  • batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to 200.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 15.
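
A cleanup sketch that combines this with create_file_dict and get_dataset_file_uuids_from_metadata to batch-delete files no longer referenced by any table (names reused from the earlier examples):

all_file_uuids = set(tdr.create_file_dict(dataset_id=dataset_id).keys())
referenced_uuids = set(tdr.get_dataset_file_uuids_from_metadata(dataset_id=dataset_id))

orphaned = sorted(all_file_uuids - referenced_uuids)
if orphaned:
    # Submits delete jobs in batches and waits for each batch to finish.
    tdr.delete_files(file_ids=orphaned, dataset_id=dataset_id)
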
def add_user_to_dataset( self, dataset_id: str, user: str, policy: str) -> requests.models.Response:

Add a user to a dataset with a specified policy.

Args:

  • dataset_id (str): The ID of the dataset.
  • user (str): The email of the user to be added.
  • policy (str): The policy to be assigned to the user. Must be one of steward, custodian, or snapshot_creator.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If the policy is not valid.
def remove_user_from_dataset( self, dataset_id: str, user: str, policy: str) -> requests.models.Response:

Remove a user from a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • user (str): The email of the user to be removed.
  • policy (str): The policy to be removed from the user. Must be one of steward, custodian, or snapshot_creator.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If the policy is not valid.
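
A sketch of granting and later revoking access (the email is a placeholder; an invalid policy raises ValueError before any request is made):

tdr.add_user_to_dataset(
    dataset_id=dataset_id,
    user="analyst@example.org",
    policy="snapshot_creator",
)

# Later, revoke the same policy.
tdr.remove_user_from_dataset(
    dataset_id=dataset_id,
    user="analyst@example.org",
    policy="snapshot_creator",
)
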
def delete_dataset(self, dataset_id: str) -> None:

Delete a dataset and monitor the delete job until completion.

Args:

  • dataset_id (str): The ID of the dataset to be deleted.

def make_snapshot_public(self, snapshot_id: str) -> requests.models.Response:

Make a snapshot public.

Args:

  • snapshot_id (str): The ID of the snapshot to be made public.

Returns:

  • requests.Response: The response from the request.
def get_snapshot_info( self, snapshot_id: str, continue_not_found: bool = False, info_to_include: Optional[list[str]] = None) -> Optional[requests.models.Response]:

Get information about a snapshot.

Args:

  • snapshot_id (str): The ID of the snapshot.
  • continue_not_found (bool, optional): Whether to accept a 404 (or 403) response. Defaults to False.
  • info_to_include (list[str], optional): A list of additional information to include. Defaults to None. Options are: SOURCES, TABLES, RELATIONSHIPS, ACCESS_INFORMATION, PROFILE, PROPERTIES, DATA_PROJECT, CREATION_INFORMATION, DUOS

Returns:

  • requests.Response (optional): The response from the request (returns None if the snapshot is not found or access is denied).
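
A sketch that pairs make_snapshot_public with this call to confirm a snapshot is reachable and peek at its access information (the snapshot ID is a placeholder, and the layout of accessInformation is not documented in this module, so it is only printed):

snapshot_id = "11111111-2222-3333-4444-555555555555"  # placeholder

tdr.make_snapshot_public(snapshot_id=snapshot_id)

response = tdr.get_snapshot_info(
    snapshot_id=snapshot_id,
    continue_not_found=True,
    info_to_include=["ACCESS_INFORMATION", "TABLES"],
)
if response is None:
    print("Snapshot missing or access denied")
else:
    snapshot = response.json()
    print(snapshot["name"], snapshot.get("accessInformation"))
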
def delete_snapshots( self, snapshot_ids: list[str], batch_size: int = 25, check_interval: int = 10, verbose: bool = False) -> None:

Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.

Args:

  • snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
  • batch_size (int, optional): The number of snapshots to delete per batch. Defaults to 25.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 10.
  • verbose (bool, optional): Whether to log detailed information about each job. Defaults to False.
def delete_snapshot(self, snapshot_id: str) -> requests.models.Response:

Delete a snapshot.

Args:

  • snapshot_id (str): The ID of the snapshot to be deleted.

Returns:

  • requests.Response: The response from the request.
def check_if_dataset_exists( self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
382    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
383        """
384        Check if a dataset exists by name and optionally by billing profile.
385
386        **Args:**
387        - dataset_name (str): The name of the dataset to check.
388        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
389
390        **Returns:**
391        - list[dict]: A list of matching datasets.
392        """
393        matching_datasets = []
394        for dataset in self._yield_existing_datasets(filter=dataset_name):
395            # The search uses a wildcard, so it may also return datasets where dataset_name is only a substring
396            if dataset_name == dataset["name"]:
397                if billing_profile:
398                    if dataset["defaultProfileId"] == billing_profile:
399                        logging.info(
400                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
401                        dataset_id = dataset["id"]
402                        logging.info(f"Dataset ID: {dataset_id}")
403                        matching_datasets.append(dataset)
404                    else:
405                        logging.warning(
406                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
407                            f"and not under billing profile {billing_profile}"
408                        )
409                        # Dataset names need to be unique regardless of billing profile, so raise an error if
410                        # a dataset with the same name is found but is not under the requested billing profile
411                        raise ValueError(
412                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
413                else:
414                    matching_datasets.append(dataset)
415        return matching_datasets

Check if a dataset exists by name and optionally by billing profile.

Args:

  • dataset_name (str): The name of the dataset to check.
  • billing_profile (str, optional): The billing profile ID to match. Defaults to None.

Returns:

  • list[dict]: A list of matching datasets.
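
Example (illustrative sketch): look up a dataset by exact name, optionally pinned to a billing profile; tdr and the profile ID are placeholders from the earlier sketches.

    matches = tdr.check_if_dataset_exists(
        dataset_name="my_dataset",
        billing_profile="33333333-3333-3333-3333-333333333333",  # placeholder profile ID
    )
    if matches:
        # Each match is the dataset dict returned by TDR, including its "id"
        print(f"Found existing dataset: {matches[0]['id']}")
    else:
        print("No dataset with that exact name")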
def get_dataset_info( self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.models.Response:
417    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
418        """
419        Get information about a dataset.
420
421        **Args:**
422        - dataset_id (str): The ID of the dataset.
423        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
424        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
425        Defaults to None.
426
427        **Returns:**
428        - requests.Response: The response from the request.
429
430        **Raises:**
431        - ValueError: If `info_to_include` contains invalid information types.
432        """
433        acceptable_include_info = [
434            "SCHEMA",
435            "ACCESS_INFORMATION",
436            "PROFILE",
437            "PROPERTIES",
438            "DATA_PROJECT",
439            "STORAGE",
440            "SNAPSHOT_BUILDER_SETTING"
441        ]
442        if info_to_include:
443            if not all(info in acceptable_include_info for info in info_to_include):
444                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
445            include_string = '&include='.join(info_to_include)
446        else:
447            include_string = ""
448        uri = f"{self.tdr_link}/datasets/{dataset_id}?include={include_string}"
449        return self.request_util.run_request(uri=uri, method=GET)

Get information about a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • info_to_include (list[str], optional): A list of additional information to include. Valid options include: SCHEMA, ACCESS_INFORMATION, PROFILE, PROPERTIES, DATA_PROJECT, STORAGE, SNAPSHOT_BUILDER_SETTING. Defaults to None.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If info_to_include contains invalid information types.
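
Example (illustrative sketch): pull the schema for a dataset and list its table names; the dataset ID is a placeholder and tdr is assumed to be an initialized TDR instance.

    dataset = tdr.get_dataset_info(
        dataset_id="44444444-4444-4444-4444-444444444444",  # placeholder dataset ID
        info_to_include=["SCHEMA"],
    ).json()
    # The SCHEMA include adds a "schema" block containing the dataset's tables
    table_names = [table["name"] for table in dataset["schema"]["tables"]]
    print(table_names)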
def get_table_schema_info( self, dataset_id: str, table_name: str, dataset_info: Optional[dict] = None) -> Optional[dict]:
451    def get_table_schema_info(
452            self,
453            dataset_id: str,
454            table_name: str,
455            dataset_info: Optional[dict] = None
456    ) -> Union[dict, None]:
457        """
458        Get schema information for a specific table within a dataset.
459
460        **Args:**
461        - dataset_id (str): The ID of the dataset.
462        - table_name (str): The name of the table.
463        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
464
465        **Returns:**
466        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
467        """
468        if not dataset_info:
469            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
470        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
471            if table["name"] == table_name:
472                return table
473        return None

Get schema information for a specific table within a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the table.
  • dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.

Returns:

  • Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
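
Example (illustrative sketch): fetch one table's schema and pick out its fileref columns; the dataset ID and table name are placeholders.

    sample_table = tdr.get_table_schema_info(
        dataset_id="44444444-4444-4444-4444-444444444444",
        table_name="sample",
    )
    if sample_table is None:
        print("Table not found in dataset schema")
    else:
        # Each column entry carries at least "name" and "datatype"
        fileref_columns = [
            column["name"]
            for column in sample_table["columns"]
            if column["datatype"] == "fileref"
        ]
        print(fileref_columns)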
def get_job_result( self, job_id: str, expect_failure: bool = False) -> requests.models.Response:
475    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
476        """
477        Retrieve the result of a job.
478
479        **Args:**
480        - job_id (str): The ID of the job.
481        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
482
483        **Returns:**
484        - requests.Response: The response from the request.
485        """
486        uri = f"{self.tdr_link}/jobs/{job_id}/result"
487        # If job is expected to fail, accept any return code
488        acceptable_return_code = list(range(100, 600)) if expect_failure else []
489        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)

Retrieve the result of a job.

Args:

  • job_id (str): The ID of the job.
  • expect_failure (bool, optional): Whether the job is expected to fail. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.models.Response:
491    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
492        """
493        Load data into a TDR dataset.
494
495        **Args:**
496        - dataset_id (str): The ID of the dataset.
497        - data (dict): The data to be ingested.
498
499        **Returns:**
500        - requests.Response: The response from the request.
501        """
502        uri = f"{self.tdr_link}/datasets/{dataset_id}/ingest"
503        logging.info(
504            "If the TDR SA was recently added to the source bucket/dataset/workspace and you receive a 400/403 error, " +
505            "it can sometimes take up to 12-24 hours for permissions to propagate. Try rerunning the script later.")
506        return self.request_util.run_request(
507            uri=uri,
508            method=POST,
509            content_type=APPLICATION_JSON,
510            data=data
511        )

Load data into a TDR dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • data (dict): The data to be ingested.

Returns:

  • requests.Response: The response from the request.
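
Example (hedged sketch): submit a metadata ingest. The body passed as data must follow the TDR ingest request model; the keys shown below (table, format, records, load_tag) are assumptions drawn from the TDR ingest API and should be verified against the TDR swagger before use.

    # Request-body keys here are assumptions; verify against the TDR ingest API
    ingest_request = {
        "table": "sample",
        "format": "array",
        "load_tag": "example_ingest_20240101",
        "records": [
            {"sample_id": "sampleA", "cram_path": "gs://bucket/sampleA.cram"},
        ],
    }
    response = tdr.ingest_to_dataset(
        dataset_id="44444444-4444-4444-4444-444444444444",
        data=ingest_request,
    )
    job_id = response.json()["id"]  # the ingest runs as an asynchronous TDR job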
def file_ingest_to_dataset( self, dataset_id: str, profile_id: str, file_list: list[dict], load_tag: str = 'file_ingest_load_tag') -> dict:
513    def file_ingest_to_dataset(
514            self,
515            dataset_id: str,
516            profile_id: str,
517            file_list: list[dict],
518            load_tag: str = "file_ingest_load_tag"
519    ) -> dict:
520        """
521        Load files into a TDR dataset.
522
523        **Args:**
524        - dataset_id (str): The ID of the dataset.
525        - profile_id (str): The billing profile ID.
526        - file_list (list[dict]): The list of files to be ingested.
527        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
528
529        **Returns:**
530        - dict: A dictionary containing the response from the ingest operation job monitoring.
531        """
532        uri = f"{self.tdr_link}/datasets/{dataset_id}/files/bulk/array"
533        data = {
534            "profileId": profile_id,
535            "loadTag": f"{load_tag}",
536            "maxFailedFileLoads": 0,
537            "loadArray": file_list
538        }
539
540        response = self.request_util.run_request(
541            uri=uri,
542            method=POST,
543            content_type=APPLICATION_JSON,
544            data=json.dumps(data)
545        )
546        job_id = response.json()['id']
547        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
548        return job_results  # type: ignore[return-value]

Load files into a TDR dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • profile_id (str): The billing profile ID.
  • file_list (list[dict]): The list of files to be ingested.
  • load_tag (str): The tag to be used in the ingest job. Defaults to file_ingest_load_tag.

Returns:

  • dict: A dictionary containing the response from the ingest operation job monitoring.
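
Example (hedged sketch): bulk-load files into a dataset. Each file_list entry should follow the TDR bulk file load model; the sourcePath/targetPath keys below are assumptions, and the IDs and paths are placeholders.

    # Key names follow the TDR bulk file load model (assumed); paths are placeholders
    file_list = [
        {
            "sourcePath": "gs://source-bucket/crams/sampleA.cram",
            "targetPath": "/crams/sampleA.cram",
        },
    ]
    job_results = tdr.file_ingest_to_dataset(
        dataset_id="44444444-4444-4444-4444-444444444444",
        profile_id="33333333-3333-3333-3333-333333333333",
        file_list=file_list,
        load_tag="example_file_load",
    )
    print(job_results)  # JSON result of the completed bulk load job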
def get_dataset_table_metrics( self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> list[dict]:
550    def get_dataset_table_metrics(
551            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
552    ) -> list[dict]:
553        """
554        Retrieve all metrics for a specific table within a dataset.
555
556        **Args:**
557        - dataset_id (str): The ID of the dataset.
558        - target_table_name (str): The name of the target table.
559        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
560
561        **Returns:**
562        - list[dict]: A list of dictionaries containing the metrics for the specified table.
563        """
564        return [
565            metric
566            for metric in self._yield_dataset_metrics(
567                dataset_id=dataset_id,
568                target_table_name=target_table_name,
569                query_limit=query_limit
570            )
571        ]

Retrieve all metrics for a specific table within a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • target_table_name (str): The name of the target table.
  • query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

Returns:

  • list[dict]: A list of dictionaries containing the metrics for the specified table.
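
Example (illustrative sketch): page through a table and count its rows; the dataset ID and table name are placeholders.

    rows = tdr.get_dataset_table_metrics(
        dataset_id="44444444-4444-4444-4444-444444444444",
        target_table_name="sample",
        query_limit=500,  # rows fetched per batch
    )
    # Each row is a dict keyed by column name and includes "datarepo_row_id"
    print(f"Retrieved {len(rows)} rows from the sample table")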
def get_dataset_sample_ids( self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
609    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
610        """
611        Get existing IDs from a dataset.
612
613        **Args:**
614        - dataset_id (str): The ID of the dataset.
615        - target_table_name (str): The name of the target table.
616        - entity_id (str): The entity ID to retrieve.
617
618        **Returns:**
619        - list[str]: A list of entity IDs from the specified table.
620        """
621        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
622        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]

Get existing IDs from a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • target_table_name (str): The name of the target table.
  • entity_id (str): The entity ID to retrieve.

Returns:

  • list[str]: A list of entity IDs from the specified table.
def get_job_status(self, job_id: str) -> requests.models.Response:
624    def get_job_status(self, job_id: str) -> requests.Response:
625        """
626        Retrieve the status of a job.
627
628        **Args:**
629        - job_id (str): The ID of the job.
630
631        **Returns:**
632        - requests.Response: The response from the request.
633        """
634        uri = f"{self.tdr_link}/jobs/{job_id}"
635        return self.request_util.run_request(uri=uri, method=GET)

Retrieve the status of a job.

Args:

  • job_id (str): The ID of the job.

Returns:

  • requests.Response: The response from the request.
def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
637    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
638        """
639        Get all file UUIDs from the metadata of a dataset.
640
641        **Args:**
642        - dataset_id (str): The ID of the dataset.
643
644        **Returns:**
645        - list[str]: A list of file UUIDs from the dataset metadata.
646        """
647        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
648        all_metadata_file_uuids = []
649        tables = 0
650        for table in dataset_info["schema"]["tables"]:
651            tables += 1
652            table_name = table["name"]
653            logging.info(f"Getting all file information for {table_name}")
654            # Get just columns where datatype is fileref
655            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
656            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
657            # Get unique list of file uuids
658            file_uuids = list(
659                set(
660                    [
661                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
662                    ]
663                )
664            )
665            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
666            all_metadata_file_uuids.extend(file_uuids)
667            # Make full list unique
668            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
669        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
670        return all_metadata_file_uuids

Get all file UUIDs from the metadata of a dataset.

Args:

  • dataset_id (str): The ID of the dataset.

Returns:

  • list[str]: A list of file UUIDs from the dataset metadata.
def soft_delete_entries( self, dataset_id: str, table_name: str, datarepo_row_ids: list[str], check_intervals: int = 15) -> Optional[dict]:
672    def soft_delete_entries(
673            self,
674            dataset_id: str,
675            table_name: str,
676            datarepo_row_ids: list[str],
677            check_intervals: int = 15
678    ) -> Optional[dict]:
679        """
680        Soft delete specific records from a table.
681
682        **Args:**
683        - dataset_id (str): The ID of the dataset.
684        - table_name (str): The name of the target table.
685        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
686        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
687
688        **Returns:**
689        - dict (optional): A dictionary containing the response from the soft delete operation job
690        monitoring. Returns None if no row IDs are provided.
691        """
692        if not datarepo_row_ids:
693            logging.info(f"No records found to soft delete in table {table_name}")
694            return None
695        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
696        uri = f"{self.tdr_link}/datasets/{dataset_id}/deletes"
697        payload = {
698            "deleteType": "soft",
699            "specType": "jsonArray",
700            "tables": [
701                {
702                    "tableName": table_name,
703                    "jsonArraySpec": {
704                        "rowIds": datarepo_row_ids
705                    }
706                }
707            ]
708        }
709        response = self.request_util.run_request(
710            method=POST,
711            uri=uri,
712            data=json.dumps(payload),
713            content_type=APPLICATION_JSON
714        )
715        job_id = response.json()["id"]
716        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()

Soft delete specific records from a table.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the target table.
  • datarepo_row_ids (list[str]): A list of row IDs to be deleted.
  • check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to 15.

Returns:

  • dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
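
Example (illustrative sketch): soft delete the rows whose sample_id is in a known bad list. The dataset ID, table name, column name, and sample IDs are placeholders; this pairs get_dataset_table_metrics with soft_delete_entries.

    dataset_id = "44444444-4444-4444-4444-444444444444"  # placeholder
    bad_sample_ids = {"sampleA", "sampleB"}               # placeholder IDs

    rows = tdr.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name="sample")
    rows_to_delete = [
        row["datarepo_row_id"]
        for row in rows
        if row.get("sample_id") in bad_sample_ids
    ]
    # Submits the soft delete and waits for the job to finish; returns None if
    # rows_to_delete is empty
    tdr.soft_delete_entries(
        dataset_id=dataset_id,
        table_name="sample",
        datarepo_row_ids=rows_to_delete,
    )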
def soft_delete_all_table_entries( self, dataset_id: str, table_name: str, query_limit: int = 1000, check_intervals: int = 15) -> Optional[dict]:
718    def soft_delete_all_table_entries(
719            self,
720            dataset_id: str,
721            table_name: str,
722            query_limit: int = 1000,
723            check_intervals: int = 15
724    ) -> Optional[dict]:
725        """
726        Soft deletes all records in a table.
727
728        **Args:**
729        - dataset_id (str): The ID of the dataset.
730        - table_name (str): The name of the target table.
731        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
732        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
733
734        **Returns:**
735        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
736        None if the table contains no records to delete.
737        """
738        dataset_metrics = self.get_dataset_table_metrics(
739            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
740        )
741        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
742        return self.soft_delete_entries(
743            dataset_id=dataset_id,
744            table_name=table_name,
745            datarepo_row_ids=row_ids,
746            check_intervals=check_intervals
747        )

Soft deletes all records in a table.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the target table.
  • query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
  • check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to 15.

Returns:

  • dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if the table contains no records to delete.
def get_or_create_dataset( self, dataset_name: str, billing_profile: str, schema: dict, description: str, relationships: Optional[list[dict]] = None, delete_existing: bool = False, continue_if_exists: bool = False, additional_properties_dict: Optional[dict] = None) -> str:
749    def get_or_create_dataset(
750            self,
751            dataset_name: str,
752            billing_profile: str,
753            schema: dict,
754            description: str,
755            relationships: Optional[list[dict]] = None,
756            delete_existing: bool = False,
757            continue_if_exists: bool = False,
758            additional_properties_dict: Optional[dict] = None
759    ) -> str:
760        """
761        Get or create a dataset.
762
763        **Args:**
764        - dataset_name (str): The name of the dataset.
765        - billing_profile (str): The billing profile ID.
766        - schema (dict): The schema of the dataset.
767        - description (str): The description of the dataset.
768        - relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema.
769                Defaults to None.
770        - additional_properties_dict (Optional[dict], optional): Additional properties
771                for the dataset. Defaults to None.
772        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
773                Defaults to `False`.
774        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
775                Defaults to `False`.
776
777        **Returns:**
778        - str: The ID of the dataset.
779
780        **Raises:**
781        - ValueError: If the dataset exists but `continue_if_exists` is `False`, or it exists under a different billing profile.
782        """
783        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
784        if existing_datasets:
785            if not continue_if_exists:
786                raise ValueError(
787                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
788                )
789            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
790            if delete_existing:
791                logging.info(f"Deleting existing dataset {dataset_name}")
792                self.delete_dataset(existing_datasets[0]["id"])
793                existing_datasets = []
794            # If not delete_existing and continue_if_exists, grab the existing dataset's ID
795            else:
796                dataset_id = existing_datasets[0]["id"]
797        if not existing_datasets:
798            logging.info("Did not find existing dataset")
799            # Create dataset
800            dataset_id = self.create_dataset(
801                schema=schema,
802                dataset_name=dataset_name,
803                description=description,
804                profile_id=billing_profile,
805                additional_dataset_properties=additional_properties_dict
806            )
807        return dataset_id

Get or create a dataset.

Args:

  • dataset_name (str): The name of the dataset.
  • billing_profile (str): The billing profile ID.
  • schema (dict): The schema of the dataset.
  • description (str): The description of the dataset.
  • relationships (Optional[list[dict]], optional): A list of relationships to add to the dataset schema. Defaults to None.
  • additional_properties_dict (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
  • delete_existing (bool, optional): Whether to delete the existing dataset if found. Defaults to False.
  • continue_if_exists (bool, optional): Whether to continue if the dataset already exists. Defaults to False.

Returns:

  • str: The ID of the dataset.

Raises:

  • ValueError: If the dataset exists but continue_if_exists is False, or it exists under a different billing profile.
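
Example (hedged sketch): get or create a small dataset. The schema dict must follow the TDR dataset schema model; the single table and columns below are illustrative, and the billing profile ID is a placeholder.

    # Minimal one-table schema; the structure follows the TDR dataset schema model
    schema = {
        "tables": [
            {
                "name": "sample",
                "columns": [
                    {"name": "sample_id", "datatype": "string"},
                    {"name": "cram_path", "datatype": "fileref"},
                ],
            }
        ]
    }
    dataset_id = tdr.get_or_create_dataset(
        dataset_name="my_dataset",
        billing_profile="33333333-3333-3333-3333-333333333333",
        schema=schema,
        description="Example dataset for CRAM metadata",
        continue_if_exists=True,  # reuse the dataset if it already exists
    )
    print(dataset_id)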
def create_dataset( self, schema: dict, dataset_name: str, description: str, profile_id: str, additional_dataset_properties: Optional[dict] = None) -> Optional[str]:
809    def create_dataset(  # type: ignore[return]
810            self,
811            schema: dict,
812            dataset_name: str,
813            description: str,
814            profile_id: str,
815            additional_dataset_properties: Optional[dict] = None
816    ) -> Optional[str]:
817        """
818        Create a new dataset.
819
820        **Args:**
821        - schema (dict): The schema of the dataset.
822        - dataset_name (str): The name of the dataset.
823        - description (str): The description of the dataset.
824        - profile_id (str): The billing profile ID.
825        - additional_dataset_properties (Optional[dict], optional): Additional
826                properties for the dataset. Defaults to None.
827
828        **Returns:**
829        - Optional[str]: The ID of the created dataset, or None if creation failed.
830
831        **Raises:**
832        - ValueError: If the schema validation fails.
833        """
834        dataset_properties = {
835            "name": dataset_name,
836            "description": description,
837            "defaultProfileId": profile_id,
838            "region": "us-central1",
839            "cloudPlatform": GCP,
840            "schema": schema
841        }
842
843        if additional_dataset_properties:
844            dataset_properties.update(additional_dataset_properties)
845        try:
846            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
847        except ValidationError as e:
848            raise ValueError(f"Schema validation error: {e}")
849        uri = f"{self.tdr_link}/datasets"
850        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
851        response = self.request_util.run_request(
852            method=POST,
853            uri=uri,
854            data=json.dumps(dataset_properties),
855            content_type=APPLICATION_JSON
856        )
857        job_id = response.json()["id"]
858        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
859        dataset_id = job_results["id"]  # type: ignore[index]
860        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
861        return dataset_id

Create a new dataset.

Args:

  • schema (dict): The schema of the dataset.
  • dataset_name (str): The name of the dataset.
  • description (str): The description of the dataset.
  • profile_id (str): The billing profile ID.
  • additional_dataset_properties (Optional[dict], optional): Additional properties for the dataset. Defaults to None.

Returns:

  • Optional[str]: The ID of the created dataset, or None if creation failed.

Raises:

  • ValueError: If the schema validation fails.
def update_dataset_schema( self, dataset_id: str, update_note: str, tables_to_add: Optional[list[dict]] = None, relationships_to_add: Optional[list[dict]] = None, columns_to_add: Optional[list[dict]] = None) -> Optional[str]:
863    def update_dataset_schema(  # type: ignore[return]
864            self,
865            dataset_id: str,
866            update_note: str,
867            tables_to_add: Optional[list[dict]] = None,
868            relationships_to_add: Optional[list[dict]] = None,
869            columns_to_add: Optional[list[dict]] = None
870    ) -> Optional[str]:
871        """
872        Update the schema of a dataset.
873
874        **Args:**
875        - dataset_id (str): The ID of the dataset.
876        - update_note (str): A note describing the update.
877        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
878        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
879        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
880
881        **Returns:**
882        - Optional[str]: The ID of the updated dataset, or None if the update failed.
883
884        **Raises:**
885        - ValueError: If the schema validation fails.
886        """
887        uri = f"{self.tdr_link}/datasets/{dataset_id}/updateSchema"
888        request_body: dict = {"description": f"{update_note}", "changes": {}}
889        if tables_to_add:
890            request_body["changes"]["addTables"] = tables_to_add
891        if relationships_to_add:
892            request_body["changes"]["addRelationships"] = relationships_to_add
893        if columns_to_add:
894            request_body["changes"]["addColumns"] = columns_to_add
895        try:
896            UpdateSchema(**request_body)
897        except ValidationError as e:
898            raise ValueError(f"Schema validation error: {e}")
899
900        response = self.request_util.run_request(
901            uri=uri,
902            method=POST,
903            content_type=APPLICATION_JSON,
904            data=json.dumps(request_body)
905        )
906        job_id = response.json()["id"]
907        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
908        dataset_id = job_results["id"]  # type: ignore[index]
909        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
910        return dataset_id

Update the schema of a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • update_note (str): A note describing the update.
  • tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
  • relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
  • columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.

Returns:

  • Optional[str]: The ID of the updated dataset, or None if the update failed.

Raises:

  • ValueError: If the schema validation fails.
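
Example (hedged sketch): add a column to an existing table. The change payload is validated against UpdateSchema before submission; the tableName/columns key names below are assumptions based on the TDR updateSchema API and should be checked before use.

    # Key names in the change objects are assumptions; the request is validated
    # against UpdateSchema before being sent
    new_columns = [
        {
            "tableName": "sample",
            "columns": [{"name": "collection_date", "datatype": "string"}],
        }
    ]
    tdr.update_dataset_schema(
        dataset_id="44444444-4444-4444-4444-444444444444",
        update_note="Add collection_date to the sample table",
        columns_to_add=new_columns,
    )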
def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
946    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
947        """
948        Return all the metadata about files in a given snapshot.
949
950        Not all files can be returned at once, so the API
951        is used repeatedly until all "batches" have been returned.
952
953        **Args:**
954        - snapshot_id (str): The ID of the snapshot.
955        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
956
957        **Returns:**
958        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
959        """
960        uri = f"{self.tdr_link}/snapshots/{snapshot_id}/files"
961        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

Return all the metadata about files in a given snapshot.

Not all files can be returned at once, so the API is used repeatedly until all "batches" have been returned.

Args:

  • snapshot_id (str): The ID of the snapshot.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

Returns:

  • list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
def get_dataset_snapshots(self, dataset_id: str) -> requests.models.Response:
963    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
964        """
965        Return snapshots belonging to specified dataset.
966
967        **Args:**
968        - dataset_id: uuid of dataset to query.
969
970        **Returns:**
971        - requests.Response: The response from the request.
972        """
973        uri = f"{self.tdr_link}/snapshots?datasetIds={dataset_id}"
974        return self.request_util.run_request(
975            uri=uri,
976            method=GET
977        )

Return snapshots belonging to specified dataset.

Args:

  • dataset_id: uuid of dataset to query.

Returns:

  • requests.Response: The response from the request.
def create_snapshot( self, snapshot_name: str, description: str, dataset_name: str, snapshot_mode: str, profile_id: str, stewards: Optional[list[str]] = [], readers: Optional[list[str]] = [], consent_code: Optional[str] = None, duos_id: Optional[str] = None, data_access_control_groups: Optional[list[str]] = None) -> None:
 979    def create_snapshot(
 980            self,
 981            snapshot_name: str,
 982            description: str,
 983            dataset_name: str,
 984            snapshot_mode: str,  # byFullView is entire dataset
 985            profile_id: str,
 986            stewards: Optional[list[str]] = [],
 987            readers: Optional[list[str]] = [],
 988            consent_code: Optional[str] = None,
 989            duos_id: Optional[str] = None,
 990            data_access_control_groups: Optional[list[str]] = None,
 991    ) -> None:
 992        """
 993        Create a snapshot in TDR.
 994
 995        **Returns:**
 996        - None: The snapshot creation job is submitted and monitored until completion; the new snapshot ID is logged.
 997        """
 998        uri = f"{self.tdr_link}/snapshots"
 999        payload = {
1000            "name": snapshot_name,
1001            "description": description,
1002            "contents": [
1003                {
1004                    "datasetName": dataset_name,
1005                    "mode": snapshot_mode,
1006                }
1007            ],
1008            "policies": {
1009                "stewards": stewards,
1010                "readers": readers,
1011            },
1012            "profileId": profile_id,
1013            "globalFileIds": True,
1014        }
1015        if consent_code:
1016            payload["consentCode"] = consent_code
1017        if duos_id:
1018            payload["duosId"] = duos_id
1019        if data_access_control_groups:
1020            payload["dataAccessControlGroups"] = data_access_control_groups
1021        logging.info(f"Creating snapshot {snapshot_name} in dataset {dataset_name}")
1022        response = self.request_util.run_request(
1023            uri=uri,
1024            method=POST,
1025            content_type=APPLICATION_JSON,
1026            data=json.dumps(payload)
1027        )
1028        job_id = response.json()["id"]
1029        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
1030        snapshot_id = job_results["id"]  # type: ignore[index]
1031        logging.info(f"Successfully created snapshot {snapshot_name} - {snapshot_id}")

Create a snapshot in TDR.

Returns:

  • None: The snapshot creation job is submitted and monitored until completion; the new snapshot ID is logged.
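
Example (illustrative sketch): create a full-view snapshot of a dataset. The snapshot_mode value byFullView snapshots the entire dataset (per the inline comment in the source); names, profile ID, and account emails are placeholders.

    tdr.create_snapshot(
        snapshot_name="my_dataset_snapshot_20240101",
        description="Full-view snapshot of my_dataset",
        dataset_name="my_dataset",
        snapshot_mode="byFullView",  # snapshot the entire dataset
        profile_id="33333333-3333-3333-3333-333333333333",
        stewards=["steward@example.org"],
        readers=["reader@example.org"],
    )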
class FilterOutSampleIdsAlreadyInDataset:
1034class FilterOutSampleIdsAlreadyInDataset:
1035    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
1036
1037    def __init__(
1038            self,
1039            ingest_metrics: list[dict],
1040            dataset_id: str,
1041            tdr: TDR,
1042            target_table_name: str,
1043            filter_entity_id: str
1044    ):
1045        """
1046        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1047
1048        **Args:**
1049        - ingest_metrics (list[dict]): The metrics to be ingested.
1050        - dataset_id (str): The ID of the dataset.
1051        - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
1052        - target_table_name (str): The name of the target table.
1053        - filter_entity_id (str): The entity ID to filter on.
1054        """
1055        self.ingest_metrics = ingest_metrics
1056        """@private"""
1057        self.tdr = tdr
1058        """@private"""
1059        self.dataset_id = dataset_id
1060        """@private"""
1061        self.target_table_name = target_table_name
1062        """@private"""
1063        self.filter_entity_id = filter_entity_id
1064        """@private"""
1065
1066    def run(self) -> list[dict]:
1067        """
1068        Run the filter process to remove sample IDs that already exist in the dataset.
1069
1070        **Returns:**
1071        - list[dict]: The filtered ingest metrics.
1072        """
1073        # Get all sample ids that already exist in dataset
1074        logging.info(
1075            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1076            f"dataset {self.dataset_id}"
1077        )
1078
1079        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1080            dataset_id=self.dataset_id,
1081            target_table_name=self.target_table_name,
1082            entity_id=self.filter_entity_id
1083        )
1084        # Filter out rows that already exist in dataset
1085        filtered_ingest_metrics = [
1086            row
1087            for row in self.ingest_metrics
1088            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1089        ]
1090        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1091            logging.info(
1092                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1093                f"dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1094            )
1095
1096            if filtered_ingest_metrics:
1097                return filtered_ingest_metrics
1098            else:
1099                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
1100                return []
1101        else:
1102            logging.info("No rows were filtered out because none of them already exist in the dataset")
1103            return filtered_ingest_metrics

Class to filter ingest metrics to remove sample IDs that already exist in the dataset.

FilterOutSampleIdsAlreadyInDataset( ingest_metrics: list[dict], dataset_id: str, tdr: TDR, target_table_name: str, filter_entity_id: str)
1037    def __init__(
1038            self,
1039            ingest_metrics: list[dict],
1040            dataset_id: str,
1041            tdr: TDR,
1042            target_table_name: str,
1043            filter_entity_id: str
1044    ):
1045        """
1046        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1047
1048        **Args:**
1049        - ingest_metrics (list[dict]): The metrics to be ingested.
1050        - dataset_id (str): The ID of the dataset.
1051        - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
1052        - target_table_name (str): The name of the target table.
1053        - filter_entity_id (str): The entity ID to filter on.
1054        """
1055        self.ingest_metrics = ingest_metrics
1056        """@private"""
1057        self.tdr = tdr
1058        """@private"""
1059        self.dataset_id = dataset_id
1060        """@private"""
1061        self.target_table_name = target_table_name
1062        """@private"""
1063        self.filter_entity_id = filter_entity_id
1064        """@private"""

Initialize the FilterOutSampleIdsAlreadyInDataset class.

Args:

  • ingest_metrics (list[dict]): The metrics to be ingested.
  • dataset_id (str): The ID of the dataset.
  • tdr (ops_utils.tdr_utils.tdr_utils.TDR): The TDR instance
  • target_table_name (str): The name of the target table.
  • filter_entity_id (str): The entity ID to filter on.
def run(self) -> list[dict]:
1066    def run(self) -> list[dict]:
1067        """
1068        Run the filter process to remove sample IDs that already exist in the dataset.
1069
1070        **Returns:**
1071        - list[dict]: The filtered ingest metrics.
1072        """
1073        # Get all sample ids that already exist in dataset
1074        logging.info(
1075            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1076            f"dataset {self.dataset_id}"
1077        )
1078
1079        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1080            dataset_id=self.dataset_id,
1081            target_table_name=self.target_table_name,
1082            entity_id=self.filter_entity_id
1083        )
1084        # Filter out rows that already exist in dataset
1085        filtered_ingest_metrics = [
1086            row
1087            for row in self.ingest_metrics
1088            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1089        ]
1090        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1091            logging.info(
1092                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1093                f"dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1094            )
1095
1096            if filtered_ingest_metrics:
1097                return filtered_ingest_metrics
1098            else:
1099                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
1100                return []
1101        else:
1102            logging.info("No rows were filtered out because none of them already exist in the dataset")
1103            return filtered_ingest_metrics

Run the filter process to remove sample IDs that already exist in the dataset.

Returns:

  • list[dict]: The filtered ingest metrics.
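
Example (illustrative sketch): drop rows from an ingest payload when their sample_id is already present in the dataset table. The metrics and IDs are placeholders, and tdr is assumed to be an initialized TDR instance.

    from ops_utils.tdr_utils.tdr_api_utils import FilterOutSampleIdsAlreadyInDataset

    ingest_metrics = [
        {"sample_id": "sampleA", "cram_path": "gs://bucket/sampleA.cram"},
        {"sample_id": "sampleB", "cram_path": "gs://bucket/sampleB.cram"},
    ]
    rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=ingest_metrics,
        dataset_id="44444444-4444-4444-4444-444444444444",
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id",
    ).run()
    # rows_to_ingest contains only the rows whose sample_id is not yet in the table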