ops_utils.terra_util

Utilities for working with Terra.

  1"""Utilities for working with Terra."""
  2import json
  3import logging
  4import re
  5from typing import Any, Optional
  6import requests
  7import time
  8import zipfile
  9import os
 10
 11from . import deprecated
 12from .vars import GCP, APPLICATION_JSON
 13from .gcp_utils import GCPCloudFunctions
 14from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest
 15
 16# Constants for Terra API links
 17TERRA_DEV_LINK = "https://firecloud-orchestration.dsde-dev.broadinstitute.org/api"
 18"""@private"""
 19TERRA_PROD_LINK = "https://api.firecloud.org/api"
 20"""@private"""
 21LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api"
 22"""@private"""
 23WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1"
 24"""@private"""
 25SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api"
 26"""@private"""
 27RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api"
 28"""@private"""
 29
 30MEMBER = "member"
 31ADMIN = "admin"
 32
 33
 34class Terra:
 35    """Class for generic Terra utilities."""
 36
 37    def __init__(self, request_util: RunRequest, env: str = "prod"):
 38        """
 39        Initialize the Terra class.
 40
 41        **Args:**
 42        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
 43            request utility class to handle HTTP requests.
 44        """
 45        self.request_util = request_util
 46        """@private"""
 47
 48    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
 49        """
 50        Fetch the list of accessible workspaces.
 51
 52        **Args:**
 53        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
 54
 55        **Returns:**
 56        - requests.Response: The response from the request.
 57        """
 58        fields_str = "fields=" + ",".join(fields) if fields else ""
 59        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
 60        return self.request_util.run_request(
 61            uri=url,
 62            method=GET
 63        )
 64
 65    def get_pet_account_json(self) -> requests.Response:
 66        """
 67        Get the service account JSON.
 68
 69        **Returns:**
 70        - requests.Response: The response from the request.
 71        """
 72        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
 73        return self.request_util.run_request(
 74            uri=url,
 75            method=GET
 76        )
 77
 78
 79class TerraGroups:
 80    """A class to manage Terra groups and their memberships."""
 81
 82    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
 83    """@private"""
 84    CONFLICT_STATUS_CODE = 409
 85    """@private"""
 86
 87    def __init__(self, request_util: RunRequest):
 88        """
 89        Initialize the TerraGroups class.
 90
 91        **Args:**
 92        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
 93         utility class to handle HTTP requests.
 94        """
 95        self.request_util = request_util
 96        """@private"""
 97
 98    def _check_role(self, role: str) -> None:
 99        """
100        Check if the role is valid.
101
102        Args:
103            role (str): The role to check.
104
105        Raises:
106            ValueError: If the role is not one of the allowed options.
107        """
108        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
109            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
110
111    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
112        """
113        Remove a user from a group.
114
115        **Args:**
116        - group (str): The name of the group.
117        - email (str): The email of the user to remove.
118        - role (str): The role of the user in the group
119            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
120
121        **Returns:**
122        - requests.Response: The response from the request.
123        """
124        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
125        self._check_role(role)
126        logging.info(f"Removing {email} from group {group}")
127        return self.request_util.run_request(
128            uri=url,
129            method=DELETE
130        )
131
132    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
133        """
134        Create a new group.
135
136        **Args:**
137        - group_name (str): The name of the group to create.
138        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
139
140        **Returns:**
141        - requests.Response: The response from the request.
142        """
143        url = f"{SAM_LINK}/groups/v1/{group_name}"
144        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
145        response = self.request_util.run_request(
146            uri=url,
147            method=POST,
148            accept_return_codes=accept_return_codes
149        )
150        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
151            logging.info(f"Group {group_name} already exists. Continuing.")
152            return response
153        else:
154            logging.info(f"Created group {group_name}")
155            return response
156
157    def delete_group(self, group_name: str) -> requests.Response:
158        """
159        Delete a group.
160
161        **Args:**
162        - group_name (str): The name of the group to delete.
163
164        **Returns:**
165        - requests.Response: The response from the request.
166        """
167        url = f"{SAM_LINK}/groups/v1/{group_name}"
168        logging.info(f"Deleting group {group_name}")
169        return self.request_util.run_request(
170            uri=url,
171            method=DELETE
172        )
173
174    def add_user_to_group(
175            self, group: str, email: str, role: str, continue_if_exists: bool = False
176    ) -> requests.Response:
177        """
178        Add a user to a group.
179
180        **Args:**
181        - group (str): The name of the group.
182        - email (str): The email of the user to add.
183        - role (str): The role of the user in the group
184            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
185        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
186                Defaults to `False`.
187
188        **Returns:**
189        - requests.Response: The response from the request.
190        """
191        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
192        self._check_role(role)
193        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
194        logging.info(f"Adding {email} to group {group} as {role}")
195        return self.request_util.run_request(
196            uri=url,
197            method=PUT,
198            accept_return_codes=accept_return_codes
199        )
200
201
202class TerraWorkspace:
203    """Terra workspace class to manage workspaces and their attributes."""
204
205    CONFLICT_STATUS_CODE = 409
206    """@private"""
207
208    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
209        """
210        Initialize the TerraWorkspace class.
211
212        **Args:**
213        - billing_project (str): The billing project associated with the workspace.
214        - workspace_name (str): The name of the workspace.
215        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
216            request utility class to handle HTTP requests.
217        """
218        self.billing_project = billing_project
219        """@private"""
220        self.workspace_name = workspace_name
221        """@private"""
222        self.workspace_id = None
223        """@private"""
224        self.resource_id = None
225        """@private"""
226        self.storage_container = None
227        """@private"""
228        self.bucket = None
229        """@private"""
230        self.wds_url = None
231        """@private"""
232        self.account_url: Optional[str] = None
233        """@private"""
234        self.request_util = request_util
235        """@private"""
236        if env.lower() == "dev":
237            self.terra_link = TERRA_DEV_LINK
238            """@private"""
239        elif env.lower() == "prod":
240            self.terra_link = TERRA_PROD_LINK
241            """@private"""
242        else:
243            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
244
245    def __repr__(self) -> str:
246        """
247        Return a string representation of the TerraWorkspace instance.
248
249        Returns:
250            str: The string representation of the TerraWorkspace instance.
251        """
252        return f"{self.billing_project}/{self.workspace_name}"
253
254    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any:
255        """
256        Yield all entity metrics from the workspace.
257
258        Args:
259            entity (str): The type of entity to query.
260            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
261
262        Yields:
263            Any: The JSON response containing entity metrics.
264        """
265        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
266        response = self.request_util.run_request(
267            uri=url,
268            method=GET,
269            content_type=APPLICATION_JSON
270        )
271        raw_text = response.text
272        first_page_json = json.loads(
273            raw_text,
274            parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
275        )
276        yield first_page_json
277        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
278        logging.info(
279            f"Looping through {total_pages} pages of data")
280
281        for page in range(2, total_pages + 1):
282            logging.info(f"Getting page {page} of {total_pages}")
283            next_page = self.request_util.run_request(
284                uri=url,
285                method=GET,
286                content_type=APPLICATION_JSON,
287                params={"page": page}
288            )
289            raw_text = next_page.text
290            page_json = json.loads(
291                raw_text,
292                parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
293            )
294            yield page_json
295
296    @staticmethod
297    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
298        """Check that all headers follow the standards required by TDR.
299
300        **Args:**
301        - table_name (str): The name of the Terra table.
302        - headers (list[str]): The headers of the Terra table to validate.
303
304        **Raises:**
305        - ValueError if any headers are considered invalid by TDR standards
306        """
307        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
308        tdr_max_header_length = 63
309
310        headers_containing_too_many_characters = []
311        headers_contain_invalid_characters = []
312
313        for header in headers:
314            if len(header) > tdr_max_header_length:
315                headers_containing_too_many_characters.append(header)
316            if not re.match(tdr_header_allowed_pattern, header):
317                headers_contain_invalid_characters.append(header)
318
319        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
320        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
321        header naming."""
322        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
323        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
324        allowed in TDR is {tdr_max_header_length}.\n"""
325        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
326        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
327        only contain numbers, letters, and underscore characters.\n"""
328
329        error_to_report = ""
330        if headers_containing_too_many_characters:
331            error_to_report += too_many_characters_error_message
332        if headers_contain_invalid_characters:
333            error_to_report += invalid_characters_error_message
334        if error_to_report:
335            error_to_report += base_error_message
336            raise ValueError(error_to_report)
337
338    def get_workspace_info(self) -> requests.Response:
339        """
340        Get workspace information.
341
342        **Returns:**
343        - requests.Response: The response from the request.
344        """
345        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
346        logging.info(
347            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
348        return self.request_util.run_request(uri=url, method=GET)
349
350    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
351        """
352        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
353
354        **Args:**
355        - entity_type (str): The type of entity to get metrics for.
356        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
357
358        **Returns:**
359        - list[dict]: A list of dictionaries containing entity metrics.
360        """
361        results = []
362        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
363
364        for page in self._yield_all_entity_metrics(entity=entity_type):
365            results.extend(page["results"])
366
367        # If remove_dicts is True, remove dictionaries from the workspace metrics
368        if remove_dicts:
369            for row in results:
370                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
371        return results
372
373    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
374        """
375        Get specific entity metrics for a given entity type and name.
376
377        **Args:**
378        - entity_type (str): The type of entity to get metrics for.
379        - entity_name (str): The name of the entity to get metrics for.
380
381        **Returns:**
382        - requests.Response: The response from the request.
383        """
384        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
385        return self.request_util.run_request(uri=url, method=GET)
386
387    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
388        """
389        Remove dictionaries from the attributes.
390
391        Args:
392            attributes (dict): The attributes to remove dictionaries from.
393
394        Returns:
395            dict: The updated attributes with no dictionaries.
396        """
397        for key, value in attributes.items():
398            attributes[key] = self._remove_dict_from_cell(value)
399        return attributes
400
401    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
402        """
403        Remove a dictionary from a cell.
404
405        Args:
406            cell_value (Any): The dictionary to remove.
407
408        Returns:
409            Any: The updated cell with no dictionaries.
410        """
411        if isinstance(cell_value, dict):
412            entity_name = cell_value.get("entityName")
413            # If the cell value is a dictionary, check if it has an entityName key
414            if entity_name:
415                # If the cell value is a dictionary with an entityName key, return the entityName
416                return entity_name
417            entity_list = cell_value.get("items")
418            if entity_list or entity_list == []:
419                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
420                return [
421                    self._remove_dict_from_cell(entity) for entity in entity_list
422                ]
423            return cell_value
424        return cell_value
425
426    def get_workspace_bucket(self) -> str:
427        """
428        Get the workspace bucket name. Does not include the `gs://` prefix.
429
430        **Returns:**
431        - str: The bucket name.
432        """
433        return self.get_workspace_info().json()["workspace"]["bucketName"]
434
435    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
436        """
437        Get workspace entity information.
438
439        **Args:**
440        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
441
442        **Returns:**
443        - requests.Response: The response from the request.
444        """
445        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
446        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
447        return self.request_util.run_request(uri=url, method=GET)
448
449    def get_workspace_acl(self) -> requests.Response:
450        """
451        Get the workspace access control list (ACL).
452
453        **Returns:**
454        - requests.Response: The response from the request.
455        """
456        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
457        return self.request_util.run_request(
458            uri=url,
459            method=GET
460        )
461
462    def update_user_acl(
463            self,
464            email: str,
465            access_level: str,
466            can_share: bool = False,
467            can_compute: bool = False,
468            invite_users_not_found: bool = False,
469    ) -> requests.Response:
470        """
471        Update the access control list (ACL) for a user in the workspace.
472
473        **Args:**
474        - email (str): The email of the user.
475        - access_level (str): The access level to grant to the user.
476        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
477        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
478        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
479                the workspace. Defaults to `False`
480
481        **Returns:**
482        - requests.Response: The response from the request.
483        """
484        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
485              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
486        payload = {
487            "email": email,
488            "accessLevel": access_level,
489            "canShare": can_share,
490            "canCompute": can_compute,
491        }
492        logging.info(
493            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
494        response = self.request_util.run_request(
495            uri=url,
496            method=PATCH,
497            content_type=APPLICATION_JSON,
498            data="[" + json.dumps(payload) + "]"
499        )
500
501        if response.json()["usersNotFound"] and not invite_users_not_found:
502            # Will be a list of one user
503            user_not_found = response.json()["usersNotFound"][0]
504            raise Exception(
505                f'The user {user_not_found["email"]} was not found and access was not updated'
506            )
507        return response
508
509    @deprecated(
510        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
511    )
512    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
513        """
514        Update the metadata for a library dataset.
515
516        **Args:**
517        - library_metadata (dict): The metadata to update.
518        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
519
520        **Returns:**
521        - requests.Response: The response from the request.
522        """
523        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
524              f"/metadata?validate={str(validate).lower()}"
525        return self.request_util.run_request(
526            uri=acl,
527            method=PUT,
528            data=json.dumps(library_metadata)
529        )
530
531    def update_multiple_users_acl(
532            self, acl_list: list[dict], invite_users_not_found: bool = False
533    ) -> requests.Response:
534        """
535        Update the access control list (ACL) for multiple users in the workspace.
536
537        **Args:**
538        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
539        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
540                the workspace. Defaults to `False`
541
542        **Returns:**
543        - requests.Response: The response from the request.
544        """
545        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
546            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
547        logging.info(
548            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
549        response = self.request_util.run_request(
550            uri=url,
551            method=PATCH,
552            content_type=APPLICATION_JSON,
553            data=json.dumps(acl_list)
554        )
555
556        if response.json()["usersNotFound"] and not invite_users_not_found:
557            # Will be a list of one user
558            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
559            raise Exception(
560                f"The following users were not found and access was not updated: {users_not_found}"
561            )
562        return response
563
564    def create_workspace(
565            self,
566            auth_domain: list[dict] = [],
567            attributes: dict = {},
568            continue_if_exists: bool = False,
569    ) -> requests.Response:
570        """
571        Create a new workspace in Terra.
572
573        **Args:**
574        - auth_domain (list[dict], optional): A list of authorization domains. Should look
575                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
576        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
577        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
578
579        **Returns:**
580        - requests.Response: The response from the request.
581        """
582        payload = {
583            "namespace": self.billing_project,
584            "name": self.workspace_name,
585            "authorizationDomain": auth_domain,
586            "attributes": attributes,
587            "cloudPlatform": GCP
588        }
589        # If workspace already exists then continue if exists
590        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
591        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
592        response = self.request_util.run_request(
593            uri=f"{self.terra_link}/workspaces",
594            method=POST,
595            content_type=APPLICATION_JSON,
596            data=json.dumps(payload),
597            accept_return_codes=accept_return_codes
598        )
599        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
600            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
601        return response
602
603    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
604        """
605        Create an ingest dictionary for workspace attributes.
606
607        **Args:**
608        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
609
610        **Returns:**
611        - list[dict]: A list of dictionaries containing the workspace attributes.
612        """
613        # If not provided then call API to get it
614        workspace_attributes = (
615            workspace_attributes if workspace_attributes
616            else self.get_workspace_info().json()["workspace"]["attributes"]
617        )
618
619        ingest_dict = []
620        for key, value in workspace_attributes.items():
621            # If value is dict just use 'items' as value
622            if isinstance(value, dict):
623                value = value.get("items")
624            # If value is list convert to comma separated string
625            if isinstance(value, list):
626                value = ", ".join(value)
627            ingest_dict.append(
628                {
629                    "attribute": key,
630                    "value": str(value) if value else None
631                }
632            )
633        return ingest_dict
634
635    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
636        """
637        Upload metadata to the workspace table.
638
639        **Args:**
640        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
641
642        **Returns:**
643        - requests.Response: The response from the request.
644        """
645        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
646        data = {"entities": open(entities_tsv, "rb")}
647        return self.request_util.upload_file(
648            uri=endpoint,
649            data=data
650        )
651
652    def get_workspace_workflows(self) -> requests.Response:
653        """
654        Get the workflows for the workspace.
655
656        **Returns:**
657        - requests.Response: The response from the request.
658        """
659        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
660        return self.request_util.run_request(
661            uri=uri,
662            method=GET
663        )
664
665    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
666        """
667        Import a workflow into the workspace.
668
669        **Args:**
670        - workflow_dict (dict): The dictionary containing the workflow information.
671        - continue_if_exists (bool, optional): Whether to continue if the workflow
672                already exists. Defaults to `False`.
673
674        **Returns:**
675        - requests.Response: The response from the request.
676        """
677        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
678        workflow_json = json.dumps(workflow_dict)
679        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
680        return self.request_util.run_request(
681            uri=uri,
682            method=POST,
683            data=workflow_json,
684            content_type=APPLICATION_JSON,
685            accept_return_codes=accept_return_codes
686        )
687
688    def delete_workspace(self) -> requests.Response:
689        """
690        Delete a Terra workspace.
691
692        **Returns:**
693        - requests.Response: The response from the request.
694        """
695        return self.request_util.run_request(
696            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
697            method=DELETE
698        )
699
700    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
701        """
702        Update the attributes for the workspace.
703
704        **Args:**
705        - attributes (dict): The attributes to update.
706
707        **Returns:**
708        - requests.Response: The response from the request.
709        """
710        return self.request_util.run_request(
711            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
712            method=PATCH,
713            data=json.dumps(attributes),
714            content_type=APPLICATION_JSON
715        )
716
717    def leave_workspace(
718            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
719    ) -> requests.Response:
720        """
721        Leave a workspace. If workspace ID not supplied, will look it up.
722
723        **Args:**
724        - workspace_id (str, optional): The workspace ID. Defaults to None.
725        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
726             Defaults to `False`.
727
728        **Returns:**
729        - requests.Response: The response from the request.
730        """
731        if not workspace_id:
732            workspace_info = self.get_workspace_info().json()
733            workspace_id = workspace_info['workspace']['workspaceId']
734        accepted_return_code = [403] if ignore_direct_access_error else []
735
736        res = self.request_util.run_request(
737            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
738            method=DELETE,
739            accept_return_codes=accepted_return_code
740        )
741        if (res.status_code == 403
742                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
743            logging.info(
744                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
745                f"access to the workspace (they could be an owner on the billing project)"
746            )
747        return res
748
749    def change_workspace_public_setting(self, public: bool) -> requests.Response:
750        """
751        Change a workspace's public setting.
752
753        **Args:**
754        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
755         public, `False` otherwise.
756
757        **Returns:**
758        - requests.Response: The response from the request.
759        """
760        body = [
761            {
762                "settingType": "PubliclyReadable",
763                "config": {
764                    "enabled": public
765                }
766            }
767        ]
768        return self.request_util.run_request(
769            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
770            method=PUT,
771            content_type=APPLICATION_JSON,
772            data=json.dumps(body)
773        )
774
775    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
776        """
777        Check if a workspace is public.
778
779        **Args:**
780        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
781        it up if not provided. Defaults to None.
782
783        **Returns:**
784        - requests.Response: The response from the request.
785        """
786        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
787        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
788        return self.request_util.run_request(
789            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
790            method=GET
791        )
792
793    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
794        """Delete an entire entity table from a Terra workspace.
795
796        **Args:**
797        - entity_to_delete (str): The name of the entity table to delete.
798
799        **Returns:**
800        - requests.Response: The response from the request.
801        """
802        response = self.request_util.run_request(
803            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
804            method=DELETE
805        )
806        if response.status_code == 204:
807            logging.info(
808                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
809                f"'{self.billing_project}/{self.workspace_name}'"
810            )
811        else:
812            logging.error(
813                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
814                f"table: {response.text}"
815            )
816        return response
817
818    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
819        """Save an entity table version in a Terra workspace.
820
821        **Args:**
822        - entity_type (str): The name of the entity table to save a new version for
823        - version_name (str): The name of the new version
824        """
825        # Get the workspace metrics
826        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
827        file_name = f"{entity_type}.json"
828        # Write the workspace metrics to a JSON file
829        with open(file_name, "w") as json_file:
830            json.dump(workspace_metrics, json_file)
831
832        # Create a zip file with the same naming convention that Terra backend uses
833        timestamp_ms = int(time.time() * 1000)
834        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
835        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
836            zipf.write(file_name, arcname=f"json/{file_name}")
837
838        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
839        workspace_info = self.get_workspace_info().json()
840        path_to_upload_to = os.path.join(
841            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
842        )
843        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
844        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
845        try:
846            active_account = gcp_util.get_active_gcloud_account()
847        except Exception as e:
848            active_account = workspace_info["workspace"]["createdBy"]
849            logging.error(
850                f"Encountered the following exception while attempting to get current GCP account: {e}. "
851                f"Will set the owner of the new metadata version as the workspace creator instead."
852            )
853        gcp_util.upload_blob(
854            source_file=zip_file_name,
855            destination_path=path_to_upload_to,
856            custom_metadata={
857                "createdBy": active_account,
858                "entityType": entity_type,
859                "timestamp": timestamp_ms,
860                "description": version_name,
861            }
862        )
863
864    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
865        """
866        Add a user comment to a submission in Terra.
867
868        **Args:**
869        - submission_id (str): The ID of the submission to add a comment to.
870        - user_comment (str): The comment to add to the submission.
871
872        **Returns:**
873        - requests.Response: The response from the request.
874        """
875        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
876        return self.request_util.run_request(
877            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
878            method=PATCH,
879            content_type=APPLICATION_JSON,
880            data=json.dumps({"userComment": user_comment}),
881        )
882
883    def initiate_submission(
884            self,
885            method_config_namespace: str,
886            method_config_name: str,
887            entity_type: str,
888            entity_name: str,
889            expression: str,
890            user_comment: Optional[str],
891            use_call_cache: bool = True
892    ) -> requests.Response:
893        """
894        Initiate a submission within a Terra workspace.
895
896        Note - the workflow being initiated MUST already be imported into the workspace.
897
898        **Args:**
899        - method_config_namespace (str): The namespace of the method configuration.
900        - method_config_name (str): The name of the method configuration to use for the submission
901        (i.e. the workflow name).
902        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
903        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
904        "sample_set_1").
905        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
906        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
907        be launched PER SAMPLE, the expression should be `this.samples`.
908        - user_comment (str, optional): The user comment to add to the submission.
909        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
910
911        **Returns:**
912        - requests.Response: The response from the request.
913        """
914        payload = {
915            "methodConfigurationNamespace": method_config_namespace,
916            "methodConfigurationName": method_config_name,
917            "entityType": entity_type,
918            "entityName": entity_name,
919            "expression": expression,
920            "useCallCache": use_call_cache,
921            "deleteIntermediateOutputFiles": False,
922            "useReferenceDisks": False,
923            "ignoreEmptyOutputs": False,
924        }
925        if user_comment:
926            payload["userComment"] = user_comment
927
928        return self.request_util.run_request(
929            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
930            method=POST,
931            content_type=APPLICATION_JSON,
932            data=json.dumps(payload),
933        )
934      
935    def retry_failed_submission(self, submission_id: str) -> requests.Response:
936        """
937        Retry a failed submission in Terra.
938
939        **Args:**
940        - submission_id (str): The ID of the submission to retry.
941
942        **Returns:**
943        - requests.Response: The response from the request.
944        """
945        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
946        payload = {"retryType": "Failed"}
947        logging.info(
948            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
949        )
950        return self.request_util.run_request(
951            uri=url,
952            method=POST,
953            content_type=APPLICATION_JSON,
954            data=json.dumps(payload)
955        )
956
957    def get_submission_status(self, submission_id: str) -> requests.Response:
958        """
959        Get the status of a submission in Terra.
960
961        **Args:**
962        - submission_id (str): The ID of the submission.
963
964        **Returns:**
965        - requests.Response: The response from the request.
966        """
967        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
968        logging.info(
969            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
970        )
971        return self.request_util.run_request(
972            uri=url,
973            method=GET
974        )
MEMBER = 'member'
ADMIN = 'admin'
class Terra:
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Accepted for interface compatibility but not currently used by
            this class; its requests always go to the production Rawls/Sam endpoints.
            Defaults to `"prod"`.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of accessible workspaces.

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If None (or empty),
            all fields are included. Defaults to `None`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        # Only add the `fields=` query parameter when specific fields were requested.
        fields_str = f"fields={','.join(fields)}" if fields else ""
        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the service account JSON.

        Requests a key from Sam's `/google/v1/user/petServiceAccount/key` endpoint.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

Class for generic Terra utilities.

Terra(request_util: ops_utils.request_util.RunRequest, env: str = 'prod')
38    def __init__(self, request_util: RunRequest, env: str = "prod"):
39        """
40        Initialize the Terra class.
41
42        **Args:**
43        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
44            request utility class to handle HTTP requests.
45        """
46        self.request_util = request_util
47        """@private"""

Initialize the Terra class.

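Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.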

def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.models.Response:
49    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
50        """
51        Fetch the list of accessible workspaces.
52
53        **Args:**
54        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
55
56        **Returns:**
57        - requests.Response: The response from the request.
58        """
59        fields_str = "fields=" + ",".join(fields) if fields else ""
60        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
61        return self.request_util.run_request(
62            uri=url,
63            method=GET
64        )

Fetch the list of accessible workspaces.

Args:

  • fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.

Returns:

  • requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.models.Response:
66    def get_pet_account_json(self) -> requests.Response:
67        """
68        Get the service account JSON.
69
70        **Returns:**
71        - requests.Response: The response from the request.
72        """
73        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
74        return self.request_util.run_request(
75            uri=url,
76            method=GET
77        )

Get the service account JSON.

Returns:

  • requests.Response: The response from the request.
class TerraGroups:
 80class TerraGroups:
 81    """A class to manage Terra groups and their memberships."""
 82
 83    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
 84    """@private"""
 85    CONFLICT_STATUS_CODE = 409
 86    """@private"""
 87
 88    def __init__(self, request_util: RunRequest):
 89        """
 90        Initialize the TerraGroups class.
 91
 92        **Args:**
 93        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
 94         utility class to handle HTTP requests.
 95        """
 96        self.request_util = request_util
 97        """@private"""
 98
 99    def _check_role(self, role: str) -> None:
100        """
101        Check if the role is valid.
102
103        Args:
104            role (str): The role to check.
105
106        Raises:
107            ValueError: If the role is not one of the allowed options.
108        """
109        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
110            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
111
112    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
113        """
114        Remove a user from a group.
115
116        **Args:**
117        - group (str): The name of the group.
118        - email (str): The email of the user to remove.
119        - role (str): The role of the user in the group
120            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
121
122        **Returns:**
123        - requests.Response: The response from the request.
124        """
125        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
126        self._check_role(role)
127        logging.info(f"Removing {email} from group {group}")
128        return self.request_util.run_request(
129            uri=url,
130            method=DELETE
131        )
132
133    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
134        """
135        Create a new group.
136
137        **Args:**
138        - group_name (str): The name of the group to create.
139        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
140
141        **Returns:**
142        - requests.Response: The response from the request.
143        """
144        url = f"{SAM_LINK}/groups/v1/{group_name}"
145        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
146        response = self.request_util.run_request(
147            uri=url,
148            method=POST,
149            accept_return_codes=accept_return_codes
150        )
151        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
152            logging.info(f"Group {group_name} already exists. Continuing.")
153            return response
154        else:
155            logging.info(f"Created group {group_name}")
156            return response
157
158    def delete_group(self, group_name: str) -> requests.Response:
159        """
160        Delete a group.
161
162        **Args:**
163        - group_name (str): The name of the group to delete.
164
165        **Returns:**
166        - requests.Response: The response from the request.
167        """
168        url = f"{SAM_LINK}/groups/v1/{group_name}"
169        logging.info(f"Deleting group {group_name}")
170        return self.request_util.run_request(
171            uri=url,
172            method=DELETE
173        )
174
175    def add_user_to_group(
176            self, group: str, email: str, role: str, continue_if_exists: bool = False
177    ) -> requests.Response:
178        """
179        Add a user to a group.
180
181        **Args:**
182        - group (str): The name of the group.
183        - email (str): The email of the user to add.
184        - role (str): The role of the user in the group
185            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
186        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
187                Defaults to `False`.
188
189        **Returns:**
190        - requests.Response: The response from the request.
191        """
192        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
193        self._check_role(role)
194        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
195        logging.info(f"Adding {email} to group {group} as {role}")
196        return self.request_util.run_request(
197            uri=url,
198            method=PUT,
199            accept_return_codes=accept_return_codes
200        )

A class to manage Terra groups and their memberships.

TerraGroups(request_util: ops_utils.request_util.RunRequest)
88    def __init__(self, request_util: RunRequest):
89        """
90        Initialize the TerraGroups class.
91
92        **Args:**
93        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
94         utility class to handle HTTP requests.
95        """
96        self.request_util = request_util
97        """@private"""

Initialize the TerraGroups class.

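Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.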

def remove_user_from_group(self, group: str, email: str, role: str) -> requests.models.Response:
112    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
113        """
114        Remove a user from a group.
115
116        **Args:**
117        - group (str): The name of the group.
118        - email (str): The email of the user to remove.
119        - role (str): The role of the user in the group
120            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
121
122        **Returns:**
123        - requests.Response: The response from the request.
124        """
125        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
126        self._check_role(role)
127        logging.info(f"Removing {email} from group {group}")
128        return self.request_util.run_request(
129            uri=url,
130            method=DELETE
131        )

Remove a user from a group.

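Args:

  • group (str): The name of the group.
  • email (str): The email of the user to remove.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).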

Returns:

  • requests.Response: The response from the request.
def create_group( self, group_name: str, continue_if_exists: bool = False) -> requests.models.Response:
133    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
134        """
135        Create a new group.
136
137        **Args:**
138        - group_name (str): The name of the group to create.
139        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
140
141        **Returns:**
142        - requests.Response: The response from the request.
143        """
144        url = f"{SAM_LINK}/groups/v1/{group_name}"
145        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
146        response = self.request_util.run_request(
147            uri=url,
148            method=POST,
149            accept_return_codes=accept_return_codes
150        )
151        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
152            logging.info(f"Group {group_name} already exists. Continuing.")
153            return response
154        else:
155            logging.info(f"Created group {group_name}")
156            return response

Create a new group.

Args:

  • group_name (str): The name of the group to create.
  • continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.models.Response:
158    def delete_group(self, group_name: str) -> requests.Response:
159        """
160        Delete a group.
161
162        **Args:**
163        - group_name (str): The name of the group to delete.
164
165        **Returns:**
166        - requests.Response: The response from the request.
167        """
168        url = f"{SAM_LINK}/groups/v1/{group_name}"
169        logging.info(f"Deleting group {group_name}")
170        return self.request_util.run_request(
171            uri=url,
172            method=DELETE
173        )

Delete a group.

Args:

  • group_name (str): The name of the group to delete.

Returns:

  • requests.Response: The response from the request.
def add_user_to_group( self, group: str, email: str, role: str, continue_if_exists: bool = False) -> requests.models.Response:
175    def add_user_to_group(
176            self, group: str, email: str, role: str, continue_if_exists: bool = False
177    ) -> requests.Response:
178        """
179        Add a user to a group.
180
181        **Args:**
182        - group (str): The name of the group.
183        - email (str): The email of the user to add.
184        - role (str): The role of the user in the group
185            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
186        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
187                Defaults to `False`.
188
189        **Returns:**
190        - requests.Response: The response from the request.
191        """
192        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
193        self._check_role(role)
194        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
195        logging.info(f"Adding {email} to group {group} as {role}")
196        return self.request_util.run_request(
197            uri=url,
198            method=PUT,
199            accept_return_codes=accept_return_codes
200        )

Add a user to a group.

Args:

  • group (str): The name of the group.
  • email (str): The email of the user to add.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).
  • continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to False.

Returns:

  • requests.Response: The response from the request.
class TerraWorkspace:
203class TerraWorkspace:
204    """Terra workspace class to manage workspaces and their attributes."""
205
206    CONFLICT_STATUS_CODE = 409
207    """@private"""
208
209    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
210        """
211        Initialize the TerraWorkspace class.
212
213        **Args:**
214        - billing_project (str): The billing project associated with the workspace.
215        - workspace_name (str): The name of the workspace.
216        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
217            request utility class to handle HTTP requests.
218        """
219        self.billing_project = billing_project
220        """@private"""
221        self.workspace_name = workspace_name
222        """@private"""
223        self.workspace_id = None
224        """@private"""
225        self.resource_id = None
226        """@private"""
227        self.storage_container = None
228        """@private"""
229        self.bucket = None
230        """@private"""
231        self.wds_url = None
232        """@private"""
233        self.account_url: Optional[str] = None
234        """@private"""
235        self.request_util = request_util
236        """@private"""
237        if env.lower() == "dev":
238            self.terra_link = TERRA_DEV_LINK
239            """@private"""
240        elif env.lower() == "prod":
241            self.terra_link = TERRA_PROD_LINK
242            """@private"""
243        else:
244            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
245
246    def __repr__(self) -> str:
247        """
248        Return a string representation of the TerraWorkspace instance.
249
250        Returns:
251            str: The string representation of the TerraWorkspace instance.
252        """
253        return f"{self.billing_project}/{self.workspace_name}"
254
255    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any:
256        """
257        Yield all entity metrics from the workspace.
258
259        Args:
260            entity (str): The type of entity to query.
261            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
262
263        Yields:
264            Any: The JSON response containing entity metrics.
265        """
266        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
267        response = self.request_util.run_request(
268            uri=url,
269            method=GET,
270            content_type=APPLICATION_JSON
271        )
272        raw_text = response.text
273        first_page_json = json.loads(
274            raw_text,
275            parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
276        )
277        yield first_page_json
278        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
279        logging.info(
280            f"Looping through {total_pages} pages of data")
281
282        for page in range(2, total_pages + 1):
283            logging.info(f"Getting page {page} of {total_pages}")
284            next_page = self.request_util.run_request(
285                uri=url,
286                method=GET,
287                content_type=APPLICATION_JSON,
288                params={"page": page}
289            )
290            raw_text = next_page.text
291            page_json = json.loads(
292                raw_text,
293                parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x)
294            )
295            yield page_json
296
297    @staticmethod
298    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
299        """Check that all headers follow the standards required by TDR.
300
301        **Args:**
302        - table_name (str): The name of the Terra table.
303        - headers (list[str]): The headers of the Terra table to validate.
304
305        **Raises:**
306        - ValueError if any headers are considered invalid by TDR standards
307        """
308        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
309        tdr_max_header_length = 63
310
311        headers_containing_too_many_characters = []
312        headers_contain_invalid_characters = []
313
314        for header in headers:
315            if len(header) > tdr_max_header_length:
316                headers_containing_too_many_characters.append(header)
317            if not re.match(tdr_header_allowed_pattern, header):
318                headers_contain_invalid_characters.append(header)
319
320        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
321        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
322        header naming."""
323        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
324        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
325        allowed in TDR is {tdr_max_header_length}.\n"""
326        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
327        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
328        only contain numbers, letters, and underscore characters.\n"""
329
330        error_to_report = ""
331        if headers_containing_too_many_characters:
332            error_to_report += too_many_characters_error_message
333        if headers_contain_invalid_characters:
334            error_to_report += invalid_characters_error_message
335        if error_to_report:
336            error_to_report += base_error_message
337            raise ValueError(error_to_report)
338
339    def get_workspace_info(self) -> requests.Response:
340        """
341        Get workspace information.
342
343        **Returns:**
344        - requests.Response: The response from the request.
345        """
346        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
347        logging.info(
348            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
349        return self.request_util.run_request(uri=url, method=GET)
350
351    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
352        """
353        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
354
355        **Args:**
356        - entity_type (str): The type of entity to get metrics for.
357        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
358
359        **Returns:**
360        - list[dict]: A list of dictionaries containing entity metrics.
361        """
362        results = []
363        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
364
365        for page in self._yield_all_entity_metrics(entity=entity_type):
366            results.extend(page["results"])
367
368        # If remove_dicts is True, remove dictionaries from the workspace metrics
369        if remove_dicts:
370            for row in results:
371                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
372        return results
373
374    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
375        """
376        Get specific entity metrics for a given entity type and name.
377
378        **Args:**
379        - entity_type (str): The type of entity to get metrics for.
380        - entity_name (str): The name of the entity to get metrics for.
381
382        **Returns:**
383        - requests.Response: The response from the request.
384        """
385        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
386        return self.request_util.run_request(uri=url, method=GET)
387
388    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
389        """
390        Remove dictionaries from the attributes.
391
392        Args:
393            attributes (dict): The attributes to remove dictionaries from.
394
395        Returns:
396            dict: The updated attributes with no dictionaries.
397        """
398        for key, value in attributes.items():
399            attributes[key] = self._remove_dict_from_cell(value)
400        return attributes
401
def _remove_dict_from_cell(self, cell_value: Any) -> Any:
    """
    Flatten a single attribute value.

    Args:
        cell_value (Any): The raw attribute value.

    Returns:
        Any: If ``cell_value`` is a dict with a truthy ``entityName``, that name.
        If it is a dict whose ``items`` is truthy or exactly ``[]``, a list with each item
        flattened recursively. Otherwise ``cell_value`` unchanged.
    """
    if not isinstance(cell_value, dict):
        return cell_value
    # Reference to a single entity: collapse to the referenced entity's name.
    referenced_name = cell_value.get("entityName")
    if referenced_name:
        return referenced_name
    # Wrapper around a list of values: flatten each member (an explicit empty list yields []).
    members = cell_value.get("items")
    if not members and members != []:
        return cell_value
    return [self._remove_dict_from_cell(member) for member in members]
426
def get_workspace_bucket(self) -> str:
    """
    Look up the name of the workspace's bucket.

    The name is returned bare, without a leading `gs://`.

    **Returns:**
    - str: The bucket name.
    """
    workspace_details = self.get_workspace_info().json()
    return workspace_details["workspace"]["bucketName"]
435
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
    """
    Get workspace entity information.

    **Args:**
    - use_cache (bool, optional): Whether to use cache. Sent as the lowercase string
        `"true"`/`"false"` in the `useCache` query parameter. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # Use a separate local instead of rebinding the bool parameter to a str (which needed a type: ignore).
    use_cache_param = "true" if use_cache else "false"
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/entities?useCache={use_cache_param}"
    )
    return self.request_util.run_request(uri=url, method=GET)
449
def get_workspace_acl(self) -> requests.Response:
    """
    Retrieve the access control list (ACL) of this workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    acl_url = f"{self.terra_link}/workspaces/{workspace_path}/acl"
    return self.request_util.run_request(uri=acl_url, method=GET)
462
def update_user_acl(
        self,
        email: str,
        access_level: str,
        can_share: bool = False,
        can_compute: bool = False,
        invite_users_not_found: bool = False,
) -> requests.Response:
    """
    Update the access control list (ACL) for a user in the workspace.

    **Args:**
    - email (str): The email of the user.
    - access_level (str): The access level to grant to the user.
    - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
    - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
            the workspace. Defaults to `False`

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If the response reports the user as not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    payload = {
        "email": email,
        "accessLevel": access_level,
        "canShare": can_share,
        "canCompute": can_compute,
    }
    logging.info(
        f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        # The ACL endpoint is sent a JSON array of entries; here it holds just this one user.
        data=json.dumps([payload])
    )

    # Parse the body once; a response without the key is treated as "no users missing".
    users_not_found = response.json().get("usersNotFound") or []
    if users_not_found and not invite_users_not_found:
        # Only one user was submitted, so the first entry is the one that was not found.
        raise Exception(
            f'The user {users_not_found[0]["email"]} was not found and access was not updated'
        )
    return response
509
@deprecated(
    """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
)
def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
    """
    Send library metadata for this workspace's dataset with a PUT request.

    Deprecated (see the message passed to `@deprecated` above).

    **Args:**
    - library_metadata (dict): The metadata to send, serialized to JSON as the request body.
    - validate (bool, optional): Forwarded as the lowercase `validate` query parameter. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    validate_flag = str(validate).lower()
    metadata_url = (
        f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}"
        f"/metadata?validate={validate_flag}"
    )
    request_body = json.dumps(library_metadata)
    return self.request_util.run_request(uri=metadata_url, method=PUT, data=request_body)
531
def update_multiple_users_acl(
        self, acl_list: list[dict], invite_users_not_found: bool = False
) -> requests.Response:
    """
    Update the access control list (ACL) for multiple users in the workspace.

    **Args:**
    - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
            the workspace. Defaults to `False`

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If the response reports any users as not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    logging.info(
        f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        data=json.dumps(acl_list)
    )

    # Parse the body once; a response without the key is treated as "no users missing".
    missing_users = response.json().get("usersNotFound") or []
    if missing_users and not invite_users_not_found:
        # Several users may be reported at once; list all of their emails.
        users_not_found = [u["email"] for u in missing_users]
        raise Exception(
            f"The following users were not found and access was not updated: {users_not_found}"
        )
    return response
564
def create_workspace(
        self,
        auth_domain: Optional[list[dict]] = None,
        attributes: Optional[dict] = None,
        continue_if_exists: bool = False,
) -> requests.Response:
    """
    Create a new workspace in Terra.

    **Args:**
    - auth_domain (list[dict], optional): A list of authorization domains. Should look
            like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
    - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
    - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    payload = {
        "namespace": self.billing_project,
        "name": self.workspace_name,
        # Build fresh containers per call rather than sharing mutable default arguments.
        "authorizationDomain": auth_domain if auth_domain is not None else [],
        "attributes": attributes if attributes is not None else {},
        "cloudPlatform": GCP
    }
    # If workspace already exists then continue if exists
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces",
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload),
        accept_return_codes=accept_return_codes
    )
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
    return response
603
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
    """
    Create an ingest dictionary for workspace attributes.

    Each attribute becomes a `{"attribute": <name>, "value": <str or None>}` row:
    - a dict value is replaced by its `items` entry;
    - a list value is joined into a comma-separated string (each element converted with `str`);
    - any falsy result becomes `None`.

    **Args:**
    - workspace_attributes (dict, optional): A dictionary of workspace attributes. If not provided
        (or empty), the attributes are fetched from the workspace. Defaults to None.

    **Returns:**
    - list[dict]: A list of dictionaries containing the workspace attributes.
    """
    # If not provided then call API to get it
    if not workspace_attributes:
        workspace_attributes = self.get_workspace_info().json()["workspace"]["attributes"]

    ingest_dict = []
    for key, value in workspace_attributes.items():
        # If value is dict just use 'items' as value
        if isinstance(value, dict):
            value = value.get("items")
        # If value is list convert to comma separated string; stringify elements so
        # lists of numbers/booleans do not make str.join raise TypeError.
        if isinstance(value, list):
            value = ", ".join(str(item) for item in value)
        ingest_dict.append(
            {
                "attribute": key,
                "value": str(value) if value else None
            }
        )
    return ingest_dict
635
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
    """
    Upload metadata to the workspace table.

    **Args:**
    - entities_tsv (str): The path to the TSV file containing the metadata to upload.

    **Returns:**
    - requests.Response: The response from the request.
    """
    endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
    # Open the file only for the duration of the upload so the handle is always released
    # (previously it was opened and never closed). Assumes upload_file reads the handle
    # before returning — TODO confirm in request_util.
    with open(entities_tsv, "rb") as tsv_file:
        return self.request_util.upload_file(
            uri=endpoint,
            data={"entities": tsv_file}
        )
652
def get_workspace_workflows(self) -> requests.Response:
    """
    List the method configurations (workflows) in the workspace.

    The request always includes the `allRepos=true` query parameter.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_base = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(uri=f"{workspace_base}/methodconfigs?allRepos=true", method=GET)
665
def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
    """
    Add a method configuration (workflow) to the workspace.

    **Args:**
    - workflow_dict (dict): The workflow definition, POSTed as a JSON body.
    - continue_if_exists (bool, optional): If `True`, `self.CONFLICT_STATUS_CODE` is passed to the
        request utility as an accepted return code. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    methodconfigs_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
    request_body = json.dumps(workflow_dict)
    accepted_codes = []
    if continue_if_exists:
        # Tolerate the "already exists" conflict rather than treating it as a failure.
        accepted_codes.append(self.CONFLICT_STATUS_CODE)
    return self.request_util.run_request(
        uri=methodconfigs_url,
        method=POST,
        data=request_body,
        content_type=APPLICATION_JSON,
        accept_return_codes=accepted_codes,
    )
688
def delete_workspace(self) -> requests.Response:
    """
    Delete this Terra workspace by sending a DELETE to its workspace endpoint.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(uri=workspace_url, method=DELETE)
700
def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
    """
    Apply attribute updates to the workspace.

    **Args:**
    - attributes (list[dict]): The attribute updates, sent as a JSON array to the
        `updateAttributes` endpoint.

    **Returns:**
    - requests.Response: The response from the request.
    """
    update_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes"
    serialized_updates = json.dumps(attributes)
    return self.request_util.run_request(
        uri=update_url,
        method=PATCH,
        data=serialized_updates,
        content_type=APPLICATION_JSON,
    )
717
def leave_workspace(
        self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
) -> requests.Response:
    """
    Leave a workspace. If workspace ID not supplied, will look it up.

    **Args:**
    - workspace_id (str, optional): The workspace ID. Defaults to None.
    - ignore_direct_access_error (bool, optional): Whether to accept a 403 response (e.g. when the
         current user has no direct access to the workspace). Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if not workspace_id:
        workspace_info = self.get_workspace_info().json()
        workspace_id = workspace_info['workspace']['workspaceId']
    # 403 is only passed as an accepted return code when the caller opts in.
    accepted_return_code = [403] if ignore_direct_access_error else []

    res = self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
        method=DELETE,
        accept_return_codes=accepted_return_code
    )
    # Use .get so a 403 body without a "message" key does not raise KeyError.
    if (res.status_code == 403
            and res.json().get("message") == "You can only leave a resource that you have direct access to."):
        logging.info(
            f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct "
            "access to the workspace (they could be an owner on the billing project)"
        )
    return res
749
def change_workspace_public_setting(self, public: bool) -> requests.Response:
    """
    Turn the workspace's "PubliclyReadable" setting on or off.

    **Args:**
    - public (bool): `True` to make the workspace public, `False` otherwise.

    **Returns:**
    - requests.Response: The response from the request.
    """
    publicly_readable = {"settingType": "PubliclyReadable", "config": {"enabled": public}}
    settings_url = f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings"
    return self.request_util.run_request(
        uri=settings_url,
        method=PUT,
        content_type=APPLICATION_JSON,
        data=json.dumps([publicly_readable]),
    )
775
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
    """
    Query Sam for the public status of the workspace's reader policy.

    **Args:**
    - bucket (str, optional): The bucket name (without the `gs://` prefix). When empty or
        `None`, it is looked up with `get_workspace_bucket()`. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if not bucket:
        bucket = self.get_workspace_bucket()
    # Drop "fc-secure-" first, then "fc-"; the remainder is used as the workspace id in the Sam URL.
    workspace_resource_id = bucket.removeprefix("fc-secure-").removeprefix("fc-")
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_resource_id}/policies/reader/public",
        method=GET,
    )
793
def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
    """Delete an entire entity table from a Terra workspace.

    Logs at info level when the response status is 204, otherwise logs the response
    body at error level. The response is returned in both cases.

    **Args:**
    - entity_to_delete (str): The name of the entity table to delete.

    **Returns:**
    - requests.Response: The response from the request.
    """
    table_url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/entityTypes/{entity_to_delete}"
    )
    response = self.request_util.run_request(uri=table_url, method=DELETE)
    if response.status_code != 204:
        logging.error(
            f"Encountered the following error while attempting to delete '{entity_to_delete}' "
            f"table: {response.text}"
        )
        return response
    logging.info(
        f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
        f"'{self.billing_project}/{self.workspace_name}'"
    )
    return response
818
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
    """Save an entity table version in a Terra workspace.

    Writes `<entity_type>.json` and `<entity_type>.v<timestamp_ms>.zip` to the current working
    directory, then uploads the zip to `.data-table-versions/<entity_type>/` in the workspace bucket.

    **Args:**
    - entity_type (str): The name of the entity table to save a new version for
    - version_name (str): The name of the new version (stored as the blob's `description` metadata)
    """
    # Get the workspace metrics
    workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
    file_name = f"{entity_type}.json"
    # Write the workspace metrics to a JSON file
    with open(file_name, "w") as json_file:
        json.dump(workspace_metrics, json_file)

    # Create a zip file with the same naming convention that Terra backend uses
    timestamp_ms = int(time.time() * 1000)
    zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
    with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
        zipf.write(file_name, arcname=f"json/{file_name}")

    # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
    workspace_info = self.get_workspace_info().json()
    bucket_name = workspace_info["workspace"]["bucketName"]
    # GCS object paths always use "/"; build the URL explicitly instead of with os.path.join,
    # which would insert "\" separators on Windows.
    path_to_upload_to = f"gs://{bucket_name}/.data-table-versions/{entity_type}/{zip_file_name}"
    gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
    # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
    try:
        active_account = gcp_util.get_active_gcloud_account()
    except Exception as e:
        active_account = workspace_info["workspace"]["createdBy"]
        logging.error(
            f"Encountered the following exception while attempting to get current GCP account: {e}. "
            f"Will set the owner of the new metadata version as the workspace creator instead."
        )
    gcp_util.upload_blob(
        source_file=zip_file_name,
        destination_path=path_to_upload_to,
        custom_metadata={
            "createdBy": active_account,
            "entityType": entity_type,
            "timestamp": timestamp_ms,
            "description": version_name,
        }
    )
864
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
    """
    Attach a user comment to an existing Terra submission via a PATCH request.

    **Args:**
    - submission_id (str): The ID of the submission to comment on.
    - user_comment (str): The comment text, sent as the `userComment` field.

    **Returns:**
    - requests.Response: The response from the request.
    """
    logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
    submission_url = (
        f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
    )
    comment_body = json.dumps({"userComment": user_comment})
    return self.request_util.run_request(
        uri=submission_url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        data=comment_body,
    )
883
def initiate_submission(
        self,
        method_config_namespace: str,
        method_config_name: str,
        entity_type: str,
        entity_name: str,
        expression: str,
        user_comment: Optional[str] = None,
        use_call_cache: bool = True
) -> requests.Response:
    """
    Initiate a submission within a Terra workspace.

    Note - the workflow being initiated MUST already be imported into the workspace.

    **Args:**
    - method_config_namespace (str): The namespace of the method configuration.
    - method_config_name (str): The name of the method configuration to use for the submission
    (i.e. the workflow name).
    - entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
    - entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or
    "sample_set_1").
    - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
    launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
    be launched PER SAMPLE, the expression should be `this.samples`.
    - user_comment (str, optional): The user comment to add to the submission. Left out of the request
    when empty or `None`. Defaults to `None`.
    - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    payload = {
        "methodConfigurationNamespace": method_config_namespace,
        "methodConfigurationName": method_config_name,
        "entityType": entity_type,
        "entityName": entity_name,
        "expression": expression,
        "useCallCache": use_call_cache,
        "deleteIntermediateOutputFiles": False,
        "useReferenceDisks": False,
        "ignoreEmptyOutputs": False,
    }
    # Only send a comment when one was actually provided.
    if user_comment:
        payload["userComment"] = user_comment

    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload),
    )
935      
def retry_failed_submission(self, submission_id: str) -> requests.Response:
    """
    Ask Terra to retry a submission, with `retryType` set to `"Failed"`.

    **Args:**
    - submission_id (str): The ID of the submission to retry.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_label = f"{self.billing_project}/{self.workspace_name}"
    retry_url = f"{RAWLS_LINK}/workspaces/{workspace_label}/submissions/{submission_id}/retry"
    logging.info(f"Retrying failed submission: '{submission_id}' in workspace {workspace_label}")
    return self.request_util.run_request(
        uri=retry_url,
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps({"retryType": "Failed"}),
    )
957
def get_submission_status(self, submission_id: str) -> requests.Response:
    """
    Fetch a Terra submission, including its status.

    **Args:**
    - submission_id (str): The ID of the submission.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_label = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for submission: '{submission_id}' in workspace {workspace_label}")
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace_label}/submissions/{submission_id}",
        method=GET,
    )

Terra workspace class to manage workspaces and their attributes.

TerraWorkspace( billing_project: str, workspace_name: str, request_util: ops_utils.request_util.RunRequest, env: str = 'prod')
def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
    """
    Initialize the TerraWorkspace class.

    **Args:**
    - billing_project (str): The billing project associated with the workspace.
    - workspace_name (str): The name of the workspace.
    - request_util (`ops_utils.request_util.RunRequest`): An instance of a
        request utility class to handle HTTP requests.
    - env (str, optional): The Terra environment to target, `"dev"` or `"prod"` (case-insensitive).
        Selects the API base URL. Defaults to `"prod"`.

    **Raises:**
    - ValueError: If `env` is neither `"dev"` nor `"prod"`.
    """
    self.billing_project = billing_project
    """@private"""
    self.workspace_name = workspace_name
    """@private"""
    self.workspace_id = None
    """@private"""
    self.resource_id = None
    """@private"""
    self.storage_container = None
    """@private"""
    self.bucket = None
    """@private"""
    self.wds_url = None
    """@private"""
    self.account_url: Optional[str] = None
    """@private"""
    self.request_util = request_util
    """@private"""
    # Normalize once so "DEV"/"Prod" etc. are accepted.
    normalized_env = env.lower()
    if normalized_env == "dev":
        self.terra_link = TERRA_DEV_LINK
        """@private"""
    elif normalized_env == "prod":
        self.terra_link = TERRA_PROD_LINK
        """@private"""
    else:
        raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")

Initialize the TerraWorkspace class.

Args:

  • billing_project (str): The billing project associated with the workspace.
  • workspace_name (str): The name of the workspace.
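  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.
  • env (str, optional): The Terra environment to target, either "dev" or "prod" (case-insensitive). Defaults to "prod"; any other value raises ValueError.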
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
    """Check that all headers follow the standards required by TDR.

    A header is valid when it:
    - starts with an ASCII letter;
    - contains only ASCII letters, digits and underscores;
    - is at most 63 characters long.

    **Args:**
    - table_name (str): The name of the Terra table (used in the error message).
    - headers (list[str]): The headers of the Terra table to validate.

    **Raises:**
    - ValueError if any headers are considered invalid by TDR standards
    """
    # fullmatch (rather than match with "$") so a trailing newline is rejected too.
    tdr_header_allowed_pattern = re.compile(r"[a-zA-Z][_a-zA-Z0-9]*")
    tdr_max_header_length = 63

    headers_containing_too_many_characters = [
        header for header in headers if len(header) > tdr_max_header_length
    ]
    headers_contain_invalid_characters = [
        header for header in headers if not tdr_header_allowed_pattern.fullmatch(header)
    ]

    # Messages are built with implicit concatenation so no source indentation leaks into them.
    error_to_report = ""
    if headers_containing_too_many_characters:
        error_to_report += (
            f'The following header(s) in table "{table_name}" contain too many characters: '
            f'"{", ".join(headers_containing_too_many_characters)}". The max number of characters '
            f"for a header allowed in TDR is {tdr_max_header_length}.\n"
        )
    if headers_contain_invalid_characters:
        error_to_report += (
            f'The following header(s) in table "{table_name}" contain invalid characters: '
            f'"{", ".join(headers_contain_invalid_characters)}". TDR headers must start with a letter, '
            "and must only contain numbers, letters, and underscore characters.\n"
        )
    if error_to_report:
        error_to_report += (
            "In order to proceed, please update the problematic header(s) in your Terra table, and then "
            "re-attempt the import once all problematic header(s) have been updated to follow TDR rules "
            "for header naming."
        )
        raise ValueError(error_to_report)

Check that all headers follow the standards required by TDR.

Args:

  • table_name (str): The name of the Terra table.
  • headers (list[str]): The headers of the Terra table to validate.

Raises:

  • ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.models.Response:
def get_workspace_info(self) -> requests.Response:
    """
    Fetch the details of this workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_label = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting workspace info for {workspace_label}")
    return self.request_util.run_request(uri=f"{self.terra_link}/workspaces/{workspace_label}", method=GET)

Get workspace information.

Returns:

  • requests.Response: The response from the request.
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
    """
    Collect every row of one entity type in the workspace (Terra on GCP).

    **Args:**
    - entity_type (str): The entity type whose rows should be fetched.
    - remove_dicts (bool, optional): When `True`, each row's `attributes` are flattened with
        `self._remove_dict_from_attributes`. Defaults to `False`.

    **Returns:**
    - list[dict]: The `results` of every page yielded by `self._yield_all_entity_metrics`,
        concatenated in order.
    """
    logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
    rows = [
        row
        for page in self._yield_all_entity_metrics(entity=entity_type)
        for row in page["results"]
    ]
    if remove_dicts:
        for row in rows:
            row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
    return rows

Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

Args:

  • entity_type (str): The type of entity to get metrics for.
  • remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to False.

Returns:

  • list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.models.Response:
374    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
375        """
376        Get specific entity metrics for a given entity type and name.
377
378        **Args:**
379        - entity_type (str): The type of entity to get metrics for.
380        - entity_name (str): The name of the entity to get metrics for.
381
382        **Returns:**
383        - requests.Response: The response from the request.
384        """
385        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
386        return self.request_util.run_request(uri=url, method=GET)

Get specific entity metrics for a given entity type and name.

Args:

  • entity_type (str): The type of entity to get metrics for.
  • entity_name (str): The name of the entity to get metrics for.

Returns:

  • requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
427    def get_workspace_bucket(self) -> str:
428        """
429        Get the workspace bucket name. Does not include the `gs://` prefix.
430
431        **Returns:**
432        - str: The bucket name.
433        """
434        return self.get_workspace_info().json()["workspace"]["bucketName"]

Get the workspace bucket name. Does not include the gs:// prefix.

Returns:

  • str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.models.Response:
436    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
437        """
438        Get workspace entity information.
439
440        **Args:**
441        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
442
443        **Returns:**
444        - requests.Response: The response from the request.
445        """
446        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
447        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
448        return self.request_util.run_request(uri=url, method=GET)

Get workspace entity information.

Args:

  • use_cache (bool, optional): Whether to use cache. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.models.Response:
450    def get_workspace_acl(self) -> requests.Response:
451        """
452        Get the workspace access control list (ACL).
453
454        **Returns:**
455        - requests.Response: The response from the request.
456        """
457        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
458        return self.request_util.run_request(
459            uri=url,
460            method=GET
461        )

Get the workspace access control list (ACL).

Returns:

  • requests.Response: The response from the request.
def update_user_acl( self, email: str, access_level: str, can_share: bool = False, can_compute: bool = False, invite_users_not_found: bool = False) -> requests.models.Response:
463    def update_user_acl(
464            self,
465            email: str,
466            access_level: str,
467            can_share: bool = False,
468            can_compute: bool = False,
469            invite_users_not_found: bool = False,
470    ) -> requests.Response:
471        """
472        Update the access control list (ACL) for a user in the workspace.
473
474        **Args:**
475        - email (str): The email of the user.
476        - access_level (str): The access level to grant to the user.
477        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
478        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
479        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
480                the workspace. Defaults to `False`
481
482        **Returns:**
483        - requests.Response: The response from the request.
484        """
485        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
486              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
487        payload = {
488            "email": email,
489            "accessLevel": access_level,
490            "canShare": can_share,
491            "canCompute": can_compute,
492        }
493        logging.info(
494            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
495        response = self.request_util.run_request(
496            uri=url,
497            method=PATCH,
498            content_type=APPLICATION_JSON,
499            data="[" + json.dumps(payload) + "]"
500        )
501
502        if response.json()["usersNotFound"] and not invite_users_not_found:
503            # Will be a list of one user
504            user_not_found = response.json()["usersNotFound"][0]
505            raise Exception(
506                f'The user {user_not_found["email"]} was not found and access was not updated'
507            )
508        return response

Update the access control list (ACL) for a user in the workspace.

Args:

  • email (str): The email of the user.
  • access_level (str): The access level to grant to the user.
  • can_share (bool, optional): Whether the user can share the workspace. Defaults to False.
  • can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to False.
  • invite_users_not_found (bool, optional): Whether a user who is not found should still be invited to access the workspace. If False and the user is not found, an exception is raised. Defaults to False.

Returns:

  • requests.Response: The response from the request.
@deprecated('Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.')
def put_metadata_for_library_dataset( self, library_metadata: dict, validate: bool = False) -> requests.models.Response:
510    @deprecated(
511        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
512    )
513    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
514        """
515        Update the metadata for a library dataset.
516
517        **Args:**
518        - library_metadata (dict): The metadata to update.
519        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
520
521        **Returns:**
522        - requests.Response: The response from the request.
523        """
524        acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \
525              f"/metadata?validate={str(validate).lower()}"
526        return self.request_util.run_request(
527            uri=acl,
528            method=PUT,
529            data=json.dumps(library_metadata)
530        )

Update the metadata for a library dataset.

Args:

  • library_metadata (dict): The metadata to update.
  • validate (bool, optional): Whether to validate the metadata. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def update_multiple_users_acl( self, acl_list: list[dict], invite_users_not_found: bool = False) -> requests.models.Response:
532    def update_multiple_users_acl(
533            self, acl_list: list[dict], invite_users_not_found: bool = False
534    ) -> requests.Response:
535        """
536        Update the access control list (ACL) for multiple users in the workspace.
537
538        **Args:**
539        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
540        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
541                the workspace. Defaults to `False`
542
543        **Returns:**
544        - requests.Response: The response from the request.
545        """
546        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
547            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
548        logging.info(
549            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
550        response = self.request_util.run_request(
551            uri=url,
552            method=PATCH,
553            content_type=APPLICATION_JSON,
554            data=json.dumps(acl_list)
555        )
556
557        if response.json()["usersNotFound"] and not invite_users_not_found:
558            # Will be a list of one user
559            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
560            raise Exception(
561                f"The following users were not found and access was not updated: {users_not_found}"
562            )
563        return response

Update the access control list (ACL) for multiple users in the workspace.

Args:

  • acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
  • invite_users_not_found (bool, optional): Whether users who are not found should still be invited to access the workspace. If False and any users are not found, an exception listing their emails is raised. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace( self, auth_domain: list[dict] = [], attributes: dict = {}, continue_if_exists: bool = False) -> requests.models.Response:
565    def create_workspace(
566            self,
567            auth_domain: list[dict] = [],
568            attributes: dict = {},
569            continue_if_exists: bool = False,
570    ) -> requests.Response:
571        """
572        Create a new workspace in Terra.
573
574        **Args:**
575        - auth_domain (list[dict], optional): A list of authorization domains. Should look
576                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
577        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
578        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
579
580        **Returns:**
581        - requests.Response: The response from the request.
582        """
583        payload = {
584            "namespace": self.billing_project,
585            "name": self.workspace_name,
586            "authorizationDomain": auth_domain,
587            "attributes": attributes,
588            "cloudPlatform": GCP
589        }
590        # If workspace already exists then continue if exists
591        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
592        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
593        response = self.request_util.run_request(
594            uri=f"{self.terra_link}/workspaces",
595            method=POST,
596            content_type=APPLICATION_JSON,
597            data=json.dumps(payload),
598            accept_return_codes=accept_return_codes
599        )
600        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
601            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
602        return response

Create a new workspace in Terra.

Args:

  • auth_domain (list[dict], optional): A list of authorization domains. Should look like [{"membersGroupName": "some_auth_domain"}]. Defaults to an empty list.
  • attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
  • continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
604    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
605        """
606        Create an ingest dictionary for workspace attributes.
607
608        **Args:**
609        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
610
611        **Returns:**
612        - list[dict]: A list of dictionaries containing the workspace attributes.
613        """
614        # If not provided then call API to get it
615        workspace_attributes = (
616            workspace_attributes if workspace_attributes
617            else self.get_workspace_info().json()["workspace"]["attributes"]
618        )
619
620        ingest_dict = []
621        for key, value in workspace_attributes.items():
622            # If value is dict just use 'items' as value
623            if isinstance(value, dict):
624                value = value.get("items")
625            # If value is list convert to comma separated string
626            if isinstance(value, list):
627                value = ", ".join(value)
628            ingest_dict.append(
629                {
630                    "attribute": key,
631                    "value": str(value) if value else None
632                }
633            )
634        return ingest_dict

Create an ingest dictionary for workspace attributes.

Args:

  • workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.

Returns:

  • list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.models.Response:
636    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
637        """
638        Upload metadata to the workspace table.
639
640        **Args:**
641        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
642
643        **Returns:**
644        - requests.Response: The response from the request.
645        """
646        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
647        data = {"entities": open(entities_tsv, "rb")}
648        return self.request_util.upload_file(
649            uri=endpoint,
650            data=data
651        )

Upload metadata to the workspace table.

Args:

  • entities_tsv (str): The path to the TSV file containing the metadata to upload.

Returns:

  • requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.models.Response:
653    def get_workspace_workflows(self) -> requests.Response:
654        """
655        Get the workflows for the workspace.
656
657        **Returns:**
658        - requests.Response: The response from the request.
659        """
660        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
661        return self.request_util.run_request(
662            uri=uri,
663            method=GET
664        )

Get the workflows for the workspace.

Returns:

  • requests.Response: The response from the request.
def import_workflow( self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.models.Response:
666    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
667        """
668        Import a workflow into the workspace.
669
670        **Args:**
671        - workflow_dict (dict): The dictionary containing the workflow information.
672        - continue_if_exists (bool, optional): Whether to continue if the workflow
673                already exists. Defaults to `False`.
674
675        **Returns:**
676        - requests.Response: The response from the request.
677        """
678        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
679        workflow_json = json.dumps(workflow_dict)
680        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
681        return self.request_util.run_request(
682            uri=uri,
683            method=POST,
684            data=workflow_json,
685            content_type=APPLICATION_JSON,
686            accept_return_codes=accept_return_codes
687        )

Import a workflow into the workspace.

Args:

  • workflow_dict (dict): The dictionary containing the workflow information.
  • continue_if_exists (bool, optional): Whether to continue if the workflow already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_workspace(self) -> requests.models.Response:
689    def delete_workspace(self) -> requests.Response:
690        """
691        Delete a Terra workspace.
692
693        **Returns:**
694        - requests.Response: The response from the request.
695        """
696        return self.request_util.run_request(
697            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
698            method=DELETE
699        )

Delete a Terra workspace.

Returns:

  • requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.models.Response:
701    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
702        """
703        Update the attributes for the workspace.
704
705        **Args:**
706        - attributes (dict): The attributes to update.
707
708        **Returns:**
709        - requests.Response: The response from the request.
710        """
711        return self.request_util.run_request(
712            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
713            method=PATCH,
714            data=json.dumps(attributes),
715            content_type=APPLICATION_JSON
716        )

Update the attributes for the workspace.

Args:

  • attributes (list[dict]): The list of attribute updates to apply to the workspace.

Returns:

  • requests.Response: The response from the request.
def leave_workspace( self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False) -> requests.models.Response:
718    def leave_workspace(
719            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
720    ) -> requests.Response:
721        """
722        Leave a workspace. If workspace ID not supplied, will look it up.
723
724        **Args:**
725        - workspace_id (str, optional): The workspace ID. Defaults to None.
726        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
727             Defaults to `False`.
728
729        **Returns:**
730        - requests.Response: The response from the request.
731        """
732        if not workspace_id:
733            workspace_info = self.get_workspace_info().json()
734            workspace_id = workspace_info['workspace']['workspaceId']
735        accepted_return_code = [403] if ignore_direct_access_error else []
736
737        res = self.request_util.run_request(
738            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
739            method=DELETE,
740            accept_return_codes=accepted_return_code
741        )
742        if (res.status_code == 403
743                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
744            logging.info(
745                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
746                f"access to the workspace (they could be an owner on the billing project)"
747            )
748        return res

Leave a workspace. If workspace ID not supplied, will look it up.

Args:

  • workspace_id (str, optional): The workspace ID. Defaults to None.
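  • ignore_direct_access_error (bool, optional): Whether to ignore the 403 error returned when the current user does not have direct access to the workspace (for example, when their access comes from being an owner on the billing project). Defaults to False.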

Returns:

  • requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.models.Response:
750    def change_workspace_public_setting(self, public: bool) -> requests.Response:
751        """
752        Change a workspace's public setting.
753
754        **Args:**
755        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
756         public, `False` otherwise.
757
758        **Returns:**
759        - requests.Response: The response from the request.
760        """
761        body = [
762            {
763                "settingType": "PubliclyReadable",
764                "config": {
765                    "enabled": public
766                }
767            }
768        ]
769        return self.request_util.run_request(
770            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
771            method=PUT,
772            content_type=APPLICATION_JSON,
773            data=json.dumps(body)
774        )

Change a workspace's public setting.

Args:

  • public (bool): Whether the workspace should be public. Set to True to make the workspace public, False otherwise.

Returns:

  • requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.models.Response:
776    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
777        """
778        Check if a workspace is public.
779
780        **Args:**
781        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
782        it up if not provided. Defaults to None.
783
784        **Returns:**
785        - requests.Response: The response from the request.
786        """
787        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
788        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
789        return self.request_util.run_request(
790            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
791            method=GET
792        )

Check if a workspace is public.

Args:

  • bucket (str, optional): The bucket name (provided without the gs:// prefix). Will look it up if not provided. Defaults to None.

Returns:

  • requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.models.Response:
794    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
795        """Delete an entire entity table from a Terra workspace.
796
797        **Args:**
798        - entity_to_delete (str): The name of the entity table to delete.
799
800        **Returns:**
801        - requests.Response: The response from the request.
802        """
803        response = self.request_util.run_request(
804            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
805            method=DELETE
806        )
807        if response.status_code == 204:
808            logging.info(
809                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
810                f"'{self.billing_project}/{self.workspace_name}'"
811            )
812        else:
813            logging.error(
814                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
815                f"table: {response.text}"
816            )
817        return response

Delete an entire entity table from a Terra workspace.

Args:

  • entity_to_delete (str): The name of the entity table to delete.

Returns:

  • requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
819    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
820        """Save an entity table version in a Terra workspace.
821
822        **Args:**
823        - entity_type (str): The name of the entity table to save a new version for
824        - version_name (str): The name of the new version
825        """
826        # Get the workspace metrics
827        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
828        file_name = f"{entity_type}.json"
829        # Write the workspace metrics to a JSON file
830        with open(file_name, "w") as json_file:
831            json.dump(workspace_metrics, json_file)
832
833        # Create a zip file with the same naming convention that Terra backend uses
834        timestamp_ms = int(time.time() * 1000)
835        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
836        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
837            zipf.write(file_name, arcname=f"json/{file_name}")
838
839        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
840        workspace_info = self.get_workspace_info().json()
841        path_to_upload_to = os.path.join(
842            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
843        )
844        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
845        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
846        try:
847            active_account = gcp_util.get_active_gcloud_account()
848        except Exception as e:
849            active_account = workspace_info["workspace"]["createdBy"]
850            logging.error(
851                f"Encountered the following exception while attempting to get current GCP account: {e}. "
852                f"Will set the owner of the new metadata version as the workspace creator instead."
853            )
854        gcp_util.upload_blob(
855            source_file=zip_file_name,
856            destination_path=path_to_upload_to,
857            custom_metadata={
858                "createdBy": active_account,
859                "entityType": entity_type,
860                "timestamp": timestamp_ms,
861                "description": version_name,
862            }
863        )

Save an entity table version in a Terra workspace.

Args:

  • entity_type (str): The name of the entity table to save a new version for.
  • version_name (str): The name of the new version (stored as the version's description metadata).
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.models.Response:
865    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
866        """
867        Add a user comment to a submission in Terra.
868
869        **Args:**
870        - submission_id (str): The ID of the submission to add a comment to.
871        - user_comment (str): The comment to add to the submission.
872
873        **Returns:**
874        - requests.Response: The response from the request.
875        """
876        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
877        return self.request_util.run_request(
878            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
879            method=PATCH,
880            content_type=APPLICATION_JSON,
881            data=json.dumps({"userComment": user_comment}),
882        )

Add a user comment to a submission in Terra.

Args:

  • submission_id (str): The ID of the submission to add a comment to.
  • user_comment (str): The comment to add to the submission.

Returns:

  • requests.Response: The response from the request.
def initiate_submission( self, method_config_namespace: str, method_config_name: str, entity_type: str, entity_name: str, expression: str, user_comment: Optional[str], use_call_cache: bool = True) -> requests.models.Response:
884    def initiate_submission(
885            self,
886            method_config_namespace: str,
887            method_config_name: str,
888            entity_type: str,
889            entity_name: str,
890            expression: str,
891            user_comment: Optional[str],
892            use_call_cache: bool = True
893    ) -> requests.Response:
894        """
895        Initiate a submission within a Terra workspace.
896
897        Note - the workflow being initiated MUST already be imported into the workspace.
898
899        **Args:**
900        - method_config_namespace (str): The namespace of the method configuration.
901        - method_config_name (str): The name of the method configuration to use for the submission
902        (i.e. the workflow name).
903        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
904        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
905        "sample_set_1").
906        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
907        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
908        be launched PER SAMPLE, the expression should be `this.samples`.
909        - user_comment (str, optional): The user comment to add to the submission.
910        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
911
912        **Returns:**
913        - requests.Response: The response from the request.
914        """
915        payload = {
916            "methodConfigurationNamespace": method_config_namespace,
917            "methodConfigurationName": method_config_name,
918            "entityType": entity_type,
919            "entityName": entity_name,
920            "expression": expression,
921            "useCallCache": use_call_cache,
922            "deleteIntermediateOutputFiles": False,
923            "useReferenceDisks": False,
924            "ignoreEmptyOutputs": False,
925        }
926        if user_comment:
927            payload["userComment"] = user_comment
928
929        return self.request_util.run_request(
930            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
931            method=POST,
932            content_type=APPLICATION_JSON,
933            data=json.dumps(payload),
934        )

Initiate a submission within a Terra workspace.

Note - the workflow being initiated MUST already be imported into the workspace.

Args:

  • method_config_namespace (str): The namespace of the method configuration.
  • method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
  • entity_type (str): The entity type to be used as input to the workflow (e.g. "sample" or "sample_set").
  • entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1" or "sample_set_1").
  • expression (str): The "expression" to use. For example, if the entity_type is "sample" and the workflow is launching one sample, this can be left as "this". If the entity_type is "sample_set", but one workflow should be launched PER SAMPLE, the expression should be "this.samples".
  • user_comment (str, optional): The user comment to add to the submission.
  • use_call_cache (bool, optional): Whether to use call caching. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.models.Response:
936    def retry_failed_submission(self, submission_id: str) -> requests.Response:
937        """
938        Retry a failed submission in Terra.
939
940        **Args:**
941        - submission_id (str): The ID of the submission to retry.
942
943        **Returns:**
944        - requests.Response: The response from the request.
945        """
946        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
947        payload = {"retryType": "Failed"}
948        logging.info(
949            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
950        )
951        return self.request_util.run_request(
952            uri=url,
953            method=POST,
954            content_type=APPLICATION_JSON,
955            data=json.dumps(payload)
956        )

Retry a failed submission in Terra.

Args:

  • submission_id (str): The ID of the submission to retry.

Returns:

  • requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.models.Response:
958    def get_submission_status(self, submission_id: str) -> requests.Response:
959        """
960        Get the status of a submission in Terra.
961
962        **Args:**
963        - submission_id (str): The ID of the submission.
964
965        **Returns:**
966        - requests.Response: The response from the request.
967        """
968        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
969        logging.info(
970            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
971        )
972        return self.request_util.run_request(
973            uri=url,
974            method=GET
975        )

Get the status of a submission in Terra.

Args:

  • submission_id (str): The ID of the submission.

Returns:

  • requests.Response: The response from the request.