ops_utils.terra_util

Utilities for working with Terra.

   1"""Utilities for working with Terra."""
   2import json
   3import logging
   4import re
   5from typing import Any, Optional
   6import requests
   7import time
   8import zipfile
   9import os
  10
  11from . import deprecated
  12from .vars import GCP, APPLICATION_JSON
  13from .gcp_utils import GCPCloudFunctions
  14from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest
  15
# Constants for Terra API links.
# Only the Firecloud orchestration API has a dev variant here (TerraWorkspace picks
# between TERRA_DEV_LINK and TERRA_PROD_LINK from its `env` argument); every other
# service link points at a dsde-prod host.
TERRA_DEV_LINK = "https://firecloud-orchestration.dsde-dev.broadinstitute.org/api"
"""@private"""
TERRA_PROD_LINK = "https://api.firecloud.org/api"
"""@private"""
# Leonardo API base URL (prod host).
LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api"
"""@private"""
# Workspaces v1 API base URL (prod host).
WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1"
"""@private"""
# Sam API base URL (prod host); used below for group membership, pet service-account
# keys, and workspace resource endpoints.
SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api"
"""@private"""
# Rawls API base URL (prod host); used below for listing workspaces and workspace settings.
RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api"
"""@private"""

# Group roles accepted by TerraGroups (its GROUP_MEMBERSHIP_OPTIONS).
MEMBER = "member"
ADMIN = "admin"
  32
  33
  34class Terra:
  35    """Class for generic Terra utilities."""
  36
  37    def __init__(self, request_util: RunRequest, env: str = "prod"):
  38        """
  39        Initialize the Terra class.
  40
  41        **Args:**
  42        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
  43            request utility class to handle HTTP requests.
  44        """
  45        self.request_util = request_util
  46        """@private"""
  47
  48    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
  49        """
  50        Fetch the list of accessible workspaces.
  51
  52        **Args:**
  53        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
  54
  55        **Returns:**
  56        - requests.Response: The response from the request.
  57        """
  58        fields_str = "fields=" + ",".join(fields) if fields else ""
  59        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
  60        return self.request_util.run_request(
  61            uri=url,
  62            method=GET
  63        )
  64
  65    def get_pet_account_json(self) -> requests.Response:
  66        """
  67        Get the service account JSON.
  68
  69        **Returns:**
  70        - requests.Response: The response from the request.
  71        """
  72        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
  73        return self.request_util.run_request(
  74            uri=url,
  75            method=GET
  76        )
  77
  78
  79class TerraGroups:
  80    """A class to manage Terra groups and their memberships."""
  81
  82    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
  83    """@private"""
  84    CONFLICT_STATUS_CODE = 409
  85    """@private"""
  86
  87    def __init__(self, request_util: RunRequest):
  88        """
  89        Initialize the TerraGroups class.
  90
  91        **Args:**
  92        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
  93         utility class to handle HTTP requests.
  94        """
  95        self.request_util = request_util
  96        """@private"""
  97
  98    def _check_role(self, role: str) -> None:
  99        """
 100        Check if the role is valid.
 101
 102        Args:
 103            role (str): The role to check.
 104
 105        Raises:
 106            ValueError: If the role is not one of the allowed options.
 107        """
 108        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
 109            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
 110
 111    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
 112        """
 113        Remove a user from a group.
 114
 115        **Args:**
 116        - group (str): The name of the group.
 117        - email (str): The email of the user to remove.
 118        - role (str): The role of the user in the group
 119            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
 120
 121        **Returns:**
 122        - requests.Response: The response from the request.
 123        """
 124        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
 125        self._check_role(role)
 126        logging.info(f"Removing {email} from group {group}")
 127        return self.request_util.run_request(
 128            uri=url,
 129            method=DELETE
 130        )
 131
 132    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
 133        """
 134        Create a new group.
 135
 136        **Args:**
 137        - group_name (str): The name of the group to create.
 138        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
 139
 140        **Returns:**
 141        - requests.Response: The response from the request.
 142        """
 143        url = f"{SAM_LINK}/groups/v1/{group_name}"
 144        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 145        response = self.request_util.run_request(
 146            uri=url,
 147            method=POST,
 148            accept_return_codes=accept_return_codes
 149        )
 150        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
 151            logging.info(f"Group {group_name} already exists. Continuing.")
 152            return response
 153        else:
 154            logging.info(f"Created group {group_name}")
 155            return response
 156
 157    def delete_group(self, group_name: str) -> requests.Response:
 158        """
 159        Delete a group.
 160
 161        **Args:**
 162        - group_name (str): The name of the group to delete.
 163
 164        **Returns:**
 165        - requests.Response: The response from the request.
 166        """
 167        url = f"{SAM_LINK}/groups/v1/{group_name}"
 168        logging.info(f"Deleting group {group_name}")
 169        return self.request_util.run_request(
 170            uri=url,
 171            method=DELETE
 172        )
 173
 174    def add_user_to_group(
 175            self, group: str, email: str, role: str, continue_if_exists: bool = False
 176    ) -> requests.Response:
 177        """
 178        Add a user to a group.
 179
 180        **Args:**
 181        - group (str): The name of the group.
 182        - email (str): The email of the user to add.
 183        - role (str): The role of the user in the group
 184            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
 185        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
 186                Defaults to `False`.
 187
 188        **Returns:**
 189        - requests.Response: The response from the request.
 190        """
 191        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
 192        self._check_role(role)
 193        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 194        logging.info(f"Adding {email} to group {group} as {role}")
 195        return self.request_util.run_request(
 196            uri=url,
 197            method=PUT,
 198            accept_return_codes=accept_return_codes
 199        )
 200
 201
 202class TerraWorkspace:
 203    """Terra workspace class to manage workspaces and their attributes."""
 204
 205    CONFLICT_STATUS_CODE = 409
 206    """@private"""
 207
 208    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
 209        """
 210        Initialize the TerraWorkspace class.
 211
 212        **Args:**
 213        - billing_project (str): The billing project associated with the workspace.
 214        - workspace_name (str): The name of the workspace.
 215        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
 216            request utility class to handle HTTP requests.
 217        """
 218        self.billing_project = billing_project
 219        """@private"""
 220        self.workspace_name = workspace_name
 221        """@private"""
 222        self.workspace_id = None
 223        """@private"""
 224        self.resource_id = None
 225        """@private"""
 226        self.storage_container = None
 227        """@private"""
 228        self.bucket = None
 229        """@private"""
 230        self.wds_url = None
 231        """@private"""
 232        self.account_url: Optional[str] = None
 233        """@private"""
 234        self.request_util = request_util
 235        """@private"""
 236        if env.lower() == "dev":
 237            self.terra_link = TERRA_DEV_LINK
 238            """@private"""
 239        elif env.lower() == "prod":
 240            self.terra_link = TERRA_PROD_LINK
 241            """@private"""
 242        else:
 243            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
 244
 245    def __repr__(self) -> str:
 246        """
 247        Return a string representation of the TerraWorkspace instance.
 248
 249        Returns:
 250            str: The string representation of the TerraWorkspace instance.
 251        """
 252        return f"{self.billing_project}/{self.workspace_name}"
 253
 254    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any:
 255        """
 256        Yield all entity metrics from the workspace.
 257
 258        Args:
 259            entity (str): The type of entity to query.
 260            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
 261
 262        Yields:
 263            Any: The JSON response containing entity metrics.
 264        """
 265        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
 266        response = self.request_util.run_request(
 267            uri=url,
 268            method=GET,
 269            content_type=APPLICATION_JSON
 270        )
 271        raw_text = response.text
 272        first_page_json = json.loads(
 273            raw_text,
 274            parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 275        )
 276        yield first_page_json
 277        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
 278        logging.info(
 279            f"Looping through {total_pages} pages of data")
 280
 281        for page in range(2, total_pages + 1):
 282            logging.info(f"Getting page {page} of {total_pages}")
 283            next_page = self.request_util.run_request(
 284                uri=url,
 285                method=GET,
 286                content_type=APPLICATION_JSON,
 287                params={"page": page}
 288            )
 289            raw_text = next_page.text
 290            page_json = json.loads(
 291                raw_text,
 292                parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 293            )
 294            yield page_json
 295
 296    @staticmethod
 297    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
 298        """Check that all headers follow the standards required by TDR.
 299
 300        **Args:**
 301        - table_name (str): The name of the Terra table.
 302        - headers (list[str]): The headers of the Terra table to validate.
 303
 304        **Raises:**
 305        - ValueError if any headers are considered invalid by TDR standards
 306        """
 307        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
 308        tdr_max_header_length = 63
 309
 310        headers_containing_too_many_characters = []
 311        headers_contain_invalid_characters = []
 312
 313        for header in headers:
 314            if len(header) > tdr_max_header_length:
 315                headers_containing_too_many_characters.append(header)
 316            if not re.match(tdr_header_allowed_pattern, header):
 317                headers_contain_invalid_characters.append(header)
 318
 319        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
 320        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
 321        header naming."""
 322        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
 323        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
 324        allowed in TDR is {tdr_max_header_length}.\n"""
 325        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
 326        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
 327        only contain numbers, letters, and underscore characters.\n"""
 328
 329        error_to_report = ""
 330        if headers_containing_too_many_characters:
 331            error_to_report += too_many_characters_error_message
 332        if headers_contain_invalid_characters:
 333            error_to_report += invalid_characters_error_message
 334        if error_to_report:
 335            error_to_report += base_error_message
 336            raise ValueError(error_to_report)
 337
 338    def get_workspace_info(self) -> requests.Response:
 339        """
 340        Get workspace information.
 341
 342        **Returns:**
 343        - requests.Response: The response from the request.
 344        """
 345        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
 346        logging.info(
 347            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
 348        return self.request_util.run_request(uri=url, method=GET)
 349
 350    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
 351        """
 352        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
 353
 354        **Args:**
 355        - entity_type (str): The type of entity to get metrics for.
 356        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
 357
 358        **Returns:**
 359        - list[dict]: A list of dictionaries containing entity metrics.
 360        """
 361        results = []
 362        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
 363
 364        for page in self._yield_all_entity_metrics(entity=entity_type):
 365            results.extend(page["results"])
 366
 367        # If remove_dicts is True, remove dictionaries from the workspace metrics
 368        if remove_dicts:
 369            for row in results:
 370                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
 371        return results
 372
 373    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
 374        """
 375        Get specific entity metrics for a given entity type and name.
 376
 377        **Args:**
 378        - entity_type (str): The type of entity to get metrics for.
 379        - entity_name (str): The name of the entity to get metrics for.
 380
 381        **Returns:**
 382        - requests.Response: The response from the request.
 383        """
 384        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
 385        return self.request_util.run_request(uri=url, method=GET)
 386
 387    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
 388        """
 389        Remove dictionaries from the attributes.
 390
 391        Args:
 392            attributes (dict): The attributes to remove dictionaries from.
 393
 394        Returns:
 395            dict: The updated attributes with no dictionaries.
 396        """
 397        for key, value in attributes.items():
 398            attributes[key] = self._remove_dict_from_cell(value)
 399        return attributes
 400
 401    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
 402        """
 403        Remove a dictionary from a cell.
 404
 405        Args:
 406            cell_value (Any): The dictionary to remove.
 407
 408        Returns:
 409            Any: The updated cell with no dictionaries.
 410        """
 411        if isinstance(cell_value, dict):
 412            entity_name = cell_value.get("entityName")
 413            # If the cell value is a dictionary, check if it has an entityName key
 414            if entity_name:
 415                # If the cell value is a dictionary with an entityName key, return the entityName
 416                return entity_name
 417            entity_list = cell_value.get("items")
 418            if entity_list or entity_list == []:
 419                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
 420                return [
 421                    self._remove_dict_from_cell(entity) for entity in entity_list
 422                ]
 423            return cell_value
 424        return cell_value
 425
 426    def get_workspace_bucket(self) -> str:
 427        """
 428        Get the workspace bucket name. Does not include the `gs://` prefix.
 429
 430        **Returns:**
 431        - str: The bucket name.
 432        """
 433        return self.get_workspace_info().json()["workspace"]["bucketName"]
 434
 435    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
 436        """
 437        Get workspace entity information.
 438
 439        **Args:**
 440        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
 441
 442        **Returns:**
 443        - requests.Response: The response from the request.
 444        """
 445        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
 446        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
 447        return self.request_util.run_request(uri=url, method=GET)
 448
 449    def get_workspace_acl(self) -> requests.Response:
 450        """
 451        Get the workspace access control list (ACL).
 452
 453        **Returns:**
 454        - requests.Response: The response from the request.
 455        """
 456        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
 457        return self.request_util.run_request(
 458            uri=url,
 459            method=GET
 460        )
 461
 462    def update_user_acl(
 463            self,
 464            email: str,
 465            access_level: str,
 466            can_share: bool = False,
 467            can_compute: bool = False,
 468            invite_users_not_found: bool = False,
 469    ) -> requests.Response:
 470        """
 471        Update the access control list (ACL) for a user in the workspace.
 472
 473        **Args:**
 474        - email (str): The email of the user.
 475        - access_level (str): The access level to grant to the user.
 476        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
 477        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
 478        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 479                the workspace. Defaults to `False`
 480
 481        **Returns:**
 482        - requests.Response: The response from the request.
 483        """
 484        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 485              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 486        payload = {
 487            "email": email,
 488            "accessLevel": access_level,
 489            "canShare": can_share,
 490            "canCompute": can_compute,
 491        }
 492        logging.info(
 493            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
 494        response = self.request_util.run_request(
 495            uri=url,
 496            method=PATCH,
 497            content_type=APPLICATION_JSON,
 498            data="[" + json.dumps(payload) + "]"
 499        )
 500
 501        if response.json()["usersNotFound"] and not invite_users_not_found:
 502            # Will be a list of one user
 503            user_not_found = response.json()["usersNotFound"][0]
 504            raise Exception(
 505                f'The user {user_not_found["email"]} was not found and access was not updated'
 506            )
 507        return response
 508
 509    @deprecated(
 510        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
 511    )
 512    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
 513        """
 514        Update the metadata for a library dataset.
 515
 516        **Args:**
 517        - library_metadata (dict): The metadata to update.
 518        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
 519
 520        **Returns:**
 521        - requests.Response: The response from the request.
 522        """
 523        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
 524              f"/metadata?validate={str(validate).lower()}"
 525        return self.request_util.run_request(
 526            uri=acl,
 527            method=PUT,
 528            data=json.dumps(library_metadata)
 529        )
 530
 531    def update_multiple_users_acl(
 532            self, acl_list: list[dict], invite_users_not_found: bool = False
 533    ) -> requests.Response:
 534        """
 535        Update the access control list (ACL) for multiple users in the workspace.
 536
 537        **Args:**
 538        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
 539        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 540                the workspace. Defaults to `False`
 541
 542        **Returns:**
 543        - requests.Response: The response from the request.
 544        """
 545        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 546            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 547        logging.info(
 548            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
 549        response = self.request_util.run_request(
 550            uri=url,
 551            method=PATCH,
 552            content_type=APPLICATION_JSON,
 553            data=json.dumps(acl_list)
 554        )
 555
 556        if response.json()["usersNotFound"] and not invite_users_not_found:
 557            # Will be a list of one user
 558            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
 559            raise Exception(
 560                f"The following users were not found and access was not updated: {users_not_found}"
 561            )
 562        return response
 563
 564    def create_workspace(
 565            self,
 566            auth_domain: list[dict] = [],
 567            attributes: dict = {},
 568            continue_if_exists: bool = False,
 569    ) -> requests.Response:
 570        """
 571        Create a new workspace in Terra.
 572
 573        **Args:**
 574        - auth_domain (list[dict], optional): A list of authorization domains. Should look
 575                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
 576        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
 577        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
 578
 579        **Returns:**
 580        - requests.Response: The response from the request.
 581        """
 582        payload = {
 583            "namespace": self.billing_project,
 584            "name": self.workspace_name,
 585            "authorizationDomain": auth_domain,
 586            "attributes": attributes,
 587            "cloudPlatform": GCP
 588        }
 589        # If workspace already exists then continue if exists
 590        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 591        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
 592        response = self.request_util.run_request(
 593            uri=f"{self.terra_link}/workspaces",
 594            method=POST,
 595            content_type=APPLICATION_JSON,
 596            data=json.dumps(payload),
 597            accept_return_codes=accept_return_codes
 598        )
 599        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
 600            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
 601        return response
 602
 603    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
 604        """
 605        Create an ingest dictionary for workspace attributes.
 606
 607        **Args:**
 608        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
 609
 610        **Returns:**
 611        - list[dict]: A list of dictionaries containing the workspace attributes.
 612        """
 613        # If not provided then call API to get it
 614        workspace_attributes = (
 615            workspace_attributes if workspace_attributes
 616            else self.get_workspace_info().json()["workspace"]["attributes"]
 617        )
 618
 619        ingest_dict = []
 620        for key, value in workspace_attributes.items():
 621            # If value is dict just use 'items' as value
 622            if isinstance(value, dict):
 623                value = value.get("items")
 624            # If value is list convert to comma separated string
 625            if isinstance(value, list):
 626                value = ", ".join(value)
 627            ingest_dict.append(
 628                {
 629                    "attribute": key,
 630                    "value": str(value) if value else None
 631                }
 632            )
 633        return ingest_dict
 634
 635    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
 636        """
 637        Upload metadata to the workspace table.
 638
 639        **Args:**
 640        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
 641
 642        **Returns:**
 643        - requests.Response: The response from the request.
 644        """
 645        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
 646        data = {"entities": open(entities_tsv, "rb")}
 647        return self.request_util.upload_file(
 648            uri=endpoint,
 649            data=data
 650        )
 651
 652    def get_workspace_workflows(self) -> requests.Response:
 653        """
 654        Get the workflows for the workspace.
 655
 656        **Returns:**
 657        - requests.Response: The response from the request.
 658        """
 659        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
 660        return self.request_util.run_request(
 661            uri=uri,
 662            method=GET
 663        )
 664
 665    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
 666        """
 667        Import a workflow into the workspace.
 668
 669        **Args:**
 670        - workflow_dict (dict): The dictionary containing the workflow information.
 671        - continue_if_exists (bool, optional): Whether to continue if the workflow
 672                already exists. Defaults to `False`.
 673
 674        **Returns:**
 675        - requests.Response: The response from the request.
 676        """
 677        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
 678        workflow_json = json.dumps(workflow_dict)
 679        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 680        return self.request_util.run_request(
 681            uri=uri,
 682            method=POST,
 683            data=workflow_json,
 684            content_type=APPLICATION_JSON,
 685            accept_return_codes=accept_return_codes
 686        )
 687
 688    def delete_workspace(self) -> requests.Response:
 689        """
 690        Delete a Terra workspace.
 691
 692        **Returns:**
 693        - requests.Response: The response from the request.
 694        """
 695        return self.request_util.run_request(
 696            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
 697            method=DELETE
 698        )
 699
 700    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
 701        """
 702        Update the attributes for the workspace.
 703
 704        **Args:**
 705        - attributes (dict): The attributes to update.
 706
 707        **Returns:**
 708        - requests.Response: The response from the request.
 709        """
 710        return self.request_util.run_request(
 711            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
 712            method=PATCH,
 713            data=json.dumps(attributes),
 714            content_type=APPLICATION_JSON
 715        )
 716
 717    def leave_workspace(
 718            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
 719    ) -> requests.Response:
 720        """
 721        Leave a workspace. If workspace ID not supplied, will look it up.
 722
 723        **Args:**
 724        - workspace_id (str, optional): The workspace ID. Defaults to None.
 725        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
 726             Defaults to `False`.
 727
 728        **Returns:**
 729        - requests.Response: The response from the request.
 730        """
 731        if not workspace_id:
 732            workspace_info = self.get_workspace_info().json()
 733            workspace_id = workspace_info['workspace']['workspaceId']
 734        accepted_return_code = [403] if ignore_direct_access_error else []
 735
 736        res = self.request_util.run_request(
 737            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
 738            method=DELETE,
 739            accept_return_codes=accepted_return_code
 740        )
 741        if (res.status_code == 403
 742                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
 743            logging.info(
 744                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
 745                f"access to the workspace (they could be an owner on the billing project)"
 746            )
 747        return res
 748
 749    def change_workspace_public_setting(self, public: bool) -> requests.Response:
 750        """
 751        Change a workspace's public setting.
 752
 753        **Args:**
 754        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
 755         public, `False` otherwise.
 756
 757        **Returns:**
 758        - requests.Response: The response from the request.
 759        """
 760        body = [
 761            {
 762                "settingType": "PubliclyReadable",
 763                "config": {
 764                    "enabled": public
 765                }
 766            }
 767        ]
 768        return self.request_util.run_request(
 769            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
 770            method=PUT,
 771            content_type=APPLICATION_JSON,
 772            data=json.dumps(body)
 773        )
 774
 775    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
 776        """
 777        Check if a workspace is public.
 778
 779        **Args:**
 780        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
 781        it up if not provided. Defaults to None.
 782
 783        **Returns:**
 784        - requests.Response: The response from the request.
 785        """
 786        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
 787        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
 788        return self.request_util.run_request(
 789            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
 790            method=GET
 791        )
 792
 793    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
 794        """Delete an entire entity table from a Terra workspace.
 795
 796        **Args:**
 797        - entity_to_delete (str): The name of the entity table to delete.
 798
 799        **Returns:**
 800        - requests.Response: The response from the request.
 801        """
 802        response = self.request_util.run_request(
 803            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
 804            method=DELETE
 805        )
 806        if response.status_code == 204:
 807            logging.info(
 808                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
 809                f"'{self.billing_project}/{self.workspace_name}'"
 810            )
 811        else:
 812            logging.error(
 813                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
 814                f"table: {response.text}"
 815            )
 816        return response
 817
 818    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
 819        """Save an entity table version in a Terra workspace.
 820
 821        **Args:**
 822        - entity_type (str): The name of the entity table to save a new version for
 823        - version_name (str): The name of the new version
 824        """
 825        # Get the workspace metrics
 826        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
 827        file_name = f"{entity_type}.json"
 828        # Write the workspace metrics to a JSON file
 829        with open(file_name, "w") as json_file:
 830            json.dump(workspace_metrics, json_file)
 831
 832        # Create a zip file with the same naming convention that Terra backend uses
 833        timestamp_ms = int(time.time() * 1000)
 834        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
 835        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
 836            zipf.write(file_name, arcname=f"json/{file_name}")
 837
 838        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
 839        workspace_info = self.get_workspace_info().json()
 840        path_to_upload_to = os.path.join(
 841            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
 842        )
 843        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
 844        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
 845        try:
 846            active_account = gcp_util.get_active_gcloud_account()
 847        except Exception as e:
 848            active_account = workspace_info["workspace"]["createdBy"]
 849            logging.error(
 850                f"Encountered the following exception while attempting to get current GCP account: {e}. "
 851                f"Will set the owner of the new metadata version as the workspace creator instead."
 852            )
 853        gcp_util.upload_blob(
 854            source_file=zip_file_name,
 855            destination_path=path_to_upload_to,
 856            custom_metadata={
 857                "createdBy": active_account,
 858                "entityType": entity_type,
 859                "timestamp": timestamp_ms,
 860                "description": version_name,
 861            }
 862        )
 863
 864    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
 865        """
 866        Add a user comment to a submission in Terra.
 867
 868        **Args:**
 869        - submission_id (str): The ID of the submission to add a comment to.
 870        - user_comment (str): The comment to add to the submission.
 871
 872        **Returns:**
 873        - requests.Response: The response from the request.
 874        """
 875        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
 876        return self.request_util.run_request(
 877            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
 878            method=PATCH,
 879            content_type=APPLICATION_JSON,
 880            data=json.dumps({"userComment": user_comment}),
 881        )
 882
 883    def initiate_submission(
 884            self,
 885            method_config_namespace: str,
 886            method_config_name: str,
 887            entity_type: str,
 888            entity_name: str,
 889            expression: str,
 890            user_comment: Optional[str],
 891            use_call_cache: bool = True
 892    ) -> requests.Response:
 893        """
 894        Initiate a submission within a Terra workspace.
 895
 896        Note - the workflow being initiated MUST already be imported into the workspace.
 897
 898        **Args:**
 899        - method_config_namespace (str): The namespace of the method configuration.
 900        - method_config_name (str): The name of the method configuration to use for the submission
 901        (i.e. the workflow name).
 902        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
 903        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
 904        "sample_set_1").
 905        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
 906        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
 907        be launched PER SAMPLE, the expression should be `this.samples`.
 908        - user_comment (str, optional): The user comment to add to the submission.
 909        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
 910
 911        **Returns:**
 912        - requests.Response: The response from the request.
 913        """
 914        payload = {
 915            "methodConfigurationNamespace": method_config_namespace,
 916            "methodConfigurationName": method_config_name,
 917            "entityType": entity_type,
 918            "entityName": entity_name,
 919            "expression": expression,
 920            "useCallCache": use_call_cache,
 921            "deleteIntermediateOutputFiles": False,
 922            "useReferenceDisks": False,
 923            "ignoreEmptyOutputs": False,
 924        }
 925        if user_comment:
 926            payload["userComment"] = user_comment
 927
 928        return self.request_util.run_request(
 929            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
 930            method=POST,
 931            content_type=APPLICATION_JSON,
 932            data=json.dumps(payload),
 933        )
 934      
 935    def retry_failed_submission(self, submission_id: str) -> requests.Response:
 936        """
 937        Retry a failed submission in Terra.
 938
 939        **Args:**
 940        - submission_id (str): The ID of the submission to retry.
 941
 942        **Returns:**
 943        - requests.Response: The response from the request.
 944        """
 945        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
 946        payload = {"retryType": "Failed"}
 947        logging.info(
 948            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
 949        )
 950        return self.request_util.run_request(
 951            uri=url,
 952            method=POST,
 953            content_type=APPLICATION_JSON,
 954            data=json.dumps(payload)
 955        )
 956
 957    def get_submission_status(self, submission_id: str) -> requests.Response:
 958        """
 959        Get the status of a submission in Terra.
 960
 961        **Args:**
 962        - submission_id (str): The ID of the submission.
 963
 964        **Returns:**
 965        - requests.Response: The response from the request.
 966        """
 967        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
 968        logging.info(
 969            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
 970        )
 971        return self.request_util.run_request(
 972            uri=url,
 973            method=GET
 974        )
 975
 976    def get_workspace_submission_status(self) ->requests.Response:
 977        """
 978        Get the status of all submissions in a Terra workspace.
 979
 980        **Returns:**
 981        - requests.Response: The response from the request.
 982        """
 983        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions"
 984        logging.info(
 985            f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}"
 986        )
 987        return self.request_util.run_request(
 988            uri=url,
 989            method=GET
 990        )
 991
 992    def get_workspace_submission_stats(self, method_name: Optional[str] = None, retrieve_running_ids: bool = True) -> dict:
 993        """
 994        Get submission statistics for a Terra workspace, optionally filtered by method name.
 995
 996        **Args:**
 997        - method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
 998        - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
 999          Defaults to `True`.
1000
1001        **Returns:**
1002        - dict: A dictionary containing submission statistics, including counts of workflows in various states
1003        """
1004        submissions = [
1005            s
1006            for s in self.get_workspace_submission_status().json()
1007            # If method_name is provided, filter submissions to only those with that method name
1008            if (s["methodConfigurationName"] == method_name if method_name else True)
1009        ]
1010        method_append = f"with method name '{method_name}'" if method_name else ""
1011        logging.info(
1012            f"{len(submissions)} submissions in "
1013            f"{self.billing_project}/{self.workspace_name} {method_append}"
1014        )
1015        workflow_statuses = {
1016            "submitted": 0,
1017            "queued": 0,
1018            "running": 0,
1019            "aborting": 0,
1020            "aborted": 0,
1021            "failed": 0,
1022            "succeeded": 0,
1023            "id_still_running": [] if retrieve_running_ids else "NA"
1024        }
1025        for submission in submissions:
1026            wf_status = submission["workflowStatuses"]
1027            for status, count in wf_status.items():
1028                if status.lower() in workflow_statuses:
1029                    workflow_statuses[status.lower()] += count
1030            # Only look at individual submissions if retrieve running ids set to true
1031            # and only look at submissions that are still running
1032            if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]:
1033                submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json()
1034                for workflow in submission_detailed["workflows"]:
1035                    if workflow["status"] in ["Running", "Submitted", "Queued"]:
1036                        entity_id = workflow["workflowEntity"]["entityName"]
1037                        workflow_statuses['id_still_running'].append(entity_id)  # type: ignore[attr-defined]
1038        running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued']  # type: ignore[operator]
1039        if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count:  # type: ignore[arg-type]
1040            logging.warning(
1041                f"Discrepancy found between total running/pending workflows, {running_count}, "
1042                f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. "  # type: ignore[arg-type]
1043                "Workflows may have completed between API calls."
1044            )
1045        denominator = workflow_statuses['succeeded'] + workflow_statuses['failed']  # type: ignore[operator]
1046        if denominator > 0:
1047            workflow_statuses['success_rate'] = round(
1048                workflow_statuses['succeeded'] / denominator,
1049                2
1050            )
1051        else:
1052            workflow_statuses['success_rate'] = 0.0
1053        return workflow_statuses
# Group membership roles accepted by TerraGroups (see GROUP_MEMBERSHIP_OPTIONS).
MEMBER = "member"
ADMIN = "admin"
class Terra:
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Accepted but not currently used. The methods of this class always
            call the production Rawls/SAM endpoints. Defaults to `"prod"`.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of accessible workspaces.

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If None (the default)
            or empty, all fields are included.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces"
        if fields:
            # Restrict the response to the requested fields (comma-separated query parameter).
            url = f"{url}?fields={','.join(fields)}"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the service account JSON.

        Requests a key for the caller's pet service account from SAM.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

Class for generic Terra utilities.

Terra(request_util: ops_utils.request_util.RunRequest, env: str = 'prod')
38    def __init__(self, request_util: RunRequest, env: str = "prod"):
39        """
40        Initialize the Terra class.
41
42        **Args:**
43        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
44            request utility class to handle HTTP requests.
45        """
46        self.request_util = request_util
47        """@private"""

Initialize the Terra class.

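Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.
  • env (str, optional): Accepted but not used by this class. Defaults to 'prod'.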

def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.models.Response:
49    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
50        """
51        Fetch the list of accessible workspaces.
52
53        **Args:**
54        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
55
56        **Returns:**
57        - requests.Response: The response from the request.
58        """
59        fields_str = "fields=" + ",".join(fields) if fields else ""
60        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
61        return self.request_util.run_request(
62            uri=url,
63            method=GET
64        )

Fetch the list of accessible workspaces.

Args:

  • fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.

Returns:

  • requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.models.Response:
66    def get_pet_account_json(self) -> requests.Response:
67        """
68        Get the service account JSON.
69
70        **Returns:**
71        - requests.Response: The response from the request.
72        """
73        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
74        return self.request_util.run_request(
75            uri=url,
76            method=GET
77        )

Get the service account JSON.

Returns:

  • requests.Response: The response from the request.
class TerraGroups:
 80class TerraGroups:
 81    """A class to manage Terra groups and their memberships."""
 82
 83    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
 84    """@private"""
 85    CONFLICT_STATUS_CODE = 409
 86    """@private"""
 87
 88    def __init__(self, request_util: RunRequest):
 89        """
 90        Initialize the TerraGroups class.
 91
 92        **Args:**
 93        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
 94         utility class to handle HTTP requests.
 95        """
 96        self.request_util = request_util
 97        """@private"""
 98
 99    def _check_role(self, role: str) -> None:
100        """
101        Check if the role is valid.
102
103        Args:
104            role (str): The role to check.
105
106        Raises:
107            ValueError: If the role is not one of the allowed options.
108        """
109        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
110            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
111
112    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
113        """
114        Remove a user from a group.
115
116        **Args:**
117        - group (str): The name of the group.
118        - email (str): The email of the user to remove.
119        - role (str): The role of the user in the group
120            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
121
122        **Returns:**
123        - requests.Response: The response from the request.
124        """
125        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
126        self._check_role(role)
127        logging.info(f"Removing {email} from group {group}")
128        return self.request_util.run_request(
129            uri=url,
130            method=DELETE
131        )
132
133    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
134        """
135        Create a new group.
136
137        **Args:**
138        - group_name (str): The name of the group to create.
139        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
140
141        **Returns:**
142        - requests.Response: The response from the request.
143        """
144        url = f"{SAM_LINK}/groups/v1/{group_name}"
145        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
146        response = self.request_util.run_request(
147            uri=url,
148            method=POST,
149            accept_return_codes=accept_return_codes
150        )
151        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
152            logging.info(f"Group {group_name} already exists. Continuing.")
153            return response
154        else:
155            logging.info(f"Created group {group_name}")
156            return response
157
158    def delete_group(self, group_name: str) -> requests.Response:
159        """
160        Delete a group.
161
162        **Args:**
163        - group_name (str): The name of the group to delete.
164
165        **Returns:**
166        - requests.Response: The response from the request.
167        """
168        url = f"{SAM_LINK}/groups/v1/{group_name}"
169        logging.info(f"Deleting group {group_name}")
170        return self.request_util.run_request(
171            uri=url,
172            method=DELETE
173        )
174
175    def add_user_to_group(
176            self, group: str, email: str, role: str, continue_if_exists: bool = False
177    ) -> requests.Response:
178        """
179        Add a user to a group.
180
181        **Args:**
182        - group (str): The name of the group.
183        - email (str): The email of the user to add.
184        - role (str): The role of the user in the group
185            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
186        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
187                Defaults to `False`.
188
189        **Returns:**
190        - requests.Response: The response from the request.
191        """
192        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
193        self._check_role(role)
194        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
195        logging.info(f"Adding {email} to group {group} as {role}")
196        return self.request_util.run_request(
197            uri=url,
198            method=PUT,
199            accept_return_codes=accept_return_codes
200        )

A class to manage Terra groups and their memberships.

TerraGroups(request_util: ops_utils.request_util.RunRequest)
88    def __init__(self, request_util: RunRequest):
89        """
90        Initialize the TerraGroups class.
91
92        **Args:**
93        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
94         utility class to handle HTTP requests.
95        """
96        self.request_util = request_util
97        """@private"""

Initialize the TerraGroups class.

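Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.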

def remove_user_from_group(self, group: str, email: str, role: str) -> requests.models.Response:
112    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
113        """
114        Remove a user from a group.
115
116        **Args:**
117        - group (str): The name of the group.
118        - email (str): The email of the user to remove.
119        - role (str): The role of the user in the group
120            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
121
122        **Returns:**
123        - requests.Response: The response from the request.
124        """
125        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
126        self._check_role(role)
127        logging.info(f"Removing {email} from group {group}")
128        return self.request_util.run_request(
129            uri=url,
130            method=DELETE
131        )

Remove a user from a group.

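Args:

  • group (str): The name of the group.
  • email (str): The email of the user to remove.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).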

Returns:

  • requests.Response: The response from the request.
def create_group( self, group_name: str, continue_if_exists: bool = False) -> requests.models.Response:
133    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
134        """
135        Create a new group.
136
137        **Args:**
138        - group_name (str): The name of the group to create.
139        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
140
141        **Returns:**
142        - requests.Response: The response from the request.
143        """
144        url = f"{SAM_LINK}/groups/v1/{group_name}"
145        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
146        response = self.request_util.run_request(
147            uri=url,
148            method=POST,
149            accept_return_codes=accept_return_codes
150        )
151        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
152            logging.info(f"Group {group_name} already exists. Continuing.")
153            return response
154        else:
155            logging.info(f"Created group {group_name}")
156            return response

Create a new group.

Args:

  • group_name (str): The name of the group to create.
  • continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.models.Response:
158    def delete_group(self, group_name: str) -> requests.Response:
159        """
160        Delete a group.
161
162        **Args:**
163        - group_name (str): The name of the group to delete.
164
165        **Returns:**
166        - requests.Response: The response from the request.
167        """
168        url = f"{SAM_LINK}/groups/v1/{group_name}"
169        logging.info(f"Deleting group {group_name}")
170        return self.request_util.run_request(
171            uri=url,
172            method=DELETE
173        )

Delete a group.

Args:

  • group_name (str): The name of the group to delete.

Returns:

  • requests.Response: The response from the request.
def add_user_to_group( self, group: str, email: str, role: str, continue_if_exists: bool = False) -> requests.models.Response:
175    def add_user_to_group(
176            self, group: str, email: str, role: str, continue_if_exists: bool = False
177    ) -> requests.Response:
178        """
179        Add a user to a group.
180
181        **Args:**
182        - group (str): The name of the group.
183        - email (str): The email of the user to add.
184        - role (str): The role of the user in the group
185            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
186        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
187                Defaults to `False`.
188
189        **Returns:**
190        - requests.Response: The response from the request.
191        """
192        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
193        self._check_role(role)
194        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
195        logging.info(f"Adding {email} to group {group} as {role}")
196        return self.request_util.run_request(
197            uri=url,
198            method=PUT,
199            accept_return_codes=accept_return_codes
200        )

Add a user to a group.

Args:

  • group (str): The name of the group.
  • email (str): The email of the user to add.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).
  • continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to False.

Returns:

  • requests.Response: The response from the request.
class TerraWorkspace:
 203class TerraWorkspace:
 204    """Terra workspace class to manage workspaces and their attributes."""
 205
 206    CONFLICT_STATUS_CODE = 409
 207    """@private"""
 208
 209    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
 210        """
 211        Initialize the TerraWorkspace class.
 212
 213        **Args:**
 214        - billing_project (str): The billing project associated with the workspace.
 215        - workspace_name (str): The name of the workspace.
 216        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
 217            request utility class to handle HTTP requests.
 218        """
 219        self.billing_project = billing_project
 220        """@private"""
 221        self.workspace_name = workspace_name
 222        """@private"""
 223        self.workspace_id = None
 224        """@private"""
 225        self.resource_id = None
 226        """@private"""
 227        self.storage_container = None
 228        """@private"""
 229        self.bucket = None
 230        """@private"""
 231        self.wds_url = None
 232        """@private"""
 233        self.account_url: Optional[str] = None
 234        """@private"""
 235        self.request_util = request_util
 236        """@private"""
 237        if env.lower() == "dev":
 238            self.terra_link = TERRA_DEV_LINK
 239            """@private"""
 240        elif env.lower() == "prod":
 241            self.terra_link = TERRA_PROD_LINK
 242            """@private"""
 243        else:
 244            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
 245
 246    def __repr__(self) -> str:
 247        """
 248        Return a string representation of the TerraWorkspace instance.
 249
 250        Returns:
 251            str: The string representation of the TerraWorkspace instance.
 252        """
 253        return f"{self.billing_project}/{self.workspace_name}"
 254
 255    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any:
 256        """
 257        Yield all entity metrics from the workspace.
 258
 259        Args:
 260            entity (str): The type of entity to query.
 261            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
 262
 263        Yields:
 264            Any: The JSON response containing entity metrics.
 265        """
 266        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
 267        response = self.request_util.run_request(
 268            uri=url,
 269            method=GET,
 270            content_type=APPLICATION_JSON
 271        )
 272        raw_text = response.text
 273        first_page_json = json.loads(
 274            raw_text,
 275            parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 276        )
 277        yield first_page_json
 278        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
 279        logging.info(
 280            f"Looping through {total_pages} pages of data")
 281
 282        for page in range(2, total_pages + 1):
 283            logging.info(f"Getting page {page} of {total_pages}")
 284            next_page = self.request_util.run_request(
 285                uri=url,
 286                method=GET,
 287                content_type=APPLICATION_JSON,
 288                params={"page": page}
 289            )
 290            raw_text = next_page.text
 291            page_json = json.loads(
 292                raw_text,
 293                parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 294            )
 295            yield page_json
 296
 297    @staticmethod
 298    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
 299        """Check that all headers follow the standards required by TDR.
 300
 301        **Args:**
 302        - table_name (str): The name of the Terra table.
 303        - headers (list[str]): The headers of the Terra table to validate.
 304
 305        **Raises:**
 306        - ValueError if any headers are considered invalid by TDR standards
 307        """
 308        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
 309        tdr_max_header_length = 63
 310
 311        headers_containing_too_many_characters = []
 312        headers_contain_invalid_characters = []
 313
 314        for header in headers:
 315            if len(header) > tdr_max_header_length:
 316                headers_containing_too_many_characters.append(header)
 317            if not re.match(tdr_header_allowed_pattern, header):
 318                headers_contain_invalid_characters.append(header)
 319
 320        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
 321        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
 322        header naming."""
 323        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
 324        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
 325        allowed in TDR is {tdr_max_header_length}.\n"""
 326        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
 327        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
 328        only contain numbers, letters, and underscore characters.\n"""
 329
 330        error_to_report = ""
 331        if headers_containing_too_many_characters:
 332            error_to_report += too_many_characters_error_message
 333        if headers_contain_invalid_characters:
 334            error_to_report += invalid_characters_error_message
 335        if error_to_report:
 336            error_to_report += base_error_message
 337            raise ValueError(error_to_report)
 338
 339    def get_workspace_info(self) -> requests.Response:
 340        """
 341        Get workspace information.
 342
 343        **Returns:**
 344        - requests.Response: The response from the request.
 345        """
 346        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
 347        logging.info(
 348            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
 349        return self.request_util.run_request(uri=url, method=GET)
 350
 351    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
 352        """
 353        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
 354
 355        **Args:**
 356        - entity_type (str): The type of entity to get metrics for.
 357        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
 358
 359        **Returns:**
 360        - list[dict]: A list of dictionaries containing entity metrics.
 361        """
 362        results = []
 363        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
 364
 365        for page in self._yield_all_entity_metrics(entity=entity_type):
 366            results.extend(page["results"])
 367
 368        # If remove_dicts is True, remove dictionaries from the workspace metrics
 369        if remove_dicts:
 370            for row in results:
 371                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
 372        return results
 373
 374    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
 375        """
 376        Get specific entity metrics for a given entity type and name.
 377
 378        **Args:**
 379        - entity_type (str): The type of entity to get metrics for.
 380        - entity_name (str): The name of the entity to get metrics for.
 381
 382        **Returns:**
 383        - requests.Response: The response from the request.
 384        """
 385        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
 386        return self.request_util.run_request(uri=url, method=GET)
 387
 388    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
 389        """
 390        Remove dictionaries from the attributes.
 391
 392        Args:
 393            attributes (dict): The attributes to remove dictionaries from.
 394
 395        Returns:
 396            dict: The updated attributes with no dictionaries.
 397        """
 398        for key, value in attributes.items():
 399            attributes[key] = self._remove_dict_from_cell(value)
 400        return attributes
 401
 402    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
 403        """
 404        Remove a dictionary from a cell.
 405
 406        Args:
 407            cell_value (Any): The dictionary to remove.
 408
 409        Returns:
 410            Any: The updated cell with no dictionaries.
 411        """
 412        if isinstance(cell_value, dict):
 413            entity_name = cell_value.get("entityName")
 414            # If the cell value is a dictionary, check if it has an entityName key
 415            if entity_name:
 416                # If the cell value is a dictionary with an entityName key, return the entityName
 417                return entity_name
 418            entity_list = cell_value.get("items")
 419            if entity_list or entity_list == []:
 420                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
 421                return [
 422                    self._remove_dict_from_cell(entity) for entity in entity_list
 423                ]
 424            return cell_value
 425        return cell_value
 426
 427    def get_workspace_bucket(self) -> str:
 428        """
 429        Get the workspace bucket name. Does not include the `gs://` prefix.
 430
 431        **Returns:**
 432        - str: The bucket name.
 433        """
 434        return self.get_workspace_info().json()["workspace"]["bucketName"]
 435
 436    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
 437        """
 438        Get workspace entity information.
 439
 440        **Args:**
 441        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
 442
 443        **Returns:**
 444        - requests.Response: The response from the request.
 445        """
 446        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
 447        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
 448        return self.request_util.run_request(uri=url, method=GET)
 449
 450    def get_workspace_acl(self) -> requests.Response:
 451        """
 452        Get the workspace access control list (ACL).
 453
 454        **Returns:**
 455        - requests.Response: The response from the request.
 456        """
 457        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
 458        return self.request_util.run_request(
 459            uri=url,
 460            method=GET
 461        )
 462
 463    def update_user_acl(
 464            self,
 465            email: str,
 466            access_level: str,
 467            can_share: bool = False,
 468            can_compute: bool = False,
 469            invite_users_not_found: bool = False,
 470    ) -> requests.Response:
 471        """
 472        Update the access control list (ACL) for a user in the workspace.
 473
 474        **Args:**
 475        - email (str): The email of the user.
 476        - access_level (str): The access level to grant to the user.
 477        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
 478        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
 479        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 480                the workspace. Defaults to `False`
 481
 482        **Returns:**
 483        - requests.Response: The response from the request.
 484        """
 485        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 486              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 487        payload = {
 488            "email": email,
 489            "accessLevel": access_level,
 490            "canShare": can_share,
 491            "canCompute": can_compute,
 492        }
 493        logging.info(
 494            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
 495        response = self.request_util.run_request(
 496            uri=url,
 497            method=PATCH,
 498            content_type=APPLICATION_JSON,
 499            data="[" + json.dumps(payload) + "]"
 500        )
 501
 502        if response.json()["usersNotFound"] and not invite_users_not_found:
 503            # Will be a list of one user
 504            user_not_found = response.json()["usersNotFound"][0]
 505            raise Exception(
 506                f'The user {user_not_found["email"]} was not found and access was not updated'
 507            )
 508        return response
 509
 510    @deprecated(
 511        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
 512    )
 513    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
 514        """
 515        Update the metadata for a library dataset.
 516
 517        **Args:**
 518        - library_metadata (dict): The metadata to update.
 519        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
 520
 521        **Returns:**
 522        - requests.Response: The response from the request.
 523        """
 524        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
 525              f"/metadata?validate={str(validate).lower()}"
 526        return self.request_util.run_request(
 527            uri=acl,
 528            method=PUT,
 529            data=json.dumps(library_metadata)
 530        )
 531
 532    def update_multiple_users_acl(
 533            self, acl_list: list[dict], invite_users_not_found: bool = False
 534    ) -> requests.Response:
 535        """
 536        Update the access control list (ACL) for multiple users in the workspace.
 537
 538        **Args:**
 539        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
 540        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 541                the workspace. Defaults to `False`
 542
 543        **Returns:**
 544        - requests.Response: The response from the request.
 545        """
 546        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 547            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 548        logging.info(
 549            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
 550        response = self.request_util.run_request(
 551            uri=url,
 552            method=PATCH,
 553            content_type=APPLICATION_JSON,
 554            data=json.dumps(acl_list)
 555        )
 556
 557        if response.json()["usersNotFound"] and not invite_users_not_found:
 558            # Will be a list of one user
 559            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
 560            raise Exception(
 561                f"The following users were not found and access was not updated: {users_not_found}"
 562            )
 563        return response
 564
 565    def create_workspace(
 566            self,
 567            auth_domain: list[dict] = [],
 568            attributes: dict = {},
 569            continue_if_exists: bool = False,
 570    ) -> requests.Response:
 571        """
 572        Create a new workspace in Terra.
 573
 574        **Args:**
 575        - auth_domain (list[dict], optional): A list of authorization domains. Should look
 576                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
 577        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
 578        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
 579
 580        **Returns:**
 581        - requests.Response: The response from the request.
 582        """
 583        payload = {
 584            "namespace": self.billing_project,
 585            "name": self.workspace_name,
 586            "authorizationDomain": auth_domain,
 587            "attributes": attributes,
 588            "cloudPlatform": GCP
 589        }
 590        # If workspace already exists then continue if exists
 591        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 592        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
 593        response = self.request_util.run_request(
 594            uri=f"{self.terra_link}/workspaces",
 595            method=POST,
 596            content_type=APPLICATION_JSON,
 597            data=json.dumps(payload),
 598            accept_return_codes=accept_return_codes
 599        )
 600        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
 601            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
 602        return response
 603
 604    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
 605        """
 606        Create an ingest dictionary for workspace attributes.
 607
 608        **Args:**
 609        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
 610
 611        **Returns:**
 612        - list[dict]: A list of dictionaries containing the workspace attributes.
 613        """
 614        # If not provided then call API to get it
 615        workspace_attributes = (
 616            workspace_attributes if workspace_attributes
 617            else self.get_workspace_info().json()["workspace"]["attributes"]
 618        )
 619
 620        ingest_dict = []
 621        for key, value in workspace_attributes.items():
 622            # If value is dict just use 'items' as value
 623            if isinstance(value, dict):
 624                value = value.get("items")
 625            # If value is list convert to comma separated string
 626            if isinstance(value, list):
 627                value = ", ".join(value)
 628            ingest_dict.append(
 629                {
 630                    "attribute": key,
 631                    "value": str(value) if value else None
 632                }
 633            )
 634        return ingest_dict
 635
 636    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
 637        """
 638        Upload metadata to the workspace table.
 639
 640        **Args:**
 641        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
 642
 643        **Returns:**
 644        - requests.Response: The response from the request.
 645        """
 646        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
 647        data = {"entities": open(entities_tsv, "rb")}
 648        return self.request_util.upload_file(
 649            uri=endpoint,
 650            data=data
 651        )
 652
 653    def get_workspace_workflows(self) -> requests.Response:
 654        """
 655        Get the workflows for the workspace.
 656
 657        **Returns:**
 658        - requests.Response: The response from the request.
 659        """
 660        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
 661        return self.request_util.run_request(
 662            uri=uri,
 663            method=GET
 664        )
 665
 666    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
 667        """
 668        Import a workflow into the workspace.
 669
 670        **Args:**
 671        - workflow_dict (dict): The dictionary containing the workflow information.
 672        - continue_if_exists (bool, optional): Whether to continue if the workflow
 673                already exists. Defaults to `False`.
 674
 675        **Returns:**
 676        - requests.Response: The response from the request.
 677        """
 678        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
 679        workflow_json = json.dumps(workflow_dict)
 680        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 681        return self.request_util.run_request(
 682            uri=uri,
 683            method=POST,
 684            data=workflow_json,
 685            content_type=APPLICATION_JSON,
 686            accept_return_codes=accept_return_codes
 687        )
 688
 689    def delete_workspace(self) -> requests.Response:
 690        """
 691        Delete a Terra workspace.
 692
 693        **Returns:**
 694        - requests.Response: The response from the request.
 695        """
 696        return self.request_util.run_request(
 697            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
 698            method=DELETE
 699        )
 700
 701    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
 702        """
 703        Update the attributes for the workspace.
 704
 705        **Args:**
 706        - attributes (dict): The attributes to update.
 707
 708        **Returns:**
 709        - requests.Response: The response from the request.
 710        """
 711        return self.request_util.run_request(
 712            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
 713            method=PATCH,
 714            data=json.dumps(attributes),
 715            content_type=APPLICATION_JSON
 716        )
 717
 718    def leave_workspace(
 719            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
 720    ) -> requests.Response:
 721        """
 722        Leave a workspace. If workspace ID not supplied, will look it up.
 723
 724        **Args:**
 725        - workspace_id (str, optional): The workspace ID. Defaults to None.
 726        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
 727             Defaults to `False`.
 728
 729        **Returns:**
 730        - requests.Response: The response from the request.
 731        """
 732        if not workspace_id:
 733            workspace_info = self.get_workspace_info().json()
 734            workspace_id = workspace_info['workspace']['workspaceId']
 735        accepted_return_code = [403] if ignore_direct_access_error else []
 736
 737        res = self.request_util.run_request(
 738            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
 739            method=DELETE,
 740            accept_return_codes=accepted_return_code
 741        )
 742        if (res.status_code == 403
 743                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
 744            logging.info(
 745                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
 746                f"access to the workspace (they could be an owner on the billing project)"
 747            )
 748        return res
 749
 750    def change_workspace_public_setting(self, public: bool) -> requests.Response:
 751        """
 752        Change a workspace's public setting.
 753
 754        **Args:**
 755        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
 756         public, `False` otherwise.
 757
 758        **Returns:**
 759        - requests.Response: The response from the request.
 760        """
 761        body = [
 762            {
 763                "settingType": "PubliclyReadable",
 764                "config": {
 765                    "enabled": public
 766                }
 767            }
 768        ]
 769        return self.request_util.run_request(
 770            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
 771            method=PUT,
 772            content_type=APPLICATION_JSON,
 773            data=json.dumps(body)
 774        )
 775
 776    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
 777        """
 778        Check if a workspace is public.
 779
 780        **Args:**
 781        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
 782        it up if not provided. Defaults to None.
 783
 784        **Returns:**
 785        - requests.Response: The response from the request.
 786        """
 787        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
 788        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
 789        return self.request_util.run_request(
 790            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
 791            method=GET
 792        )
 793
 794    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
 795        """Delete an entire entity table from a Terra workspace.
 796
 797        **Args:**
 798        - entity_to_delete (str): The name of the entity table to delete.
 799
 800        **Returns:**
 801        - requests.Response: The response from the request.
 802        """
 803        response = self.request_util.run_request(
 804            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
 805            method=DELETE
 806        )
 807        if response.status_code == 204:
 808            logging.info(
 809                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
 810                f"'{self.billing_project}/{self.workspace_name}'"
 811            )
 812        else:
 813            logging.error(
 814                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
 815                f"table: {response.text}"
 816            )
 817        return response
 818
 819    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
 820        """Save an entity table version in a Terra workspace.
 821
 822        **Args:**
 823        - entity_type (str): The name of the entity table to save a new version for
 824        - version_name (str): The name of the new version
 825        """
 826        # Get the workspace metrics
 827        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
 828        file_name = f"{entity_type}.json"
 829        # Write the workspace metrics to a JSON file
 830        with open(file_name, "w") as json_file:
 831            json.dump(workspace_metrics, json_file)
 832
 833        # Create a zip file with the same naming convention that Terra backend uses
 834        timestamp_ms = int(time.time() * 1000)
 835        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
 836        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
 837            zipf.write(file_name, arcname=f"json/{file_name}")
 838
 839        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
 840        workspace_info = self.get_workspace_info().json()
 841        path_to_upload_to = os.path.join(
 842            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
 843        )
 844        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
 845        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
 846        try:
 847            active_account = gcp_util.get_active_gcloud_account()
 848        except Exception as e:
 849            active_account = workspace_info["workspace"]["createdBy"]
 850            logging.error(
 851                f"Encountered the following exception while attempting to get current GCP account: {e}. "
 852                f"Will set the owner of the new metadata version as the workspace creator instead."
 853            )
 854        gcp_util.upload_blob(
 855            source_file=zip_file_name,
 856            destination_path=path_to_upload_to,
 857            custom_metadata={
 858                "createdBy": active_account,
 859                "entityType": entity_type,
 860                "timestamp": timestamp_ms,
 861                "description": version_name,
 862            }
 863        )
 864
 865    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
 866        """
 867        Add a user comment to a submission in Terra.
 868
 869        **Args:**
 870        - submission_id (str): The ID of the submission to add a comment to.
 871        - user_comment (str): The comment to add to the submission.
 872
 873        **Returns:**
 874        - requests.Response: The response from the request.
 875        """
 876        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
 877        return self.request_util.run_request(
 878            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
 879            method=PATCH,
 880            content_type=APPLICATION_JSON,
 881            data=json.dumps({"userComment": user_comment}),
 882        )
 883
 884    def initiate_submission(
 885            self,
 886            method_config_namespace: str,
 887            method_config_name: str,
 888            entity_type: str,
 889            entity_name: str,
 890            expression: str,
 891            user_comment: Optional[str],
 892            use_call_cache: bool = True
 893    ) -> requests.Response:
 894        """
 895        Initiate a submission within a Terra workspace.
 896
 897        Note - the workflow being initiated MUST already be imported into the workspace.
 898
 899        **Args:**
 900        - method_config_namespace (str): The namespace of the method configuration.
 901        - method_config_name (str): The name of the method configuration to use for the submission
 902        (i.e. the workflow name).
 903        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
 904        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
 905        "sample_set_1").
 906        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
 907        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
 908        be launched PER SAMPLE, the expression should be `this.samples`.
 909        - user_comment (str, optional): The user comment to add to the submission.
 910        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
 911
 912        **Returns:**
 913        - requests.Response: The response from the request.
 914        """
 915        payload = {
 916            "methodConfigurationNamespace": method_config_namespace,
 917            "methodConfigurationName": method_config_name,
 918            "entityType": entity_type,
 919            "entityName": entity_name,
 920            "expression": expression,
 921            "useCallCache": use_call_cache,
 922            "deleteIntermediateOutputFiles": False,
 923            "useReferenceDisks": False,
 924            "ignoreEmptyOutputs": False,
 925        }
 926        if user_comment:
 927            payload["userComment"] = user_comment
 928
 929        return self.request_util.run_request(
 930            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
 931            method=POST,
 932            content_type=APPLICATION_JSON,
 933            data=json.dumps(payload),
 934        )
 935      
 936    def retry_failed_submission(self, submission_id: str) -> requests.Response:
 937        """
 938        Retry a failed submission in Terra.
 939
 940        **Args:**
 941        - submission_id (str): The ID of the submission to retry.
 942
 943        **Returns:**
 944        - requests.Response: The response from the request.
 945        """
 946        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
 947        payload = {"retryType": "Failed"}
 948        logging.info(
 949            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
 950        )
 951        return self.request_util.run_request(
 952            uri=url,
 953            method=POST,
 954            content_type=APPLICATION_JSON,
 955            data=json.dumps(payload)
 956        )
 957
 958    def get_submission_status(self, submission_id: str) -> requests.Response:
 959        """
 960        Get the status of a submission in Terra.
 961
 962        **Args:**
 963        - submission_id (str): The ID of the submission.
 964
 965        **Returns:**
 966        - requests.Response: The response from the request.
 967        """
 968        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
 969        logging.info(
 970            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
 971        )
 972        return self.request_util.run_request(
 973            uri=url,
 974            method=GET
 975        )
 976
 977    def get_workspace_submission_status(self) ->requests.Response:
 978        """
 979        Get the status of all submissions in a Terra workspace.
 980
 981        **Returns:**
 982        - requests.Response: The response from the request.
 983        """
 984        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions"
 985        logging.info(
 986            f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}"
 987        )
 988        return self.request_util.run_request(
 989            uri=url,
 990            method=GET
 991        )
 992
 993    def get_workspace_submission_stats(self, method_name: Optional[str] = None, retrieve_running_ids: bool = True) -> dict:
 994        """
 995        Get submission statistics for a Terra workspace, optionally filtered by method name.
 996
 997        **Args:**
 998        - method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
 999        - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
1000          Defaults to `True`.
1001
1002        **Returns:**
1003        - dict: A dictionary containing submission statistics, including counts of workflows in various states
1004        """
1005        submissions = [
1006            s
1007            for s in self.get_workspace_submission_status().json()
1008            # If method_name is provided, filter submissions to only those with that method name
1009            if (s["methodConfigurationName"] == method_name if method_name else True)
1010        ]
1011        method_append = f"with method name '{method_name}'" if method_name else ""
1012        logging.info(
1013            f"{len(submissions)} submissions in "
1014            f"{self.billing_project}/{self.workspace_name} {method_append}"
1015        )
1016        workflow_statuses = {
1017            "submitted": 0,
1018            "queued": 0,
1019            "running": 0,
1020            "aborting": 0,
1021            "aborted": 0,
1022            "failed": 0,
1023            "succeeded": 0,
1024            "id_still_running": [] if retrieve_running_ids else "NA"
1025        }
1026        for submission in submissions:
1027            wf_status = submission["workflowStatuses"]
1028            for status, count in wf_status.items():
1029                if status.lower() in workflow_statuses:
1030                    workflow_statuses[status.lower()] += count
1031            # Only look at individual submissions if retrieve running ids set to true
1032            # and only look at submissions that are still running
1033            if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]:
1034                submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json()
1035                for workflow in submission_detailed["workflows"]:
1036                    if workflow["status"] in ["Running", "Submitted", "Queued"]:
1037                        entity_id = workflow["workflowEntity"]["entityName"]
1038                        workflow_statuses['id_still_running'].append(entity_id)  # type: ignore[attr-defined]
1039        running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued']  # type: ignore[operator]
1040        if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count:  # type: ignore[arg-type]
1041            logging.warning(
1042                f"Discrepancy found between total running/pending workflows, {running_count}, "
1043                f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. "  # type: ignore[arg-type]
1044                "Workflows may have completed between API calls."
1045            )
1046        denominator = workflow_statuses['succeeded'] + workflow_statuses['failed']  # type: ignore[operator]
1047        if denominator > 0:
1048            workflow_statuses['success_rate'] = round(
1049                workflow_statuses['succeeded'] / denominator,
1050                2
1051            )
1052        else:
1053            workflow_statuses['success_rate'] = 0.0
1054        return workflow_statuses

Terra workspace class to manage workspaces and their attributes.

TerraWorkspace( billing_project: str, workspace_name: str, request_util: ops_utils.request_util.RunRequest, env: str = 'prod')
209    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
210        """
211        Initialize the TerraWorkspace class.
212
213        **Args:**
214        - billing_project (str): The billing project associated with the workspace.
215        - workspace_name (str): The name of the workspace.
216        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
217            request utility class to handle HTTP requests.
218        """
219        self.billing_project = billing_project
220        """@private"""
221        self.workspace_name = workspace_name
222        """@private"""
223        self.workspace_id = None
224        """@private"""
225        self.resource_id = None
226        """@private"""
227        self.storage_container = None
228        """@private"""
229        self.bucket = None
230        """@private"""
231        self.wds_url = None
232        """@private"""
233        self.account_url: Optional[str] = None
234        """@private"""
235        self.request_util = request_util
236        """@private"""
237        if env.lower() == "dev":
238            self.terra_link = TERRA_DEV_LINK
239            """@private"""
240        elif env.lower() == "prod":
241            self.terra_link = TERRA_PROD_LINK
242            """@private"""
243        else:
244            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")

Initialize the TerraWorkspace class.

Args:

  • billing_project (str): The billing project associated with the workspace.
  • workspace_name (str): The name of the workspace.
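  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.
  • env (str, optional): The Terra environment to target, either "dev" or "prod" (case-insensitive); any other value raises ValueError. Defaults to "prod".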
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
297    @staticmethod
298    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
299        """Check that all headers follow the standards required by TDR.
300
301        **Args:**
302        - table_name (str): The name of the Terra table.
303        - headers (list[str]): The headers of the Terra table to validate.
304
305        **Raises:**
306        - ValueError if any headers are considered invalid by TDR standards
307        """
308        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
309        tdr_max_header_length = 63
310
311        headers_containing_too_many_characters = []
312        headers_contain_invalid_characters = []
313
314        for header in headers:
315            if len(header) > tdr_max_header_length:
316                headers_containing_too_many_characters.append(header)
317            if not re.match(tdr_header_allowed_pattern, header):
318                headers_contain_invalid_characters.append(header)
319
320        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
321        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
322        header naming."""
323        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
324        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
325        allowed in TDR is {tdr_max_header_length}.\n"""
326        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
327        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
328        only contain numbers, letters, and underscore characters.\n"""
329
330        error_to_report = ""
331        if headers_containing_too_many_characters:
332            error_to_report += too_many_characters_error_message
333        if headers_contain_invalid_characters:
334            error_to_report += invalid_characters_error_message
335        if error_to_report:
336            error_to_report += base_error_message
337            raise ValueError(error_to_report)

Check that all headers follow the standards required by TDR.

Args:

  • table_name (str): The name of the Terra table.
  • headers (list[str]): The headers of the Terra table to validate.

Raises:

  • ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.models.Response:
339    def get_workspace_info(self) -> requests.Response:
340        """
341        Get workspace information.
342
343        **Returns:**
344        - requests.Response: The response from the request.
345        """
346        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
347        logging.info(
348            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
349        return self.request_util.run_request(uri=url, method=GET)

Get workspace information.

Returns:

  • requests.Response: The response from the request.
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
351    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
352        """
353        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
354
355        **Args:**
356        - entity_type (str): The type of entity to get metrics for.
357        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
358
359        **Returns:**
360        - list[dict]: A list of dictionaries containing entity metrics.
361        """
362        results = []
363        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
364
365        for page in self._yield_all_entity_metrics(entity=entity_type):
366            results.extend(page["results"])
367
368        # If remove_dicts is True, remove dictionaries from the workspace metrics
369        if remove_dicts:
370            for row in results:
371                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
372        return results

Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

Args:

  • entity_type (str): The type of entity to get metrics for.
  • remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to False.

Returns:

  • list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.models.Response:
374    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
375        """
376        Get specific entity metrics for a given entity type and name.
377
378        **Args:**
379        - entity_type (str): The type of entity to get metrics for.
380        - entity_name (str): The name of the entity to get metrics for.
381
382        **Returns:**
383        - requests.Response: The response from the request.
384        """
385        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
386        return self.request_util.run_request(uri=url, method=GET)

Get specific entity metrics for a given entity type and name.

Args:

  • entity_type (str): The type of entity to get metrics for.
  • entity_name (str): The name of the entity to get metrics for.

Returns:

  • requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
427    def get_workspace_bucket(self) -> str:
428        """
429        Get the workspace bucket name. Does not include the `gs://` prefix.
430
431        **Returns:**
432        - str: The bucket name.
433        """
434        return self.get_workspace_info().json()["workspace"]["bucketName"]

Get the workspace bucket name. Does not include the gs:// prefix.

Returns:

  • str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.models.Response:
436    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
437        """
438        Get workspace entity information.
439
440        **Args:**
441        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
442
443        **Returns:**
444        - requests.Response: The response from the request.
445        """
446        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
447        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
448        return self.request_util.run_request(uri=url, method=GET)

Get workspace entity information.

Args:

  • use_cache (bool, optional): Whether to use cache. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.models.Response:
450    def get_workspace_acl(self) -> requests.Response:
451        """
452        Get the workspace access control list (ACL).
453
454        **Returns:**
455        - requests.Response: The response from the request.
456        """
457        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
458        return self.request_util.run_request(
459            uri=url,
460            method=GET
461        )

Get the workspace access control list (ACL).

Returns:

  • requests.Response: The response from the request.
def update_user_acl( self, email: str, access_level: str, can_share: bool = False, can_compute: bool = False, invite_users_not_found: bool = False) -> requests.models.Response:
463    def update_user_acl(
464            self,
465            email: str,
466            access_level: str,
467            can_share: bool = False,
468            can_compute: bool = False,
469            invite_users_not_found: bool = False,
470    ) -> requests.Response:
471        """
472        Update the access control list (ACL) for a user in the workspace.
473
474        **Args:**
475        - email (str): The email of the user.
476        - access_level (str): The access level to grant to the user.
477        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
478        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
479        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
480                the workspace. Defaults to `False`
481
482        **Returns:**
483        - requests.Response: The response from the request.
484        """
485        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
486              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
487        payload = {
488            "email": email,
489            "accessLevel": access_level,
490            "canShare": can_share,
491            "canCompute": can_compute,
492        }
493        logging.info(
494            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
495        response = self.request_util.run_request(
496            uri=url,
497            method=PATCH,
498            content_type=APPLICATION_JSON,
499            data="[" + json.dumps(payload) + "]"
500        )
501
502        if response.json()["usersNotFound"] and not invite_users_not_found:
503            # Will be a list of one user
504            user_not_found = response.json()["usersNotFound"][0]
505            raise Exception(
506                f'The user {user_not_found["email"]} was not found and access was not updated'
507            )
508        return response

Update the access control list (ACL) for a user in the workspace.

Args:

  • email (str): The email of the user.
  • access_level (str): The access level to grant to the user.
  • can_share (bool, optional): Whether the user can share the workspace. Defaults to False.
  • can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to False.
  • invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to False.

Returns:

  • requests.Response: The response from the request.
@deprecated('Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.')
def put_metadata_for_library_dataset( self, library_metadata: dict, validate: bool = False) -> requests.models.Response:
510    @deprecated(
511        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
512    )
513    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
514        """
515        Update the metadata for a library dataset.
516
517        **Args:**
518        - library_metadata (dict): The metadata to update.
519        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
520
521        **Returns:**
522        - requests.Response: The response from the request.
523        """
524        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
525              f"/metadata?validate={str(validate).lower()}"
526        return self.request_util.run_request(
527            uri=acl,
528            method=PUT,
529            data=json.dumps(library_metadata)
530        )

Update the metadata for a library dataset.

Args:

  • library_metadata (dict): The metadata to update.
  • validate (bool, optional): Whether to validate the metadata. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def update_multiple_users_acl( self, acl_list: list[dict], invite_users_not_found: bool = False) -> requests.models.Response:
532    def update_multiple_users_acl(
533            self, acl_list: list[dict], invite_users_not_found: bool = False
534    ) -> requests.Response:
535        """
536        Update the access control list (ACL) for multiple users in the workspace.
537
538        **Args:**
539        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
540        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
541                the workspace. Defaults to `False`
542
543        **Returns:**
544        - requests.Response: The response from the request.
545        """
546        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
547            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
548        logging.info(
549            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
550        response = self.request_util.run_request(
551            uri=url,
552            method=PATCH,
553            content_type=APPLICATION_JSON,
554            data=json.dumps(acl_list)
555        )
556
557        if response.json()["usersNotFound"] and not invite_users_not_found:
558            # Will be a list of one user
559            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
560            raise Exception(
561                f"The following users were not found and access was not updated: {users_not_found}"
562            )
563        return response

Update the access control list (ACL) for multiple users in the workspace.

Args:

  • acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
  • invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace( self, auth_domain: list[dict] = [], attributes: dict = {}, continue_if_exists: bool = False) -> requests.models.Response:
565    def create_workspace(
566            self,
567            auth_domain: list[dict] = [],
568            attributes: dict = {},
569            continue_if_exists: bool = False,
570    ) -> requests.Response:
571        """
572        Create a new workspace in Terra.
573
574        **Args:**
575        - auth_domain (list[dict], optional): A list of authorization domains. Should look
576                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
577        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
578        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
579
580        **Returns:**
581        - requests.Response: The response from the request.
582        """
583        payload = {
584            "namespace": self.billing_project,
585            "name": self.workspace_name,
586            "authorizationDomain": auth_domain,
587            "attributes": attributes,
588            "cloudPlatform": GCP
589        }
590        # If workspace already exists then continue if exists
591        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
592        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
593        response = self.request_util.run_request(
594            uri=f"{self.terra_link}/workspaces",
595            method=POST,
596            content_type=APPLICATION_JSON,
597            data=json.dumps(payload),
598            accept_return_codes=accept_return_codes
599        )
600        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
601            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
602        return response

Create a new workspace in Terra.

Args:

  • auth_domain (list[dict], optional): A list of authorization domains. Should look like [{"membersGroupName": "some_auth_domain"}]. Defaults to an empty list.
  • attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
  • continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
604    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
605        """
606        Create an ingest dictionary for workspace attributes.
607
608        **Args:**
609        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
610
611        **Returns:**
612        - list[dict]: A list of dictionaries containing the workspace attributes.
613        """
614        # If not provided then call API to get it
615        workspace_attributes = (
616            workspace_attributes if workspace_attributes
617            else self.get_workspace_info().json()["workspace"]["attributes"]
618        )
619
620        ingest_dict = []
621        for key, value in workspace_attributes.items():
622            # If value is dict just use 'items' as value
623            if isinstance(value, dict):
624                value = value.get("items")
625            # If value is list convert to comma separated string
626            if isinstance(value, list):
627                value = ", ".join(value)
628            ingest_dict.append(
629                {
630                    "attribute": key,
631                    "value": str(value) if value else None
632                }
633            )
634        return ingest_dict

Create an ingest dictionary for workspace attributes.

Args:

  • workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.

Returns:

  • list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.models.Response:
636    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
637        """
638        Upload metadata to the workspace table.
639
640        **Args:**
641        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
642
643        **Returns:**
644        - requests.Response: The response from the request.
645        """
646        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
647        data = {"entities": open(entities_tsv, "rb")}
648        return self.request_util.upload_file(
649            uri=endpoint,
650            data=data
651        )

Upload metadata to the workspace table.

Args:

  • entities_tsv (str): The path to the TSV file containing the metadata to upload.

Returns:

  • requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.models.Response:
653    def get_workspace_workflows(self) -> requests.Response:
654        """
655        Get the workflows for the workspace.
656
657        **Returns:**
658        - requests.Response: The response from the request.
659        """
660        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
661        return self.request_util.run_request(
662            uri=uri,
663            method=GET
664        )

Get the workflows for the workspace.

Returns:

  • requests.Response: The response from the request.
def import_workflow( self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.models.Response:
666    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
667        """
668        Import a workflow into the workspace.
669
670        **Args:**
671        - workflow_dict (dict): The dictionary containing the workflow information.
672        - continue_if_exists (bool, optional): Whether to continue if the workflow
673                already exists. Defaults to `False`.
674
675        **Returns:**
676        - requests.Response: The response from the request.
677        """
678        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
679        workflow_json = json.dumps(workflow_dict)
680        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
681        return self.request_util.run_request(
682            uri=uri,
683            method=POST,
684            data=workflow_json,
685            content_type=APPLICATION_JSON,
686            accept_return_codes=accept_return_codes
687        )

Import a workflow into the workspace.

Args:

  • workflow_dict (dict): The dictionary containing the workflow information.
  • continue_if_exists (bool, optional): Whether to continue if the workflow already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_workspace(self) -> requests.models.Response:
689    def delete_workspace(self) -> requests.Response:
690        """
691        Delete a Terra workspace.
692
693        **Returns:**
694        - requests.Response: The response from the request.
695        """
696        return self.request_util.run_request(
697            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
698            method=DELETE
699        )

Delete a Terra workspace.

Returns:

  • requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.models.Response:
701    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
702        """
703        Update the attributes for the workspace.
704
705        **Args:**
706        - attributes (dict): The attributes to update.
707
708        **Returns:**
709        - requests.Response: The response from the request.
710        """
711        return self.request_util.run_request(
712            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
713            method=PATCH,
714            data=json.dumps(attributes),
715            content_type=APPLICATION_JSON
716        )

Update the attributes for the workspace.

Args:

  • attributes (list[dict]): The attributes to update.

Returns:

  • requests.Response: The response from the request.
def leave_workspace(
        self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
) -> requests.Response:
    """
    Leave a workspace. If the workspace ID is not supplied, it is looked up via `get_workspace_info()`.

    **Args:**
    - workspace_id (str, optional): The workspace ID. Defaults to None.
    - ignore_direct_access_error (bool, optional): Whether a 403 response from the leave request is
         accepted instead of being treated as an error. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if not workspace_id:
        workspace_info = self.get_workspace_info().json()
        workspace_id = workspace_info['workspace']['workspaceId']
    # Only accept a 403 when the caller explicitly opted in.
    accepted_return_code = [403] if ignore_direct_access_error else []

    res = self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
        method=DELETE,
        accept_return_codes=accepted_return_code
    )
    if res.status_code == 403:
        # The 403 body may not be JSON, or may not be an object with a "message" key; don't let parsing it
        # turn an explicitly tolerated 403 into a crash.
        try:
            body = res.json()
        except ValueError:
            body = None
        message = body.get("message") if isinstance(body, dict) else None
        if message == "You can only leave a resource that you have direct access to.":
            logging.info(
                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct "
                f"access to the workspace (they could be an owner on the billing project)"
            )
        else:
            # Any other 403 was previously returned silently; surface it so it isn't mistaken for success.
            logging.warning(
                f"Received 403 while attempting to leave workspace with id '{workspace_id}': {res.text}"
            )
    return res

Leave a workspace. If the workspace ID is not supplied, it will be looked up.

Args:

  • workspace_id (str, optional): The workspace ID. Defaults to None.
  • ignore_direct_access_error (bool, optional): Whether to accept a 403 response (e.g. when the current user has no direct access to the workspace) from the leave request. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.Response:
    """
    Turn public read access to the workspace on or off.

    Sends a single `PubliclyReadable` setting to the workspace's v2 settings endpoint.

    **Args:**
    - public (bool): `True` to make the workspace public, `False` to make it private.

    **Returns:**
    - requests.Response: The response from the request.
    """
    publicly_readable = {"settingType": "PubliclyReadable", "config": {"enabled": public}}
    settings_uri = f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings"
    return self.request_util.run_request(
        uri=settings_uri,
        method=PUT,
        content_type=APPLICATION_JSON,
        data=json.dumps([publicly_readable]),
    )

Change a workspace's public setting.

Args:

  • public (bool): Whether the workspace should be public. Set to True to make it public, False to make it private.

Returns:

  • requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
    """
    Check if a workspace is public.

    The workspace resource ID queried in Sam is derived from the workspace bucket name by stripping a
    leading `fc-secure-` or `fc-` prefix.

    **Args:**
    - bucket (str, optional): The workspace bucket name. A leading `gs://` and trailing `/` are tolerated
      and removed. Looked up via `get_workspace_bucket()` if not provided. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_bucket = bucket if bucket else self.get_workspace_bucket()
    # Normalize a bucket given as a URL ("gs://name/") to the bare bucket name before deriving the ID;
    # otherwise the prefix stripping below silently fails and Sam is queried with a bogus resource ID.
    bucket_name = workspace_bucket.removeprefix("gs://").rstrip("/")
    # The code assumes Terra bucket names are "fc-<id>" or "fc-secure-<id>"; strip the longer prefix first.
    workspace_id = bucket_name.removeprefix("fc-secure-").removeprefix("fc-")
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/policies/reader/public",
        method=GET
    )

Check if a workspace is public.

Args:

  • bucket (str, optional): The bucket name (provided without the gs:// prefix). Will look it up if not provided. Defaults to None.

Returns:

  • requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
    """
    Remove a whole entity table (entity type) from this Terra workspace.

    A 204 status is logged as success; any other status is logged as an error together with the
    response body. The response is returned in both cases.

    **Args:**
    - entity_to_delete (str): The name of the entity table to delete.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    response = self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{workspace_path}/entityTypes/{entity_to_delete}",
        method=DELETE
    )
    if response.status_code != 204:
        logging.error(
            f"Encountered the following error while attempting to delete '{entity_to_delete}' "
            f"table: {response.text}"
        )
    else:
        logging.info(
            f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
            f"'{workspace_path}'"
        )
    return response

Delete an entire entity table from a Terra workspace.

Args:

  • entity_to_delete (str): The name of the entity table to delete.

Returns:

  • requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
    """Save an entity table version in a Terra workspace.

    The table contents returned by `get_gcp_workspace_metrics` are written as JSON to `json/<entity_type>.json`
    inside a zip named `<entity_type>.v<epoch_ms>.zip`. The zip is uploaded to
    `gs://<workspace bucket>/.data-table-versions/<entity_type>/` with metadata recording the creator,
    entity type, timestamp, and description. Local scratch files are created in a temporary directory
    that is removed afterwards, so nothing is written to (or overwritten in) the current working directory.

    **Args:**
    - entity_type (str): The name of the entity table to save a new version for
    - version_name (str): The name of the new version (stored as the version's `description`)
    """
    # Function-scope import keeps this method self-contained.
    import tempfile

    workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
    file_name = f"{entity_type}.json"
    # Same zip naming convention that the Terra backend uses.
    timestamp_ms = int(time.time() * 1000)
    zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"

    workspace = self.get_workspace_info().json()["workspace"]
    # GCS object paths always use "/", so build the URI explicitly rather than via os.path.join, whose
    # separator is platform-dependent.
    path_to_upload_to = f"gs://{workspace['bucketName']}/.data-table-versions/{entity_type}/{zip_file_name}"
    gcp_util = GCPCloudFunctions(project=workspace["googleProject"])
    # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
    try:
        active_account = gcp_util.get_active_gcloud_account()
    except Exception as e:
        active_account = workspace["createdBy"]
        logging.error(
            f"Encountered the following exception while attempting to get current GCP account: {e}. "
            f"Will set the owner of the new metadata version as the workspace creator instead."
        )

    with tempfile.TemporaryDirectory() as scratch_dir:
        json_path = os.path.join(scratch_dir, file_name)
        zip_path = os.path.join(scratch_dir, zip_file_name)
        with open(json_path, "w", encoding="utf-8") as json_file:
            json.dump(workspace_metrics, json_file)
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            # The archive layout ("json/<file>") is what Terra expects inside a version zip.
            zipf.write(json_path, arcname=f"json/{file_name}")
        # Upload while the scratch directory still exists.
        gcp_util.upload_blob(
            source_file=zip_path,
            destination_path=path_to_upload_to,
            custom_metadata={
                "createdBy": active_account,
                "entityType": entity_type,
                "timestamp": timestamp_ms,
                "description": version_name,
            }
        )

Save an entity table version in a Terra workspace.

Args:

  • entity_type (str): The name of the entity table to save a new version for
  • version_name (str): The name of the new version
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
    """
    Attach a user comment to an existing submission in this Terra workspace.

    The comment is sent as `{"userComment": ...}` in a PATCH to the submission's Rawls endpoint.

    **Args:**
    - submission_id (str): The ID of the submission to add a comment to.
    - user_comment (str): The comment to add to the submission.

    **Returns:**
    - requests.Response: The response from the request.
    """
    submission_uri = (
        f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
    )
    logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
    comment_body = json.dumps({"userComment": user_comment})
    return self.request_util.run_request(
        uri=submission_uri,
        method=PATCH,
        content_type=APPLICATION_JSON,
        data=comment_body,
    )

Add a user comment to a submission in Terra.

Args:

  • submission_id (str): The ID of the submission to add a comment to.
  • user_comment (str): The comment to add to the submission.

Returns:

  • requests.Response: The response from the request.
def initiate_submission(
        self,
        method_config_namespace: str,
        method_config_name: str,
        entity_type: str,
        entity_name: str,
        expression: str,
        user_comment: Optional[str],
        use_call_cache: bool = True
) -> requests.Response:
    """
    Start a workflow submission within this Terra workspace.

    Note - the workflow being initiated MUST already be imported into the workspace.

    **Args:**
    - method_config_namespace (str): The namespace of the method configuration.
    - method_config_name (str): The name of the method configuration to use for the submission
    (i.e. the workflow name).
    - entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
    - entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or
    "sample_set_1").
    - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
    launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
    be launched PER SAMPLE, the expression should be `this.samples`.
    - user_comment (str, optional): The user comment to add to the submission. Omitted from the request when
    empty or None.
    - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    submission_request = dict(
        methodConfigurationNamespace=method_config_namespace,
        methodConfigurationName=method_config_name,
        entityType=entity_type,
        entityName=entity_name,
        expression=expression,
        useCallCache=use_call_cache,
        # These submission options are always sent with fixed values.
        deleteIntermediateOutputFiles=False,
        useReferenceDisks=False,
        ignoreEmptyOutputs=False,
    )
    # Only include the comment key when a non-empty comment was given.
    if user_comment:
        submission_request["userComment"] = user_comment

    submissions_uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions"
    return self.request_util.run_request(
        uri=submissions_uri,
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(submission_request),
    )

Initiate a submission within a Terra workspace.

Note - the workflow being initiated MUST already be imported into the workspace.

Args:

  • method_config_namespace (str): The namespace of the method configuration.
  • method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
  • entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
  • entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or "sample_set_1").
  • expression (str): The "expression" to use. For example, if the entity_type is sample and the workflow is launching one sample, this can be left as "this". If the entity_type is sample_set but one workflow should be launched PER SAMPLE, the expression should be "this.samples".
  • user_comment (str, optional): The user comment to add to the submission.
  • use_call_cache (bool, optional): Whether to use the call caching. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.Response:
    """
    Re-run only the failed workflows of a submission in this Terra workspace.

    Posts `{"retryType": "Failed"}` to the submission's Rawls `retry` endpoint.

    **Args:**
    - submission_id (str): The ID of the submission to retry.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    retry_uri = f"{RAWLS_LINK}/workspaces/{workspace}/submissions/{submission_id}/retry"
    logging.info(f"Retrying failed submission: '{submission_id}' in workspace {workspace}")
    return self.request_util.run_request(
        uri=retry_uri,
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps({"retryType": "Failed"}),
    )

Retry a failed submission in Terra.

Args:

  • submission_id (str): The ID of the submission to retry.

Returns:

  • requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.Response:
    """
    Fetch the current status of one submission in this Terra workspace.

    **Args:**
    - submission_id (str): The ID of the submission.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for submission: '{submission_id}' in workspace {workspace}")
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace}/submissions/{submission_id}",
        method=GET,
    )

Get the status of a submission in Terra.

Args:

  • submission_id (str): The ID of the submission.

Returns:

  • requests.Response: The response from the request.
def get_workspace_submission_status(self) -> requests.Response:
    """
    List every submission in this Terra workspace along with its status.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for all submissions in workspace {workspace}")
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace}/submissions",
        method=GET,
    )

Get the status of all submissions in a Terra workspace.

Returns:

  • requests.Response: The response from the request.
def get_workspace_submission_stats(self, method_name: Optional[str] = None, retrieve_running_ids: bool = True) -> dict:
    """
    Summarize workflow outcomes across the submissions in this Terra workspace.

    **Args:**
    - method_name (str, optional): When given, only submissions whose `methodConfigurationName` equals this
      value are counted. Defaults to None (count every submission).
    - retrieve_running_ids (bool, optional): When `True`, each submission whose status is neither `Done` nor
      `Aborted` is fetched individually, and the entity names of its Running/Submitted/Queued workflows are
      collected. Defaults to `True`.

    **Returns:**
    - dict: Per-status workflow counts (`submitted`, `queued`, `running`, `aborting`, `aborted`, `failed`,
      `succeeded`), then `id_still_running` (list of entity names, or `"NA"` when `retrieve_running_ids` is
      `False`), then `success_rate` (succeeded / (succeeded + failed) rounded to 2 places, or 0.0 when both
      are zero).
    """
    every_submission = self.get_workspace_submission_status().json()
    if method_name:
        submissions = [sub for sub in every_submission if sub["methodConfigurationName"] == method_name]
    else:
        submissions = list(every_submission)

    method_suffix = f"with method name '{method_name}'" if method_name else ""
    logging.info(
        f"{len(submissions)} submissions in "
        f"{self.billing_project}/{self.workspace_name} {method_suffix}"
    )

    counts = dict.fromkeys(
        ("submitted", "queued", "running", "aborting", "aborted", "failed", "succeeded"), 0
    )
    running_entities: list[str] = []
    for sub in submissions:
        for raw_status, n_workflows in sub["workflowStatuses"].items():
            status_key = raw_status.lower()
            if status_key in counts:
                counts[status_key] += n_workflows
        # Only finished-or-aborted submissions can be skipped; anything else may still hold in-flight workflows.
        if retrieve_running_ids and sub["status"] not in ("Done", "Aborted"):
            detail = self.get_submission_status(submission_id=sub["submissionId"]).json()
            running_entities.extend(
                wf["workflowEntity"]["entityName"]
                for wf in detail["workflows"]
                if wf["status"] in ("Running", "Submitted", "Queued")
            )

    pending_total = counts["running"] + counts["submitted"] + counts["queued"]
    if retrieve_running_ids and len(running_entities) != pending_total:
        logging.warning(
            f"Discrepancy found between total running/pending workflows, {pending_total}, "
            f"and the count of ids still running/pending, {len(running_entities)}. "
            "Workflows may have completed between API calls."
        )

    finished = counts["succeeded"] + counts["failed"]
    success_rate = round(counts["succeeded"] / finished, 2) if finished > 0 else 0.0
    # Key order matches the historical output: status counts, then running ids, then success rate.
    return {
        **counts,
        "id_still_running": running_entities if retrieve_running_ids else "NA",
        "success_rate": success_rate,
    }

Get submission statistics for a Terra workspace, optionally filtered by method name.

Args:

  • method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
  • retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running. Defaults to True.

Returns:

  • dict: A dictionary containing submission statistics, including counts of workflows in various states