ops_utils.terra_util
Utilities for working with Terra.
"""Utilities for working with Terra."""
import json
import logging
import os
import re
import tempfile
import time
import zipfile
from typing import Any, Iterator, Optional, Union

import requests

from . import deprecated
from .vars import GCP, APPLICATION_JSON
from .gcp_utils import GCPCloudFunctions
from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest

# Constants for Terra API links
TERRA_DEV_LINK = "https://firecloud-orchestration.dsde-dev.broadinstitute.org/api"
"""@private"""
TERRA_PROD_LINK = "https://api.firecloud.org/api"
"""@private"""
LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api"
"""@private"""
WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1"
"""@private"""
SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api"
"""@private"""
RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api"
"""@private"""

# Valid group roles; used by `TerraGroups` as the `{role}` segment of Sam group URLs.
MEMBER = "member"
ADMIN = "admin"


def _float_or_int(raw_number: str) -> Union[int, float]:
    """
    Parse a JSON number literal that has a fraction or exponent part.

    Intended as the `parse_float` hook for `json.loads`: whole-valued numbers such as `5.0`
    are returned as `int` (`5`), while genuinely fractional numbers stay `float`.

    Args:
        raw_number (str): The number literal exactly as it appears in the JSON text.

    Returns:
        Union[int, float]: `int` when the value is integral, otherwise `float`.
    """
    value = float(raw_number)
    return int(value) if value.is_integer() else value


class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Currently unused; every method of this class calls the prod
            Rawls and Sam endpoints. Defaults to `"prod"`.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of accessible workspaces.

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If None (the default)
            or empty, all fields are included.

        **Returns:**
        - requests.Response: The response from the request.
        """
        fields_str = f"fields={','.join(fields)}" if fields else ""
        url = f"{RAWLS_LINK}/workspaces?{fields_str}"
        return self.request_util.run_request(uri=url, method=GET)

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the service account JSON.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(uri=url, method=GET)


class TerraGroups:
    """A class to manage Terra groups and their memberships."""

    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
    """@private"""
    CONFLICT_STATUS_CODE = 409
    """@private"""

    def __init__(self, request_util: RunRequest):
        """
        Initialize the TerraGroups class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
            utility class to handle HTTP requests.
        """
        self.request_util = request_util
        """@private"""

    def _check_role(self, role: str) -> None:
        """
        Check if the role is valid.

        Args:
            role (str): The role to check.

        Raises:
            ValueError: If the role is not one of the allowed options.
        """
        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")

    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
        """
        Remove a user from a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to remove.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not a valid group role.
        """
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        logging.info(f"Removing {email} from group {group}")
        return self.request_util.run_request(uri=url, method=DELETE)

    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
        """
        Create a new group.

        **Args:**
        - group_name (str): The name of the group to create.
        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        # A 409 Conflict means the group already exists; only tolerate it when asked to.
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        response = self.request_util.run_request(
            uri=url,
            method=POST,
            accept_return_codes=accept_return_codes,
        )
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"Group {group_name} already exists. Continuing.")
        else:
            logging.info(f"Created group {group_name}")
        return response

    def delete_group(self, group_name: str) -> requests.Response:
        """
        Delete a group.

        **Args:**
        - group_name (str): The name of the group to delete.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        logging.info(f"Deleting group {group_name}")
        return self.request_util.run_request(uri=url, method=DELETE)

    def add_user_to_group(
        self, group: str, email: str, role: str, continue_if_exists: bool = False
    ) -> requests.Response:
        """
        Add a user to a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to add.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
            Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not a valid group role.
        """
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        logging.info(f"Adding {email} to group {group} as {role}")
        return self.request_util.run_request(
            uri=url,
            method=PUT,
            accept_return_codes=accept_return_codes,
        )


class TerraWorkspace:
    """Terra workspace class to manage workspaces and their attributes."""

    CONFLICT_STATUS_CODE = 409
    """@private"""

    def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the TerraWorkspace class.

        **Args:**
        - billing_project (str): The billing project associated with the workspace.
        - workspace_name (str): The name of the workspace.
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Which Terra orchestration API to target for most calls, `"dev"` or
            `"prod"` (case-insensitive). Defaults to `"prod"`.

        **Raises:**
        - ValueError: If `env` is neither `"dev"` nor `"prod"`.
        """
        self.billing_project = billing_project
        """@private"""
        self.workspace_name = workspace_name
        """@private"""
        self.workspace_id = None
        """@private"""
        self.resource_id = None
        """@private"""
        self.storage_container = None
        """@private"""
        self.bucket = None
        """@private"""
        self.wds_url = None
        """@private"""
        self.account_url: Optional[str] = None
        """@private"""
        self.request_util = request_util
        """@private"""
        env_lower = env.lower()
        if env_lower == "dev":
            self.terra_link = TERRA_DEV_LINK
            """@private"""
        elif env_lower == "prod":
            self.terra_link = TERRA_PROD_LINK
            """@private"""
        else:
            raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")

    def __repr__(self) -> str:
        """
        Return a string representation of the TerraWorkspace instance.

        Returns:
            str: The string representation of the TerraWorkspace instance.
        """
        return f"{self.billing_project}/{self.workspace_name}"

    def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Iterator[Any]:
        """
        Yield all entity metrics from the workspace, one parsed page at a time.

        Whole-valued JSON numbers such as `3.0` are decoded as `int`; see `_float_or_int`.

        Args:
            entity (str): The type of entity to query.
            total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000.

        Yields:
            Any: The JSON response containing entity metrics for one page.
        """
        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}"  # noqa: E501
        response = self.request_util.run_request(
            uri=url,
            method=GET,
            content_type=APPLICATION_JSON,
        )
        first_page_json = json.loads(response.text, parse_float=_float_or_int)
        yield first_page_json

        # The first response reports how many pages exist in total; fetch the rest by page number.
        total_pages = first_page_json["resultMetadata"]["filteredPageCount"]
        logging.info(f"Looping through {total_pages} pages of data")

        for page in range(2, total_pages + 1):
            logging.info(f"Getting page {page} of {total_pages}")
            next_page = self.request_util.run_request(
                uri=url,
                method=GET,
                content_type=APPLICATION_JSON,
                params={"page": page},
            )
            yield json.loads(next_page.text, parse_float=_float_or_int)

    @staticmethod
    def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
        """Check that all headers follow the standards required by TDR.

        **Args:**
        - table_name (str): The name of the Terra table.
        - headers (list[str]): The headers of the Terra table to validate.

        **Raises:**
        - ValueError if any headers are considered invalid by TDR standards
        """
        # Must start with a letter, then only letters, digits, or underscores. `fullmatch` is used
        # because `re.match` with a trailing `$` would also accept a header ending in "\n".
        tdr_header_allowed_pattern = r"[a-zA-Z][_a-zA-Z0-9]*"
        tdr_max_header_length = 63

        headers_containing_too_many_characters = [
            header for header in headers if len(header) > tdr_max_header_length
        ]
        headers_contain_invalid_characters = [
            header for header in headers if not re.fullmatch(tdr_header_allowed_pattern, header)
        ]

        error_parts = []
        if headers_containing_too_many_characters:
            too_long_headers = ", ".join(headers_containing_too_many_characters)
            error_parts.append(
                f'The following header(s) in table "{table_name}" contain too many characters: '
                f'"{too_long_headers}". The max number of characters for a header allowed in TDR is '
                f"{tdr_max_header_length}.\n"
            )
        if headers_contain_invalid_characters:
            invalid_headers = ", ".join(headers_contain_invalid_characters)
            error_parts.append(
                f'The following header(s) in table "{table_name}" contain invalid characters: '
                f'"{invalid_headers}". TDR headers must start with a letter, and must only contain numbers, '
                "letters, and underscore characters.\n"
            )
        if error_parts:
            error_parts.append(
                "In order to proceed, please update the problematic header(s) in your Terra table, and then "
                "re-attempt the import once all problematic header(s) have been updated to follow TDR rules "
                "for header naming."
            )
            raise ValueError("".join(error_parts))

    def get_workspace_info(self) -> requests.Response:
        """
        Get workspace information.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
        logging.info(f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
        return self.request_util.run_request(uri=url, method=GET)

    def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
        """
        Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

        **Args:**
        - entity_type (str): The type of entity to get metrics for.
        - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.

        **Returns:**
        - list[dict]: A list of dictionaries containing entity metrics.
        """
        results = []
        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")

        for page in self._yield_all_entity_metrics(entity=entity_type):
            results.extend(page["results"])

        # If remove_dicts is True, flatten entity-reference dicts in each row's attributes
        if remove_dicts:
            for row in results:
                row["attributes"] = self._remove_dict_from_attributes(row["attributes"])
        return results

    def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
        """
        Get specific entity metrics for a given entity type and name.

        **Args:**
        - entity_type (str): The type of entity to get metrics for.
        - entity_name (str): The name of the entity to get metrics for.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}"  # noqa: E501
        return self.request_util.run_request(uri=url, method=GET)

    def _remove_dict_from_attributes(self, attributes: dict) -> dict:
        """
        Remove dictionaries from the attributes.

        Note: `attributes` is modified in place and also returned.

        Args:
            attributes (dict): The attributes to remove dictionaries from.

        Returns:
            dict: The updated attributes with no dictionaries.
        """
        for key, value in attributes.items():
            attributes[key] = self._remove_dict_from_cell(value)
        return attributes

    def _remove_dict_from_cell(self, cell_value: Any) -> Any:
        """
        Remove a dictionary from a cell.

        A dict with a truthy `entityName` is replaced by that name. A dict with an `items` list is
        replaced by the list, with each element processed recursively. Anything else is returned unchanged.

        Args:
            cell_value (Any): The cell value to flatten.

        Returns:
            Any: The updated cell with no dictionaries.
        """
        if not isinstance(cell_value, dict):
            return cell_value
        entity_name = cell_value.get("entityName")
        if entity_name:
            return entity_name
        entity_list = cell_value.get("items")
        if entity_list or entity_list == []:
            return [self._remove_dict_from_cell(entity) for entity in entity_list]
        return cell_value

    def get_workspace_bucket(self) -> str:
        """
        Get the workspace bucket name. Does not include the `gs://` prefix.

        **Returns:**
        - str: The bucket name.
        """
        return self.get_workspace_info().json()["workspace"]["bucketName"]

    def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
        """
        Get workspace entity information.

        **Args:**
        - use_cache (bool, optional): Whether to use cache. Defaults to `True`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        use_cache_param = "true" if use_cache else "false"
        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache_param}"  # noqa: E501
        return self.request_util.run_request(uri=url, method=GET)

    def get_workspace_acl(self) -> requests.Response:
        """
        Get the workspace access control list (ACL).

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
        return self.request_util.run_request(uri=url, method=GET)

    def update_user_acl(
        self,
        email: str,
        access_level: str,
        can_share: bool = False,
        can_compute: bool = False,
        invite_users_not_found: bool = False,
    ) -> requests.Response:
        """
        Update the access control list (ACL) for a user in the workspace.

        **Args:**
        - email (str): The email of the user.
        - access_level (str): The access level to grant to the user.
        - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
        - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
            the workspace. Defaults to `False`

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - Exception: If the user was not found and `invite_users_not_found` is `False`.
        """
        url = (
            f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
        )
        payload = {
            "email": email,
            "accessLevel": access_level,
            "canShare": can_share,
            "canCompute": can_compute,
        }
        logging.info(
            f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
        response = self.request_util.run_request(
            uri=url,
            method=PATCH,
            content_type=APPLICATION_JSON,
            data=json.dumps([payload]),
        )

        users_not_found = response.json().get("usersNotFound")
        if users_not_found and not invite_users_not_found:
            # Only one user was submitted, so only the first entry can be relevant
            user_not_found = users_not_found[0]
            raise Exception(
                f'The user {user_not_found["email"]} was not found and access was not updated'
            )
        return response

    @deprecated(
        """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
    )
    def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
        """
        Update the metadata for a library dataset.

        **Args:**
        - library_metadata (dict): The metadata to update.
        - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = (
            f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}"
            f"/metadata?validate={str(validate).lower()}"
        )
        return self.request_util.run_request(
            uri=url,
            method=PUT,
            data=json.dumps(library_metadata),
        )

    def update_multiple_users_acl(
        self, acl_list: list[dict], invite_users_not_found: bool = False
    ) -> requests.Response:
        """
        Update the access control list (ACL) for multiple users in the workspace.

        **Args:**
        - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
        - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
            the workspace. Defaults to `False`

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - Exception: If any user was not found and `invite_users_not_found` is `False`.
        """
        url = (
            f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
            f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
        )
        logging.info(f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
        response = self.request_util.run_request(
            uri=url,
            method=PATCH,
            content_type=APPLICATION_JSON,
            data=json.dumps(acl_list),
        )

        not_found_entries = response.json().get("usersNotFound")
        if not_found_entries and not invite_users_not_found:
            # May contain several users; report every one that was not found
            users_not_found = [u["email"] for u in not_found_entries]
            raise Exception(
                f"The following users were not found and access was not updated: {users_not_found}"
            )
        return response

    def create_workspace(
        self,
        auth_domain: Optional[list[dict]] = None,
        attributes: Optional[dict] = None,
        continue_if_exists: bool = False,
    ) -> requests.Response:
        """
        Create a new workspace in Terra.

        **Args:**
        - auth_domain (list[dict], optional): A list of authorization domains. Should look
            like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
        - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
        - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        # `None` defaults avoid sharing one mutable default object across calls
        payload = {
            "namespace": self.billing_project,
            "name": self.workspace_name,
            "authorizationDomain": auth_domain if auth_domain is not None else [],
            "attributes": attributes if attributes is not None else {},
            "cloudPlatform": GCP,
        }
        # If workspace already exists then continue if exists
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
        response = self.request_util.run_request(
            uri=f"{self.terra_link}/workspaces",
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(payload),
            accept_return_codes=accept_return_codes,
        )
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
        return response

    def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
        """
        Create an ingest dictionary for workspace attributes.

        Each value is normalized to a string. A dict value is replaced by its `items` entry, and list
        values become a comma-separated string. Missing and empty values become `None`.

        **Args:**
        - workspace_attributes (dict, optional): A dictionary of workspace attributes. If None (the default),
            the attributes are fetched from the workspace.

        **Returns:**
        - list[dict]: A list of dictionaries containing the workspace attributes.
        """
        # If not provided then call API to get it (an explicitly passed empty dict is respected)
        if workspace_attributes is None:
            workspace_attributes = self.get_workspace_info().json()["workspace"]["attributes"]

        ingest_dict = []
        for key, value in workspace_attributes.items():
            # If value is dict just use 'items' as value
            if isinstance(value, dict):
                value = value.get("items")
            # If value is list convert to comma separated string (items may be non-strings, e.g. numbers)
            if isinstance(value, list):
                value = ", ".join(str(item) for item in value)
            # Keep legitimate falsy values such as 0 or False; only missing/empty values become None
            ingest_dict.append(
                {
                    "attribute": key,
                    "value": None if value is None or value == "" else str(value),
                }
            )
        return ingest_dict

    def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
        """
        Upload metadata to the workspace table.

        **Args:**
        - entities_tsv (str): The path to the TSV file containing the metadata to upload.

        **Returns:**
        - requests.Response: The response from the request.
        """
        endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
        # Close the file handle once the upload request has completed
        with open(entities_tsv, "rb") as entities_file:
            return self.request_util.upload_file(
                uri=endpoint,
                data={"entities": entities_file},
            )

    def get_workspace_workflows(self) -> requests.Response:
        """
        Get the workflows for the workspace.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
        return self.request_util.run_request(uri=uri, method=GET)

    def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
        """
        Import a workflow into the workspace.

        **Args:**
        - workflow_dict (dict): The dictionary containing the workflow information.
        - continue_if_exists (bool, optional): Whether to continue if the workflow
            already exists. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        return self.request_util.run_request(
            uri=uri,
            method=POST,
            data=json.dumps(workflow_dict),
            content_type=APPLICATION_JSON,
            accept_return_codes=accept_return_codes,
        )

    def delete_workspace(self) -> requests.Response:
        """
        Delete a Terra workspace.

        **Returns:**
        - requests.Response: The response from the request.
        """
        return self.request_util.run_request(
            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}",
            method=DELETE,
        )

    def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
        """
        Update the attributes for the workspace.

        **Args:**
        - attributes (list[dict]): The attribute update operations to apply; sent as the JSON request body.

        **Returns:**
        - requests.Response: The response from the request.
        """
        return self.request_util.run_request(
            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
            method=PATCH,
            data=json.dumps(attributes),
            content_type=APPLICATION_JSON,
        )

    def leave_workspace(
        self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
    ) -> requests.Response:
        """
        Leave a workspace. If workspace ID not supplied, will look it up.

        **Args:**
        - workspace_id (str, optional): The workspace ID. Defaults to None.
        - ignore_direct_access_error (bool, optional): Whether to accept a 403 response (e.g. when the
            current user has no direct access to the workspace) instead of treating it as an error.
            Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        if not workspace_id:
            workspace_id = self.get_workspace_info().json()["workspace"]["workspaceId"]
        accepted_return_codes = [403] if ignore_direct_access_error else []

        res = self.request_util.run_request(
            uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
            method=DELETE,
            accept_return_codes=accepted_return_codes,
        )
        if (
            res.status_code == 403
            and res.json()["message"] == "You can only leave a resource that you have direct access to."
        ):
            logging.info(
                f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct "
                "access to the workspace (they could be an owner on the billing project)"
            )
        return res

    def change_workspace_public_setting(self, public: bool) -> requests.Response:
        """
        Change a workspace's public setting.

        **Args:**
        - public (bool): Whether the workspace should be public. Set to `True` to be made
            public, `False` otherwise.

        **Returns:**
        - requests.Response: The response from the request.
        """
        body = [
            {
                "settingType": "PubliclyReadable",
                "config": {
                    "enabled": public,
                },
            }
        ]
        return self.request_util.run_request(
            uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
            method=PUT,
            content_type=APPLICATION_JSON,
            data=json.dumps(body),
        )

    def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
        """
        Check if a workspace is public.

        **Args:**
        - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
            it up if not provided. Defaults to None.

        **Returns:**
        - requests.Response: The response from the request.
        """
        workspace_bucket = bucket if bucket else self.get_workspace_bucket()
        # Strip the "fc-secure-" / "fc-" bucket prefix; the remainder is used as the Sam workspace resource ID
        bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
        return self.request_util.run_request(
            uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
            method=GET,
        )

    def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
        """Delete an entire entity table from a Terra workspace.

        **Args:**
        - entity_to_delete (str): The name of the entity table to delete.

        **Returns:**
        - requests.Response: The response from the request.
        """
        response = self.request_util.run_request(
            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",  # noqa: E501
            method=DELETE,
        )
        if response.status_code == 204:
            logging.info(
                f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
                f"'{self.billing_project}/{self.workspace_name}'"
            )
        else:
            logging.error(
                f"Encountered the following error while attempting to delete '{entity_to_delete}' "
                f"table: {response.text}"
            )
        return response

    def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
        """Save an entity table version in a Terra workspace.

        The table is dumped to JSON, zipped under `json/<entity_type>.json`, and uploaded to
        `gs://<bucket>/.data-table-versions/<entity_type>/`. The local files are staged in a temporary
        directory and removed afterwards.

        **Args:**
        - entity_type (str): The name of the entity table to save a new version for
        - version_name (str): The name of the new version
        """
        # Get the workspace metrics
        workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
        file_name = f"{entity_type}.json"

        # Zip file name follows the same naming convention that Terra backend uses
        timestamp_ms = int(time.time() * 1000)
        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"

        workspace_info = self.get_workspace_info().json()["workspace"]
        # GCS URIs always use "/" separators, so build the path explicitly rather than with os.path.join
        path_to_upload_to = f"gs://{workspace_info['bucketName']}/.data-table-versions/{entity_type}/{zip_file_name}"
        gcp_util = GCPCloudFunctions(project=workspace_info["googleProject"])
        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
        try:
            active_account = gcp_util.get_active_gcloud_account()
        except Exception as e:
            active_account = workspace_info["createdBy"]
            logging.error(
                f"Encountered the following exception while attempting to get current GCP account: {e}. "
                f"Will set the owner of the new metadata version as the workspace creator instead."
            )

        # Stage local files in a temporary directory so nothing is left behind (or overwritten) in the cwd
        with tempfile.TemporaryDirectory() as tmp_dir:
            local_json_path = os.path.join(tmp_dir, file_name)
            with open(local_json_path, "w") as json_file:
                json.dump(workspace_metrics, json_file)

            local_zip_path = os.path.join(tmp_dir, zip_file_name)
            with zipfile.ZipFile(local_zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.write(local_json_path, arcname=f"json/{file_name}")

            # Upload to the subdirectory within the workspace's bucket where Terra expects it to live
            gcp_util.upload_blob(
                source_file=local_zip_path,
                destination_path=path_to_upload_to,
                custom_metadata={
                    "createdBy": active_account,
                    "entityType": entity_type,
                    "timestamp": timestamp_ms,
                    "description": version_name,
                },
            )

    def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
        """
        Add a user comment to a submission in Terra.

        **Args:**
        - submission_id (str): The ID of the submission to add a comment to.
        - user_comment (str): The comment to add to the submission.

        **Returns:**
        - requests.Response: The response from the request.
        """
        logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
        return self.request_util.run_request(
            uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}",
            method=PATCH,
            content_type=APPLICATION_JSON,
            data=json.dumps({"userComment": user_comment}),
        )

    def initiate_submission(
        self,
        method_config_namespace: str,
        method_config_name: str,
        entity_type: str,
        entity_name: str,
        expression: str,
        user_comment: Optional[str] = None,
        use_call_cache: bool = True,
    ) -> requests.Response:
        """
        Initiate a submission within a Terra workspace.

        Note - the workflow being initiated MUST already be imported into the workspace.

        **Args:**
        - method_config_namespace (str): The namespace of the method configuration.
        - method_config_name (str): The name of the method configuration to use for the submission
            (i.e. the workflow name).
        - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set").
        - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or
            "sample_set_1").
        - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
            launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow
            should be launched PER SAMPLE, the expression should be `this.samples`.
        - user_comment (str, optional): The user comment to add to the submission. Omitted from the request
            when empty or None. Defaults to None.
        - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        payload = {
            "methodConfigurationNamespace": method_config_namespace,
            "methodConfigurationName": method_config_name,
            "entityType": entity_type,
            "entityName": entity_name,
            "expression": expression,
            "useCallCache": use_call_cache,
            "deleteIntermediateOutputFiles": False,
            "useReferenceDisks": False,
            "ignoreEmptyOutputs": False,
        }
        if user_comment:
            payload["userComment"] = user_comment

        return self.request_util.run_request(
            uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(payload),
        )

    def retry_failed_submission(self, submission_id: str) -> requests.Response:
        """
        Retry a failed submission in Terra.

        **Args:**
        - submission_id (str): The ID of the submission to retry.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry"
        payload = {"retryType": "Failed"}
        logging.info(
            f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
        )
        return self.request_util.run_request(
            uri=url,
            method=POST,
            content_type=APPLICATION_JSON,
            data=json.dumps(payload),
        )

    def get_submission_status(self, submission_id: str) -> requests.Response:
        """
        Get the status of a submission in Terra.

        **Args:**
        - submission_id (str): The ID of the submission.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
        logging.info(
            f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}"
        )
        return self.request_util.run_request(uri=url, method=GET)
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Currently unused; every method of this class calls the production
            Rawls and Sam endpoints regardless of its value. Defaults to `"prod"`.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of workspaces accessible to the caller (Rawls `GET /workspaces`).

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If `None` (the
            default) or empty, no field filter is sent and all fields are included.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces"
        # Only append a query string when a field filter was actually requested.
        if fields:
            url += "?fields=" + ",".join(fields)
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Request the current user's pet service account key from Sam.

        **Returns:**
        - requests.Response: The response from the request.
        """
        return self.request_util.run_request(
            uri=f"{SAM_LINK}/google/v1/user/petServiceAccount/key",
            method=GET,
        )
Class for generic Terra utilities.
def __init__(self, request_util: RunRequest, env: str = "prod"):
    """
    Initialize the Terra class.

    **Args:**
    - request_util (`ops_utils.request_util.RunRequest`): An instance of a
        request utility class to handle HTTP requests.
    - env (str, optional): Currently unused; every method of this class calls the production
        Rawls and Sam endpoints regardless of its value. Defaults to `"prod"`.
    """
    self.request_util = request_util
    """@private"""
Initialize the Terra class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
    """
    Fetch the list of workspaces accessible to the caller (Rawls `GET /workspaces`).

    **Args:**
    - fields (list[str], optional): A list of fields to include in the response. If `None` (the
        default) or empty, no field filter is sent and all fields are included.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{RAWLS_LINK}/workspaces"
    # Only append a query string when a field filter was actually requested.
    if fields:
        url += "?fields=" + ",".join(fields)
    return self.request_util.run_request(
        uri=url,
        method=GET
    )
Fetch the list of accessible workspaces.
Args:
- fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
Returns:
- requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.Response:
    """
    Request the current user's pet service account key from Sam.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/google/v1/user/petServiceAccount/key",
        method=GET,
    )
Get the service account JSON.
Returns:
- requests.Response: The response from the request.
class TerraGroups:
    """A class to manage Terra groups and their memberships."""

    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
    """@private"""
    CONFLICT_STATUS_CODE = 409
    """@private"""

    def __init__(self, request_util: RunRequest):
        """
        Initialize the TerraGroups class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): The request helper used to send every
            HTTP call made by this class.
        """
        self.request_util = request_util
        """@private"""

    def _check_role(self, role: str) -> None:
        """
        Check if the role is valid.

        Args:
            role (str): The role to check.

        Raises:
            ValueError: If the role is not one of the allowed options.
        """
        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")

    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
        """
        Remove a user from a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to remove.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not one of the allowed options.
        """
        # Fail fast on an invalid role before building anything else.
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        logging.info(f"Removing {email} from group {group}")
        return self.request_util.run_request(
            uri=url,
            method=DELETE
        )

    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
        """
        Create a new group.

        **Args:**
        - group_name (str): The name of the group to create.
        - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        # A 409 (group already exists) is only tolerated when the caller asked for it.
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        response = self.request_util.run_request(
            uri=url,
            method=POST,
            accept_return_codes=accept_return_codes
        )
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"Group {group_name} already exists. Continuing.")
        else:
            logging.info(f"Created group {group_name}")
        return response

    def delete_group(self, group_name: str) -> requests.Response:
        """
        Delete a group.

        **Args:**
        - group_name (str): The name of the group to delete.

        **Returns:**
        - requests.Response: The response from the request.
        """
        logging.info(f"Deleting group {group_name}")
        return self.request_util.run_request(
            uri=f"{SAM_LINK}/groups/v1/{group_name}",
            method=DELETE,
        )

    def add_user_to_group(
        self, group: str, email: str, role: str, continue_if_exists: bool = False
    ) -> requests.Response:
        """
        Add a user to a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to add.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
            Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not one of the allowed options.
        """
        # Fail fast on an invalid role before building anything else.
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        # A 409 (user already in the group) is only tolerated when the caller asked for it.
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        logging.info(f"Adding {email} to group {group} as {role}")
        response = self.request_util.run_request(
            uri=url,
            method=PUT,
            accept_return_codes=accept_return_codes
        )
        # Mirror create_group: make the tolerated conflict visible in the logs.
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"{email} is already in group {group}. Continuing.")
        return response
A class to manage Terra groups and their memberships.
def __init__(self, request_util: RunRequest):
    """
    Initialize the TerraGroups class.

    **Args:**
    - request_util (`ops_utils.request_util.RunRequest`): The request helper used to send every
        HTTP call made by this class.
    """
    self.request_util = request_util
    """@private"""
Initialize the TerraGroups class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
    """
    Remove a user from a group.

    **Args:**
    - group (str): The name of the group.
    - email (str): The email of the user to remove.
    - role (str): The role of the user in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not one of the allowed options.
    """
    # Fail fast on an invalid role before building anything else.
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
    logging.info(f"Removing {email} from group {group}")
    return self.request_util.run_request(
        uri=url,
        method=DELETE
    )
Remove a user from a group.
Args:
- group (str): The name of the group.
- email (str): The email of the user to remove.
- role (str): The role of the user in the group (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
Returns:
- requests.Response: The response from the request.
def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
    """
    Create a new group.

    **Args:**
    - group_name (str): The name of the group to create.
    - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{SAM_LINK}/groups/v1/{group_name}"
    # A 409 (group already exists) is only tolerated when the caller asked for it.
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    response = self.request_util.run_request(
        uri=url,
        method=POST,
        accept_return_codes=accept_return_codes
    )
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"Group {group_name} already exists. Continuing.")
    else:
        logging.info(f"Created group {group_name}")
    return response
Create a new group.
Args:
- group_name (str): The name of the group to create.
- continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.Response:
    """
    Delete a group.

    **Args:**
    - group_name (str): The name of the group to delete.

    **Returns:**
    - requests.Response: The response from the request.
    """
    logging.info(f"Deleting group {group_name}")
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/groups/v1/{group_name}",
        method=DELETE,
    )
Delete a group.
Args:
- group_name (str): The name of the group to delete.
Returns:
- requests.Response: The response from the request.
def add_user_to_group(
    self, group: str, email: str, role: str, continue_if_exists: bool = False
) -> requests.Response:
    """
    Add a user to a group.

    **Args:**
    - group (str): The name of the group.
    - email (str): The email of the user to add.
    - role (str): The role of the user in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
    - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
        Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not one of the allowed options.
    """
    # Fail fast on an invalid role before building anything else.
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
    # A 409 (user already in the group) is only tolerated when the caller asked for it.
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Adding {email} to group {group} as {role}")
    response = self.request_util.run_request(
        uri=url,
        method=PUT,
        accept_return_codes=accept_return_codes
    )
    # Mirror create_group: make the tolerated conflict visible in the logs.
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"{email} is already in group {group}. Continuing.")
    return response
Add a user to a group.
Args:
- group (str): The name of the group.
- email (str): The email of the user to add.
- role (str): The role of the user in the group (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
- continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
203class TerraWorkspace: 204 """Terra workspace class to manage workspaces and their attributes.""" 205 206 CONFLICT_STATUS_CODE = 409 207 """@private""" 208 209 def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"): 210 """ 211 Initialize the TerraWorkspace class. 212 213 **Args:** 214 - billing_project (str): The billing project associated with the workspace. 215 - workspace_name (str): The name of the workspace. 216 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 217 request utility class to handle HTTP requests. 218 """ 219 self.billing_project = billing_project 220 """@private""" 221 self.workspace_name = workspace_name 222 """@private""" 223 self.workspace_id = None 224 """@private""" 225 self.resource_id = None 226 """@private""" 227 self.storage_container = None 228 """@private""" 229 self.bucket = None 230 """@private""" 231 self.wds_url = None 232 """@private""" 233 self.account_url: Optional[str] = None 234 """@private""" 235 self.request_util = request_util 236 """@private""" 237 if env.lower() == "dev": 238 self.terra_link = TERRA_DEV_LINK 239 """@private""" 240 elif env.lower() == "prod": 241 self.terra_link = TERRA_PROD_LINK 242 """@private""" 243 else: 244 raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.") 245 246 def __repr__(self) -> str: 247 """ 248 Return a string representation of the TerraWorkspace instance. 249 250 Returns: 251 str: The string representation of the TerraWorkspace instance. 252 """ 253 return f"{self.billing_project}/{self.workspace_name}" 254 255 def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any: 256 """ 257 Yield all entity metrics from the workspace. 258 259 Args: 260 entity (str): The type of entity to query. 261 total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000. 262 263 Yields: 264 Any: The JSON response containing entity metrics. 
265 """ 266 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}" # noqa: E501 267 response = self.request_util.run_request( 268 uri=url, 269 method=GET, 270 content_type=APPLICATION_JSON 271 ) 272 raw_text = response.text 273 first_page_json = json.loads( 274 raw_text, 275 parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x) 276 ) 277 yield first_page_json 278 total_pages = first_page_json["resultMetadata"]["filteredPageCount"] 279 logging.info( 280 f"Looping through {total_pages} pages of data") 281 282 for page in range(2, total_pages + 1): 283 logging.info(f"Getting page {page} of {total_pages}") 284 next_page = self.request_util.run_request( 285 uri=url, 286 method=GET, 287 content_type=APPLICATION_JSON, 288 params={"page": page} 289 ) 290 raw_text = next_page.text 291 page_json = json.loads( 292 raw_text, 293 parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x) 294 ) 295 yield page_json 296 297 @staticmethod 298 def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None: 299 """Check that all headers follow the standards required by TDR. 300 301 **Args:** 302 - table_name (str): The name of the Terra table. 303 - headers (list[str]): The headers of the Terra table to validate. 
304 305 **Raises:** 306 - ValueError if any headers are considered invalid by TDR standards 307 """ 308 tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$" 309 tdr_max_header_length = 63 310 311 headers_containing_too_many_characters = [] 312 headers_contain_invalid_characters = [] 313 314 for header in headers: 315 if len(header) > tdr_max_header_length: 316 headers_containing_too_many_characters.append(header) 317 if not re.match(tdr_header_allowed_pattern, header): 318 headers_contain_invalid_characters.append(header) 319 320 base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table, 321 and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for 322 header naming.""" 323 too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many 324 characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header 325 allowed in TDR is {tdr_max_header_length}.\n""" 326 invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid 327 characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must 328 only contain numbers, letters, and underscore characters.\n""" 329 330 error_to_report = "" 331 if headers_containing_too_many_characters: 332 error_to_report += too_many_characters_error_message 333 if headers_contain_invalid_characters: 334 error_to_report += invalid_characters_error_message 335 if error_to_report: 336 error_to_report += base_error_message 337 raise ValueError(error_to_report) 338 339 def get_workspace_info(self) -> requests.Response: 340 """ 341 Get workspace information. 342 343 **Returns:** 344 - requests.Response: The response from the request. 
345 """ 346 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}" 347 logging.info( 348 f"Getting workspace info for {self.billing_project}/{self.workspace_name}") 349 return self.request_util.run_request(uri=url, method=GET) 350 351 def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]: 352 """ 353 Get metrics for a specific entity type in the workspace (specifically for Terra on GCP). 354 355 **Args:** 356 - entity_type (str): The type of entity to get metrics for. 357 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 358 359 **Returns:** 360 - list[dict]: A list of dictionaries containing entity metrics. 361 """ 362 results = [] 363 logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}") 364 365 for page in self._yield_all_entity_metrics(entity=entity_type): 366 results.extend(page["results"]) 367 368 # If remove_dicts is True, remove dictionaries from the workspace metrics 369 if remove_dicts: 370 for row in results: 371 row['attributes'] = self._remove_dict_from_attributes(row['attributes']) 372 return results 373 374 def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response: 375 """ 376 Get specific entity metrics for a given entity type and name. 377 378 **Args:** 379 - entity_type (str): The type of entity to get metrics for. 380 - entity_name (str): The name of the entity to get metrics for. 381 382 **Returns:** 383 - requests.Response: The response from the request. 384 """ 385 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}" 386 return self.request_util.run_request(uri=url, method=GET) 387 388 def _remove_dict_from_attributes(self, attributes: dict) -> dict: 389 """ 390 Remove dictionaries from the attributes. 
391 392 Args: 393 attributes (dict): The attributes to remove dictionaries from. 394 395 Returns: 396 dict: The updated attributes with no dictionaries. 397 """ 398 for key, value in attributes.items(): 399 attributes[key] = self._remove_dict_from_cell(value) 400 return attributes 401 402 def _remove_dict_from_cell(self, cell_value: Any) -> Any: 403 """ 404 Remove a dictionary from a cell. 405 406 Args: 407 cell_value (Any): The dictionary to remove. 408 409 Returns: 410 Any: The updated cell with no dictionaries. 411 """ 412 if isinstance(cell_value, dict): 413 entity_name = cell_value.get("entityName") 414 # If the cell value is a dictionary, check if it has an entityName key 415 if entity_name: 416 # If the cell value is a dictionary with an entityName key, return the entityName 417 return entity_name 418 entity_list = cell_value.get("items") 419 if entity_list or entity_list == []: 420 # If the cell value is a list of dictionaries, recursively call this function on each dictionary 421 return [ 422 self._remove_dict_from_cell(entity) for entity in entity_list 423 ] 424 return cell_value 425 return cell_value 426 427 def get_workspace_bucket(self) -> str: 428 """ 429 Get the workspace bucket name. Does not include the `gs://` prefix. 430 431 **Returns:** 432 - str: The bucket name. 433 """ 434 return self.get_workspace_info().json()["workspace"]["bucketName"] 435 436 def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response: 437 """ 438 Get workspace entity information. 439 440 **Args:** 441 - use_cache (bool, optional): Whether to use cache. Defaults to `True`. 442 443 **Returns:** 444 - requests.Response: The response from the request. 
445 """ 446 use_cache = "true" if use_cache else "false" # type: ignore[assignment] 447 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}" 448 return self.request_util.run_request(uri=url, method=GET) 449 450 def get_workspace_acl(self) -> requests.Response: 451 """ 452 Get the workspace access control list (ACL). 453 454 **Returns:** 455 - requests.Response: The response from the request. 456 """ 457 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl" 458 return self.request_util.run_request( 459 uri=url, 460 method=GET 461 ) 462 463 def update_user_acl( 464 self, 465 email: str, 466 access_level: str, 467 can_share: bool = False, 468 can_compute: bool = False, 469 invite_users_not_found: bool = False, 470 ) -> requests.Response: 471 """ 472 Update the access control list (ACL) for a user in the workspace. 473 474 **Args:** 475 - email (str): The email of the user. 476 - access_level (str): The access level to grant to the user. 477 - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`. 478 - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`. 479 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 480 the workspace. Defaults to `False` 481 482 **Returns:** 483 - requests.Response: The response from the request. 484 """ 485 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" 
+ \ 486 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 487 payload = { 488 "email": email, 489 "accessLevel": access_level, 490 "canShare": can_share, 491 "canCompute": can_compute, 492 } 493 logging.info( 494 f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}") 495 response = self.request_util.run_request( 496 uri=url, 497 method=PATCH, 498 content_type=APPLICATION_JSON, 499 data="[" + json.dumps(payload) + "]" 500 ) 501 502 if response.json()["usersNotFound"] and not invite_users_not_found: 503 # Will be a list of one user 504 user_not_found = response.json()["usersNotFound"][0] 505 raise Exception( 506 f'The user {user_not_found["email"]} was not found and access was not updated' 507 ) 508 return response 509 510 @deprecated( 511 """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.""" # noqa: E501 512 ) 513 def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response: 514 """ 515 Update the metadata for a library dataset. 516 517 **Args:** 518 - library_metadata (dict): The metadata to update. 519 - validate (bool, optional): Whether to validate the metadata. Defaults to `False`. 520 521 **Returns:** 522 - requests.Response: The response from the request. 523 """ 524 acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \ 525 f"/metadata?validate={str(validate).lower()}" 526 return self.request_util.run_request( 527 uri=acl, 528 method=PUT, 529 data=json.dumps(library_metadata) 530 ) 531 532 def update_multiple_users_acl( 533 self, acl_list: list[dict], invite_users_not_found: bool = False 534 ) -> requests.Response: 535 """ 536 Update the access control list (ACL) for multiple users in the workspace. 537 538 **Args:** 539 - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user. 
540 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 541 the workspace. Defaults to `False` 542 543 **Returns:** 544 - requests.Response: The response from the request. 545 """ 546 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \ 547 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 548 logging.info( 549 f"Updating users in workspace {self.billing_project}/{self.workspace_name}") 550 response = self.request_util.run_request( 551 uri=url, 552 method=PATCH, 553 content_type=APPLICATION_JSON, 554 data=json.dumps(acl_list) 555 ) 556 557 if response.json()["usersNotFound"] and not invite_users_not_found: 558 # Will be a list of one user 559 users_not_found = [u["email"] for u in response.json()["usersNotFound"]] 560 raise Exception( 561 f"The following users were not found and access was not updated: {users_not_found}" 562 ) 563 return response 564 565 def create_workspace( 566 self, 567 auth_domain: list[dict] = [], 568 attributes: dict = {}, 569 continue_if_exists: bool = False, 570 ) -> requests.Response: 571 """ 572 Create a new workspace in Terra. 573 574 **Args:** 575 - auth_domain (list[dict], optional): A list of authorization domains. Should look 576 like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list. 577 - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary. 578 - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`. 579 580 **Returns:** 581 - requests.Response: The response from the request. 
582 """ 583 payload = { 584 "namespace": self.billing_project, 585 "name": self.workspace_name, 586 "authorizationDomain": auth_domain, 587 "attributes": attributes, 588 "cloudPlatform": GCP 589 } 590 # If workspace already exists then continue if exists 591 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 592 logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}") 593 response = self.request_util.run_request( 594 uri=f"{self.terra_link}/workspaces", 595 method=POST, 596 content_type=APPLICATION_JSON, 597 data=json.dumps(payload), 598 accept_return_codes=accept_return_codes 599 ) 600 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 601 logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists") 602 return response 603 604 def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]: 605 """ 606 Create an ingest dictionary for workspace attributes. 607 608 **Args:** 609 - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None. 610 611 **Returns:** 612 - list[dict]: A list of dictionaries containing the workspace attributes. 
613 """ 614 # If not provided then call API to get it 615 workspace_attributes = ( 616 workspace_attributes if workspace_attributes 617 else self.get_workspace_info().json()["workspace"]["attributes"] 618 ) 619 620 ingest_dict = [] 621 for key, value in workspace_attributes.items(): 622 # If value is dict just use 'items' as value 623 if isinstance(value, dict): 624 value = value.get("items") 625 # If value is list convert to comma separated string 626 if isinstance(value, list): 627 value = ", ".join(value) 628 ingest_dict.append( 629 { 630 "attribute": key, 631 "value": str(value) if value else None 632 } 633 ) 634 return ingest_dict 635 636 def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response: 637 """ 638 Upload metadata to the workspace table. 639 640 **Args:** 641 - entities_tsv (str): The path to the TSV file containing the metadata to upload. 642 643 **Returns:** 644 - requests.Response: The response from the request. 645 """ 646 endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities" 647 data = {"entities": open(entities_tsv, "rb")} 648 return self.request_util.upload_file( 649 uri=endpoint, 650 data=data 651 ) 652 653 def get_workspace_workflows(self) -> requests.Response: 654 """ 655 Get the workflows for the workspace. 656 657 **Returns:** 658 - requests.Response: The response from the request. 659 """ 660 uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true" 661 return self.request_util.run_request( 662 uri=uri, 663 method=GET 664 ) 665 666 def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response: 667 """ 668 Import a workflow into the workspace. 669 670 **Args:** 671 - workflow_dict (dict): The dictionary containing the workflow information. 672 - continue_if_exists (bool, optional): Whether to continue if the workflow 673 already exists. Defaults to `False`. 
674 675 **Returns:** 676 - requests.Response: The response from the request. 677 """ 678 uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs" 679 workflow_json = json.dumps(workflow_dict) 680 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 681 return self.request_util.run_request( 682 uri=uri, 683 method=POST, 684 data=workflow_json, 685 content_type=APPLICATION_JSON, 686 accept_return_codes=accept_return_codes 687 ) 688 689 def delete_workspace(self) -> requests.Response: 690 """ 691 Delete a Terra workspace. 692 693 **Returns:** 694 - requests.Response: The response from the request. 695 """ 696 return self.request_util.run_request( 697 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}", 698 method=DELETE 699 ) 700 701 def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response: 702 """ 703 Update the attributes for the workspace. 704 705 **Args:** 706 - attributes (dict): The attributes to update. 707 708 **Returns:** 709 - requests.Response: The response from the request. 710 """ 711 return self.request_util.run_request( 712 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes", 713 method=PATCH, 714 data=json.dumps(attributes), 715 content_type=APPLICATION_JSON 716 ) 717 718 def leave_workspace( 719 self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False 720 ) -> requests.Response: 721 """ 722 Leave a workspace. If workspace ID not supplied, will look it up. 723 724 **Args:** 725 - workspace_id (str, optional): The workspace ID. Defaults to None. 726 - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors. 727 Defaults to `False`. 728 729 **Returns:** 730 - requests.Response: The response from the request. 
731 """ 732 if not workspace_id: 733 workspace_info = self.get_workspace_info().json() 734 workspace_id = workspace_info['workspace']['workspaceId'] 735 accepted_return_code = [403] if ignore_direct_access_error else [] 736 737 res = self.request_util.run_request( 738 uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave", 739 method=DELETE, 740 accept_return_codes=accepted_return_code 741 ) 742 if (res.status_code == 403 743 and res.json()["message"] == "You can only leave a resource that you have direct access to."): 744 logging.info( 745 f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct" 746 f"access to the workspace (they could be an owner on the billing project)" 747 ) 748 return res 749 750 def change_workspace_public_setting(self, public: bool) -> requests.Response: 751 """ 752 Change a workspace's public setting. 753 754 **Args:** 755 - public (bool, optional): Whether the workspace should be public. Set to `True` to be made 756 public, `False` otherwise. 757 758 **Returns:** 759 - requests.Response: The response from the request. 760 """ 761 body = [ 762 { 763 "settingType": "PubliclyReadable", 764 "config": { 765 "enabled": public 766 } 767 } 768 ] 769 return self.request_util.run_request( 770 uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings", 771 method=PUT, 772 content_type=APPLICATION_JSON, 773 data=json.dumps(body) 774 ) 775 776 def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response: 777 """ 778 Check if a workspace is public. 779 780 **Args:** 781 - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look 782 it up if not provided. Defaults to None. 783 784 **Returns:** 785 - requests.Response: The response from the request. 
786 """ 787 workspace_bucket = bucket if bucket else self.get_workspace_bucket() 788 bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-") 789 return self.request_util.run_request( 790 uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public", 791 method=GET 792 ) 793 794 def delete_entity_table(self, entity_to_delete: str) -> requests.Response: 795 """Delete an entire entity table from a Terra workspace. 796 797 **Args:** 798 - entity_to_delete (str): The name of the entity table to delete. 799 800 **Returns:** 801 - requests.Response: The response from the request. 802 """ 803 response = self.request_util.run_request( 804 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}", 805 method=DELETE 806 ) 807 if response.status_code == 204: 808 logging.info( 809 f"Successfully deleted entity table: '{entity_to_delete}' from workspace: " 810 f"'{self.billing_project}/{self.workspace_name}'" 811 ) 812 else: 813 logging.error( 814 f"Encountered the following error while attempting to delete '{entity_to_delete}' " 815 f"table: {response.text}" 816 ) 817 return response 818 819 def save_entity_table_version(self, entity_type: str, version_name: str) -> None: 820 """Save an entity table version in a Terra workspace. 
821 822 **Args:** 823 - entity_type (str): The name of the entity table to save a new version for 824 - version_name (str): The name of the new version 825 """ 826 # Get the workspace metrics 827 workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type) 828 file_name = f"{entity_type}.json" 829 # Write the workspace metrics to a JSON file 830 with open(file_name, "w") as json_file: 831 json.dump(workspace_metrics, json_file) 832 833 # Create a zip file with the same naming convention that Terra backend uses 834 timestamp_ms = int(time.time() * 1000) 835 zip_file_name = f"{entity_type}.v{timestamp_ms}.zip" 836 with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf: 837 zipf.write(file_name, arcname=f"json/{file_name}") 838 839 # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live) 840 workspace_info = self.get_workspace_info().json() 841 path_to_upload_to = os.path.join( 842 "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name 843 ) 844 gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"]) 845 # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails 846 try: 847 active_account = gcp_util.get_active_gcloud_account() 848 except Exception as e: 849 active_account = workspace_info["workspace"]["createdBy"] 850 logging.error( 851 f"Encountered the following exception while attempting to get current GCP account: {e}. " 852 f"Will set the owner of the new metadata version as the workspace creator instead." 
853 ) 854 gcp_util.upload_blob( 855 source_file=zip_file_name, 856 destination_path=path_to_upload_to, 857 custom_metadata={ 858 "createdBy": active_account, 859 "entityType": entity_type, 860 "timestamp": timestamp_ms, 861 "description": version_name, 862 } 863 ) 864 865 def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response: 866 """ 867 Add a user comment to a submission in Terra. 868 869 **Args:** 870 - submission_id (str): The ID of the submission to add a comment to. 871 - user_comment (str): The comment to add to the submission. 872 873 **Returns:** 874 - requests.Response: The response from the request. 875 """ 876 logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'") 877 return self.request_util.run_request( 878 uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}", 879 method=PATCH, 880 content_type=APPLICATION_JSON, 881 data=json.dumps({"userComment": user_comment}), 882 ) 883 884 def initiate_submission( 885 self, 886 method_config_namespace: str, 887 method_config_name: str, 888 entity_type: str, 889 entity_name: str, 890 expression: str, 891 user_comment: Optional[str], 892 use_call_cache: bool = True 893 ) -> requests.Response: 894 """ 895 Initiate a submission within a Terra workspace. 896 897 Note - the workflow being initiated MUST already be imported into the workspace. 898 899 **Args:** 900 - method_config_namespace (str): The namespace of the method configuration. 901 - method_config_name (str): The name of the method configuration to use for the submission 902 (i.e. the workflow name). 903 - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set"). 904 - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or 905 "sample_set_1"). 906 - expression (str): The "expression" to use. 
For example, if the `entity_type` is `sample` and the workflow is 907 launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should 908 be launched PER SAMPLE, the expression should be `this.samples`. 909 - user_comment (str, optional): The user comment to add to the submission. 910 - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`. 911 912 **Returns:** 913 - requests.Response: The response from the request. 914 """ 915 payload = { 916 "methodConfigurationNamespace": method_config_namespace, 917 "methodConfigurationName": method_config_name, 918 "entityType": entity_type, 919 "entityName": entity_name, 920 "expression": expression, 921 "useCallCache": use_call_cache, 922 "deleteIntermediateOutputFiles": False, 923 "useReferenceDisks": False, 924 "ignoreEmptyOutputs": False, 925 } 926 if user_comment: 927 payload["userComment"] = user_comment 928 929 return self.request_util.run_request( 930 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions", 931 method=POST, 932 content_type=APPLICATION_JSON, 933 data=json.dumps(payload), 934 ) 935 936 def retry_failed_submission(self, submission_id: str) -> requests.Response: 937 """ 938 Retry a failed submission in Terra. 939 940 **Args:** 941 - submission_id (str): The ID of the submission to retry. 942 943 **Returns:** 944 - requests.Response: The response from the request. 
945 """ 946 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry" 947 payload = {"retryType": "Failed"} 948 logging.info( 949 f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 950 ) 951 return self.request_util.run_request( 952 uri=url, 953 method=POST, 954 content_type=APPLICATION_JSON, 955 data=json.dumps(payload) 956 ) 957 958 def get_submission_status(self, submission_id: str) -> requests.Response: 959 """ 960 Get the status of a submission in Terra. 961 962 **Args:** 963 - submission_id (str): The ID of the submission. 964 965 **Returns:** 966 - requests.Response: The response from the request. 967 """ 968 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}" 969 logging.info( 970 f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 971 ) 972 return self.request_util.run_request( 973 uri=url, 974 method=GET 975 )
Terra workspace class to manage workspaces and their attributes.
def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
    """
    Initialize the TerraWorkspace class.

    **Args:**
    - billing_project (str): The billing project associated with the workspace.
    - workspace_name (str): The name of the workspace.
    - request_util (`ops_utils.request_util.RunRequest`): An instance of a
        request utility class to handle HTTP requests.
    - env (str, optional): The Terra environment to target, either `"dev"` or `"prod"`
        (compared case-insensitively). Defaults to `"prod"`.

    **Raises:**
    - ValueError: If `env` is neither `"dev"` nor `"prod"`.
    """
    self.billing_project = billing_project
    """@private"""
    self.workspace_name = workspace_name
    """@private"""
    self.workspace_id = None
    """@private"""
    self.resource_id = None
    """@private"""
    self.storage_container = None
    """@private"""
    self.bucket = None
    """@private"""
    self.wds_url = None
    """@private"""
    self.account_url: Optional[str] = None
    """@private"""
    self.request_util = request_util
    """@private"""
    # Normalize once so "Dev"/"PROD" etc. are accepted
    normalized_env = env.lower()
    if normalized_env == "dev":
        self.terra_link = TERRA_DEV_LINK
        """@private"""
    elif normalized_env == "prod":
        self.terra_link = TERRA_PROD_LINK
        """@private"""
    else:
        raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
Initialize the TerraWorkspace class.
Args:
- billing_project (str): The billing project associated with the workspace.
- workspace_name (str): The name of the workspace.
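- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
- env (str, optional): The Terra environment to target, either `dev` or `prod` (case-insensitive). Defaults to `prod`.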
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
    """Check that all headers follow the standards required by TDR.

    A header is valid when it starts with a letter, contains only letters, digits and
    underscores, and is at most 63 characters long.

    **Args:**
    - table_name (str): The name of the Terra table.
    - headers (list[str]): The headers of the Terra table to validate.

    **Raises:**
    - ValueError if any headers are considered invalid by TDR standards
    """
    # Use fullmatch rather than match with a "$" anchor: "$" also matches just before a
    # trailing newline, which would let a header such as "name\n" pass validation.
    tdr_header_allowed_pattern = re.compile(r"[a-zA-Z][_a-zA-Z0-9]*")
    tdr_max_header_length = 63

    headers_containing_too_many_characters = [
        header for header in headers if len(header) > tdr_max_header_length
    ]
    headers_contain_invalid_characters = [
        header for header in headers if not tdr_header_allowed_pattern.fullmatch(header)
    ]

    error_to_report = ""
    if headers_containing_too_many_characters:
        error_to_report += (
            f'The following header(s) in table "{table_name}" contain too many characters: '
            f'"{", ".join(headers_containing_too_many_characters)}". The max number of characters '
            f"for a header allowed in TDR is {tdr_max_header_length}.\n"
        )
    if headers_contain_invalid_characters:
        error_to_report += (
            f'The following header(s) in table "{table_name}" contain invalid characters: '
            f'"{", ".join(headers_contain_invalid_characters)}". TDR headers must start with a letter, '
            "and must only contain numbers, letters, and underscore characters.\n"
        )
    if error_to_report:
        error_to_report += (
            "In order to proceed, please update the problematic header(s) in your Terra table, and then "
            "re-attempt the import once all problematic header(s) have been updated to follow TDR rules "
            "for header naming."
        )
        raise ValueError(error_to_report)
Check that all headers follow the standards required by TDR.
Args:
- table_name (str): The name of the Terra table.
- headers (list[str]): The headers of the Terra table to validate.
Raises:
- ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.Response:
    """
    Fetch this workspace's details from the Terra API.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting workspace info for {workspace_path}")
    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{workspace_path}",
        method=GET,
    )
Get workspace information.
Returns:
- requests.Response: The response from the request.
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
    """
    Collect every row of one entity table in the workspace (Terra on GCP).

    **Args:**
    - entity_type (str): The type of entity to get metrics for.
    - remove_dicts (bool, optional): When `True`, each row's `attributes` are replaced with the
        output of `_remove_dict_from_attributes`. Defaults to `False`.

    **Returns:**
    - list[dict]: The `results` of every page, concatenated in page order.
    """
    logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")
    entity_rows = [
        row
        for page in self._yield_all_entity_metrics(entity=entity_type)
        for row in page["results"]
    ]

    if remove_dicts:
        # Rows are updated in place
        for entity_row in entity_rows:
            entity_row["attributes"] = self._remove_dict_from_attributes(entity_row["attributes"])
    return entity_rows
Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
Args:
- entity_type (str): The type of entity to get metrics for.
- remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
Returns:
- list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
    """
    Fetch a single entity, identified by its type and name, from the workspace.

    **Args:**
    - entity_type (str): The type of entity to get metrics for.
    - entity_name (str): The name of the entity to get metrics for.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(
        uri=f"{workspace_url}/entities/{entity_type}/{entity_name}",
        method=GET,
    )
Get specific entity metrics for a given entity type and name.
Args:
- entity_type (str): The type of entity to get metrics for.
- entity_name (str): The name of the entity to get metrics for.
Returns:
- requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
    """
    Look up the name of the workspace's bucket (without a `gs://` prefix).

    **Returns:**
    - str: The bucket name.
    """
    workspace_details = self.get_workspace_info().json()
    return workspace_details["workspace"]["bucketName"]
Get the workspace bucket name. Does not include the `gs://` prefix.
Returns:
- str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
    """
    Get workspace entity information.

    **Args:**
    - use_cache (bool, optional): Whether to use cache. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # Keep the bool parameter intact and serialize it into a separate local. The query
    # string gets a lowercase "true"/"false".
    use_cache_param = "true" if use_cache else "false"
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/entities?useCache={use_cache_param}"
    )
    return self.request_util.run_request(uri=url, method=GET)
Get workspace entity information.
Args:
- use_cache (bool, optional): Whether to use cache. Defaults to `True`.
Returns:
- requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.Response:
    """
    Retrieve the workspace's access control list (ACL).

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(uri=f"{workspace_url}/acl", method=GET)
Get the workspace access control list (ACL).
Returns:
- requests.Response: The response from the request.
def update_user_acl(
    self,
    email: str,
    access_level: str,
    can_share: bool = False,
    can_compute: bool = False,
    invite_users_not_found: bool = False,
) -> requests.Response:
    """
    Update the access control list (ACL) for a user in the workspace.

    **Args:**
    - email (str): The email of the user.
    - access_level (str): The access level to grant to the user.
    - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
    - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
        the workspace. Defaults to `False`

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If the user was not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    payload = {
        "email": email,
        "accessLevel": access_level,
        "canShare": can_share,
        "canCompute": can_compute,
    }
    logging.info(
        f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        # The ACL endpoint takes a list of updates; send a one-element list
        data=json.dumps([payload])
    )

    if not invite_users_not_found:
        # Parse the body once; tolerate a missing/null "usersNotFound" entry
        users_not_found = response.json().get("usersNotFound")
        if users_not_found:
            # Only one user was sent, so only the first entry is reported
            raise Exception(
                f'The user {users_not_found[0]["email"]} was not found and access was not updated'
            )
    return response
Update the access control list (ACL) for a user in the workspace.
Args:
- email (str): The email of the user.
- access_level (str): The access level to grant to the user.
- can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
- can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
- invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
@deprecated(
    """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
)
def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
    """
    Replace the library metadata of this workspace's dataset (deprecated).

    **Args:**
    - library_metadata (dict): The metadata to update, sent as the JSON body.
    - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    validate_flag = str(validate).lower()
    metadata_url = (
        f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}"
        f"/metadata?validate={validate_flag}"
    )
    return self.request_util.run_request(
        uri=metadata_url,
        method=PUT,
        data=json.dumps(library_metadata),
    )
Update the metadata for a library dataset.
Args:
- library_metadata (dict): The metadata to update.
- validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def update_multiple_users_acl(
    self, acl_list: list[dict], invite_users_not_found: bool = False
) -> requests.Response:
    """
    Update the access control list (ACL) for multiple users in the workspace.

    **Args:**
    - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
        the workspace. Defaults to `False`

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If any users were not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    logging.info(
        f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        data=json.dumps(acl_list)
    )

    if not invite_users_not_found:
        # Parse the body once. Unlike the single-user call, several users may be reported
        # here; a missing/null "usersNotFound" entry is treated as empty.
        users_not_found = [user["email"] for user in (response.json().get("usersNotFound") or [])]
        if users_not_found:
            raise Exception(
                f"The following users were not found and access was not updated: {users_not_found}"
            )
    return response
Update the access control list (ACL) for multiple users in the workspace.
Args:
- acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
- invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
the workspace. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def create_workspace(
    self,
    auth_domain: Optional[list[dict]] = None,
    attributes: Optional[dict] = None,
    continue_if_exists: bool = False,
) -> requests.Response:
    """
    Create a new workspace in Terra.

    **Args:**
    - auth_domain (list[dict], optional): A list of authorization domains. Should look
        like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
    - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
    - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # None sentinels instead of mutable `[]` / `{}` defaults, which would be shared across calls
    payload = {
        "namespace": self.billing_project,
        "name": self.workspace_name,
        "authorizationDomain": auth_domain if auth_domain is not None else [],
        "attributes": attributes if attributes is not None else {},
        "cloudPlatform": GCP
    }
    # If workspace already exists then continue if exists
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces",
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload),
        accept_return_codes=accept_return_codes
    )
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
    return response
Create a new workspace in Terra.
Args:
- auth_domain (list[dict], optional): A list of authorization domains. Should look like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
- attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
- continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
    """
    Create an ingest dictionary for workspace attributes.

    Dictionary values are replaced by their `"items"` entry, list values are joined into a
    comma-separated string, remaining truthy values are converted with `str()`, and falsy
    values are recorded as `None`.

    **Args:**
    - workspace_attributes (dict, optional): A dictionary of workspace attributes. If not provided
        (or empty), the attributes are fetched from the workspace via the API. Defaults to None.

    **Returns:**
    - list[dict]: A list of dictionaries, each with an `"attribute"` and a `"value"` key.
    """
    # If not provided then call API to get it
    workspace_attributes = (
        workspace_attributes if workspace_attributes
        else self.get_workspace_info().json()["workspace"]["attributes"]
    )

    ingest_dict = []
    for key, value in workspace_attributes.items():
        # If value is dict just use 'items' as value
        if isinstance(value, dict):
            value = value.get("items")
        # If value is list convert to comma separated string. Each element goes through str()
        # first so lists of numbers/booleans don't make join() raise TypeError.
        if isinstance(value, list):
            value = ", ".join(str(item) for item in value)
        ingest_dict.append(
            {
                "attribute": key,
                "value": str(value) if value else None
            }
        )
    return ingest_dict
Create an ingest dictionary for workspace attributes.
Args:
- workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
Returns:
- list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
    """
    Upload metadata to the workspace table.

    The TSV file is passed to `request_util.upload_file` under the `entities` field and
    sent to the workspace's `flexibleImportEntities` endpoint.

    **Args:**
    - entities_tsv (str): The path to the TSV file containing the metadata to upload.

    **Returns:**
    - requests.Response: The response from the request.
    """
    endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
    # Open the file in a context manager so the handle is closed once the upload returns.
    # Assumes upload_file has finished reading the file by the time it returns a response.
    with open(entities_tsv, "rb") as entities_file:
        return self.request_util.upload_file(
            uri=endpoint,
            data={"entities": entities_file}
        )
Upload metadata to the workspace table.
Args:
- entities_tsv (str): The path to the TSV file containing the metadata to upload.
Returns:
- requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.Response:
    """
    List the method configurations (workflows) in this workspace, across all repositories.

    **Returns:**
    - requests.Response: The response from the request.
    """
    methodconfigs_url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
        "/methodconfigs?allRepos=true"
    )
    return self.request_util.run_request(uri=methodconfigs_url, method=GET)
Get the workflows for the workspace.
Returns:
- requests.Response: The response from the request.
def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
    """
    Add a method configuration (workflow) to this workspace.

    **Args:**
    - workflow_dict (dict): The workflow definition, serialized as the JSON request body.
    - continue_if_exists (bool, optional): When `True`, a conflict status code (workflow already
        exists) is passed to `run_request` as an accepted return code. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    methodconfigs_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
    serialized_workflow = json.dumps(workflow_dict)
    if continue_if_exists:
        accepted_codes = [self.CONFLICT_STATUS_CODE]
    else:
        accepted_codes = []
    return self.request_util.run_request(
        uri=methodconfigs_url,
        method=POST,
        data=serialized_workflow,
        content_type=APPLICATION_JSON,
        accept_return_codes=accepted_codes,
    )
Import a workflow into the workspace.
Args:
- workflow_dict (dict): The dictionary containing the workflow information.
- continue_if_exists (bool, optional): Whether to continue if the workflow
already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def delete_workspace(self) -> requests.Response:
    """
    Issue a DELETE for this Terra workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(uri=workspace_url, method=DELETE)
Delete a Terra workspace.
Returns:
- requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
    """
    Update the attributes for the workspace.

    **Args:**
    - attributes (list[dict]): The attribute updates to apply; serialized as-is into the JSON
        request body.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
        method=PATCH,
        data=json.dumps(attributes),
        content_type=APPLICATION_JSON
    )
Update the attributes for the workspace.
Args:
- attributes (list[dict]): The attributes to update.
Returns:
- requests.Response: The response from the request.
def leave_workspace(
    self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
) -> requests.Response:
    """
    Leave a workspace. If workspace ID not supplied, will look it up.

    **Args:**
    - workspace_id (str, optional): The workspace ID. Defaults to None.
    - ignore_direct_access_error (bool, optional): Whether to accept a 403 response (returned, for
        example, when the current user has no direct access to the workspace). Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if not workspace_id:
        workspace_info = self.get_workspace_info().json()
        workspace_id = workspace_info['workspace']['workspaceId']
    # 403 is only passed as an accepted return code when the caller opts in
    accepted_return_code = [403] if ignore_direct_access_error else []

    res = self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
        method=DELETE,
        accept_return_codes=accepted_return_code
    )
    if (res.status_code == 403
            and res.json().get("message") == "You can only leave a resource that you have direct access to."):
        logging.info(
            f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct "
            "access to the workspace (they could be an owner on the billing project)"
        )
    return res
Leave a workspace. If workspace ID not supplied, will look it up.
Args:
- workspace_id (str, optional): The workspace ID. Defaults to None.
- ignore_direct_access_error (bool, optional): Whether to ignore direct access errors. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.Response:
    """
    Change a workspace's public setting.

    Sends a single `PubliclyReadable` workspace setting whose `enabled` flag is `public`.

    **Args:**
    - public (bool): Whether the workspace should be public. Set to `True` to be made
        public, `False` otherwise.

    **Returns:**
    - requests.Response: The response from the request.
    """
    body = [
        {
            "settingType": "PubliclyReadable",
            "config": {
                "enabled": public
            }
        }
    ]
    # NOTE(review): this targets RAWLS_LINK (the prod Rawls host) rather than an env-specific
    # link, so a TerraWorkspace created with env="dev" still hits prod here -- confirm intent.
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
        method=PUT,
        content_type=APPLICATION_JSON,
        data=json.dumps(body)
    )
Change a workspace's public setting.
Args:
- public (bool): Whether the workspace should be public. Set to `True` to be made public, `False` otherwise.
Returns:
- requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
    """
    Query Sam for the workspace's public reader policy.

    **Args:**
    - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Looked up
        from the workspace when not provided. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_bucket = bucket or self.get_workspace_bucket()
    # The bucket name minus its "fc-secure-"/"fc-" prefix is used as the workspace resource ID below
    workspace_resource_id = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
    public_policy_url = f"{SAM_LINK}/resources/v2/workspace/{workspace_resource_id}/policies/reader/public"
    return self.request_util.run_request(uri=public_policy_url, method=GET)
Check if a workspace is public.
Args:
- bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look it up if not provided. Defaults to None.
Returns:
- requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
    """Remove a whole entity table from this Terra workspace.

    A 204 status is logged as success; any other status is logged as an error along with the response body.

    **Args:**
    - entity_to_delete (str): Name of the entity table to remove.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    response = self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{workspace_path}/entityTypes/{entity_to_delete}",
        method=DELETE
    )
    if response.status_code != 204:
        logging.error(
            f"Encountered the following error while attempting to delete '{entity_to_delete}' "
            f"table: {response.text}"
        )
        return response

    logging.info(
        f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
        f"'{workspace_path}'"
    )
    return response
Delete an entire entity table from a Terra workspace.
Args:
- entity_to_delete (str): The name of the entity table to delete.
Returns:
- requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
    """Save an entity table version in a Terra workspace.

    The result of `get_gcp_workspace_metrics(entity_type=...)` is written to JSON and zipped as
    `<entity_type>.v<epoch millis>.zip`. The archive is uploaded to
    `gs://<workspace bucket>/.data-table-versions/<entity_type>/` with `createdBy`, `entityType`,
    `timestamp` and `description` metadata. Intermediate files live in a temporary directory that is
    removed afterwards.

    **Args:**
    - entity_type (str): The name of the entity table to save a new version for.
    - version_name (str): The name of the new version (stored as the `description` metadata).
    """
    # Local import keeps this block self-contained. A temp dir avoids littering (or clobbering files in)
    # the current working directory.
    import tempfile

    # Get the workspace metrics
    workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
    file_name = f"{entity_type}.json"

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write the workspace metrics to a JSON file
        json_path = os.path.join(tmp_dir, file_name)
        with open(json_path, "w") as json_file:
            json.dump(workspace_metrics, json_file)

        # Create a zip file with the same naming convention that Terra backend uses
        timestamp_ms = int(time.time() * 1000)
        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
        zip_path = os.path.join(tmp_dir, zip_file_name)
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(json_path, arcname=f"json/{file_name}")

        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live).
        # GCS object paths always use "/", so build the URI explicitly instead of with os.path.join
        # (which would insert backslashes on Windows).
        workspace = self.get_workspace_info().json()["workspace"]
        path_to_upload_to = f"gs://{workspace['bucketName']}/.data-table-versions/{entity_type}/{zip_file_name}"
        gcp_util = GCPCloudFunctions(project=workspace["googleProject"])
        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
        try:
            active_account = gcp_util.get_active_gcloud_account()
        except Exception as e:
            active_account = workspace["createdBy"]
            logging.error(
                f"Encountered the following exception while attempting to get current GCP account: {e}. "
                f"Will set the owner of the new metadata version as the workspace creator instead."
            )
        gcp_util.upload_blob(
            source_file=zip_path,
            destination_path=path_to_upload_to,
            custom_metadata={
                "createdBy": active_account,
                "entityType": entity_type,
                "timestamp": timestamp_ms,
                "description": version_name,
            }
        )
Save an entity table version in a Terra workspace.
Args:
- entity_type (str): The name of the entity table to save a new version for
- version_name (str): The name of the new version
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
    """
    Attach a user comment to an existing Terra submission.

    **Args:**
    - submission_id (str): ID of the submission to annotate.
    - user_comment (str): Comment text sent as the submission's `userComment`.

    **Returns:**
    - requests.Response: The response from the PATCH request.
    """
    logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
    submission_uri = (
        f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}"
    )
    patch_body = json.dumps({"userComment": user_comment})
    return self.request_util.run_request(
        uri=submission_uri,
        method=PATCH,
        data=patch_body,
        content_type=APPLICATION_JSON,
    )
Add a user comment to a submission in Terra.
Args:
- submission_id (str): The ID of the submission to add a comment to.
- user_comment (str): The comment to add to the submission.
Returns:
- requests.Response: The response from the request.
def initiate_submission(
        self,
        method_config_namespace: str,
        method_config_name: str,
        entity_type: str,
        entity_name: str,
        expression: str,
        user_comment: Optional[str] = None,
        use_call_cache: bool = True,
        *,
        delete_intermediate_output_files: bool = False,
        use_reference_disks: bool = False,
        ignore_empty_outputs: bool = False,
) -> requests.Response:
    """
    Initiate a submission within a Terra workspace.

    Note - the workflow being initiated MUST already be imported into the workspace.

    **Args:**
    - method_config_namespace (str): The namespace of the method configuration.
    - method_config_name (str): The name of the method configuration to use for the submission
            (i.e. the workflow name).
    - entity_type (str): The entity type to be used as input to the workflow (e.g. "sample" or "sample_set").
    - entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1" or
            "sample_set_1").
    - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
            launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow
            should be launched PER SAMPLE, the expression should be `this.samples`.
    - user_comment (str, optional): The user comment to add to the submission. Omitted from the payload when
            empty or None. Defaults to None.
    - use_call_cache (bool, optional): Whether to use call caching. Defaults to `True`.
    - delete_intermediate_output_files (bool, keyword-only): Sent as `deleteIntermediateOutputFiles`.
            Defaults to `False`.
    - use_reference_disks (bool, keyword-only): Sent as `useReferenceDisks`. Defaults to `False`.
    - ignore_empty_outputs (bool, keyword-only): Sent as `ignoreEmptyOutputs`. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    payload = {
        "methodConfigurationNamespace": method_config_namespace,
        "methodConfigurationName": method_config_name,
        "entityType": entity_type,
        "entityName": entity_name,
        "expression": expression,
        "useCallCache": use_call_cache,
        "deleteIntermediateOutputFiles": delete_intermediate_output_files,
        "useReferenceDisks": use_reference_disks,
        "ignoreEmptyOutputs": ignore_empty_outputs,
    }
    if user_comment:
        payload["userComment"] = user_comment

    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload),
    )
Initiate a submission within a Terra workspace.
Note - the workflow being initiated MUST already be imported into the workspace.
Args:
- method_config_namespace (str): The namespace of the method configuration.
- method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
- entity_type (str): The entity type to be used as input to the workflow (e.g. "sample" or "sample_set").
- entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1" or "sample_set_1").
- expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should be launched PER SAMPLE, the expression should be `this.samples`.
- user_comment (str, optional): The user comment to add to the submission.
- use_call_cache (bool, optional): Whether to use call caching. Defaults to `True`.
Returns:
- requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.Response:
    """
    Request a retry of a Terra submission using retry type `Failed`.

    **Args:**
    - submission_id (str): ID of the submission to retry.

    **Returns:**
    - requests.Response: The response from the retry request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Retrying failed submission: '{submission_id}' in workspace {workspace}")
    retry_body = json.dumps({"retryType": "Failed"})
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace}/submissions/{submission_id}/retry",
        method=POST,
        content_type=APPLICATION_JSON,
        data=retry_body,
    )
Retry a failed submission in Terra.
Args:
- submission_id (str): The ID of the submission to retry.
Returns:
- requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.Response:
    """
    Fetch a Terra submission, including its status.

    **Args:**
    - submission_id (str): ID of the submission to look up.

    **Returns:**
    - requests.Response: The response from the GET request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for submission: '{submission_id}' in workspace {workspace}")
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace}/submissions/{submission_id}",
        method=GET,
    )
Get the status of a submission in Terra.
Args:
- submission_id (str): The ID of the submission.
Returns:
- requests.Response: The response from the request.