ops_utils.terra_util
Utilities for working with Terra.
1"""Utilities for working with Terra.""" 2import json 3import logging 4import re 5from typing import Any, Optional 6import requests 7import time 8import zipfile 9import os 10 11from . import deprecated 12from .vars import GCP 13from .gcp_utils import GCPCloudFunctions 14from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest 15 16TERRA_LINK = "https://api.firecloud.org/api" 17"""@private""" 18LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api" 19"""@private""" 20WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1" 21"""@private""" 22SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api" 23"""@private""" 24RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api" 25"""@private""" 26 27MEMBER = "member" 28ADMIN = "admin" 29 30 31class Terra: 32 """Class for generic Terra utilities.""" 33 34 def __init__(self, request_util: RunRequest): 35 """ 36 Initialize the Terra class. 37 38 **Args:** 39 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 40 request utility class to handle HTTP requests. 41 """ 42 self.request_util = request_util 43 """@private""" 44 45 def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response: 46 """ 47 Fetch the list of accessible workspaces. 48 49 **Args:** 50 - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included. 51 52 **Returns:** 53 - requests.Response: The response from the request. 54 """ 55 fields_str = "fields=" + ",".join(fields) if fields else "" 56 url = f'{RAWLS_LINK}/workspaces?{fields_str}' 57 return self.request_util.run_request( 58 uri=url, 59 method=GET 60 ) 61 62 def get_pet_account_json(self) -> requests.Response: 63 """ 64 Get the service account JSON. 65 66 **Returns:** 67 - requests.Response: The response from the request. 
68 """ 69 url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key" 70 return self.request_util.run_request( 71 uri=url, 72 method=GET 73 ) 74 75 76class TerraGroups: 77 """A class to manage Terra groups and their memberships.""" 78 79 GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN] 80 """@private""" 81 CONFLICT_STATUS_CODE = 409 82 """@private""" 83 84 def __init__(self, request_util: RunRequest): 85 """ 86 Initialize the TerraGroups class. 87 88 **Args:** 89 - request_util (`ops_utils.request_util.RunRequest`): An instance of a request 90 utility class to handle HTTP requests. 91 """ 92 self.request_util = request_util 93 """@private""" 94 95 def _check_role(self, role: str) -> None: 96 """ 97 Check if the role is valid. 98 99 Args: 100 role (str): The role to check. 101 102 Raises: 103 ValueError: If the role is not one of the allowed options. 104 """ 105 if role not in self.GROUP_MEMBERSHIP_OPTIONS: 106 raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}") 107 108 def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response: 109 """ 110 Remove a user from a group. 111 112 **Args:** 113 - group (str): The name of the group. 114 - email (str): The email of the user to remove. 115 - role (str): The role of the user in the group 116 (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`). 117 118 **Returns:** 119 - requests.Response: The response from the request. 120 """ 121 url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}" 122 self._check_role(role) 123 logging.info(f"Removing {email} from group {group}") 124 return self.request_util.run_request( 125 uri=url, 126 method=DELETE 127 ) 128 129 def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response: 130 """ 131 Create a new group. 132 133 **Args:** 134 - group_name (str): The name of the group to create. 135 - continue_if_exists (bool, optional): Whether to continue if the group already exists. 
Defaults to `False`. 136 137 **Returns:** 138 - requests.Response: The response from the request. 139 """ 140 url = f"{SAM_LINK}/groups/v1/{group_name}" 141 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 142 response = self.request_util.run_request( 143 uri=url, 144 method=POST, 145 accept_return_codes=accept_return_codes 146 ) 147 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 148 logging.info(f"Group {group_name} already exists. Continuing.") 149 return response 150 else: 151 logging.info(f"Created group {group_name}") 152 return response 153 154 def delete_group(self, group_name: str) -> requests.Response: 155 """ 156 Delete a group. 157 158 **Args:** 159 - group_name (str): The name of the group to delete. 160 161 **Returns:** 162 - requests.Response: The response from the request. 163 """ 164 url = f"{SAM_LINK}/groups/v1/{group_name}" 165 logging.info(f"Deleting group {group_name}") 166 return self.request_util.run_request( 167 uri=url, 168 method=DELETE 169 ) 170 171 def add_user_to_group( 172 self, group: str, email: str, role: str, continue_if_exists: bool = False 173 ) -> requests.Response: 174 """ 175 Add a user to a group. 176 177 **Args:** 178 - group (str): The name of the group. 179 - email (str): The email of the user to add. 180 - role (str): The role of the user in the group 181 (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`). 182 - continue_if_exists (bool, optional): Whether to continue if the user is already in the group. 183 Defaults to `False`. 184 185 **Returns:** 186 - requests.Response: The response from the request. 
187 """ 188 url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}" 189 self._check_role(role) 190 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 191 logging.info(f"Adding {email} to group {group} as {role}") 192 return self.request_util.run_request( 193 uri=url, 194 method=PUT, 195 accept_return_codes=accept_return_codes 196 ) 197 198 199class TerraWorkspace: 200 """Terra workspace class to manage workspaces and their attributes.""" 201 202 CONFLICT_STATUS_CODE = 409 203 """@private""" 204 205 def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest): 206 """ 207 Initialize the TerraWorkspace class. 208 209 **Args:** 210 - billing_project (str): The billing project associated with the workspace. 211 - workspace_name (str): The name of the workspace. 212 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 213 request utility class to handle HTTP requests. 214 """ 215 self.billing_project = billing_project 216 """@private""" 217 self.workspace_name = workspace_name 218 """@private""" 219 self.workspace_id = None 220 """@private""" 221 self.resource_id = None 222 """@private""" 223 self.storage_container = None 224 """@private""" 225 self.bucket = None 226 """@private""" 227 self.wds_url = None 228 """@private""" 229 self.account_url: Optional[str] = None 230 """@private""" 231 self.request_util = request_util 232 """@private""" 233 234 def __repr__(self) -> str: 235 """ 236 Return a string representation of the TerraWorkspace instance. 237 238 Returns: 239 str: The string representation of the TerraWorkspace instance. 240 """ 241 return f"{self.billing_project}/{self.workspace_name}" 242 243 def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any: 244 """ 245 Yield all entity metrics from the workspace. 246 247 Args: 248 entity (str): The type of entity to query. 249 total_entities_per_page (int, optional): The number of entities per page. 
Defaults to 40000. 250 251 Yields: 252 Any: The JSON response containing entity metrics. 253 """ 254 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}" # noqa: E501 255 response = self.request_util.run_request( 256 uri=url, 257 method=GET, 258 content_type="application/json" 259 ) 260 first_page_json = response.json() 261 yield first_page_json 262 total_pages = first_page_json["resultMetadata"]["filteredPageCount"] 263 logging.info( 264 f"Looping through {total_pages} pages of data") 265 266 for page in range(2, total_pages + 1): 267 logging.info(f"Getting page {page} of {total_pages}") 268 next_page = self.request_util.run_request( 269 uri=url, 270 method=GET, 271 content_type="application/json", 272 params={"page": page} 273 ) 274 yield next_page.json() 275 276 @staticmethod 277 def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None: 278 """Check that all headers follow the standards required by TDR. 279 280 **Args:** 281 - table_name (str): The name of the Terra table. 282 - headers (list[str]): The headers of the Terra table to validate. 
283 284 **Raises:** 285 - ValueError if any headers are considered invalid by TDR standards 286 """ 287 tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$" 288 tdr_max_header_length = 63 289 290 headers_containing_too_many_characters = [] 291 headers_contain_invalid_characters = [] 292 293 for header in headers: 294 if len(header) > tdr_max_header_length: 295 headers_containing_too_many_characters.append(header) 296 if not re.match(tdr_header_allowed_pattern, header): 297 headers_contain_invalid_characters.append(header) 298 299 base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table, 300 and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for 301 header naming.""" 302 too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many 303 characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header 304 allowed in TDR is {tdr_max_header_length}.\n""" 305 invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid 306 characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must 307 only contain numbers, letters, and underscore characters.\n""" 308 309 error_to_report = "" 310 if headers_containing_too_many_characters: 311 error_to_report += too_many_characters_error_message 312 if headers_contain_invalid_characters: 313 error_to_report += invalid_characters_error_message 314 if error_to_report: 315 error_to_report += base_error_message 316 raise ValueError(error_to_report) 317 318 def get_workspace_info(self) -> requests.Response: 319 """ 320 Get workspace information. 321 322 **Returns:** 323 - requests.Response: The response from the request. 
324 """ 325 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}" 326 logging.info( 327 f"Getting workspace info for {self.billing_project}/{self.workspace_name}") 328 return self.request_util.run_request(uri=url, method=GET) 329 330 def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]: 331 """ 332 Get metrics for a specific entity type in the workspace (specifically for Terra on GCP). 333 334 **Args:** 335 - entity_type (str): The type of entity to get metrics for. 336 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 337 338 **Returns:** 339 - list[dict]: A list of dictionaries containing entity metrics. 340 """ 341 results = [] 342 logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}") 343 344 for page in self._yield_all_entity_metrics(entity=entity_type): 345 results.extend(page["results"]) 346 347 # If remove_dicts is True, remove dictionaries from the workspace metrics 348 if remove_dicts: 349 for row in results: 350 row['attributes'] = self._remove_dict_from_attributes(row['attributes']) 351 return results 352 353 def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response: 354 """ 355 Get specific entity metrics for a given entity type and name. 356 357 **Args:** 358 - entity_type (str): The type of entity to get metrics for. 359 - entity_name (str): The name of the entity to get metrics for. 360 361 **Returns:** 362 - requests.Response: The response from the request. 363 """ 364 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}" 365 return self.request_util.run_request(uri=url, method=GET) 366 367 def _remove_dict_from_attributes(self, attributes: dict) -> dict: 368 """ 369 Remove dictionaries from the attributes. 
370 371 Args: 372 attributes (dict): The attributes to remove dictionaries from. 373 374 Returns: 375 dict: The updated attributes with no dictionaries. 376 """ 377 for key, value in attributes.items(): 378 attributes[key] = self._remove_dict_from_cell(value) 379 return attributes 380 381 def _remove_dict_from_cell(self, cell_value: Any) -> Any: 382 """ 383 Remove a dictionary from a cell. 384 385 Args: 386 cell_value (Any): The dictionary to remove. 387 388 Returns: 389 Any: The updated cell with no dictionaries. 390 """ 391 if isinstance(cell_value, dict): 392 entity_name = cell_value.get("entityName") 393 # If the cell value is a dictionary, check if it has an entityName key 394 if entity_name: 395 # If the cell value is a dictionary with an entityName key, return the entityName 396 return entity_name 397 entity_list = cell_value.get("items") 398 if entity_list or entity_list == []: 399 # If the cell value is a list of dictionaries, recursively call this function on each dictionary 400 return [ 401 self._remove_dict_from_cell(entity) for entity in entity_list 402 ] 403 return cell_value 404 return cell_value 405 406 def get_workspace_bucket(self) -> str: 407 """ 408 Get the workspace bucket name. Does not include the `gs://` prefix. 409 410 **Returns:** 411 - str: The bucket name. 412 """ 413 return self.get_workspace_info().json()["workspace"]["bucketName"] 414 415 def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response: 416 """ 417 Get workspace entity information. 418 419 **Args:** 420 - use_cache (bool, optional): Whether to use cache. Defaults to `True`. 421 422 **Returns:** 423 - requests.Response: The response from the request. 
424 """ 425 use_cache = "true" if use_cache else "false" # type: ignore[assignment] 426 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}" 427 return self.request_util.run_request(uri=url, method=GET) 428 429 def get_workspace_acl(self) -> requests.Response: 430 """ 431 Get the workspace access control list (ACL). 432 433 **Returns:** 434 - requests.Response: The response from the request. 435 """ 436 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl" 437 return self.request_util.run_request( 438 uri=url, 439 method=GET 440 ) 441 442 def update_user_acl( 443 self, 444 email: str, 445 access_level: str, 446 can_share: bool = False, 447 can_compute: bool = False, 448 invite_users_not_found: bool = False, 449 ) -> requests.Response: 450 """ 451 Update the access control list (ACL) for a user in the workspace. 452 453 **Args:** 454 - email (str): The email of the user. 455 - access_level (str): The access level to grant to the user. 456 - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`. 457 - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`. 458 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 459 the workspace. Defaults to `False` 460 461 **Returns:** 462 - requests.Response: The response from the request. 463 """ 464 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" 
+ \ 465 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 466 payload = { 467 "email": email, 468 "accessLevel": access_level, 469 "canShare": can_share, 470 "canCompute": can_compute, 471 } 472 logging.info( 473 f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}") 474 response = self.request_util.run_request( 475 uri=url, 476 method=PATCH, 477 content_type="application/json", 478 data="[" + json.dumps(payload) + "]" 479 ) 480 481 if response.json()["usersNotFound"] and not invite_users_not_found: 482 # Will be a list of one user 483 user_not_found = response.json()["usersNotFound"][0] 484 raise Exception( 485 f'The user {user_not_found["email"]} was not found and access was not updated' 486 ) 487 return response 488 489 @deprecated( 490 """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.""" # noqa: E501 491 ) 492 def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response: 493 """ 494 Update the metadata for a library dataset. 495 496 **Args:** 497 - library_metadata (dict): The metadata to update. 498 - validate (bool, optional): Whether to validate the metadata. Defaults to `False`. 499 500 **Returns:** 501 - requests.Response: The response from the request. 502 """ 503 acl = f"{TERRA_LINK}/library/{self.billing_project}/{self.workspace_name}" + \ 504 f"/metadata?validate={str(validate).lower()}" 505 return self.request_util.run_request( 506 uri=acl, 507 method=PUT, 508 data=json.dumps(library_metadata) 509 ) 510 511 def update_multiple_users_acl( 512 self, acl_list: list[dict], invite_users_not_found: bool = False 513 ) -> requests.Response: 514 """ 515 Update the access control list (ACL) for multiple users in the workspace. 516 517 **Args:** 518 - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user. 
519 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 520 the workspace. Defaults to `False` 521 522 **Returns:** 523 - requests.Response: The response from the request. 524 """ 525 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \ 526 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 527 logging.info( 528 f"Updating users in workspace {self.billing_project}/{self.workspace_name}") 529 response = self.request_util.run_request( 530 uri=url, 531 method=PATCH, 532 content_type="application/json", 533 data=json.dumps(acl_list) 534 ) 535 536 if response.json()["usersNotFound"] and not invite_users_not_found: 537 # Will be a list of one user 538 users_not_found = [u["email"] for u in response.json()["usersNotFound"]] 539 raise Exception( 540 f"The following users were not found and access was not updated: {users_not_found}" 541 ) 542 return response 543 544 def create_workspace( 545 self, 546 auth_domain: list[dict] = [], 547 attributes: dict = {}, 548 continue_if_exists: bool = False, 549 ) -> requests.Response: 550 """ 551 Create a new workspace in Terra. 552 553 **Args:** 554 - auth_domain (list[dict], optional): A list of authorization domains. Should look 555 like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list. 556 - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary. 557 - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`. 558 559 **Returns:** 560 - requests.Response: The response from the request. 
561 """ 562 payload = { 563 "namespace": self.billing_project, 564 "name": self.workspace_name, 565 "authorizationDomain": auth_domain, 566 "attributes": attributes, 567 "cloudPlatform": GCP 568 } 569 # If workspace already exists then continue if exists 570 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 571 logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}") 572 response = self.request_util.run_request( 573 uri=f"{TERRA_LINK}/workspaces", 574 method=POST, 575 content_type="application/json", 576 data=json.dumps(payload), 577 accept_return_codes=accept_return_codes 578 ) 579 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 580 logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists") 581 return response 582 583 def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]: 584 """ 585 Create an ingest dictionary for workspace attributes. 586 587 **Args:** 588 - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None. 589 590 **Returns:** 591 - list[dict]: A list of dictionaries containing the workspace attributes. 
592 """ 593 # If not provided then call API to get it 594 workspace_attributes = ( 595 workspace_attributes if workspace_attributes 596 else self.get_workspace_info().json()["workspace"]["attributes"] 597 ) 598 599 ingest_dict = [] 600 for key, value in workspace_attributes.items(): 601 # If value is dict just use 'items' as value 602 if isinstance(value, dict): 603 value = value.get("items") 604 # If value is list convert to comma separated string 605 if isinstance(value, list): 606 value = ", ".join(value) 607 ingest_dict.append( 608 { 609 "attribute": key, 610 "value": str(value) if value else None 611 } 612 ) 613 return ingest_dict 614 615 def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response: 616 """ 617 Upload metadata to the workspace table. 618 619 **Args:** 620 - entities_tsv (str): The path to the TSV file containing the metadata to upload. 621 622 **Returns:** 623 - requests.Response: The response from the request. 624 """ 625 endpoint = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities" 626 data = {"entities": open(entities_tsv, "rb")} 627 return self.request_util.upload_file( 628 uri=endpoint, 629 data=data 630 ) 631 632 def get_workspace_workflows(self) -> requests.Response: 633 """ 634 Get the workflows for the workspace. 635 636 **Returns:** 637 - requests.Response: The response from the request. 638 """ 639 uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true" 640 return self.request_util.run_request( 641 uri=uri, 642 method=GET 643 ) 644 645 def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response: 646 """ 647 Import a workflow into the workspace. 648 649 **Args:** 650 - workflow_dict (dict): The dictionary containing the workflow information. 651 - continue_if_exists (bool, optional): Whether to continue if the workflow 652 already exists. Defaults to `False`. 
653 654 **Returns:** 655 - requests.Response: The response from the request. 656 """ 657 uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs" 658 workflow_json = json.dumps(workflow_dict) 659 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 660 return self.request_util.run_request( 661 uri=uri, 662 method=POST, 663 data=workflow_json, 664 content_type="application/json", 665 accept_return_codes=accept_return_codes 666 ) 667 668 def delete_workspace(self) -> requests.Response: 669 """ 670 Delete a Terra workspace. 671 672 **Returns:** 673 - requests.Response: The response from the request. 674 """ 675 return self.request_util.run_request( 676 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}", 677 method=DELETE 678 ) 679 680 def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response: 681 """ 682 Update the attributes for the workspace. 683 684 **Args:** 685 - attributes (dict): The attributes to update. 686 687 **Returns:** 688 - requests.Response: The response from the request. 689 """ 690 return self.request_util.run_request( 691 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes", 692 method=PATCH, 693 data=json.dumps(attributes), 694 content_type="application/json" 695 ) 696 697 def leave_workspace( 698 self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False 699 ) -> requests.Response: 700 """ 701 Leave a workspace. If workspace ID not supplied, will look it up. 702 703 **Args:** 704 - workspace_id (str, optional): The workspace ID. Defaults to None. 705 - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors. 706 Defaults to `False`. 707 708 **Returns:** 709 - requests.Response: The response from the request. 
710 """ 711 if not workspace_id: 712 workspace_info = self.get_workspace_info().json() 713 workspace_id = workspace_info['workspace']['workspaceId'] 714 accepted_return_code = [403] if ignore_direct_access_error else [] 715 716 res = self.request_util.run_request( 717 uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave", 718 method=DELETE, 719 accept_return_codes=accepted_return_code 720 ) 721 if (res.status_code == 403 722 and res.json()["message"] == "You can only leave a resource that you have direct access to."): 723 logging.info( 724 f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct" 725 f"access to the workspace (they could be an owner on the billing project)" 726 ) 727 return res 728 729 def change_workspace_public_setting(self, public: bool) -> requests.Response: 730 """ 731 Change a workspace's public setting. 732 733 **Args:** 734 - public (bool, optional): Whether the workspace should be public. Set to `True` to be made 735 public, `False` otherwise. 736 737 **Returns:** 738 - requests.Response: The response from the request. 739 """ 740 body = [ 741 { 742 "settingType": "PubliclyReadable", 743 "config": { 744 "enabled": public 745 } 746 } 747 ] 748 return self.request_util.run_request( 749 uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings", 750 method=PUT, 751 content_type="application/json", 752 data=json.dumps(body) 753 ) 754 755 def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response: 756 """ 757 Check if a workspace is public. 758 759 **Args:** 760 - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look 761 it up if not provided. Defaults to None. 762 763 **Returns:** 764 - requests.Response: The response from the request. 
765 """ 766 workspace_bucket = bucket if bucket else self.get_workspace_bucket() 767 bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-") 768 return self.request_util.run_request( 769 uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public", 770 method=GET 771 ) 772 773 def delete_entity_table(self, entity_to_delete: str) -> requests.Response: 774 """Delete an entire entity table from a Terra workspace. 775 776 **Args:** 777 - entity_to_delete (str): The name of the entity table to delete. 778 779 **Returns:** 780 - requests.Response: The response from the request. 781 """ 782 response = self.request_util.run_request( 783 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}", 784 method=DELETE 785 ) 786 if response.status_code == 204: 787 logging.info( 788 f"Successfully deleted entity table: '{entity_to_delete}' from workspace: " 789 f"'{self.billing_project}/{self.workspace_name}'" 790 ) 791 else: 792 logging.error( 793 f"Encountered the following error while attempting to delete '{entity_to_delete}' " 794 f"table: {response.text}" 795 ) 796 return response 797 798 def save_entity_table_version(self, entity_type: str, version_name: str) -> None: 799 """Save an entity table version in a Terra workspace. 
800 801 **Args:** 802 - entity_type (str): The name of the entity table to save a new version for 803 - version_name (str): The name of the new version 804 """ 805 # Get the workspace metrics 806 workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type) 807 file_name = f"{entity_type}.json" 808 # Write the workspace metrics to a JSON file 809 with open(file_name, "w") as json_file: 810 json.dump(workspace_metrics, json_file) 811 812 # Create a zip file with the same naming convention that Terra backend uses 813 timestamp_ms = int(time.time() * 1000) 814 zip_file_name = f"{entity_type}.v{timestamp_ms}.zip" 815 with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf: 816 zipf.write(file_name, arcname=f"json/{file_name}") 817 818 # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live) 819 workspace_info = self.get_workspace_info().json() 820 path_to_upload_to = os.path.join( 821 "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name 822 ) 823 gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"]) 824 # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails 825 try: 826 active_account = gcp_util.get_active_gcloud_account() 827 except Exception as e: 828 active_account = workspace_info["workspace"]["createdBy"] 829 logging.error( 830 f"Encountered the following exception while attempting to get current GCP account: {e}. " 831 f"Will set the owner of the new metadata version as the workspace creator instead." 
832 ) 833 gcp_util.upload_blob( 834 source_file=zip_file_name, 835 destination_path=path_to_upload_to, 836 custom_metadata={ 837 "createdBy": active_account, 838 "entityType": entity_type, 839 "timestamp": timestamp_ms, 840 "description": version_name, 841 } 842 ) 843 844 def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response: 845 """ 846 Add a user comment to a submission in Terra. 847 848 **Args:** 849 - submission_id (str): The ID of the submission to add a comment to. 850 - user_comment (str): The comment to add to the submission. 851 852 **Returns:** 853 - requests.Response: The response from the request. 854 """ 855 logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'") 856 return self.request_util.run_request( 857 uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}", 858 method=PATCH, 859 content_type="application/json", 860 data=json.dumps({"userComment": user_comment}), 861 ) 862 863 def initiate_submission( 864 self, 865 method_config_namespace: str, 866 method_config_name: str, 867 entity_type: str, 868 entity_name: str, 869 expression: str, 870 user_comment: Optional[str], 871 use_call_cache: bool = True 872 ) -> requests.Response: 873 """ 874 Initiate a submission within a Terra workspace. 875 876 Note - the workflow being initiated MUST already be imported into the workspace. 877 878 **Args:** 879 - method_config_namespace (str): The namespace of the method configuration. 880 - method_config_name (str): The name of the method configuration to use for the submission 881 (i.e. the workflow name). 882 - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set"). 883 - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or 884 "sample_set_1"). 885 - expression (str): The "expression" to use. 
For example, if the `entity_type` is `sample` and the workflow is 886 launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should 887 be launched PER SAMPLE, the expression should be `this.samples`. 888 - user_comment (str, optional): The user comment to add to the submission. 889 - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`. 890 891 **Returns:** 892 - requests.Response: The response from the request. 893 """ 894 payload = { 895 "methodConfigurationNamespace": method_config_namespace, 896 "methodConfigurationName": method_config_name, 897 "entityType": entity_type, 898 "entityName": entity_name, 899 "expression": expression, 900 "useCallCache": use_call_cache, 901 "deleteIntermediateOutputFiles": False, 902 "useReferenceDisks": False, 903 "ignoreEmptyOutputs": False, 904 } 905 if user_comment: 906 payload["userComment"] = user_comment 907 908 return self.request_util.run_request( 909 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions", 910 method=POST, 911 content_type="application/json", 912 data=json.dumps(payload), 913 ) 914 915 def retry_failed_submission(self, submission_id: str) -> requests.Response: 916 """ 917 Retry a failed submission in Terra. 918 919 **Args:** 920 - submission_id (str): The ID of the submission to retry. 921 922 **Returns:** 923 - requests.Response: The response from the request. 
924 """ 925 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry" 926 payload = {"retryType": "Failed"} 927 logging.info( 928 f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 929 ) 930 return self.request_util.run_request( 931 uri=url, 932 method=POST, 933 content_type="application/json", 934 data=json.dumps(payload) 935 ) 936 937 def get_submission_status(self, submission_id: str) -> requests.Response: 938 """ 939 Get the status of a submission in Terra. 940 941 **Args:** 942 - submission_id (str): The ID of the submission. 943 944 **Returns:** 945 - requests.Response: The response from the request. 946 """ 947 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}" 948 logging.info( 949 f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 950 ) 951 return self.request_util.run_request( 952 uri=url, 953 method=GET 954 )
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of workspaces accessible to the caller (via the Rawls `workspaces` endpoint).

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If None or empty,
            no `fields` query parameter is sent and all fields are included. Defaults to None.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces"
        # Only add a query string when specific fields were requested, so no dangling "?" is sent.
        if fields:
            url += "?fields=" + ",".join(fields)
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the key JSON for the caller's pet service account from Sam.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )
Class for generic Terra utilities.
def __init__(self, request_util: RunRequest):
    """
    Set up a Terra helper bound to a request utility.

    **Args:**
    - request_util (`ops_utils.request_util.RunRequest`): The object used to send
        every HTTP request this class makes.
    """
    self.request_util = request_util
    """@private"""
Initialize the Terra class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
    """
    Fetch the list of workspaces accessible to the caller (via the Rawls `workspaces` endpoint).

    **Args:**
    - fields (list[str], optional): A list of fields to include in the response. If None or empty,
        no `fields` query parameter is sent and all fields are included. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{RAWLS_LINK}/workspaces"
    # Only add a query string when specific fields were requested, so no dangling "?" is sent.
    if fields:
        url += "?fields=" + ",".join(fields)
    return self.request_util.run_request(
        uri=url,
        method=GET
    )
Fetch the list of accessible workspaces.
Args:
- fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
Returns:
- requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.Response:
    """
    Request the key JSON for the caller's pet service account from Sam.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/google/v1/user/petServiceAccount/key",
        method=GET,
    )
Get the service account JSON.
Returns:
- requests.Response: The response from the request.
class TerraGroups:
    """A class to manage Terra groups and their memberships."""

    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
    """@private"""
    CONFLICT_STATUS_CODE = 409
    """@private"""

    def __init__(self, request_util: RunRequest):
        """
        Initialize the TerraGroups class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
            utility class to handle HTTP requests.
        """
        self.request_util = request_util
        """@private"""

    def _check_role(self, role: str) -> None:
        """
        Check if the role is valid.

        Args:
            role (str): The role to check.

        Raises:
            ValueError: If the role is not one of the allowed options.
        """
        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")

    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
        """
        Remove a user from a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to remove.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not one of the allowed options.
        """
        # Validate before building anything with the role.
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        logging.info(f"Removing {email} from group {group}")
        return self.request_util.run_request(
            uri=url,
            method=DELETE
        )

    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
        """
        Create a new group.

        **Args:**
        - group_name (str): The name of the group to create.
        - continue_if_exists (bool, optional): Whether to accept a 409 Conflict (group already exists)
            response and continue. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        response = self.request_util.run_request(
            uri=url,
            method=POST,
            accept_return_codes=accept_return_codes
        )
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"Group {group_name} already exists. Continuing.")
        else:
            logging.info(f"Created group {group_name}")
        return response

    def delete_group(self, group_name: str) -> requests.Response:
        """
        Delete a group.

        **Args:**
        - group_name (str): The name of the group to delete.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        logging.info(f"Deleting group {group_name}")
        return self.request_util.run_request(
            uri=url,
            method=DELETE
        )

    def add_user_to_group(
        self, group: str, email: str, role: str, continue_if_exists: bool = False
    ) -> requests.Response:
        """
        Add a user to a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to add.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group
            (a 409 Conflict response is accepted). Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not one of the allowed options.
        """
        # Validate before building anything with the role.
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        logging.info(f"Adding {email} to group {group} as {role}")
        response = self.request_util.run_request(
            uri=url,
            method=PUT,
            accept_return_codes=accept_return_codes
        )
        # Mirror create_group: make an accepted conflict visible in the logs.
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"{email} is already in group {group}. Continuing.")
        return response
A class to manage Terra groups and their memberships.
def __init__(self, request_util: RunRequest):
    """
    Set up a TerraGroups helper bound to a request utility.

    **Args:**
    - request_util (`ops_utils.request_util.RunRequest`): The object used to issue
        the Sam group HTTP requests made by this class.
    """
    self.request_util = request_util
    """@private"""
Initialize the TerraGroups class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
    """
    Remove a user from a group.

    **Args:**
    - group (str): The name of the group.
    - email (str): The email of the user to remove.
    - role (str): The role of the user in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not one of the allowed options.
    """
    # Validate before building anything with the role.
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
    logging.info(f"Removing {email} from group {group}")
    return self.request_util.run_request(
        uri=url,
        method=DELETE
    )
Remove a user from a group.
Args:
- group (str): The name of the group.
- email (str): The email of the user to remove.
- role (str): The role of the user in the group (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
Returns:
- requests.Response: The response from the request.
def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
    """
    Create a new Sam group.

    **Args:**
    - group_name (str): The name of the group to create.
    - continue_if_exists (bool, optional): Whether to accept a 409 Conflict (group already
        exists) response and continue. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    tolerated_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    response = self.request_util.run_request(
        uri=f"{SAM_LINK}/groups/v1/{group_name}",
        method=POST,
        accept_return_codes=tolerated_codes,
    )
    already_exists = continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE
    if already_exists:
        logging.info(f"Group {group_name} already exists. Continuing.")
    else:
        logging.info(f"Created group {group_name}")
    return response
Create a new group.
Args:
- group_name (str): The name of the group to create.
- continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.Response:
    """
    Delete a Sam group by name.

    **Args:**
    - group_name (str): The name of the group to delete.

    **Returns:**
    - requests.Response: The response from the request.
    """
    logging.info(f"Deleting group {group_name}")
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/groups/v1/{group_name}",
        method=DELETE,
    )
Delete a group.
Args:
- group_name (str): The name of the group to delete.
Returns:
- requests.Response: The response from the request.
def add_user_to_group(
    self, group: str, email: str, role: str, continue_if_exists: bool = False
) -> requests.Response:
    """
    Add a user to a group.

    **Args:**
    - group (str): The name of the group.
    - email (str): The email of the user to add.
    - role (str): The role of the user in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
    - continue_if_exists (bool, optional): Whether to continue if the user is already in the group
        (a 409 Conflict response is accepted). Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not one of the allowed options.
    """
    # Validate before building anything with the role.
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Adding {email} to group {group} as {role}")
    response = self.request_util.run_request(
        uri=url,
        method=PUT,
        accept_return_codes=accept_return_codes
    )
    # Mirror create_group: make an accepted conflict visible in the logs.
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"{email} is already in group {group}. Continuing.")
    return response
Add a user to a group.
Args:
- group (str): The name of the group.
- email (str): The email of the user to add.
- role (str): The role of the user in the group (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
- continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
200class TerraWorkspace: 201 """Terra workspace class to manage workspaces and their attributes.""" 202 203 CONFLICT_STATUS_CODE = 409 204 """@private""" 205 206 def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest): 207 """ 208 Initialize the TerraWorkspace class. 209 210 **Args:** 211 - billing_project (str): The billing project associated with the workspace. 212 - workspace_name (str): The name of the workspace. 213 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 214 request utility class to handle HTTP requests. 215 """ 216 self.billing_project = billing_project 217 """@private""" 218 self.workspace_name = workspace_name 219 """@private""" 220 self.workspace_id = None 221 """@private""" 222 self.resource_id = None 223 """@private""" 224 self.storage_container = None 225 """@private""" 226 self.bucket = None 227 """@private""" 228 self.wds_url = None 229 """@private""" 230 self.account_url: Optional[str] = None 231 """@private""" 232 self.request_util = request_util 233 """@private""" 234 235 def __repr__(self) -> str: 236 """ 237 Return a string representation of the TerraWorkspace instance. 238 239 Returns: 240 str: The string representation of the TerraWorkspace instance. 241 """ 242 return f"{self.billing_project}/{self.workspace_name}" 243 244 def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000) -> Any: 245 """ 246 Yield all entity metrics from the workspace. 247 248 Args: 249 entity (str): The type of entity to query. 250 total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000. 251 252 Yields: 253 Any: The JSON response containing entity metrics. 
254 """ 255 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}" # noqa: E501 256 response = self.request_util.run_request( 257 uri=url, 258 method=GET, 259 content_type="application/json" 260 ) 261 first_page_json = response.json() 262 yield first_page_json 263 total_pages = first_page_json["resultMetadata"]["filteredPageCount"] 264 logging.info( 265 f"Looping through {total_pages} pages of data") 266 267 for page in range(2, total_pages + 1): 268 logging.info(f"Getting page {page} of {total_pages}") 269 next_page = self.request_util.run_request( 270 uri=url, 271 method=GET, 272 content_type="application/json", 273 params={"page": page} 274 ) 275 yield next_page.json() 276 277 @staticmethod 278 def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None: 279 """Check that all headers follow the standards required by TDR. 280 281 **Args:** 282 - table_name (str): The name of the Terra table. 283 - headers (list[str]): The headers of the Terra table to validate. 
284 285 **Raises:** 286 - ValueError if any headers are considered invalid by TDR standards 287 """ 288 tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$" 289 tdr_max_header_length = 63 290 291 headers_containing_too_many_characters = [] 292 headers_contain_invalid_characters = [] 293 294 for header in headers: 295 if len(header) > tdr_max_header_length: 296 headers_containing_too_many_characters.append(header) 297 if not re.match(tdr_header_allowed_pattern, header): 298 headers_contain_invalid_characters.append(header) 299 300 base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table, 301 and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for 302 header naming.""" 303 too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many 304 characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header 305 allowed in TDR is {tdr_max_header_length}.\n""" 306 invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid 307 characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must 308 only contain numbers, letters, and underscore characters.\n""" 309 310 error_to_report = "" 311 if headers_containing_too_many_characters: 312 error_to_report += too_many_characters_error_message 313 if headers_contain_invalid_characters: 314 error_to_report += invalid_characters_error_message 315 if error_to_report: 316 error_to_report += base_error_message 317 raise ValueError(error_to_report) 318 319 def get_workspace_info(self) -> requests.Response: 320 """ 321 Get workspace information. 322 323 **Returns:** 324 - requests.Response: The response from the request. 
325 """ 326 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}" 327 logging.info( 328 f"Getting workspace info for {self.billing_project}/{self.workspace_name}") 329 return self.request_util.run_request(uri=url, method=GET) 330 331 def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]: 332 """ 333 Get metrics for a specific entity type in the workspace (specifically for Terra on GCP). 334 335 **Args:** 336 - entity_type (str): The type of entity to get metrics for. 337 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 338 339 **Returns:** 340 - list[dict]: A list of dictionaries containing entity metrics. 341 """ 342 results = [] 343 logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}") 344 345 for page in self._yield_all_entity_metrics(entity=entity_type): 346 results.extend(page["results"]) 347 348 # If remove_dicts is True, remove dictionaries from the workspace metrics 349 if remove_dicts: 350 for row in results: 351 row['attributes'] = self._remove_dict_from_attributes(row['attributes']) 352 return results 353 354 def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response: 355 """ 356 Get specific entity metrics for a given entity type and name. 357 358 **Args:** 359 - entity_type (str): The type of entity to get metrics for. 360 - entity_name (str): The name of the entity to get metrics for. 361 362 **Returns:** 363 - requests.Response: The response from the request. 364 """ 365 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}" 366 return self.request_util.run_request(uri=url, method=GET) 367 368 def _remove_dict_from_attributes(self, attributes: dict) -> dict: 369 """ 370 Remove dictionaries from the attributes. 
371 372 Args: 373 attributes (dict): The attributes to remove dictionaries from. 374 375 Returns: 376 dict: The updated attributes with no dictionaries. 377 """ 378 for key, value in attributes.items(): 379 attributes[key] = self._remove_dict_from_cell(value) 380 return attributes 381 382 def _remove_dict_from_cell(self, cell_value: Any) -> Any: 383 """ 384 Remove a dictionary from a cell. 385 386 Args: 387 cell_value (Any): The dictionary to remove. 388 389 Returns: 390 Any: The updated cell with no dictionaries. 391 """ 392 if isinstance(cell_value, dict): 393 entity_name = cell_value.get("entityName") 394 # If the cell value is a dictionary, check if it has an entityName key 395 if entity_name: 396 # If the cell value is a dictionary with an entityName key, return the entityName 397 return entity_name 398 entity_list = cell_value.get("items") 399 if entity_list or entity_list == []: 400 # If the cell value is a list of dictionaries, recursively call this function on each dictionary 401 return [ 402 self._remove_dict_from_cell(entity) for entity in entity_list 403 ] 404 return cell_value 405 return cell_value 406 407 def get_workspace_bucket(self) -> str: 408 """ 409 Get the workspace bucket name. Does not include the `gs://` prefix. 410 411 **Returns:** 412 - str: The bucket name. 413 """ 414 return self.get_workspace_info().json()["workspace"]["bucketName"] 415 416 def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response: 417 """ 418 Get workspace entity information. 419 420 **Args:** 421 - use_cache (bool, optional): Whether to use cache. Defaults to `True`. 422 423 **Returns:** 424 - requests.Response: The response from the request. 
425 """ 426 use_cache = "true" if use_cache else "false" # type: ignore[assignment] 427 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}" 428 return self.request_util.run_request(uri=url, method=GET) 429 430 def get_workspace_acl(self) -> requests.Response: 431 """ 432 Get the workspace access control list (ACL). 433 434 **Returns:** 435 - requests.Response: The response from the request. 436 """ 437 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl" 438 return self.request_util.run_request( 439 uri=url, 440 method=GET 441 ) 442 443 def update_user_acl( 444 self, 445 email: str, 446 access_level: str, 447 can_share: bool = False, 448 can_compute: bool = False, 449 invite_users_not_found: bool = False, 450 ) -> requests.Response: 451 """ 452 Update the access control list (ACL) for a user in the workspace. 453 454 **Args:** 455 - email (str): The email of the user. 456 - access_level (str): The access level to grant to the user. 457 - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`. 458 - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`. 459 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 460 the workspace. Defaults to `False` 461 462 **Returns:** 463 - requests.Response: The response from the request. 464 """ 465 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" 
+ \ 466 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 467 payload = { 468 "email": email, 469 "accessLevel": access_level, 470 "canShare": can_share, 471 "canCompute": can_compute, 472 } 473 logging.info( 474 f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}") 475 response = self.request_util.run_request( 476 uri=url, 477 method=PATCH, 478 content_type="application/json", 479 data="[" + json.dumps(payload) + "]" 480 ) 481 482 if response.json()["usersNotFound"] and not invite_users_not_found: 483 # Will be a list of one user 484 user_not_found = response.json()["usersNotFound"][0] 485 raise Exception( 486 f'The user {user_not_found["email"]} was not found and access was not updated' 487 ) 488 return response 489 490 @deprecated( 491 """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra.""" # noqa: E501 492 ) 493 def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response: 494 """ 495 Update the metadata for a library dataset. 496 497 **Args:** 498 - library_metadata (dict): The metadata to update. 499 - validate (bool, optional): Whether to validate the metadata. Defaults to `False`. 500 501 **Returns:** 502 - requests.Response: The response from the request. 503 """ 504 acl = f"{TERRA_LINK}/library/{self.billing_project}/{self.workspace_name}" + \ 505 f"/metadata?validate={str(validate).lower()}" 506 return self.request_util.run_request( 507 uri=acl, 508 method=PUT, 509 data=json.dumps(library_metadata) 510 ) 511 512 def update_multiple_users_acl( 513 self, acl_list: list[dict], invite_users_not_found: bool = False 514 ) -> requests.Response: 515 """ 516 Update the access control list (ACL) for multiple users in the workspace. 517 518 **Args:** 519 - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user. 
520 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 521 the workspace. Defaults to `False` 522 523 **Returns:** 524 - requests.Response: The response from the request. 525 """ 526 url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \ 527 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 528 logging.info( 529 f"Updating users in workspace {self.billing_project}/{self.workspace_name}") 530 response = self.request_util.run_request( 531 uri=url, 532 method=PATCH, 533 content_type="application/json", 534 data=json.dumps(acl_list) 535 ) 536 537 if response.json()["usersNotFound"] and not invite_users_not_found: 538 # Will be a list of one user 539 users_not_found = [u["email"] for u in response.json()["usersNotFound"]] 540 raise Exception( 541 f"The following users were not found and access was not updated: {users_not_found}" 542 ) 543 return response 544 545 def create_workspace( 546 self, 547 auth_domain: list[dict] = [], 548 attributes: dict = {}, 549 continue_if_exists: bool = False, 550 ) -> requests.Response: 551 """ 552 Create a new workspace in Terra. 553 554 **Args:** 555 - auth_domain (list[dict], optional): A list of authorization domains. Should look 556 like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list. 557 - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary. 558 - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`. 559 560 **Returns:** 561 - requests.Response: The response from the request. 
562 """ 563 payload = { 564 "namespace": self.billing_project, 565 "name": self.workspace_name, 566 "authorizationDomain": auth_domain, 567 "attributes": attributes, 568 "cloudPlatform": GCP 569 } 570 # If workspace already exists then continue if exists 571 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 572 logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}") 573 response = self.request_util.run_request( 574 uri=f"{TERRA_LINK}/workspaces", 575 method=POST, 576 content_type="application/json", 577 data=json.dumps(payload), 578 accept_return_codes=accept_return_codes 579 ) 580 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 581 logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists") 582 return response 583 584 def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]: 585 """ 586 Create an ingest dictionary for workspace attributes. 587 588 **Args:** 589 - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None. 590 591 **Returns:** 592 - list[dict]: A list of dictionaries containing the workspace attributes. 
593 """ 594 # If not provided then call API to get it 595 workspace_attributes = ( 596 workspace_attributes if workspace_attributes 597 else self.get_workspace_info().json()["workspace"]["attributes"] 598 ) 599 600 ingest_dict = [] 601 for key, value in workspace_attributes.items(): 602 # If value is dict just use 'items' as value 603 if isinstance(value, dict): 604 value = value.get("items") 605 # If value is list convert to comma separated string 606 if isinstance(value, list): 607 value = ", ".join(value) 608 ingest_dict.append( 609 { 610 "attribute": key, 611 "value": str(value) if value else None 612 } 613 ) 614 return ingest_dict 615 616 def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response: 617 """ 618 Upload metadata to the workspace table. 619 620 **Args:** 621 - entities_tsv (str): The path to the TSV file containing the metadata to upload. 622 623 **Returns:** 624 - requests.Response: The response from the request. 625 """ 626 endpoint = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities" 627 data = {"entities": open(entities_tsv, "rb")} 628 return self.request_util.upload_file( 629 uri=endpoint, 630 data=data 631 ) 632 633 def get_workspace_workflows(self) -> requests.Response: 634 """ 635 Get the workflows for the workspace. 636 637 **Returns:** 638 - requests.Response: The response from the request. 639 """ 640 uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true" 641 return self.request_util.run_request( 642 uri=uri, 643 method=GET 644 ) 645 646 def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response: 647 """ 648 Import a workflow into the workspace. 649 650 **Args:** 651 - workflow_dict (dict): The dictionary containing the workflow information. 652 - continue_if_exists (bool, optional): Whether to continue if the workflow 653 already exists. Defaults to `False`. 
654 655 **Returns:** 656 - requests.Response: The response from the request. 657 """ 658 uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs" 659 workflow_json = json.dumps(workflow_dict) 660 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 661 return self.request_util.run_request( 662 uri=uri, 663 method=POST, 664 data=workflow_json, 665 content_type="application/json", 666 accept_return_codes=accept_return_codes 667 ) 668 669 def delete_workspace(self) -> requests.Response: 670 """ 671 Delete a Terra workspace. 672 673 **Returns:** 674 - requests.Response: The response from the request. 675 """ 676 return self.request_util.run_request( 677 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}", 678 method=DELETE 679 ) 680 681 def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response: 682 """ 683 Update the attributes for the workspace. 684 685 **Args:** 686 - attributes (dict): The attributes to update. 687 688 **Returns:** 689 - requests.Response: The response from the request. 690 """ 691 return self.request_util.run_request( 692 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes", 693 method=PATCH, 694 data=json.dumps(attributes), 695 content_type="application/json" 696 ) 697 698 def leave_workspace( 699 self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False 700 ) -> requests.Response: 701 """ 702 Leave a workspace. If workspace ID not supplied, will look it up. 703 704 **Args:** 705 - workspace_id (str, optional): The workspace ID. Defaults to None. 706 - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors. 707 Defaults to `False`. 708 709 **Returns:** 710 - requests.Response: The response from the request. 
711 """ 712 if not workspace_id: 713 workspace_info = self.get_workspace_info().json() 714 workspace_id = workspace_info['workspace']['workspaceId'] 715 accepted_return_code = [403] if ignore_direct_access_error else [] 716 717 res = self.request_util.run_request( 718 uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave", 719 method=DELETE, 720 accept_return_codes=accepted_return_code 721 ) 722 if (res.status_code == 403 723 and res.json()["message"] == "You can only leave a resource that you have direct access to."): 724 logging.info( 725 f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct" 726 f"access to the workspace (they could be an owner on the billing project)" 727 ) 728 return res 729 730 def change_workspace_public_setting(self, public: bool) -> requests.Response: 731 """ 732 Change a workspace's public setting. 733 734 **Args:** 735 - public (bool, optional): Whether the workspace should be public. Set to `True` to be made 736 public, `False` otherwise. 737 738 **Returns:** 739 - requests.Response: The response from the request. 740 """ 741 body = [ 742 { 743 "settingType": "PubliclyReadable", 744 "config": { 745 "enabled": public 746 } 747 } 748 ] 749 return self.request_util.run_request( 750 uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings", 751 method=PUT, 752 content_type="application/json", 753 data=json.dumps(body) 754 ) 755 756 def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response: 757 """ 758 Check if a workspace is public. 759 760 **Args:** 761 - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look 762 it up if not provided. Defaults to None. 763 764 **Returns:** 765 - requests.Response: The response from the request. 
766 """ 767 workspace_bucket = bucket if bucket else self.get_workspace_bucket() 768 bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-") 769 return self.request_util.run_request( 770 uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public", 771 method=GET 772 ) 773 774 def delete_entity_table(self, entity_to_delete: str) -> requests.Response: 775 """Delete an entire entity table from a Terra workspace. 776 777 **Args:** 778 - entity_to_delete (str): The name of the entity table to delete. 779 780 **Returns:** 781 - requests.Response: The response from the request. 782 """ 783 response = self.request_util.run_request( 784 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}", 785 method=DELETE 786 ) 787 if response.status_code == 204: 788 logging.info( 789 f"Successfully deleted entity table: '{entity_to_delete}' from workspace: " 790 f"'{self.billing_project}/{self.workspace_name}'" 791 ) 792 else: 793 logging.error( 794 f"Encountered the following error while attempting to delete '{entity_to_delete}' " 795 f"table: {response.text}" 796 ) 797 return response 798 799 def save_entity_table_version(self, entity_type: str, version_name: str) -> None: 800 """Save an entity table version in a Terra workspace. 
801 802 **Args:** 803 - entity_type (str): The name of the entity table to save a new version for 804 - version_name (str): The name of the new version 805 """ 806 # Get the workspace metrics 807 workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type) 808 file_name = f"{entity_type}.json" 809 # Write the workspace metrics to a JSON file 810 with open(file_name, "w") as json_file: 811 json.dump(workspace_metrics, json_file) 812 813 # Create a zip file with the same naming convention that Terra backend uses 814 timestamp_ms = int(time.time() * 1000) 815 zip_file_name = f"{entity_type}.v{timestamp_ms}.zip" 816 with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf: 817 zipf.write(file_name, arcname=f"json/{file_name}") 818 819 # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live) 820 workspace_info = self.get_workspace_info().json() 821 path_to_upload_to = os.path.join( 822 "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name 823 ) 824 gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"]) 825 # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails 826 try: 827 active_account = gcp_util.get_active_gcloud_account() 828 except Exception as e: 829 active_account = workspace_info["workspace"]["createdBy"] 830 logging.error( 831 f"Encountered the following exception while attempting to get current GCP account: {e}. " 832 f"Will set the owner of the new metadata version as the workspace creator instead." 
833 ) 834 gcp_util.upload_blob( 835 source_file=zip_file_name, 836 destination_path=path_to_upload_to, 837 custom_metadata={ 838 "createdBy": active_account, 839 "entityType": entity_type, 840 "timestamp": timestamp_ms, 841 "description": version_name, 842 } 843 ) 844 845 def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response: 846 """ 847 Add a user comment to a submission in Terra. 848 849 **Args:** 850 - submission_id (str): The ID of the submission to add a comment to. 851 - user_comment (str): The comment to add to the submission. 852 853 **Returns:** 854 - requests.Response: The response from the request. 855 """ 856 logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'") 857 return self.request_util.run_request( 858 uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}", 859 method=PATCH, 860 content_type="application/json", 861 data=json.dumps({"userComment": user_comment}), 862 ) 863 864 def initiate_submission( 865 self, 866 method_config_namespace: str, 867 method_config_name: str, 868 entity_type: str, 869 entity_name: str, 870 expression: str, 871 user_comment: Optional[str], 872 use_call_cache: bool = True 873 ) -> requests.Response: 874 """ 875 Initiate a submission within a Terra workspace. 876 877 Note - the workflow being initiated MUST already be imported into the workspace. 878 879 **Args:** 880 - method_config_namespace (str): The namespace of the method configuration. 881 - method_config_name (str): The name of the method configuration to use for the submission 882 (i.e. the workflow name). 883 - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set"). 884 - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or 885 "sample_set_1"). 886 - expression (str): The "expression" to use. 
For example, if the `entity_type` is `sample` and the workflow is 887 launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should 888 be launched PER SAMPLE, the expression should be `this.samples`. 889 - user_comment (str, optional): The user comment to add to the submission. 890 - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`. 891 892 **Returns:** 893 - requests.Response: The response from the request. 894 """ 895 payload = { 896 "methodConfigurationNamespace": method_config_namespace, 897 "methodConfigurationName": method_config_name, 898 "entityType": entity_type, 899 "entityName": entity_name, 900 "expression": expression, 901 "useCallCache": use_call_cache, 902 "deleteIntermediateOutputFiles": False, 903 "useReferenceDisks": False, 904 "ignoreEmptyOutputs": False, 905 } 906 if user_comment: 907 payload["userComment"] = user_comment 908 909 return self.request_util.run_request( 910 uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions", 911 method=POST, 912 content_type="application/json", 913 data=json.dumps(payload), 914 ) 915 916 def retry_failed_submission(self, submission_id: str) -> requests.Response: 917 """ 918 Retry a failed submission in Terra. 919 920 **Args:** 921 - submission_id (str): The ID of the submission to retry. 922 923 **Returns:** 924 - requests.Response: The response from the request. 
925 """ 926 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry" 927 payload = {"retryType": "Failed"} 928 logging.info( 929 f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 930 ) 931 return self.request_util.run_request( 932 uri=url, 933 method=POST, 934 content_type="application/json", 935 data=json.dumps(payload) 936 ) 937 938 def get_submission_status(self, submission_id: str) -> requests.Response: 939 """ 940 Get the status of a submission in Terra. 941 942 **Args:** 943 - submission_id (str): The ID of the submission. 944 945 **Returns:** 946 - requests.Response: The response from the request. 947 """ 948 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}" 949 logging.info( 950 f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 951 ) 952 return self.request_util.run_request( 953 uri=url, 954 method=GET 955 )
Terra workspace class to manage workspaces and their attributes.
def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest):
    """
    Initialize the TerraWorkspace class.

    **Args:**
    - billing_project (str): The billing project associated with the workspace.
    - workspace_name (str): The name of the workspace.
    - request_util (`ops_utils.request_util.RunRequest`): An instance of a
        request utility class to handle HTTP requests.
    """
    self.billing_project = billing_project
    """@private"""
    self.workspace_name = workspace_name
    """@private"""
    # The attributes below all start unset; this initializer never populates them.
    self.workspace_id = None
    """@private"""
    self.resource_id = None
    """@private"""
    self.storage_container = None
    """@private"""
    self.bucket = None
    """@private"""
    self.wds_url = None
    """@private"""
    self.account_url: Optional[str] = None
    """@private"""
    self.request_util = request_util
    """@private"""
Initialize the TerraWorkspace class.
Args:
- billing_project (str): The billing project associated with the workspace.
- workspace_name (str): The name of the workspace.
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
277 @staticmethod 278 def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None: 279 """Check that all headers follow the standards required by TDR. 280 281 **Args:** 282 - table_name (str): The name of the Terra table. 283 - headers (list[str]): The headers of the Terra table to validate. 284 285 **Raises:** 286 - ValueError if any headers are considered invalid by TDR standards 287 """ 288 tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$" 289 tdr_max_header_length = 63 290 291 headers_containing_too_many_characters = [] 292 headers_contain_invalid_characters = [] 293 294 for header in headers: 295 if len(header) > tdr_max_header_length: 296 headers_containing_too_many_characters.append(header) 297 if not re.match(tdr_header_allowed_pattern, header): 298 headers_contain_invalid_characters.append(header) 299 300 base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table, 301 and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for 302 header naming.""" 303 too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many 304 characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header 305 allowed in TDR is {tdr_max_header_length}.\n""" 306 invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid 307 characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must 308 only contain numbers, letters, and underscore characters.\n""" 309 310 error_to_report = "" 311 if headers_containing_too_many_characters: 312 error_to_report += too_many_characters_error_message 313 if headers_contain_invalid_characters: 314 error_to_report += invalid_characters_error_message 315 if error_to_report: 316 error_to_report += base_error_message 317 raise ValueError(error_to_report)
Check that all headers follow the standards required by TDR.
Args:
- table_name (str): The name of the Terra table.
- headers (list[str]): The headers of the Terra table to validate.
Raises:
- ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.Response:
    """
    Get workspace information.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting workspace info for {self.billing_project}/{self.workspace_name}")
    return self.request_util.run_request(uri=url, method=GET)
Get workspace information.
Returns:
- requests.Response: The response from the request.
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False) -> list[dict]:
    """
    Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

    **Args:**
    - entity_type (str): The type of entity to get metrics for.
    - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.

    **Returns:**
    - list[dict]: A list of dictionaries containing entity metrics.
    """
    logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")

    # Every page from the paginated fetch holds its rows under "results"; flatten them into one list.
    results = [
        row
        for page in self._yield_all_entity_metrics(entity=entity_type)
        for row in page["results"]
    ]

    # Optionally strip dictionary-valued attributes from each row (done in place on the row dicts).
    if remove_dicts:
        for row in results:
            row["attributes"] = self._remove_dict_from_attributes(row["attributes"])
    return results
Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
Args:
- entity_type (str): The type of entity to get metrics for.
- remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
Returns:
- list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
    """
    Get specific entity metrics for a given entity type and name.

    **Args:**
    - entity_type (str): The type of entity to get metrics for.
    - entity_name (str): The name of the entity to get metrics for.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = (
        f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/entities/{entity_type}/{entity_name}"
    )
    return self.request_util.run_request(uri=url, method=GET)
Get specific entity metrics for a given entity type and name.
Args:
- entity_type (str): The type of entity to get metrics for.
- entity_name (str): The name of the entity to get metrics for.
Returns:
- requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
    """
    Get the workspace bucket name. Does not include the `gs://` prefix.

    **Returns:**
    - str: The bucket name.
    """
    workspace_info = self.get_workspace_info().json()
    return workspace_info["workspace"]["bucketName"]
Get the workspace bucket name. Does not include the `gs://` prefix.
Returns:
- str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
    """
    Get workspace entity information.

    **Args:**
    - use_cache (bool, optional): Whether to use cache. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # Render the flag as a lowercase literal for the query string. Use a separate local instead of
    # rebinding the bool parameter to a str, which needed a `type: ignore`.
    use_cache_param = "true" if use_cache else "false"
    url = (
        f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/entities?useCache={use_cache_param}"
    )
    return self.request_util.run_request(uri=url, method=GET)
Get workspace entity information.
Args:
- use_cache (bool, optional): Whether to use the cache. Defaults to `True`.
Returns:
- requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.Response:
    """
    Get the workspace access control list (ACL).

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl"
    return self.request_util.run_request(uri=url, method=GET)
Get the workspace access control list (ACL).
Returns:
- requests.Response: The response from the request.
def update_user_acl(
    self,
    email: str,
    access_level: str,
    can_share: bool = False,
    can_compute: bool = False,
    invite_users_not_found: bool = False,
) -> requests.Response:
    """
    Update the access control list (ACL) for a user in the workspace.

    **Args:**
    - email (str): The email of the user.
    - access_level (str): The access level to grant to the user.
    - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
    - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
        the workspace. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If the user was not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    payload = {
        "email": email,
        "accessLevel": access_level,
        "canShare": can_share,
        "canCompute": can_compute,
    }
    logging.info(
        f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type="application/json",
        # The request body is a JSON array of ACL entries, here with a single entry. Serializing the
        # list produces the same bytes as wrapping json.dumps(payload) in brackets by hand.
        data=json.dumps([payload]),
    )

    # Parse the response body once instead of once per lookup.
    users_not_found = response.json()["usersNotFound"]
    if users_not_found and not invite_users_not_found:
        # Only one user was submitted, so the first entry is the one that was not found.
        raise Exception(
            f'The user {users_not_found[0]["email"]} was not found and access was not updated'
        )
    return response
Update the access control list (ACL) for a user in the workspace.
Args:
- email (str): The email of the user.
- access_level (str): The access level to grant to the user.
- can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
- can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
- invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
@deprecated(
    """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
)
def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
    """
    Update the metadata for a library dataset.

    **Args:**
    - library_metadata (dict): The metadata to update.
    - validate (bool, optional): Whether to validate the metadata. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # This is the library metadata endpoint URL. The original code stored it in a variable
    # misleadingly named "acl".
    url = (
        f"{TERRA_LINK}/library/{self.billing_project}/{self.workspace_name}"
        f"/metadata?validate={str(validate).lower()}"
    )
    return self.request_util.run_request(
        uri=url,
        method=PUT,
        data=json.dumps(library_metadata),
    )
Update the metadata for a library dataset.
Args:
- library_metadata (dict): The metadata to update.
- validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def update_multiple_users_acl(
    self, acl_list: list[dict], invite_users_not_found: bool = False
) -> requests.Response:
    """
    Update the access control list (ACL) for multiple users in the workspace.

    **Args:**
    - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
        the workspace. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If any user was not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    logging.info(
        f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type="application/json",
        data=json.dumps(acl_list),
    )

    # Parse the response body once instead of once per lookup.
    users_not_found = response.json()["usersNotFound"]
    if users_not_found and not invite_users_not_found:
        # Any number of the submitted users may be missing, so report every one of them.
        missing_emails = [user["email"] for user in users_not_found]
        raise Exception(
            f"The following users were not found and access was not updated: {missing_emails}"
        )
    return response
Update the access control list (ACL) for multiple users in the workspace.
Args:
- acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
- invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def create_workspace(
    self,
    auth_domain: Optional[list[dict]] = None,
    attributes: Optional[dict] = None,
    continue_if_exists: bool = False,
) -> requests.Response:
    """
    Create a new workspace in Terra.

    **Args:**
    - auth_domain (list[dict], optional): A list of authorization domains. Should look
        like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
    - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
    - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # Build fresh containers on each call instead of using shared mutable default arguments.
    payload = {
        "namespace": self.billing_project,
        "name": self.workspace_name,
        "authorizationDomain": auth_domain if auth_domain is not None else [],
        "attributes": attributes if attributes is not None else {},
        "cloudPlatform": GCP,
    }
    # If the workspace already exists, treat the 409 conflict as acceptable when asked to continue.
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=f"{TERRA_LINK}/workspaces",
        method=POST,
        content_type="application/json",
        data=json.dumps(payload),
        accept_return_codes=accept_return_codes,
    )
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
    return response
Create a new workspace in Terra.
Args:
- auth_domain (list[dict], optional): A list of authorization domains. Should look like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
- attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
- continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
    """
    Create an ingest dictionary for workspace attributes.

    **Args:**
    - workspace_attributes (dict, optional): A dictionary of workspace attributes. If None, the attributes are
        fetched from the workspace via `get_workspace_info`. An explicitly empty dict is used as-is.
        Defaults to None.

    **Returns:**
    - list[dict]: One dictionary per attribute, with an `attribute` key (the attribute name) and a `value`
        key. `value` is the attribute rendered as a string, or None when the attribute has no value.
    """
    # Only fall back to the API when nothing was supplied. An empty dict is a valid, deliberate input.
    if workspace_attributes is None:
        workspace_attributes = self.get_workspace_info().json()["workspace"]["attributes"]

    ingest_dict = []
    for key, value in workspace_attributes.items():
        # Dict values are reduced to their "items" entry (presumably Terra's list-attribute wrapper; confirm).
        if isinstance(value, dict):
            value = value.get("items")
        # Flatten lists to a comma-separated string. str() each item so non-string entries don't break join.
        if isinstance(value, list):
            value = ", ".join(str(item) for item in value)
        # Only missing or empty values map to None. Meaningful falsy values such as 0 or False are kept.
        ingest_dict.append(
            {
                "attribute": key,
                "value": None if value is None or value == "" else str(value),
            }
        )
    return ingest_dict
Create an ingest dictionary for workspace attributes.
Args:
- workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
Returns:
- list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
    """
    Upload metadata to the workspace table.

    **Args:**
    - entities_tsv (str): The path to the TSV file containing the metadata to upload.

    **Returns:**
    - requests.Response: The response from the request.
    """
    endpoint = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
    # Keep the file open only for the duration of the upload, and always release the handle.
    # The original opened the file and never closed it.
    with open(entities_tsv, "rb") as entities_file:
        return self.request_util.upload_file(
            uri=endpoint,
            data={"entities": entities_file},
        )
Upload metadata to the workspace table.
Args:
- entities_tsv (str): The path to the TSV file containing the metadata to upload.
Returns:
- requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.Response:
    """
    Get the workflows for the workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true"
    return self.request_util.run_request(uri=uri, method=GET)
Get the workflows for the workspace.
Returns:
- requests.Response: The response from the request.
def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
    """
    Import a workflow into the workspace.

    **Args:**
    - workflow_dict (dict): The dictionary containing the workflow information.
    - continue_if_exists (bool, optional): Whether to continue if the workflow
        already exists. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    uri = f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
    # A 409 conflict (workflow already present) is only tolerated when the caller opts in.
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    return self.request_util.run_request(
        uri=uri,
        method=POST,
        data=json.dumps(workflow_dict),
        content_type="application/json",
        accept_return_codes=accept_return_codes,
    )
Import a workflow into the workspace.
Args:
- workflow_dict (dict): The dictionary containing the workflow information.
- continue_if_exists (bool, optional): Whether to continue if the workflow already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def delete_workspace(self) -> requests.Response:
    """
    Delete a Terra workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}",
        method=DELETE,
    )
Delete a Terra workspace.
Returns:
- requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
    """
    Update the attributes for the workspace.

    **Args:**
    - attributes (list[dict]): The attribute updates to apply. The list is serialized to JSON as-is
        and sent as the request body.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
        method=PATCH,
        data=json.dumps(attributes),
        content_type="application/json",
    )
Update the attributes for the workspace.
Args:
- attributes (list[dict]): The attribute updates to apply.
Returns:
- requests.Response: The response from the request.
def leave_workspace(
    self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
) -> requests.Response:
    """
    Leave a workspace. If workspace ID not supplied, will look it up.

    **Args:**
    - workspace_id (str, optional): The workspace ID. Defaults to None.
    - ignore_direct_access_error (bool, optional): Whether to accept a 403 response, which is returned
        when the current user does not have direct access to the workspace. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if not workspace_id:
        workspace_id = self.get_workspace_info().json()["workspace"]["workspaceId"]
    # Only let a 403 through run_request when the caller asked to ignore direct-access errors.
    accepted_return_codes = [403] if ignore_direct_access_error else []

    res = self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
        method=DELETE,
        accept_return_codes=accepted_return_codes,
    )
    # Use .get so a 403 body without a "message" field does not raise KeyError.
    if (
        res.status_code == 403
        and res.json().get("message") == "You can only leave a resource that you have direct access to."
    ):
        logging.info(
            f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct "
            f"access to the workspace (they could be an owner on the billing project)"
        )
    return res
Leave a workspace. If workspace ID not supplied, will look it up.
Args:
- workspace_id (str, optional): The workspace ID. Defaults to None.
- ignore_direct_access_error (bool, optional): Whether to ignore direct access errors. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.Response:
    """
    Change a workspace's public setting.

    **Args:**
    - public (bool): Whether the workspace should be public. Set to `True` to make it public,
        `False` otherwise.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # The request body is a list of settings. Only the PubliclyReadable setting is sent here.
    body = [
        {
            "settingType": "PubliclyReadable",
            "config": {"enabled": public},
        }
    ]
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
        method=PUT,
        content_type="application/json",
        data=json.dumps(body),
    )
Change a workspace's public setting.
Args:
- public (bool): Whether the workspace should be public. Set to `True` to make it public, `False` otherwise.
Returns:
- requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
    """
    Check if a workspace is public.

    **Args:**
    - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look
        it up if not provided. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_bucket = bucket if bucket else self.get_workspace_bucket()
    # Strip the "fc-secure-" or "fc-" bucket prefix; what remains is used as the SAM workspace resource ID
    # (presumably the workspace UUID; confirm). "fc-secure-" must be stripped first, otherwise stripping
    # "fc-" would leave "secure-...".
    bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public",
        method=GET,
    )
Check if a workspace is public.
Args:
- bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look it up if not provided. Defaults to None.
Returns:
- requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
    """Delete an entire entity table from a Terra workspace.

    The outcome is logged: a 204 response is reported as success, and anything else is logged as an
    error with the response body. The response is returned in both cases.

    **Args:**
    - entity_to_delete (str): The name of the entity table to delete.

    **Returns:**
    - requests.Response: The response from the request.
    """
    response = self.request_util.run_request(
        uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}",
        method=DELETE,
    )
    if response.status_code == 204:
        logging.info(
            f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
            f"'{self.billing_project}/{self.workspace_name}'"
        )
    else:
        logging.error(
            f"Encountered the following error while attempting to delete '{entity_to_delete}' "
            f"table: {response.text}"
        )
    return response
Delete an entire entity table from a Terra workspace.
Args:
- entity_to_delete (str): The name of the entity table to delete.
Returns:
- requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
    """Save an entity table version in a Terra workspace.

    The table's current rows are dumped to JSON and zipped using the naming convention the Terra backend
    uses (`<entity_type>.v<epoch_ms>.zip`). The zip is uploaded to `.data-table-versions/<entity_type>/`
    in the workspace bucket. Local working files are written to a temporary directory that is removed
    afterwards.

    **Args:**
    - entity_type (str): The name of the entity table to save a new version for
    - version_name (str): The name of the new version
    """
    import tempfile  # only this method needs it

    # Get the workspace metrics
    workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
    file_name = f"{entity_type}.json"

    # Work in a throwaway directory so no JSON or zip files are left in the caller's working directory.
    with tempfile.TemporaryDirectory() as tmp_dir:
        json_path = os.path.join(tmp_dir, file_name)
        with open(json_path, "w") as json_file:
            json.dump(workspace_metrics, json_file)

        # Create a zip file with the same naming convention that Terra backend uses
        timestamp_ms = int(time.time() * 1000)
        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
        zip_path = os.path.join(tmp_dir, zip_file_name)
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(json_path, arcname=f"json/{file_name}")

        # Upload the zip file to the subdirectory of the workspace bucket where Terra expects it to live.
        # GCS object paths always use "/", so build the URI explicitly rather than with os.path.join,
        # whose separator depends on the local OS.
        workspace_info = self.get_workspace_info().json()
        bucket_name = workspace_info["workspace"]["bucketName"]
        path_to_upload_to = f"gs://{bucket_name}/.data-table-versions/{entity_type}/{zip_file_name}"
        gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"])
        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
        try:
            active_account = gcp_util.get_active_gcloud_account()
        except Exception as e:
            active_account = workspace_info["workspace"]["createdBy"]
            logging.error(
                f"Encountered the following exception while attempting to get current GCP account: {e}. "
                f"Will set the owner of the new metadata version as the workspace creator instead."
            )
        gcp_util.upload_blob(
            source_file=zip_path,
            destination_path=path_to_upload_to,
            custom_metadata={
                "createdBy": active_account,
                "entityType": entity_type,
                "timestamp": timestamp_ms,
                "description": version_name,
            },
        )
Save an entity table version in a Terra workspace.
Args:
- entity_type (str): The name of the entity table to save a new version for
- version_name (str): The name of the new version
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
    """
    Attach a user comment to an existing Terra submission.

    **Args:**
    - submission_id (str): ID of the submission to annotate.
    - user_comment (str): Comment text, sent as the `userComment` field of the PATCH body.

    **Returns:**
    - requests.Response: The response from the PATCH request.
    """
    logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
    submission_url = (
        f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/submissions/{submission_id}"
    )
    patch_body = json.dumps({"userComment": user_comment})
    return self.request_util.run_request(
        uri=submission_url,
        method=PATCH,
        content_type="application/json",
        data=patch_body,
    )
Add a user comment to a submission in Terra.
Args:
- submission_id (str): The ID of the submission to add a comment to.
- user_comment (str): The comment to add to the submission.
Returns:
- requests.Response: The response from the request.
def initiate_submission(
    self,
    method_config_namespace: str,
    method_config_name: str,
    entity_type: str,
    entity_name: str,
    expression: str,
    user_comment: Optional[str] = None,
    use_call_cache: bool = True
) -> requests.Response:
    """
    Initiate a submission within a Terra workspace.

    Note - the workflow being initiated MUST already be imported into the workspace.

    **Args:**
    - method_config_namespace (str): The namespace of the method configuration.
    - method_config_name (str): The name of the method configuration to use for the submission
    (i.e. the workflow name).
    - entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
    - entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or
    "sample_set_1").
    - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
    launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
    be launched PER SAMPLE, the expression should be `this.samples`.
    - user_comment (str, optional): The user comment to add to the submission. Defaults to `None`; when empty or
    `None`, no `userComment` field is sent.
    - use_call_cache (bool, optional): Whether to use call caching. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    payload = {
        "methodConfigurationNamespace": method_config_namespace,
        "methodConfigurationName": method_config_name,
        "entityType": entity_type,
        "entityName": entity_name,
        "expression": expression,
        "useCallCache": use_call_cache,
        "deleteIntermediateOutputFiles": False,
        "useReferenceDisks": False,
        "ignoreEmptyOutputs": False,
    }
    # Only include the comment when one was actually provided (empty strings are skipped too).
    if user_comment:
        payload["userComment"] = user_comment

    logging.info(
        f"Initiating submission of '{method_config_namespace}/{method_config_name}' on {entity_type} "
        f"'{entity_name}' in workspace {self.billing_project}/{self.workspace_name}"
    )
    return self.request_util.run_request(
        uri=f"{TERRA_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
        method=POST,
        content_type="application/json",
        data=json.dumps(payload),
    )
Initiate a submission within a Terra workspace.
Note - the workflow being initiated MUST already be imported into the workspace.
Args:
- method_config_namespace (str): The namespace of the method configuration.
- method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
- entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
- entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or "sample_set_1").
- expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should be launched PER SAMPLE, the expression should be `this.samples`.
- user_comment (str, optional): The user comment to add to the submission.
- use_call_cache (bool, optional): Whether to use call caching. Defaults to `True`.
Returns:
- requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.Response:
    """
    Ask Terra to retry a submission, with `retryType` set to `"Failed"`.

    **Args:**
    - submission_id (str): ID of the submission to retry.

    **Returns:**
    - requests.Response: The response from the POST request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Retrying failed submission: '{submission_id}' in workspace {workspace}")
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace}/submissions/{submission_id}/retry",
        method=POST,
        content_type="application/json",
        data=json.dumps({"retryType": "Failed"}),
    )
Retry a failed submission in Terra.
Args:
- submission_id (str): The ID of the submission to retry.
Returns:
- requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.Response:
    """
    Fetch a Terra submission (including its status) from the workspace.

    **Args:**
    - submission_id (str): ID of the submission to look up.

    **Returns:**
    - requests.Response: The response from the GET request.
    """
    workspace = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for submission: '{submission_id}' in workspace {workspace}")
    status_url = f"{RAWLS_LINK}/workspaces/{workspace}/submissions/{submission_id}"
    return self.request_util.run_request(uri=status_url, method=GET)
Get the status of a submission in Terra.
Args:
- submission_id (str): The ID of the submission.
Returns:
- requests.Response: The response from the request.