ops_utils.terra_util

Utilities for working with Terra.

   1"""Utilities for working with Terra."""
   2import json
   3import logging
   4import re
   5from typing import Any, Optional
   6import requests
   7import time
   8import zipfile
   9import os
  10
  11from . import deprecated
  12from .vars import GCP, APPLICATION_JSON
  13from .gcp_utils import GCPCloudFunctions
  14from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest
  15
# Constants for Terra API links
# Base URL that TerraWorkspace uses when constructed with env="dev".
TERRA_DEV_LINK = "https://firecloud-orchestration.dsde-dev.broadinstitute.org/api"
"""@private"""
# Base URL that TerraWorkspace uses when constructed with env="prod" (the default).
TERRA_PROD_LINK = "https://api.firecloud.org/api"
"""@private"""
# Leonardo API base URL (prod host). NOTE(review): not referenced in the portion of the module reviewed here.
LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api"
"""@private"""
# Workspace service base URL (prod host). NOTE(review): not referenced in the portion of the module reviewed here.
WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1"
"""@private"""
# SAM API base URL (prod host); used for group management and the pet service account key.
SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api"
"""@private"""
# Rawls API base URL (prod host); used for listing workspaces and for entity batch upserts.
RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api"
"""@private"""

# Group roles accepted by TerraGroups (see TerraGroups.GROUP_MEMBERSHIP_OPTIONS).
MEMBER = "member"
ADMIN = "admin"
  32
  33
  34class Terra:
  35    """Class for generic Terra utilities."""
  36
  37    def __init__(self, request_util: RunRequest, env: str = "prod"):
  38        """
  39        Initialize the Terra class.
  40
  41        **Args:**
  42        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
  43            request utility class to handle HTTP requests.
  44        """
  45        self.request_util = request_util
  46        """@private"""
  47
  48    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
  49        """
  50        Fetch the list of accessible workspaces.
  51
  52        **Args:**
  53        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
  54
  55        **Returns:**
  56        - requests.Response: The response from the request.
  57        """
  58        fields_str = "fields=" + ",".join(fields) if fields else ""
  59        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
  60        return self.request_util.run_request(
  61            uri=url,
  62            method=GET
  63        )
  64
  65    def get_pet_account_json(self) -> requests.Response:
  66        """
  67        Get the service account JSON.
  68
  69        **Returns:**
  70        - requests.Response: The response from the request.
  71        """
  72        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
  73        return self.request_util.run_request(
  74            uri=url,
  75            method=GET
  76        )
  77
  78
  79class TerraGroups:
  80    """A class to manage Terra groups and their memberships."""
  81
  82    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
  83    """@private"""
  84    CONFLICT_STATUS_CODE = 409
  85    """@private"""
  86
  87    def __init__(self, request_util: RunRequest):
  88        """
  89        Initialize the TerraGroups class.
  90
  91        **Args:**
  92        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
  93         utility class to handle HTTP requests.
  94        """
  95        self.request_util = request_util
  96        """@private"""
  97
  98    def _check_role(self, role: str) -> None:
  99        """
 100        Check if the role is valid.
 101
 102        Args:
 103            role (str): The role to check.
 104
 105        Raises:
 106            ValueError: If the role is not one of the allowed options.
 107        """
 108        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
 109            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
 110
 111    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
 112        """
 113        Remove a user from a group.
 114
 115        **Args:**
 116        - group (str): The name of the group.
 117        - email (str): The email of the user to remove.
 118        - role (str): The role of the user in the group
 119            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
 120
 121        **Returns:**
 122        - requests.Response: The response from the request.
 123        """
 124        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
 125        self._check_role(role)
 126        logging.info(f"Removing {email} from group {group}")
 127        return self.request_util.run_request(
 128            uri=url,
 129            method=DELETE
 130        )
 131
 132    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
 133        """
 134        Create a new group.
 135
 136        **Args:**
 137        - group_name (str): The name of the group to create.
 138        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
 139
 140        **Returns:**
 141        - requests.Response: The response from the request.
 142        """
 143        url = f"{SAM_LINK}/groups/v1/{group_name}"
 144        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 145        response = self.request_util.run_request(
 146            uri=url,
 147            method=POST,
 148            accept_return_codes=accept_return_codes
 149        )
 150        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
 151            logging.info(f"Group {group_name} already exists. Continuing.")
 152            return response
 153        else:
 154            logging.info(f"Created group {group_name}")
 155            return response
 156
 157    def delete_group(self, group_name: str) -> requests.Response:
 158        """
 159        Delete a group.
 160
 161        **Args:**
 162        - group_name (str): The name of the group to delete.
 163
 164        **Returns:**
 165        - requests.Response: The response from the request.
 166        """
 167        url = f"{SAM_LINK}/groups/v1/{group_name}"
 168        logging.info(f"Deleting group {group_name}")
 169        return self.request_util.run_request(
 170            uri=url,
 171            method=DELETE
 172        )
 173
 174    def add_user_to_group(
 175            self, group: str, email: str, role: str, continue_if_exists: bool = False
 176    ) -> requests.Response:
 177        """
 178        Add a user to a group.
 179
 180        **Args:**
 181        - group (str): The name of the group.
 182        - email (str): The email of the user to add.
 183        - role (str): The role of the user in the group
 184            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
 185        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
 186                Defaults to `False`.
 187
 188        **Returns:**
 189        - requests.Response: The response from the request.
 190        """
 191        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
 192        self._check_role(role)
 193        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 194        logging.info(f"Adding {email} to group {group} as {role}")
 195        return self.request_util.run_request(
 196            uri=url,
 197            method=PUT,
 198            accept_return_codes=accept_return_codes
 199        )
 200
 201    def check_group_members(self, group: str, role: str) -> requests.Response:
 202        """
 203        Check the members of a group.
 204
 205        **Args:**
 206        - group (str): The name of the group.
 207        - role (str): The role to check for in the group
 208            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
 209
 210        **Returns:**
 211        - requests.Response: The response from the request.
 212        """
 213        url = f"{SAM_LINK}/groups/v1/{group}/{role}"
 214        self._check_role(role)
 215        logging.info(f"Checking {role}s in group {group}")
 216        return self.request_util.run_request(
 217            uri=url,
 218            method=GET
 219        )
 220
 221
 222class TerraWorkspace:
 223    """Terra workspace class to manage workspaces and their attributes."""
 224
 225    CONFLICT_STATUS_CODE = 409
 226    """@private"""
 227
 228    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
 229        """
 230        Initialize the TerraWorkspace class.
 231
 232        **Args:**
 233        - billing_project (str): The billing project associated with the workspace.
 234        - workspace_name (str): The name of the workspace.
 235        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
 236            request utility class to handle HTTP requests.
 237        """
 238        self.billing_project = billing_project
 239        """@private"""
 240        self.workspace_name = workspace_name
 241        """@private"""
 242        self.workspace_id = None
 243        """@private"""
 244        self.resource_id = None
 245        """@private"""
 246        self.storage_container = None
 247        """@private"""
 248        self.bucket = None
 249        """@private"""
 250        self.wds_url = None
 251        """@private"""
 252        self.account_url: Optional[str] = None
 253        """@private"""
 254        self.request_util = request_util
 255        """@private"""
 256        if env.lower() == "dev":
 257            self.terra_link = TERRA_DEV_LINK
 258            """@private"""
 259        elif env.lower() == "prod":
 260            self.terra_link = TERRA_PROD_LINK
 261            """@private"""
 262        else:
 263            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
 264
 265    def __repr__(self) -> str:
 266        """
 267        Return a string representation of the TerraWorkspace instance.
 268
 269        Returns:
 270            str: The string representation of the TerraWorkspace instance.
 271        """
 272        return f"{self.billing_project}/{self.workspace_name}"
 273
 274    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000, verbose = True) -> Any:
 275        """
 276        Yield all entity metrics from the workspace.
 277
 278        Args:
 279            entity (str): The type of entity to query.
 280            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
 281            verbose (bool): If True, will log the progress of fetching entity metrics. Defaults to True.
 282
 283        Yields:
 284            Any: The JSON response containing entity metrics.
 285        """
 286        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
 287        response = self.request_util.run_request(
 288            uri=url,
 289            method=GET,
 290            content_type=APPLICATION_JSON
 291        )
 292        raw_text = response.text
 293        first_page_json = json.loads(
 294            raw_text,
 295            parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 296        )
 297        yield first_page_json
 298        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
 299        if verbose:
 300            logging.info(
 301                f"Looping through {total_pages} pages of data")
 302
 303        for page in range(2, total_pages + 1):
 304            if verbose:
 305                logging.info(f"Getting page {page} of {total_pages}")
 306            next_page = self.request_util.run_request(
 307                uri=url,
 308                method=GET,
 309                content_type=APPLICATION_JSON,
 310                params={"page": page}
 311            )
 312            raw_text = next_page.text
 313            page_json = json.loads(
 314                raw_text,
 315                parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 316            )
 317            yield page_json
 318
 319    @staticmethod
 320    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
 321        """Check that all headers follow the standards required by TDR.
 322
 323        **Args:**
 324        - table_name (str): The name of the Terra table.
 325        - headers (list[str]): The headers of the Terra table to validate.
 326
 327        **Raises:**
 328        - ValueError if any headers are considered invalid by TDR standards
 329        """
 330        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
 331        tdr_max_header_length = 63
 332
 333        headers_containing_too_many_characters = []
 334        headers_contain_invalid_characters = []
 335
 336        for header in headers:
 337            if len(header) > tdr_max_header_length:
 338                headers_containing_too_many_characters.append(header)
 339            if not re.match(tdr_header_allowed_pattern, header):
 340                headers_contain_invalid_characters.append(header)
 341
 342        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
 343        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
 344        header naming."""
 345        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
 346        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
 347        allowed in TDR is {tdr_max_header_length}.\n"""
 348        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
 349        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
 350        only contain numbers, letters, and underscore characters.\n"""
 351
 352        error_to_report = ""
 353        if headers_containing_too_many_characters:
 354            error_to_report += too_many_characters_error_message
 355        if headers_contain_invalid_characters:
 356            error_to_report += invalid_characters_error_message
 357        if error_to_report:
 358            error_to_report += base_error_message
 359            raise ValueError(error_to_report)
 360
 361    def get_workspace_info(self) -> requests.Response:
 362        """
 363        Get workspace information.
 364
 365        **Returns:**
 366        - requests.Response: The response from the request.
 367        """
 368        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
 369        logging.info(
 370            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
 371        return self.request_util.run_request(uri=url, method=GET)
 372
 373    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]:
 374        """
 375        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
 376
 377        **Args:**
 378        - entity_type (str): The type of entity to get metrics for.
 379        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
 380        - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
 381
 382        **Returns:**
 383        - list[dict]: A list of dictionaries containing entity metrics.
 384        """
 385        results = []
 386        if verbose:
 387            logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
 388
 389        for page in self._yield_all_entity_metrics(entity=entity_type, verbose=verbose):
 390            results.extend(page["results"])
 391
 392        # If remove_dicts is True, remove dictionaries from the workspace metrics
 393        if remove_dicts:
 394            for row in results:
 395                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
 396        return results
 397
 398    def get_flat_list_of_table_entity(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]:
 399        """
 400        Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add
 401        the entity name to the dictionary with key "{entity_type}_id".
 402
 403        **Args:**
 404        - entity_type (str): The type of entity to get metrics for.
 405        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
 406        - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
 407
 408        **Returns:**
 409        - list[dict]: A list of dictionaries containing entity metrics.
 410        """
 411        table_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type, remove_dicts=remove_dicts, verbose=verbose)
 412        convert_metrics = []
 413        for row in table_metrics:
 414            converted_row = row['attributes']
 415            converted_row[f"{row['entityType']}_id"] = row['name']
 416            convert_metrics.append(converted_row)
 417        return convert_metrics
 418
 419    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
 420        """
 421        Get specific entity metrics for a given entity type and name.
 422
 423        **Args:**
 424        - entity_type (str): The type of entity to get metrics for.
 425        - entity_name (str): The name of the entity to get metrics for.
 426
 427        **Returns:**
 428        - requests.Response: The response from the request.
 429        """
 430        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"  # noqa: E501
 431        return self.request_util.run_request(uri=url, method=GET)
 432
 433    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
 434        """
 435        Remove dictionaries from the attributes.
 436
 437        Args:
 438            attributes (dict): The attributes to remove dictionaries from.
 439
 440        Returns:
 441            dict: The updated attributes with no dictionaries.
 442        """
 443        for key, value in attributes.items():
 444            attributes[key] = self._remove_dict_from_cell(value)
 445        return attributes
 446
 447    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
 448        """
 449        Remove a dictionary from a cell.
 450
 451        Args:
 452            cell_value (Any): The dictionary to remove.
 453
 454        Returns:
 455            Any: The updated cell with no dictionaries.
 456        """
 457        if isinstance(cell_value, dict):
 458            entity_name = cell_value.get("entityName")
 459            # If the cell value is a dictionary, check if it has an entityName key
 460            if entity_name:
 461                # If the cell value is a dictionary with an entityName key, return the entityName
 462                return entity_name
 463            entity_list = cell_value.get("items")
 464            if entity_list or entity_list == []:
 465                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
 466                return [
 467                    self._remove_dict_from_cell(entity) for entity in entity_list
 468                ]
 469            return cell_value
 470        return cell_value
 471
 472    def get_workspace_bucket(self) -> str:
 473        """
 474        Get the workspace bucket name. Does not include the `gs://` prefix.
 475
 476        **Returns:**
 477        - str: The bucket name.
 478        """
 479        return self.get_workspace_info().json()["workspace"]["bucketName"]
 480
 481    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
 482        """
 483        Get workspace entity information.
 484
 485        **Args:**
 486        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
 487
 488        **Returns:**
 489        - requests.Response: The response from the request.
 490        """
 491        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
 492        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
 493        return self.request_util.run_request(uri=url, method=GET)
 494
 495    def get_workspace_acl(self) -> requests.Response:
 496        """
 497        Get the workspace access control list (ACL).
 498
 499        **Returns:**
 500        - requests.Response: The response from the request.
 501        """
 502        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
 503        return self.request_util.run_request(
 504            uri=url,
 505            method=GET
 506        )
 507
 508    def update_user_acl(
 509            self,
 510            email: str,
 511            access_level: str,
 512            can_share: bool = False,
 513            can_compute: bool = False,
 514            invite_users_not_found: bool = False,
 515    ) -> requests.Response:
 516        """
 517        Update the access control list (ACL) for a user in the workspace.
 518
 519        **Args:**
 520        - email (str): The email of the user.
 521        - access_level (str): The access level to grant to the user.
 522        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
 523        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
 524        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 525                the workspace. Defaults to `False`
 526
 527        **Returns:**
 528        - requests.Response: The response from the request.
 529        """
 530        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 531              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 532        payload = {
 533            "email": email,
 534            "accessLevel": access_level,
 535            "canShare": can_share,
 536            "canCompute": can_compute,
 537        }
 538        logging.info(
 539            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
 540        response = self.request_util.run_request(
 541            uri=url,
 542            method=PATCH,
 543            content_type=APPLICATION_JSON,
 544            data="[" + json.dumps(payload) + "]"
 545        )
 546
 547        if response.json()["usersNotFound"] and not invite_users_not_found:
 548            # Will be a list of one user
 549            user_not_found = response.json()["usersNotFound"][0]
 550            raise Exception(
 551                f'The user {user_not_found["email"]} was not found and access was not updated'
 552            )
 553        return response
 554
 555    @deprecated(
 556        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
 557    )
 558    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
 559        """
 560        Update the metadata for a library dataset.
 561
 562        **Args:**
 563        - library_metadata (dict): The metadata to update.
 564        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
 565
 566        **Returns:**
 567        - requests.Response: The response from the request.
 568        """
 569        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
 570              f"/metadata?validate={str(validate).lower()}"
 571        return self.request_util.run_request(
 572            uri=acl,
 573            method=PUT,
 574            data=json.dumps(library_metadata)
 575        )
 576
 577    def update_multiple_users_acl(
 578            self, acl_list: list[dict], invite_users_not_found: bool = False
 579    ) -> requests.Response:
 580        """
 581        Update the access control list (ACL) for multiple users in the workspace.
 582
 583        **Args:**
 584        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
 585        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 586                the workspace. Defaults to `False`
 587
 588        **Returns:**
 589        - requests.Response: The response from the request.
 590        """
 591        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 592            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 593        logging.info(
 594            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
 595        response = self.request_util.run_request(
 596            uri=url,
 597            method=PATCH,
 598            content_type=APPLICATION_JSON,
 599            data=json.dumps(acl_list)
 600        )
 601
 602        if response.json()["usersNotFound"] and not invite_users_not_found:
 603            # Will be a list of one user
 604            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
 605            raise Exception(
 606                f"The following users were not found and access was not updated: {users_not_found}"
 607            )
 608        return response
 609
 610    def create_workspace(
 611            self,
 612            auth_domain: list[dict] = [],
 613            attributes: dict = {},
 614            continue_if_exists: bool = False,
 615    ) -> requests.Response:
 616        """
 617        Create a new workspace in Terra.
 618
 619        **Args:**
 620        - auth_domain (list[dict], optional): A list of authorization domains. Should look
 621                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
 622        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
 623        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
 624
 625        **Returns:**
 626        - requests.Response: The response from the request.
 627        """
 628        payload = {
 629            "namespace": self.billing_project,
 630            "name": self.workspace_name,
 631            "authorizationDomain": auth_domain,
 632            "attributes": attributes,
 633            "cloudPlatform": GCP
 634        }
 635        # If workspace already exists then continue if exists
 636        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 637        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
 638        response = self.request_util.run_request(
 639            uri=f"{self.terra_link}/workspaces",
 640            method=POST,
 641            content_type=APPLICATION_JSON,
 642            data=json.dumps(payload),
 643            accept_return_codes=accept_return_codes
 644        )
 645        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
 646            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
 647        return response
 648
 649    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
 650        """
 651        Create an ingest dictionary for workspace attributes.
 652
 653        **Args:**
 654        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
 655
 656        **Returns:**
 657        - list[dict]: A list of dictionaries containing the workspace attributes.
 658        """
 659        # If not provided then call API to get it
 660        workspace_attributes = (
 661            workspace_attributes if workspace_attributes
 662            else self.get_workspace_info().json()["workspace"]["attributes"]
 663        )
 664
 665        ingest_dict = []
 666        for key, value in workspace_attributes.items():
 667            # If value is dict just use 'items' as value
 668            if isinstance(value, dict):
 669                value = value.get("items")
 670            # If value is list convert to comma separated string
 671            if isinstance(value, list):
 672                value = ", ".join(value)
 673            ingest_dict.append(
 674                {
 675                    "attribute": key,
 676                    "value": str(value) if value else None
 677                }
 678            )
 679        return ingest_dict
 680
 681    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
 682        """
 683        Upload metadata to the workspace table.
 684
 685        **Args:**
 686        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
 687
 688        **Returns:**
 689        - requests.Response: The response from the request.
 690        """
 691        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
 692        data = {"entities": open(entities_tsv, "rb")}
 693        return self.request_util.upload_file(
 694            uri=endpoint,
 695            data=data
 696        )
 697
 698    def _batch_upsert(self, update_dict: list) -> requests.Response:
 699        """
 700        Run batch upsert on workspace table.
 701
 702        **Args:**
 703        - update_dict (dict): A dictionary to update the workspace table.
 704        **Returns:**
 705        - requests.Response: The response from the request.
 706        """
 707        endpoint = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/batchUpsert"
 708        return self.request_util.run_request(
 709            uri=endpoint,
 710            data=json.dumps(update_dict),
 711            content_type=APPLICATION_JSON,
 712            method=POST
 713        )
 714
 715    def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.Response:
 716        """
 717        Upload metadata to one or more workspace entity tables using batch upsert.
 718
 719        Builds the Terra batch upsert payload from a structured input dictionary and calls
 720        `batch_upsert` with the result.
 721
 722        **Args:**
 723        - table_data (dict): A dictionary mapping table names to their data configuration.
 724            Each entry should have the following structure:
 725
 726            ```python
 727            {
 728                "table_name": {
 729                    "table_id_column": "column_that_is_the_entity_id",
 730                    "row_data": [
 731                        {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...},
 732                        ...
 733                    ]
 734                },
 735                ...
 736            }
 737            ```
 738
 739            - `table_id_column`: The name of the column whose value is used as the entity name
 740              (`name` field in the upsert payload). This column is **not** included as an attribute
 741              operation.
 742            - `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes
 743              an `AddUpdateAttribute` operation.
 744        - force (bool, optional): Whether to force update if id column does not match table name + _id.
 745
 746        **Returns:**
 747        - requests.Response: The response from the request.
 748        """
 749        upsert_payload = []
 750        table_name_failures = []
 751        for table_name, config in table_data.items():
 752            id_column = config["table_id_column"]
 753            if id_column != f"{table_name}_id":
 754                table_name_failures.append(
 755                    f"id column, {id_column}, does not match table {table_name}. This column will be renamed to {table_name}_id."
 756                    "Use force=True to force update."
 757                )
 758            rows = config["row_data"]
 759            for row in rows:
 760                entity_name = row.get(id_column)
 761                if entity_name is None:
 762                    raise Exception(f"Primary key column '{id_column}' is missing from row data - {row}")
 763                operations = [
 764                    {
 765                        "op": "AddUpdateAttribute",
 766                        "attributeName": col,
 767                        "addUpdateAttribute": value,
 768                    }
 769                    for col, value in row.items()
 770                    if col != id_column
 771                ]
 772                upsert_payload.append(
 773                    {
 774                        "name": entity_name,
 775                        "entityType": table_name,
 776                        "operations": operations,
 777                    }
 778                )
 779        if table_name_failures:
 780            for message in table_name_failures:
 781                if force:
 782                    logging.warning(message)
 783                else:
 784                    logging.error(message)
 785            if not force:
 786                raise Exception("One or more tables have id columns that do not match the expected format."
 787                                " See error messages above for details. Use force=True to force update.")
 788        return self._batch_upsert(upsert_payload)
 789
 790    def get_workspace_workflows(self) -> requests.Response:
 791        """
 792        Get the workflows for the workspace.
 793
 794        **Returns:**
 795        - requests.Response: The response from the request.
 796        """
 797        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
 798        return self.request_util.run_request(
 799            uri=uri,
 800            method=GET
 801        )
 802
 803    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
 804        """
 805        Import a workflow into the workspace.
 806
 807        **Args:**
 808        - workflow_dict (dict): The dictionary containing the workflow information.
 809        - continue_if_exists (bool, optional): Whether to continue if the workflow
 810                already exists. Defaults to `False`.
 811
 812        **Returns:**
 813        - requests.Response: The response from the request.
 814        """
 815        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
 816        workflow_json = json.dumps(workflow_dict)
 817        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 818        return self.request_util.run_request(
 819            uri=uri,
 820            method=POST,
 821            data=workflow_json,
 822            content_type=APPLICATION_JSON,
 823            accept_return_codes=accept_return_codes
 824        )
 825
 826    def delete_workspace(self) -> requests.Response:
 827        """
 828        Delete a Terra workspace.
 829
 830        **Returns:**
 831        - requests.Response: The response from the request.
 832        """
 833        return self.request_util.run_request(
 834            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
 835            method=DELETE
 836        )
 837
 838    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
 839        """
 840        Update the attributes for the workspace.
 841
 842        **Args:**
 843        - attributes (dict): The attributes to update.
 844
 845        **Returns:**
 846        - requests.Response: The response from the request.
 847        """
 848        return self.request_util.run_request(
 849            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
 850            method=PATCH,
 851            data=json.dumps(attributes),
 852            content_type=APPLICATION_JSON
 853        )
 854
 855    def leave_workspace(
 856            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
 857    ) -> requests.Response:
 858        """
 859        Leave a workspace. If workspace ID not supplied, will look it up.
 860
 861        **Args:**
 862        - workspace_id (str, optional): The workspace ID. Defaults to None.
 863        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
 864             Defaults to `False`.
 865
 866        **Returns:**
 867        - requests.Response: The response from the request.
 868        """
 869        if not workspace_id:
 870            workspace_info = self.get_workspace_info().json()
 871            workspace_id = workspace_info['workspace']['workspaceId']
 872        accepted_return_code = [403] if ignore_direct_access_error else []
 873
 874        res = self.request_util.run_request(
 875            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
 876            method=DELETE,
 877            accept_return_codes=accepted_return_code
 878        )
 879        if (res.status_code == 403
 880                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
 881            logging.info(
 882                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
 883                f"access to the workspace (they could be an owner on the billing project)"
 884            )
 885        return res
 886
 887    def set_table_column_order(self, column_order: dict) -> requests.Response:
 888        """
 889        Set the column order for one or more entity tables in the workspace.
 890
 891        **Args:**
 892        - column_order (dict): A dictionary mapping table names to their column configuration. Each table entry
 893            should have the following structure:
 894
 895            ```
 896            {
 897                "table_name": {
 898                    "shown": ["col1", "col2", ...],  # Columns to display, in order
 899                    "hidden": ["col3", "col4", ...]  # Columns to hide
 900                },
 901                ...
 902            }
 903            ```
 904
 905        **Returns:**
 906        - requests.Response: The response from the request.
 907        """
 908        logging.info(
 909            f"Setting column order for tables in workspace {self.billing_project}/{self.workspace_name}"
 910        )
 911        return self.update_workspace_attributes(
 912            attributes=[
 913                {
 914                    "op": "AddUpdateAttribute",
 915                    "attributeName": "workspace-column-defaults",
 916                    "addUpdateAttribute": json.dumps(column_order)
 917                }
 918            ]
 919        )
 920
 921    def change_workspace_public_setting(self, public: bool) -> requests.Response:
 922        """
 923        Change a workspace's public setting.
 924
 925        **Args:**
 926        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
 927         public, `False` otherwise.
 928
 929        **Returns:**
 930        - requests.Response: The response from the request.
 931        """
 932        body = [
 933            {
 934                "settingType": "PubliclyReadable",
 935                "config": {
 936                    "enabled": public
 937                }
 938            }
 939        ]
 940        return self.request_util.run_request(
 941            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
 942            method=PUT,
 943            content_type=APPLICATION_JSON,
 944            data=json.dumps(body)
 945        )
 946
 947    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
 948        """
 949        Check if a workspace is public.
 950
 951        **Args:**
 952        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
 953        it up if not provided. Defaults to None.
 954
 955        **Returns:**
 956        - requests.Response: The response from the request.
 957        """
 958        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
 959        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
 960        return self.request_util.run_request(
 961            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
 962            method=GET
 963        )
 964
 965    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
 966        """Delete an entire entity table from a Terra workspace.
 967
 968        **Args:**
 969        - entity_to_delete (str): The name of the entity table to delete.
 970
 971        **Returns:**
 972        - requests.Response: The response from the request.
 973        """
 974        response = self.request_util.run_request(
 975            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",  # noqa: E501
 976            method=DELETE
 977        )
 978        if response.status_code == 204:
 979            logging.info(
 980                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
 981                f"'{self.billing_project}/{self.workspace_name}'"
 982            )
 983        else:
 984            logging.error(
 985                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
 986                f"table: {response.text}"
 987            )
 988        return response
 989
 990    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
 991        """Save an entity table version in a Terra workspace.
 992
 993        **Args:**
 994        - entity_type (str): The name of the entity table to save a new version for
 995        - version_name (str): The name of the new version
 996        """
 997        # Get the workspace metrics
 998        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
 999        file_name = f"{entity_type}.json"
1000        # Write the workspace metrics to a JSON file
1001        with open(file_name, "w") as json_file:
1002            json.dump(workspace_metrics, json_file)
1003
1004        # Create a zip file with the same naming convention that Terra backend uses
1005        timestamp_ms = int(time.time() * 1000)
1006        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
1007        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
1008            zipf.write(file_name, arcname=f"json/{file_name}")
1009
1010        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
1011        workspace_info = self.get_workspace_info().json()
1012        path_to_upload_to = os.path.join(
1013            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
1014        )
1015        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
1016        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
1017        try:
1018            active_account = gcp_util.get_active_gcloud_account()
1019        except Exception as e:
1020            active_account = workspace_info["workspace"]["createdBy"]
1021            logging.error(
1022                f"Encountered the following exception while attempting to get current GCP account: {e}. "
1023                f"Will set the owner of the new metadata version as the workspace creator instead."
1024            )
1025        gcp_util.upload_blob(
1026            source_file=zip_file_name,
1027            destination_path=path_to_upload_to,
1028            custom_metadata={
1029                "createdBy": active_account,
1030                "entityType": entity_type,
1031                "timestamp": timestamp_ms,
1032                "description": version_name,
1033            }
1034        )
1035
1036    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
1037        """
1038        Add a user comment to a submission in Terra.
1039
1040        **Args:**
1041        - submission_id (str): The ID of the submission to add a comment to.
1042        - user_comment (str): The comment to add to the submission.
1043
1044        **Returns:**
1045        - requests.Response: The response from the request.
1046        """
1047        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
1048        return self.request_util.run_request(
1049            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
1050            method=PATCH,
1051            content_type=APPLICATION_JSON,
1052            data=json.dumps({"userComment": user_comment}),
1053        )
1054
1055    def initiate_submission(
1056            self,
1057            method_config_namespace: str,
1058            method_config_name: str,
1059            entity_type: str,
1060            entity_name: str,
1061            expression: str,
1062            user_comment: Optional[str],
1063            use_call_cache: bool = True
1064    ) -> requests.Response:
1065        """
1066        Initiate a submission within a Terra workspace.
1067
1068        Note - the workflow being initiated MUST already be imported into the workspace.
1069
1070        **Args:**
1071        - method_config_namespace (str): The namespace of the method configuration.
1072        - method_config_name (str): The name of the method configuration to use for the submission
1073        (i.e. the workflow name).
1074        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
1075        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
1076        "sample_set_1").
1077        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
1078        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
1079        be launched PER SAMPLE, the expression should be `this.samples`.
1080        - user_comment (str, optional): The user comment to add to the submission.
1081        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
1082
1083        **Returns:**
1084        - requests.Response: The response from the request.
1085        """
1086        payload = {
1087            "methodConfigurationNamespace": method_config_namespace,
1088            "methodConfigurationName": method_config_name,
1089            "entityType": entity_type,
1090            "entityName": entity_name,
1091            "expression": expression,
1092            "useCallCache": use_call_cache,
1093            "deleteIntermediateOutputFiles": False,
1094            "useReferenceDisks": False,
1095            "ignoreEmptyOutputs": False,
1096        }
1097        if user_comment:
1098            payload["userComment"] = user_comment
1099
1100        return self.request_util.run_request(
1101            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
1102            method=POST,
1103            content_type=APPLICATION_JSON,
1104            data=json.dumps(payload),
1105        )
1106
1107    def retry_failed_submission(self, submission_id: str) -> requests.Response:
1108        """
1109        Retry a failed submission in Terra.
1110
1111        **Args:**
1112        - submission_id (str): The ID of the submission to retry.
1113
1114        **Returns:**
1115        - requests.Response: The response from the request.
1116        """
1117        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
1118        payload = {"retryType": "Failed"}
1119        logging.info(
1120            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
1121        )
1122        return self.request_util.run_request(
1123            uri=url,
1124            method=POST,
1125            content_type=APPLICATION_JSON,
1126            data=json.dumps(payload)
1127        )
1128
1129    def get_submission_status(self, submission_id: str) -> requests.Response:
1130        """
1131        Get the status of a submission in Terra.
1132
1133        **Args:**
1134        - submission_id (str): The ID of the submission.
1135
1136        **Returns:**
1137        - requests.Response: The response from the request.
1138        """
1139        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
1140        logging.info(
1141            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"  # noqa: E501
1142        )
1143        return self.request_util.run_request(
1144            uri=url,
1145            method=GET
1146        )
1147
1148    def get_workspace_submission_status(self) -> requests.Response:
1149        """
1150        Get the status of all submissions in a Terra workspace.
1151
1152        **Returns:**
1153        - requests.Response: The response from the request.
1154        """
1155        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions"
1156        logging.info(
1157            f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}"
1158        )
1159        return self.request_util.run_request(
1160            uri=url,
1161            method=GET
1162        )
1163
1164    def get_workflow_status(
1165            self,
1166            submission_id: str,
1167            workflow_id: str,
1168            expand_sub_workflow_metadata: bool = False) -> requests.Response:
1169        """
1170        Get the status of a workflow in a submission in Terra.
1171
1172        **Args:**
1173        - submission_id (str): The ID of the submission.
1174        - workflow_id (str): The ID of the workflow.
1175        - expand_sub_workflow_metadata (bool, optional): Whether to expand the expand_sub workflow metadata.
1176          Defaults to `False`.
1177
1178        **Returns:**
1179        - requests.Response: The response from the request.
1180        """
1181        expand_metadata = '?expandSubWorkflows=true' if expand_sub_workflow_metadata else ''
1182        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/workflows/{workflow_id}{expand_metadata}"  # noqa: E501
1183        logging.info(
1184            f"Getting status for workflow: '{workflow_id}' in submission: '{submission_id}' "
1185            f"in workspace {self.billing_project}/{self.workspace_name}"
1186        )
1187        return self.request_util.run_request(
1188            uri=url,
1189            method=GET
1190        )
1191
1192    def get_workspace_submission_stats(
1193            self, method_name: Optional[str] = None, retrieve_running_ids: bool = True
1194    ) -> dict:
1195        """
1196        Get submission statistics for a Terra workspace, optionally filtered by method name.
1197
1198        **Args:**
1199        - method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
1200        - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
1201          Defaults to `True`.
1202
1203        **Returns:**
1204        - dict: A dictionary containing submission statistics, including counts of workflows in various states
1205        """
1206        submissions = [
1207            s
1208            for s in self.get_workspace_submission_status().json()
1209            # If method_name is provided, filter submissions to only those with that method name
1210            if (s["methodConfigurationName"] == method_name if method_name else True)
1211        ]
1212        method_append = f"with method name '{method_name}'" if method_name else ""
1213        logging.info(
1214            f"{len(submissions)} submissions in "
1215            f"{self.billing_project}/{self.workspace_name} {method_append}"
1216        )
1217        workflow_statuses = {
1218            "submitted": 0,
1219            "queued": 0,
1220            "running": 0,
1221            "aborting": 0,
1222            "aborted": 0,
1223            "failed": 0,
1224            "succeeded": 0,
1225            "id_still_running": [] if retrieve_running_ids else "NA"
1226        }
1227        for submission in submissions:
1228            wf_status = submission["workflowStatuses"]
1229            for status, count in wf_status.items():
1230                if status.lower() in workflow_statuses:
1231                    workflow_statuses[status.lower()] += count
1232            # Only look at individual submissions if retrieve running ids set to true
1233            # and only look at submissions that are still running
1234            if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]:
1235                submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json()
1236                for workflow in submission_detailed["workflows"]:
1237                    if workflow["status"] in ["Running", "Submitted", "Queued"]:
1238                        entity_id = workflow["workflowEntity"]["entityName"]
1239                        workflow_statuses['id_still_running'].append(entity_id)  # type: ignore[attr-defined]
1240        running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued']  # type: ignore[operator]  # noqa: E501
1241        if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count:  # type: ignore[arg-type]  # noqa: E501
1242            logging.warning(
1243                f"Discrepancy found between total running/pending workflows, {running_count}, "
1244                f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. "  # type: ignore[arg-type]  # noqa: E501
1245                "Workflows may have completed between API calls."
1246            )
1247        denominator = workflow_statuses['succeeded'] + workflow_statuses['failed']  # type: ignore[operator]
1248        if denominator > 0:
1249            workflow_statuses['success_rate'] = round(
1250                workflow_statuses['succeeded'] / denominator,
1251                2
1252            )
1253        else:
1254            workflow_statuses['success_rate'] = 0.0
1255        return workflow_statuses
1256
1257    def get_workspace_details(self, terra_google_project_id: str) -> requests.Response:
1258        """
1259        Get details of a Terra workspace using the Google project ID.
1260
1261        **Args:**
1262        - terra_google_project_id (str): The Google project ID of the Terra workspace.
1263
1264        **Returns:**
1265        - requests.Response: The response from the request.
1266        """
1267        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}?userProject={terra_google_project_id}"  # noqa: E501
1268        logging.info(
1269            f"Getting workspace details for workspace '{self.workspace_name}' using Terra Google"
1270            f" project ID: '{terra_google_project_id}'"
1271        )
1272        return self.request_util.run_request(
1273            uri=url,
1274            method=GET
1275        )
MEMBER = 'member'
ADMIN = 'admin'
class Terra:
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Accepted but not used by the methods shown for this class.
            Defaults to `"prod"`.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of accessible workspaces.

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If None (the default)
            or empty, no `fields` filter is sent and all fields are included.

        **Returns:**
        - requests.Response: The response from the request.
        """
        # The selected fields travel as one comma-separated `fields` query parameter.
        fields_str = "fields=" + ",".join(fields) if fields else ""
        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the service account JSON.

        Requests the pet service account key from Sam.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

Class for generic Terra utilities.

Terra(request_util: ops_utils.request_util.RunRequest, env: str = 'prod')
38    def __init__(self, request_util: RunRequest, env: str = "prod"):
39        """
40        Initialize the Terra class.
41
42        **Args:**
43        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
44            request utility class to handle HTTP requests.
45        """
46        self.request_util = request_util
47        """@private"""

Initialize the Terra class.

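Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.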

def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.models.Response:
49    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
50        """
51        Fetch the list of accessible workspaces.
52
53        **Args:**
54        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
55
56        **Returns:**
57        - requests.Response: The response from the request.
58        """
59        fields_str = "fields=" + ",".join(fields) if fields else ""
60        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
61        return self.request_util.run_request(
62            uri=url,
63            method=GET
64        )

Fetch the list of accessible workspaces.

Args:

  • fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.

Returns:

  • requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.models.Response:
66    def get_pet_account_json(self) -> requests.Response:
67        """
68        Get the service account JSON.
69
70        **Returns:**
71        - requests.Response: The response from the request.
72        """
73        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
74        return self.request_util.run_request(
75            uri=url,
76            method=GET
77        )

Get the service account JSON.

Returns:

  • requests.Response: The response from the request.
class TerraGroups:
 80class TerraGroups:
 81    """A class to manage Terra groups and their memberships."""
 82
 83    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
 84    """@private"""
 85    CONFLICT_STATUS_CODE = 409
 86    """@private"""
 87
 88    def __init__(self, request_util: RunRequest):
 89        """
 90        Initialize the TerraGroups class.
 91
 92        **Args:**
 93        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
 94         utility class to handle HTTP requests.
 95        """
 96        self.request_util = request_util
 97        """@private"""
 98
 99    def _check_role(self, role: str) -> None:
100        """
101        Check if the role is valid.
102
103        Args:
104            role (str): The role to check.
105
106        Raises:
107            ValueError: If the role is not one of the allowed options.
108        """
109        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
110            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
111
112    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
113        """
114        Remove a user from a group.
115
116        **Args:**
117        - group (str): The name of the group.
118        - email (str): The email of the user to remove.
119        - role (str): The role of the user in the group
120            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
121
122        **Returns:**
123        - requests.Response: The response from the request.
124        """
125        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
126        self._check_role(role)
127        logging.info(f"Removing {email} from group {group}")
128        return self.request_util.run_request(
129            uri=url,
130            method=DELETE
131        )
132
133    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
134        """
135        Create a new group.
136
137        **Args:**
138        - group_name (str): The name of the group to create.
139        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
140
141        **Returns:**
142        - requests.Response: The response from the request.
143        """
144        url = f"{SAM_LINK}/groups/v1/{group_name}"
145        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
146        response = self.request_util.run_request(
147            uri=url,
148            method=POST,
149            accept_return_codes=accept_return_codes
150        )
151        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
152            logging.info(f"Group {group_name} already exists. Continuing.")
153            return response
154        else:
155            logging.info(f"Created group {group_name}")
156            return response
157
158    def delete_group(self, group_name: str) -> requests.Response:
159        """
160        Delete a group.
161
162        **Args:**
163        - group_name (str): The name of the group to delete.
164
165        **Returns:**
166        - requests.Response: The response from the request.
167        """
168        url = f"{SAM_LINK}/groups/v1/{group_name}"
169        logging.info(f"Deleting group {group_name}")
170        return self.request_util.run_request(
171            uri=url,
172            method=DELETE
173        )
174
175    def add_user_to_group(
176            self, group: str, email: str, role: str, continue_if_exists: bool = False
177    ) -> requests.Response:
178        """
179        Add a user to a group.
180
181        **Args:**
182        - group (str): The name of the group.
183        - email (str): The email of the user to add.
184        - role (str): The role of the user in the group
185            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
186        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
187                Defaults to `False`.
188
189        **Returns:**
190        - requests.Response: The response from the request.
191        """
192        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
193        self._check_role(role)
194        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
195        logging.info(f"Adding {email} to group {group} as {role}")
196        return self.request_util.run_request(
197            uri=url,
198            method=PUT,
199            accept_return_codes=accept_return_codes
200        )
201
202    def check_group_members(self, group: str, role: str) -> requests.Response:
203        """
204        Check the members of a group.
205
206        **Args:**
207        - group (str): The name of the group.
208        - role (str): The role to check for in the group
209            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
210
211        **Returns:**
212        - requests.Response: The response from the request.
213        """
214        url = f"{SAM_LINK}/groups/v1/{group}/{role}"
215        self._check_role(role)
216        logging.info(f"Checking {role}s in group {group}")
217        return self.request_util.run_request(
218            uri=url,
219            method=GET
220        )

A class to manage Terra groups and their memberships.

TerraGroups(request_util: ops_utils.request_util.RunRequest)
88    def __init__(self, request_util: RunRequest):
89        """
90        Initialize the TerraGroups class.
91
92        **Args:**
93        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
94         utility class to handle HTTP requests.
95        """
96        self.request_util = request_util
97        """@private"""

Initialize the TerraGroups class.

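Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.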

def remove_user_from_group(self, group: str, email: str, role: str) -> requests.models.Response:
112    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
113        """
114        Remove a user from a group.
115
116        **Args:**
117        - group (str): The name of the group.
118        - email (str): The email of the user to remove.
119        - role (str): The role of the user in the group
120            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
121
122        **Returns:**
123        - requests.Response: The response from the request.
124        """
125        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
126        self._check_role(role)
127        logging.info(f"Removing {email} from group {group}")
128        return self.request_util.run_request(
129            uri=url,
130            method=DELETE
131        )

Remove a user from a group.

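Args:

  • group (str): The name of the group.
  • email (str): The email of the user to remove.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).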

Returns:

  • requests.Response: The response from the request.
def create_group( self, group_name: str, continue_if_exists: bool = False) -> requests.models.Response:
133    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
134        """
135        Create a new group.
136
137        **Args:**
138        - group_name (str): The name of the group to create.
139        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
140
141        **Returns:**
142        - requests.Response: The response from the request.
143        """
144        url = f"{SAM_LINK}/groups/v1/{group_name}"
145        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
146        response = self.request_util.run_request(
147            uri=url,
148            method=POST,
149            accept_return_codes=accept_return_codes
150        )
151        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
152            logging.info(f"Group {group_name} already exists. Continuing.")
153            return response
154        else:
155            logging.info(f"Created group {group_name}")
156            return response

Create a new group.

Args:

  • group_name (str): The name of the group to create.
  • continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.models.Response:
158    def delete_group(self, group_name: str) -> requests.Response:
159        """
160        Delete a group.
161
162        **Args:**
163        - group_name (str): The name of the group to delete.
164
165        **Returns:**
166        - requests.Response: The response from the request.
167        """
168        url = f"{SAM_LINK}/groups/v1/{group_name}"
169        logging.info(f"Deleting group {group_name}")
170        return self.request_util.run_request(
171            uri=url,
172            method=DELETE
173        )

Delete a group.

Args:

  • group_name (str): The name of the group to delete.

Returns:

  • requests.Response: The response from the request.
def add_user_to_group( self, group: str, email: str, role: str, continue_if_exists: bool = False) -> requests.models.Response:
175    def add_user_to_group(
176            self, group: str, email: str, role: str, continue_if_exists: bool = False
177    ) -> requests.Response:
178        """
179        Add a user to a group.
180
181        **Args:**
182        - group (str): The name of the group.
183        - email (str): The email of the user to add.
184        - role (str): The role of the user in the group
185            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
186        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
187                Defaults to `False`.
188
189        **Returns:**
190        - requests.Response: The response from the request.
191        """
192        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
193        self._check_role(role)
194        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
195        logging.info(f"Adding {email} to group {group} as {role}")
196        return self.request_util.run_request(
197            uri=url,
198            method=PUT,
199            accept_return_codes=accept_return_codes
200        )

Add a user to a group.

Args:

  • group (str): The name of the group.
  • email (str): The email of the user to add.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).
  • continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def check_group_members(self, group: str, role: str) -> requests.models.Response:
202    def check_group_members(self, group: str, role: str) -> requests.Response:
203        """
204        Check the members of a group.
205
206        **Args:**
207        - group (str): The name of the group.
208        - role (str): The role to check for in the group
209            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
210
211        **Returns:**
212        - requests.Response: The response from the request.
213        """
214        url = f"{SAM_LINK}/groups/v1/{group}/{role}"
215        self._check_role(role)
216        logging.info(f"Checking {role}s in group {group}")
217        return self.request_util.run_request(
218            uri=url,
219            method=GET
220        )

Check the members of a group.

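Args:

  • group (str): The name of the group.
  • role (str): The role to check for in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).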

Returns:

  • requests.Response: The response from the request.
class TerraWorkspace:
 223class TerraWorkspace:
 224    """Terra workspace class to manage workspaces and their attributes."""
 225
 226    CONFLICT_STATUS_CODE = 409
 227    """@private"""
 228
 229    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
 230        """
 231        Initialize the TerraWorkspace class.
 232
 233        **Args:**
 234        - billing_project (str): The billing project associated with the workspace.
 235        - workspace_name (str): The name of the workspace.
 236        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
 237            request utility class to handle HTTP requests.
 238        """
 239        self.billing_project = billing_project
 240        """@private"""
 241        self.workspace_name = workspace_name
 242        """@private"""
 243        self.workspace_id = None
 244        """@private"""
 245        self.resource_id = None
 246        """@private"""
 247        self.storage_container = None
 248        """@private"""
 249        self.bucket = None
 250        """@private"""
 251        self.wds_url = None
 252        """@private"""
 253        self.account_url: Optional[str] = None
 254        """@private"""
 255        self.request_util = request_util
 256        """@private"""
 257        if env.lower() == "dev":
 258            self.terra_link = TERRA_DEV_LINK
 259            """@private"""
 260        elif env.lower() == "prod":
 261            self.terra_link = TERRA_PROD_LINK
 262            """@private"""
 263        else:
 264            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
 265
 266    def __repr__(self) -> str:
 267        """
 268        Return a string representation of the TerraWorkspace instance.
 269
 270        Returns:
 271            str: The string representation of the TerraWorkspace instance.
 272        """
 273        return f"{self.billing_project}/{self.workspace_name}"
 274
 275    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000, verbose = True) -> Any:
 276        """
 277        Yield all entity metrics from the workspace.
 278
 279        Args:
 280            entity (str): The type of entity to query.
 281            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
 282            verbose (bool): If True, will log the progress of fetching entity metrics. Defaults to True.
 283
 284        Yields:
 285            Any: The JSON response containing entity metrics.
 286        """
 287        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
 288        response = self.request_util.run_request(
 289            uri=url,
 290            method=GET,
 291            content_type=APPLICATION_JSON
 292        )
 293        raw_text = response.text
 294        first_page_json = json.loads(
 295            raw_text,
 296            parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 297        )
 298        yield first_page_json
 299        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
 300        if verbose:
 301            logging.info(
 302                f"Looping through {total_pages} pages of data")
 303
 304        for page in range(2, total_pages + 1):
 305            if verbose:
 306                logging.info(f"Getting page {page} of {total_pages}")
 307            next_page = self.request_util.run_request(
 308                uri=url,
 309                method=GET,
 310                content_type=APPLICATION_JSON,
 311                params={"page": page}
 312            )
 313            raw_text = next_page.text
 314            page_json = json.loads(
 315                raw_text,
 316                parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
 317            )
 318            yield page_json
 319
 320    @staticmethod
 321    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
 322        """Check that all headers follow the standards required by TDR.
 323
 324        **Args:**
 325        - table_name (str): The name of the Terra table.
 326        - headers (list[str]): The headers of the Terra table to validate.
 327
 328        **Raises:**
 329        - ValueError if any headers are considered invalid by TDR standards
 330        """
 331        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
 332        tdr_max_header_length = 63
 333
 334        headers_containing_too_many_characters = []
 335        headers_contain_invalid_characters = []
 336
 337        for header in headers:
 338            if len(header) > tdr_max_header_length:
 339                headers_containing_too_many_characters.append(header)
 340            if not re.match(tdr_header_allowed_pattern, header):
 341                headers_contain_invalid_characters.append(header)
 342
 343        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
 344        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
 345        header naming."""
 346        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
 347        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
 348        allowed in TDR is {tdr_max_header_length}.\n"""
 349        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
 350        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
 351        only contain numbers, letters, and underscore characters.\n"""
 352
 353        error_to_report = ""
 354        if headers_containing_too_many_characters:
 355            error_to_report += too_many_characters_error_message
 356        if headers_contain_invalid_characters:
 357            error_to_report += invalid_characters_error_message
 358        if error_to_report:
 359            error_to_report += base_error_message
 360            raise ValueError(error_to_report)
 361
 362    def get_workspace_info(self) -> requests.Response:
 363        """
 364        Get workspace information.
 365
 366        **Returns:**
 367        - requests.Response: The response from the request.
 368        """
 369        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
 370        logging.info(
 371            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
 372        return self.request_util.run_request(uri=url, method=GET)
 373
 374    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]:
 375        """
 376        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
 377
 378        **Args:**
 379        - entity_type (str): The type of entity to get metrics for.
 380        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
 381        - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
 382
 383        **Returns:**
 384        - list[dict]: A list of dictionaries containing entity metrics.
 385        """
 386        results = []
 387        if verbose:
 388            logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
 389
 390        for page in self._yield_all_entity_metrics(entity=entity_type, verbose=verbose):
 391            results.extend(page["results"])
 392
 393        # If remove_dicts is True, remove dictionaries from the workspace metrics
 394        if remove_dicts:
 395            for row in results:
 396                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
 397        return results
 398
 399    def get_flat_list_of_table_entity(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]:
 400        """
 401        Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add
 402        the entity name to the dictionary with key "{entity_type}_id".
 403
 404        **Args:**
 405        - entity_type (str): The type of entity to get metrics for.
 406        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
 407        - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
 408
 409        **Returns:**
 410        - list[dict]: A list of dictionaries containing entity metrics.
 411        """
 412        table_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type, remove_dicts=remove_dicts, verbose=verbose)
 413        convert_metrics = []
 414        for row in table_metrics:
 415            converted_row = row['attributes']
 416            converted_row[f"{row['entityType']}_id"] = row['name']
 417            convert_metrics.append(converted_row)
 418        return convert_metrics
 419
 420    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
 421        """
 422        Get specific entity metrics for a given entity type and name.
 423
 424        **Args:**
 425        - entity_type (str): The type of entity to get metrics for.
 426        - entity_name (str): The name of the entity to get metrics for.
 427
 428        **Returns:**
 429        - requests.Response: The response from the request.
 430        """
 431        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"  # noqa: E501
 432        return self.request_util.run_request(uri=url, method=GET)
 433
 434    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
 435        """
 436        Remove dictionaries from the attributes.
 437
 438        Args:
 439            attributes (dict): The attributes to remove dictionaries from.
 440
 441        Returns:
 442            dict: The updated attributes with no dictionaries.
 443        """
 444        for key, value in attributes.items():
 445            attributes[key] = self._remove_dict_from_cell(value)
 446        return attributes
 447
 448    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
 449        """
 450        Remove a dictionary from a cell.
 451
 452        Args:
 453            cell_value (Any): The dictionary to remove.
 454
 455        Returns:
 456            Any: The updated cell with no dictionaries.
 457        """
 458        if isinstance(cell_value, dict):
 459            entity_name = cell_value.get("entityName")
 460            # If the cell value is a dictionary, check if it has an entityName key
 461            if entity_name:
 462                # If the cell value is a dictionary with an entityName key, return the entityName
 463                return entity_name
 464            entity_list = cell_value.get("items")
 465            if entity_list or entity_list == []:
 466                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
 467                return [
 468                    self._remove_dict_from_cell(entity) for entity in entity_list
 469                ]
 470            return cell_value
 471        return cell_value
 472
 473    def get_workspace_bucket(self) -> str:
 474        """
 475        Get the workspace bucket name. Does not include the `gs://` prefix.
 476
 477        **Returns:**
 478        - str: The bucket name.
 479        """
 480        return self.get_workspace_info().json()["workspace"]["bucketName"]
 481
 482    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
 483        """
 484        Get workspace entity information.
 485
 486        **Args:**
 487        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
 488
 489        **Returns:**
 490        - requests.Response: The response from the request.
 491        """
 492        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
 493        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
 494        return self.request_util.run_request(uri=url, method=GET)
 495
 496    def get_workspace_acl(self) -> requests.Response:
 497        """
 498        Get the workspace access control list (ACL).
 499
 500        **Returns:**
 501        - requests.Response: The response from the request.
 502        """
 503        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
 504        return self.request_util.run_request(
 505            uri=url,
 506            method=GET
 507        )
 508
 509    def update_user_acl(
 510            self,
 511            email: str,
 512            access_level: str,
 513            can_share: bool = False,
 514            can_compute: bool = False,
 515            invite_users_not_found: bool = False,
 516    ) -> requests.Response:
 517        """
 518        Update the access control list (ACL) for a user in the workspace.
 519
 520        **Args:**
 521        - email (str): The email of the user.
 522        - access_level (str): The access level to grant to the user.
 523        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
 524        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
 525        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 526                the workspace. Defaults to `False`
 527
 528        **Returns:**
 529        - requests.Response: The response from the request.
 530        """
 531        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 532              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 533        payload = {
 534            "email": email,
 535            "accessLevel": access_level,
 536            "canShare": can_share,
 537            "canCompute": can_compute,
 538        }
 539        logging.info(
 540            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
 541        response = self.request_util.run_request(
 542            uri=url,
 543            method=PATCH,
 544            content_type=APPLICATION_JSON,
 545            data="[" + json.dumps(payload) + "]"
 546        )
 547
 548        if response.json()["usersNotFound"] and not invite_users_not_found:
 549            # Will be a list of one user
 550            user_not_found = response.json()["usersNotFound"][0]
 551            raise Exception(
 552                f'The user {user_not_found["email"]} was not found and access was not updated'
 553            )
 554        return response
 555
 556    @deprecated(
 557        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
 558    )
 559    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
 560        """
 561        Update the metadata for a library dataset.
 562
 563        **Args:**
 564        - library_metadata (dict): The metadata to update.
 565        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
 566
 567        **Returns:**
 568        - requests.Response: The response from the request.
 569        """
 570        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
 571              f"/metadata?validate={str(validate).lower()}"
 572        return self.request_util.run_request(
 573            uri=acl,
 574            method=PUT,
 575            data=json.dumps(library_metadata)
 576        )
 577
 578    def update_multiple_users_acl(
 579            self, acl_list: list[dict], invite_users_not_found: bool = False
 580    ) -> requests.Response:
 581        """
 582        Update the access control list (ACL) for multiple users in the workspace.
 583
 584        **Args:**
 585        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
 586        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
 587                the workspace. Defaults to `False`
 588
 589        **Returns:**
 590        - requests.Response: The response from the request.
 591        """
 592        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
 593            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
 594        logging.info(
 595            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
 596        response = self.request_util.run_request(
 597            uri=url,
 598            method=PATCH,
 599            content_type=APPLICATION_JSON,
 600            data=json.dumps(acl_list)
 601        )
 602
 603        if response.json()["usersNotFound"] and not invite_users_not_found:
 604            # Will be a list of one user
 605            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
 606            raise Exception(
 607                f"The following users were not found and access was not updated: {users_not_found}"
 608            )
 609        return response
 610
 611    def create_workspace(
 612            self,
 613            auth_domain: list[dict] = [],
 614            attributes: dict = {},
 615            continue_if_exists: bool = False,
 616    ) -> requests.Response:
 617        """
 618        Create a new workspace in Terra.
 619
 620        **Args:**
 621        - auth_domain (list[dict], optional): A list of authorization domains. Should look
 622                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
 623        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
 624        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
 625
 626        **Returns:**
 627        - requests.Response: The response from the request.
 628        """
 629        payload = {
 630            "namespace": self.billing_project,
 631            "name": self.workspace_name,
 632            "authorizationDomain": auth_domain,
 633            "attributes": attributes,
 634            "cloudPlatform": GCP
 635        }
 636        # If workspace already exists then continue if exists
 637        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 638        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
 639        response = self.request_util.run_request(
 640            uri=f"{self.terra_link}/workspaces",
 641            method=POST,
 642            content_type=APPLICATION_JSON,
 643            data=json.dumps(payload),
 644            accept_return_codes=accept_return_codes
 645        )
 646        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
 647            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
 648        return response
 649
 650    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
 651        """
 652        Create an ingest dictionary for workspace attributes.
 653
 654        **Args:**
 655        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
 656
 657        **Returns:**
 658        - list[dict]: A list of dictionaries containing the workspace attributes.
 659        """
 660        # If not provided then call API to get it
 661        workspace_attributes = (
 662            workspace_attributes if workspace_attributes
 663            else self.get_workspace_info().json()["workspace"]["attributes"]
 664        )
 665
 666        ingest_dict = []
 667        for key, value in workspace_attributes.items():
 668            # If value is dict just use 'items' as value
 669            if isinstance(value, dict):
 670                value = value.get("items")
 671            # If value is list convert to comma separated string
 672            if isinstance(value, list):
 673                value = ", ".join(value)
 674            ingest_dict.append(
 675                {
 676                    "attribute": key,
 677                    "value": str(value) if value else None
 678                }
 679            )
 680        return ingest_dict
 681
 682    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
 683        """
 684        Upload metadata to the workspace table.
 685
 686        **Args:**
 687        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
 688
 689        **Returns:**
 690        - requests.Response: The response from the request.
 691        """
 692        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
 693        data = {"entities": open(entities_tsv, "rb")}
 694        return self.request_util.upload_file(
 695            uri=endpoint,
 696            data=data
 697        )
 698
 699    def _batch_upsert(self, update_dict: list) -> requests.Response:
 700        """
 701        Run batch upsert on workspace table.
 702
 703        **Args:**
 704        - update_dict (dict): A dictionary to update the workspace table.
 705        **Returns:**
 706        - requests.Response: The response from the request.
 707        """
 708        endpoint = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/batchUpsert"
 709        return self.request_util.run_request(
 710            uri=endpoint,
 711            data=json.dumps(update_dict),
 712            content_type=APPLICATION_JSON,
 713            method=POST
 714        )
 715
 716    def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.Response:
 717        """
 718        Upload metadata to one or more workspace entity tables using batch upsert.
 719
 720        Builds the Terra batch upsert payload from a structured input dictionary and calls
 721        `batch_upsert` with the result.
 722
 723        **Args:**
 724        - table_data (dict): A dictionary mapping table names to their data configuration.
 725            Each entry should have the following structure:
 726
 727            ```python
 728            {
 729                "table_name": {
 730                    "table_id_column": "column_that_is_the_entity_id",
 731                    "row_data": [
 732                        {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...},
 733                        ...
 734                    ]
 735                },
 736                ...
 737            }
 738            ```
 739
 740            - `table_id_column`: The name of the column whose value is used as the entity name
 741              (`name` field in the upsert payload). This column is **not** included as an attribute
 742              operation.
 743            - `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes
 744              an `AddUpdateAttribute` operation.
 745        - force (bool, optional): Whether to force update if id column does not match table name + _id.
 746
 747        **Returns:**
 748        - requests.Response: The response from the request.
 749        """
 750        upsert_payload = []
 751        table_name_failures = []
 752        for table_name, config in table_data.items():
 753            id_column = config["table_id_column"]
 754            if id_column != f"{table_name}_id":
 755                table_name_failures.append(
 756                    f"id column, {id_column}, does not match table {table_name}. This column will be renamed to {table_name}_id."
 757                    "Use force=True to force update."
 758                )
 759            rows = config["row_data"]
 760            for row in rows:
 761                entity_name = row.get(id_column)
 762                if entity_name is None:
 763                    raise Exception(f"Primary key column '{id_column}' is missing from row data - {row}")
 764                operations = [
 765                    {
 766                        "op": "AddUpdateAttribute",
 767                        "attributeName": col,
 768                        "addUpdateAttribute": value,
 769                    }
 770                    for col, value in row.items()
 771                    if col != id_column
 772                ]
 773                upsert_payload.append(
 774                    {
 775                        "name": entity_name,
 776                        "entityType": table_name,
 777                        "operations": operations,
 778                    }
 779                )
 780        if table_name_failures:
 781            for message in table_name_failures:
 782                if force:
 783                    logging.warning(message)
 784                else:
 785                    logging.error(message)
 786            if not force:
 787                raise Exception("One or more tables have id columns that do not match the expected format."
 788                                " See error messages above for details. Use force=True to force update.")
 789        return self._batch_upsert(upsert_payload)
 790
 791    def get_workspace_workflows(self) -> requests.Response:
 792        """
 793        Get the workflows for the workspace.
 794
 795        **Returns:**
 796        - requests.Response: The response from the request.
 797        """
 798        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
 799        return self.request_util.run_request(
 800            uri=uri,
 801            method=GET
 802        )
 803
 804    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
 805        """
 806        Import a workflow into the workspace.
 807
 808        **Args:**
 809        - workflow_dict (dict): The dictionary containing the workflow information.
 810        - continue_if_exists (bool, optional): Whether to continue if the workflow
 811                already exists. Defaults to `False`.
 812
 813        **Returns:**
 814        - requests.Response: The response from the request.
 815        """
 816        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
 817        workflow_json = json.dumps(workflow_dict)
 818        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
 819        return self.request_util.run_request(
 820            uri=uri,
 821            method=POST,
 822            data=workflow_json,
 823            content_type=APPLICATION_JSON,
 824            accept_return_codes=accept_return_codes
 825        )
 826
 827    def delete_workspace(self) -> requests.Response:
 828        """
 829        Delete a Terra workspace.
 830
 831        **Returns:**
 832        - requests.Response: The response from the request.
 833        """
 834        return self.request_util.run_request(
 835            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
 836            method=DELETE
 837        )
 838
 839    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
 840        """
 841        Update the attributes for the workspace.
 842
 843        **Args:**
 844        - attributes (dict): The attributes to update.
 845
 846        **Returns:**
 847        - requests.Response: The response from the request.
 848        """
 849        return self.request_util.run_request(
 850            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
 851            method=PATCH,
 852            data=json.dumps(attributes),
 853            content_type=APPLICATION_JSON
 854        )
 855
 856    def leave_workspace(
 857            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
 858    ) -> requests.Response:
 859        """
 860        Leave a workspace. If workspace ID not supplied, will look it up.
 861
 862        **Args:**
 863        - workspace_id (str, optional): The workspace ID. Defaults to None.
 864        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
 865             Defaults to `False`.
 866
 867        **Returns:**
 868        - requests.Response: The response from the request.
 869        """
 870        if not workspace_id:
 871            workspace_info = self.get_workspace_info().json()
 872            workspace_id = workspace_info['workspace']['workspaceId']
 873        accepted_return_code = [403] if ignore_direct_access_error else []
 874
 875        res = self.request_util.run_request(
 876            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
 877            method=DELETE,
 878            accept_return_codes=accepted_return_code
 879        )
 880        if (res.status_code == 403
 881                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
 882            logging.info(
 883                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
 884                f"access to the workspace (they could be an owner on the billing project)"
 885            )
 886        return res
 887
 888    def set_table_column_order(self, column_order: dict) -> requests.Response:
 889        """
 890        Set the column order for one or more entity tables in the workspace.
 891
 892        **Args:**
 893        - column_order (dict): A dictionary mapping table names to their column configuration. Each table entry
 894            should have the following structure:
 895
 896            ```
 897            {
 898                "table_name": {
 899                    "shown": ["col1", "col2", ...],  # Columns to display, in order
 900                    "hidden": ["col3", "col4", ...]  # Columns to hide
 901                },
 902                ...
 903            }
 904            ```
 905
 906        **Returns:**
 907        - requests.Response: The response from the request.
 908        """
 909        logging.info(
 910            f"Setting column order for tables in workspace {self.billing_project}/{self.workspace_name}"
 911        )
 912        return self.update_workspace_attributes(
 913            attributes=[
 914                {
 915                    "op": "AddUpdateAttribute",
 916                    "attributeName": "workspace-column-defaults",
 917                    "addUpdateAttribute": json.dumps(column_order)
 918                }
 919            ]
 920        )
 921
 922    def change_workspace_public_setting(self, public: bool) -> requests.Response:
 923        """
 924        Change a workspace's public setting.
 925
 926        **Args:**
 927        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
 928         public, `False` otherwise.
 929
 930        **Returns:**
 931        - requests.Response: The response from the request.
 932        """
 933        body = [
 934            {
 935                "settingType": "PubliclyReadable",
 936                "config": {
 937                    "enabled": public
 938                }
 939            }
 940        ]
 941        return self.request_util.run_request(
 942            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
 943            method=PUT,
 944            content_type=APPLICATION_JSON,
 945            data=json.dumps(body)
 946        )
 947
 948    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
 949        """
 950        Check if a workspace is public.
 951
 952        **Args:**
 953        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
 954        it up if not provided. Defaults to None.
 955
 956        **Returns:**
 957        - requests.Response: The response from the request.
 958        """
 959        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
 960        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
 961        return self.request_util.run_request(
 962            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
 963            method=GET
 964        )
 965
 966    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
 967        """Delete an entire entity table from a Terra workspace.
 968
 969        **Args:**
 970        - entity_to_delete (str): The name of the entity table to delete.
 971
 972        **Returns:**
 973        - requests.Response: The response from the request.
 974        """
 975        response = self.request_util.run_request(
 976            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",  # noqa: E501
 977            method=DELETE
 978        )
 979        if response.status_code == 204:
 980            logging.info(
 981                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
 982                f"'{self.billing_project}/{self.workspace_name}'"
 983            )
 984        else:
 985            logging.error(
 986                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
 987                f"table: {response.text}"
 988            )
 989        return response
 990
 991    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
 992        """Save an entity table version in a Terra workspace.
 993
 994        **Args:**
 995        - entity_type (str): The name of the entity table to save a new version for
 996        - version_name (str): The name of the new version
 997        """
 998        # Get the workspace metrics
 999        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
1000        file_name = f"{entity_type}.json"
1001        # Write the workspace metrics to a JSON file
1002        with open(file_name, "w") as json_file:
1003            json.dump(workspace_metrics, json_file)
1004
1005        # Create a zip file with the same naming convention that Terra backend uses
1006        timestamp_ms = int(time.time() * 1000)
1007        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
1008        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
1009            zipf.write(file_name, arcname=f"json/{file_name}")
1010
1011        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
1012        workspace_info = self.get_workspace_info().json()
1013        path_to_upload_to = os.path.join(
1014            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
1015        )
1016        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
1017        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
1018        try:
1019            active_account = gcp_util.get_active_gcloud_account()
1020        except Exception as e:
1021            active_account = workspace_info["workspace"]["createdBy"]
1022            logging.error(
1023                f"Encountered the following exception while attempting to get current GCP account: {e}. "
1024                f"Will set the owner of the new metadata version as the workspace creator instead."
1025            )
1026        gcp_util.upload_blob(
1027            source_file=zip_file_name,
1028            destination_path=path_to_upload_to,
1029            custom_metadata={
1030                "createdBy": active_account,
1031                "entityType": entity_type,
1032                "timestamp": timestamp_ms,
1033                "description": version_name,
1034            }
1035        )
1036
1037    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
1038        """
1039        Add a user comment to a submission in Terra.
1040
1041        **Args:**
1042        - submission_id (str): The ID of the submission to add a comment to.
1043        - user_comment (str): The comment to add to the submission.
1044
1045        **Returns:**
1046        - requests.Response: The response from the request.
1047        """
1048        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
1049        return self.request_util.run_request(
1050            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
1051            method=PATCH,
1052            content_type=APPLICATION_JSON,
1053            data=json.dumps({"userComment": user_comment}),
1054        )
1055
1056    def initiate_submission(
1057            self,
1058            method_config_namespace: str,
1059            method_config_name: str,
1060            entity_type: str,
1061            entity_name: str,
1062            expression: str,
1063            user_comment: Optional[str],
1064            use_call_cache: bool = True
1065    ) -> requests.Response:
1066        """
1067        Initiate a submission within a Terra workspace.
1068
1069        Note - the workflow being initiated MUST already be imported into the workspace.
1070
1071        **Args:**
1072        - method_config_namespace (str): The namespace of the method configuration.
1073        - method_config_name (str): The name of the method configuration to use for the submission
1074        (i.e. the workflow name).
1075        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
1076        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
1077        "sample_set_1").
1078        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
1079        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
1080        be launched PER SAMPLE, the expression should be `this.samples`.
1081        - user_comment (str, optional): The user comment to add to the submission.
1082        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
1083
1084        **Returns:**
1085        - requests.Response: The response from the request.
1086        """
1087        payload = {
1088            "methodConfigurationNamespace": method_config_namespace,
1089            "methodConfigurationName": method_config_name,
1090            "entityType": entity_type,
1091            "entityName": entity_name,
1092            "expression": expression,
1093            "useCallCache": use_call_cache,
1094            "deleteIntermediateOutputFiles": False,
1095            "useReferenceDisks": False,
1096            "ignoreEmptyOutputs": False,
1097        }
1098        if user_comment:
1099            payload["userComment"] = user_comment
1100
1101        return self.request_util.run_request(
1102            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
1103            method=POST,
1104            content_type=APPLICATION_JSON,
1105            data=json.dumps(payload),
1106        )
1107
1108    def retry_failed_submission(self, submission_id: str) -> requests.Response:
1109        """
1110        Retry a failed submission in Terra.
1111
1112        **Args:**
1113        - submission_id (str): The ID of the submission to retry.
1114
1115        **Returns:**
1116        - requests.Response: The response from the request.
1117        """
1118        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
1119        payload = {"retryType": "Failed"}
1120        logging.info(
1121            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
1122        )
1123        return self.request_util.run_request(
1124            uri=url,
1125            method=POST,
1126            content_type=APPLICATION_JSON,
1127            data=json.dumps(payload)
1128        )
1129
1130    def get_submission_status(self, submission_id: str) -> requests.Response:
1131        """
1132        Get the status of a submission in Terra.
1133
1134        **Args:**
1135        - submission_id (str): The ID of the submission.
1136
1137        **Returns:**
1138        - requests.Response: The response from the request.
1139        """
1140        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
1141        logging.info(
1142            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"  # noqa: E501
1143        )
1144        return self.request_util.run_request(
1145            uri=url,
1146            method=GET
1147        )
1148
1149    def get_workspace_submission_status(self) -> requests.Response:
1150        """
1151        Get the status of all submissions in a Terra workspace.
1152
1153        **Returns:**
1154        - requests.Response: The response from the request.
1155        """
1156        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions"
1157        logging.info(
1158            f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}"
1159        )
1160        return self.request_util.run_request(
1161            uri=url,
1162            method=GET
1163        )
1164
1165    def get_workflow_status(
1166            self,
1167            submission_id: str,
1168            workflow_id: str,
1169            expand_sub_workflow_metadata: bool = False) -> requests.Response:
1170        """
1171        Get the status of a workflow in a submission in Terra.
1172
1173        **Args:**
1174        - submission_id (str): The ID of the submission.
1175        - workflow_id (str): The ID of the workflow.
1176        - expand_sub_workflow_metadata (bool, optional): Whether to expand the expand_sub workflow metadata.
1177          Defaults to `False`.
1178
1179        **Returns:**
1180        - requests.Response: The response from the request.
1181        """
1182        expand_metadata = '?expandSubWorkflows=true' if expand_sub_workflow_metadata else ''
1183        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/workflows/{workflow_id}{expand_metadata}"  # noqa: E501
1184        logging.info(
1185            f"Getting status for workflow: '{workflow_id}' in submission: '{submission_id}' "
1186            f"in workspace {self.billing_project}/{self.workspace_name}"
1187        )
1188        return self.request_util.run_request(
1189            uri=url,
1190            method=GET
1191        )
1192
1193    def get_workspace_submission_stats(
1194            self, method_name: Optional[str] = None, retrieve_running_ids: bool = True
1195    ) -> dict:
1196        """
1197        Get submission statistics for a Terra workspace, optionally filtered by method name.
1198
1199        **Args:**
1200        - method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
1201        - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
1202          Defaults to `True`.
1203
1204        **Returns:**
1205        - dict: A dictionary containing submission statistics, including counts of workflows in various states
1206        """
1207        submissions = [
1208            s
1209            for s in self.get_workspace_submission_status().json()
1210            # If method_name is provided, filter submissions to only those with that method name
1211            if (s["methodConfigurationName"] == method_name if method_name else True)
1212        ]
1213        method_append = f"with method name '{method_name}'" if method_name else ""
1214        logging.info(
1215            f"{len(submissions)} submissions in "
1216            f"{self.billing_project}/{self.workspace_name} {method_append}"
1217        )
1218        workflow_statuses = {
1219            "submitted": 0,
1220            "queued": 0,
1221            "running": 0,
1222            "aborting": 0,
1223            "aborted": 0,
1224            "failed": 0,
1225            "succeeded": 0,
1226            "id_still_running": [] if retrieve_running_ids else "NA"
1227        }
1228        for submission in submissions:
1229            wf_status = submission["workflowStatuses"]
1230            for status, count in wf_status.items():
1231                if status.lower() in workflow_statuses:
1232                    workflow_statuses[status.lower()] += count
1233            # Only look at individual submissions if retrieve running ids set to true
1234            # and only look at submissions that are still running
1235            if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]:
1236                submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json()
1237                for workflow in submission_detailed["workflows"]:
1238                    if workflow["status"] in ["Running", "Submitted", "Queued"]:
1239                        entity_id = workflow["workflowEntity"]["entityName"]
1240                        workflow_statuses['id_still_running'].append(entity_id)  # type: ignore[attr-defined]
1241        running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued']  # type: ignore[operator]  # noqa: E501
1242        if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count:  # type: ignore[arg-type]  # noqa: E501
1243            logging.warning(
1244                f"Discrepancy found between total running/pending workflows, {running_count}, "
1245                f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. "  # type: ignore[arg-type]  # noqa: E501
1246                "Workflows may have completed between API calls."
1247            )
1248        denominator = workflow_statuses['succeeded'] + workflow_statuses['failed']  # type: ignore[operator]
1249        if denominator > 0:
1250            workflow_statuses['success_rate'] = round(
1251                workflow_statuses['succeeded'] / denominator,
1252                2
1253            )
1254        else:
1255            workflow_statuses['success_rate'] = 0.0
1256        return workflow_statuses
1257
1258    def get_workspace_details(self, terra_google_project_id: str) -> requests.Response:
1259        """
1260        Get details of a Terra workspace using the Google project ID.
1261
1262        **Args:**
1263        - terra_google_project_id (str): The Google project ID of the Terra workspace.
1264
1265        **Returns:**
1266        - requests.Response: The response from the request.
1267        """
1268        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}?userProject={terra_google_project_id}"  # noqa: E501
1269        logging.info(
1270            f"Getting workspace details for workspace '{self.workspace_name}' using Terra Google"
1271            f" project ID: '{terra_google_project_id}'"
1272        )
1273        return self.request_util.run_request(
1274            uri=url,
1275            method=GET
1276        )

Terra workspace class to manage workspaces and their attributes.

TerraWorkspace( billing_project: str, workspace_name: str, request_util: ops_utils.request_util.RunRequest, env: str = 'prod')
229    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
230        """
231        Initialize the TerraWorkspace class.
232
233        **Args:**
234        - billing_project (str): The billing project associated with the workspace.
235        - workspace_name (str): The name of the workspace.
236        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
237            request utility class to handle HTTP requests.
238        """
239        self.billing_project = billing_project
240        """@private"""
241        self.workspace_name = workspace_name
242        """@private"""
243        self.workspace_id = None
244        """@private"""
245        self.resource_id = None
246        """@private"""
247        self.storage_container = None
248        """@private"""
249        self.bucket = None
250        """@private"""
251        self.wds_url = None
252        """@private"""
253        self.account_url: Optional[str] = None
254        """@private"""
255        self.request_util = request_util
256        """@private"""
257        if env.lower() == "dev":
258            self.terra_link = TERRA_DEV_LINK
259            """@private"""
260        elif env.lower() == "prod":
261            self.terra_link = TERRA_PROD_LINK
262            """@private"""
263        else:
264            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")

Initialize the TerraWorkspace class.

Args:

  • billing_project (str): The billing project associated with the workspace.
  • workspace_name (str): The name of the workspace.
  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
320    @staticmethod
321    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
322        """Check that all headers follow the standards required by TDR.
323
324        **Args:**
325        - table_name (str): The name of the Terra table.
326        - headers (list[str]): The headers of the Terra table to validate.
327
328        **Raises:**
329        - ValueError if any headers are considered invalid by TDR standards
330        """
331        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
332        tdr_max_header_length = 63
333
334        headers_containing_too_many_characters = []
335        headers_contain_invalid_characters = []
336
337        for header in headers:
338            if len(header) > tdr_max_header_length:
339                headers_containing_too_many_characters.append(header)
340            if not re.match(tdr_header_allowed_pattern, header):
341                headers_contain_invalid_characters.append(header)
342
343        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
344        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
345        header naming."""
346        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
347        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
348        allowed in TDR is {tdr_max_header_length}.\n"""
349        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
350        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
351        only contain numbers, letters, and underscore characters.\n"""
352
353        error_to_report = ""
354        if headers_containing_too_many_characters:
355            error_to_report += too_many_characters_error_message
356        if headers_contain_invalid_characters:
357            error_to_report += invalid_characters_error_message
358        if error_to_report:
359            error_to_report += base_error_message
360            raise ValueError(error_to_report)

Check that all headers follow the standards required by TDR.

Args:

  • table_name (str): The name of the Terra table.
  • headers (list[str]): The headers of the Terra table to validate.

Raises:

  • ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.models.Response:
362    def get_workspace_info(self) -> requests.Response:
363        """
364        Get workspace information.
365
366        **Returns:**
367        - requests.Response: The response from the request.
368        """
369        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
370        logging.info(
371            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
372        return self.request_util.run_request(uri=url, method=GET)

Get workspace information.

Returns:

  • requests.Response: The response from the request.
def get_gcp_workspace_metrics( self, entity_type: str, remove_dicts: bool = False, verbose=True) -> list[dict]:
374    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]:
375        """
376        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
377
378        **Args:**
379        - entity_type (str): The type of entity to get metrics for.
380        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
381        - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
382
383        **Returns:**
384        - list[dict]: A list of dictionaries containing entity metrics.
385        """
386        results = []
387        if verbose:
388            logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
389
390        for page in self._yield_all_entity_metrics(entity=entity_type, verbose=verbose):
391            results.extend(page["results"])
392
393        # If remove_dicts is True, remove dictionaries from the workspace metrics
394        if remove_dicts:
395            for row in results:
396                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
397        return results

Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

Args:

  • entity_type (str): The type of entity to get metrics for.
  • remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to False.
  • verbose (bool, optional): Whether to log verbose output. Defaults to True.

Returns:

  • list[dict]: A list of dictionaries containing entity metrics.
def get_flat_list_of_table_entity( self, entity_type: str, remove_dicts: bool = False, verbose=True) -> list[dict]:
399    def get_flat_list_of_table_entity(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]:
400        """
401        Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add
402        the entity name to the dictionary with key "{entity_type}_id".
403
404        **Args:**
405        - entity_type (str): The type of entity to get metrics for.
406        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
407        - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
408
409        **Returns:**
410        - list[dict]: A list of dictionaries containing entity metrics.
411        """
412        table_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type, remove_dicts=remove_dicts, verbose=verbose)
413        convert_metrics = []
414        for row in table_metrics:
415            converted_row = row['attributes']
416            converted_row[f"{row['entityType']}_id"] = row['name']
417            convert_metrics.append(converted_row)
418        return convert_metrics

Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add the entity name to the dictionary with key "{entity_type}_id".

Args:

  • entity_type (str): The type of entity to get metrics for.
  • remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to False.
  • verbose (bool, optional): Whether to log verbose output. Defaults to True.

Returns:

  • list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.models.Response:
420    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
421        """
422        Get specific entity metrics for a given entity type and name.
423
424        **Args:**
425        - entity_type (str): The type of entity to get metrics for.
426        - entity_name (str): The name of the entity to get metrics for.
427
428        **Returns:**
429        - requests.Response: The response from the request.
430        """
431        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"  # noqa: E501
432        return self.request_util.run_request(uri=url, method=GET)

Get specific entity metrics for a given entity type and name.

Args:

  • entity_type (str): The type of entity to get metrics for.
  • entity_name (str): The name of the entity to get metrics for.

Returns:

  • requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
473    def get_workspace_bucket(self) -> str:
474        """
475        Get the workspace bucket name. Does not include the `gs://` prefix.
476
477        **Returns:**
478        - str: The bucket name.
479        """
480        return self.get_workspace_info().json()["workspace"]["bucketName"]

Get the workspace bucket name. Does not include the gs:// prefix.

Returns:

  • str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.models.Response:
482    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
483        """
484        Get workspace entity information.
485
486        **Args:**
487        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
488
489        **Returns:**
490        - requests.Response: The response from the request.
491        """
492        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
493        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
494        return self.request_util.run_request(uri=url, method=GET)

Get workspace entity information.

Args:

  • use_cache (bool, optional): Whether to use cache. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.models.Response:
496    def get_workspace_acl(self) -> requests.Response:
497        """
498        Get the workspace access control list (ACL).
499
500        **Returns:**
501        - requests.Response: The response from the request.
502        """
503        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
504        return self.request_util.run_request(
505            uri=url,
506            method=GET
507        )

Get the workspace access control list (ACL).

Returns:

  • requests.Response: The response from the request.
def update_user_acl( self, email: str, access_level: str, can_share: bool = False, can_compute: bool = False, invite_users_not_found: bool = False) -> requests.models.Response:
509    def update_user_acl(
510            self,
511            email: str,
512            access_level: str,
513            can_share: bool = False,
514            can_compute: bool = False,
515            invite_users_not_found: bool = False,
516    ) -> requests.Response:
517        """
518        Update the access control list (ACL) for a user in the workspace.
519
520        **Args:**
521        - email (str): The email of the user.
522        - access_level (str): The access level to grant to the user.
523        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
524        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
525        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
526                the workspace. Defaults to `False`
527
528        **Returns:**
529        - requests.Response: The response from the request.
530        """
531        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
532              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
533        payload = {
534            "email": email,
535            "accessLevel": access_level,
536            "canShare": can_share,
537            "canCompute": can_compute,
538        }
539        logging.info(
540            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
541        response = self.request_util.run_request(
542            uri=url,
543            method=PATCH,
544            content_type=APPLICATION_JSON,
545            data="[" + json.dumps(payload) + "]"
546        )
547
548        if response.json()["usersNotFound"] and not invite_users_not_found:
549            # Will be a list of one user
550            user_not_found = response.json()["usersNotFound"][0]
551            raise Exception(
552                f'The user {user_not_found["email"]} was not found and access was not updated'
553            )
554        return response

Update the access control list (ACL) for a user in the workspace.

Args:

  • email (str): The email of the user.
  • access_level (str): The access level to grant to the user.
  • can_share (bool, optional): Whether the user can share the workspace. Defaults to False.
  • can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to False.
  • invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to False.

Returns:

  • requests.Response: The response from the request.
@deprecated('Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.')
def put_metadata_for_library_dataset( self, library_metadata: dict, validate: bool = False) -> requests.models.Response:
556    @deprecated(
557        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
558    )
559    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
560        """
561        Update the metadata for a library dataset.
562
563        **Args:**
564        - library_metadata (dict): The metadata to update.
565        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
566
567        **Returns:**
568        - requests.Response: The response from the request.
569        """
570        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
571              f"/metadata?validate={str(validate).lower()}"
572        return self.request_util.run_request(
573            uri=acl,
574            method=PUT,
575            data=json.dumps(library_metadata)
576        )

Update the metadata for a library dataset.

Args:

  • library_metadata (dict): The metadata to update.
  • validate (bool, optional): Whether to validate the metadata. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def update_multiple_users_acl( self, acl_list: list[dict], invite_users_not_found: bool = False) -> requests.models.Response:
578    def update_multiple_users_acl(
579            self, acl_list: list[dict], invite_users_not_found: bool = False
580    ) -> requests.Response:
581        """
582        Update the access control list (ACL) for multiple users in the workspace.
583
584        **Args:**
585        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
586        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
587                the workspace. Defaults to `False`
588
589        **Returns:**
590        - requests.Response: The response from the request.
591        """
592        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
593            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
594        logging.info(
595            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
596        response = self.request_util.run_request(
597            uri=url,
598            method=PATCH,
599            content_type=APPLICATION_JSON,
600            data=json.dumps(acl_list)
601        )
602
603        if response.json()["usersNotFound"] and not invite_users_not_found:
604            # Will be a list of one user
605            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
606            raise Exception(
607                f"The following users were not found and access was not updated: {users_not_found}"
608            )
609        return response

Update the access control list (ACL) for multiple users in the workspace.

Args:

  • acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
  • invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace( self, auth_domain: list[dict] = [], attributes: dict = {}, continue_if_exists: bool = False) -> requests.models.Response:
611    def create_workspace(
612            self,
613            auth_domain: list[dict] = [],
614            attributes: dict = {},
615            continue_if_exists: bool = False,
616    ) -> requests.Response:
617        """
618        Create a new workspace in Terra.
619
620        **Args:**
621        - auth_domain (list[dict], optional): A list of authorization domains. Should look
622                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
623        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
624        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
625
626        **Returns:**
627        - requests.Response: The response from the request.
628        """
629        payload = {
630            "namespace": self.billing_project,
631            "name": self.workspace_name,
632            "authorizationDomain": auth_domain,
633            "attributes": attributes,
634            "cloudPlatform": GCP
635        }
636        # If workspace already exists then continue if exists
637        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
638        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
639        response = self.request_util.run_request(
640            uri=f"{self.terra_link}/workspaces",
641            method=POST,
642            content_type=APPLICATION_JSON,
643            data=json.dumps(payload),
644            accept_return_codes=accept_return_codes
645        )
646        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
647            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
648        return response

Create a new workspace in Terra.

Args:

  • auth_domain (list[dict], optional): A list of authorization domains. Should look like [{"membersGroupName": "some_auth_domain"}]. Defaults to an empty list.
  • attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
  • continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
650    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
651        """
652        Create an ingest dictionary for workspace attributes.
653
654        **Args:**
655        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
656
657        **Returns:**
658        - list[dict]: A list of dictionaries containing the workspace attributes.
659        """
660        # If not provided then call API to get it
661        workspace_attributes = (
662            workspace_attributes if workspace_attributes
663            else self.get_workspace_info().json()["workspace"]["attributes"]
664        )
665
666        ingest_dict = []
667        for key, value in workspace_attributes.items():
668            # If value is dict just use 'items' as value
669            if isinstance(value, dict):
670                value = value.get("items")
671            # If value is list convert to comma separated string
672            if isinstance(value, list):
673                value = ", ".join(value)
674            ingest_dict.append(
675                {
676                    "attribute": key,
677                    "value": str(value) if value else None
678                }
679            )
680        return ingest_dict

Create an ingest dictionary for workspace attributes.

Args:

  • workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.

Returns:

  • list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.models.Response:
682    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
683        """
684        Upload metadata to the workspace table.
685
686        **Args:**
687        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
688
689        **Returns:**
690        - requests.Response: The response from the request.
691        """
692        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
693        data = {"entities": open(entities_tsv, "rb")}
694        return self.request_util.upload_file(
695            uri=endpoint,
696            data=data
697        )

Upload metadata to the workspace table.

Args:

  • entities_tsv (str): The path to the TSV file containing the metadata to upload.

Returns:

  • requests.Response: The response from the request.
def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.models.Response:
716    def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.Response:
717        """
718        Upload metadata to one or more workspace entity tables using batch upsert.
719
720        Builds the Terra batch upsert payload from a structured input dictionary and calls
721        `batch_upsert` with the result.
722
723        **Args:**
724        - table_data (dict): A dictionary mapping table names to their data configuration.
725            Each entry should have the following structure:
726
727            ```python
728            {
729                "table_name": {
730                    "table_id_column": "column_that_is_the_entity_id",
731                    "row_data": [
732                        {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...},
733                        ...
734                    ]
735                },
736                ...
737            }
738            ```
739
740            - `table_id_column`: The name of the column whose value is used as the entity name
741              (`name` field in the upsert payload). This column is **not** included as an attribute
742              operation.
743            - `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes
744              an `AddUpdateAttribute` operation.
745        - force (bool, optional): Whether to force update if id column does not match table name + _id.
746
747        **Returns:**
748        - requests.Response: The response from the request.
749        """
750        upsert_payload = []
751        table_name_failures = []
752        for table_name, config in table_data.items():
753            id_column = config["table_id_column"]
754            if id_column != f"{table_name}_id":
755                table_name_failures.append(
756                    f"id column, {id_column}, does not match table {table_name}. This column will be renamed to {table_name}_id."
757                    "Use force=True to force update."
758                )
759            rows = config["row_data"]
760            for row in rows:
761                entity_name = row.get(id_column)
762                if entity_name is None:
763                    raise Exception(f"Primary key column '{id_column}' is missing from row data - {row}")
764                operations = [
765                    {
766                        "op": "AddUpdateAttribute",
767                        "attributeName": col,
768                        "addUpdateAttribute": value,
769                    }
770                    for col, value in row.items()
771                    if col != id_column
772                ]
773                upsert_payload.append(
774                    {
775                        "name": entity_name,
776                        "entityType": table_name,
777                        "operations": operations,
778                    }
779                )
780        if table_name_failures:
781            for message in table_name_failures:
782                if force:
783                    logging.warning(message)
784                else:
785                    logging.error(message)
786            if not force:
787                raise Exception("One or more tables have id columns that do not match the expected format."
788                                " See error messages above for details. Use force=True to force update.")
789        return self._batch_upsert(upsert_payload)

Upload metadata to one or more workspace entity tables using batch upsert.

Builds the Terra batch upsert payload from a structured input dictionary and calls batch_upsert with the result.

Args:

  • table_data (dict): A dictionary mapping table names to their data configuration. Each entry should have the following structure:
{
    "table_name": {
        "table_id_column": "column_that_is_the_entity_id",
        "row_data": [
            {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...},
            ...
        ]
    },
    ...
}
    - table_id_column: The name of the column whose value is used as the entity name (the name field in the upsert payload). This column is not included as an attribute operation.
    - row_data: A list of row dictionaries. Every key except table_id_column becomes an AddUpdateAttribute operation.

  • force (bool, optional): Whether to force update if id column does not match table name + _id.

Returns:

  • requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.models.Response:
791    def get_workspace_workflows(self) -> requests.Response:
792        """
793        Get the workflows for the workspace.
794
795        **Returns:**
796        - requests.Response: The response from the request.
797        """
798        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
799        return self.request_util.run_request(
800            uri=uri,
801            method=GET
802        )

Get the workflows for the workspace.

Returns:

  • requests.Response: The response from the request.
def import_workflow( self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.models.Response:
804    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
805        """
806        Import a workflow into the workspace.
807
808        **Args:**
809        - workflow_dict (dict): The dictionary containing the workflow information.
810        - continue_if_exists (bool, optional): Whether to continue if the workflow
811                already exists. Defaults to `False`.
812
813        **Returns:**
814        - requests.Response: The response from the request.
815        """
816        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
817        workflow_json = json.dumps(workflow_dict)
818        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
819        return self.request_util.run_request(
820            uri=uri,
821            method=POST,
822            data=workflow_json,
823            content_type=APPLICATION_JSON,
824            accept_return_codes=accept_return_codes
825        )

Import a workflow into the workspace.

Args:

  • workflow_dict (dict): The dictionary containing the workflow information.
  • continue_if_exists (bool, optional): Whether to continue if the workflow already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_workspace(self) -> requests.models.Response:
827    def delete_workspace(self) -> requests.Response:
828        """
829        Delete a Terra workspace.
830
831        **Returns:**
832        - requests.Response: The response from the request.
833        """
834        return self.request_util.run_request(
835            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
836            method=DELETE
837        )

Delete a Terra workspace.

Returns:

  • requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.models.Response:
839    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
840        """
841        Update the attributes for the workspace.
842
843        **Args:**
844        - attributes (dict): The attributes to update.
845
846        **Returns:**
847        - requests.Response: The response from the request.
848        """
849        return self.request_util.run_request(
850            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
851            method=PATCH,
852            data=json.dumps(attributes),
853            content_type=APPLICATION_JSON
854        )

Update the attributes for the workspace.

Args:

  • attributes (list[dict]): The attribute operations to apply.

Returns:

  • requests.Response: The response from the request.
def leave_workspace( self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False) -> requests.models.Response:
856    def leave_workspace(
857            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
858    ) -> requests.Response:
859        """
860        Leave a workspace. If workspace ID not supplied, will look it up.
861
862        **Args:**
863        - workspace_id (str, optional): The workspace ID. Defaults to None.
864        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
865             Defaults to `False`.
866
867        **Returns:**
868        - requests.Response: The response from the request.
869        """
870        if not workspace_id:
871            workspace_info = self.get_workspace_info().json()
872            workspace_id = workspace_info['workspace']['workspaceId']
873        accepted_return_code = [403] if ignore_direct_access_error else []
874
875        res = self.request_util.run_request(
876            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
877            method=DELETE,
878            accept_return_codes=accepted_return_code
879        )
880        if (res.status_code == 403
881                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
882            logging.info(
883                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
884                f"access to the workspace (they could be an owner on the billing project)"
885            )
886        return res

Leave a workspace. If workspace ID not supplied, will look it up.

Args:

  • workspace_id (str, optional): The workspace ID. Defaults to None.
  • ignore_direct_access_error (bool, optional): Whether to ignore direct access errors. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def set_table_column_order(self, column_order: dict) -> requests.models.Response:
888    def set_table_column_order(self, column_order: dict) -> requests.Response:
889        """
890        Set the column order for one or more entity tables in the workspace.
891
892        **Args:**
893        - column_order (dict): A dictionary mapping table names to their column configuration. Each table entry
894            should have the following structure:
895
896            ```
897            {
898                "table_name": {
899                    "shown": ["col1", "col2", ...],  # Columns to display, in order
900                    "hidden": ["col3", "col4", ...]  # Columns to hide
901                },
902                ...
903            }
904            ```
905
906        **Returns:**
907        - requests.Response: The response from the request.
908        """
909        logging.info(
910            f"Setting column order for tables in workspace {self.billing_project}/{self.workspace_name}"
911        )
912        return self.update_workspace_attributes(
913            attributes=[
914                {
915                    "op": "AddUpdateAttribute",
916                    "attributeName": "workspace-column-defaults",
917                    "addUpdateAttribute": json.dumps(column_order)
918                }
919            ]
920        )

Set the column order for one or more entity tables in the workspace.

Args:

  • column_order (dict): A dictionary mapping table names to their column configuration. Each table entry should have the following structure:
{
    "table_name": {
        "shown": ["col1", "col2", ...],  # Columns to display, in order
        "hidden": ["col3", "col4", ...]  # Columns to hide
    },
    ...
}

Returns:

  • requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.models.Response:
922    def change_workspace_public_setting(self, public: bool) -> requests.Response:
923        """
924        Change a workspace's public setting.
925
926        **Args:**
927        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
928         public, `False` otherwise.
929
930        **Returns:**
931        - requests.Response: The response from the request.
932        """
933        body = [
934            {
935                "settingType": "PubliclyReadable",
936                "config": {
937                    "enabled": public
938                }
939            }
940        ]
941        return self.request_util.run_request(
942            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
943            method=PUT,
944            content_type=APPLICATION_JSON,
945            data=json.dumps(body)
946        )

Change a workspace's public setting.

Args:

  • public (bool): Whether the workspace should be public. Set to True to be made public, False otherwise.

Returns:

  • requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.models.Response:
948    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
949        """
950        Check if a workspace is public.
951
952        **Args:**
953        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
954        it up if not provided. Defaults to None.
955
956        **Returns:**
957        - requests.Response: The response from the request.
958        """
959        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
960        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
961        return self.request_util.run_request(
962            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
963            method=GET
964        )

Check if a workspace is public.

Args:

  • bucket (str, optional): The bucket name (provided without the gs:// prefix). Will look it up if not provided. Defaults to None.

Returns:

  • requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.models.Response:
966    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
967        """Delete an entire entity table from a Terra workspace.
968
969        **Args:**
970        - entity_to_delete (str): The name of the entity table to delete.
971
972        **Returns:**
973        - requests.Response: The response from the request.
974        """
975        response = self.request_util.run_request(
976            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",  # noqa: E501
977            method=DELETE
978        )
979        if response.status_code == 204:
980            logging.info(
981                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
982                f"'{self.billing_project}/{self.workspace_name}'"
983            )
984        else:
985            logging.error(
986                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
987                f"table: {response.text}"
988            )
989        return response

Delete an entire entity table from a Terra workspace.

Args:

  • entity_to_delete (str): The name of the entity table to delete.

Returns:

  • requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
 991    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
 992        """Save an entity table version in a Terra workspace.
 993
 994        **Args:**
 995        - entity_type (str): The name of the entity table to save a new version for
 996        - version_name (str): The name of the new version
 997        """
 998        # Get the workspace metrics
 999        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
1000        file_name = f"{entity_type}.json"
1001        # Write the workspace metrics to a JSON file
1002        with open(file_name, "w") as json_file:
1003            json.dump(workspace_metrics, json_file)
1004
1005        # Create a zip file with the same naming convention that Terra backend uses
1006        timestamp_ms = int(time.time() * 1000)
1007        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
1008        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
1009            zipf.write(file_name, arcname=f"json/{file_name}")
1010
1011        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
1012        workspace_info = self.get_workspace_info().json()
1013        path_to_upload_to = os.path.join(
1014            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
1015        )
1016        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
1017        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
1018        try:
1019            active_account = gcp_util.get_active_gcloud_account()
1020        except Exception as e:
1021            active_account = workspace_info["workspace"]["createdBy"]
1022            logging.error(
1023                f"Encountered the following exception while attempting to get current GCP account: {e}. "
1024                f"Will set the owner of the new metadata version as the workspace creator instead."
1025            )
1026        gcp_util.upload_blob(
1027            source_file=zip_file_name,
1028            destination_path=path_to_upload_to,
1029            custom_metadata={
1030                "createdBy": active_account,
1031                "entityType": entity_type,
1032                "timestamp": timestamp_ms,
1033                "description": version_name,
1034            }
1035        )

Save an entity table version in a Terra workspace.

Args:

  • entity_type (str): The name of the entity table to save a new version for
  • version_name (str): The name of the new version
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.models.Response:
1037    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
1038        """
1039        Add a user comment to a submission in Terra.
1040
1041        **Args:**
1042        - submission_id (str): The ID of the submission to add a comment to.
1043        - user_comment (str): The comment to add to the submission.
1044
1045        **Returns:**
1046        - requests.Response: The response from the request.
1047        """
1048        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
1049        return self.request_util.run_request(
1050            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
1051            method=PATCH,
1052            content_type=APPLICATION_JSON,
1053            data=json.dumps({"userComment": user_comment}),
1054        )

Add a user comment to a submission in Terra.

Args:

  • submission_id (str): The ID of the submission to add a comment to.
  • user_comment (str): The comment to add to the submission.

Returns:

  • requests.Response: The response from the request.
def initiate_submission( self, method_config_namespace: str, method_config_name: str, entity_type: str, entity_name: str, expression: str, user_comment: Optional[str], use_call_cache: bool = True) -> requests.models.Response:
1056    def initiate_submission(
1057            self,
1058            method_config_namespace: str,
1059            method_config_name: str,
1060            entity_type: str,
1061            entity_name: str,
1062            expression: str,
1063            user_comment: Optional[str],
1064            use_call_cache: bool = True
1065    ) -> requests.Response:
1066        """
1067        Initiate a submission within a Terra workspace.
1068
1069        Note - the workflow being initiated MUST already be imported into the workspace.
1070
1071        **Args:**
1072        - method_config_namespace (str): The namespace of the method configuration.
1073        - method_config_name (str): The name of the method configuration to use for the submission
1074        (i.e. the workflow name).
1075        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
1076        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
1077        "sample_set_1").
1078        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
1079        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
1080        be launched PER SAMPLE, the expression should be `this.samples`.
1081        - user_comment (str, optional): The user comment to add to the submission.
1082        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
1083
1084        **Returns:**
1085        - requests.Response: The response from the request.
1086        """
1087        payload = {
1088            "methodConfigurationNamespace": method_config_namespace,
1089            "methodConfigurationName": method_config_name,
1090            "entityType": entity_type,
1091            "entityName": entity_name,
1092            "expression": expression,
1093            "useCallCache": use_call_cache,
1094            "deleteIntermediateOutputFiles": False,
1095            "useReferenceDisks": False,
1096            "ignoreEmptyOutputs": False,
1097        }
1098        if user_comment:
1099            payload["userComment"] = user_comment
1100
1101        return self.request_util.run_request(
1102            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
1103            method=POST,
1104            content_type=APPLICATION_JSON,
1105            data=json.dumps(payload),
1106        )

Initiate a submission within a Terra workspace.

Note - the workflow being initiated MUST already be imported into the workspace.

Args:

  • method_config_namespace (str): The namespace of the method configuration.
  • method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
  • entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
  • entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or "sample_set_1").
  • expression (str): The "expression" to use. For example, if the entity_type is sample and the workflow is launching one sample, this can be left as this. If the entity_type is sample_set, but one workflow should be launched PER SAMPLE, the expression should be this.samples.
  • user_comment (str, optional): The user comment to add to the submission.
  • use_call_cache (bool, optional): Whether to use call caching. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.models.Response:
1108    def retry_failed_submission(self, submission_id: str) -> requests.Response:
1109        """
1110        Retry a failed submission in Terra.
1111
1112        **Args:**
1113        - submission_id (str): The ID of the submission to retry.
1114
1115        **Returns:**
1116        - requests.Response: The response from the request.
1117        """
1118        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
1119        payload = {"retryType": "Failed"}
1120        logging.info(
1121            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
1122        )
1123        return self.request_util.run_request(
1124            uri=url,
1125            method=POST,
1126            content_type=APPLICATION_JSON,
1127            data=json.dumps(payload)
1128        )

Retry a failed submission in Terra.

Args:

  • submission_id (str): The ID of the submission to retry.

Returns:

  • requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.models.Response:
1130    def get_submission_status(self, submission_id: str) -> requests.Response:
1131        """
1132        Get the status of a submission in Terra.
1133
1134        **Args:**
1135        - submission_id (str): The ID of the submission.
1136
1137        **Returns:**
1138        - requests.Response: The response from the request.
1139        """
1140        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
1141        logging.info(
1142            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"  # noqa: E501
1143        )
1144        return self.request_util.run_request(
1145            uri=url,
1146            method=GET
1147        )

Get the status of a submission in Terra.

Args:

  • submission_id (str): The ID of the submission.

Returns:

  • requests.Response: The response from the request.
def get_workspace_submission_status(self) -> requests.models.Response:
1149    def get_workspace_submission_status(self) -> requests.Response:
1150        """
1151        Get the status of all submissions in a Terra workspace.
1152
1153        **Returns:**
1154        - requests.Response: The response from the request.
1155        """
1156        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions"
1157        logging.info(
1158            f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}"
1159        )
1160        return self.request_util.run_request(
1161            uri=url,
1162            method=GET
1163        )

Get the status of all submissions in a Terra workspace.

Returns:

  • requests.Response: The response from the request.
def get_workflow_status( self, submission_id: str, workflow_id: str, expand_sub_workflow_metadata: bool = False) -> requests.models.Response:
1165    def get_workflow_status(
1166            self,
1167            submission_id: str,
1168            workflow_id: str,
1169            expand_sub_workflow_metadata: bool = False) -> requests.Response:
1170        """
1171        Get the status of a workflow in a submission in Terra.
1172
1173        **Args:**
1174        - submission_id (str): The ID of the submission.
1175        - workflow_id (str): The ID of the workflow.
1176        - expand_sub_workflow_metadata (bool, optional): Whether to expand the expand_sub workflow metadata.
1177          Defaults to `False`.
1178
1179        **Returns:**
1180        - requests.Response: The response from the request.
1181        """
1182        expand_metadata = '?expandSubWorkflows=true' if expand_sub_workflow_metadata else ''
1183        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/workflows/{workflow_id}{expand_metadata}"  # noqa: E501
1184        logging.info(
1185            f"Getting status for workflow: '{workflow_id}' in submission: '{submission_id}' "
1186            f"in workspace {self.billing_project}/{self.workspace_name}"
1187        )
1188        return self.request_util.run_request(
1189            uri=url,
1190            method=GET
1191        )

Get the status of a workflow in a submission in Terra.

Args:

  • submission_id (str): The ID of the submission.
  • workflow_id (str): The ID of the workflow.
  • expand_sub_workflow_metadata (bool, optional): Whether to expand sub-workflow metadata. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def get_workspace_submission_stats( self, method_name: Optional[str] = None, retrieve_running_ids: bool = True) -> dict:
1193    def get_workspace_submission_stats(
1194            self, method_name: Optional[str] = None, retrieve_running_ids: bool = True
1195    ) -> dict:
1196        """
1197        Get submission statistics for a Terra workspace, optionally filtered by method name.
1198
1199        **Args:**
1200        - method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
1201        - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
1202          Defaults to `True`.
1203
1204        **Returns:**
1205        - dict: A dictionary containing submission statistics, including counts of workflows in various states
1206        """
1207        submissions = [
1208            s
1209            for s in self.get_workspace_submission_status().json()
1210            # If method_name is provided, filter submissions to only those with that method name
1211            if (s["methodConfigurationName"] == method_name if method_name else True)
1212        ]
1213        method_append = f"with method name '{method_name}'" if method_name else ""
1214        logging.info(
1215            f"{len(submissions)} submissions in "
1216            f"{self.billing_project}/{self.workspace_name} {method_append}"
1217        )
1218        workflow_statuses = {
1219            "submitted": 0,
1220            "queued": 0,
1221            "running": 0,
1222            "aborting": 0,
1223            "aborted": 0,
1224            "failed": 0,
1225            "succeeded": 0,
1226            "id_still_running": [] if retrieve_running_ids else "NA"
1227        }
1228        for submission in submissions:
1229            wf_status = submission["workflowStatuses"]
1230            for status, count in wf_status.items():
1231                if status.lower() in workflow_statuses:
1232                    workflow_statuses[status.lower()] += count
1233            # Only look at individual submissions if retrieve running ids set to true
1234            # and only look at submissions that are still running
1235            if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]:
1236                submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json()
1237                for workflow in submission_detailed["workflows"]:
1238                    if workflow["status"] in ["Running", "Submitted", "Queued"]:
1239                        entity_id = workflow["workflowEntity"]["entityName"]
1240                        workflow_statuses['id_still_running'].append(entity_id)  # type: ignore[attr-defined]
1241        running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued']  # type: ignore[operator]  # noqa: E501
1242        if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count:  # type: ignore[arg-type]  # noqa: E501
1243            logging.warning(
1244                f"Discrepancy found between total running/pending workflows, {running_count}, "
1245                f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. "  # type: ignore[arg-type]  # noqa: E501
1246                "Workflows may have completed between API calls."
1247            )
1248        denominator = workflow_statuses['succeeded'] + workflow_statuses['failed']  # type: ignore[operator]
1249        if denominator > 0:
1250            workflow_statuses['success_rate'] = round(
1251                workflow_statuses['succeeded'] / denominator,
1252                2
1253            )
1254        else:
1255            workflow_statuses['success_rate'] = 0.0
1256        return workflow_statuses

Get submission statistics for a Terra workspace, optionally filtered by method name.

Args:

  • method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
  • retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running. Defaults to True.

Returns:

  • dict: A dictionary containing submission statistics, including counts of workflows in various states
def get_workspace_details(self, terra_google_project_id: str) -> requests.models.Response:
1258    def get_workspace_details(self, terra_google_project_id: str) -> requests.Response:
1259        """
1260        Get details of a Terra workspace using the Google project ID.
1261
1262        **Args:**
1263        - terra_google_project_id (str): The Google project ID of the Terra workspace.
1264
1265        **Returns:**
1266        - requests.Response: The response from the request.
1267        """
1268        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}?userProject={terra_google_project_id}"  # noqa: E501
1269        logging.info(
1270            f"Getting workspace details for workspace '{self.workspace_name}' using Terra Google"
1271            f" project ID: '{terra_google_project_id}'"
1272        )
1273        return self.request_util.run_request(
1274            uri=url,
1275            method=GET
1276        )

Get details of a Terra workspace using the Google project ID.

Args:

  • terra_google_project_id (str): The Google project ID of the Terra workspace.

Returns:

  • requests.Response: The response from the request.