ops_utils.tdr_utils.tdr_api_utils

Utility classes for interacting with TDR API.
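
The quick-start below is an editor's sketch, not code taken from the library: it assumes an authenticated `RunRequest` is already available (see `ops_utils.request_util`), and the `build_request_util()` helper is purely hypothetical, standing in for however that object is constructed in your environment. Everything else uses only methods defined in the source listing that follows.

    from ops_utils.request_util import RunRequest
    from ops_utils.tdr_utils.tdr_api_utils import TDR

    # Hypothetical helper: stands in for however you construct an authenticated
    # RunRequest (see ops_utils.request_util); not part of this module.
    request_util: RunRequest = build_request_util()

    tdr = TDR(request_util=request_util)

    # List all file metadata in a dataset; batching is handled internally.
    files = tdr.get_dataset_files(dataset_id="11111111-1111-1111-1111-111111111111")  # placeholder UUID

    # Look up a dataset by name, optionally restricted to a billing profile.
    matches = tdr.check_if_dataset_exists(dataset_name="example_dataset")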

   1"""Utility classes for interacting with TDR API."""
   2
   3import json
   4import logging
   5import requests
   6import re
   7from typing import Any, Optional, Union
   8from urllib.parse import unquote
   9from pydantic import ValidationError
  10
  11from ..request_util import GET, POST, DELETE, RunRequest
  12from ..tdr_api_schema.create_dataset_schema import CreateDatasetSchema
  13from ..tdr_api_schema.update_dataset_schema import UpdateSchema
  14from .tdr_job_utils import MonitorTDRJob, SubmitAndMonitorMultipleJobs
  15from ..vars import ARG_DEFAULTS, GCP
  16
  17
  18class TDR:
  19    """Class to interact with the Terra Data Repository (TDR) API."""
  20
  21    TDR_LINK = "https://data.terra.bio/api/repository/v1"
  22    """(str): The base URL for the TDR API."""
  23
  24    def __init__(self, request_util: RunRequest):
  25        """
  26        Initialize the TDR class for interacting with the Terra Data Repository (TDR) API.
  27
  28        **Args:**
  29        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
  30        """
  31        self.request_util = request_util
  32        """@private"""
  33
  34    @staticmethod
  35    def _check_policy(policy: str) -> None:
  36        """
  37        Check if the policy is valid.
  38
  39        **Args:**
  40        - policy (str): The role to check.
  41
  42        **Raises:**
  43        - ValueError: If the policy is not one of the allowed options.
  44        """
  45        if policy not in ["steward", "custodian", "snapshot_creator"]:
  46            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")
  47
  48    def get_dataset_files(
  49            self,
  50            dataset_id: str,
  51            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
  52    ) -> list[dict]:
  53        """
  54        Get all files in a dataset.
  55
  56        Returns JSON like the example below:
  57
  58            {
  59                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
  60                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
  61                "path": "/path/set/in/ingest.csv",
  62                "size": 1722,
  63                "checksums": [
  64                    {
  65                        "checksum": "82f7e79v",
  66                        "type": "crc32c"
  67                    },
  68                    {
  69                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
  70                        "type": "md5"
  71                    }
  72                ],
  73                "created": "2024-11-13T15:01:00.256Z",
  74                "description": null,
  75                "fileType": "file",
  76                "fileDetail": {
  77                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
  78                    "mimeType": null,
  79                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
  80                    "loadTag": "RP_3333-RP_3333"
  81                },
  82                "directoryDetail": null
  83            }
  84
  85        **Args:**
  86        - dataset_id (str): The ID of the dataset.
  87        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
  88
  89        **Returns:**
  90        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
  91        """
  92        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files"
  93        logging.info(f"Getting all files in dataset {dataset_id}")
  94        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
  95
  96    def create_file_dict(
  97            self,
  98            dataset_id: str,
  99            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 100    ) -> dict:
 101        """
 102        Create a dictionary of all files in a dataset where the key is the file UUID.
 103
 104        **Args:**
 105        - dataset_id (str): The ID of the dataset.
 106        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 107
 108        **Returns:**
 109        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
 110        """
 111        return {
 112            file_dict["fileId"]: file_dict
 113            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 114        }
 115
 116    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
 117            self,
 118            dataset_id: str,
 119            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 120    ) -> dict:
 121        """
 122        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
 123
 124        This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.
 125
 126        This will ONLY work if the dataset was created with `experimentalSelfHosted = True`.
 127
 128        **Args:**
 129        - dataset_id (str): The ID of the dataset.
 130        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 131
 132        **Returns:**
 133        - dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
 134        """
 135        return {
 136            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
 137            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
 138        }
 139
 140    def get_sas_token(self, snapshot_id: str = "", dataset_id: str = "") -> dict:
 141        """
 142        Get the SAS token for a snapshot OR dataset. Only one should be provided.
 143
 144        **Args:**
 145        - snapshot_id (str, optional): The ID of the snapshot. Defaults to "".
 146        - dataset_id (str, optional): The ID of the dataset. Defaults to "".
 147
 148        **Returns:**
 149        - dict: A dictionary containing the SAS token and its expiry time. Expiry
 150        time is a string like `2025-02-13T19:31:47Z`.
 151
 152        **Raises:**
 153        - ValueError: If neither `snapshot_id` nor `dataset_id` is provided.
 154        """
 155        if snapshot_id:
 156            uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION"
 157            response = self.request_util.run_request(uri=uri, method=GET)
 158            snapshot_info = json.loads(response.text)
 159            sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
 160        elif dataset_id:
 161            uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include=ACCESS_INFORMATION"
 162            response = self.request_util.run_request(uri=uri, method=GET)
 163            snapshot_info = json.loads(response.text)
 164            sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
 165        else:
 166            raise ValueError("Must provide either snapshot_id or dataset_id")
 167
 168        sas_expiry_time_pattern = re.compile(r"se.+?(?=\&sp)")
 169        expiry_time_str = sas_expiry_time_pattern.search(sas_token)
 170        time_str = unquote(expiry_time_str.group()).replace("se=", "")  # type: ignore[union-attr]
 171
 172        return {"sas_token": sas_token, "expiry_time": time_str}
 173
 174    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
 175        """
 176        Delete a file from a dataset.
 177
 178        **Args:**
 179        - file_id (str): The ID of the file to be deleted.
 180        - dataset_id (str): The ID of the dataset.
 181
 182        **Returns:**
 183        - requests.Response: The response from the request.
 184        """
 185        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/{file_id}"
 186        logging.info(f"Submitting delete job for file {file_id}")
 187        return self.request_util.run_request(uri=uri, method=DELETE)
 188
 189    def delete_files(
 190            self,
 191            file_ids: list[str],
 192            dataset_id: str,
 193            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
 194            check_interval: int = 15) -> None:
 195        """
 196        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
 197
 198        **Args:**
 199        - file_ids (list[str]): A list of file IDs to be deleted.
 200        - dataset_id (str): The ID of the dataset.
 201        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
 202        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 203        """
 204        SubmitAndMonitorMultipleJobs(
 205            tdr=self,
 206            job_function=self.delete_file,
 207            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
 208            batch_size=batch_size_to_delete_files,
 209            check_interval=check_interval
 210        ).run()
 211
 212    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 213        """
 214        Add a user to a dataset with a specified policy.
 215
 216        **Args:**
 217        - dataset_id (str): The ID of the dataset.
 218        - user (str): The email of the user to be added.
 219        - policy (str): The policy to be assigned to the user.
 220                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 221
 222        **Returns:**
 223        - requests.Response: The response from the request.
 224
 225        **Raises:**
 226        - ValueError: If the policy is not valid.
 227        """
 228        self._check_policy(policy)
 229        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members"
 230        member_dict = {"email": user}
 231        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
 232        return self.request_util.run_request(
 233            uri=uri,
 234            method=POST,
 235            data=json.dumps(member_dict),
 236            content_type="application/json"
 237        )
 238
 239    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
 240        """
 241        Remove a user from a dataset.
 242
 243        **Args:**
 244        - dataset_id (str): The ID of the dataset.
 245        - user (str): The email of the user to be removed.
 246        - policy (str): The policy to be removed from the user.
 247                Must be one of `steward`, `custodian`, or `snapshot_creator`.
 248
 249        **Returns:**
 250        - requests.Response: The response from the request.
 251
 252        **Raises:**
 253        - ValueError: If the policy is not valid.
 254        """
 255        self._check_policy(policy)
 256        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members/{user}"
 257        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
 258        return self.request_util.run_request(uri=uri, method=DELETE)
 259
 260    def delete_dataset(self, dataset_id: str) -> None:
 261        """
 262        Delete a dataset and monitor the job until completion.
 263
 264        **Args:**
 265        - dataset_id (str): The ID of the dataset to be deleted.
 266        """
 267        uri = f"{self.TDR_LINK}/datasets/{dataset_id}"
 268        logging.info(f"Deleting dataset {dataset_id}")
 269        response = self.request_util.run_request(uri=uri, method=DELETE)
 270        job_id = response.json()['id']
 271        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
 272
 273    def get_snapshot_info(
 274            self,
 275            snapshot_id: str,
 276            continue_not_found: bool = False,
 277            info_to_include: Optional[list[str]] = None
 278    ) -> Optional[requests.Response]:
 279        """
 280        Get information about a snapshot.
 281
 282        **Args:**
 283        - snapshot_id (str): The ID of the snapshot.
 284        - continue_not_found (bool, optional): Whether to accept a `404` response. Defaults to `False`.
 285        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
 286                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
 287                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
 288
 289        **Returns:**
 290        - requests.Response (optional): The response from the request (returns None if the snapshot is not
 291         found or access is denied).
 292        """
 293        acceptable_return_code = [404, 403] if continue_not_found else []
 294        acceptable_include_info = [
 295            "SOURCES",
 296            "TABLES",
 297            "RELATIONSHIPS",
 298            "ACCESS_INFORMATION",
 299            "PROFILE",
 300            "PROPERTIES",
 301            "DATA_PROJECT",
 302            "CREATION_INFORMATION",
 303            "DUOS"
 304        ]
 305        if info_to_include:
 306            if not all(info in acceptable_include_info for info in info_to_include):
 307                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 308            include_string = '&include='.join(info_to_include)
 309        else:
 310            include_string = ""
 311        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include={include_string}"
 312        response = self.request_util.run_request(
 313            uri=uri,
 314            method=GET,
 315            accept_return_codes=acceptable_return_code
 316        )
 317        if response.status_code == 404:
 318            logging.warning(f"Snapshot {snapshot_id} not found")
 319            return None
 320        if response.status_code == 403:
 321            logging.warning(f"Access denied for snapshot {snapshot_id}")
 322            return None
 323        return response
 324
 325    def delete_snapshots(
 326            self,
 327            snapshot_ids: list[str],
 328            batch_size: int = 25,
 329            check_interval: int = 10,
 330            verbose: bool = False) -> None:
 331        """
 332        Delete multiple snapshots from a dataset in batches and monitor delete jobs until completion for each batch.
 333
 334        **Args:**
 335        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
 336        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
 337        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
 338        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 339        """
 340        SubmitAndMonitorMultipleJobs(
 341            tdr=self,
 342            job_function=self.delete_snapshot,
 343            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
 344            batch_size=batch_size,
 345            check_interval=check_interval,
 346            verbose=verbose
 347        ).run()
 348
 349    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
 350        """
 351        Delete a snapshot.
 352
 353        **Args:**
 354        - snapshot_id (str): The ID of the snapshot to be deleted.
 355
 356        **Returns:**
 357        - requests.Response: The response from the request.
 358        """
 359        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}"
 360        logging.info(f"Deleting snapshot {snapshot_id}")
 361        return self.request_util.run_request(uri=uri, method=DELETE)
 362
 363    def _yield_existing_datasets(
 364            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
 365    ) -> Any:
 366        """
 367        Get all datasets in TDR, optionally filtered by dataset name.
 368
 369        **Args:**
 370            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
 371            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
 372            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".
 373
 374        Yields:
 375            Any: A generator yielding datasets.
 376        """
 377        offset = 0
 378        if filter:
 379            filter_str = f"&filter={filter}"
 380            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
 381        else:
 382            filter_str = ""
 383            log_message = f"Searching for all datasets in batches of {batch_size}"
 384        logging.info(log_message)
 385        while True:
 386            uri = f"{self.TDR_LINK}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
 387            response = self.request_util.run_request(uri=uri, method=GET)
 388            datasets = response.json()["items"]
 389            if not datasets:
 390                break
 391            for dataset in datasets:
 392                yield dataset
 393            offset += batch_size
 394
 395
 396    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
 397        """
 398        Check if a dataset exists by name and optionally by billing profile.
 399
 400        **Args:**
 401        - dataset_name (str): The name of the dataset to check.
 402        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
 403
 404        **Returns:**
 405        - list[dict]: A list of matching datasets.
 406        """
 407        matching_datasets = []
 408        for dataset in self._yield_existing_datasets(filter=dataset_name):
 409            # Search uses wildcard so could grab more datasets where dataset_name is substring
 410            if dataset_name == dataset["name"]:
 411                if billing_profile:
 412                    if dataset["defaultProfileId"] == billing_profile:
 413                        logging.info(
 414                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
 415                        dataset_id = dataset["id"]
 416                        logging.info(f"Dataset ID: {dataset_id}")
 417                        matching_datasets.append(dataset)
 418                    else:
 419                        logging.warning(
 420                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
 421                            f"and not under billing profile {billing_profile}"
 422                        )
 423                        # Datasets names need to be unique regardless of billing profile, so raise an error if
 424                        # a dataset with the same name is found but is not under the requested billing profile
 425                        raise ValueError(
 426                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
 427                else:
 428                    matching_datasets.append(dataset)
 429        return matching_datasets
 430
 431    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
 432        """
 433        Get information about a dataset.
 434
 435        **Args:**
 436        - dataset_id (str): The ID of the dataset.
 437        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
 438        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
 439        Defaults to None.
 440
 441        **Returns:**
 442        - requests.Response: The response from the request.
 443
 444        **Raises:**
 445        - ValueError: If `info_to_include` contains invalid information types.
 446        """
 447        acceptable_include_info = [
 448            "SCHEMA",
 449            "ACCESS_INFORMATION",
 450            "PROFILE",
 451            "PROPERTIES",
 452            "DATA_PROJECT",
 453            "STORAGE",
 454            "SNAPSHOT_BUILDER_SETTING"
 455        ]
 456        if info_to_include:
 457            if not all(info in acceptable_include_info for info in info_to_include):
 458                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
 459            include_string = '&include='.join(info_to_include)
 460        else:
 461            include_string = ""
 462        uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include={include_string}"
 463        return self.request_util.run_request(uri=uri, method=GET)
 464
 465    def get_table_schema_info(
 466            self,
 467            dataset_id: str,
 468            table_name: str,
 469            dataset_info: Optional[dict] = None
 470    ) -> Union[dict, None]:
 471        """
 472        Get schema information for a specific table within a dataset.
 473
 474        **Args:**
 475        - dataset_id (str): The ID of the dataset.
 476        - table_name (str): The name of the table.
 477        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
 478
 479        **Returns:**
 480        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
 481        """
 482        if not dataset_info:
 483            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 484        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
 485            if table["name"] == table_name:
 486                return table
 487        return None
 488
 489    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
 490        """
 491        Retrieve the result of a job.
 492
 493        **Args:**
 494        - job_id (str): The ID of the job.
 495        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
 496
 497        **Returns:**
 498        - requests.Response: The response from the request.
 499        """
 500        uri = f"{self.TDR_LINK}/jobs/{job_id}/result"
 501        # If job is expected to fail, accept any return code
 502        acceptable_return_code = list(range(100, 600)) if expect_failure else []
 503        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
 504
 505    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
 506        """
 507        Load data into a TDR dataset.
 508
 509        **Args:**
 510        - dataset_id (str): The ID of the dataset.
 511        - data (dict): The data to be ingested.
 512
 513        **Returns:**
 514        - requests.Response: The response from the request.
 515        """
 516        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/ingest"
 517        logging.info(
 518            "If recently added TDR SA to source bucket/dataset/workspace and you receive a 400/403 error, " +
 519            "it can sometimes take up to 12/24 hours for permissions to propagate. Try rerunning the script later.")
 520        return self.request_util.run_request(
 521            uri=uri,
 522            method=POST,
 523            content_type="application/json",
 524            data=data
 525        )
 526
 527    def file_ingest_to_dataset(
 528            self,
 529            dataset_id: str,
 530            profile_id: str,
 531            file_list: list[dict],
 532            load_tag: str = "file_ingest_load_tag"
 533    ) -> dict:
 534        """
 535        Load files into a TDR dataset.
 536
 537        **Args:**
 538        - dataset_id (str): The ID of the dataset.
 539        - profile_id (str): The billing profile ID.
 540        - file_list (list[dict]): The list of files to be ingested.
 541        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
 542
 543        **Returns:**
 544        - dict: A dictionary containing the response from the ingest operation job monitoring.
 545        """
 546        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/bulk/array"
 547        data = {
 548            "profileId": profile_id,
 549            "loadTag": f"{load_tag}",
 550            "maxFailedFileLoads": 0,
 551            "loadArray": file_list
 552        }
 553
 554        response = self.request_util.run_request(
 555            uri=uri,
 556            method=POST,
 557            content_type="application/json",
 558            data=json.dumps(data)
 559        )
 560        job_id = response.json()['id']
 561        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 562        return job_results  # type: ignore[return-value]
 563
 564    def get_dataset_table_metrics(
 565            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
 566    ) -> list[dict]:
 567        """
 568        Retrieve all metrics for a specific table within a dataset.
 569
 570        **Args:**
 571        - dataset_id (str): The ID of the dataset.
 572        - target_table_name (str): The name of the target table.
 573        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 574
 575        **Returns:**
 576        - list[dict]: A list of dictionaries containing the metrics for the specified table.
 577        """
 578        return [
 579            metric
 580            for metric in self._yield_dataset_metrics(
 581                dataset_id=dataset_id,
 582                target_table_name=target_table_name,
 583                query_limit=query_limit
 584            )
 585        ]
 586
 587    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
 588        """
 589        Yield all entity metrics from a dataset.
 590
 591        **Args:**
 592            dataset_id (str): The ID of the dataset.
 593            target_table_name (str): The name of the target table.
 594            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
 595
 596        Yields:
 597            Any: A generator yielding dictionaries containing the metrics for the specified table.
 598        """
 599        search_request = {
 600            "offset": 0,
 601            "limit": query_limit,
 602            "sort": "datarepo_row_id"
 603        }
 604        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/data/{target_table_name}"
 605        while True:
 606            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
 607            response = self.request_util.run_request(
 608                uri=uri,
 609                method=POST,
 610                content_type="application/json",
 611                data=json.dumps(search_request)
 612            )
 613            if not response or not response.json()["result"]:
 614                break
 615            logging.info(
 616                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
 617                f"dataset {dataset_id}"
 618            )
 619            for record in response.json()["result"]:
 620                yield record
 621            search_request["offset"] += query_limit  # type: ignore[operator]
 622
 623    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
 624        """
 625        Get existing IDs from a dataset.
 626
 627        **Args:**
 628        - dataset_id (str): The ID of the dataset.
 629        - target_table_name (str): The name of the target table.
 630        - entity_id (str): The entity ID to retrieve.
 631
 632        **Returns:**
 633        - list[str]: A list of entity IDs from the specified table.
 634        """
 635        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
 636        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
 637
 638    def get_job_status(self, job_id: str) -> requests.Response:
 639        """
 640        Retrieve the status of a job.
 641
 642        **Args:**
 643        - job_id (str): The ID of the job.
 644
 645        **Returns:**
 646        - requests.Response: The response from the request.
 647        """
 648        uri = f"{self.TDR_LINK}/jobs/{job_id}"
 649        return self.request_util.run_request(uri=uri, method=GET)
 650
 651    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
 652        """
 653        Get all file UUIDs from the metadata of a dataset.
 654
 655        **Args:**
 656        - dataset_id (str): The ID of the dataset.
 657
 658        **Returns:**
 659        - list[str]: A list of file UUIDs from the dataset metadata.
 660        """
 661        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
 662        all_metadata_file_uuids = []
 663        tables = 0
 664        for table in dataset_info["schema"]["tables"]:
 665            tables += 1
 666            table_name = table["name"]
 667            logging.info(f"Getting all file information for {table_name}")
 668            # Get just columns where datatype is fileref
 669            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
 670            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
 671            # Get unique list of file uuids
 672            file_uuids = list(
 673                set(
 674                    [
 675                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
 676                    ]
 677                )
 678            )
 679            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
 680            all_metadata_file_uuids.extend(file_uuids)
 681            # Make full list unique
 682            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
 683        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
 684        return all_metadata_file_uuids
 685
 686    def soft_delete_entries(
 687            self,
 688            dataset_id: str,
 689            table_name: str,
 690            datarepo_row_ids: list[str],
 691            check_intervals: int = 15
 692    ) -> Optional[dict]:
 693        """
 694        Soft delete specific records from a table.
 695
 696        **Args:**
 697        - dataset_id (str): The ID of the dataset.
 698        - table_name (str): The name of the target table.
 699        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
 700        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 701
 702        **Returns:**
 703        - dict (optional): A dictionary containing the response from the soft delete operation job
 704        monitoring. Returns None if no row IDs are provided.
 705        """
 706        if not datarepo_row_ids:
 707            logging.info(f"No records found to soft delete in table {table_name}")
 708            return None
 709        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
 710        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/deletes"
 711        payload = {
 712            "deleteType": "soft",
 713            "specType": "jsonArray",
 714            "tables": [
 715                {
 716                    "tableName": table_name,
 717                    "jsonArraySpec": {
 718                        "rowIds": datarepo_row_ids
 719                    }
 720                }
 721            ]
 722        }
 723        response = self.request_util.run_request(
 724            method=POST,
 725            uri=uri,
 726            data=json.dumps(payload),
 727            content_type="application/json"
 728        )
 729        job_id = response.json()["id"]
 730        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
 731
 732    def soft_delete_all_table_entries(
 733            self,
 734            dataset_id: str,
 735            table_name: str,
 736            query_limit: int = 1000,
 737            check_intervals: int = 15
 738    ) -> Optional[dict]:
 739        """
 740        Soft delete all records in a table.
 741
 742        **Args:**
 743        - dataset_id (str): The ID of the dataset.
 744        - table_name (str): The name of the target table.
 745        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 746        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
 747
 748        **Returns:**
 749        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
 750        None if no row IDs are provided.
 751        """
 752        dataset_metrics = self.get_dataset_table_metrics(
 753            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
 754        )
 755        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
 756        return self.soft_delete_entries(
 757            dataset_id=dataset_id,
 758            table_name=table_name,
 759            datarepo_row_ids=row_ids,
 760            check_intervals=check_intervals
 761        )
 762
 763    def get_or_create_dataset(
 764            self,
 765            dataset_name: str,
 766            billing_profile: str,
 767            schema: dict,
 768            description: str,
 769            delete_existing: bool = False,
 770            continue_if_exists: bool = False,
 771            additional_properties_dict: Optional[dict] = None
 772    ) -> str:
 773        """
 774        Get or create a dataset.
 775
 776        **Args:**
 777        - dataset_name (str): The name of the dataset.
 778        - billing_profile (str): The billing profile ID.
 779        - schema (dict): The schema of the dataset.
 780        - description (str): The description of the dataset.
 781        - additional_properties_dict (Optional[dict], optional): Additional properties
 782                for the dataset. Defaults to None.
 783        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
 784                Defaults to `False`.
 785        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
 786                Defaults to `False`.
 787
 788        **Returns:**
 789        - str: The ID of the dataset.
 790
 791        **Raises:**
 792        - ValueError: If multiple datasets with the same name are found under the billing profile.
 793        """
 794        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
 795        if existing_datasets:
 796            if not continue_if_exists:
 797                raise ValueError(
 798                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
 799                )
 800            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
 801            if delete_existing:
 802                logging.info(f"Deleting existing dataset {dataset_name}")
 803                self.delete_dataset(existing_datasets[0]["id"])
 804                existing_datasets = []
 805            # If not delete_existing and continue_if_exists then grab existing datasets id
 806            else:
 807                dataset_id = existing_datasets[0]["id"]
 808        if not existing_datasets:
 809            logging.info("Did not find existing dataset")
 810            # Create dataset
 811            dataset_id = self.create_dataset(
 812                schema=schema,
 813                dataset_name=dataset_name,
 814                description=description,
 815                profile_id=billing_profile,
 816                additional_dataset_properties=additional_properties_dict
 817            )
 818        return dataset_id
 819
 820    def create_dataset(  # type: ignore[return]
 821            self,
 822            schema: dict,
 823            dataset_name: str,
 824            description: str,
 825            profile_id: str,
 826            additional_dataset_properties: Optional[dict] = None
 827    ) -> Optional[str]:
 828        """
 829        Create a new dataset.
 830
 831        **Args:**
 832        - schema (dict): The schema of the dataset.
 833        - dataset_name (str): The name of the dataset.
 834        - description (str): The description of the dataset.
 835        - profile_id (str): The billing profile ID.
 836        - additional_dataset_properties (Optional[dict], optional): Additional
 837                properties for the dataset. Defaults to None.
 838
 839        **Returns:**
 840        - Optional[str]: The ID of the created dataset, or None if creation failed.
 841
 842        **Raises:**
 843        - ValueError: If the schema validation fails.
 844        """
 845        dataset_properties = {
 846            "name": dataset_name,
 847            "description": description,
 848            "defaultProfileId": profile_id,
 849            "region": "us-central1",
 850            "cloudPlatform": GCP,
 851            "schema": schema
 852        }
 853
 854        if additional_dataset_properties:
 855            dataset_properties.update(additional_dataset_properties)
 856        try:
 857            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
 858        except ValidationError as e:
 859            raise ValueError(f"Schema validation error: {e}")
 860        uri = f"{self.TDR_LINK}/datasets"
 861        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
 862        response = self.request_util.run_request(
 863            method=POST,
 864            uri=uri,
 865            data=json.dumps(dataset_properties),
 866            content_type="application/json"
 867        )
 868        job_id = response.json()["id"]
 869        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 870        dataset_id = job_results["id"]  # type: ignore[index]
 871        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
 872        return dataset_id
 873
 874    def update_dataset_schema(  # type: ignore[return]
 875            self,
 876            dataset_id: str,
 877            update_note: str,
 878            tables_to_add: Optional[list[dict]] = None,
 879            relationships_to_add: Optional[list[dict]] = None,
 880            columns_to_add: Optional[list[dict]] = None
 881    ) -> Optional[str]:
 882        """
 883        Update the schema of a dataset.
 884
 885        **Args:**
 886        - dataset_id (str): The ID of the dataset.
 887        - update_note (str): A note describing the update.
 888        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
 889        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
 890        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
 891
 892        **Returns:**
 893        - Optional[str]: The ID of the updated dataset, or None if the update failed.
 894
 895        **Raises:**
 896        - ValueError: If the schema validation fails.
 897        """
 898        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema"
 899        request_body: dict = {"description": f"{update_note}", "changes": {}}
 900        if tables_to_add:
 901            request_body["changes"]["addTables"] = tables_to_add
 902        if relationships_to_add:
 903            request_body["changes"]["addRelationships"] = relationships_to_add
 904        if columns_to_add:
 905            request_body["changes"]["addColumns"] = columns_to_add
 906        try:
 907            UpdateSchema(**request_body)
 908        except ValidationError as e:
 909            raise ValueError(f"Schema validation error: {e}")
 910
 911        response = self.request_util.run_request(
 912            uri=uri,
 913            method=POST,
 914            content_type="application/json",
 915            data=json.dumps(request_body)
 916        )
 917        job_id = response.json()["id"]
 918        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
 919        dataset_id = job_results["id"]  # type: ignore[index]
 920        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
 921        return dataset_id
 922
 923    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
 924        """
 925        Get response from a batched endpoint.
 926
 927        Helper method for all GET endpoints that require batching.
 928
 929        Given the URI and the limit (optional), will
 930        loop through batches until all metadata is retrieved.
 931
 932        **Args:**
 933        - uri (str): The base URI for the endpoint (without query params for offset or limit).
 934        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 935
 936        **Returns:**
 937        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
 938        """
 939        batch = 1
 940        offset = 0
 941        metadata: list = []
 942        while True:
 943            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
 944            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()
 945
 946            # If no more files, break the loop
 947            if not response_json:
 948                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
 949                break
 950
 951            metadata.extend(response_json)
 952            # Increment the offset by limit for the next page
 953            offset += limit
 954            batch += 1
 955        return metadata
 956
 957    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
 958        """
 959        Return all the metadata about files in a given snapshot.
 960
 961        Not all files can be returned at once, so the API
 962        is used repeatedly until all "batches" have been returned.
 963
 964        **Args:**
 965        - snapshot_id (str): The ID of the snapshot.
 966        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
 967
 968        **Returns:**
 969        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
 970        """
 971        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files"
 972        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 973
 974    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
 975        """
 976        Return snapshots belonging to specified dataset.
 977
 978        **Args:**
 979        - dataset_id (str): The UUID of the dataset to query.
 980
 981        **Returns:**
 982        - requests.Response: The response from the request.
 983        """
 984        uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}"
 985        return self.request_util.run_request(
 986            uri=uri,
 987            method=GET
 988        )
 989
 990
 991class FilterOutSampleIdsAlreadyInDataset:
 992    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
 993
 994    def __init__(
 995            self,
 996            ingest_metrics: list[dict],
 997            dataset_id: str,
 998            tdr: TDR,
 999            target_table_name: str,
1000            filter_entity_id: str
1001    ):
1002        """
1003        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1004
1005        **Args:**
1006        - ingest_metrics (list[dict]): The metrics to be ingested.
1007        - dataset_id (str): The ID of the dataset.
1008        - tdr (`ops_utils.tdr_utils.tdr_utils.TDR`): The TDR instance
1009        - target_table_name (str): The name of the target table.
1010        - filter_entity_id (str): The entity ID to filter on.
1011        """
1012        self.ingest_metrics = ingest_metrics
1013        """@private"""
1014        self.tdr = tdr
1015        """@private"""
1016        self.dataset_id = dataset_id
1017        """@private"""
1018        self.target_table_name = target_table_name
1019        """@private"""
1020        self.filter_entity_id = filter_entity_id
1021        """@private"""
1022
1023    def run(self) -> list[dict]:
1024        """
1025        Run the filter process to remove sample IDs that already exist in the dataset.
1026
1027        **Returns:**
1028        - list[dict]: The filtered ingest metrics.
1029        """
1030        # Get all sample ids that already exist in dataset
1031        logging.info(
1032            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1033            f"dataset {self.dataset_id}"
1034        )
1035
1036        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1037            dataset_id=self.dataset_id,
1038            target_table_name=self.target_table_name,
1039            entity_id=self.filter_entity_id
1040        )
1041        # Filter out rows that already exist in dataset
1042        filtered_ingest_metrics = [
1043            row
1044            for row in self.ingest_metrics
1045            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1046        ]
1047        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1048            logging.info(
1049                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1050                f"dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1051            )
1052
1053            if filtered_ingest_metrics:
1054                return filtered_ingest_metrics
1055            else:
1056                logging.info("All rows filtered out as they all exist in dataset, nothing to ingest")
1057                return []
1058        else:
1059            logging.info("No rows were filtered out because none of them already exist in the dataset")
1060            return filtered_ingest_metrics
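
To show where `FilterOutSampleIdsAlreadyInDataset` sits in an ingest flow, here is a hedged sketch. It assumes `tdr` is an existing `TDR` instance and that `new_rows` is already shaped like rows of the target table; the payload handed to `ingest_to_dataset` is illustrative only, since the exact ingest request schema is defined by the TDR API rather than by this module.

    # `tdr` is an existing TDR instance; `new_rows` are candidate rows for a "sample" table.
    new_rows = [
        {"sample_id": "SM-001", "cram_path": "gs://example-bucket/SM-001.cram"},  # placeholder row
    ]

    rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=new_rows,
        dataset_id="11111111-1111-1111-1111-111111111111",  # placeholder dataset UUID
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id",
    ).run()

    if rows_to_ingest:
        # Illustrative payload only -- consult the TDR ingest API for the exact schema it accepts.
        ingest_request = {
            "table": "sample",
            "format": "array",
            "records": rows_to_ingest,
            "load_tag": "example_load_tag",
        }
        response = tdr.ingest_to_dataset(
            dataset_id="11111111-1111-1111-1111-111111111111",
            data=ingest_request,
        )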
class TDR:
 19class TDR:
 20    """Class to interact with the Terra Data Repository (TDR) API."""
 21
 22    TDR_LINK = "https://data.terra.bio/api/repository/v1"
 23    """(str): The base URL for the TDR API."""
 24
 25    def __init__(self, request_util: RunRequest):
 26        """
 27        Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).
 28
 29        **Args:**
 30        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
 31        """
 32        self.request_util = request_util
 33        """@private"""
 34
 35    @staticmethod
 36    def _check_policy(policy: str) -> None:
 37        """
 38        Check if the policy is valid.
 39
 40        **Args:**
 41        - policy (str): The role to check.
 42
 43        **Raises:**
 44        - ValueError: If the policy is not one of the allowed options.
 45        """
 46        if policy not in ["steward", "custodian", "snapshot_creator"]:
 47            raise ValueError(f"Policy {policy} is not valid. Must be steward, custodian, or snapshot_creator")
 48
 49    def get_dataset_files(
 50            self,
 51            dataset_id: str,
 52            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
 53    ) -> list[dict]:
 54        """
 55        Get all files in a dataset.
 56
 57        Returns json like below
 58
 59            {
 60                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
 61                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
 62                "path": "/path/set/in/ingest.csv",
 63                "size": 1722,
 64                "checksums": [
 65                    {
 66                        "checksum": "82f7e79v",
 67                        "type": "crc32c"
 68                    },
 69                    {
 70                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
 71                        "type": "md5"
 72                    }
 73                ],
 74                "created": "2024-13-11T15:01:00.256Z",
 75                "description": null,
 76                "fileType": "file",
 77                "fileDetail": {
 78                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
 79                    "mimeType": null,
 80                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
 81                    "loadTag": "RP_3333-RP_3333"
 82                },
 83                "directoryDetail": null
 84            }
 85
 86        **Args:**
 87        - dataset_id (str): The ID of the dataset.
 88        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
 89
 90        **Returns:**
 91        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
 92        """
 93        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files"
 94        logging.info(f"Getting all files in dataset {dataset_id}")
 95        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
 96
 97    def create_file_dict(
 98            self,
 99            dataset_id: str,
100            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
101    ) -> dict:
102        """
103        Create a dictionary of all files in a dataset where the key is the file UUID.
104
105        **Args:**
106        - dataset_id (str): The ID of the dataset.
107        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
108
109        **Returns:**
110        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
111        """
112        return {
113            file_dict["fileId"]: file_dict
114            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
115        }
116
117    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
118            self,
119            dataset_id: str,
120            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
121    ) -> dict:
122        """
123        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
124
125        This assumes that the TDR 'path' is original path of the file in the cloud storage with `gs://` stripped out.
126
127        This will ONLY work if dataset was created with `experimentalSelfHosted = True`
128
129        **Args:**
130        - dataset_id (str): The ID of the dataset.
131        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
132
133        **Returns:**
134        - dict: A dictionary where the key is the file UUID and the value is the file path.
135        """
136        return {
137            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
138            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
139        }
140
141    def get_sas_token(self, snapshot_id: str = "", dataset_id: str = "") -> dict:
142        """
143        Get the SAS token for a snapshot OR dataset. Only one should be provided.
144
145        **Args:**
146        - snapshot_id (str, optional): The ID of the snapshot. Defaults to "".
147        - dataset_id (str, optional): The ID of the dataset. Defaults to "".
148
149        **Returns:**
150        - dict: A dictionary containing the SAS token and its expiry time. Expiry
151        time is a string like `2025-02-13T19:31:47Z`.
152
153        **Raises:**
154        - ValueError: If neither `snapshot_id` nor `dataset_id` is provided.
155        """
156        if snapshot_id:
157            uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION"
158            response = self.request_util.run_request(uri=uri, method=GET)
159            snapshot_info = json.loads(response.text)
160            sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
161        elif dataset_id:
162            uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include=ACCESS_INFORMATION"
163            response = self.request_util.run_request(uri=uri, method=GET)
164            snapshot_info = json.loads(response.text)
165            sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
166        else:
167            raise ValueError("Must provide either snapshot_id or dataset_id")
168
169        sas_expiry_time_pattern = re.compile(r"se.+?(?=\&sp)")
170        expiry_time_str = sas_expiry_time_pattern.search(sas_token)
171        time_str = unquote(expiry_time_str.group()).replace("se=", "")  # type: ignore[union-attr]
172
173        return {"sas_token": sas_token, "expiry_time": time_str}
174
175    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
176        """
177        Delete a file from a dataset.
178
179        **Args:**
180        - file_id (str): The ID of the file to be deleted.
181        - dataset_id (str): The ID of the dataset.
182
183        **Returns:**
184        - requests.Response: The response from the request.
185        """
186        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/{file_id}"
187        logging.info(f"Submitting delete job for file {file_id}")
188        return self.request_util.run_request(uri=uri, method=DELETE)
189
190    def delete_files(
191            self,
192            file_ids: list[str],
193            dataset_id: str,
194            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
195            check_interval: int = 15) -> None:
196        """
197        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
198
199        **Args:**
200        - file_ids (list[str]): A list of file IDs to be deleted.
201        - dataset_id (str): The ID of the dataset.
202        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
203        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
204        """
205        SubmitAndMonitorMultipleJobs(
206            tdr=self,
207            job_function=self.delete_file,
208            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
209            batch_size=batch_size_to_delete_files,
210            check_interval=check_interval
211        ).run()
212
213    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
214        """
215        Add a user to a dataset with a specified policy.
216
217        **Args:**
218        - dataset_id (str): The ID of the dataset.
219        - user (str): The email of the user to be added.
220        - policy (str): The policy to be assigned to the user.
221                Must be one of `steward`, `custodian`, or `snapshot_creator`.
222
223        **Returns:**
224        - requests.Response: The response from the request.
225
226        **Raises:**
227        - ValueError: If the policy is not valid.
228        """
229        self._check_policy(policy)
230        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members"
231        member_dict = {"email": user}
232        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
233        return self.request_util.run_request(
234            uri=uri,
235            method=POST,
236            data=json.dumps(member_dict),
237            content_type="application/json"
238        )
239
240    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
241        """
242        Remove a user from a dataset.
243
244        **Args:**
245        - dataset_id (str): The ID of the dataset.
246        - user (str): The email of the user to be removed.
247        - policy (str): The policy to be removed from the user.
248                Must be one of `steward`, `custodian`, or `snapshot_creator`.
249
250        **Returns:**
251        - requests.Response: The response from the request.
252
253        **Raises:**
254        - ValueError: If the policy is not valid.
255        """
256        self._check_policy(policy)
257        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members/{user}"
258        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
259        return self.request_util.run_request(uri=uri, method=DELETE)
260
261    def delete_dataset(self, dataset_id: str) -> None:
262        """
263        Delete a dataset and monitor the job until completion.
264
265        **Args:**
266        - dataset_id (str): The ID of the dataset to be deleted.
267        """
268        uri = f"{self.TDR_LINK}/datasets/{dataset_id}"
269        logging.info(f"Deleting dataset {dataset_id}")
270        response = self.request_util.run_request(uri=uri, method=DELETE)
271        job_id = response.json()['id']
272        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()
273
274    def get_snapshot_info(
275            self,
276            snapshot_id: str,
277            continue_not_found: bool = False,
278            info_to_include: Optional[list[str]] = None
279    ) -> Optional[requests.Response]:
280        """
281        Get information about a snapshot.
282
283        **Args:**
284        - snapshot_id (str): The ID of the snapshot.
285        - continue_not_found (bool, optional): Whether to accept a `404` or `403` response. Defaults to `False`.
286        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
287                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
288                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
289
290        **Returns:**
291        - requests.Response (optional): The response from the request (returns None if the snapshot is not
292         found or access is denied).
293        """
294        acceptable_return_code = [404, 403] if continue_not_found else []
295        acceptable_include_info = [
296            "SOURCES",
297            "TABLES",
298            "RELATIONSHIPS",
299            "ACCESS_INFORMATION",
300            "PROFILE",
301            "PROPERTIES",
302            "DATA_PROJECT",
303            "CREATION_INFORMATION",
304            "DUOS"
305        ]
306        if info_to_include:
307            if not all(info in acceptable_include_info for info in info_to_include):
308                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
309            include_string = '&include='.join(info_to_include)
310        else:
311            include_string = ""
312        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include={include_string}"
313        response = self.request_util.run_request(
314            uri=uri,
315            method=GET,
316            accept_return_codes=acceptable_return_code
317        )
318        if response.status_code == 404:
319            logging.warning(f"Snapshot {snapshot_id} not found")
320            return None
321        if response.status_code == 403:
322            logging.warning(f"Access denied for snapshot {snapshot_id}")
323            return None
324        return response
325
326    def delete_snapshots(
327            self,
328            snapshot_ids: list[str],
329            batch_size: int = 25,
330            check_interval: int = 10,
331            verbose: bool = False) -> None:
332        """
333        Delete multiple snapshots in batches and monitor delete jobs until completion for each batch.
334
335        **Args:**
336        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
337        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
338        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
339        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
340        """
341        SubmitAndMonitorMultipleJobs(
342            tdr=self,
343            job_function=self.delete_snapshot,
344            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
345            batch_size=batch_size,
346            check_interval=check_interval,
347            verbose=verbose
348        ).run()
349
350    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
351        """
352        Delete a snapshot.
353
354        **Args:**
355        - snapshot_id (str): The ID of the snapshot to be deleted.
356
357        **Returns:**
358        - requests.Response: The response from the request.
359        """
360        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}"
361        logging.info(f"Deleting snapshot {snapshot_id}")
362        return self.request_util.run_request(uri=uri, method=DELETE)
363
364    def _yield_existing_datasets(
365            self, filter: Optional[str] = None, batch_size: int = 100, direction: str = "asc"
366    ) -> Any:
367        """
368        Get all datasets in TDR, optionally filtered by dataset name.
369
370        **Args:**
371            filter (Optional[str]): A filter string to match dataset names. Defaults to None.
372            batch_size (int): The number of datasets to retrieve per batch. Defaults to 100.
373            direction (str): The direction to sort the datasets by creation date. Defaults to "asc".
374
375        Yields:
376            Any: A generator yielding datasets.
377        """
378        offset = 0
379        if filter:
380            filter_str = f"&filter={filter}"
381            log_message = f"Searching for datasets with filter {filter} in batches of {batch_size}"
382        else:
383            filter_str = ""
384            log_message = f"Searching for all datasets in batches of {batch_size}"
385        logging.info(log_message)
386        while True:
387            uri = f"{self.TDR_LINK}/datasets?offset={offset}&limit={batch_size}&sort=created_date&direction={direction}{filter_str}"  # noqa: E501
388            response = self.request_util.run_request(uri=uri, method=GET)
389            datasets = response.json()["items"]
390            if not datasets:
391                break
392            for dataset in datasets:
393                yield dataset
394            offset += batch_size
395            # continue to the next page; the loop exits when an empty page is returned
396
397    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
398        """
399        Check if a dataset exists by name and optionally by billing profile.
400
401        **Args:**
402        - dataset_name (str): The name of the dataset to check.
403        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
404
405        **Returns:**
406        - list[dict]: A list of matching datasets.
407        """
408        matching_datasets = []
409        for dataset in self._yield_existing_datasets(filter=dataset_name):
410            # Search uses wildcard so could grab more datasets where dataset_name is substring
411            if dataset_name == dataset["name"]:
412                if billing_profile:
413                    if dataset["defaultProfileId"] == billing_profile:
414                        logging.info(
415                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
416                        dataset_id = dataset["id"]
417                        logging.info(f"Dataset ID: {dataset_id}")
418                        matching_datasets.append(dataset)
419                    else:
420                        logging.warning(
421                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
422                            f"and not under billing profile {billing_profile}"
423                        )
424                        # Datasets names need to be unique regardless of billing profile, so raise an error if
425                        # a dataset with the same name is found but is not under the requested billing profile
426                        raise ValueError(
427                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
428                else:
429                    matching_datasets.append(dataset)
430        return matching_datasets
431
432    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
433        """
434        Get information about a dataset.
435
436        **Args:**
437        - dataset_id (str): The ID of the dataset.
438        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
439        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
440        Defaults to None.
441
442        **Returns:**
443        - requests.Response: The response from the request.
444
445        **Raises:**
446        - ValueError: If `info_to_include` contains invalid information types.
447        """
448        acceptable_include_info = [
449            "SCHEMA",
450            "ACCESS_INFORMATION",
451            "PROFILE",
452            "PROPERTIES",
453            "DATA_PROJECT",
454            "STORAGE",
455            "SNAPSHOT_BUILDER_SETTING"
456        ]
457        if info_to_include:
458            if not all(info in acceptable_include_info for info in info_to_include):
459                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
460            include_string = '&include='.join(info_to_include)
461        else:
462            include_string = ""
463        uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include={include_string}"
464        return self.request_util.run_request(uri=uri, method=GET)
465
466    def get_table_schema_info(
467            self,
468            dataset_id: str,
469            table_name: str,
470            dataset_info: Optional[dict] = None
471    ) -> Union[dict, None]:
472        """
473        Get schema information for a specific table within a dataset.
474
475        **Args:**
476        - dataset_id (str): The ID of the dataset.
477        - table_name (str): The name of the table.
478        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
479
480        **Returns:**
481        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
482        """
483        if not dataset_info:
484            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
485        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
486            if table["name"] == table_name:
487                return table
488        return None
489
490    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
491        """
492        Retrieve the result of a job.
493
494        **Args:**
495        - job_id (str): The ID of the job.
496        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
497
498        **Returns:**
499        - requests.Response: The response from the request.
500        """
501        uri = f"{self.TDR_LINK}/jobs/{job_id}/result"
502        # If job is expected to fail, accept any return code
503        acceptable_return_code = list(range(100, 600)) if expect_failure else []
504        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)
505
506    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
507        """
508        Load data into a TDR dataset.
509
510        **Args:**
511        - dataset_id (str): The ID of the dataset.
512        - data (dict): The data to be ingested.
513
514        **Returns:**
515        - requests.Response: The response from the request.
516        """
517        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/ingest"
518        logging.info(
519            "If the TDR SA was recently added to the source bucket/dataset/workspace and you receive a 400/403 error, " +
520            "it can sometimes take 12-24 hours for permissions to propagate. Try rerunning the script later.")
521        return self.request_util.run_request(
522            uri=uri,
523            method=POST,
524            content_type="application/json",
525            data=data
526        )
527
528    def file_ingest_to_dataset(
529            self,
530            dataset_id: str,
531            profile_id: str,
532            file_list: list[dict],
533            load_tag: str = "file_ingest_load_tag"
534    ) -> dict:
535        """
536        Load files into a TDR dataset.
537
538        **Args:**
539        - dataset_id (str): The ID of the dataset.
540        - profile_id (str): The billing profile ID.
541        - file_list (list[dict]): The list of files to be ingested.
542        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
543
544        **Returns:**
545        - dict: A dictionary containing the response from the ingest operation job monitoring.
546        """
547        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/bulk/array"
548        data = {
549            "profileId": profile_id,
550            "loadTag": f"{load_tag}",
551            "maxFailedFileLoads": 0,
552            "loadArray": file_list
553        }
554
555        response = self.request_util.run_request(
556            uri=uri,
557            method=POST,
558            content_type="application/json",
559            data=json.dumps(data)
560        )
561        job_id = response.json()['id']
562        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
563        return job_results  # type: ignore[return-value]
564
565    def get_dataset_table_metrics(
566            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
567    ) -> list[dict]:
568        """
569        Retrieve all metrics for a specific table within a dataset.
570
571        **Args:**
572        - dataset_id (str): The ID of the dataset.
573        - target_table_name (str): The name of the target table.
574        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
575
576        **Returns:**
577        - list[dict]: A list of dictionaries containing the metrics for the specified table.
578        """
579        return [
580            metric
581            for metric in self._yield_dataset_metrics(
582                dataset_id=dataset_id,
583                target_table_name=target_table_name,
584                query_limit=query_limit
585            )
586        ]
587
588    def _yield_dataset_metrics(self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> Any:
589        """
590        Yield all entity metrics from a dataset.
591
592        **Args:**
593            dataset_id (str): The ID of the dataset.
594            target_table_name (str): The name of the target table.
595            query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
596
597        Yields:
598            Any: A generator yielding dictionaries containing the metrics for the specified table.
599        """
600        search_request = {
601            "offset": 0,
602            "limit": query_limit,
603            "sort": "datarepo_row_id"
604        }
605        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/data/{target_table_name}"
606        while True:
607            batch_number = int((search_request["offset"] / query_limit)) + 1  # type: ignore[operator]
608            response = self.request_util.run_request(
609                uri=uri,
610                method=POST,
611                content_type="application/json",
612                data=json.dumps(search_request)
613            )
614            if not response or not response.json()["result"]:
615                break
616            logging.info(
617                f"Downloading batch {batch_number} of max {query_limit} records from {target_table_name} table " +
618                f"dataset {dataset_id}"
619            )
620            for record in response.json()["result"]:
621                yield record
622            search_request["offset"] += query_limit  # type: ignore[operator]
623
624    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
625        """
626        Get existing IDs from a dataset.
627
628        **Args:**
629        - dataset_id (str): The ID of the dataset.
630        - target_table_name (str): The name of the target table.
631        - entity_id (str): The entity ID to retrieve.
632
633        **Returns:**
634        - list[str]: A list of entity IDs from the specified table.
635        """
636        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
637        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]
638
639    def get_job_status(self, job_id: str) -> requests.Response:
640        """
641        Retrieve the status of a job.
642
643        **Args:**
644        - job_id (str): The ID of the job.
645
646        **Returns:**
647        - requests.Response: The response from the request.
648        """
649        uri = f"{self.TDR_LINK}/jobs/{job_id}"
650        return self.request_util.run_request(uri=uri, method=GET)
651
652    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
653        """
654        Get all file UUIDs from the metadata of a dataset.
655
656        **Args:**
657        - dataset_id (str): The ID of the dataset.
658
659        **Returns:**
660        - list[str]: A list of file UUIDs from the dataset metadata.
661        """
662        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
663        all_metadata_file_uuids = []
664        tables = 0
665        for table in dataset_info["schema"]["tables"]:
666            tables += 1
667            table_name = table["name"]
668            logging.info(f"Getting all file information for {table_name}")
669            # Get just columns where datatype is fileref
670            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
671            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
672            # Get unique list of file uuids
673            file_uuids = list(
674                set(
675                    [
676                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
677                    ]
678                )
679            )
680            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
681            all_metadata_file_uuids.extend(file_uuids)
682            # Make full list unique
683            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
684        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
685        return all_metadata_file_uuids
686
687    def soft_delete_entries(
688            self,
689            dataset_id: str,
690            table_name: str,
691            datarepo_row_ids: list[str],
692            check_intervals: int = 15
693    ) -> Optional[dict]:
694        """
695        Soft delete specific records from a table.
696
697        **Args:**
698        - dataset_id (str): The ID of the dataset.
699        - table_name (str): The name of the target table.
700        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
701        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
702
703        **Returns:**
704        - dict (optional): A dictionary containing the response from the soft delete operation job
705        monitoring. Returns None if no row IDs are provided.
706        """
707        if not datarepo_row_ids:
708            logging.info(f"No records found to soft delete in table {table_name}")
709            return None
710        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
711        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/deletes"
712        payload = {
713            "deleteType": "soft",
714            "specType": "jsonArray",
715            "tables": [
716                {
717                    "tableName": table_name,
718                    "jsonArraySpec": {
719                        "rowIds": datarepo_row_ids
720                    }
721                }
722            ]
723        }
724        response = self.request_util.run_request(
725            method=POST,
726            uri=uri,
727            data=json.dumps(payload),
728            content_type="application/json"
729        )
730        job_id = response.json()["id"]
731        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()
732
733    def soft_delete_all_table_entries(
734            self,
735            dataset_id: str,
736            table_name: str,
737            query_limit: int = 1000,
738            check_intervals: int = 15
739    ) -> Optional[dict]:
740        """
741        Soft delete all records in a table.
742
743        **Args:**
744        - dataset_id (str): The ID of the dataset.
745        - table_name (str): The name of the target table.
746        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
747        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
748
749        **Returns:**
750        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
751        None if no row IDs are provided.
752        """
753        dataset_metrics = self.get_dataset_table_metrics(
754            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
755        )
756        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
757        return self.soft_delete_entries(
758            dataset_id=dataset_id,
759            table_name=table_name,
760            datarepo_row_ids=row_ids,
761            check_intervals=check_intervals
762        )
763
764    def get_or_create_dataset(
765            self,
766            dataset_name: str,
767            billing_profile: str,
768            schema: dict,
769            description: str,
770            delete_existing: bool = False,
771            continue_if_exists: bool = False,
772            additional_properties_dict: Optional[dict] = None
773    ) -> str:
774        """
775        Get or create a dataset.
776
777        **Args:**
778        - dataset_name (str): The name of the dataset.
779        - billing_profile (str): The billing profile ID.
780        - schema (dict): The schema of the dataset.
781        - description (str): The description of the dataset.
782        - additional_properties_dict (Optional[dict], optional): Additional properties
783                for the dataset. Defaults to None.
784        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
785                Defaults to `False`.
786        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
787                Defaults to `False`.
788
789        **Returns:**
790        - str: The ID of the dataset.
791
792        **Raises:**
793        - ValueError: If multiple datasets with the same name are found under the billing profile.
794        """
795        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
796        if existing_datasets:
797            if not continue_if_exists:
798                raise ValueError(
799                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
800                )
801            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
802            if delete_existing:
803                logging.info(f"Deleting existing dataset {dataset_name}")
804                self.delete_dataset(existing_datasets[0]["id"])
805                existing_datasets = []
806            # If not delete_existing and continue_if_exists then grab existing datasets id
807            else:
808                dataset_id = existing_datasets[0]["id"]
809        if not existing_datasets:
810            logging.info("Did not find existing dataset")
811            # Create dataset
812            dataset_id = self.create_dataset(
813                schema=schema,
814                dataset_name=dataset_name,
815                description=description,
816                profile_id=billing_profile,
817                additional_dataset_properties=additional_properties_dict
818            )
819        return dataset_id
820
821    def create_dataset(  # type: ignore[return]
822            self,
823            schema: dict,
824            dataset_name: str,
825            description: str,
826            profile_id: str,
827            additional_dataset_properties: Optional[dict] = None
828    ) -> Optional[str]:
829        """
830        Create a new dataset.
831
832        **Args:**
833        - schema (dict): The schema of the dataset.
834        - dataset_name (str): The name of the dataset.
835        - description (str): The description of the dataset.
836        - profile_id (str): The billing profile ID.
837        - additional_dataset_properties (Optional[dict], optional): Additional
838                properties for the dataset. Defaults to None.
839
840        **Returns:**
841        - Optional[str]: The ID of the created dataset, or None if creation failed.
842
843        **Raises:**
844        - ValueError: If the schema validation fails.
845        """
846        dataset_properties = {
847            "name": dataset_name,
848            "description": description,
849            "defaultProfileId": profile_id,
850            "region": "us-central1",
851            "cloudPlatform": GCP,
852            "schema": schema
853        }
854
855        if additional_dataset_properties:
856            dataset_properties.update(additional_dataset_properties)
857        try:
858            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
859        except ValidationError as e:
860            raise ValueError(f"Schema validation error: {e}")
861        uri = f"{self.TDR_LINK}/datasets"
862        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
863        response = self.request_util.run_request(
864            method=POST,
865            uri=uri,
866            data=json.dumps(dataset_properties),
867            content_type="application/json"
868        )
869        job_id = response.json()["id"]
870        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
871        dataset_id = job_results["id"]  # type: ignore[index]
872        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
873        return dataset_id
874
875    def update_dataset_schema(  # type: ignore[return]
876            self,
877            dataset_id: str,
878            update_note: str,
879            tables_to_add: Optional[list[dict]] = None,
880            relationships_to_add: Optional[list[dict]] = None,
881            columns_to_add: Optional[list[dict]] = None
882    ) -> Optional[str]:
883        """
884        Update the schema of a dataset.
885
886        **Args:**
887        - dataset_id (str): The ID of the dataset.
888        - update_note (str): A note describing the update.
889        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
890        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
891        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
892
893        **Returns:**
894        - Optional[str]: The ID of the updated dataset, or None if the update failed.
895
896        **Raises:**
897        - ValueError: If the schema validation fails.
898        """
899        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema"
900        request_body: dict = {"description": f"{update_note}", "changes": {}}
901        if tables_to_add:
902            request_body["changes"]["addTables"] = tables_to_add
903        if relationships_to_add:
904            request_body["changes"]["addRelationships"] = relationships_to_add
905        if columns_to_add:
906            request_body["changes"]["addColumns"] = columns_to_add
907        try:
908            UpdateSchema(**request_body)
909        except ValidationError as e:
910            raise ValueError(f"Schema validation error: {e}")
911
912        response = self.request_util.run_request(
913            uri=uri,
914            method=POST,
915            content_type="application/json",
916            data=json.dumps(request_body)
917        )
918        job_id = response.json()["id"]
919        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
920        dataset_id = job_results["id"]  # type: ignore[index]
921        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
922        return dataset_id
923
924    def _get_response_from_batched_endpoint(self, uri: str, limit: int = 1000) -> list[dict]:
925        """
926        Get response from a batched endpoint.
927
928        Helper method for all GET endpoints that require batching.
929
930        Given the URI and an optional limit, this will
931        loop through batches until all metadata is retrieved.
932
933        **Args:**
934        - uri (str): The base URI for the endpoint (without query params for offset or limit).
935        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
936
937        **Returns:**
938        - list[dict]: A list of dictionaries containing the metadata retrieved from the endpoint.
939        """
940        batch = 1
941        offset = 0
942        metadata: list = []
943        while True:
944            logging.info(f"Retrieving {(batch - 1) * limit} to {batch * limit} records in metadata")
945            response_json = self.request_util.run_request(uri=f"{uri}?offset={offset}&limit={limit}", method=GET).json()
946
947            # If no more files, break the loop
948            if not response_json:
949                logging.info(f"No more results to retrieve, found {len(metadata)} total records")
950                break
951
952            metadata.extend(response_json)
953            # Increment the offset by limit for the next page
954            offset += limit
955            batch += 1
956        return metadata
957
958    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
959        """
960        Return all the metadata about files in a given snapshot.
961
962        Not all files can be returned at once, so the API
963        is used repeatedly until all "batches" have been returned.
964
965        **Args:**
966        - snapshot_id (str): The ID of the snapshot.
967        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
968
969        **Returns:**
970        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
971        """
972        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files"
973        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)
974
975    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
976        """
977        Return snapshots belonging to specified dataset.
978
979        **Args:**
980        - dataset_id: uuid of dataset to query.
981
982        **Returns:**
983        - requests.Response: The response from the request.
984        """
985        uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}"
986        return self.request_util.run_request(
987            uri=uri,
988            method=GET
989        )

Class to interact with the Terra Data Repository (TDR) API.

TDR(request_util: ops_utils.request_util.RunRequest)
25    def __init__(self, request_util: RunRequest):
26        """
27        Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).
28
29        **Args:**
30        - request_util (`ops_utils.request_util.RunRequest`): Utility for making HTTP requests.
31        """
32        self.request_util = request_util
33        """@private"""

Initialize the TDR class (A class to interact with the Terra Data Repository (TDR) API).

Args:

  • request_util (ops_utils.request_util.RunRequest): Utility for making HTTP requests.
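
Usage sketch (illustrative only; request_util is assumed to be an already-authenticated ops_utils.request_util.RunRequest, whose construction is not shown here):

    from ops_utils.tdr_utils.tdr_api_utils import TDR

    # request_util: an already-configured RunRequest used for all HTTP calls below
    tdr = TDR(request_util=request_util)
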
def get_dataset_files(self, dataset_id: str, limit: int = 20000) -> list[dict]:
49    def get_dataset_files(
50            self,
51            dataset_id: str,
52            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
53    ) -> list[dict]:
54        """
55        Get all files in a dataset.
56
57        Returns json like below
58
59            {
60                "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
61                "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
62                "path": "/path/set/in/ingest.csv",
63                "size": 1722,
64                "checksums": [
65                    {
66                        "checksum": "82f7e79v",
67                        "type": "crc32c"
68                    },
69                    {
70                        "checksum": "fff973507e30b74fa47a3d6830b84a90",
71                        "type": "md5"
72                    }
73                ],
74                "created": "2024-13-11T15:01:00.256Z",
75                "description": null,
76                "fileType": "file",
77                "fileDetail": {
78                    "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
79                    "mimeType": null,
80                    "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
81                    "loadTag": "RP_3333-RP_3333"
82                },
83                "directoryDetail": null
84            }
85
86        **Args:**
87        - dataset_id (str): The ID of the dataset.
88        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
89
90        **Returns:**
91        - list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
92        """
93        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files"
94        logging.info(f"Getting all files in dataset {dataset_id}")
95        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

Get all files in a dataset.

Returns json like below

{
    "fileId": "68ba8bfc-1d84-4ef3-99b8-cf1754d5rrrr",
    "collectionId": "b20b6024-5943-4c23-82e7-9c24f545fuy7",
    "path": "/path/set/in/ingest.csv",
    "size": 1722,
    "checksums": [
        {
            "checksum": "82f7e79v",
            "type": "crc32c"
        },
        {
            "checksum": "fff973507e30b74fa47a3d6830b84a90",
            "type": "md5"
        }
    ],
    "created": "2024-13-11T15:01:00.256Z",
    "description": null,
    "fileType": "file",
    "fileDetail": {
        "datasetId": "b20b6024-5943-4c23-82e7-9c24f5456444",
        "mimeType": null,
        "accessUrl": "gs://datarepo-bucket/path/to/actual/file.csv",
        "loadTag": "RP_3333-RP_3333"
    },
    "directoryDetail": null
}

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • list[dict]: A list of dictionaries containing the metadata of the files in the dataset.
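
Usage sketch (illustrative; tdr is a TDR instance and the dataset ID is a placeholder):

    files = tdr.get_dataset_files(dataset_id="<dataset-uuid>")
    for file_info in files:
        # Each entry follows the JSON shape shown above
        print(file_info["fileId"], file_info["path"], file_info["size"])
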
def create_file_dict(self, dataset_id: str, limit: int = 20000) -> dict:
 97    def create_file_dict(
 98            self,
 99            dataset_id: str,
100            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
101    ) -> dict:
102        """
103        Create a dictionary of all files in a dataset where the key is the file UUID.
104
105        **Args:**
106        - dataset_id (str): The ID of the dataset.
107        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
108
109        **Returns:**
110        - dict: A dictionary where the key is the file UUID and the value is the file metadata.
111        """
112        return {
113            file_dict["fileId"]: file_dict
114            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
115        }

Create a dictionary of all files in a dataset where the key is the file UUID.

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • dict: A dictionary where the key is the file UUID and the value is the file metadata.
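
Usage sketch (illustrative; IDs are placeholders):

    file_lookup = tdr.create_file_dict(dataset_id="<dataset-uuid>")
    # Look up the full metadata record for one file UUID (None if it is not in the dataset)
    file_metadata = file_lookup.get("<file-uuid>")
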
def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(self, dataset_id: str, limit: int = 20000) -> dict:
117    def create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
118            self,
119            dataset_id: str,
120            limit: int = ARG_DEFAULTS['batch_size_to_list_files']  # type: ignore[assignment]
121    ) -> dict:
122        """
123        Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.
124
125        This assumes that the TDR 'path' is the original path of the file in cloud storage with `gs://` stripped out.
126
127        This will ONLY work if dataset was created with `experimentalSelfHosted = True`
128
129        **Args:**
130        - dataset_id (str): The ID of the dataset.
131        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `20000`.
132
133        **Returns:**
134        - dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
135        """
136        return {
137            file_dict['fileDetail']['accessUrl']: file_dict['fileId']
138            for file_dict in self.get_dataset_files(dataset_id=dataset_id, limit=limit)
139        }

Create a dictionary of all files in a dataset where the key is the file 'path' and the value is the file UUID.

This assumes that the TDR 'path' is the original path of the file in cloud storage with gs:// stripped out.

This will ONLY work if dataset was created with experimentalSelfHosted = True

Args:

  • dataset_id (str): The ID of the dataset.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 20000.

Returns:

  • dict: A dictionary where the key is the file path (access URL) and the value is the file UUID.
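
Usage sketch (illustrative; only meaningful for datasets created with experimentalSelfHosted = True, and the gs:// path is a placeholder):

    path_to_uuid = tdr.create_file_uuid_dict_for_ingest_for_experimental_self_hosted_dataset(
        dataset_id="<dataset-uuid>"
    )
    # Map a source file's access URL back to the TDR file UUID it was ingested as
    existing_file_uuid = path_to_uuid.get("gs://bucket/path/to/file.csv")
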
def get_sas_token(self, snapshot_id: str = '', dataset_id: str = '') -> dict:
141    def get_sas_token(self, snapshot_id: str = "", dataset_id: str = "") -> dict:
142        """
143        Get the SAS token for a snapshot OR dataset. Only one should be provided.
144
145        **Args:**
146        - snapshot_id (str, optional): The ID of the snapshot. Defaults to "".
147        - dataset_id (str, optional): The ID of the dataset. Defaults to "".
148
149        **Returns:**
150        - dict: A dictionary containing the SAS token and its expiry time. Expiry
151        time is a string like `2025-02-13T19:31:47Z`.
152
153        **Raises:**
154        - ValueError: If neither `snapshot_id` nor `dataset_id` is provided.
155        """
156        if snapshot_id:
157            uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include=ACCESS_INFORMATION"
158            response = self.request_util.run_request(uri=uri, method=GET)
159            snapshot_info = json.loads(response.text)
160            sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
161        elif dataset_id:
162            uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include=ACCESS_INFORMATION"
163            response = self.request_util.run_request(uri=uri, method=GET)
164            snapshot_info = json.loads(response.text)
165            sas_token = snapshot_info["accessInformation"]["parquet"]["sasToken"]
166        else:
167            raise ValueError("Must provide either snapshot_id or dataset_id")
168
169        sas_expiry_time_pattern = re.compile(r"se.+?(?=\&sp)")
170        expiry_time_str = sas_expiry_time_pattern.search(sas_token)
171        time_str = unquote(expiry_time_str.group()).replace("se=", "")  # type: ignore[union-attr]
172
173        return {"sas_token": sas_token, "expiry_time": time_str}

Get the SAS token for a snapshot OR dataset. Only one should be provided.

Args:

  • snapshot_id (str, optional): The ID of the snapshot. Defaults to "".
  • dataset_id (str, optional): The ID of the dataset. Defaults to "".

Returns:

  • dict: A dictionary containing the SAS token and its expiry time. Expiry time is a string like 2025-02-13T19:31:47Z.

Raises:

  • ValueError: If neither snapshot_id nor dataset_id is provided.
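
Usage sketch (illustrative; pass exactly one of snapshot_id or dataset_id):

    token_info = tdr.get_sas_token(snapshot_id="<snapshot-uuid>")
    # expiry_time is a string like 2025-02-13T19:31:47Z
    print(token_info["sas_token"], token_info["expiry_time"])
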
def delete_file(self, file_id: str, dataset_id: str) -> requests.models.Response:
175    def delete_file(self, file_id: str, dataset_id: str) -> requests.Response:
176        """
177        Delete a file from a dataset.
178
179        **Args:**
180        - file_id (str): The ID of the file to be deleted.
181        - dataset_id (str): The ID of the dataset.
182
183        **Returns:**
184        - requests.Response: The response from the request.
185        """
186        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/{file_id}"
187        logging.info(f"Submitting delete job for file {file_id}")
188        return self.request_util.run_request(uri=uri, method=DELETE)

Delete a file from a dataset.

Args:

  • file_id (str): The ID of the file to be deleted.
  • dataset_id (str): The ID of the dataset.

Returns:

  • requests.Response: The response from the request.
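
delete_file only submits the delete job. A sketch of pairing it with MonitorTDRJob (the same pattern delete_dataset uses internally) to wait for completion; IDs are placeholders:

    from ops_utils.tdr_utils.tdr_job_utils import MonitorTDRJob

    response = tdr.delete_file(file_id="<file-uuid>", dataset_id="<dataset-uuid>")
    job_id = response.json()["id"]
    MonitorTDRJob(tdr=tdr, job_id=job_id, check_interval=15, return_json=False).run()
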
def delete_files( self, file_ids: list[str], dataset_id: str, batch_size_to_delete_files: int = 200, check_interval: int = 15) -> None:
190    def delete_files(
191            self,
192            file_ids: list[str],
193            dataset_id: str,
194            batch_size_to_delete_files: int = ARG_DEFAULTS["batch_size_to_delete_files"],  # type: ignore[assignment]
195            check_interval: int = 15) -> None:
196        """
197        Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.
198
199        **Args:**
200        - file_ids (list[str]): A list of file IDs to be deleted.
201        - dataset_id (str): The ID of the dataset.
202        - batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to `200`.
203        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
204        """
205        SubmitAndMonitorMultipleJobs(
206            tdr=self,
207            job_function=self.delete_file,
208            job_args_list=[(file_id, dataset_id) for file_id in file_ids],
209            batch_size=batch_size_to_delete_files,
210            check_interval=check_interval
211        ).run()

Delete multiple files from a dataset in batches and monitor delete jobs until completion for each batch.

Args:

  • file_ids (list[str]): A list of file IDs to be deleted.
  • dataset_id (str): The ID of the dataset.
  • batch_size_to_delete_files (int, optional): The number of files to delete per batch. Defaults to 200.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 15.
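
Usage sketch (illustrative; file and dataset IDs are placeholders):

    tdr.delete_files(
        file_ids=["<file-uuid-1>", "<file-uuid-2>"],
        dataset_id="<dataset-uuid>",
        batch_size_to_delete_files=100,
        check_interval=30,
    )
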
def add_user_to_dataset( self, dataset_id: str, user: str, policy: str) -> requests.models.Response:
213    def add_user_to_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
214        """
215        Add a user to a dataset with a specified policy.
216
217        **Args:**
218        - dataset_id (str): The ID of the dataset.
219        - user (str): The email of the user to be added.
220        - policy (str): The policy to be assigned to the user.
221                Must be one of `steward`, `custodian`, or `snapshot_creator`.
222
223        **Returns:**
224        - requests.Response: The response from the request.
225
226        **Raises:**
227        - ValueError: If the policy is not valid.
228        """
229        self._check_policy(policy)
230        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members"
231        member_dict = {"email": user}
232        logging.info(f"Adding user {user} to dataset {dataset_id} with policy {policy}")
233        return self.request_util.run_request(
234            uri=uri,
235            method=POST,
236            data=json.dumps(member_dict),
237            content_type="application/json"
238        )

Add a user to a dataset with a specified policy.

Args:

  • dataset_id (str): The ID of the dataset.
  • user (str): The email of the user to be added.
  • policy (str): The policy to be assigned to the user. Must be one of steward, custodian, or snapshot_creator.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If the policy is not valid.
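
Usage sketch (illustrative; the email and dataset ID are placeholders):

    tdr.add_user_to_dataset(
        dataset_id="<dataset-uuid>",
        user="user@example.org",
        policy="custodian",  # must be steward, custodian, or snapshot_creator
    )

remove_user_from_dataset (below) takes the same arguments to revoke a policy.
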
def remove_user_from_dataset( self, dataset_id: str, user: str, policy: str) -> requests.models.Response:
240    def remove_user_from_dataset(self, dataset_id: str, user: str, policy: str) -> requests.Response:
241        """
242        Remove a user from a dataset.
243
244        **Args:**
245        - dataset_id (str): The ID of the dataset.
246        - user (str): The email of the user to be removed.
247        - policy (str): The policy to be removed from the user.
248                Must be one of `steward`, `custodian`, or `snapshot_creator`.
249
250        **Returns:**
251        - requests.Response: The response from the request.
252
253        **Raises:**
254        - ValueError: If the policy is not valid.
255        """
256        self._check_policy(policy)
257        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/policies/{policy}/members/{user}"
258        logging.info(f"Removing user {user} from dataset {dataset_id} with policy {policy}")
259        return self.request_util.run_request(uri=uri, method=DELETE)

Remove a user from a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • user (str): The email of the user to be removed.
  • policy (str): The policy to be removed from the user. Must be one of steward, custodian, or snapshot_creator.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If the policy is not valid.
def delete_dataset(self, dataset_id: str) -> None:
261    def delete_dataset(self, dataset_id: str) -> None:
262        """
263        Delete a dataset and monitor the job until completion.
264
265        **Args:**
266        - dataset_id (str): The ID of the dataset to be deleted.
267        """
268        uri = f"{self.TDR_LINK}/datasets/{dataset_id}"
269        logging.info(f"Deleting dataset {dataset_id}")
270        response = self.request_util.run_request(uri=uri, method=DELETE)
271        job_id = response.json()['id']
272        MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=False).run()

Delete a dataset and monitor the job until completion.

Args:

  • dataset_id (str): The ID of the dataset to be deleted.

def get_snapshot_info( self, snapshot_id: str, continue_not_found: bool = False, info_to_include: Optional[list[str]] = None) -> Optional[requests.models.Response]:
274    def get_snapshot_info(
275            self,
276            snapshot_id: str,
277            continue_not_found: bool = False,
278            info_to_include: Optional[list[str]] = None
279    ) -> Optional[requests.Response]:
280        """
281        Get information about a snapshot.
282
283        **Args:**
284        - snapshot_id (str): The ID of the snapshot.
285        - continue_not_found (bool, optional): Whether to accept a `404` or `403` response. Defaults to `False`.
286        - info_to_include (list[str], optional): A list of additional information to include. Defaults to None.
287                Options are: `SOURCES`, `TABLES`, `RELATIONSHIPS`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`,
288                `DATA_PROJECT`, `CREATION_INFORMATION`, `DUOS`
289
290        **Returns:**
291        - requests.Response (optional): The response from the request (returns None if the snapshot is not
292         found or access is denied).
293        """
294        acceptable_return_code = [404, 403] if continue_not_found else []
295        acceptable_include_info = [
296            "SOURCES",
297            "TABLES",
298            "RELATIONSHIPS",
299            "ACCESS_INFORMATION",
300            "PROFILE",
301            "PROPERTIES",
302            "DATA_PROJECT",
303            "CREATION_INFORMATION",
304            "DUOS"
305        ]
306        if info_to_include:
307            if not all(info in acceptable_include_info for info in info_to_include):
308                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
309            include_string = '&include='.join(info_to_include)
310        else:
311            include_string = ""
312        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}?include={include_string}"
313        response = self.request_util.run_request(
314            uri=uri,
315            method=GET,
316            accept_return_codes=acceptable_return_code
317        )
318        if response.status_code == 404:
319            logging.warning(f"Snapshot {snapshot_id} not found")
320            return None
321        if response.status_code == 403:
322            logging.warning(f"Access denied for snapshot {snapshot_id}")
323            return None
324        return response

Get information about a snapshot.

Args:

  • snapshot_id (str): The ID of the snapshot.
  • continue_not_found (bool, optional): Whether to accept a 404 or 403 response. Defaults to False.
  • info_to_include (list[str], optional): A list of additional information to include. Defaults to None. Options are: SOURCES, TABLES, RELATIONSHIPS, ACCESS_INFORMATION, PROFILE, PROPERTIES, DATA_PROJECT, CREATION_INFORMATION, DUOS

Returns:

  • requests.Response (optional): The response from the request (returns None if the snapshot is not found or access is denied).
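
Usage sketch (illustrative; because the method can return None when continue_not_found=True, callers should check before using the response):

    response = tdr.get_snapshot_info(
        snapshot_id="<snapshot-uuid>",
        continue_not_found=True,
        info_to_include=["TABLES", "ACCESS_INFORMATION"],
    )
    if response is not None:
        snapshot = response.json()
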
def delete_snapshots( self, snapshot_ids: list[str], batch_size: int = 25, check_interval: int = 10, verbose: bool = False) -> None:
326    def delete_snapshots(
327            self,
328            snapshot_ids: list[str],
329            batch_size: int = 25,
330            check_interval: int = 10,
331            verbose: bool = False) -> None:
332        """
333        Delete multiple snapshots in batches and monitor delete jobs until completion for each batch.
334
335        **Args:**
336        - snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
337        - batch_size (int, optional): The number of snapshots to delete per batch. Defaults to `25`.
338        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `10`.
339        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
340        """
341        SubmitAndMonitorMultipleJobs(
342            tdr=self,
343            job_function=self.delete_snapshot,
344            job_args_list=[(snapshot_id,) for snapshot_id in snapshot_ids],
345            batch_size=batch_size,
346            check_interval=check_interval,
347            verbose=verbose
348        ).run()

Delete multiple snapshots in batches and monitor delete jobs until completion for each batch.

Args:

  • snapshot_ids (list[str]): A list of snapshot IDs to be deleted.
  • batch_size (int, optional): The number of snapshots to delete per batch. Defaults to 25.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 10.
  • verbose (bool, optional): Whether to log detailed information about each job. Defaults to False.
def delete_snapshot(self, snapshot_id: str) -> requests.models.Response:
350    def delete_snapshot(self, snapshot_id: str) -> requests.Response:
351        """
352        Delete a snapshot.
353
354        **Args:**
355        - snapshot_id (str): The ID of the snapshot to be deleted.
356
357        **Returns:**
358        - requests.Response: The response from the request.
359        """
360        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}"
361        logging.info(f"Deleting snapshot {snapshot_id}")
362        return self.request_util.run_request(uri=uri, method=DELETE)

Delete a snapshot.

Args:

  • snapshot_id (str): The ID of the snapshot to be deleted.

Returns:

  • requests.Response: The response from the request.
def check_if_dataset_exists( self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
397    def check_if_dataset_exists(self, dataset_name: str, billing_profile: Optional[str] = None) -> list[dict]:
398        """
399        Check if a dataset exists by name and optionally by billing profile.
400
401        **Args:**
402        - dataset_name (str): The name of the dataset to check.
403        - billing_profile (str, optional): The billing profile ID to match. Defaults to None.
404
405        **Returns:**
406        - list[dict]: A list of matching datasets.
407        """
408        matching_datasets = []
409        for dataset in self._yield_existing_datasets(filter=dataset_name):
410            # Search uses wildcard so could grab more datasets where dataset_name is substring
411            if dataset_name == dataset["name"]:
412                if billing_profile:
413                    if dataset["defaultProfileId"] == billing_profile:
414                        logging.info(
415                            f"Dataset {dataset['name']} already exists under billing profile {billing_profile}")
416                        dataset_id = dataset["id"]
417                        logging.info(f"Dataset ID: {dataset_id}")
418                        matching_datasets.append(dataset)
419                    else:
420                        logging.warning(
421                            f"Dataset {dataset['name']} exists but is under {dataset['defaultProfileId']} " +
422                            f"and not under billing profile {billing_profile}"
423                        )
424                        # Datasets names need to be unique regardless of billing profile, so raise an error if
425                        # a dataset with the same name is found but is not under the requested billing profile
426                        raise ValueError(
427                            f"Dataset {dataset_name} already exists but is not under billing profile {billing_profile}")
428                else:
429                    matching_datasets.append(dataset)
430        return matching_datasets

Check if a dataset exists by name and optionally by billing profile.

Args:

  • dataset_name (str): The name of the dataset to check.
  • billing_profile (str, optional): The billing profile ID to match. Defaults to None.

Returns:

  • list[dict]: A list of matching datasets.
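
Usage sketch (illustrative; the name and billing profile ID are placeholders):

    matches = tdr.check_if_dataset_exists(
        dataset_name="my_dataset",
        billing_profile="<billing-profile-uuid>"
    )
    if matches:
        dataset_id = matches[0]["id"]
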
def get_dataset_info( self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.models.Response:
432    def get_dataset_info(self, dataset_id: str, info_to_include: Optional[list[str]] = None) -> requests.Response:
433        """
434        Get information about a dataset.
435
436        **Args:**
437        - dataset_id (str): The ID of the dataset.
438        - info_to_include (list[str], optional): A list of additional information to include. Valid options include:
439        `SCHEMA`, `ACCESS_INFORMATION`, `PROFILE`, `PROPERTIES`, `DATA_PROJECT`, `STORAGE`, `SNAPSHOT_BUILDER_SETTING`.
440        Defaults to None.
441
442        **Returns:**
443        - requests.Response: The response from the request.
444
445        **Raises:**
446        - ValueError: If `info_to_include` contains invalid information types.
447        """
448        acceptable_include_info = [
449            "SCHEMA",
450            "ACCESS_INFORMATION",
451            "PROFILE",
452            "PROPERTIES",
453            "DATA_PROJECT",
454            "STORAGE",
455            "SNAPSHOT_BUILDER_SETTING"
456        ]
457        if info_to_include:
458            if not all(info in acceptable_include_info for info in info_to_include):
459                raise ValueError(f"info_to_include must be a subset of {acceptable_include_info}")
460            include_string = '&include='.join(info_to_include)
461        else:
462            include_string = ""
463        uri = f"{self.TDR_LINK}/datasets/{dataset_id}?include={include_string}"
464        return self.request_util.run_request(uri=uri, method=GET)

Get information about a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • info_to_include (list[str], optional): A list of additional information to include. Valid options include: SCHEMA, ACCESS_INFORMATION, PROFILE, PROPERTIES, DATA_PROJECT, STORAGE, SNAPSHOT_BUILDER_SETTING. Defaults to None.

Returns:

  • requests.Response: The response from the request.

Raises:

  • ValueError: If info_to_include contains invalid information types.
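
For example, to fetch a dataset along with its schema and access information (a sketch with a placeholder dataset UUID):

    dataset_info = tdr.get_dataset_info(
        dataset_id="00000000-0000-0000-0000-000000000000",
        info_to_include=["SCHEMA", "ACCESS_INFORMATION"],
    ).json()
    # The SCHEMA include adds a "schema" block listing the dataset's tables.
    print([table["name"] for table in dataset_info["schema"]["tables"]])
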
def get_table_schema_info( self, dataset_id: str, table_name: str, dataset_info: Optional[dict] = None) -> Optional[dict]:
466    def get_table_schema_info(
467            self,
468            dataset_id: str,
469            table_name: str,
470            dataset_info: Optional[dict] = None
471    ) -> Union[dict, None]:
472        """
473        Get schema information for a specific table within a dataset.
474
475        **Args:**
476        - dataset_id (str): The ID of the dataset.
477        - table_name (str): The name of the table.
478        - dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.
479
480        **Returns:**
481        - Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
482        """
483        if not dataset_info:
484            dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
485        for table in dataset_info["schema"]["tables"]:  # type: ignore[index]
486            if table["name"] == table_name:
487                return table
488        return None

Get schema information for a specific table within a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the table.
  • dataset_info (dict, optional): The dataset information if already retrieved. Defaults to None.

Returns:

  • Union[dict, None]: A dictionary containing the table schema information, or None if the table is not found.
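
A sketch of looking up one table's schema (the IDs are placeholders and the table name "sample" is illustrative):

    table_schema = tdr.get_table_schema_info(
        dataset_id="00000000-0000-0000-0000-000000000000",
        table_name="sample",
    )
    if table_schema is None:
        print("Table 'sample' not found in dataset schema")
    else:
        print([column["name"] for column in table_schema["columns"]])
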
def get_job_result( self, job_id: str, expect_failure: bool = False) -> requests.models.Response:
490    def get_job_result(self, job_id: str, expect_failure: bool = False) -> requests.Response:
491        """
492        Retrieve the result of a job.
493
494        **Args:**
495        - job_id (str): The ID of the job.
496        - expect_failure (bool, optional): Whether the job is expected to fail. Defaults to `False`.
497
498        **Returns:**
499        - requests.Response: The response from the request.
500        """
501        uri = f"{self.TDR_LINK}/jobs/{job_id}/result"
502        # If job is expected to fail, accept any return code
503        acceptable_return_code = list(range(100, 600)) if expect_failure else []
504        return self.request_util.run_request(uri=uri, method=GET, accept_return_codes=acceptable_return_code)

Retrieve the result of a job.

Args:

  • job_id (str): The ID of the job.
  • expect_failure (bool, optional): Whether the job is expected to fail. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.models.Response:
506    def ingest_to_dataset(self, dataset_id: str, data: dict) -> requests.Response:
507        """
508        Load data into a TDR dataset.
509
510        **Args:**
511        - dataset_id (str): The ID of the dataset.
512        - data (dict): The data to be ingested.
513
514        **Returns:**
515        - requests.Response: The response from the request.
516        """
517        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/ingest"
518        logging.info(
519            "If the TDR SA was recently added to the source bucket/dataset/workspace and you receive a 400/403 " +
520            "error, it can sometimes take 12-24 hours for permissions to propagate. Try rerunning the script later.")
521        return self.request_util.run_request(
522            uri=uri,
523            method=POST,
524            content_type="application/json",
525            data=data
526        )

Load data into a TDR dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • data (dict): The data to be ingested.

Returns:

  • requests.Response: The response from the request.
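
The sketch below shows a metadata ingest request. The payload keys mirror the TDR ingestDataset request body ("array" format with inline records) and are an assumption here, as are the table and record values. Depending on how your RunRequest serializes payloads, you may need to pass json.dumps(ingest_request) rather than the raw dict.

    # Assumed TDR ingest payload shape: "array" format with inline records.
    ingest_request = {
        "format": "array",
        "table": "sample",
        "records": [
            {"sample_id": "SM-1", "cram_path": "gs://bucket/SM-1.cram"},
        ],
    }
    response = tdr.ingest_to_dataset(
        dataset_id="00000000-0000-0000-0000-000000000000",
        data=ingest_request,
    )
    # The response describes the asynchronous ingest job, which can be monitored
    # with get_job_status/get_job_result (or MonitorTDRJob).
    print(response.json())
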
def file_ingest_to_dataset( self, dataset_id: str, profile_id: str, file_list: list[dict], load_tag: str = 'file_ingest_load_tag') -> dict:
528    def file_ingest_to_dataset(
529            self,
530            dataset_id: str,
531            profile_id: str,
532            file_list: list[dict],
533            load_tag: str = "file_ingest_load_tag"
534    ) -> dict:
535        """
536        Load files into a TDR dataset.
537
538        **Args:**
539        - dataset_id (str): The ID of the dataset.
540        - profile_id (str): The billing profile ID.
541        - file_list (list[dict]): The list of files to be ingested.
542        - load_tag (str): The tag to be used in the ingest job. Defaults to `file_ingest_load_tag`.
543
544        **Returns:**
545        - dict: A dictionary containing the response from the ingest operation job monitoring.
546        """
547        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/files/bulk/array"
548        data = {
549            "profileId": profile_id,
550            "loadTag": f"{load_tag}",
551            "maxFailedFileLoads": 0,
552            "loadArray": file_list
553        }
554
555        response = self.request_util.run_request(
556            uri=uri,
557            method=POST,
558            content_type="application/json",
559            data=json.dumps(data)
560        )
561        job_id = response.json()['id']
562        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
563        return job_results  # type: ignore[return-value]

Load files into a TDR dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • profile_id (str): The billing profile ID.
  • file_list (list[dict]): The list of files to be ingested.
  • load_tag (str): The tag to be used in the ingest job. Defaults to file_ingest_load_tag.

Returns:

  • dict: A dictionary containing the response from the ingest operation job monitoring.
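
A sketch of a bulk file ingest follows. Each file_list entry uses the TDR bulk-load sourcePath/targetPath fields (assumed here), and all IDs and paths are placeholders.

    file_list = [
        {
            "sourcePath": "gs://source-bucket/SM-1.cram",   # where the file lives today
            "targetPath": "/crams/SM-1.cram",               # virtual path inside the dataset
        },
    ]
    job_results = tdr.file_ingest_to_dataset(
        dataset_id="00000000-0000-0000-0000-000000000000",
        profile_id="11111111-2222-3333-4444-555555555555",
        file_list=file_list,
        load_tag="example_load_tag",
    )
    print(job_results)
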
def get_dataset_table_metrics( self, dataset_id: str, target_table_name: str, query_limit: int = 1000) -> list[dict]:
565    def get_dataset_table_metrics(
566            self, dataset_id: str, target_table_name: str, query_limit: int = 1000
567    ) -> list[dict]:
568        """
569        Retrieve all metrics for a specific table within a dataset.
570
571        **Args:**
572        - dataset_id (str): The ID of the dataset.
573        - target_table_name (str): The name of the target table.
574        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
575
576        **Returns:**
577        - list[dict]: A list of dictionaries containing the metrics for the specified table.
578        """
579        return [
580            metric
581            for metric in self._yield_dataset_metrics(
582                dataset_id=dataset_id,
583                target_table_name=target_table_name,
584                query_limit=query_limit
585            )
586        ]

Retrieve all metrics for a specific table within a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • target_table_name (str): The name of the target table.
  • query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

Returns:

  • list[dict]: A list of dictionaries containing the metrics for the specified table.
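
For example (placeholder dataset UUID; the table name is illustrative):

    rows = tdr.get_dataset_table_metrics(
        dataset_id="00000000-0000-0000-0000-000000000000",
        target_table_name="sample",
    )
    # Each row is a dict keyed by column name, plus TDR's datarepo_row_id.
    print(f"Retrieved {len(rows)} rows from the sample table")
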
def get_dataset_sample_ids( self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
624    def get_dataset_sample_ids(self, dataset_id: str, target_table_name: str, entity_id: str) -> list[str]:
625        """
626        Get existing IDs from a dataset.
627
628        **Args:**
629        - dataset_id (str): The ID of the dataset.
630        - target_table_name (str): The name of the target table.
631        - entity_id (str): The entity ID to retrieve.
632
633        **Returns:**
634        - list[str]: A list of entity IDs from the specified table.
635        """
636        dataset_metadata = self._yield_dataset_metrics(dataset_id=dataset_id, target_table_name=target_table_name)
637        return [str(sample_dict[entity_id]) for sample_dict in dataset_metadata]

Get existing IDs from a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • target_table_name (str): The name of the target table.
  • entity_id (str): The entity ID to retrieve.

Returns:

  • list[str]: A list of entity IDs from the specified table.
def get_job_status(self, job_id: str) -> requests.models.Response:
639    def get_job_status(self, job_id: str) -> requests.Response:
640        """
641        Retrieve the status of a job.
642
643        **Args:**
644        - job_id (str): The ID of the job.
645
646        **Returns:**
647        - requests.Response: The response from the request.
648        """
649        uri = f"{self.TDR_LINK}/jobs/{job_id}"
650        return self.request_util.run_request(uri=uri, method=GET)

Retrieve the status of a job.

Args:

  • job_id (str): The ID of the job.

Returns:

  • requests.Response: The response from the request.
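
A sketch that polls get_job_status and then fetches the outcome with get_job_result; the job ID is a placeholder, and the "job_status" field with its "running" value follow the TDR job model (an assumption here). In practice, MonitorTDRJob wraps this polling for you.

    import time

    job_id = "00000000-0000-0000-0000-000000000000"  # placeholder job ID
    # Poll until the job leaves the "running" state, then retrieve its result.
    while tdr.get_job_status(job_id=job_id).json().get("job_status") == "running":
        time.sleep(15)
    print(tdr.get_job_result(job_id=job_id).json())
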
def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
652    def get_dataset_file_uuids_from_metadata(self, dataset_id: str) -> list[str]:
653        """
654        Get all file UUIDs from the metadata of a dataset.
655
656        **Args:**
657        - dataset_id (str): The ID of the dataset.
658
659        **Returns:**
660        - list[str]: A list of file UUIDs from the dataset metadata.
661        """
662        dataset_info = self.get_dataset_info(dataset_id=dataset_id, info_to_include=["SCHEMA"]).json()
663        all_metadata_file_uuids = []
664        tables = 0
665        for table in dataset_info["schema"]["tables"]:
666            tables += 1
667            table_name = table["name"]
668            logging.info(f"Getting all file information for {table_name}")
669            # Get just columns where datatype is fileref
670            file_columns = [column["name"] for column in table["columns"] if column["datatype"] == "fileref"]
671            dataset_metrics = self.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=table_name)
672            # Get unique list of file uuids
673            file_uuids = list(
674                set(
675                    [
676                        value for metric in dataset_metrics for key, value in metric.items() if key in file_columns
677                    ]
678                )
679            )
680            logging.info(f"Got {len(file_uuids)} file uuids from table '{table_name}'")
681            all_metadata_file_uuids.extend(file_uuids)
682            # Make full list unique
683            all_metadata_file_uuids = list(set(all_metadata_file_uuids))
684        logging.info(f"Got {len(all_metadata_file_uuids)} file uuids from {tables} total table(s)")
685        return all_metadata_file_uuids

Get all file UUIDs from the metadata of a dataset.

Args:

  • dataset_id (str): The ID of the dataset.

Returns:

  • list[str]: A list of file UUIDs from the dataset metadata.
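
A sketch of a common cleanup check that compares the files physically present in a dataset (via get_dataset_files) against the files referenced by metadata; the dataset UUID is a placeholder.

    dataset_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    all_files = tdr.get_dataset_files(dataset_id=dataset_id)
    referenced_uuids = set(tdr.get_dataset_file_uuids_from_metadata(dataset_id=dataset_id))
    # Files present in the dataset but not referenced by any fileref column.
    orphaned = [f["fileId"] for f in all_files if f["fileId"] not in referenced_uuids]
    print(f"{len(orphaned)} file(s) are not referenced in the dataset metadata")
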
def soft_delete_entries( self, dataset_id: str, table_name: str, datarepo_row_ids: list[str], check_intervals: int = 15) -> Optional[dict]:
687    def soft_delete_entries(
688            self,
689            dataset_id: str,
690            table_name: str,
691            datarepo_row_ids: list[str],
692            check_intervals: int = 15
693    ) -> Optional[dict]:
694        """
695        Soft delete specific records from a table.
696
697        **Args:**
698        - dataset_id (str): The ID of the dataset.
699        - table_name (str): The name of the target table.
700        - datarepo_row_ids (list[str]): A list of row IDs to be deleted.
701        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
702
703        **Returns:**
704        - dict (optional): A dictionary containing the response from the soft delete operation job
705        monitoring. Returns None if no row IDs are provided.
706        """
707        if not datarepo_row_ids:
708            logging.info(f"No records found to soft delete in table {table_name}")
709            return None
710        logging.info(f"Soft deleting {len(datarepo_row_ids)} records from table {table_name}")
711        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/deletes"
712        payload = {
713            "deleteType": "soft",
714            "specType": "jsonArray",
715            "tables": [
716                {
717                    "tableName": table_name,
718                    "jsonArraySpec": {
719                        "rowIds": datarepo_row_ids
720                    }
721                }
722            ]
723        }
724        response = self.request_util.run_request(
725            method=POST,
726            uri=uri,
727            data=json.dumps(payload),
728            content_type="application/json"
729        )
730        job_id = response.json()["id"]
731        return MonitorTDRJob(tdr=self, job_id=job_id, check_interval=check_intervals, return_json=False).run()

Soft delete specific records from a table.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the target table.
  • datarepo_row_ids (list[str]): A list of row IDs to be deleted.
  • check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to 15.

Returns:

  • dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
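
A sketch that selects rows from the table metrics and soft deletes just those rows; the sample_id column and its values are illustrative.

    dataset_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    rows = tdr.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name="sample")
    # Keep only the datarepo_row_ids of the rows we want to remove.
    rows_to_remove = [
        row["datarepo_row_id"] for row in rows if row.get("sample_id") in {"SM-1", "SM-2"}
    ]
    tdr.soft_delete_entries(
        dataset_id=dataset_id,
        table_name="sample",
        datarepo_row_ids=rows_to_remove,
    )
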
def soft_delete_all_table_entries( self, dataset_id: str, table_name: str, query_limit: int = 1000, check_intervals: int = 15) -> Optional[dict]:
733    def soft_delete_all_table_entries(
734            self,
735            dataset_id: str,
736            table_name: str,
737            query_limit: int = 1000,
738            check_intervals: int = 15
739    ) -> Optional[dict]:
740        """
741        Soft delete all records in a table.
742
743        **Args:**
744        - dataset_id (str): The ID of the dataset.
745        - table_name (str): The name of the target table.
746        - query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
747        - check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to `15`.
748
749        **Returns:**
750        - dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns
751        None if no row IDs are provided.
752        """
753        dataset_metrics = self.get_dataset_table_metrics(
754            dataset_id=dataset_id, target_table_name=table_name, query_limit=query_limit
755        )
756        row_ids = [metric["datarepo_row_id"] for metric in dataset_metrics]
757        return self.soft_delete_entries(
758            dataset_id=dataset_id,
759            table_name=table_name,
760            datarepo_row_ids=row_ids,
761            check_intervals=check_intervals
762        )

Soft delete all records in a table.

Args:

  • dataset_id (str): The ID of the dataset.
  • table_name (str): The name of the target table.
  • query_limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.
  • check_intervals (int, optional): The interval in seconds to wait between status checks. Defaults to 15.

Returns:

  • dict (optional): A dictionary containing the response from the soft delete operation job monitoring. Returns None if no row IDs are provided.
def get_or_create_dataset( self, dataset_name: str, billing_profile: str, schema: dict, description: str, delete_existing: bool = False, continue_if_exists: bool = False, additional_properties_dict: Optional[dict] = None) -> str:
764    def get_or_create_dataset(
765            self,
766            dataset_name: str,
767            billing_profile: str,
768            schema: dict,
769            description: str,
770            delete_existing: bool = False,
771            continue_if_exists: bool = False,
772            additional_properties_dict: Optional[dict] = None
773    ) -> str:
774        """
775        Get or create a dataset.
776
777        **Args:**
778        - dataset_name (str): The name of the dataset.
779        - billing_profile (str): The billing profile ID.
780        - schema (dict): The schema of the dataset.
781        - description (str): The description of the dataset.
782        - additional_properties_dict (Optional[dict], optional): Additional properties
783                for the dataset. Defaults to None.
784        - delete_existing (bool, optional): Whether to delete the existing dataset if found.
785                Defaults to `False`.
786        - continue_if_exists (bool, optional): Whether to continue if the dataset already exists.
787                Defaults to `False`.
788
789        **Returns:**
790        - str: The ID of the dataset.
791
792        **Raises:**
793        - ValueError: If the dataset already exists and `continue_if_exists` is `False`, or if it exists under a different billing profile.
794        """
795        existing_datasets = self.check_if_dataset_exists(dataset_name, billing_profile)
796        if existing_datasets:
797            if not continue_if_exists:
798                raise ValueError(
799                    f"Run with continue_if_exists=True to use the existing dataset {dataset_name}"
800                )
801            # If delete_existing is True, delete the existing dataset and set existing_datasets to an empty list
802            if delete_existing:
803                logging.info(f"Deleting existing dataset {dataset_name}")
804                self.delete_dataset(existing_datasets[0]["id"])
805                existing_datasets = []
806            # If not delete_existing and continue_if_exists then grab existing datasets id
807            else:
808                dataset_id = existing_datasets[0]["id"]
809        if not existing_datasets:
810            logging.info("Did not find existing dataset")
811            # Create dataset
812            dataset_id = self.create_dataset(
813                schema=schema,
814                dataset_name=dataset_name,
815                description=description,
816                profile_id=billing_profile,
817                additional_dataset_properties=additional_properties_dict
818            )
819        return dataset_id

Get or create a dataset.

Args:

  • dataset_name (str): The name of the dataset.
  • billing_profile (str): The billing profile ID.
  • schema (dict): The schema of the dataset.
  • description (str): The description of the dataset.
  • additional_properties_dict (Optional[dict], optional): Additional properties for the dataset. Defaults to None.
  • delete_existing (bool, optional): Whether to delete the existing dataset if found. Defaults to False.
  • continue_if_exists (bool, optional): Whether to continue if the dataset already exists. Defaults to False.

Returns:

  • str: The ID of the dataset.

Raises:

  • ValueError: If the dataset already exists and continue_if_exists is False, or if it exists under a different billing profile.
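
The sketch below creates (or reuses) a dataset with a minimal one-table schema. The schema dict follows the TDR dataset schema format (tables with typed columns), which is assumed here, and all names and IDs are placeholders.

    schema = {
        "tables": [
            {
                "name": "sample",
                "columns": [
                    {"name": "sample_id", "datatype": "string", "array_of": False, "required": True},
                    {"name": "cram_path", "datatype": "fileref", "array_of": False, "required": False},
                ],
                "primaryKey": ["sample_id"],
            }
        ]
    }
    dataset_id = tdr.get_or_create_dataset(
        dataset_name="my_dataset",
        billing_profile="11111111-2222-3333-4444-555555555555",
        schema=schema,
        description="Example dataset for ingest testing",
        continue_if_exists=True,  # reuse the dataset if it already exists
    )
    print(dataset_id)
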
def create_dataset( self, schema: dict, dataset_name: str, description: str, profile_id: str, additional_dataset_properties: Optional[dict] = None) -> Optional[str]:
821    def create_dataset(  # type: ignore[return]
822            self,
823            schema: dict,
824            dataset_name: str,
825            description: str,
826            profile_id: str,
827            additional_dataset_properties: Optional[dict] = None
828    ) -> Optional[str]:
829        """
830        Create a new dataset.
831
832        **Args:**
833        - schema (dict): The schema of the dataset.
834        - dataset_name (str): The name of the dataset.
835        - description (str): The description of the dataset.
836        - profile_id (str): The billing profile ID.
837        - additional_dataset_properties (Optional[dict], optional): Additional
838                properties for the dataset. Defaults to None.
839
840        **Returns:**
841        - Optional[str]: The ID of the created dataset, or None if creation failed.
842
843        **Raises:**
844        - ValueError: If the schema validation fails.
845        """
846        dataset_properties = {
847            "name": dataset_name,
848            "description": description,
849            "defaultProfileId": profile_id,
850            "region": "us-central1",
851            "cloudPlatform": GCP,
852            "schema": schema
853        }
854
855        if additional_dataset_properties:
856            dataset_properties.update(additional_dataset_properties)
857        try:
858            CreateDatasetSchema(**dataset_properties)  # type: ignore[arg-type]
859        except ValidationError as e:
860            raise ValueError(f"Schema validation error: {e}")
861        uri = f"{self.TDR_LINK}/datasets"
862        logging.info(f"Creating dataset {dataset_name} under billing profile {profile_id}")
863        response = self.request_util.run_request(
864            method=POST,
865            uri=uri,
866            data=json.dumps(dataset_properties),
867            content_type="application/json"
868        )
869        job_id = response.json()["id"]
870        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
871        dataset_id = job_results["id"]  # type: ignore[index]
872        logging.info(f"Successfully created dataset {dataset_name}: {dataset_id}")
873        return dataset_id

Create a new dataset.

Args:

  • schema (dict): The schema of the dataset.
  • dataset_name (str): The name of the dataset.
  • description (str): The description of the dataset.
  • profile_id (str): The billing profile ID.
  • additional_dataset_properties (Optional[dict], optional): Additional properties for the dataset. Defaults to None.

Returns:

  • Optional[str]: The ID of the created dataset, or None if creation failed.

Raises:

  • ValueError: If the schema validation fails.
def update_dataset_schema( self, dataset_id: str, update_note: str, tables_to_add: Optional[list[dict]] = None, relationships_to_add: Optional[list[dict]] = None, columns_to_add: Optional[list[dict]] = None) -> Optional[str]:
875    def update_dataset_schema(  # type: ignore[return]
876            self,
877            dataset_id: str,
878            update_note: str,
879            tables_to_add: Optional[list[dict]] = None,
880            relationships_to_add: Optional[list[dict]] = None,
881            columns_to_add: Optional[list[dict]] = None
882    ) -> Optional[str]:
883        """
884        Update the schema of a dataset.
885
886        **Args:**
887        - dataset_id (str): The ID of the dataset.
888        - update_note (str): A note describing the update.
889        - tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
890        - relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
891        - columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.
892
893        **Returns:**
894        - Optional[str]: The ID of the updated dataset, or None if the update failed.
895
896        **Raises:**
897        - ValueError: If the schema validation fails.
898        """
899        uri = f"{self.TDR_LINK}/datasets/{dataset_id}/updateSchema"
900        request_body: dict = {"description": f"{update_note}", "changes": {}}
901        if tables_to_add:
902            request_body["changes"]["addTables"] = tables_to_add
903        if relationships_to_add:
904            request_body["changes"]["addRelationships"] = relationships_to_add
905        if columns_to_add:
906            request_body["changes"]["addColumns"] = columns_to_add
907        try:
908            UpdateSchema(**request_body)
909        except ValidationError as e:
910            raise ValueError(f"Schema validation error: {e}")
911
912        response = self.request_util.run_request(
913            uri=uri,
914            method=POST,
915            content_type="application/json",
916            data=json.dumps(request_body)
917        )
918        job_id = response.json()["id"]
919        job_results = MonitorTDRJob(tdr=self, job_id=job_id, check_interval=30, return_json=True).run()
920        dataset_id = job_results["id"]  # type: ignore[index]
921        logging.info(f"Successfully ran schema updates in dataset {dataset_id}")
922        return dataset_id

Update the schema of a dataset.

Args:

  • dataset_id (str): The ID of the dataset.
  • update_note (str): A note describing the update.
  • tables_to_add (list[dict], optional): A list of tables to add. Defaults to None.
  • relationships_to_add (list[dict], optional): A list of relationships to add. Defaults to None.
  • columns_to_add (list[dict], optional): A list of columns to add. Defaults to None.

Returns:

  • Optional[str]: The ID of the updated dataset, or None if the update failed.

Raises:

  • ValueError: If the schema validation fails.
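
A sketch that adds one new column to an existing table; the shape of columns_to_add mirrors what the TDR updateSchema endpoint (and the UpdateSchema model) expects and is an assumption here, as are the table and column names.

    updated_dataset_id = tdr.update_dataset_schema(
        dataset_id="00000000-0000-0000-0000-000000000000",
        update_note="Add collection_date to the sample table",
        columns_to_add=[
            {
                "tableName": "sample",
                "columns": [
                    {"name": "collection_date", "datatype": "date", "array_of": False, "required": False},
                ],
            }
        ],
    )
    print(updated_dataset_id)
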
def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
958    def get_files_from_snapshot(self, snapshot_id: str, limit: int = 1000) -> list[dict]:
959        """
960        Return all the metadata about files in a given snapshot.
961
962        Not all files can be returned at once, so the API
963        is used repeatedly until all "batches" have been returned.
964
965        **Args:**
966        - snapshot_id (str): The ID of the snapshot.
967        - limit (int, optional): The maximum number of records to retrieve per batch. Defaults to `1000`.
968
969        **Returns:**
970        - list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
971        """
972        uri = f"{self.TDR_LINK}/snapshots/{snapshot_id}/files"
973        return self._get_response_from_batched_endpoint(uri=uri, limit=limit)

Return all the metadata about files in a given snapshot.

Not all files can be returned at once, so the API is used repeatedly until all "batches" have been returned.

Args:

  • snapshot_id (str): The ID of the snapshot.
  • limit (int, optional): The maximum number of records to retrieve per batch. Defaults to 1000.

Returns:

  • list[dict]: A list of dictionaries containing the metadata of the files in the snapshot.
def get_dataset_snapshots(self, dataset_id: str) -> requests.models.Response:
975    def get_dataset_snapshots(self, dataset_id: str) -> requests.Response:
976        """
977        Return snapshots belonging to the specified dataset.
978
979        **Args:**
980        - dataset_id (str): The UUID of the dataset to query.
981
982        **Returns:**
983        - requests.Response: The response from the request.
984        """
985        uri = f"{self.TDR_LINK}/snapshots?datasetIds={dataset_id}"
986        return self.request_util.run_request(
987            uri=uri,
988            method=GET
989        )

Return snapshots belonging to the specified dataset.

Args:

  • dataset_id (str): The UUID of the dataset to query.

Returns:

  • requests.Response: The response from the request.
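
A sketch that lists a dataset's snapshots and then pulls file metadata for the first one; the "items" key follows the TDR snapshot enumeration response and is an assumption here, and the dataset UUID is a placeholder.

    dataset_id = "00000000-0000-0000-0000-000000000000"  # placeholder
    snapshots = tdr.get_dataset_snapshots(dataset_id=dataset_id).json().get("items", [])
    if snapshots:
        snapshot_files = tdr.get_files_from_snapshot(snapshot_id=snapshots[0]["id"])
        print(f"Snapshot {snapshots[0]['id']} contains {len(snapshot_files)} file(s)")
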
class FilterOutSampleIdsAlreadyInDataset:
 992class FilterOutSampleIdsAlreadyInDataset:
 993    """Class to filter ingest metrics to remove sample IDs that already exist in the dataset."""
 994
 995    def __init__(
 996            self,
 997            ingest_metrics: list[dict],
 998            dataset_id: str,
 999            tdr: TDR,
1000            target_table_name: str,
1001            filter_entity_id: str
1002    ):
1003        """
1004        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1005
1006        **Args:**
1007        - ingest_metrics (list[dict]): The metrics to be ingested.
1008        - dataset_id (str): The ID of the dataset.
1009        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
1010        - target_table_name (str): The name of the target table.
1011        - filter_entity_id (str): The entity ID to filter on.
1012        """
1013        self.ingest_metrics = ingest_metrics
1014        """@private"""
1015        self.tdr = tdr
1016        """@private"""
1017        self.dataset_id = dataset_id
1018        """@private"""
1019        self.target_table_name = target_table_name
1020        """@private"""
1021        self.filter_entity_id = filter_entity_id
1022        """@private"""
1023
1024    def run(self) -> list[dict]:
1025        """
1026        Run the filter process to remove sample IDs that already exist in the dataset.
1027
1028        **Returns:**
1029        - list[dict]: The filtered ingest metrics.
1030        """
1031        # Get all sample ids that already exist in dataset
1032        logging.info(
1033            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1034            f"dataset {self.dataset_id}"
1035        )
1036
1037        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1038            dataset_id=self.dataset_id,
1039            target_table_name=self.target_table_name,
1040            entity_id=self.filter_entity_id
1041        )
1042        # Filter out rows that already exist in dataset
1043        filtered_ingest_metrics = [
1044            row
1045            for row in self.ingest_metrics
1046            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1047        ]
1048        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1049            logging.info(
1050                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1051                f"the dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1052            )
1053
1054            if filtered_ingest_metrics:
1055                return filtered_ingest_metrics
1056            else:
1057                logging.info("All rows were filtered out because they already exist in the dataset; nothing to ingest")
1058                return []
1059        else:
1060            logging.info("No rows were filtered out because none of them already exist in the dataset")
1061            return filtered_ingest_metrics

Class to filter ingest metrics to remove sample IDs that already exist in the dataset.

FilterOutSampleIdsAlreadyInDataset( ingest_metrics: list[dict], dataset_id: str, tdr: TDR, target_table_name: str, filter_entity_id: str)
 995    def __init__(
 996            self,
 997            ingest_metrics: list[dict],
 998            dataset_id: str,
 999            tdr: TDR,
1000            target_table_name: str,
1001            filter_entity_id: str
1002    ):
1003        """
1004        Initialize the FilterOutSampleIdsAlreadyInDataset class.
1005
1006        **Args:**
1007        - ingest_metrics (list[dict]): The metrics to be ingested.
1008        - dataset_id (str): The ID of the dataset.
1009        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance.
1010        - target_table_name (str): The name of the target table.
1011        - filter_entity_id (str): The entity ID to filter on.
1012        """
1013        self.ingest_metrics = ingest_metrics
1014        """@private"""
1015        self.tdr = tdr
1016        """@private"""
1017        self.dataset_id = dataset_id
1018        """@private"""
1019        self.target_table_name = target_table_name
1020        """@private"""
1021        self.filter_entity_id = filter_entity_id
1022        """@private"""

Initialize the FilterOutSampleIdsAlreadyInDataset class.

Args:

  • ingest_metrics (list[dict]): The metrics to be ingested.
  • dataset_id (str): The ID of the dataset.
  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): The TDR instance.
  • target_table_name (str): The name of the target table.
  • filter_entity_id (str): The entity ID to filter on.
def run(self) -> list[dict]:
1024    def run(self) -> list[dict]:
1025        """
1026        Run the filter process to remove sample IDs that already exist in the dataset.
1027
1028        **Returns:**
1029        - list[dict]: The filtered ingest metrics.
1030        """
1031        # Get all sample ids that already exist in dataset
1032        logging.info(
1033            f"Getting all {self.filter_entity_id} that already exist in table {self.target_table_name} in "
1034            f"dataset {self.dataset_id}"
1035        )
1036
1037        dataset_sample_ids = self.tdr.get_dataset_sample_ids(
1038            dataset_id=self.dataset_id,
1039            target_table_name=self.target_table_name,
1040            entity_id=self.filter_entity_id
1041        )
1042        # Filter out rows that already exist in dataset
1043        filtered_ingest_metrics = [
1044            row
1045            for row in self.ingest_metrics
1046            if str(row[self.filter_entity_id]) not in dataset_sample_ids
1047        ]
1048        if len(filtered_ingest_metrics) < len(self.ingest_metrics):
1049            logging.info(
1050                f"Filtered out {len(self.ingest_metrics) - len(filtered_ingest_metrics)} rows that already exist in "
1051                f"the dataset. There are {len(filtered_ingest_metrics)} rows left to ingest"
1052            )
1053
1054            if filtered_ingest_metrics:
1055                return filtered_ingest_metrics
1056            else:
1057                logging.info("All rows were filtered out because they already exist in the dataset; nothing to ingest")
1058                return []
1059        else:
1060            logging.info("No rows were filtered out because none of them already exist in the dataset")
1061            return filtered_ingest_metrics

Run the filter process to remove sample IDs that already exist in the dataset.

Returns:

  • list[dict]: The filtered ingest metrics.
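
A usage sketch for the class follows, reusing the `tdr` instance from the earlier sketches; the sample_id column and row values are illustrative, and the dataset UUID is a placeholder.

    from ops_utils.tdr_utils.tdr_api_utils import FilterOutSampleIdsAlreadyInDataset

    ingest_metrics = [
        {"sample_id": "SM-1", "cram_path": "gs://bucket/SM-1.cram"},
        {"sample_id": "SM-2", "cram_path": "gs://bucket/SM-2.cram"},
    ]
    rows_to_ingest = FilterOutSampleIdsAlreadyInDataset(
        ingest_metrics=ingest_metrics,
        dataset_id="00000000-0000-0000-0000-000000000000",
        tdr=tdr,
        target_table_name="sample",
        filter_entity_id="sample_id",
    ).run()
    # rows_to_ingest now contains only rows whose sample_id is not already in the table.
    print(f"{len(rows_to_ingest)} row(s) left to ingest")
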