ops_utils.terra_util

Utilities for working with Terra.

  1"""Utilities for working with Terra."""
  2import json
  3import logging
  4import re
  5from typing import Any, Optional
  6import requests
  7import time
  8import zipfile
  9import os
 10
 11from . import deprecated
 12from .vars import GCP
 13from .gcp_utils import GCPCloudFunctions
 14from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest
 15
# Base URL of the Firecloud API; the workspace endpoints in this module are built on it.
TERRA_LINK = "https://api.firecloud.org/api"
"""@private"""
# Base URL of the Leonardo service API.
LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api"
"""@private"""
# Base URL of the workspace service API (v1 workspaces endpoint).
WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1"
"""@private"""
# Base URL of the Sam service API; the group, resource and pet service account endpoints are built on it.
SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api"
"""@private"""
# Base URL of the Rawls service API.
RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api"
"""@private"""

# Group membership role names; these are the only roles accepted by `TerraGroups`.
MEMBER = "member"
ADMIN = "admin"
 29
 30
 31class Terra:
 32    """Class for generic Terra utilities."""
 33
 34    def __init__(self, request_util: RunRequest):
 35        """
 36        Initialize the Terra class.
 37
 38        **Args:**
 39        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
 40            request utility class to handle HTTP requests.
 41        """
 42        self.request_util = request_util
 43        """@private"""
 44
 45    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
 46        """
 47        Fetch the list of accessible workspaces.
 48
 49        **Args:**
 50        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
 51
 52        **Returns:**
 53        - requests.Response: The response from the request.
 54        """
 55        fields_str = "fields=" + ",".join(fields) if fields else ""
 56        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
 57        return self.request_util.run_request(
 58            uri=url,
 59            method=GET
 60        )
 61
 62    def get_pet_account_json(self) -> requests.Response:
 63        """
 64        Get the service account JSON.
 65
 66        **Returns:**
 67        - requests.Response: The response from the request.
 68        """
 69        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
 70        return self.request_util.run_request(
 71            uri=url,
 72            method=GET
 73        )
 74
 75
 76class TerraGroups:
 77    """A class to manage Terra groups and their memberships."""
 78
 79    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
 80    """@private"""
 81    CONFLICT_STATUS_CODE = 409
 82    """@private"""
 83
 84    def __init__(self, request_util: RunRequest):
 85        """
 86        Initialize the TerraGroups class.
 87
 88        **Args:**
 89        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
 90         utility class to handle HTTP requests.
 91        """
 92        self.request_util = request_util
 93        """@private"""
 94
 95    def _check_role(self, role: str) -> None:
 96        """
 97        Check if the role is valid.
 98
 99        Args:
100            role (str): The role to check.
101
102        Raises:
103            ValueError: If the role is not one of the allowed options.
104        """
105        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
106            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
107
108    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
109        """
110        Remove a user from a group.
111
112        **Args:**
113        - group (str): The name of the group.
114        - email (str): The email of the user to remove.
115        - role (str): The role of the user in the group
116            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
117
118        **Returns:**
119        - requests.Response: The response from the request.
120        """
121        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
122        self._check_role(role)
123        logging.info(f"Removing {email} from group {group}")
124        return self.request_util.run_request(
125            uri=url,
126            method=DELETE
127        )
128
129    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
130        """
131        Create a new group.
132
133        **Args:**
134        - group_name (str): The name of the group to create.
135        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
136
137        **Returns:**
138        - requests.Response: The response from the request.
139        """
140        url = f"{SAM_LINK}/groups/v1/{group_name}"
141        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
142        response = self.request_util.run_request(
143            uri=url,
144            method=POST,
145            accept_return_codes=accept_return_codes
146        )
147        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
148            logging.info(f"Group {group_name} already exists. Continuing.")
149            return response
150        else:
151            logging.info(f"Created group {group_name}")
152            return response
153
154    def delete_group(self, group_name: str) -> requests.Response:
155        """
156        Delete a group.
157
158        **Args:**
159        - group_name (str): The name of the group to delete.
160
161        **Returns:**
162        - requests.Response: The response from the request.
163        """
164        url = f"{SAM_LINK}/groups/v1/{group_name}"
165        logging.info(f"Deleting group {group_name}")
166        return self.request_util.run_request(
167            uri=url,
168            method=DELETE
169        )
170
171    def add_user_to_group(
172            self, group: str, email: str, role: str, continue_if_exists: bool = False
173    ) -> requests.Response:
174        """
175        Add a user to a group.
176
177        **Args:**
178        - group (str): The name of the group.
179        - email (str): The email of the user to add.
180        - role (str): The role of the user in the group
181            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
182        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
183                Defaults to `False`.
184
185        **Returns:**
186        - requests.Response: The response from the request.
187        """
188        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
189        self._check_role(role)
190        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
191        logging.info(f"Adding {email} to group {group} as {role}")
192        return self.request_util.run_request(
193            uri=url,
194            method=PUT,
195            accept_return_codes=accept_return_codes
196        )
197
198
199class TerraWorkspace:
200    """Terra workspace class to manage workspaces and their attributes."""
201
202    CONFLICT_STATUS_CODE = 409
203    """@private"""
204
205    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest):
206        """
207        Initialize the TerraWorkspace class.
208
209        **Args:**
210        - billing_project (str): The billing project associated with the workspace.
211        - workspace_name (str): The name of the workspace.
212        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
213            request utility class to handle HTTP requests.
214        """
215        self.billing_project = billing_project
216        """@private"""
217        self.workspace_name = workspace_name
218        """@private"""
219        self.workspace_id = None
220        """@private"""
221        self.resource_id = None
222        """@private"""
223        self.storage_container = None
224        """@private"""
225        self.bucket = None
226        """@private"""
227        self.wds_url = None
228        """@private"""
229        self.account_url: Optional[str] = None
230        """@private"""
231        self.request_util = request_util
232        """@private"""
233
234    def __repr__(self) -> str:
235        """
236        Return a string representation of the TerraWorkspace instance.
237
238        Returns:
239            str: The string representation of the TerraWorkspace instance.
240        """
241        return f"{self.billing_project}/{self.workspace_name}"
242
243    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any:
244        """
245        Yield all entity metrics from the workspace.
246
247        Args:
248            entity (str): The type of entity to query.
249            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
250
251        Yields:
252            Any: The JSON response containing entity metrics.
253        """
254        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
255        response = self.request_util.run_request(
256            uri=url,
257            method=GET,
258            content_type="application/json"
259        )
260        first_page_json = response.json()
261        yield first_page_json
262        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
263        logging.info(
264            f"Looping through {total_pages} pages of data")
265
266        for page in range(2, total_pages + 1):
267            logging.info(f"Getting page {page} of {total_pages}")
268            next_page = self.request_util.run_request(
269                uri=url,
270                method=GET,
271                content_type="application/json",
272                params={"page": page}
273            )
274            yield next_page.json()
275
276    @staticmethod
277    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
278        """Check that all headers follow the standards required by TDR.
279
280        **Args:**
281        - table_name (str): The name of the Terra table.
282        - headers (list[str]): The headers of the Terra table to validate.
283
284        **Raises:**
285        - ValueError if any headers are considered invalid by TDR standards
286        """
287        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
288        tdr_max_header_length = 63
289
290        headers_containing_too_many_characters = []
291        headers_contain_invalid_characters = []
292
293        for header in headers:
294            if len(header) > tdr_max_header_length:
295                headers_containing_too_many_characters.append(header)
296            if not re.match(tdr_header_allowed_pattern, header):
297                headers_contain_invalid_characters.append(header)
298
299        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
300        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
301        header naming."""
302        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
303        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
304        allowed in TDR is {tdr_max_header_length}.\n"""
305        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
306        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
307        only contain numbers, letters, and underscore characters.\n"""
308
309        error_to_report = ""
310        if headers_containing_too_many_characters:
311            error_to_report += too_many_characters_error_message
312        if headers_contain_invalid_characters:
313            error_to_report += invalid_characters_error_message
314        if error_to_report:
315            error_to_report += base_error_message
316            raise ValueError(error_to_report)
317
318    def get_workspace_info(self) -> requests.Response:
319        """
320        Get workspace information.
321
322        **Returns:**
323        - requests.Response: The response from the request.
324        """
325        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
326        logging.info(
327            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
328        return self.request_util.run_request(uri=url, method=GET)
329
330    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
331        """
332        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
333
334        **Args:**
335        - entity_type (str): The type of entity to get metrics for.
336        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
337
338        **Returns:**
339        - list[dict]: A list of dictionaries containing entity metrics.
340        """
341        results = []
342        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
343
344        for page in self._yield_all_entity_metrics(entity=entity_type):
345            results.extend(page["results"])
346
347        # If remove_dicts is True, remove dictionaries from the workspace metrics
348        if remove_dicts:
349            for row in results:
350                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
351        return results
352
353    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
354        """
355        Get specific entity metrics for a given entity type and name.
356
357        **Args:**
358        - entity_type (str): The type of entity to get metrics for.
359        - entity_name (str): The name of the entity to get metrics for.
360
361        **Returns:**
362        - requests.Response: The response from the request.
363        """
364        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
365        return self.request_util.run_request(uri=url, method=GET)
366
367    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
368        """
369        Remove dictionaries from the attributes.
370
371        Args:
372            attributes (dict): The attributes to remove dictionaries from.
373
374        Returns:
375            dict: The updated attributes with no dictionaries.
376        """
377        for key, value in attributes.items():
378            attributes[key] = self._remove_dict_from_cell(value)
379        return attributes
380
381    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
382        """
383        Remove a dictionary from a cell.
384
385        Args:
386            cell_value (Any): The dictionary to remove.
387
388        Returns:
389            Any: The updated cell with no dictionaries.
390        """
391        if isinstance(cell_value, dict):
392            entity_name = cell_value.get("entityName")
393            # If the cell value is a dictionary, check if it has an entityName key
394            if entity_name:
395                # If the cell value is a dictionary with an entityName key, return the entityName
396                return entity_name
397            entity_list = cell_value.get("items")
398            if entity_list or entity_list == []:
399                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
400                return [
401                    self._remove_dict_from_cell(entity) for entity in entity_list
402                ]
403            return cell_value
404        return cell_value
405
406    def get_workspace_bucket(self) -> str:
407        """
408        Get the workspace bucket name. Does not include the `gs://` prefix.
409
410        **Returns:**
411        - str: The bucket name.
412        """
413        return self.get_workspace_info().json()["workspace"]["bucketName"]
414
415    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
416        """
417        Get workspace entity information.
418
419        **Args:**
420        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
421
422        **Returns:**
423        - requests.Response: The response from the request.
424        """
425        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
426        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
427        return self.request_util.run_request(uri=url, method=GET)
428
429    def get_workspace_acl(self) -> requests.Response:
430        """
431        Get the workspace access control list (ACL).
432
433        **Returns:**
434        - requests.Response: The response from the request.
435        """
436        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
437        return self.request_util.run_request(
438            uri=url,
439            method=GET
440        )
441
442    def update_user_acl(
443            self,
444            email: str,
445            access_level: str,
446            can_share: bool = False,
447            can_compute: bool = False,
448            invite_users_not_found: bool = False,
449    ) -> requests.Response:
450        """
451        Update the access control list (ACL) for a user in the workspace.
452
453        **Args:**
454        - email (str): The email of the user.
455        - access_level (str): The access level to grant to the user.
456        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
457        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
458        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
459                the workspace. Defaults to `False`
460
461        **Returns:**
462        - requests.Response: The response from the request.
463        """
464        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
465              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
466        payload = {
467            "email": email,
468            "accessLevel": access_level,
469            "canShare": can_share,
470            "canCompute": can_compute,
471        }
472        logging.info(
473            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
474        response = self.request_util.run_request(
475            uri=url,
476            method=PATCH,
477            content_type="application/json",
478            data="[" + json.dumps(payload) + "]"
479        )
480
481        if response.json()["usersNotFound"] and not invite_users_not_found:
482            # Will be a list of one user
483            user_not_found = response.json()["usersNotFound"][0]
484            raise Exception(
485                f'The user {user_not_found["email"]} was not found and access was not updated'
486            )
487        return response
488
489    @deprecated(
490        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
491    )
492    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
493        """
494        Update the metadata for a library dataset.
495
496        **Args:**
497        - library_metadata (dict): The metadata to update.
498        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
499
500        **Returns:**
501        - requests.Response: The response from the request.
502        """
503        acl = f"{TERRA_LINK}/library/{self.billing_project}/{self.workspace_name}" + \
504              f"/metadata?validate={str(validate).lower()}"
505        return self.request_util.run_request(
506            uri=acl,
507            method=PUT,
508            data=json.dumps(library_metadata)
509        )
510
511    def update_multiple_users_acl(
512            self, acl_list: list[dict], invite_users_not_found: bool = False
513    ) -> requests.Response:
514        """
515        Update the access control list (ACL) for multiple users in the workspace.
516
517        **Args:**
518        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
519        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
520                the workspace. Defaults to `False`
521
522        **Returns:**
523        - requests.Response: The response from the request.
524        """
525        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
526            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
527        logging.info(
528            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
529        response = self.request_util.run_request(
530            uri=url,
531            method=PATCH,
532            content_type="application/json",
533            data=json.dumps(acl_list)
534        )
535
536        if response.json()["usersNotFound"] and not invite_users_not_found:
537            # Will be a list of one user
538            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
539            raise Exception(
540                f"The following users were not found and access was not updated: {users_not_found}"
541            )
542        return response
543
544    def create_workspace(
545            self,
546            auth_domain: list[dict] = [],
547            attributes: dict = {},
548            continue_if_exists: bool = False,
549    ) -> requests.Response:
550        """
551        Create a new workspace in Terra.
552
553        **Args:**
554        - auth_domain (list[dict], optional): A list of authorization domains. Should look
555                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
556        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
557        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
558
559        **Returns:**
560        - requests.Response: The response from the request.
561        """
562        payload = {
563            "namespace": self.billing_project,
564            "name": self.workspace_name,
565            "authorizationDomain": auth_domain,
566            "attributes": attributes,
567            "cloudPlatform": GCP
568        }
569        # If workspace already exists then continue if exists
570        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
571        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
572        response = self.request_util.run_request(
573            uri=f"{TERRA_LINK}/workspaces",
574            method=POST,
575            content_type="application/json",
576            data=json.dumps(payload),
577            accept_return_codes=accept_return_codes
578        )
579        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
580            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
581        return response
582
583    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
584        """
585        Create an ingest dictionary for workspace attributes.
586
587        **Args:**
588        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
589
590        **Returns:**
591        - list[dict]: A list of dictionaries containing the workspace attributes.
592        """
593        # If not provided then call API to get it
594        workspace_attributes = (
595            workspace_attributes if workspace_attributes
596            else self.get_workspace_info().json()["workspace"]["attributes"]
597        )
598
599        ingest_dict = []
600        for key, value in workspace_attributes.items():
601            # If value is dict just use 'items' as value
602            if isinstance(value, dict):
603                value = value.get("items")
604            # If value is list convert to comma separated string
605            if isinstance(value, list):
606                value = ", ".join(value)
607            ingest_dict.append(
608                {
609                    "attribute": key,
610                    "value": str(value) if value else None
611                }
612            )
613        return ingest_dict
614
615    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
616        """
617        Upload metadata to the workspace table.
618
619        **Args:**
620        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
621
622        **Returns:**
623        - requests.Response: The response from the request.
624        """
625        endpoint = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
626        data = {"entities": open(entities_tsv, "rb")}
627        return self.request_util.upload_file(
628            uri=endpoint,
629            data=data
630        )
631
632    def get_workspace_workflows(self) -> requests.Response:
633        """
634        Get the workflows for the workspace.
635
636        **Returns:**
637        - requests.Response: The response from the request.
638        """
639        uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
640        return self.request_util.run_request(
641            uri=uri,
642            method=GET
643        )
644
645    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
646        """
647        Import a workflow into the workspace.
648
649        **Args:**
650        - workflow_dict (dict): The dictionary containing the workflow information.
651        - continue_if_exists (bool, optional): Whether to continue if the workflow
652                already exists. Defaults to `False`.
653
654        **Returns:**
655        - requests.Response: The response from the request.
656        """
657        uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
658        workflow_json = json.dumps(workflow_dict)
659        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
660        return self.request_util.run_request(
661            uri=uri,
662            method=POST,
663            data=workflow_json,
664            content_type="application/json",
665            accept_return_codes=accept_return_codes
666        )
667
668    def delete_workspace(self) -> requests.Response:
669        """
670        Delete a Terra workspace.
671
672        **Returns:**
673        - requests.Response: The response from the request.
674        """
675        return self.request_util.run_request(
676            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}",
677            method=DELETE
678        )
679
680    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
681        """
682        Update the attributes for the workspace.
683
684        **Args:**
685        - attributes (dict): The attributes to update.
686
687        **Returns:**
688        - requests.Response: The response from the request.
689        """
690        return self.request_util.run_request(
691            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
692            method=PATCH,
693            data=json.dumps(attributes),
694            content_type="application/json"
695        )
696
697    def leave_workspace(
698            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
699    ) -> requests.Response:
700        """
701        Leave a workspace. If workspace ID not supplied, will look it up.
702
703        **Args:**
704        - workspace_id (str, optional): The workspace ID. Defaults to None.
705        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
706             Defaults to `False`.
707
708        **Returns:**
709        - requests.Response: The response from the request.
710        """
711        if not workspace_id:
712            workspace_info = self.get_workspace_info().json()
713            workspace_id = workspace_info['workspace']['workspaceId']
714        accepted_return_code = [403] if ignore_direct_access_error else []
715
716        res = self.request_util.run_request(
717            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
718            method=DELETE,
719            accept_return_codes=accepted_return_code
720        )
721        if (res.status_code == 403
722                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
723            logging.info(
724                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
725                f"access to the workspace (they could be an owner on the billing project)"
726            )
727        return res
728
729    def change_workspace_public_setting(self, public: bool) -> requests.Response:
730        """
731        Change a workspace's public setting.
732
733        **Args:**
734        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
735         public, `False` otherwise.
736
737        **Returns:**
738        - requests.Response: The response from the request.
739        """
740        body = [
741            {
742                "settingType": "PubliclyReadable",
743                "config": {
744                    "enabled": public
745                }
746            }
747        ]
748        return self.request_util.run_request(
749            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
750            method=PUT,
751            content_type="application/json",
752            data=json.dumps(body)
753        )
754
755    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
756        """
757        Check if a workspace is public.
758
759        **Args:**
760        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
761        it up if not provided. Defaults to None.
762
763        **Returns:**
764        - requests.Response: The response from the request.
765        """
766        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
767        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
768        return self.request_util.run_request(
769            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
770            method=GET
771        )
772
773    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
774        """Delete an entire entity table from a Terra workspace.
775
776        **Args:**
777        - entity_to_delete (str): The name of the entity table to delete.
778
779        **Returns:**
780        - requests.Response: The response from the request.
781        """
782        response = self.request_util.run_request(
783            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
784            method=DELETE
785        )
786        if response.status_code == 204:
787            logging.info(
788                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
789                f"'{self.billing_project}/{self.workspace_name}'"
790            )
791        else:
792            logging.error(
793                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
794                f"table: {response.text}"
795            )
796        return response
797
798    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
799        """Save an entity table version in a Terra workspace.
800
801        **Args:**
802        - entity_type (str): The name of the entity table to save a new version for
803        - version_name (str): The name of the new version
804        """
805        # Get the workspace metrics
806        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
807        file_name = f"{entity_type}.json"
808        # Write the workspace metrics to a JSON file
809        with open(file_name, "w") as json_file:
810            json.dump(workspace_metrics, json_file)
811
812        # Create a zip file with the same naming convention that Terra backend uses
813        timestamp_ms = int(time.time() * 1000)
814        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
815        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
816            zipf.write(file_name, arcname=f"json/{file_name}")
817
818        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
819        workspace_info = self.get_workspace_info().json()
820        path_to_upload_to = os.path.join(
821            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
822        )
823        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
824        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
825        try:
826            active_account = gcp_util.get_active_gcloud_account()
827        except Exception as e:
828            active_account = workspace_info["workspace"]["createdBy"]
829            logging.error(
830                f"Encountered the following exception while attempting to get current GCP account: {e}. "
831                f"Will set the owner of the new metadata version as the workspace creator instead."
832            )
833        gcp_util.upload_blob(
834            source_file=zip_file_name,
835            destination_path=path_to_upload_to,
836            custom_metadata={
837                "createdBy": active_account,
838                "entityType": entity_type,
839                "timestamp": timestamp_ms,
840                "description": version_name,
841            }
842        )
843
844    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
845        """
846        Add a user comment to a submission in Terra.
847
848        **Args:**
849        - submission_id (str): The ID of the submission to add a comment to.
850        - user_comment (str): The comment to add to the submission.
851
852        **Returns:**
853        - requests.Response: The response from the request.
854        """
855        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
856        return self.request_util.run_request(
857            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
858            method=PATCH,
859            content_type="application/json",
860            data=json.dumps({"userComment": user_comment}),
861        )
862
863    def initiate_submission(
864            self,
865            method_config_namespace: str,
866            method_config_name: str,
867            entity_type: str,
868            entity_name: str,
869            expression: str,
870            user_comment: Optional[str],
871            use_call_cache: bool = True
872    ) -> requests.Response:
873        """
874        Initiate a submission within a Terra workspace.
875
876        Note - the workflow being initiated MUST already be imported into the workspace.
877
878        **Args:**
879        - method_config_namespace (str): The namespace of the method configuration.
880        - method_config_name (str): The name of the method configuration to use for the submission
881        (i.e. the workflow name).
882        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
883        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
884        "sample_set_1").
885        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
886        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
887        be launched PER SAMPLE, the expression should be `this.samples`.
888        - user_comment (str, optional): The user comment to add to the submission.
889        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
890
891        **Returns:**
892        - requests.Response: The response from the request.
893        """
894        payload = {
895            "methodConfigurationNamespace": method_config_namespace,
896            "methodConfigurationName": method_config_name,
897            "entityType": entity_type,
898            "entityName": entity_name,
899            "expression": expression,
900            "useCallCache": use_call_cache,
901            "deleteIntermediateOutputFiles": False,
902            "useReferenceDisks": False,
903            "ignoreEmptyOutputs": False,
904        }
905        if user_comment:
906            payload["userComment"] = user_comment
907
908        return self.request_util.run_request(
909            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
910            method=POST,
911            content_type="application/json",
912            data=json.dumps(payload),
913        )
914      
915    def retry_failed_submission(self, submission_id: str) -> requests.Response:
916        """
917        Retry a failed submission in Terra.
918
919        **Args:**
920        - submission_id (str): The ID of the submission to retry.
921
922        **Returns:**
923        - requests.Response: The response from the request.
924        """
925        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
926        payload = {"retryType": "Failed"}
927        logging.info(
928            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
929        )
930        return self.request_util.run_request(
931            uri=url,
932            method=POST,
933            content_type="application/json",
934            data=json.dumps(payload)
935        )
936
937    def get_submission_status(self, submission_id: str) -> requests.Response:
938        """
939        Get the status of a submission in Terra.
940
941        **Args:**
942        - submission_id (str): The ID of the submission.
943
944        **Returns:**
945        - requests.Response: The response from the request.
946        """
947        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
948        logging.info(
949            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
950        )
951        return self.request_util.run_request(
952            uri=url,
953            method=GET
954        )
MEMBER = 'member'
ADMIN = 'admin'
class Terra:
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of accessible workspaces.

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If `None` (the default)
            or empty, no `fields` filter is sent and all fields are included.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces"
        # Only append a query string when there is something to filter on (avoids a dangling "?")
        if fields:
            url += "?fields=" + ",".join(fields)
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the service account JSON.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

Class for generic Terra utilities.

Terra(request_util: ops_utils.request_util.RunRequest)
    def __init__(self, request_util: RunRequest):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests. Stored on the instance and used
            by the methods of this class to issue their requests.
        """
        self.request_util = request_util
        """@private"""

Initialize the Terra class.

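Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.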

def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.models.Response:
46    def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response:
47        """
48        Fetch the list of accessible workspaces.
49
50        **Args:**
51        - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
52
53        **Returns:**
54        - requests.Response: The response from the request.
55        """
56        fields_str = "fields=" + ",".join(fields) if fields else ""
57        url = f'{RAWLS_LINK}/workspaces?{fields_str}'
58        return self.request_util.run_request(
59            uri=url,
60            method=GET
61        )

Fetch the list of accessible workspaces.

Args:

  • fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.

Returns:

  • requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.models.Response:
63    def get_pet_account_json(self) -> requests.Response:
64        """
65        Get the service account JSON.
66
67        **Returns:**
68        - requests.Response: The response from the request.
69        """
70        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
71        return self.request_util.run_request(
72            uri=url,
73            method=GET
74        )

Get the service account JSON.

Returns:

  • requests.Response: The response from the request.
class TerraGroups:
 77class TerraGroups:
 78    """A class to manage Terra groups and their memberships."""
 79
 80    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
 81    """@private"""
 82    CONFLICT_STATUS_CODE = 409
 83    """@private"""
 84
 85    def __init__(self, request_util: RunRequest):
 86        """
 87        Initialize the TerraGroups class.
 88
 89        **Args:**
 90        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
 91         utility class to handle HTTP requests.
 92        """
 93        self.request_util = request_util
 94        """@private"""
 95
 96    def _check_role(self, role: str) -> None:
 97        """
 98        Check if the role is valid.
 99
100        Args:
101            role (str): The role to check.
102
103        Raises:
104            ValueError: If the role is not one of the allowed options.
105        """
106        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
107            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")
108
109    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
110        """
111        Remove a user from a group.
112
113        **Args:**
114        - group (str): The name of the group.
115        - email (str): The email of the user to remove.
116        - role (str): The role of the user in the group
117            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
118
119        **Returns:**
120        - requests.Response: The response from the request.
121        """
122        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
123        self._check_role(role)
124        logging.info(f"Removing {email} from group {group}")
125        return self.request_util.run_request(
126            uri=url,
127            method=DELETE
128        )
129
130    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
131        """
132        Create a new group.
133
134        **Args:**
135        - group_name (str): The name of the group to create.
136        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
137
138        **Returns:**
139        - requests.Response: The response from the request.
140        """
141        url = f"{SAM_LINK}/groups/v1/{group_name}"
142        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
143        response = self.request_util.run_request(
144            uri=url,
145            method=POST,
146            accept_return_codes=accept_return_codes
147        )
148        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
149            logging.info(f"Group {group_name} already exists. Continuing.")
150            return response
151        else:
152            logging.info(f"Created group {group_name}")
153            return response
154
155    def delete_group(self, group_name: str) -> requests.Response:
156        """
157        Delete a group.
158
159        **Args:**
160        - group_name (str): The name of the group to delete.
161
162        **Returns:**
163        - requests.Response: The response from the request.
164        """
165        url = f"{SAM_LINK}/groups/v1/{group_name}"
166        logging.info(f"Deleting group {group_name}")
167        return self.request_util.run_request(
168            uri=url,
169            method=DELETE
170        )
171
172    def add_user_to_group(
173            self, group: str, email: str, role: str, continue_if_exists: bool = False
174    ) -> requests.Response:
175        """
176        Add a user to a group.
177
178        **Args:**
179        - group (str): The name of the group.
180        - email (str): The email of the user to add.
181        - role (str): The role of the user in the group
182            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
183        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
184                Defaults to `False`.
185
186        **Returns:**
187        - requests.Response: The response from the request.
188        """
189        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
190        self._check_role(role)
191        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
192        logging.info(f"Adding {email} to group {group} as {role}")
193        return self.request_util.run_request(
194            uri=url,
195            method=PUT,
196            accept_return_codes=accept_return_codes
197        )

A class to manage Terra groups and their memberships.

TerraGroups(request_util: ops_utils.request_util.RunRequest)
    def __init__(self, request_util: RunRequest):
        """
        Initialize the TerraGroups class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
         utility class to handle HTTP requests. Stored on the instance and used by the
         group-management methods to issue their requests.
        """
        self.request_util = request_util
        """@private"""

Initialize the TerraGroups class.

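Args:

  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.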

def remove_user_from_group(self, group: str, email: str, role: str) -> requests.models.Response:
109    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
110        """
111        Remove a user from a group.
112
113        **Args:**
114        - group (str): The name of the group.
115        - email (str): The email of the user to remove.
116        - role (str): The role of the user in the group
117            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
118
119        **Returns:**
120        - requests.Response: The response from the request.
121        """
122        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
123        self._check_role(role)
124        logging.info(f"Removing {email} from group {group}")
125        return self.request_util.run_request(
126            uri=url,
127            method=DELETE
128        )

Remove a user from a group.

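Args:

  • group (str): The name of the group.
  • email (str): The email of the user to remove.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).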

Returns:

  • requests.Response: The response from the request.
def create_group( self, group_name: str, continue_if_exists: bool = False) -> requests.models.Response:
130    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
131        """
132        Create a new group.
133
134        **Args:**
135        - group_name (str): The name of the group to create.
136        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
137
138        **Returns:**
139        - requests.Response: The response from the request.
140        """
141        url = f"{SAM_LINK}/groups/v1/{group_name}"
142        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
143        response = self.request_util.run_request(
144            uri=url,
145            method=POST,
146            accept_return_codes=accept_return_codes
147        )
148        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
149            logging.info(f"Group {group_name} already exists. Continuing.")
150            return response
151        else:
152            logging.info(f"Created group {group_name}")
153            return response

Create a new group.

Args:

  • group_name (str): The name of the group to create.
  • continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.models.Response:
155    def delete_group(self, group_name: str) -> requests.Response:
156        """
157        Delete a group.
158
159        **Args:**
160        - group_name (str): The name of the group to delete.
161
162        **Returns:**
163        - requests.Response: The response from the request.
164        """
165        url = f"{SAM_LINK}/groups/v1/{group_name}"
166        logging.info(f"Deleting group {group_name}")
167        return self.request_util.run_request(
168            uri=url,
169            method=DELETE
170        )

Delete a group.

Args:

  • group_name (str): The name of the group to delete.

Returns:

  • requests.Response: The response from the request.
def add_user_to_group( self, group: str, email: str, role: str, continue_if_exists: bool = False) -> requests.models.Response:
172    def add_user_to_group(
173            self, group: str, email: str, role: str, continue_if_exists: bool = False
174    ) -> requests.Response:
175        """
176        Add a user to a group.
177
178        **Args:**
179        - group (str): The name of the group.
180        - email (str): The email of the user to add.
181        - role (str): The role of the user in the group
182            (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`).
183        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
184                Defaults to `False`.
185
186        **Returns:**
187        - requests.Response: The response from the request.
188        """
189        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
190        self._check_role(role)
191        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
192        logging.info(f"Adding {email} to group {group} as {role}")
193        return self.request_util.run_request(
194            uri=url,
195            method=PUT,
196            accept_return_codes=accept_return_codes
197        )

Add a user to a group.

Args:

  • group (str): The name of the group.
  • email (str): The email of the user to add.
  • role (str): The role of the user in the group (must be one of ops_utils.terra_util.MEMBER or ops_utils.terra_util.ADMIN).
  • continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to False.

Returns:

  • requests.Response: The response from the request.
class TerraWorkspace:
200class TerraWorkspace:
201    """Terra workspace class to manage workspaces and their attributes."""
202
203    CONFLICT_STATUS_CODE = 409
204    """@private"""
205
    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest):
        """
        Initialize the TerraWorkspace class.

        Only the billing project, workspace name and request utility are set from the arguments;
        every other attribute is initialized to `None` here.

        **Args:**
        - billing_project (str): The billing project associated with the workspace.
        - workspace_name (str): The name of the workspace.
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        """
        self.billing_project = billing_project
        """@private"""
        self.workspace_name = workspace_name
        """@private"""
        self.workspace_id = None
        """@private"""
        self.resource_id = None
        """@private"""
        self.storage_container = None
        """@private"""
        self.bucket = None
        """@private"""
        self.wds_url = None
        """@private"""
        self.account_url: Optional[str] = None
        """@private"""
        self.request_util = request_util
        """@private"""
234
235    def __repr__(self) -> str:
236        """
237        Return a string representation of the TerraWorkspace instance.
238
239        Returns:
240            str: The string representation of the TerraWorkspace instance.
241        """
242        return f"{self.billing_project}/{self.workspace_name}"
243
244    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any:
245        """
246        Yield all entity metrics from the workspace.
247
248        Args:
249            entity (str): The type of entity to query.
250            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.
251
252        Yields:
253            Any: The JSON response containing entity metrics.
254        """
255        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
256        response = self.request_util.run_request(
257            uri=url,
258            method=GET,
259            content_type="application/json"
260        )
261        first_page_json = response.json()
262        yield first_page_json
263        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
264        logging.info(
265            f"Looping through {total_pages} pages of data")
266
267        for page in range(2, total_pages + 1):
268            logging.info(f"Getting page {page} of {total_pages}")
269            next_page = self.request_util.run_request(
270                uri=url,
271                method=GET,
272                content_type="application/json",
273                params={"page": page}
274            )
275            yield next_page.json()
276
277    @staticmethod
278    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
279        """Check that all headers follow the standards required by TDR.
280
281        **Args:**
282        - table_name (str): The name of the Terra table.
283        - headers (list[str]): The headers of the Terra table to validate.
284
285        **Raises:**
286        - ValueError if any headers are considered invalid by TDR standards
287        """
288        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
289        tdr_max_header_length = 63
290
291        headers_containing_too_many_characters = []
292        headers_contain_invalid_characters = []
293
294        for header in headers:
295            if len(header) > tdr_max_header_length:
296                headers_containing_too_many_characters.append(header)
297            if not re.match(tdr_header_allowed_pattern, header):
298                headers_contain_invalid_characters.append(header)
299
300        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
301        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
302        header naming."""
303        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
304        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
305        allowed in TDR is {tdr_max_header_length}.\n"""
306        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
307        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
308        only contain numbers, letters, and underscore characters.\n"""
309
310        error_to_report = ""
311        if headers_containing_too_many_characters:
312            error_to_report += too_many_characters_error_message
313        if headers_contain_invalid_characters:
314            error_to_report += invalid_characters_error_message
315        if error_to_report:
316            error_to_report += base_error_message
317            raise ValueError(error_to_report)
318
319    def get_workspace_info(self) -> requests.Response:
320        """
321        Get workspace information.
322
323        **Returns:**
324        - requests.Response: The response from the request.
325        """
326        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
327        logging.info(
328            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
329        return self.request_util.run_request(uri=url, method=GET)
330
331    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
332        """
333        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
334
335        **Args:**
336        - entity_type (str): The type of entity to get metrics for.
337        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
338
339        **Returns:**
340        - list[dict]: A list of dictionaries containing entity metrics.
341        """
342        results = []
343        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
344
345        for page in self._yield_all_entity_metrics(entity=entity_type):
346            results.extend(page["results"])
347
348        # If remove_dicts is True, remove dictionaries from the workspace metrics
349        if remove_dicts:
350            for row in results:
351                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
352        return results
353
354    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
355        """
356        Get specific entity metrics for a given entity type and name.
357
358        **Args:**
359        - entity_type (str): The type of entity to get metrics for.
360        - entity_name (str): The name of the entity to get metrics for.
361
362        **Returns:**
363        - requests.Response: The response from the request.
364        """
365        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
366        return self.request_util.run_request(uri=url, method=GET)
367
368    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
369        """
370        Remove dictionaries from the attributes.
371
372        Args:
373            attributes (dict): The attributes to remove dictionaries from.
374
375        Returns:
376            dict: The updated attributes with no dictionaries.
377        """
378        for key, value in attributes.items():
379            attributes[key] = self._remove_dict_from_cell(value)
380        return attributes
381
382    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
383        """
384        Remove a dictionary from a cell.
385
386        Args:
387            cell_value (Any): The dictionary to remove.
388
389        Returns:
390            Any: The updated cell with no dictionaries.
391        """
392        if isinstance(cell_value, dict):
393            entity_name = cell_value.get("entityName")
394            # If the cell value is a dictionary, check if it has an entityName key
395            if entity_name:
396                # If the cell value is a dictionary with an entityName key, return the entityName
397                return entity_name
398            entity_list = cell_value.get("items")
399            if entity_list or entity_list == []:
400                # If the cell value is a list of dictionaries, recursively call this function on each dictionary
401                return [
402                    self._remove_dict_from_cell(entity) for entity in entity_list
403                ]
404            return cell_value
405        return cell_value
406
407    def get_workspace_bucket(self) -> str:
408        """
409        Get the workspace bucket name. Does not include the `gs://` prefix.
410
411        **Returns:**
412        - str: The bucket name.
413        """
414        return self.get_workspace_info().json()["workspace"]["bucketName"]
415
416    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
417        """
418        Get workspace entity information.
419
420        **Args:**
421        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
422
423        **Returns:**
424        - requests.Response: The response from the request.
425        """
426        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
427        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
428        return self.request_util.run_request(uri=url, method=GET)
429
430    def get_workspace_acl(self) -> requests.Response:
431        """
432        Get the workspace access control list (ACL).
433
434        **Returns:**
435        - requests.Response: The response from the request.
436        """
437        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
438        return self.request_util.run_request(
439            uri=url,
440            method=GET
441        )
442
443    def update_user_acl(
444            self,
445            email: str,
446            access_level: str,
447            can_share: bool = False,
448            can_compute: bool = False,
449            invite_users_not_found: bool = False,
450    ) -> requests.Response:
451        """
452        Update the access control list (ACL) for a user in the workspace.
453
454        **Args:**
455        - email (str): The email of the user.
456        - access_level (str): The access level to grant to the user.
457        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
458        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
459        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
460                the workspace. Defaults to `False`
461
462        **Returns:**
463        - requests.Response: The response from the request.
464        """
465        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
466              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
467        payload = {
468            "email": email,
469            "accessLevel": access_level,
470            "canShare": can_share,
471            "canCompute": can_compute,
472        }
473        logging.info(
474            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
475        response = self.request_util.run_request(
476            uri=url,
477            method=PATCH,
478            content_type="application/json",
479            data="[" + json.dumps(payload) + "]"
480        )
481
482        if response.json()["usersNotFound"] and not invite_users_not_found:
483            # Will be a list of one user
484            user_not_found = response.json()["usersNotFound"][0]
485            raise Exception(
486                f'The user {user_not_found["email"]} was not found and access was not updated'
487            )
488        return response
489
490    @deprecated(
491        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
492    )
493    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
494        """
495        Update the metadata for a library dataset.
496
497        **Args:**
498        - library_metadata (dict): The metadata to update.
499        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
500
501        **Returns:**
502        - requests.Response: The response from the request.
503        """
504        acl = f"{TERRA_LINK}/library/{self.billing_project}/{self.workspace_name}" + \
505              f"/metadata?validate={str(validate).lower()}"
506        return self.request_util.run_request(
507            uri=acl,
508            method=PUT,
509            data=json.dumps(library_metadata)
510        )
511
512    def update_multiple_users_acl(
513            self, acl_list: list[dict], invite_users_not_found: bool = False
514    ) -> requests.Response:
515        """
516        Update the access control list (ACL) for multiple users in the workspace.
517
518        **Args:**
519        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
520        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
521                the workspace. Defaults to `False`
522
523        **Returns:**
524        - requests.Response: The response from the request.
525        """
526        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
527            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
528        logging.info(
529            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
530        response = self.request_util.run_request(
531            uri=url,
532            method=PATCH,
533            content_type="application/json",
534            data=json.dumps(acl_list)
535        )
536
537        if response.json()["usersNotFound"] and not invite_users_not_found:
538            # Will be a list of one user
539            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
540            raise Exception(
541                f"The following users were not found and access was not updated: {users_not_found}"
542            )
543        return response
544
545    def create_workspace(
546            self,
547            auth_domain: list[dict] = [],
548            attributes: dict = {},
549            continue_if_exists: bool = False,
550    ) -> requests.Response:
551        """
552        Create a new workspace in Terra.
553
554        **Args:**
555        - auth_domain (list[dict], optional): A list of authorization domains. Should look
556                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
557        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
558        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
559
560        **Returns:**
561        - requests.Response: The response from the request.
562        """
563        payload = {
564            "namespace": self.billing_project,
565            "name": self.workspace_name,
566            "authorizationDomain": auth_domain,
567            "attributes": attributes,
568            "cloudPlatform": GCP
569        }
570        # If workspace already exists then continue if exists
571        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
572        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
573        response = self.request_util.run_request(
574            uri=f"{TERRA_LINK}/workspaces",
575            method=POST,
576            content_type="application/json",
577            data=json.dumps(payload),
578            accept_return_codes=accept_return_codes
579        )
580        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
581            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
582        return response
583
584    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
585        """
586        Create an ingest dictionary for workspace attributes.
587
588        **Args:**
589        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
590
591        **Returns:**
592        - list[dict]: A list of dictionaries containing the workspace attributes.
593        """
594        # If not provided then call API to get it
595        workspace_attributes = (
596            workspace_attributes if workspace_attributes
597            else self.get_workspace_info().json()["workspace"]["attributes"]
598        )
599
600        ingest_dict = []
601        for key, value in workspace_attributes.items():
602            # If value is dict just use 'items' as value
603            if isinstance(value, dict):
604                value = value.get("items")
605            # If value is list convert to comma separated string
606            if isinstance(value, list):
607                value = ", ".join(value)
608            ingest_dict.append(
609                {
610                    "attribute": key,
611                    "value": str(value) if value else None
612                }
613            )
614        return ingest_dict
615
616    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
617        """
618        Upload metadata to the workspace table.
619
620        **Args:**
621        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
622
623        **Returns:**
624        - requests.Response: The response from the request.
625        """
626        endpoint = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
627        data = {"entities": open(entities_tsv, "rb")}
628        return self.request_util.upload_file(
629            uri=endpoint,
630            data=data
631        )
632
633    def get_workspace_workflows(self) -> requests.Response:
634        """
635        Get the workflows for the workspace.
636
637        **Returns:**
638        - requests.Response: The response from the request.
639        """
640        uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
641        return self.request_util.run_request(
642            uri=uri,
643            method=GET
644        )
645
646    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
647        """
648        Import a workflow into the workspace.
649
650        **Args:**
651        - workflow_dict (dict): The dictionary containing the workflow information.
652        - continue_if_exists (bool, optional): Whether to continue if the workflow
653                already exists. Defaults to `False`.
654
655        **Returns:**
656        - requests.Response: The response from the request.
657        """
658        uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
659        workflow_json = json.dumps(workflow_dict)
660        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
661        return self.request_util.run_request(
662            uri=uri,
663            method=POST,
664            data=workflow_json,
665            content_type="application/json",
666            accept_return_codes=accept_return_codes
667        )
668
669    def delete_workspace(self) -> requests.Response:
670        """
671        Delete a Terra workspace.
672
673        **Returns:**
674        - requests.Response: The response from the request.
675        """
676        return self.request_util.run_request(
677            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}",
678            method=DELETE
679        )
680
681    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
682        """
683        Update the attributes for the workspace.
684
685        **Args:**
686        - attributes (dict): The attributes to update.
687
688        **Returns:**
689        - requests.Response: The response from the request.
690        """
691        return self.request_util.run_request(
692            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
693            method=PATCH,
694            data=json.dumps(attributes),
695            content_type="application/json"
696        )
697
698    def leave_workspace(
699            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
700    ) -> requests.Response:
701        """
702        Leave a workspace. If workspace ID not supplied, will look it up.
703
704        **Args:**
705        - workspace_id (str, optional): The workspace ID. Defaults to None.
706        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
707             Defaults to `False`.
708
709        **Returns:**
710        - requests.Response: The response from the request.
711        """
712        if not workspace_id:
713            workspace_info = self.get_workspace_info().json()
714            workspace_id = workspace_info['workspace']['workspaceId']
715        accepted_return_code = [403] if ignore_direct_access_error else []
716
717        res = self.request_util.run_request(
718            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
719            method=DELETE,
720            accept_return_codes=accepted_return_code
721        )
722        if (res.status_code == 403
723                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
724            logging.info(
725                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
726                f"access to the workspace (they could be an owner on the billing project)"
727            )
728        return res
729
730    def change_workspace_public_setting(self, public: bool) -> requests.Response:
731        """
732        Change a workspace's public setting.
733
734        **Args:**
735        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
736         public, `False` otherwise.
737
738        **Returns:**
739        - requests.Response: The response from the request.
740        """
741        body = [
742            {
743                "settingType": "PubliclyReadable",
744                "config": {
745                    "enabled": public
746                }
747            }
748        ]
749        return self.request_util.run_request(
750            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
751            method=PUT,
752            content_type="application/json",
753            data=json.dumps(body)
754        )
755
756    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
757        """
758        Check if a workspace is public.
759
760        **Args:**
761        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
762        it up if not provided. Defaults to None.
763
764        **Returns:**
765        - requests.Response: The response from the request.
766        """
767        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
768        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
769        return self.request_util.run_request(
770            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
771            method=GET
772        )
773
774    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
775        """Delete an entire entity table from a Terra workspace.
776
777        **Args:**
778        - entity_to_delete (str): The name of the entity table to delete.
779
780        **Returns:**
781        - requests.Response: The response from the request.
782        """
783        response = self.request_util.run_request(
784            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
785            method=DELETE
786        )
787        if response.status_code == 204:
788            logging.info(
789                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
790                f"'{self.billing_project}/{self.workspace_name}'"
791            )
792        else:
793            logging.error(
794                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
795                f"table: {response.text}"
796            )
797        return response
798
799    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
800        """Save an entity table version in a Terra workspace.
801
802        **Args:**
803        - entity_type (str): The name of the entity table to save a new version for
804        - version_name (str): The name of the new version
805        """
806        # Get the workspace metrics
807        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
808        file_name = f"{entity_type}.json"
809        # Write the workspace metrics to a JSON file
810        with open(file_name, "w") as json_file:
811            json.dump(workspace_metrics, json_file)
812
813        # Create a zip file with the same naming convention that Terra backend uses
814        timestamp_ms = int(time.time() * 1000)
815        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
816        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
817            zipf.write(file_name, arcname=f"json/{file_name}")
818
819        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
820        workspace_info = self.get_workspace_info().json()
821        path_to_upload_to = os.path.join(
822            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
823        )
824        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
825        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
826        try:
827            active_account = gcp_util.get_active_gcloud_account()
828        except Exception as e:
829            active_account = workspace_info["workspace"]["createdBy"]
830            logging.error(
831                f"Encountered the following exception while attempting to get current GCP account: {e}. "
832                f"Will set the owner of the new metadata version as the workspace creator instead."
833            )
834        gcp_util.upload_blob(
835            source_file=zip_file_name,
836            destination_path=path_to_upload_to,
837            custom_metadata={
838                "createdBy": active_account,
839                "entityType": entity_type,
840                "timestamp": timestamp_ms,
841                "description": version_name,
842            }
843        )
844
845    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
846        """
847        Add a user comment to a submission in Terra.
848
849        **Args:**
850        - submission_id (str): The ID of the submission to add a comment to.
851        - user_comment (str): The comment to add to the submission.
852
853        **Returns:**
854        - requests.Response: The response from the request.
855        """
856        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
857        return self.request_util.run_request(
858            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
859            method=PATCH,
860            content_type="application/json",
861            data=json.dumps({"userComment": user_comment}),
862        )
863
864    def initiate_submission(
865            self,
866            method_config_namespace: str,
867            method_config_name: str,
868            entity_type: str,
869            entity_name: str,
870            expression: str,
871            user_comment: Optional[str],
872            use_call_cache: bool = True
873    ) -> requests.Response:
874        """
875        Initiate a submission within a Terra workspace.
876
877        Note - the workflow being initiated MUST already be imported into the workspace.
878
879        **Args:**
880        - method_config_namespace (str): The namespace of the method configuration.
881        - method_config_name (str): The name of the method configuration to use for the submission
882        (i.e. the workflow name).
883        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
884        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
885        "sample_set_1").
886        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
887        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
888        be launched PER SAMPLE, the expression should be `this.samples`.
889        - user_comment (str, optional): The user comment to add to the submission.
890        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
891
892        **Returns:**
893        - requests.Response: The response from the request.
894        """
895        payload = {
896            "methodConfigurationNamespace": method_config_namespace,
897            "methodConfigurationName": method_config_name,
898            "entityType": entity_type,
899            "entityName": entity_name,
900            "expression": expression,
901            "useCallCache": use_call_cache,
902            "deleteIntermediateOutputFiles": False,
903            "useReferenceDisks": False,
904            "ignoreEmptyOutputs": False,
905        }
906        if user_comment:
907            payload["userComment"] = user_comment
908
909        return self.request_util.run_request(
910            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
911            method=POST,
912            content_type="application/json",
913            data=json.dumps(payload),
914        )
915      
916    def retry_failed_submission(self, submission_id: str) -> requests.Response:
917        """
918        Retry a failed submission in Terra.
919
920        **Args:**
921        - submission_id (str): The ID of the submission to retry.
922
923        **Returns:**
924        - requests.Response: The response from the request.
925        """
926        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
927        payload = {"retryType": "Failed"}
928        logging.info(
929            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
930        )
931        return self.request_util.run_request(
932            uri=url,
933            method=POST,
934            content_type="application/json",
935            data=json.dumps(payload)
936        )
937
938    def get_submission_status(self, submission_id: str) -> requests.Response:
939        """
940        Get the status of a submission in Terra.
941
942        **Args:**
943        - submission_id (str): The ID of the submission.
944
945        **Returns:**
946        - requests.Response: The response from the request.
947        """
948        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
949        logging.info(
950            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
951        )
952        return self.request_util.run_request(
953            uri=url,
954            method=GET
955        )

Terra workspace class to manage workspaces and their attributes.

TerraWorkspace( billing_project: str, workspace_name: str, request_util: ops_utils.request_util.RunRequest)
    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest):
        """
        Initialize the TerraWorkspace class.

        Stores the workspace coordinates and the request utility on the instance. No request is made here; the
        remaining attributes are only initialised to `None`.

        **Args:**
        - billing_project (str): The billing project associated with the workspace.
        - workspace_name (str): The name of the workspace.
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        """
        # NOTE: the bare `"""@private"""` strings after each assignment are attribute docstrings kept for the
        # documentation generator; they must stay directly below the attribute they describe.
        self.billing_project = billing_project
        """@private"""
        self.workspace_name = workspace_name
        """@private"""
        # The attributes below start out unset; nothing in this constructor populates them.
        self.workspace_id = None
        """@private"""
        self.resource_id = None
        """@private"""
        self.storage_container = None
        """@private"""
        self.bucket = None
        """@private"""
        self.wds_url = None
        """@private"""
        self.account_url: Optional[str] = None
        """@private"""
        self.request_util = request_util
        """@private"""

Initialize the TerraWorkspace class.

Args:

  • billing_project (str): The billing project associated with the workspace.
  • workspace_name (str): The name of the workspace.
  • request_util (ops_utils.request_util.RunRequest): An instance of a request utility class to handle HTTP requests.
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
277    @staticmethod
278    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
279        """Check that all headers follow the standards required by TDR.
280
281        **Args:**
282        - table_name (str): The name of the Terra table.
283        - headers (list[str]): The headers of the Terra table to validate.
284
285        **Raises:**
286        - ValueError if any headers are considered invalid by TDR standards
287        """
288        tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$"
289        tdr_max_header_length = 63
290
291        headers_containing_too_many_characters = []
292        headers_contain_invalid_characters = []
293
294        for header in headers:
295            if len(header) > tdr_max_header_length:
296                headers_containing_too_many_characters.append(header)
297            if not re.match(tdr_header_allowed_pattern, header):
298                headers_contain_invalid_characters.append(header)
299
300        base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table,
301        and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for
302        header naming."""
303        too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many
304        characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header
305        allowed in TDR is {tdr_max_header_length}.\n"""
306        invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid
307        characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must
308        only contain numbers, letters, and underscore characters.\n"""
309
310        error_to_report = ""
311        if headers_containing_too_many_characters:
312            error_to_report += too_many_characters_error_message
313        if headers_contain_invalid_characters:
314            error_to_report += invalid_characters_error_message
315        if error_to_report:
316            error_to_report += base_error_message
317            raise ValueError(error_to_report)

Check that all headers follow the standards required by TDR.

Args:

  • table_name (str): The name of the Terra table.
  • headers (list[str]): The headers of the Terra table to validate.

Raises:

  • ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.models.Response:
319    def get_workspace_info(self) -> requests.Response:
320        """
321        Get workspace information.
322
323        **Returns:**
324        - requests.Response: The response from the request.
325        """
326        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
327        logging.info(
328            f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
329        return self.request_util.run_request(uri=url, method=GET)

Get workspace information.

Returns:

  • requests.Response: The response from the request.
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
331    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
332        """
333        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
334
335        **Args:**
336        - entity_type (str): The type of entity to get metrics for.
337        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
338
339        **Returns:**
340        - list[dict]: A list of dictionaries containing entity metrics.
341        """
342        results = []
343        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
344
345        for page in self._yield_all_entity_metrics(entity=entity_type):
346            results.extend(page["results"])
347
348        # If remove_dicts is True, remove dictionaries from the workspace metrics
349        if remove_dicts:
350            for row in results:
351                row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
352        return results

Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

Args:

  • entity_type (str): The type of entity to get metrics for.
  • remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to False.

Returns:

  • list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.models.Response:
354    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
355        """
356        Get specific entity metrics for a given entity type and name.
357
358        **Args:**
359        - entity_type (str): The type of entity to get metrics for.
360        - entity_name (str): The name of the entity to get metrics for.
361
362        **Returns:**
363        - requests.Response: The response from the request.
364        """
365        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"
366        return self.request_util.run_request(uri=url, method=GET)

Get specific entity metrics for a given entity type and name.

Args:

  • entity_type (str): The type of entity to get metrics for.
  • entity_name (str): The name of the entity to get metrics for.

Returns:

  • requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
407    def get_workspace_bucket(self) -> str:
408        """
409        Get the workspace bucket name. Does not include the `gs://` prefix.
410
411        **Returns:**
412        - str: The bucket name.
413        """
414        return self.get_workspace_info().json()["workspace"]["bucketName"]

Get the workspace bucket name. Does not include the gs:// prefix.

Returns:

  • str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.models.Response:
416    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
417        """
418        Get workspace entity information.
419
420        **Args:**
421        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.
422
423        **Returns:**
424        - requests.Response: The response from the request.
425        """
426        use_cache = "true" if use_cache else "false"  # type: ignore[assignment]
427        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}"
428        return self.request_util.run_request(uri=url, method=GET)

Get workspace entity information.

Args:

  • use_cache (bool, optional): Whether to use cache. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.models.Response:
430    def get_workspace_acl(self) -> requests.Response:
431        """
432        Get the workspace access control list (ACL).
433
434        **Returns:**
435        - requests.Response: The response from the request.
436        """
437        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
438        return self.request_util.run_request(
439            uri=url,
440            method=GET
441        )

Get the workspace access control list (ACL).

Returns:

  • requests.Response: The response from the request.
def update_user_acl( self, email: str, access_level: str, can_share: bool = False, can_compute: bool = False, invite_users_not_found: bool = False) -> requests.models.Response:
443    def update_user_acl(
444            self,
445            email: str,
446            access_level: str,
447            can_share: bool = False,
448            can_compute: bool = False,
449            invite_users_not_found: bool = False,
450    ) -> requests.Response:
451        """
452        Update the access control list (ACL) for a user in the workspace.
453
454        **Args:**
455        - email (str): The email of the user.
456        - access_level (str): The access level to grant to the user.
457        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
458        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
459        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
460                the workspace. Defaults to `False`
461
462        **Returns:**
463        - requests.Response: The response from the request.
464        """
465        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
466              f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
467        payload = {
468            "email": email,
469            "accessLevel": access_level,
470            "canShare": can_share,
471            "canCompute": can_compute,
472        }
473        logging.info(
474            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
475        response = self.request_util.run_request(
476            uri=url,
477            method=PATCH,
478            content_type="application/json",
479            data="[" + json.dumps(payload) + "]"
480        )
481
482        if response.json()["usersNotFound"] and not invite_users_not_found:
483            # Will be a list of one user
484            user_not_found = response.json()["usersNotFound"][0]
485            raise Exception(
486                f'The user {user_not_found["email"]} was not found and access was not updated'
487            )
488        return response

Update the access control list (ACL) for a user in the workspace.

Args:

  • email (str): The email of the user.
  • access_level (str): The access level to grant to the user.
  • can_share (bool, optional): Whether the user can share the workspace. Defaults to False.
  • can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to False.
  • invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to False

Returns:

  • requests.Response: The response from the request.
@deprecated('Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.')
def put_metadata_for_library_dataset( self, library_metadata: dict, validate: bool = False) -> requests.models.Response:
490    @deprecated(
491        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
492    )
493    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
494        """
495        Update the metadata for a library dataset.
496
497        **Args:**
498        - library_metadata (dict): The metadata to update.
499        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
500
501        **Returns:**
502        - requests.Response: The response from the request.
503        """
504        acl = f"{TERRA_LINK}/library/{self.billing_project}/{self.workspace_name}" + \
505              f"/metadata?validate={str(validate).lower()}"
506        return self.request_util.run_request(
507            uri=acl,
508            method=PUT,
509            data=json.dumps(library_metadata)
510        )

Update the metadata for a library dataset.

Args:

  • library_metadata (dict): The metadata to update.
  • validate (bool, optional): Whether to validate the metadata. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def update_multiple_users_acl( self, acl_list: list[dict], invite_users_not_found: bool = False) -> requests.models.Response:
512    def update_multiple_users_acl(
513            self, acl_list: list[dict], invite_users_not_found: bool = False
514    ) -> requests.Response:
515        """
516        Update the access control list (ACL) for multiple users in the workspace.
517
518        **Args:**
519        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
520        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
521                the workspace. Defaults to `False`
522
523        **Returns:**
524        - requests.Response: The response from the request.
525        """
526        url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \
527            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
528        logging.info(
529            f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
530        response = self.request_util.run_request(
531            uri=url,
532            method=PATCH,
533            content_type="application/json",
534            data=json.dumps(acl_list)
535        )
536
537        if response.json()["usersNotFound"] and not invite_users_not_found:
538            # Will be a list of one user
539            users_not_found = [u["email"] for u in response.json()["usersNotFound"]]
540            raise Exception(
541                f"The following users were not found and access was not updated: {users_not_found}"
542            )
543        return response

Update the access control list (ACL) for multiple users in the workspace.

Args:

  • acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
  • invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to False

Returns:

  • requests.Response: The response from the request.
def create_workspace( self, auth_domain: list[dict] = [], attributes: dict = {}, continue_if_exists: bool = False) -> requests.models.Response:
545    def create_workspace(
546            self,
547            auth_domain: list[dict] = [],
548            attributes: dict = {},
549            continue_if_exists: bool = False,
550    ) -> requests.Response:
551        """
552        Create a new workspace in Terra.
553
554        **Args:**
555        - auth_domain (list[dict], optional): A list of authorization domains. Should look
556                like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
557        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
558        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
559
560        **Returns:**
561        - requests.Response: The response from the request.
562        """
563        payload = {
564            "namespace": self.billing_project,
565            "name": self.workspace_name,
566            "authorizationDomain": auth_domain,
567            "attributes": attributes,
568            "cloudPlatform": GCP
569        }
570        # If workspace already exists then continue if exists
571        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
572        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
573        response = self.request_util.run_request(
574            uri=f"{TERRA_LINK}/workspaces",
575            method=POST,
576            content_type="application/json",
577            data=json.dumps(payload),
578            accept_return_codes=accept_return_codes
579        )
580        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
581            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
582        return response

Create a new workspace in Terra.

Args:

  • auth_domain (list[dict], optional): A list of authorization domains. Should look like [{"membersGroupName": "some_auth_domain"}]. Defaults to an empty list.
  • attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
  • continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
584    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
585        """
586        Create an ingest dictionary for workspace attributes.
587
588        **Args:**
589        - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
590
591        **Returns:**
592        - list[dict]: A list of dictionaries containing the workspace attributes.
593        """
594        # If not provided then call API to get it
595        workspace_attributes = (
596            workspace_attributes if workspace_attributes
597            else self.get_workspace_info().json()["workspace"]["attributes"]
598        )
599
600        ingest_dict = []
601        for key, value in workspace_attributes.items():
602            # If value is dict just use 'items' as value
603            if isinstance(value, dict):
604                value = value.get("items")
605            # If value is list convert to comma separated string
606            if isinstance(value, list):
607                value = ", ".join(value)
608            ingest_dict.append(
609                {
610                    "attribute": key,
611                    "value": str(value) if value else None
612                }
613            )
614        return ingest_dict

Create an ingest dictionary for workspace attributes.

Args:

  • workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.

Returns:

  • list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.models.Response:
616    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
617        """
618        Upload metadata to the workspace table.
619
620        **Args:**
621        - entities_tsv (str): The path to the TSV file containing the metadata to upload.
622
623        **Returns:**
624        - requests.Response: The response from the request.
625        """
626        endpoint = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
627        data = {"entities": open(entities_tsv, "rb")}
628        return self.request_util.upload_file(
629            uri=endpoint,
630            data=data
631        )

Upload metadata to the workspace table.

Args:

  • entities_tsv (str): The path to the TSV file containing the metadata to upload.

Returns:

  • requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.models.Response:
633    def get_workspace_workflows(self) -> requests.Response:
634        """
635        Get the workflows for the workspace.
636
637        **Returns:**
638        - requests.Response: The response from the request.
639        """
640        uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
641        return self.request_util.run_request(
642            uri=uri,
643            method=GET
644        )

Get the workflows for the workspace.

Returns:

  • requests.Response: The response from the request.
def import_workflow( self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.models.Response:
646    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
647        """
648        Import a workflow into the workspace.
649
650        **Args:**
651        - workflow_dict (dict): The dictionary containing the workflow information.
652        - continue_if_exists (bool, optional): Whether to continue if the workflow
653                already exists. Defaults to `False`.
654
655        **Returns:**
656        - requests.Response: The response from the request.
657        """
658        uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
659        workflow_json = json.dumps(workflow_dict)
660        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
661        return self.request_util.run_request(
662            uri=uri,
663            method=POST,
664            data=workflow_json,
665            content_type="application/json",
666            accept_return_codes=accept_return_codes
667        )

Import a workflow into the workspace.

Args:

  • workflow_dict (dict): The dictionary containing the workflow information.
  • continue_if_exists (bool, optional): Whether to continue if the workflow already exists. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def delete_workspace(self) -> requests.models.Response:
669    def delete_workspace(self) -> requests.Response:
670        """
671        Delete a Terra workspace.
672
673        **Returns:**
674        - requests.Response: The response from the request.
675        """
676        return self.request_util.run_request(
677            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}",
678            method=DELETE
679        )

Delete a Terra workspace.

Returns:

  • requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.models.Response:
681    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
682        """
683        Update the attributes for the workspace.
684
685        **Args:**
686        - attributes (dict): The attributes to update.
687
688        **Returns:**
689        - requests.Response: The response from the request.
690        """
691        return self.request_util.run_request(
692            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
693            method=PATCH,
694            data=json.dumps(attributes),
695            content_type="application/json"
696        )

Update the attributes for the workspace.

Args:

  • attributes (list[dict]): The attributes to update.

Returns:

  • requests.Response: The response from the request.
def leave_workspace( self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False) -> requests.models.Response:
698    def leave_workspace(
699            self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
700    ) -> requests.Response:
701        """
702        Leave a workspace. If workspace ID not supplied, will look it up.
703
704        **Args:**
705        - workspace_id (str, optional): The workspace ID. Defaults to None.
706        - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors.
707             Defaults to `False`.
708
709        **Returns:**
710        - requests.Response: The response from the request.
711        """
712        if not workspace_id:
713            workspace_info = self.get_workspace_info().json()
714            workspace_id = workspace_info['workspace']['workspaceId']
715        accepted_return_code = [403] if ignore_direct_access_error else []
716
717        res = self.request_util.run_request(
718            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
719            method=DELETE,
720            accept_return_codes=accepted_return_code
721        )
722        if (res.status_code == 403
723                and res.json()["message"] == "You can only leave a resource that you have direct access to."):
724            logging.info(
725                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct"
726                f"access to the workspace (they could be an owner on the billing project)"
727            )
728        return res

Leave a workspace. If workspace ID not supplied, will look it up.

Args:

  • workspace_id (str, optional): The workspace ID. Defaults to None.
  • ignore_direct_access_error (bool, optional): Whether to ignore direct access errors. Defaults to False.

Returns:

  • requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.models.Response:
730    def change_workspace_public_setting(self, public: bool) -> requests.Response:
731        """
732        Change a workspace's public setting.
733
734        **Args:**
735        - public (bool, optional): Whether the workspace should be public. Set to `True` to be made
736         public, `False` otherwise.
737
738        **Returns:**
739        - requests.Response: The response from the request.
740        """
741        body = [
742            {
743                "settingType": "PubliclyReadable",
744                "config": {
745                    "enabled": public
746                }
747            }
748        ]
749        return self.request_util.run_request(
750            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
751            method=PUT,
752            content_type="application/json",
753            data=json.dumps(body)
754        )

Change a workspace's public setting.

Args:

  • public (bool): Whether the workspace should be public. Set to True to be made public, False otherwise.

Returns:

  • requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.models.Response:
756    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
757        """
758        Check if a workspace is public.
759
760        **Args:**
761        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
762        it up if not provided. Defaults to None.
763
764        **Returns:**
765        - requests.Response: The response from the request.
766        """
767        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
768        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
769        return self.request_util.run_request(
770            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
771            method=GET
772        )

Check if a workspace is public.

Args:

  • bucket (str, optional): The bucket name (provided without the gs:// prefix). Will look it up if not provided. Defaults to None.

Returns:

  • requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.models.Response:
774    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
775        """Delete an entire entity table from a Terra workspace.
776
777        **Args:**
778        - entity_to_delete (str): The name of the entity table to delete.
779
780        **Returns:**
781        - requests.Response: The response from the request.
782        """
783        response = self.request_util.run_request(
784            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
785            method=DELETE
786        )
787        if response.status_code == 204:
788            logging.info(
789                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
790                f"'{self.billing_project}/{self.workspace_name}'"
791            )
792        else:
793            logging.error(
794                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
795                f"table: {response.text}"
796            )
797        return response

Delete an entire entity table from a Terra workspace.

Args:

  • entity_to_delete (str): The name of the entity table to delete.

Returns:

  • requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
799    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
800        """Save an entity table version in a Terra workspace.
801
802        **Args:**
803        - entity_type (str): The name of the entity table to save a new version for
804        - version_name (str): The name of the new version
805        """
806        # Get the workspace metrics
807        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
808        file_name = f"{entity_type}.json"
809        # Write the workspace metrics to a JSON file
810        with open(file_name, "w") as json_file:
811            json.dump(workspace_metrics, json_file)
812
813        # Create a zip file with the same naming convention that Terra backend uses
814        timestamp_ms = int(time.time() * 1000)
815        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
816        with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf:
817            zipf.write(file_name, arcname=f"json/{file_name}")
818
819        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live)
820        workspace_info = self.get_workspace_info().json()
821        path_to_upload_to = os.path.join(
822            "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name
823        )
824        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
825        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
826        try:
827            active_account = gcp_util.get_active_gcloud_account()
828        except Exception as e:
829            active_account = workspace_info["workspace"]["createdBy"]
830            logging.error(
831                f"Encountered the following exception while attempting to get current GCP account: {e}. "
832                f"Will set the owner of the new metadata version as the workspace creator instead."
833            )
834        gcp_util.upload_blob(
835            source_file=zip_file_name,
836            destination_path=path_to_upload_to,
837            custom_metadata={
838                "createdBy": active_account,
839                "entityType": entity_type,
840                "timestamp": timestamp_ms,
841                "description": version_name,
842            }
843        )

Save an entity table version in a Terra workspace.

Args:

  • entity_type (str): The name of the entity table to save a new version for
  • version_name (str): The name of the new version
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.models.Response:
845    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
846        """
847        Add a user comment to a submission in Terra.
848
849        **Args:**
850        - submission_id (str): The ID of the submission to add a comment to.
851        - user_comment (str): The comment to add to the submission.
852
853        **Returns:**
854        - requests.Response: The response from the request.
855        """
856        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
857        return self.request_util.run_request(
858            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
859            method=PATCH,
860            content_type="application/json",
861            data=json.dumps({"userComment": user_comment}),
862        )

Add a user comment to a submission in Terra.

Args:

  • submission_id (str): The ID of the submission to add a comment to.
  • user_comment (str): The comment to add to the submission.

Returns:

  • requests.Response: The response from the request.
def initiate_submission( self, method_config_namespace: str, method_config_name: str, entity_type: str, entity_name: str, expression: str, user_comment: Optional[str], use_call_cache: bool = True) -> requests.models.Response:
864    def initiate_submission(
865            self,
866            method_config_namespace: str,
867            method_config_name: str,
868            entity_type: str,
869            entity_name: str,
870            expression: str,
871            user_comment: Optional[str],
872            use_call_cache: bool = True
873    ) -> requests.Response:
874        """
875        Initiate a submission within a Terra workspace.
876
877        Note - the workflow being initiated MUST already be imported into the workspace.
878
879        **Args:**
880        - method_config_namespace (str): The namespace of the method configuration.
881        - method_config_name (str): The name of the method configuration to use for the submission
882        (i.e. the workflow name).
883        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
884        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
885        "sample_set_1").
886        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
887        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
888        be launched PER SAMPLE, the expression should be `this.samples`.
889        - user_comment (str, optional): The user comment to add to the submission.
890        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.
891
892        **Returns:**
893        - requests.Response: The response from the request.
894        """
895        payload = {
896            "methodConfigurationNamespace": method_config_namespace,
897            "methodConfigurationName": method_config_name,
898            "entityType": entity_type,
899            "entityName": entity_name,
900            "expression": expression,
901            "useCallCache": use_call_cache,
902            "deleteIntermediateOutputFiles": False,
903            "useReferenceDisks": False,
904            "ignoreEmptyOutputs": False,
905        }
906        if user_comment:
907            payload["userComment"] = user_comment
908
909        return self.request_util.run_request(
910            uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
911            method=POST,
912            content_type="application/json",
913            data=json.dumps(payload),
914        )

Initiate a submission within a Terra workspace.

Note - the workflow being initiated MUST already be imported into the workspace.

Args:

  • method_config_namespace (str): The namespace of the method configuration.
  • method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
  • entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
  • entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or "sample_set_1").
  • expression (str): The "expression" to use. For example, if the entity_type is sample and the workflow is launching one sample, this can be left as this. If the entity_type is sample_set, but one workflow should be launched PER SAMPLE, the expression should be this.samples.
  • user_comment (str, optional): The user comment to add to the submission.
  • use_call_cache (bool, optional): Whether to use the call caching. Defaults to True.

Returns:

  • requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.models.Response:
916    def retry_failed_submission(self, submission_id: str) -> requests.Response:
917        """
918        Retry a failed submission in Terra.
919
920        **Args:**
921        - submission_id (str): The ID of the submission to retry.
922
923        **Returns:**
924        - requests.Response: The response from the request.
925        """
926        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
927        payload = {"retryType": "Failed"}
928        logging.info(
929            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
930        )
931        return self.request_util.run_request(
932            uri=url,
933            method=POST,
934            content_type="application/json",
935            data=json.dumps(payload)
936        )

Retry a failed submission in Terra.

Args:

  • submission_id (str): The ID of the submission to retry.

Returns:

  • requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.models.Response:
938    def get_submission_status(self, submission_id: str) -> requests.Response:
939        """
940        Get the status of a submission in Terra.
941
942        **Args:**
943        - submission_id (str): The ID of the submission.
944
945        **Returns:**
946        - requests.Response: The response from the request.
947        """
948        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
949        logging.info(
950            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
951        )
952        return self.request_util.run_request(
953            uri=url,
954            method=GET
955        )

Get the status of a submission in Terra.

Args:

  • submission_id (str): The ID of the submission.

Returns:

  • requests.Response: The response from the request.