ops_utils.terra_util
Utilities for working with Terra.
1"""Utilities for working with Terra.""" 2import json 3import logging 4import re 5from typing import Any, Optional 6import requests 7import time 8import zipfile 9import os 10 11from . import deprecated 12from .vars import GCP, APPLICATION_JSON 13from .gcp_utils import GCPCloudFunctions 14from .request_util import GET, POST, PATCH, PUT, DELETE, RunRequest 15 16# Constants for Terra API links 17TERRA_DEV_LINK = "https://firecloud-orchestration.dsde-dev.broadinstitute.org/api" 18"""@private""" 19TERRA_PROD_LINK = "https://api.firecloud.org/api" 20"""@private""" 21LEONARDO_LINK = "https://leonardo.dsde-prod.broadinstitute.org/api" 22"""@private""" 23WORKSPACE_LINK = "https://workspace.dsde-prod.broadinstitute.org/api/workspaces/v1" 24"""@private""" 25SAM_LINK = "https://sam.dsde-prod.broadinstitute.org/api" 26"""@private""" 27RAWLS_LINK = "https://rawls.dsde-prod.broadinstitute.org/api" 28"""@private""" 29 30MEMBER = "member" 31ADMIN = "admin" 32 33 34class Terra: 35 """Class for generic Terra utilities.""" 36 37 def __init__(self, request_util: RunRequest, env: str = "prod"): 38 """ 39 Initialize the Terra class. 40 41 **Args:** 42 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 43 request utility class to handle HTTP requests. 44 """ 45 self.request_util = request_util 46 """@private""" 47 48 def fetch_accessible_workspaces(self, fields: Optional[list[str]]) -> requests.Response: 49 """ 50 Fetch the list of accessible workspaces. 51 52 **Args:** 53 - fields (list[str], optional): A list of fields to include in the response. If None, all fields are included. 54 55 **Returns:** 56 - requests.Response: The response from the request. 57 """ 58 fields_str = "fields=" + ",".join(fields) if fields else "" 59 url = f'{RAWLS_LINK}/workspaces?{fields_str}' 60 return self.request_util.run_request( 61 uri=url, 62 method=GET 63 ) 64 65 def get_pet_account_json(self) -> requests.Response: 66 """ 67 Get the service account JSON. 
68 69 **Returns:** 70 - requests.Response: The response from the request. 71 """ 72 url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key" 73 return self.request_util.run_request( 74 uri=url, 75 method=GET 76 ) 77 78 79class TerraGroups: 80 """A class to manage Terra groups and their memberships.""" 81 82 GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN] 83 """@private""" 84 CONFLICT_STATUS_CODE = 409 85 """@private""" 86 87 def __init__(self, request_util: RunRequest): 88 """ 89 Initialize the TerraGroups class. 90 91 **Args:** 92 - request_util (`ops_utils.request_util.RunRequest`): An instance of a request 93 utility class to handle HTTP requests. 94 """ 95 self.request_util = request_util 96 """@private""" 97 98 def _check_role(self, role: str) -> None: 99 """ 100 Check if the role is valid. 101 102 Args: 103 role (str): The role to check. 104 105 Raises: 106 ValueError: If the role is not one of the allowed options. 107 """ 108 if role not in self.GROUP_MEMBERSHIP_OPTIONS: 109 raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}") 110 111 def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response: 112 """ 113 Remove a user from a group. 114 115 **Args:** 116 - group (str): The name of the group. 117 - email (str): The email of the user to remove. 118 - role (str): The role of the user in the group 119 (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`). 120 121 **Returns:** 122 - requests.Response: The response from the request. 123 """ 124 url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}" 125 self._check_role(role) 126 logging.info(f"Removing {email} from group {group}") 127 return self.request_util.run_request( 128 uri=url, 129 method=DELETE 130 ) 131 132 def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response: 133 """ 134 Create a new group. 135 136 **Args:** 137 - group_name (str): The name of the group to create. 
138 - continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`. 139 140 **Returns:** 141 - requests.Response: The response from the request. 142 """ 143 url = f"{SAM_LINK}/groups/v1/{group_name}" 144 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 145 response = self.request_util.run_request( 146 uri=url, 147 method=POST, 148 accept_return_codes=accept_return_codes 149 ) 150 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 151 logging.info(f"Group {group_name} already exists. Continuing.") 152 return response 153 else: 154 logging.info(f"Created group {group_name}") 155 return response 156 157 def delete_group(self, group_name: str) -> requests.Response: 158 """ 159 Delete a group. 160 161 **Args:** 162 - group_name (str): The name of the group to delete. 163 164 **Returns:** 165 - requests.Response: The response from the request. 166 """ 167 url = f"{SAM_LINK}/groups/v1/{group_name}" 168 logging.info(f"Deleting group {group_name}") 169 return self.request_util.run_request( 170 uri=url, 171 method=DELETE 172 ) 173 174 def add_user_to_group( 175 self, group: str, email: str, role: str, continue_if_exists: bool = False 176 ) -> requests.Response: 177 """ 178 Add a user to a group. 179 180 **Args:** 181 - group (str): The name of the group. 182 - email (str): The email of the user to add. 183 - role (str): The role of the user in the group 184 (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`). 185 - continue_if_exists (bool, optional): Whether to continue if the user is already in the group. 186 Defaults to `False`. 187 188 **Returns:** 189 - requests.Response: The response from the request. 
190 """ 191 url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}" 192 self._check_role(role) 193 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 194 logging.info(f"Adding {email} to group {group} as {role}") 195 return self.request_util.run_request( 196 uri=url, 197 method=PUT, 198 accept_return_codes=accept_return_codes 199 ) 200 201 def check_group_members(self, group: str, role: str) -> requests.Response: 202 """ 203 Check the members of a group. 204 205 **Args:** 206 - group (str): The name of the group. 207 - role (str): The role to check for in the group 208 (must be one of `ops_utils.terra_utils.MEMBER` or `ops_utils.terra_utils.ADMIN`). 209 210 **Returns:** 211 - requests.Response: The response from the request. 212 """ 213 url = f"{SAM_LINK}/groups/v1/{group}/{role}" 214 self._check_role(role) 215 logging.info(f"Checking {role}s in group {group}") 216 return self.request_util.run_request( 217 uri=url, 218 method=GET 219 ) 220 221 222class TerraWorkspace: 223 """Terra workspace class to manage workspaces and their attributes.""" 224 225 CONFLICT_STATUS_CODE = 409 226 """@private""" 227 228 def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"): 229 """ 230 Initialize the TerraWorkspace class. 231 232 **Args:** 233 - billing_project (str): The billing project associated with the workspace. 234 - workspace_name (str): The name of the workspace. 235 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 236 request utility class to handle HTTP requests. 
237 """ 238 self.billing_project = billing_project 239 """@private""" 240 self.workspace_name = workspace_name 241 """@private""" 242 self.workspace_id = None 243 """@private""" 244 self.resource_id = None 245 """@private""" 246 self.storage_container = None 247 """@private""" 248 self.bucket = None 249 """@private""" 250 self.wds_url = None 251 """@private""" 252 self.account_url: Optional[str] = None 253 """@private""" 254 self.request_util = request_util 255 """@private""" 256 if env.lower() == "dev": 257 self.terra_link = TERRA_DEV_LINK 258 """@private""" 259 elif env.lower() == "prod": 260 self.terra_link = TERRA_PROD_LINK 261 """@private""" 262 else: 263 raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.") 264 265 def __repr__(self) -> str: 266 """ 267 Return a string representation of the TerraWorkspace instance. 268 269 Returns: 270 str: The string representation of the TerraWorkspace instance. 271 """ 272 return f"{self.billing_project}/{self.workspace_name}" 273 274 def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000, verbose = True) -> Any: 275 """ 276 Yield all entity metrics from the workspace. 277 278 Args: 279 entity (str): The type of entity to query. 280 total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000. 281 verbose (bool): If True, will log the progress of fetching entity metrics. Defaults to True. 282 283 Yields: 284 Any: The JSON response containing entity metrics. 
285 """ 286 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}" # noqa: E501 287 response = self.request_util.run_request( 288 uri=url, 289 method=GET, 290 content_type=APPLICATION_JSON 291 ) 292 raw_text = response.text 293 first_page_json = json.loads( 294 raw_text, 295 parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x) 296 ) 297 yield first_page_json 298 total_pages = first_page_json["resultMetadata"]["filteredPageCount"] 299 if verbose: 300 logging.info( 301 f"Looping through {total_pages} pages of data") 302 303 for page in range(2, total_pages + 1): 304 if verbose: 305 logging.info(f"Getting page {page} of {total_pages}") 306 next_page = self.request_util.run_request( 307 uri=url, 308 method=GET, 309 content_type=APPLICATION_JSON, 310 params={"page": page} 311 ) 312 raw_text = next_page.text 313 page_json = json.loads( 314 raw_text, 315 parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x) 316 ) 317 yield page_json 318 319 @staticmethod 320 def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None: 321 """Check that all headers follow the standards required by TDR. 322 323 **Args:** 324 - table_name (str): The name of the Terra table. 325 - headers (list[str]): The headers of the Terra table to validate. 
326 327 **Raises:** 328 - ValueError if any headers are considered invalid by TDR standards 329 """ 330 tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$" 331 tdr_max_header_length = 63 332 333 headers_containing_too_many_characters = [] 334 headers_contain_invalid_characters = [] 335 336 for header in headers: 337 if len(header) > tdr_max_header_length: 338 headers_containing_too_many_characters.append(header) 339 if not re.match(tdr_header_allowed_pattern, header): 340 headers_contain_invalid_characters.append(header) 341 342 base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table, 343 and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for 344 header naming.""" 345 too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many 346 characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header 347 allowed in TDR is {tdr_max_header_length}.\n""" 348 invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid 349 characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must 350 only contain numbers, letters, and underscore characters.\n""" 351 352 error_to_report = "" 353 if headers_containing_too_many_characters: 354 error_to_report += too_many_characters_error_message 355 if headers_contain_invalid_characters: 356 error_to_report += invalid_characters_error_message 357 if error_to_report: 358 error_to_report += base_error_message 359 raise ValueError(error_to_report) 360 361 def get_workspace_info(self) -> requests.Response: 362 """ 363 Get workspace information. 364 365 **Returns:** 366 - requests.Response: The response from the request. 
367 """ 368 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}" 369 logging.info( 370 f"Getting workspace info for {self.billing_project}/{self.workspace_name}") 371 return self.request_util.run_request(uri=url, method=GET) 372 373 def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]: 374 """ 375 Get metrics for a specific entity type in the workspace (specifically for Terra on GCP). 376 377 **Args:** 378 - entity_type (str): The type of entity to get metrics for. 379 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 380 - verbose (bool, optional): Whether to log verbose output. Defaults to `True`. 381 382 **Returns:** 383 - list[dict]: A list of dictionaries containing entity metrics. 384 """ 385 results = [] 386 if verbose: 387 logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}") 388 389 for page in self._yield_all_entity_metrics(entity=entity_type, verbose=verbose): 390 results.extend(page["results"]) 391 392 # If remove_dicts is True, remove dictionaries from the workspace metrics 393 if remove_dicts: 394 for row in results: 395 row['attributes'] = self._remove_dict_from_attributes(row['attributes']) 396 return results 397 398 def get_flat_list_of_table_entity(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]: 399 """ 400 Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add 401 the entity name to the dictionary with key "{entity_type}_id". 402 403 **Args:** 404 - entity_type (str): The type of entity to get metrics for. 405 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 406 - verbose (bool, optional): Whether to log verbose output. Defaults to `True`. 
407 408 **Returns:** 409 - list[dict]: A list of dictionaries containing entity metrics. 410 """ 411 table_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type, remove_dicts=remove_dicts, verbose=verbose) 412 convert_metrics = [] 413 for row in table_metrics: 414 converted_row = row['attributes'] 415 converted_row[f"{row['entityType']}_id"] = row['name'] 416 convert_metrics.append(converted_row) 417 return convert_metrics 418 419 def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response: 420 """ 421 Get specific entity metrics for a given entity type and name. 422 423 **Args:** 424 - entity_type (str): The type of entity to get metrics for. 425 - entity_name (str): The name of the entity to get metrics for. 426 427 **Returns:** 428 - requests.Response: The response from the request. 429 """ 430 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}" # noqa: E501 431 return self.request_util.run_request(uri=url, method=GET) 432 433 def _remove_dict_from_attributes(self, attributes: dict) -> dict: 434 """ 435 Remove dictionaries from the attributes. 436 437 Args: 438 attributes (dict): The attributes to remove dictionaries from. 439 440 Returns: 441 dict: The updated attributes with no dictionaries. 442 """ 443 for key, value in attributes.items(): 444 attributes[key] = self._remove_dict_from_cell(value) 445 return attributes 446 447 def _remove_dict_from_cell(self, cell_value: Any) -> Any: 448 """ 449 Remove a dictionary from a cell. 450 451 Args: 452 cell_value (Any): The dictionary to remove. 453 454 Returns: 455 Any: The updated cell with no dictionaries. 
456 """ 457 if isinstance(cell_value, dict): 458 entity_name = cell_value.get("entityName") 459 # If the cell value is a dictionary, check if it has an entityName key 460 if entity_name: 461 # If the cell value is a dictionary with an entityName key, return the entityName 462 return entity_name 463 entity_list = cell_value.get("items") 464 if entity_list or entity_list == []: 465 # If the cell value is a list of dictionaries, recursively call this function on each dictionary 466 return [ 467 self._remove_dict_from_cell(entity) for entity in entity_list 468 ] 469 return cell_value 470 return cell_value 471 472 def get_workspace_bucket(self) -> str: 473 """ 474 Get the workspace bucket name. Does not include the `gs://` prefix. 475 476 **Returns:** 477 - str: The bucket name. 478 """ 479 return self.get_workspace_info().json()["workspace"]["bucketName"] 480 481 def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response: 482 """ 483 Get workspace entity information. 484 485 **Args:** 486 - use_cache (bool, optional): Whether to use cache. Defaults to `True`. 487 488 **Returns:** 489 - requests.Response: The response from the request. 490 """ 491 use_cache = "true" if use_cache else "false" # type: ignore[assignment] 492 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}" 493 return self.request_util.run_request(uri=url, method=GET) 494 495 def get_workspace_acl(self) -> requests.Response: 496 """ 497 Get the workspace access control list (ACL). 498 499 **Returns:** 500 - requests.Response: The response from the request. 
501 """ 502 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl" 503 return self.request_util.run_request( 504 uri=url, 505 method=GET 506 ) 507 508 def update_user_acl( 509 self, 510 email: str, 511 access_level: str, 512 can_share: bool = False, 513 can_compute: bool = False, 514 invite_users_not_found: bool = False, 515 ) -> requests.Response: 516 """ 517 Update the access control list (ACL) for a user in the workspace. 518 519 **Args:** 520 - email (str): The email of the user. 521 - access_level (str): The access level to grant to the user. 522 - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`. 523 - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`. 524 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 525 the workspace. Defaults to `False` 526 527 **Returns:** 528 - requests.Response: The response from the request. 529 """ 530 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \ 531 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 532 payload = { 533 "email": email, 534 "accessLevel": access_level, 535 "canShare": can_share, 536 "canCompute": can_compute, 537 } 538 logging.info( 539 f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}") 540 response = self.request_util.run_request( 541 uri=url, 542 method=PATCH, 543 content_type=APPLICATION_JSON, 544 data="[" + json.dumps(payload) + "]" 545 ) 546 547 if response.json()["usersNotFound"] and not invite_users_not_found: 548 # Will be a list of one user 549 user_not_found = response.json()["usersNotFound"][0] 550 raise Exception( 551 f'The user {user_not_found["email"]} was not found and access was not updated' 552 ) 553 return response 554 555 @deprecated( 556 """Firecloud functionality has been sunset. 
There is currently no support for adding library attributes in Terra.""" # noqa: E501 557 ) 558 def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response: 559 """ 560 Update the metadata for a library dataset. 561 562 **Args:** 563 - library_metadata (dict): The metadata to update. 564 - validate (bool, optional): Whether to validate the metadata. Defaults to `False`. 565 566 **Returns:** 567 - requests.Response: The response from the request. 568 """ 569 acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \ 570 f"/metadata?validate={str(validate).lower()}" 571 return self.request_util.run_request( 572 uri=acl, 573 method=PUT, 574 data=json.dumps(library_metadata) 575 ) 576 577 def update_multiple_users_acl( 578 self, acl_list: list[dict], invite_users_not_found: bool = False 579 ) -> requests.Response: 580 """ 581 Update the access control list (ACL) for multiple users in the workspace. 582 583 **Args:** 584 - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user. 585 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 586 the workspace. Defaults to `False` 587 588 **Returns:** 589 - requests.Response: The response from the request. 590 """ 591 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" 
+ \ 592 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 593 logging.info( 594 f"Updating users in workspace {self.billing_project}/{self.workspace_name}") 595 response = self.request_util.run_request( 596 uri=url, 597 method=PATCH, 598 content_type=APPLICATION_JSON, 599 data=json.dumps(acl_list) 600 ) 601 602 if response.json()["usersNotFound"] and not invite_users_not_found: 603 # Will be a list of one user 604 users_not_found = [u["email"] for u in response.json()["usersNotFound"]] 605 raise Exception( 606 f"The following users were not found and access was not updated: {users_not_found}" 607 ) 608 return response 609 610 def create_workspace( 611 self, 612 auth_domain: list[dict] = [], 613 attributes: dict = {}, 614 continue_if_exists: bool = False, 615 ) -> requests.Response: 616 """ 617 Create a new workspace in Terra. 618 619 **Args:** 620 - auth_domain (list[dict], optional): A list of authorization domains. Should look 621 like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list. 622 - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary. 623 - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`. 624 625 **Returns:** 626 - requests.Response: The response from the request. 
627 """ 628 payload = { 629 "namespace": self.billing_project, 630 "name": self.workspace_name, 631 "authorizationDomain": auth_domain, 632 "attributes": attributes, 633 "cloudPlatform": GCP 634 } 635 # If workspace already exists then continue if exists 636 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 637 logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}") 638 response = self.request_util.run_request( 639 uri=f"{self.terra_link}/workspaces", 640 method=POST, 641 content_type=APPLICATION_JSON, 642 data=json.dumps(payload), 643 accept_return_codes=accept_return_codes 644 ) 645 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 646 logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists") 647 return response 648 649 def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]: 650 """ 651 Create an ingest dictionary for workspace attributes. 652 653 **Args:** 654 - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None. 655 656 **Returns:** 657 - list[dict]: A list of dictionaries containing the workspace attributes. 
658 """ 659 # If not provided then call API to get it 660 workspace_attributes = ( 661 workspace_attributes if workspace_attributes 662 else self.get_workspace_info().json()["workspace"]["attributes"] 663 ) 664 665 ingest_dict = [] 666 for key, value in workspace_attributes.items(): 667 # If value is dict just use 'items' as value 668 if isinstance(value, dict): 669 value = value.get("items") 670 # If value is list convert to comma separated string 671 if isinstance(value, list): 672 value = ", ".join(value) 673 ingest_dict.append( 674 { 675 "attribute": key, 676 "value": str(value) if value else None 677 } 678 ) 679 return ingest_dict 680 681 def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response: 682 """ 683 Upload metadata to the workspace table. 684 685 **Args:** 686 - entities_tsv (str): The path to the TSV file containing the metadata to upload. 687 688 **Returns:** 689 - requests.Response: The response from the request. 690 """ 691 endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities" 692 data = {"entities": open(entities_tsv, "rb")} 693 return self.request_util.upload_file( 694 uri=endpoint, 695 data=data 696 ) 697 698 def _batch_upsert(self, update_dict: list) -> requests.Response: 699 """ 700 Run batch upsert on workspace table. 701 702 **Args:** 703 - update_dict (dict): A dictionary to update the workspace table. 704 **Returns:** 705 - requests.Response: The response from the request. 706 """ 707 endpoint = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/batchUpsert" 708 return self.request_util.run_request( 709 uri=endpoint, 710 data=json.dumps(update_dict), 711 content_type=APPLICATION_JSON, 712 method=POST 713 ) 714 715 def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.Response: 716 """ 717 Upload metadata to one or more workspace entity tables using batch upsert. 
718 719 Builds the Terra batch upsert payload from a structured input dictionary and calls 720 `batch_upsert` with the result. 721 722 **Args:** 723 - table_data (dict): A dictionary mapping table names to their data configuration. 724 Each entry should have the following structure: 725 726 ```python 727 { 728 "table_name": { 729 "table_id_column": "column_that_is_the_entity_id", 730 "row_data": [ 731 {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...}, 732 ... 733 ] 734 }, 735 ... 736 } 737 ``` 738 739 - `table_id_column`: The name of the column whose value is used as the entity name 740 (`name` field in the upsert payload). This column is **not** included as an attribute 741 operation. 742 - `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes 743 an `AddUpdateAttribute` operation. 744 - force (bool, optional): Whether to force update if id column does not match table name + _id. 745 746 **Returns:** 747 - requests.Response: The response from the request. 748 """ 749 upsert_payload = [] 750 table_name_failures = [] 751 for table_name, config in table_data.items(): 752 id_column = config["table_id_column"] 753 if id_column != f"{table_name}_id": 754 table_name_failures.append( 755 f"id column, {id_column}, does not match table {table_name}. This column will be renamed to {table_name}_id." 756 "Use force=True to force update." 
757 ) 758 rows = config["row_data"] 759 for row in rows: 760 entity_name = row.get(id_column) 761 if entity_name is None: 762 raise Exception(f"Primary key column '{id_column}' is missing from row data - {row}") 763 operations = [ 764 { 765 "op": "AddUpdateAttribute", 766 "attributeName": col, 767 "addUpdateAttribute": value, 768 } 769 for col, value in row.items() 770 if col != id_column 771 ] 772 upsert_payload.append( 773 { 774 "name": entity_name, 775 "entityType": table_name, 776 "operations": operations, 777 } 778 ) 779 if table_name_failures: 780 for message in table_name_failures: 781 if force: 782 logging.warning(message) 783 else: 784 logging.error(message) 785 if not force: 786 raise Exception("One or more tables have id columns that do not match the expected format." 787 " See error messages above for details. Use force=True to force update.") 788 return self._batch_upsert(upsert_payload) 789 790 def get_workspace_workflows(self) -> requests.Response: 791 """ 792 Get the workflows for the workspace. 793 794 **Returns:** 795 - requests.Response: The response from the request. 796 """ 797 uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true" 798 return self.request_util.run_request( 799 uri=uri, 800 method=GET 801 ) 802 803 def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response: 804 """ 805 Import a workflow into the workspace. 806 807 **Args:** 808 - workflow_dict (dict): The dictionary containing the workflow information. 809 - continue_if_exists (bool, optional): Whether to continue if the workflow 810 already exists. Defaults to `False`. 811 812 **Returns:** 813 - requests.Response: The response from the request. 
814 """ 815 uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs" 816 workflow_json = json.dumps(workflow_dict) 817 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 818 return self.request_util.run_request( 819 uri=uri, 820 method=POST, 821 data=workflow_json, 822 content_type=APPLICATION_JSON, 823 accept_return_codes=accept_return_codes 824 ) 825 826 def delete_workspace(self) -> requests.Response: 827 """ 828 Delete a Terra workspace. 829 830 **Returns:** 831 - requests.Response: The response from the request. 832 """ 833 return self.request_util.run_request( 834 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}", 835 method=DELETE 836 ) 837 838 def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response: 839 """ 840 Update the attributes for the workspace. 841 842 **Args:** 843 - attributes (dict): The attributes to update. 844 845 **Returns:** 846 - requests.Response: The response from the request. 847 """ 848 return self.request_util.run_request( 849 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes", 850 method=PATCH, 851 data=json.dumps(attributes), 852 content_type=APPLICATION_JSON 853 ) 854 855 def leave_workspace( 856 self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False 857 ) -> requests.Response: 858 """ 859 Leave a workspace. If workspace ID not supplied, will look it up. 860 861 **Args:** 862 - workspace_id (str, optional): The workspace ID. Defaults to None. 863 - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors. 864 Defaults to `False`. 865 866 **Returns:** 867 - requests.Response: The response from the request. 
868 """ 869 if not workspace_id: 870 workspace_info = self.get_workspace_info().json() 871 workspace_id = workspace_info['workspace']['workspaceId'] 872 accepted_return_code = [403] if ignore_direct_access_error else [] 873 874 res = self.request_util.run_request( 875 uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave", 876 method=DELETE, 877 accept_return_codes=accepted_return_code 878 ) 879 if (res.status_code == 403 880 and res.json()["message"] == "You can only leave a resource that you have direct access to."): 881 logging.info( 882 f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct" 883 f"access to the workspace (they could be an owner on the billing project)" 884 ) 885 return res 886 887 def set_table_column_order(self, column_order: dict) -> requests.Response: 888 """ 889 Set the column order for one or more entity tables in the workspace. 890 891 **Args:** 892 - column_order (dict): A dictionary mapping table names to their column configuration. Each table entry 893 should have the following structure: 894 895 ``` 896 { 897 "table_name": { 898 "shown": ["col1", "col2", ...], # Columns to display, in order 899 "hidden": ["col3", "col4", ...] # Columns to hide 900 }, 901 ... 902 } 903 ``` 904 905 **Returns:** 906 - requests.Response: The response from the request. 907 """ 908 logging.info( 909 f"Setting column order for tables in workspace {self.billing_project}/{self.workspace_name}" 910 ) 911 return self.update_workspace_attributes( 912 attributes=[ 913 { 914 "op": "AddUpdateAttribute", 915 "attributeName": "workspace-column-defaults", 916 "addUpdateAttribute": json.dumps(column_order) 917 } 918 ] 919 ) 920 921 def change_workspace_public_setting(self, public: bool) -> requests.Response: 922 """ 923 Change a workspace's public setting. 924 925 **Args:** 926 - public (bool, optional): Whether the workspace should be public. Set to `True` to be made 927 public, `False` otherwise. 
928 929 **Returns:** 930 - requests.Response: The response from the request. 931 """ 932 body = [ 933 { 934 "settingType": "PubliclyReadable", 935 "config": { 936 "enabled": public 937 } 938 } 939 ] 940 return self.request_util.run_request( 941 uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings", 942 method=PUT, 943 content_type=APPLICATION_JSON, 944 data=json.dumps(body) 945 ) 946 947 def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response: 948 """ 949 Check if a workspace is public. 950 951 **Args:** 952 - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look 953 it up if not provided. Defaults to None. 954 955 **Returns:** 956 - requests.Response: The response from the request. 957 """ 958 workspace_bucket = bucket if bucket else self.get_workspace_bucket() 959 bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-") 960 return self.request_util.run_request( 961 uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public", 962 method=GET 963 ) 964 965 def delete_entity_table(self, entity_to_delete: str) -> requests.Response: 966 """Delete an entire entity table from a Terra workspace. 967 968 **Args:** 969 - entity_to_delete (str): The name of the entity table to delete. 970 971 **Returns:** 972 - requests.Response: The response from the request. 
973 """ 974 response = self.request_util.run_request( 975 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}", # noqa: E501 976 method=DELETE 977 ) 978 if response.status_code == 204: 979 logging.info( 980 f"Successfully deleted entity table: '{entity_to_delete}' from workspace: " 981 f"'{self.billing_project}/{self.workspace_name}'" 982 ) 983 else: 984 logging.error( 985 f"Encountered the following error while attempting to delete '{entity_to_delete}' " 986 f"table: {response.text}" 987 ) 988 return response 989 990 def save_entity_table_version(self, entity_type: str, version_name: str) -> None: 991 """Save an entity table version in a Terra workspace. 992 993 **Args:** 994 - entity_type (str): The name of the entity table to save a new version for 995 - version_name (str): The name of the new version 996 """ 997 # Get the workspace metrics 998 workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type) 999 file_name = f"{entity_type}.json" 1000 # Write the workspace metrics to a JSON file 1001 with open(file_name, "w") as json_file: 1002 json.dump(workspace_metrics, json_file) 1003 1004 # Create a zip file with the same naming convention that Terra backend uses 1005 timestamp_ms = int(time.time() * 1000) 1006 zip_file_name = f"{entity_type}.v{timestamp_ms}.zip" 1007 with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf: 1008 zipf.write(file_name, arcname=f"json/{file_name}") 1009 1010 # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live) 1011 workspace_info = self.get_workspace_info().json() 1012 path_to_upload_to = os.path.join( 1013 "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name 1014 ) 1015 gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"]) 1016 # Attempt to get the currently active gcloud account. 
Default to the workspace creator if that fails 1017 try: 1018 active_account = gcp_util.get_active_gcloud_account() 1019 except Exception as e: 1020 active_account = workspace_info["workspace"]["createdBy"] 1021 logging.error( 1022 f"Encountered the following exception while attempting to get current GCP account: {e}. " 1023 f"Will set the owner of the new metadata version as the workspace creator instead." 1024 ) 1025 gcp_util.upload_blob( 1026 source_file=zip_file_name, 1027 destination_path=path_to_upload_to, 1028 custom_metadata={ 1029 "createdBy": active_account, 1030 "entityType": entity_type, 1031 "timestamp": timestamp_ms, 1032 "description": version_name, 1033 } 1034 ) 1035 1036 def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response: 1037 """ 1038 Add a user comment to a submission in Terra. 1039 1040 **Args:** 1041 - submission_id (str): The ID of the submission to add a comment to. 1042 - user_comment (str): The comment to add to the submission. 1043 1044 **Returns:** 1045 - requests.Response: The response from the request. 1046 """ 1047 logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'") 1048 return self.request_util.run_request( 1049 uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}", 1050 method=PATCH, 1051 content_type=APPLICATION_JSON, 1052 data=json.dumps({"userComment": user_comment}), 1053 ) 1054 1055 def initiate_submission( 1056 self, 1057 method_config_namespace: str, 1058 method_config_name: str, 1059 entity_type: str, 1060 entity_name: str, 1061 expression: str, 1062 user_comment: Optional[str], 1063 use_call_cache: bool = True 1064 ) -> requests.Response: 1065 """ 1066 Initiate a submission within a Terra workspace. 1067 1068 Note - the workflow being initiated MUST already be imported into the workspace. 
1069 1070 **Args:** 1071 - method_config_namespace (str): The namespace of the method configuration. 1072 - method_config_name (str): The name of the method configuration to use for the submission 1073 (i.e. the workflow name). 1074 - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set"). 1075 - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or 1076 "sample_set_1"). 1077 - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is 1078 launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should 1079 be launched PER SAMPLE, the expression should be `this.samples`. 1080 - user_comment (str, optional): The user comment to add to the submission. 1081 - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`. 1082 1083 **Returns:** 1084 - requests.Response: The response from the request. 1085 """ 1086 payload = { 1087 "methodConfigurationNamespace": method_config_namespace, 1088 "methodConfigurationName": method_config_name, 1089 "entityType": entity_type, 1090 "entityName": entity_name, 1091 "expression": expression, 1092 "useCallCache": use_call_cache, 1093 "deleteIntermediateOutputFiles": False, 1094 "useReferenceDisks": False, 1095 "ignoreEmptyOutputs": False, 1096 } 1097 if user_comment: 1098 payload["userComment"] = user_comment 1099 1100 return self.request_util.run_request( 1101 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions", 1102 method=POST, 1103 content_type=APPLICATION_JSON, 1104 data=json.dumps(payload), 1105 ) 1106 1107 def retry_failed_submission(self, submission_id: str) -> requests.Response: 1108 """ 1109 Retry a failed submission in Terra. 1110 1111 **Args:** 1112 - submission_id (str): The ID of the submission to retry. 
1113 1114 **Returns:** 1115 - requests.Response: The response from the request. 1116 """ 1117 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry" 1118 payload = {"retryType": "Failed"} 1119 logging.info( 1120 f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 1121 ) 1122 return self.request_util.run_request( 1123 uri=url, 1124 method=POST, 1125 content_type=APPLICATION_JSON, 1126 data=json.dumps(payload) 1127 ) 1128 1129 def get_submission_status(self, submission_id: str) -> requests.Response: 1130 """ 1131 Get the status of a submission in Terra. 1132 1133 **Args:** 1134 - submission_id (str): The ID of the submission. 1135 1136 **Returns:** 1137 - requests.Response: The response from the request. 1138 """ 1139 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}" 1140 logging.info( 1141 f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" # noqa: E501 1142 ) 1143 return self.request_util.run_request( 1144 uri=url, 1145 method=GET 1146 ) 1147 1148 def get_workspace_submission_status(self) -> requests.Response: 1149 """ 1150 Get the status of all submissions in a Terra workspace. 1151 1152 **Returns:** 1153 - requests.Response: The response from the request. 1154 """ 1155 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions" 1156 logging.info( 1157 f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}" 1158 ) 1159 return self.request_util.run_request( 1160 uri=url, 1161 method=GET 1162 ) 1163 1164 def get_workflow_status( 1165 self, 1166 submission_id: str, 1167 workflow_id: str, 1168 expand_sub_workflow_metadata: bool = False) -> requests.Response: 1169 """ 1170 Get the status of a workflow in a submission in Terra. 
1171 1172 **Args:** 1173 - submission_id (str): The ID of the submission. 1174 - workflow_id (str): The ID of the workflow. 1175 - expand_sub_workflow_metadata (bool, optional): Whether to expand the expand_sub workflow metadata. 1176 Defaults to `False`. 1177 1178 **Returns:** 1179 - requests.Response: The response from the request. 1180 """ 1181 expand_metadata = '?expandSubWorkflows=true' if expand_sub_workflow_metadata else '' 1182 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/workflows/{workflow_id}{expand_metadata}" # noqa: E501 1183 logging.info( 1184 f"Getting status for workflow: '{workflow_id}' in submission: '{submission_id}' " 1185 f"in workspace {self.billing_project}/{self.workspace_name}" 1186 ) 1187 return self.request_util.run_request( 1188 uri=url, 1189 method=GET 1190 ) 1191 1192 def get_workspace_submission_stats( 1193 self, method_name: Optional[str] = None, retrieve_running_ids: bool = True 1194 ) -> dict: 1195 """ 1196 Get submission statistics for a Terra workspace, optionally filtered by method name. 1197 1198 **Args:** 1199 - method_name (str, optional): The name of the method to filter statistics by. Defaults to None. 1200 - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running. 1201 Defaults to `True`. 
1202 1203 **Returns:** 1204 - dict: A dictionary containing submission statistics, including counts of workflows in various states 1205 """ 1206 submissions = [ 1207 s 1208 for s in self.get_workspace_submission_status().json() 1209 # If method_name is provided, filter submissions to only those with that method name 1210 if (s["methodConfigurationName"] == method_name if method_name else True) 1211 ] 1212 method_append = f"with method name '{method_name}'" if method_name else "" 1213 logging.info( 1214 f"{len(submissions)} submissions in " 1215 f"{self.billing_project}/{self.workspace_name} {method_append}" 1216 ) 1217 workflow_statuses = { 1218 "submitted": 0, 1219 "queued": 0, 1220 "running": 0, 1221 "aborting": 0, 1222 "aborted": 0, 1223 "failed": 0, 1224 "succeeded": 0, 1225 "id_still_running": [] if retrieve_running_ids else "NA" 1226 } 1227 for submission in submissions: 1228 wf_status = submission["workflowStatuses"] 1229 for status, count in wf_status.items(): 1230 if status.lower() in workflow_statuses: 1231 workflow_statuses[status.lower()] += count 1232 # Only look at individual submissions if retrieve running ids set to true 1233 # and only look at submissions that are still running 1234 if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]: 1235 submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json() 1236 for workflow in submission_detailed["workflows"]: 1237 if workflow["status"] in ["Running", "Submitted", "Queued"]: 1238 entity_id = workflow["workflowEntity"]["entityName"] 1239 workflow_statuses['id_still_running'].append(entity_id) # type: ignore[attr-defined] 1240 running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued'] # type: ignore[operator] # noqa: E501 1241 if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count: # type: ignore[arg-type] # noqa: E501 1242 logging.warning( 1243 f"Discrepancy found 
between total running/pending workflows, {running_count}, " 1244 f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. " # type: ignore[arg-type] # noqa: E501 1245 "Workflows may have completed between API calls." 1246 ) 1247 denominator = workflow_statuses['succeeded'] + workflow_statuses['failed'] # type: ignore[operator] 1248 if denominator > 0: 1249 workflow_statuses['success_rate'] = round( 1250 workflow_statuses['succeeded'] / denominator, 1251 2 1252 ) 1253 else: 1254 workflow_statuses['success_rate'] = 0.0 1255 return workflow_statuses 1256 1257 def get_workspace_details(self, terra_google_project_id: str) -> requests.Response: 1258 """ 1259 Get details of a Terra workspace using the Google project ID. 1260 1261 **Args:** 1262 - terra_google_project_id (str): The Google project ID of the Terra workspace. 1263 1264 **Returns:** 1265 - requests.Response: The response from the request. 1266 """ 1267 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}?userProject={terra_google_project_id}" # noqa: E501 1268 logging.info( 1269 f"Getting workspace details for workspace '{self.workspace_name}' using Terra Google" 1270 f" project ID: '{terra_google_project_id}'" 1271 ) 1272 return self.request_util.run_request( 1273 uri=url, 1274 method=GET 1275 )
class Terra:
    """Class for generic Terra utilities."""

    def __init__(self, request_util: RunRequest, env: str = "prod"):
        """
        Initialize the Terra class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a
            request utility class to handle HTTP requests.
        - env (str, optional): Currently unused; every method of this class calls the
            production endpoints (`RAWLS_LINK`, `SAM_LINK`). Defaults to `"prod"`.
        """
        self.request_util = request_util
        """@private"""

    def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
        """
        Fetch the list of accessible workspaces.

        **Args:**
        - fields (list[str], optional): A list of fields to include in the response. If None (or empty),
            no `fields` filter is sent and all fields are included. Defaults to None.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{RAWLS_LINK}/workspaces"
        if fields:
            # Restrict the response to the requested fields, sent as one comma-separated query parameter.
            url += "?fields=" + ",".join(fields)
        return self.request_util.run_request(
            uri=url,
            method=GET
        )

    def get_pet_account_json(self) -> requests.Response:
        """
        Get the service account JSON.

        Issues a GET against SAM's pet service account key endpoint.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/google/v1/user/petServiceAccount/key"
        return self.request_util.run_request(
            uri=url,
            method=GET
        )
Class for generic Terra utilities.
def __init__(self, request_util: RunRequest, env: str = "prod"):
    """
    Initialize the Terra class.

    **Args:**
    - request_util (`ops_utils.request_util.RunRequest`): An instance of a
        request utility class to handle HTTP requests.
    - env (str, optional): Currently unused; every method of this class calls the
        production endpoints (`RAWLS_LINK`, `SAM_LINK`). Defaults to `"prod"`.
    """
    self.request_util = request_util
    """@private"""
Initialize the Terra class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
def fetch_accessible_workspaces(self, fields: Optional[list[str]] = None) -> requests.Response:
    """
    Fetch the list of accessible workspaces.

    **Args:**
    - fields (list[str], optional): A list of fields to include in the response. If None (or empty),
        no `fields` filter is sent and all fields are included. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{RAWLS_LINK}/workspaces"
    if fields:
        # Restrict the response to the requested fields, sent as one comma-separated query parameter.
        url += "?fields=" + ",".join(fields)
    return self.request_util.run_request(
        uri=url,
        method=GET
    )
Fetch the list of accessible workspaces.
Args:
- fields (list[str], optional): A list of fields to include in the response. If None, all fields are included.
Returns:
- requests.Response: The response from the request.
def get_pet_account_json(self) -> requests.Response:
    """
    Get the service account JSON.

    Sends a GET request to SAM's pet service account key endpoint.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/google/v1/user/petServiceAccount/key",
        method=GET,
    )
Get the service account JSON.
Returns:
- requests.Response: The response from the request.
class TerraGroups:
    """A class to manage Terra groups and their memberships."""

    GROUP_MEMBERSHIP_OPTIONS = [MEMBER, ADMIN]
    """@private"""
    CONFLICT_STATUS_CODE = 409
    """@private"""

    def __init__(self, request_util: RunRequest):
        """
        Initialize the TerraGroups class.

        **Args:**
        - request_util (`ops_utils.request_util.RunRequest`): An instance of a request
            utility class to handle HTTP requests.
        """
        self.request_util = request_util
        """@private"""

    def _check_role(self, role: str) -> None:
        """
        Check if the role is valid.

        Args:
            role (str): The role to check.

        Raises:
            ValueError: If the role is not one of the allowed options.
        """
        if role not in self.GROUP_MEMBERSHIP_OPTIONS:
            raise ValueError(f"Role must be one of {self.GROUP_MEMBERSHIP_OPTIONS}")

    def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
        """
        Remove a user from a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to remove.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not a valid membership option.
        """
        # Validate before doing anything else so an invalid role never reaches the API.
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        logging.info(f"Removing {email} from group {group}")
        return self.request_util.run_request(
            uri=url,
            method=DELETE
        )

    def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
        """
        Create a new group.

        **Args:**
        - group_name (str): The name of the group to create.
        - continue_if_exists (bool, optional): Whether to continue if the group already exists. When `True`,
            a 409 (conflict) response is accepted instead of treated as an error. Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        response = self.request_util.run_request(
            uri=url,
            method=POST,
            accept_return_codes=accept_return_codes
        )
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"Group {group_name} already exists. Continuing.")
        else:
            logging.info(f"Created group {group_name}")
        return response

    def delete_group(self, group_name: str) -> requests.Response:
        """
        Delete a group.

        **Args:**
        - group_name (str): The name of the group to delete.

        **Returns:**
        - requests.Response: The response from the request.
        """
        url = f"{SAM_LINK}/groups/v1/{group_name}"
        logging.info(f"Deleting group {group_name}")
        return self.request_util.run_request(
            uri=url,
            method=DELETE
        )

    def add_user_to_group(
        self, group: str, email: str, role: str, continue_if_exists: bool = False
    ) -> requests.Response:
        """
        Add a user to a group.

        **Args:**
        - group (str): The name of the group.
        - email (str): The email of the user to add.
        - role (str): The role of the user in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
        - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
            When `True`, a 409 (conflict) response is accepted instead of treated as an error.
            Defaults to `False`.

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not a valid membership option.
        """
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
        accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
        logging.info(f"Adding {email} to group {group} as {role}")
        response = self.request_util.run_request(
            uri=url,
            method=PUT,
            accept_return_codes=accept_return_codes
        )
        # Mirror create_group: make an accepted conflict visible in the logs.
        if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
            logging.info(f"{email} is already in group {group}. Continuing.")
        return response

    def check_group_members(self, group: str, role: str) -> requests.Response:
        """
        Check the members of a group.

        **Args:**
        - group (str): The name of the group.
        - role (str): The role to check for in the group
            (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

        **Returns:**
        - requests.Response: The response from the request.

        **Raises:**
        - ValueError: If `role` is not a valid membership option.
        """
        self._check_role(role)
        url = f"{SAM_LINK}/groups/v1/{group}/{role}"
        logging.info(f"Checking {role}s in group {group}")
        return self.request_util.run_request(
            uri=url,
            method=GET
        )
A class to manage Terra groups and their memberships.
def __init__(self, request_util: RunRequest):
    """
    Set up a TerraGroups helper bound to a request utility.

    **Args:**
    - request_util (`ops_utils.request_util.RunRequest`): The request utility used to
        send every HTTP call made by this instance.
    """
    self.request_util = request_util
    """@private"""
Initialize the TerraGroups class.
Args:
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
def remove_user_from_group(self, group: str, email: str, role: str) -> requests.Response:
    """
    Remove a user from a group.

    **Args:**
    - group (str): The name of the group.
    - email (str): The email of the user to remove.
    - role (str): The role of the user in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not a valid membership option.
    """
    # Validate before doing anything else so an invalid role never reaches the API.
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
    logging.info(f"Removing {email} from group {group}")
    return self.request_util.run_request(
        uri=url,
        method=DELETE
    )
Remove a user from a group.
Args:
- group (str): The name of the group.
- email (str): The email of the user to remove.
- role (str): The role of the user in the group
(must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
Returns:
- requests.Response: The response from the request.
def create_group(self, group_name: str, continue_if_exists: bool = False) -> requests.Response:
    """
    Create a new group.

    **Args:**
    - group_name (str): The name of the group to create.
    - continue_if_exists (bool, optional): Whether to continue if the group already exists. When `True`,
        a 409 (conflict) response is accepted instead of treated as an error. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    url = f"{SAM_LINK}/groups/v1/{group_name}"
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    response = self.request_util.run_request(
        uri=url,
        method=POST,
        accept_return_codes=accept_return_codes
    )
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"Group {group_name} already exists. Continuing.")
    else:
        logging.info(f"Created group {group_name}")
    return response
Create a new group.
Args:
- group_name (str): The name of the group to create.
- continue_if_exists (bool, optional): Whether to continue if the group already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def delete_group(self, group_name: str) -> requests.Response:
    """
    Delete the named group.

    **Args:**
    - group_name (str): Name of the group to remove.

    **Returns:**
    - requests.Response: The response from the request.
    """
    logging.info(f"Deleting group {group_name}")
    return self.request_util.run_request(
        uri=f"{SAM_LINK}/groups/v1/{group_name}",
        method=DELETE,
    )
Delete a group.
Args:
- group_name (str): The name of the group to delete.
Returns:
- requests.Response: The response from the request.
def add_user_to_group(
    self, group: str, email: str, role: str, continue_if_exists: bool = False
) -> requests.Response:
    """
    Add a user to a group.

    **Args:**
    - group (str): The name of the group.
    - email (str): The email of the user to add.
    - role (str): The role of the user in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
    - continue_if_exists (bool, optional): Whether to continue if the user is already in the group.
        When `True`, a 409 (conflict) response is accepted instead of treated as an error.
        Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not a valid membership option.
    """
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}/{email}"
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Adding {email} to group {group} as {role}")
    response = self.request_util.run_request(
        uri=url,
        method=PUT,
        accept_return_codes=accept_return_codes
    )
    # Mirror create_group: make an accepted conflict visible in the logs.
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"{email} is already in group {group}. Continuing.")
    return response
Add a user to a group.
Args:
- group (str): The name of the group.
- email (str): The email of the user to add.
- role (str): The role of the user in the group
(must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
- continue_if_exists (bool, optional): Whether to continue if the user is already in the group. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def check_group_members(self, group: str, role: str) -> requests.Response:
    """
    Check the members of a group.

    **Args:**
    - group (str): The name of the group.
    - role (str): The role to check for in the group
        (must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - ValueError: If `role` is not a valid membership option.
    """
    self._check_role(role)
    url = f"{SAM_LINK}/groups/v1/{group}/{role}"
    logging.info(f"Checking {role}s in group {group}")
    return self.request_util.run_request(
        uri=url,
        method=GET
    )
Check the members of a group.
Args:
- group (str): The name of the group.
- role (str): The role to check for in the group
(must be one of `ops_utils.terra_util.MEMBER` or `ops_utils.terra_util.ADMIN`).
Returns:
- requests.Response: The response from the request.
223class TerraWorkspace: 224 """Terra workspace class to manage workspaces and their attributes.""" 225 226 CONFLICT_STATUS_CODE = 409 227 """@private""" 228 229 def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"): 230 """ 231 Initialize the TerraWorkspace class. 232 233 **Args:** 234 - billing_project (str): The billing project associated with the workspace. 235 - workspace_name (str): The name of the workspace. 236 - request_util (`ops_utils.request_util.RunRequest`): An instance of a 237 request utility class to handle HTTP requests. 238 """ 239 self.billing_project = billing_project 240 """@private""" 241 self.workspace_name = workspace_name 242 """@private""" 243 self.workspace_id = None 244 """@private""" 245 self.resource_id = None 246 """@private""" 247 self.storage_container = None 248 """@private""" 249 self.bucket = None 250 """@private""" 251 self.wds_url = None 252 """@private""" 253 self.account_url: Optional[str] = None 254 """@private""" 255 self.request_util = request_util 256 """@private""" 257 if env.lower() == "dev": 258 self.terra_link = TERRA_DEV_LINK 259 """@private""" 260 elif env.lower() == "prod": 261 self.terra_link = TERRA_PROD_LINK 262 """@private""" 263 else: 264 raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.") 265 266 def __repr__(self) -> str: 267 """ 268 Return a string representation of the TerraWorkspace instance. 269 270 Returns: 271 str: The string representation of the TerraWorkspace instance. 272 """ 273 return f"{self.billing_project}/{self.workspace_name}" 274 275 def _yield_all_entity_metrics(self, entity: str, total_entities_per_page: int = 40000, verbose = True) -> Any: 276 """ 277 Yield all entity metrics from the workspace. 278 279 Args: 280 entity (str): The type of entity to query. 281 total_entities_per_page (int, optional): The number of entities per page. Defaults to 40000. 
282 verbose (bool): If True, will log the progress of fetching entity metrics. Defaults to True. 283 284 Yields: 285 Any: The JSON response containing entity metrics. 286 """ 287 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityQuery/{entity}?pageSize={total_entities_per_page}" # noqa: E501 288 response = self.request_util.run_request( 289 uri=url, 290 method=GET, 291 content_type=APPLICATION_JSON 292 ) 293 raw_text = response.text 294 first_page_json = json.loads( 295 raw_text, 296 parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x) 297 ) 298 yield first_page_json 299 total_pages = first_page_json["resultMetadata"]["filteredPageCount"] 300 if verbose: 301 logging.info( 302 f"Looping through {total_pages} pages of data") 303 304 for page in range(2, total_pages + 1): 305 if verbose: 306 logging.info(f"Getting page {page} of {total_pages}") 307 next_page = self.request_util.run_request( 308 uri=url, 309 method=GET, 310 content_type=APPLICATION_JSON, 311 params={"page": page} 312 ) 313 raw_text = next_page.text 314 page_json = json.loads( 315 raw_text, 316 parse_float=lambda x: int(float(x)) if float(x).is_integer() else float(x) 317 ) 318 yield page_json 319 320 @staticmethod 321 def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None: 322 """Check that all headers follow the standards required by TDR. 323 324 **Args:** 325 - table_name (str): The name of the Terra table. 326 - headers (list[str]): The headers of the Terra table to validate. 
327 328 **Raises:** 329 - ValueError if any headers are considered invalid by TDR standards 330 """ 331 tdr_header_allowed_pattern = "^[a-zA-Z][_a-zA-Z0-9]*$" 332 tdr_max_header_length = 63 333 334 headers_containing_too_many_characters = [] 335 headers_contain_invalid_characters = [] 336 337 for header in headers: 338 if len(header) > tdr_max_header_length: 339 headers_containing_too_many_characters.append(header) 340 if not re.match(tdr_header_allowed_pattern, header): 341 headers_contain_invalid_characters.append(header) 342 343 base_error_message = """In order to proceed, please update the problematic header(s) in you Terra table, 344 and then re-attempt the import once all problematic header(s) have been updated to follow TDR rules for 345 header naming.""" 346 too_many_characters_error_message = f"""The following header(s) in table "{table_name}" contain too many 347 characters: "{', '.join(headers_containing_too_many_characters)}". The max number of characters for a header 348 allowed in TDR is {tdr_max_header_length}.\n""" 349 invalid_characters_error_message = f"""The following header(s) in table "{table_name}" contain invalid 350 characters: "{', '.join(headers_contain_invalid_characters)}". TDR headers must start with a letter, and must 351 only contain numbers, letters, and underscore characters.\n""" 352 353 error_to_report = "" 354 if headers_containing_too_many_characters: 355 error_to_report += too_many_characters_error_message 356 if headers_contain_invalid_characters: 357 error_to_report += invalid_characters_error_message 358 if error_to_report: 359 error_to_report += base_error_message 360 raise ValueError(error_to_report) 361 362 def get_workspace_info(self) -> requests.Response: 363 """ 364 Get workspace information. 365 366 **Returns:** 367 - requests.Response: The response from the request. 
368 """ 369 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}" 370 logging.info( 371 f"Getting workspace info for {self.billing_project}/{self.workspace_name}") 372 return self.request_util.run_request(uri=url, method=GET) 373 374 def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]: 375 """ 376 Get metrics for a specific entity type in the workspace (specifically for Terra on GCP). 377 378 **Args:** 379 - entity_type (str): The type of entity to get metrics for. 380 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 381 - verbose (bool, optional): Whether to log verbose output. Defaults to `True`. 382 383 **Returns:** 384 - list[dict]: A list of dictionaries containing entity metrics. 385 """ 386 results = [] 387 if verbose: 388 logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}") 389 390 for page in self._yield_all_entity_metrics(entity=entity_type, verbose=verbose): 391 results.extend(page["results"]) 392 393 # If remove_dicts is True, remove dictionaries from the workspace metrics 394 if remove_dicts: 395 for row in results: 396 row['attributes'] = self._remove_dict_from_attributes(row['attributes']) 397 return results 398 399 def get_flat_list_of_table_entity(self, entity_type: str, remove_dicts: bool = False, verbose = True) -> list[dict]: 400 """ 401 Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add 402 the entity name to the dictionary with key "{entity_type}_id". 403 404 **Args:** 405 - entity_type (str): The type of entity to get metrics for. 406 - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`. 407 - verbose (bool, optional): Whether to log verbose output. Defaults to `True`. 
408 409 **Returns:** 410 - list[dict]: A list of dictionaries containing entity metrics. 411 """ 412 table_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type, remove_dicts=remove_dicts, verbose=verbose) 413 convert_metrics = [] 414 for row in table_metrics: 415 converted_row = row['attributes'] 416 converted_row[f"{row['entityType']}_id"] = row['name'] 417 convert_metrics.append(converted_row) 418 return convert_metrics 419 420 def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response: 421 """ 422 Get specific entity metrics for a given entity type and name. 423 424 **Args:** 425 - entity_type (str): The type of entity to get metrics for. 426 - entity_name (str): The name of the entity to get metrics for. 427 428 **Returns:** 429 - requests.Response: The response from the request. 430 """ 431 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities/{entity_type}/{entity_name}" # noqa: E501 432 return self.request_util.run_request(uri=url, method=GET) 433 434 def _remove_dict_from_attributes(self, attributes: dict) -> dict: 435 """ 436 Remove dictionaries from the attributes. 437 438 Args: 439 attributes (dict): The attributes to remove dictionaries from. 440 441 Returns: 442 dict: The updated attributes with no dictionaries. 443 """ 444 for key, value in attributes.items(): 445 attributes[key] = self._remove_dict_from_cell(value) 446 return attributes 447 448 def _remove_dict_from_cell(self, cell_value: Any) -> Any: 449 """ 450 Remove a dictionary from a cell. 451 452 Args: 453 cell_value (Any): The dictionary to remove. 454 455 Returns: 456 Any: The updated cell with no dictionaries. 
457 """ 458 if isinstance(cell_value, dict): 459 entity_name = cell_value.get("entityName") 460 # If the cell value is a dictionary, check if it has an entityName key 461 if entity_name: 462 # If the cell value is a dictionary with an entityName key, return the entityName 463 return entity_name 464 entity_list = cell_value.get("items") 465 if entity_list or entity_list == []: 466 # If the cell value is a list of dictionaries, recursively call this function on each dictionary 467 return [ 468 self._remove_dict_from_cell(entity) for entity in entity_list 469 ] 470 return cell_value 471 return cell_value 472 473 def get_workspace_bucket(self) -> str: 474 """ 475 Get the workspace bucket name. Does not include the `gs://` prefix. 476 477 **Returns:** 478 - str: The bucket name. 479 """ 480 return self.get_workspace_info().json()["workspace"]["bucketName"] 481 482 def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response: 483 """ 484 Get workspace entity information. 485 486 **Args:** 487 - use_cache (bool, optional): Whether to use cache. Defaults to `True`. 488 489 **Returns:** 490 - requests.Response: The response from the request. 491 """ 492 use_cache = "true" if use_cache else "false" # type: ignore[assignment] 493 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache}" 494 return self.request_util.run_request(uri=url, method=GET) 495 496 def get_workspace_acl(self) -> requests.Response: 497 """ 498 Get the workspace access control list (ACL). 499 500 **Returns:** 501 - requests.Response: The response from the request. 
502 """ 503 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl" 504 return self.request_util.run_request( 505 uri=url, 506 method=GET 507 ) 508 509 def update_user_acl( 510 self, 511 email: str, 512 access_level: str, 513 can_share: bool = False, 514 can_compute: bool = False, 515 invite_users_not_found: bool = False, 516 ) -> requests.Response: 517 """ 518 Update the access control list (ACL) for a user in the workspace. 519 520 **Args:** 521 - email (str): The email of the user. 522 - access_level (str): The access level to grant to the user. 523 - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`. 524 - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`. 525 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 526 the workspace. Defaults to `False` 527 528 **Returns:** 529 - requests.Response: The response from the request. 530 """ 531 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" + \ 532 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 533 payload = { 534 "email": email, 535 "accessLevel": access_level, 536 "canShare": can_share, 537 "canCompute": can_compute, 538 } 539 logging.info( 540 f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}") 541 response = self.request_util.run_request( 542 uri=url, 543 method=PATCH, 544 content_type=APPLICATION_JSON, 545 data="[" + json.dumps(payload) + "]" 546 ) 547 548 if response.json()["usersNotFound"] and not invite_users_not_found: 549 # Will be a list of one user 550 user_not_found = response.json()["usersNotFound"][0] 551 raise Exception( 552 f'The user {user_not_found["email"]} was not found and access was not updated' 553 ) 554 return response 555 556 @deprecated( 557 """Firecloud functionality has been sunset. 
There is currently no support for adding library attributes in Terra.""" # noqa: E501 558 ) 559 def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response: 560 """ 561 Update the metadata for a library dataset. 562 563 **Args:** 564 - library_metadata (dict): The metadata to update. 565 - validate (bool, optional): Whether to validate the metadata. Defaults to `False`. 566 567 **Returns:** 568 - requests.Response: The response from the request. 569 """ 570 acl = f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}" + \ 571 f"/metadata?validate={str(validate).lower()}" 572 return self.request_util.run_request( 573 uri=acl, 574 method=PUT, 575 data=json.dumps(library_metadata) 576 ) 577 578 def update_multiple_users_acl( 579 self, acl_list: list[dict], invite_users_not_found: bool = False 580 ) -> requests.Response: 581 """ 582 Update the access control list (ACL) for multiple users in the workspace. 583 584 **Args:** 585 - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user. 586 - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access 587 the workspace. Defaults to `False` 588 589 **Returns:** 590 - requests.Response: The response from the request. 591 """ 592 url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?" 
+ \ 593 f"inviteUsersNotFound={str(invite_users_not_found).lower()}" 594 logging.info( 595 f"Updating users in workspace {self.billing_project}/{self.workspace_name}") 596 response = self.request_util.run_request( 597 uri=url, 598 method=PATCH, 599 content_type=APPLICATION_JSON, 600 data=json.dumps(acl_list) 601 ) 602 603 if response.json()["usersNotFound"] and not invite_users_not_found: 604 # Will be a list of one user 605 users_not_found = [u["email"] for u in response.json()["usersNotFound"]] 606 raise Exception( 607 f"The following users were not found and access was not updated: {users_not_found}" 608 ) 609 return response 610 611 def create_workspace( 612 self, 613 auth_domain: list[dict] = [], 614 attributes: dict = {}, 615 continue_if_exists: bool = False, 616 ) -> requests.Response: 617 """ 618 Create a new workspace in Terra. 619 620 **Args:** 621 - auth_domain (list[dict], optional): A list of authorization domains. Should look 622 like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list. 623 - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary. 624 - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`. 625 626 **Returns:** 627 - requests.Response: The response from the request. 
628 """ 629 payload = { 630 "namespace": self.billing_project, 631 "name": self.workspace_name, 632 "authorizationDomain": auth_domain, 633 "attributes": attributes, 634 "cloudPlatform": GCP 635 } 636 # If workspace already exists then continue if exists 637 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 638 logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}") 639 response = self.request_util.run_request( 640 uri=f"{self.terra_link}/workspaces", 641 method=POST, 642 content_type=APPLICATION_JSON, 643 data=json.dumps(payload), 644 accept_return_codes=accept_return_codes 645 ) 646 if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE: 647 logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists") 648 return response 649 650 def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]: 651 """ 652 Create an ingest dictionary for workspace attributes. 653 654 **Args:** 655 - workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None. 656 657 **Returns:** 658 - list[dict]: A list of dictionaries containing the workspace attributes. 
659 """ 660 # If not provided then call API to get it 661 workspace_attributes = ( 662 workspace_attributes if workspace_attributes 663 else self.get_workspace_info().json()["workspace"]["attributes"] 664 ) 665 666 ingest_dict = [] 667 for key, value in workspace_attributes.items(): 668 # If value is dict just use 'items' as value 669 if isinstance(value, dict): 670 value = value.get("items") 671 # If value is list convert to comma separated string 672 if isinstance(value, list): 673 value = ", ".join(value) 674 ingest_dict.append( 675 { 676 "attribute": key, 677 "value": str(value) if value else None 678 } 679 ) 680 return ingest_dict 681 682 def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response: 683 """ 684 Upload metadata to the workspace table. 685 686 **Args:** 687 - entities_tsv (str): The path to the TSV file containing the metadata to upload. 688 689 **Returns:** 690 - requests.Response: The response from the request. 691 """ 692 endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities" 693 data = {"entities": open(entities_tsv, "rb")} 694 return self.request_util.upload_file( 695 uri=endpoint, 696 data=data 697 ) 698 699 def _batch_upsert(self, update_dict: list) -> requests.Response: 700 """ 701 Run batch upsert on workspace table. 702 703 **Args:** 704 - update_dict (dict): A dictionary to update the workspace table. 705 **Returns:** 706 - requests.Response: The response from the request. 707 """ 708 endpoint = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/entities/batchUpsert" 709 return self.request_util.run_request( 710 uri=endpoint, 711 data=json.dumps(update_dict), 712 content_type=APPLICATION_JSON, 713 method=POST 714 ) 715 716 def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.Response: 717 """ 718 Upload metadata to one or more workspace entity tables using batch upsert. 
719 720 Builds the Terra batch upsert payload from a structured input dictionary and calls 721 `batch_upsert` with the result. 722 723 **Args:** 724 - table_data (dict): A dictionary mapping table names to their data configuration. 725 Each entry should have the following structure: 726 727 ```python 728 { 729 "table_name": { 730 "table_id_column": "column_that_is_the_entity_id", 731 "row_data": [ 732 {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...}, 733 ... 734 ] 735 }, 736 ... 737 } 738 ``` 739 740 - `table_id_column`: The name of the column whose value is used as the entity name 741 (`name` field in the upsert payload). This column is **not** included as an attribute 742 operation. 743 - `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes 744 an `AddUpdateAttribute` operation. 745 - force (bool, optional): Whether to force update if id column does not match table name + _id. 746 747 **Returns:** 748 - requests.Response: The response from the request. 749 """ 750 upsert_payload = [] 751 table_name_failures = [] 752 for table_name, config in table_data.items(): 753 id_column = config["table_id_column"] 754 if id_column != f"{table_name}_id": 755 table_name_failures.append( 756 f"id column, {id_column}, does not match table {table_name}. This column will be renamed to {table_name}_id." 757 "Use force=True to force update." 
758 ) 759 rows = config["row_data"] 760 for row in rows: 761 entity_name = row.get(id_column) 762 if entity_name is None: 763 raise Exception(f"Primary key column '{id_column}' is missing from row data - {row}") 764 operations = [ 765 { 766 "op": "AddUpdateAttribute", 767 "attributeName": col, 768 "addUpdateAttribute": value, 769 } 770 for col, value in row.items() 771 if col != id_column 772 ] 773 upsert_payload.append( 774 { 775 "name": entity_name, 776 "entityType": table_name, 777 "operations": operations, 778 } 779 ) 780 if table_name_failures: 781 for message in table_name_failures: 782 if force: 783 logging.warning(message) 784 else: 785 logging.error(message) 786 if not force: 787 raise Exception("One or more tables have id columns that do not match the expected format." 788 " See error messages above for details. Use force=True to force update.") 789 return self._batch_upsert(upsert_payload) 790 791 def get_workspace_workflows(self) -> requests.Response: 792 """ 793 Get the workflows for the workspace. 794 795 **Returns:** 796 - requests.Response: The response from the request. 797 """ 798 uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs?allRepos=true" 799 return self.request_util.run_request( 800 uri=uri, 801 method=GET 802 ) 803 804 def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response: 805 """ 806 Import a workflow into the workspace. 807 808 **Args:** 809 - workflow_dict (dict): The dictionary containing the workflow information. 810 - continue_if_exists (bool, optional): Whether to continue if the workflow 811 already exists. Defaults to `False`. 812 813 **Returns:** 814 - requests.Response: The response from the request. 
815 """ 816 uri = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs" 817 workflow_json = json.dumps(workflow_dict) 818 accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else [] 819 return self.request_util.run_request( 820 uri=uri, 821 method=POST, 822 data=workflow_json, 823 content_type=APPLICATION_JSON, 824 accept_return_codes=accept_return_codes 825 ) 826 827 def delete_workspace(self) -> requests.Response: 828 """ 829 Delete a Terra workspace. 830 831 **Returns:** 832 - requests.Response: The response from the request. 833 """ 834 return self.request_util.run_request( 835 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}", 836 method=DELETE 837 ) 838 839 def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response: 840 """ 841 Update the attributes for the workspace. 842 843 **Args:** 844 - attributes (dict): The attributes to update. 845 846 **Returns:** 847 - requests.Response: The response from the request. 848 """ 849 return self.request_util.run_request( 850 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes", 851 method=PATCH, 852 data=json.dumps(attributes), 853 content_type=APPLICATION_JSON 854 ) 855 856 def leave_workspace( 857 self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False 858 ) -> requests.Response: 859 """ 860 Leave a workspace. If workspace ID not supplied, will look it up. 861 862 **Args:** 863 - workspace_id (str, optional): The workspace ID. Defaults to None. 864 - ignore_direct_access_error (Optional[bool], optional): Whether to ignore direct access errors. 865 Defaults to `False`. 866 867 **Returns:** 868 - requests.Response: The response from the request. 
869 """ 870 if not workspace_id: 871 workspace_info = self.get_workspace_info().json() 872 workspace_id = workspace_info['workspace']['workspaceId'] 873 accepted_return_code = [403] if ignore_direct_access_error else [] 874 875 res = self.request_util.run_request( 876 uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave", 877 method=DELETE, 878 accept_return_codes=accepted_return_code 879 ) 880 if (res.status_code == 403 881 and res.json()["message"] == "You can only leave a resource that you have direct access to."): 882 logging.info( 883 f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct" 884 f"access to the workspace (they could be an owner on the billing project)" 885 ) 886 return res 887 888 def set_table_column_order(self, column_order: dict) -> requests.Response: 889 """ 890 Set the column order for one or more entity tables in the workspace. 891 892 **Args:** 893 - column_order (dict): A dictionary mapping table names to their column configuration. Each table entry 894 should have the following structure: 895 896 ``` 897 { 898 "table_name": { 899 "shown": ["col1", "col2", ...], # Columns to display, in order 900 "hidden": ["col3", "col4", ...] # Columns to hide 901 }, 902 ... 903 } 904 ``` 905 906 **Returns:** 907 - requests.Response: The response from the request. 908 """ 909 logging.info( 910 f"Setting column order for tables in workspace {self.billing_project}/{self.workspace_name}" 911 ) 912 return self.update_workspace_attributes( 913 attributes=[ 914 { 915 "op": "AddUpdateAttribute", 916 "attributeName": "workspace-column-defaults", 917 "addUpdateAttribute": json.dumps(column_order) 918 } 919 ] 920 ) 921 922 def change_workspace_public_setting(self, public: bool) -> requests.Response: 923 """ 924 Change a workspace's public setting. 925 926 **Args:** 927 - public (bool, optional): Whether the workspace should be public. Set to `True` to be made 928 public, `False` otherwise. 
929 930 **Returns:** 931 - requests.Response: The response from the request. 932 """ 933 body = [ 934 { 935 "settingType": "PubliclyReadable", 936 "config": { 937 "enabled": public 938 } 939 } 940 ] 941 return self.request_util.run_request( 942 uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings", 943 method=PUT, 944 content_type=APPLICATION_JSON, 945 data=json.dumps(body) 946 ) 947 948 def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response: 949 """ 950 Check if a workspace is public. 951 952 **Args:** 953 - bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look 954 it up if not provided. Defaults to None. 955 956 **Returns:** 957 - requests.Response: The response from the request. 958 """ 959 workspace_bucket = bucket if bucket else self.get_workspace_bucket() 960 bucket_prefix_stripped = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-") 961 return self.request_util.run_request( 962 uri=f"{SAM_LINK}/resources/v2/workspace/{bucket_prefix_stripped}/policies/reader/public", 963 method=GET 964 ) 965 966 def delete_entity_table(self, entity_to_delete: str) -> requests.Response: 967 """Delete an entire entity table from a Terra workspace. 968 969 **Args:** 970 - entity_to_delete (str): The name of the entity table to delete. 971 972 **Returns:** 973 - requests.Response: The response from the request. 
974 """ 975 response = self.request_util.run_request( 976 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}", # noqa: E501 977 method=DELETE 978 ) 979 if response.status_code == 204: 980 logging.info( 981 f"Successfully deleted entity table: '{entity_to_delete}' from workspace: " 982 f"'{self.billing_project}/{self.workspace_name}'" 983 ) 984 else: 985 logging.error( 986 f"Encountered the following error while attempting to delete '{entity_to_delete}' " 987 f"table: {response.text}" 988 ) 989 return response 990 991 def save_entity_table_version(self, entity_type: str, version_name: str) -> None: 992 """Save an entity table version in a Terra workspace. 993 994 **Args:** 995 - entity_type (str): The name of the entity table to save a new version for 996 - version_name (str): The name of the new version 997 """ 998 # Get the workspace metrics 999 workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type) 1000 file_name = f"{entity_type}.json" 1001 # Write the workspace metrics to a JSON file 1002 with open(file_name, "w") as json_file: 1003 json.dump(workspace_metrics, json_file) 1004 1005 # Create a zip file with the same naming convention that Terra backend uses 1006 timestamp_ms = int(time.time() * 1000) 1007 zip_file_name = f"{entity_type}.v{timestamp_ms}.zip" 1008 with zipfile.ZipFile(zip_file_name, "w", zipfile.ZIP_DEFLATED) as zipf: 1009 zipf.write(file_name, arcname=f"json/{file_name}") 1010 1011 # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live) 1012 workspace_info = self.get_workspace_info().json() 1013 path_to_upload_to = os.path.join( 1014 "gs://", workspace_info["workspace"]["bucketName"], ".data-table-versions", entity_type, zip_file_name 1015 ) 1016 gcp_util = GCPCloudFunctions(project=workspace_info["workspace"]["googleProject"]) 1017 # Attempt to get the currently active gcloud account. 
Default to the workspace creator if that fails 1018 try: 1019 active_account = gcp_util.get_active_gcloud_account() 1020 except Exception as e: 1021 active_account = workspace_info["workspace"]["createdBy"] 1022 logging.error( 1023 f"Encountered the following exception while attempting to get current GCP account: {e}. " 1024 f"Will set the owner of the new metadata version as the workspace creator instead." 1025 ) 1026 gcp_util.upload_blob( 1027 source_file=zip_file_name, 1028 destination_path=path_to_upload_to, 1029 custom_metadata={ 1030 "createdBy": active_account, 1031 "entityType": entity_type, 1032 "timestamp": timestamp_ms, 1033 "description": version_name, 1034 } 1035 ) 1036 1037 def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response: 1038 """ 1039 Add a user comment to a submission in Terra. 1040 1041 **Args:** 1042 - submission_id (str): The ID of the submission to add a comment to. 1043 - user_comment (str): The comment to add to the submission. 1044 1045 **Returns:** 1046 - requests.Response: The response from the request. 1047 """ 1048 logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'") 1049 return self.request_util.run_request( 1050 uri=f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}", 1051 method=PATCH, 1052 content_type=APPLICATION_JSON, 1053 data=json.dumps({"userComment": user_comment}), 1054 ) 1055 1056 def initiate_submission( 1057 self, 1058 method_config_namespace: str, 1059 method_config_name: str, 1060 entity_type: str, 1061 entity_name: str, 1062 expression: str, 1063 user_comment: Optional[str], 1064 use_call_cache: bool = True 1065 ) -> requests.Response: 1066 """ 1067 Initiate a submission within a Terra workspace. 1068 1069 Note - the workflow being initiated MUST already be imported into the workspace. 
1070 1071 **Args:** 1072 - method_config_namespace (str): The namespace of the method configuration. 1073 - method_config_name (str): The name of the method configuration to use for the submission 1074 (i.e. the workflow name). 1075 - entity_type (str): The entity type to be used as input to the workflow (e.x. "sample", or "sample_set"). 1076 - entity_name (str): The name of the entity to be used as input to the workflow (e.x. "sample_1", or 1077 "sample_set_1"). 1078 - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is 1079 launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should 1080 be launched PER SAMPLE, the expression should be `this.samples`. 1081 - user_comment (str, optional): The user comment to add to the submission. 1082 - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`. 1083 1084 **Returns:** 1085 - requests.Response: The response from the request. 1086 """ 1087 payload = { 1088 "methodConfigurationNamespace": method_config_namespace, 1089 "methodConfigurationName": method_config_name, 1090 "entityType": entity_type, 1091 "entityName": entity_name, 1092 "expression": expression, 1093 "useCallCache": use_call_cache, 1094 "deleteIntermediateOutputFiles": False, 1095 "useReferenceDisks": False, 1096 "ignoreEmptyOutputs": False, 1097 } 1098 if user_comment: 1099 payload["userComment"] = user_comment 1100 1101 return self.request_util.run_request( 1102 uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions", 1103 method=POST, 1104 content_type=APPLICATION_JSON, 1105 data=json.dumps(payload), 1106 ) 1107 1108 def retry_failed_submission(self, submission_id: str) -> requests.Response: 1109 """ 1110 Retry a failed submission in Terra. 1111 1112 **Args:** 1113 - submission_id (str): The ID of the submission to retry. 
1114 1115 **Returns:** 1116 - requests.Response: The response from the request. 1117 """ 1118 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/retry" 1119 payload = {"retryType": "Failed"} 1120 logging.info( 1121 f"Retrying failed submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" 1122 ) 1123 return self.request_util.run_request( 1124 uri=url, 1125 method=POST, 1126 content_type=APPLICATION_JSON, 1127 data=json.dumps(payload) 1128 ) 1129 1130 def get_submission_status(self, submission_id: str) -> requests.Response: 1131 """ 1132 Get the status of a submission in Terra. 1133 1134 **Args:** 1135 - submission_id (str): The ID of the submission. 1136 1137 **Returns:** 1138 - requests.Response: The response from the request. 1139 """ 1140 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}" 1141 logging.info( 1142 f"Getting status for submission: '{submission_id}' in workspace {self.billing_project}/{self.workspace_name}" # noqa: E501 1143 ) 1144 return self.request_util.run_request( 1145 uri=url, 1146 method=GET 1147 ) 1148 1149 def get_workspace_submission_status(self) -> requests.Response: 1150 """ 1151 Get the status of all submissions in a Terra workspace. 1152 1153 **Returns:** 1154 - requests.Response: The response from the request. 1155 """ 1156 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions" 1157 logging.info( 1158 f"Getting status for all submissions in workspace {self.billing_project}/{self.workspace_name}" 1159 ) 1160 return self.request_util.run_request( 1161 uri=url, 1162 method=GET 1163 ) 1164 1165 def get_workflow_status( 1166 self, 1167 submission_id: str, 1168 workflow_id: str, 1169 expand_sub_workflow_metadata: bool = False) -> requests.Response: 1170 """ 1171 Get the status of a workflow in a submission in Terra. 
1172 1173 **Args:** 1174 - submission_id (str): The ID of the submission. 1175 - workflow_id (str): The ID of the workflow. 1176 - expand_sub_workflow_metadata (bool, optional): Whether to expand the expand_sub workflow metadata. 1177 Defaults to `False`. 1178 1179 **Returns:** 1180 - requests.Response: The response from the request. 1181 """ 1182 expand_metadata = '?expandSubWorkflows=true' if expand_sub_workflow_metadata else '' 1183 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}/submissions/{submission_id}/workflows/{workflow_id}{expand_metadata}" # noqa: E501 1184 logging.info( 1185 f"Getting status for workflow: '{workflow_id}' in submission: '{submission_id}' " 1186 f"in workspace {self.billing_project}/{self.workspace_name}" 1187 ) 1188 return self.request_util.run_request( 1189 uri=url, 1190 method=GET 1191 ) 1192 1193 def get_workspace_submission_stats( 1194 self, method_name: Optional[str] = None, retrieve_running_ids: bool = True 1195 ) -> dict: 1196 """ 1197 Get submission statistics for a Terra workspace, optionally filtered by method name. 1198 1199 **Args:** 1200 - method_name (str, optional): The name of the method to filter statistics by. Defaults to None. 1201 - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running. 1202 Defaults to `True`. 
1203 1204 **Returns:** 1205 - dict: A dictionary containing submission statistics, including counts of workflows in various states 1206 """ 1207 submissions = [ 1208 s 1209 for s in self.get_workspace_submission_status().json() 1210 # If method_name is provided, filter submissions to only those with that method name 1211 if (s["methodConfigurationName"] == method_name if method_name else True) 1212 ] 1213 method_append = f"with method name '{method_name}'" if method_name else "" 1214 logging.info( 1215 f"{len(submissions)} submissions in " 1216 f"{self.billing_project}/{self.workspace_name} {method_append}" 1217 ) 1218 workflow_statuses = { 1219 "submitted": 0, 1220 "queued": 0, 1221 "running": 0, 1222 "aborting": 0, 1223 "aborted": 0, 1224 "failed": 0, 1225 "succeeded": 0, 1226 "id_still_running": [] if retrieve_running_ids else "NA" 1227 } 1228 for submission in submissions: 1229 wf_status = submission["workflowStatuses"] 1230 for status, count in wf_status.items(): 1231 if status.lower() in workflow_statuses: 1232 workflow_statuses[status.lower()] += count 1233 # Only look at individual submissions if retrieve running ids set to true 1234 # and only look at submissions that are still running 1235 if retrieve_running_ids and submission['status'] not in ["Done", "Aborted"]: 1236 submission_detailed = self.get_submission_status(submission_id=submission["submissionId"]).json() 1237 for workflow in submission_detailed["workflows"]: 1238 if workflow["status"] in ["Running", "Submitted", "Queued"]: 1239 entity_id = workflow["workflowEntity"]["entityName"] 1240 workflow_statuses['id_still_running'].append(entity_id) # type: ignore[attr-defined] 1241 running_count = workflow_statuses['running'] + workflow_statuses['submitted'] + workflow_statuses['queued'] # type: ignore[operator] # noqa: E501 1242 if retrieve_running_ids and len(workflow_statuses['id_still_running']) != running_count: # type: ignore[arg-type] # noqa: E501 1243 logging.warning( 1244 f"Discrepancy found 
between total running/pending workflows, {running_count}, " 1245 f"and the count of ids still running/pending, {len(workflow_statuses['id_still_running'])}. " # type: ignore[arg-type] # noqa: E501 1246 "Workflows may have completed between API calls." 1247 ) 1248 denominator = workflow_statuses['succeeded'] + workflow_statuses['failed'] # type: ignore[operator] 1249 if denominator > 0: 1250 workflow_statuses['success_rate'] = round( 1251 workflow_statuses['succeeded'] / denominator, 1252 2 1253 ) 1254 else: 1255 workflow_statuses['success_rate'] = 0.0 1256 return workflow_statuses 1257 1258 def get_workspace_details(self, terra_google_project_id: str) -> requests.Response: 1259 """ 1260 Get details of a Terra workspace using the Google project ID. 1261 1262 **Args:** 1263 - terra_google_project_id (str): The Google project ID of the Terra workspace. 1264 1265 **Returns:** 1266 - requests.Response: The response from the request. 1267 """ 1268 url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}?userProject={terra_google_project_id}" # noqa: E501 1269 logging.info( 1270 f"Getting workspace details for workspace '{self.workspace_name}' using Terra Google" 1271 f" project ID: '{terra_google_project_id}'" 1272 ) 1273 return self.request_util.run_request( 1274 uri=url, 1275 method=GET 1276 )
Terra workspace class to manage workspaces and their attributes.
def __init__(self, billing_project: str, workspace_name: str, request_util: RunRequest, env: str = "prod"):
    """
    Initialize the TerraWorkspace class.

    **Args:**
    - billing_project (str): The billing project associated with the workspace.
    - workspace_name (str): The name of the workspace.
    - request_util (`ops_utils.request_util.RunRequest`): An instance of a
        request utility class to handle HTTP requests.
    - env (str, optional): The Terra environment to target, `"dev"` or `"prod"` (case-insensitive).
        It selects the orchestration API base URL stored in `terra_link`. Some methods call the prod
        Rawls/Sam URLs directly and are not affected by this setting. Defaults to `"prod"`.

    **Raises:**
    - ValueError: If `env` is neither `"dev"` nor `"prod"`.
    """
    self.billing_project = billing_project
    """@private"""
    self.workspace_name = workspace_name
    """@private"""
    self.workspace_id = None
    """@private"""
    self.resource_id = None
    """@private"""
    self.storage_container = None
    """@private"""
    self.bucket = None
    """@private"""
    self.wds_url = None
    """@private"""
    self.account_url: Optional[str] = None
    """@private"""
    self.request_util = request_util
    """@private"""
    # Normalize once so "Dev"/"PROD" etc. are accepted.
    normalized_env = env.lower()
    if normalized_env == "dev":
        self.terra_link = TERRA_DEV_LINK
        """@private"""
    elif normalized_env == "prod":
        self.terra_link = TERRA_PROD_LINK
        """@private"""
    else:
        raise ValueError(f"Invalid environment: {env}. Must be 'dev' or 'prod'.")
Initialize the TerraWorkspace class.
Args:
- billing_project (str): The billing project associated with the workspace.
- workspace_name (str): The name of the workspace.
- request_util (`ops_utils.request_util.RunRequest`): An instance of a request utility class to handle HTTP requests.
@staticmethod
def validate_terra_headers_for_tdr_conversion(table_name: str, headers: list[str]) -> None:
    """Check that all headers follow the standards required by TDR.

    A valid header starts with an ASCII letter, contains only ASCII letters, digits and
    underscores, and is at most 63 characters long.

    **Args:**
    - table_name (str): The name of the Terra table.
    - headers (list[str]): The headers of the Terra table to validate.

    **Raises:**
    - ValueError if any headers are considered invalid by TDR standards
    """
    # fullmatch instead of match("^...$"): `$` also matches just before a trailing "\n",
    # which would let a header such as "abc\n" pass validation.
    tdr_header_allowed_pattern = re.compile(r"[a-zA-Z][_a-zA-Z0-9]*")
    tdr_max_header_length = 63

    headers_containing_too_many_characters = [
        header for header in headers if len(header) > tdr_max_header_length
    ]
    headers_contain_invalid_characters = [
        header for header in headers if not tdr_header_allowed_pattern.fullmatch(header)
    ]

    error_to_report = ""
    if headers_containing_too_many_characters:
        too_long = ", ".join(headers_containing_too_many_characters)
        error_to_report += (
            f'The following header(s) in table "{table_name}" contain too many characters: "{too_long}". '
            f"The max number of characters for a header allowed in TDR is {tdr_max_header_length}.\n"
        )
    if headers_contain_invalid_characters:
        invalid = ", ".join(headers_contain_invalid_characters)
        error_to_report += (
            f'The following header(s) in table "{table_name}" contain invalid characters: "{invalid}". '
            "TDR headers must start with a letter, and must only contain numbers, letters, and "
            "underscore characters.\n"
        )
    if error_to_report:
        error_to_report += (
            "In order to proceed, please update the problematic header(s) in your Terra table, and then "
            "re-attempt the import once all problematic header(s) have been updated to follow TDR rules "
            "for header naming."
        )
        raise ValueError(error_to_report)
Check that all headers follow the standards required by TDR.
Args:
- table_name (str): The name of the Terra table.
- headers (list[str]): The headers of the Terra table to validate.
Raises:
- ValueError if any headers are considered invalid by TDR standards
def get_workspace_info(self) -> requests.Response:
    """
    Fetch the workspace record from the Terra orchestration API.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting workspace info for {workspace_path}")
    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{workspace_path}",
        method=GET,
    )
Get workspace information.
Returns:
- requests.Response: The response from the request.
def get_gcp_workspace_metrics(self, entity_type: str, remove_dicts: bool = False, verbose: bool = True) -> list[dict]:
    """
    Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).

    **Args:**
    - entity_type (str): The type of entity to get metrics for.
    - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
    - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.

    **Returns:**
    - list[dict]: A list of dictionaries containing entity metrics.
    """
    if verbose:
        logging.info(f"Getting {entity_type} metadata for {self.billing_project}/{self.workspace_name}")

    # Each page yielded by the paginator holds its rows under "results"; flatten all pages.
    results = [
        row
        for page in self._yield_all_entity_metrics(entity=entity_type, verbose=verbose)
        for row in page["results"]
    ]

    # If remove_dicts is True, remove dictionaries from the workspace metrics
    if remove_dicts:
        for row in results:
            row['attributes'] = self._remove_dict_from_attributes(row['attributes'])
    return results
Get metrics for a specific entity type in the workspace (specifically for Terra on GCP).
Args:
- entity_type (str): The type of entity to get metrics for.
- remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
- verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
Returns:
- list[dict]: A list of dictionaries containing entity metrics.
def get_flat_list_of_table_entity(self, entity_type: str, remove_dicts: bool = False, verbose: bool = True) -> list[dict]:
    """
    Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add
    the entity name to the dictionary with key "{entity_type}_id".

    **Args:**
    - entity_type (str): The type of entity to get metrics for.
    - remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
    - verbose (bool, optional): Whether to log verbose output. Defaults to `True`.

    **Returns:**
    - list[dict]: A list of dictionaries containing entity metrics.
    """
    table_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type, remove_dicts=remove_dicts, verbose=verbose)
    # Build a new dict per row instead of writing the id key into the fetched row's attributes.
    # The key is derived from each row's own "entityType" field.
    return [
        {**row['attributes'], f"{row['entityType']}_id": row['name']}
        for row in table_metrics
    ]
Convert metrics returned by get_gcp_workspace_metrics to a flat list of dictionaries and add the entity name to the dictionary with key "{entity_type}_id".
Args:
- entity_type (str): The type of entity to get metrics for.
- remove_dicts (bool, optional): Whether to remove dictionaries from the workspace metrics. Defaults to `False`.
- verbose (bool, optional): Whether to log verbose output. Defaults to `True`.
Returns:
- list[dict]: A list of dictionaries containing entity metrics.
def get_specific_entity_metrics(self, entity_type: str, entity_name: str) -> requests.Response:
    """
    Fetch one entity (a single row) of the given type from the workspace's data tables.

    **Args:**
    - entity_type (str): The entity type (table name) to look in.
    - entity_name (str): The name (id) of the entity to fetch.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_base = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(
        uri=f"{workspace_base}/entities/{entity_type}/{entity_name}",
        method=GET,
    )
Get specific entity metrics for a given entity type and name.
Args:
- entity_type (str): The type of entity to get metrics for.
- entity_name (str): The name of the entity to get metrics for.
Returns:
- requests.Response: The response from the request.
def get_workspace_bucket(self) -> str:
    """
    Look up the name of the workspace's bucket. The `gs://` prefix is not included.

    **Returns:**
    - str: The bucket name.
    """
    workspace_json = self.get_workspace_info().json()
    workspace_record = workspace_json["workspace"]
    return workspace_record["bucketName"]
Get the workspace bucket name. Does not include the gs:// prefix.
Returns:
- str: The bucket name.
def get_workspace_entity_info(self, use_cache: bool = True) -> requests.Response:
    """
    Get workspace entity information.

    **Args:**
    - use_cache (bool, optional): Whether to use cache; sent as the `useCache` query parameter.
        Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # The query string needs a lowercase "true"/"false"; keep it in its own variable rather than
    # rebinding the bool parameter to a str.
    use_cache_param = "true" if use_cache else "false"
    url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entities?useCache={use_cache_param}"  # noqa: E501
    return self.request_util.run_request(uri=url, method=GET)
Get workspace entity information.
Args:
- use_cache (bool, optional): Whether to use cache (sent as the `useCache` query parameter). Defaults to `True`.
Returns:
- requests.Response: The response from the request.
def get_workspace_acl(self) -> requests.Response:
    """
    Retrieve the access control list (ACL) of the workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_base = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    acl_endpoint = f"{workspace_base}/acl"
    return self.request_util.run_request(uri=acl_endpoint, method=GET)
Get the workspace access control list (ACL).
Returns:
- requests.Response: The response from the request.
def update_user_acl(
    self,
    email: str,
    access_level: str,
    can_share: bool = False,
    can_compute: bool = False,
    invite_users_not_found: bool = False,
) -> requests.Response:
    """
    Update the access control list (ACL) for a user in the workspace.

    **Args:**
    - email (str): The email of the user.
    - access_level (str): The access level to grant to the user.
    - can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
    - can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
        the workspace. Defaults to `False`

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If the user was not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    payload = {
        "email": email,
        "accessLevel": access_level,
        "canShare": can_share,
        "canCompute": can_compute,
    }
    logging.info(
        f"Updating user {email} to {access_level} in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        # The endpoint takes a list of ACL entries; send a one-element list for this user.
        data=json.dumps([payload])
    )

    # Parse the body once; treat a missing "usersNotFound" key as "nobody missing".
    users_not_found = response.json().get("usersNotFound") or []
    if users_not_found and not invite_users_not_found:
        # Only one user was sent, so at most one can be reported missing.
        user_not_found = users_not_found[0]
        raise Exception(
            f'The user {user_not_found["email"]} was not found and access was not updated'
        )
    return response
Update the access control list (ACL) for a user in the workspace.
Args:
- email (str): The email of the user.
- access_level (str): The access level to grant to the user.
- can_share (bool, optional): Whether the user can share the workspace. Defaults to `False`.
- can_compute (bool, optional): Whether the user can compute in the workspace. Defaults to `False`.
- invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
@deprecated(
    """Firecloud functionality has been sunset. There is currently no support for adding library attributes in Terra."""  # noqa: E501
)
def put_metadata_for_library_dataset(self, library_metadata: dict, validate: bool = False) -> requests.Response:
    """
    PUT library metadata for this workspace's dataset (deprecated Firecloud endpoint).

    **Args:**
    - library_metadata (dict): The metadata to send; serialized as JSON.
    - validate (bool, optional): Whether the service should validate the metadata. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    validate_flag = str(validate).lower()
    metadata_url = (
        f"{self.terra_link}/library/{self.billing_project}/{self.workspace_name}"
        f"/metadata?validate={validate_flag}"
    )
    return self.request_util.run_request(
        uri=metadata_url,
        method=PUT,
        data=json.dumps(library_metadata),
    )
Update the metadata for a library dataset.
Args:
- library_metadata (dict): The metadata to update.
- validate (bool, optional): Whether to validate the metadata. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def update_multiple_users_acl(
    self, acl_list: list[dict], invite_users_not_found: bool = False
) -> requests.Response:
    """
    Update the access control list (ACL) for multiple users in the workspace.

    **Args:**
    - acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
    - invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access
        the workspace. Defaults to `False`

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If any users were not found and `invite_users_not_found` is `False`.
    """
    url = (
        f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/acl?"
        f"inviteUsersNotFound={str(invite_users_not_found).lower()}"
    )
    logging.info(
        f"Updating users in workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        data=json.dumps(acl_list)
    )

    # Parse the body once; treat a missing "usersNotFound" key as "nobody missing".
    not_found_entries = response.json().get("usersNotFound") or []
    if not_found_entries and not invite_users_not_found:
        # May contain any number of the submitted users; report all of them.
        users_not_found = [u["email"] for u in not_found_entries]
        raise Exception(
            f"The following users were not found and access was not updated: {users_not_found}"
        )
    return response
Update the access control list (ACL) for multiple users in the workspace.
Args:
- acl_list (list[dict]): A list of dictionaries containing the ACL information for each user.
- invite_users_not_found (bool, optional): Whether a user that's not found should still be invited to access the workspace. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def create_workspace(
    self,
    auth_domain: Optional[list[dict]] = None,
    attributes: Optional[dict] = None,
    continue_if_exists: bool = False,
) -> requests.Response:
    """
    Create a new workspace in Terra.

    **Args:**
    - auth_domain (list[dict], optional): A list of authorization domains. Should look
        like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
    - attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
    - continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # None defaults avoid sharing one mutable list/dict across calls; the payload is unchanged.
    payload = {
        "namespace": self.billing_project,
        "name": self.workspace_name,
        "authorizationDomain": auth_domain if auth_domain is not None else [],
        "attributes": attributes if attributes is not None else {},
        "cloudPlatform": GCP
    }
    # If workspace already exists then continue if exists
    accept_return_codes = [self.CONFLICT_STATUS_CODE] if continue_if_exists else []
    logging.info(f"Creating workspace {self.billing_project}/{self.workspace_name}")
    response = self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces",
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload),
        accept_return_codes=accept_return_codes
    )
    if continue_if_exists and response.status_code == self.CONFLICT_STATUS_CODE:
        logging.info(f"Workspace {self.billing_project}/{self.workspace_name} already exists")
    return response
Create a new workspace in Terra.
Args:
- auth_domain (list[dict], optional): A list of authorization domains. Should look like `[{"membersGroupName": "some_auth_domain"}]`. Defaults to an empty list.
- attributes (dict, optional): A dictionary of attributes for the workspace. Defaults to an empty dictionary.
- continue_if_exists (bool, optional): Whether to continue if the workspace already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def create_workspace_attributes_ingest_dict(self, workspace_attributes: Optional[dict] = None) -> list[dict]:
    """
    Create an ingest dictionary for workspace attributes.

    Dict-valued attributes are reduced to their `"items"` entry, list values are joined into a
    comma-separated string, and every other value is converted with `str()`. Only `None` and empty
    strings become `None`. Falsy scalars such as `0` or `False` are kept as `"0"` / `"False"`.

    **Args:**
    - workspace_attributes (dict, optional): A dictionary of workspace attributes. If not provided
        (or empty), they are fetched from the workspace. Defaults to None.

    **Returns:**
    - list[dict]: A list of dictionaries containing the workspace attributes.
    """
    # If not provided then call API to get it
    workspace_attributes = (
        workspace_attributes if workspace_attributes
        else self.get_workspace_info().json()["workspace"]["attributes"]
    )

    ingest_dict = []
    for key, value in workspace_attributes.items():
        # If value is dict just use 'items' as value
        if isinstance(value, dict):
            value = value.get("items")
        # If value is list convert to comma separated string; str() each member so numeric or
        # boolean list items don't make join() raise TypeError.
        if isinstance(value, list):
            value = ", ".join(str(item) for item in value)
        ingest_dict.append(
            {
                "attribute": key,
                "value": None if value is None or value == "" else str(value)
            }
        )
    return ingest_dict
Create an ingest dictionary for workspace attributes.
Args:
- workspace_attributes (dict, optional): A dictionary of workspace attributes. Defaults to None.
Returns:
- list[dict]: A list of dictionaries containing the workspace attributes.
def upload_metadata_to_workspace_table(self, entities_tsv: str) -> requests.Response:
    """
    Upload metadata to the workspace table.

    **Args:**
    - entities_tsv (str): The path to the TSV file containing the metadata to upload.

    **Returns:**
    - requests.Response: The response from the request.
    """
    endpoint = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/flexibleImportEntities"
    # Open the TSV in a context manager so the handle is closed once the upload call returns
    # (previously it was left open).
    with open(entities_tsv, "rb") as entities_file:
        return self.request_util.upload_file(
            uri=endpoint,
            data={"entities": entities_file}
        )
Upload metadata to the workspace table.
Args:
- entities_tsv (str): The path to the TSV file containing the metadata to upload.
Returns:
- requests.Response: The response from the request.
def upload_metadata_with_batch_upsert(self, table_data: dict, force: bool = False) -> requests.Response:
    """
    Upload metadata to one or more workspace entity tables using batch upsert.

    Builds the Terra batch upsert payload from a structured input dictionary and calls
    `batch_upsert` with the result.

    **Args:**
    - table_data (dict): A dictionary mapping table names to their data configuration.
        Each entry should have the following structure:

        ```python
        {
            "table_name": {
                "table_id_column": "column_that_is_the_entity_id",
                "row_data": [
                    {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...},
                    ...
                ]
            },
            ...
        }
        ```

        - `table_id_column`: The name of the column whose value is used as the entity name
          (`name` field in the upsert payload). This column is **not** included as an attribute
          operation.
        - `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes
          an `AddUpdateAttribute` operation.
    - force (bool, optional): Whether to force update if id column does not match table name + _id.

    **Returns:**
    - requests.Response: The response from the request.

    **Raises:**
    - Exception: If a row has no value for its table's `table_id_column`, or if any table's id
        column is not `<table_name>_id` and `force` is `False`.
    """
    upsert_payload = []
    table_name_failures = []
    for table_name, config in table_data.items():
        id_column = config["table_id_column"]
        if id_column != f"{table_name}_id":
            table_name_failures.append(
                f"id column, {id_column}, does not match table {table_name}. "
                f"This column will be renamed to {table_name}_id. "
                "Use force=True to force update."
            )
        for row in config["row_data"]:
            entity_name = row.get(id_column)
            if entity_name is None:
                raise Exception(f"Primary key column '{id_column}' is missing from row data - {row}")
            # Every column except the id column becomes an attribute upsert operation.
            operations = [
                {
                    "op": "AddUpdateAttribute",
                    "attributeName": col,
                    "addUpdateAttribute": value,
                }
                for col, value in row.items()
                if col != id_column
            ]
            upsert_payload.append(
                {
                    "name": entity_name,
                    "entityType": table_name,
                    "operations": operations,
                }
            )
    if table_name_failures:
        # With force the mismatches are only warnings; without it they are errors and we abort.
        log_mismatch = logging.warning if force else logging.error
        for message in table_name_failures:
            log_mismatch(message)
        if not force:
            raise Exception("One or more tables have id columns that do not match the expected format."
                            " See error messages above for details. Use force=True to force update.")
    return self._batch_upsert(upsert_payload)
Upload metadata to one or more workspace entity tables using batch upsert.
Builds the Terra batch upsert payload from a structured input dictionary and calls `batch_upsert` with the result.
Args:
- table_data (dict): A dictionary mapping table names to their data configuration. Each entry should have the following structure:
```python
{
    "table_name": {
        "table_id_column": "column_that_is_the_entity_id",
        "row_data": [
            {"column_that_is_the_entity_id": "row1_id", "column_b": "value1", ...},
            ...
        ]
    },
    ...
}
```
- `table_id_column`: The name of the column whose value is used as the entity name
(`name` field in the upsert payload). This column is **not** included as an attribute
operation.
- `row_data`: A list of row dictionaries. Every key except `table_id_column` becomes
an `AddUpdateAttribute` operation.
- force (bool, optional): Whether to force update if id column does not match table name + _id.
Returns:
- requests.Response: The response from the request.
def get_workspace_workflows(self) -> requests.Response:
    """
    List the workflow (method) configurations in the workspace, across all repositories.

    **Returns:**
    - requests.Response: The response from the request.
    """
    method_configs_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs"
    return self.request_util.run_request(uri=f"{method_configs_url}?allRepos=true", method=GET)
Get the workflows for the workspace.
Returns:
- requests.Response: The response from the request.
def import_workflow(self, workflow_dict: dict, continue_if_exists: bool = False) -> requests.Response:
    """
    Add a workflow (method configuration) to the workspace.

    **Args:**
    - workflow_dict (dict): The workflow configuration, sent as the JSON request body.
    - continue_if_exists (bool, optional): If `True`, a conflict response (workflow already
        present) is accepted instead of treated as an error. Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if continue_if_exists:
        allowed_codes = [self.CONFLICT_STATUS_CODE]
    else:
        allowed_codes = []
    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/methodconfigs",
        method=POST,
        data=json.dumps(workflow_dict),
        content_type=APPLICATION_JSON,
        accept_return_codes=allowed_codes,
    )
Import a workflow into the workspace.
Args:
- workflow_dict (dict): The dictionary containing the workflow information.
- continue_if_exists (bool, optional): Whether to continue if the workflow already exists. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def delete_workspace(self) -> requests.Response:
    """
    Issue a DELETE for this Terra workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}"
    return self.request_util.run_request(uri=workspace_url, method=DELETE)
Delete a Terra workspace.
Returns:
- requests.Response: The response from the request.
def update_workspace_attributes(self, attributes: list[dict]) -> requests.Response:
    """
    Update the attributes for the workspace.

    **Args:**
    - attributes (list[dict]): The attribute update operations to apply, e.g.
        `[{"op": "AddUpdateAttribute", "attributeName": "name", "addUpdateAttribute": "value"}]`.
        Sent as the JSON request body.

    **Returns:**
    - requests.Response: The response from the request.
    """
    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/updateAttributes",
        method=PATCH,
        data=json.dumps(attributes),
        content_type=APPLICATION_JSON
    )
Update the attributes for the workspace.
Args:
- attributes (list[dict]): The attribute update operations to apply.
Returns:
- requests.Response: The response from the request.
def leave_workspace(
    self, workspace_id: Optional[str] = None, ignore_direct_access_error: bool = False
) -> requests.Response:
    """
    Leave a workspace. If workspace ID not supplied, will look it up.

    **Args:**
    - workspace_id (str, optional): The workspace ID. Defaults to None.
    - ignore_direct_access_error (bool, optional): Whether to accept a 403 response (for example when
        the current user has no direct access to the workspace) instead of treating it as an error.
        Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    if not workspace_id:
        workspace_info = self.get_workspace_info().json()
        workspace_id = workspace_info['workspace']['workspaceId']
    accepted_return_code = [403] if ignore_direct_access_error else []

    res = self.request_util.run_request(
        uri=f"{SAM_LINK}/resources/v2/workspace/{workspace_id}/leave",
        method=DELETE,
        accept_return_codes=accepted_return_code
    )
    # .get() so a 403 body without a "message" field does not raise KeyError.
    if (res.status_code == 403
            and res.json().get("message") == "You can only leave a resource that you have direct access to."):
        logging.info(
            f"Did not remove user from workspace with id '{workspace_id}' as current user does not have direct "
            f"access to the workspace (they could be an owner on the billing project)"
        )
    return res
Leave a workspace. If workspace ID not supplied, will look it up.
Args:
- workspace_id (str, optional): The workspace ID. Defaults to None.
- ignore_direct_access_error (bool, optional): Whether to ignore direct access errors. Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def set_table_column_order(self, column_order: dict) -> requests.Response:
    """
    Configure which columns are shown (and in what order) or hidden for one or more entity tables.

    **Args:**
    - column_order (dict): A dictionary mapping table names to their column configuration. Each table entry
        should have the following structure:

        ```
        {
            "table_name": {
                "shown": ["col1", "col2", ...],  # Columns to display, in order
                "hidden": ["col3", "col4", ...]  # Columns to hide
            },
            ...
        }
        ```

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Setting column order for tables in workspace {workspace_path}")
    # The layout is stored as one workspace attribute whose value is the JSON-encoded mapping.
    column_defaults_update = {
        "op": "AddUpdateAttribute",
        "attributeName": "workspace-column-defaults",
        "addUpdateAttribute": json.dumps(column_order),
    }
    return self.update_workspace_attributes(attributes=[column_defaults_update])
Set the column order for one or more entity tables in the workspace.
Args:
- column_order (dict): A dictionary mapping table names to their column configuration. Each table entry should have the following structure:
```
{
    "table_name": {
        "shown": ["col1", "col2", ...],  # Columns to display, in order
        "hidden": ["col3", "col4", ...]  # Columns to hide
    },
    ...
}
```
Returns:
- requests.Response: The response from the request.
def change_workspace_public_setting(self, public: bool) -> requests.Response:
    """
    Change a workspace's public setting.

    **Args:**
    - public (bool): Whether the workspace should be public. Set to `True` to be made
        public, `False` otherwise.

    **Returns:**
    - requests.Response: The response from the request.
    """
    # The settings endpoint takes a list of settings; only the "PubliclyReadable" one is sent.
    body = [
        {
            "settingType": "PubliclyReadable",
            "config": {
                "enabled": public
            }
        }
    ]
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/v2/{self.billing_project}/{self.workspace_name}/settings",
        method=PUT,
        content_type=APPLICATION_JSON,
        data=json.dumps(body)
    )
Change a workspace's public setting.
Args:
- public (bool): Whether the workspace should be public. Set to `True` to be made public, `False` otherwise.
Returns:
- requests.Response: The response from the request.
def check_workspace_public(self, bucket: Optional[str] = None) -> requests.Response:
    """
    Query Sam for the public-reader policy of the workspace.

    **Args:**
    - bucket (str, optional): The workspace bucket name without the `gs://` prefix. Looked up from
        the workspace when not given. Defaults to None.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_bucket = bucket or self.get_workspace_bucket()
    # Strip a leading "fc-secure-" first, then "fc-"; the remainder is used as the Sam
    # workspace resource id (presumably the workspace id — TODO confirm).
    resource_id = workspace_bucket.removeprefix("fc-secure-").removeprefix("fc-")
    policy_url = f"{SAM_LINK}/resources/v2/workspace/{resource_id}/policies/reader/public"
    return self.request_util.run_request(uri=policy_url, method=GET)
Check if a workspace is public.
Args:
- bucket (str, optional): The bucket name (provided without the `gs://` prefix). Will look it up if not provided. Defaults to None.
Returns:
- requests.Response: The response from the request.
def delete_entity_table(self, entity_to_delete: str) -> requests.Response:
    """Remove a whole entity table (entity type) from the Terra workspace.

    A 204 response is logged as success; any other returned status is logged as an error.

    **Args:**
    - entity_to_delete (str): The name of the entity table to delete.

    **Returns:**
    - requests.Response: The response from the request.
    """
    entity_table_url = f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/entityTypes/{entity_to_delete}"  # noqa: E501
    response = self.request_util.run_request(uri=entity_table_url, method=DELETE)
    if response.status_code != 204:
        logging.error(
            f"Encountered the following error while attempting to delete '{entity_to_delete}' "
            f"table: {response.text}"
        )
    else:
        logging.info(
            f"Successfully deleted entity table: '{entity_to_delete}' from workspace: "
            f"'{self.billing_project}/{self.workspace_name}'"
        )
    return response
Delete an entire entity table from a Terra workspace.
Args:
- entity_to_delete (str): The name of the entity table to delete.
Returns:
- requests.Response: The response from the request.
def save_entity_table_version(self, entity_type: str, version_name: str) -> None:
    """
    Save an entity table version in a Terra workspace.

    The output of `get_gcp_workspace_metrics` for the table is dumped to `<entity_type>.json`.
    That file is zipped as `json/<entity_type>.json` inside `<entity_type>.v<timestamp_ms>.zip`.
    The zip is uploaded to `gs://<bucketName>/.data-table-versions/<entity_type>/` with
    `createdBy`, `entityType`, `timestamp` and `description` custom metadata.
    Intermediate files are written to a temporary directory that is removed afterwards,
    so nothing is left behind in the current working directory.

    **Args:**
    - entity_type (str): The name of the entity table to save a new version for
    - version_name (str): The name of the new version
    """
    import tempfile  # local import: only this method needs a scratch directory

    # Get the workspace metrics
    workspace_metrics = self.get_gcp_workspace_metrics(entity_type=entity_type)
    file_name = f"{entity_type}.json"

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write the workspace metrics to a JSON file
        json_path = os.path.join(tmp_dir, file_name)
        with open(json_path, "w") as json_file:
            json.dump(workspace_metrics, json_file)

        # Create a zip file with the same naming convention that Terra backend uses
        timestamp_ms = int(time.time() * 1000)
        zip_file_name = f"{entity_type}.v{timestamp_ms}.zip"
        zip_path = os.path.join(tmp_dir, zip_file_name)
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(json_path, arcname=f"json/{file_name}")

        # Upload the zip file to subdirectory within the workspace's bucket (where Terra expects it to live).
        # GCS object paths always use "/", so build the URI explicitly: os.path.join would insert "\" on Windows.
        workspace_info = self.get_workspace_info().json()["workspace"]
        path_to_upload_to = (
            f"gs://{workspace_info['bucketName']}/.data-table-versions/{entity_type}/{zip_file_name}"
        )
        gcp_util = GCPCloudFunctions(project=workspace_info["googleProject"])
        # Attempt to get the currently active gcloud account. Default to the workspace creator if that fails
        try:
            active_account = gcp_util.get_active_gcloud_account()
        except Exception as e:
            active_account = workspace_info["createdBy"]
            logging.error(
                f"Encountered the following exception while attempting to get current GCP account: {e}. "
                f"Will set the owner of the new metadata version as the workspace creator instead."
            )
        # Upload must happen inside the `with` so the zip still exists on disk
        gcp_util.upload_blob(
            source_file=zip_path,
            destination_path=path_to_upload_to,
            custom_metadata={
                "createdBy": active_account,
                "entityType": entity_type,
                "timestamp": timestamp_ms,
                "description": version_name,
            }
        )
Save an entity table version in a Terra workspace.
Args:
- entity_type (str): The name of the entity table to save a new version for
- version_name (str): The name of the new version
def add_user_comment_to_submission(self, submission_id: str, user_comment: str) -> requests.Response:
    """
    Attach a user comment to an existing Terra submission.

    Sends a PATCH to the submission's Rawls endpoint with a JSON body of the form
    `{"userComment": <user_comment>}`.

    **Args:**
    - submission_id (str): The ID of the submission to add a comment to.
    - user_comment (str): The comment to add to the submission.

    **Returns:**
    - requests.Response: The response from the request.
    """
    logging.info(f"Attempting to add user comment: '{user_comment}' to submission: '{submission_id}'")
    submission_url = (
        f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/submissions/{submission_id}"
    )
    body = json.dumps({"userComment": user_comment})
    return self.request_util.run_request(
        uri=submission_url,
        method=PATCH,
        content_type=APPLICATION_JSON,
        data=body,
    )
Add a user comment to a submission in Terra.
Args:
- submission_id (str): The ID of the submission to add a comment to.
- user_comment (str): The comment to add to the submission.
Returns:
- requests.Response: The response from the request.
def initiate_submission(
        self,
        method_config_namespace: str,
        method_config_name: str,
        entity_type: str,
        entity_name: str,
        expression: str,
        user_comment: Optional[str] = None,
        use_call_cache: bool = True
) -> requests.Response:
    """
    Initiate a submission within a Terra workspace.

    Note - the workflow being initiated MUST already be imported into the workspace.

    **Args:**
    - method_config_namespace (str): The namespace of the method configuration.
    - method_config_name (str): The name of the method configuration to use for the submission
        (i.e. the workflow name).
    - entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
    - entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or
        "sample_set_1").
    - expression (str): The "expression" to use. For example, if the `entity_type` is `sample` and the workflow is
        launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should
        be launched PER SAMPLE, the expression should be `this.samples`.
    - user_comment (str, optional): The user comment to add to the submission. Omitted from the
        request when `None` or empty. Defaults to `None`.
    - use_call_cache (bool, optional): Whether to use the call caching. Defaults to `True`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    payload = {
        "methodConfigurationNamespace": method_config_namespace,
        "methodConfigurationName": method_config_name,
        "entityType": entity_type,
        "entityName": entity_name,
        "expression": expression,
        "useCallCache": use_call_cache,
        "deleteIntermediateOutputFiles": False,
        "useReferenceDisks": False,
        "ignoreEmptyOutputs": False,
    }
    # Only send a comment when one was actually given
    if user_comment:
        payload["userComment"] = user_comment

    return self.request_util.run_request(
        uri=f"{self.terra_link}/workspaces/{self.billing_project}/{self.workspace_name}/submissions",
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps(payload),
    )
Initiate a submission within a Terra workspace.
Note - the workflow being initiated MUST already be imported into the workspace.
Args:
- method_config_namespace (str): The namespace of the method configuration.
- method_config_name (str): The name of the method configuration to use for the submission (i.e. the workflow name).
- entity_type (str): The entity type to be used as input to the workflow (e.g. "sample", or "sample_set").
- entity_name (str): The name of the entity to be used as input to the workflow (e.g. "sample_1", or "sample_set_1").
- expression (str): The "expression" to use. For example, if the
`entity_type` is `sample` and the workflow is launching one sample, this can be left as `this`. If the `entity_type` is `sample_set`, but one workflow should be launched PER SAMPLE, the expression should be `this.samples`.
- user_comment (str, optional): The user comment to add to the submission.
- use_call_cache (bool, optional): Whether to use call caching. Defaults to `True`.
Returns:
- requests.Response: The response from the request.
def retry_failed_submission(self, submission_id: str) -> requests.Response:
    """
    Retry the failed workflows of a Terra submission.

    POSTs `{"retryType": "Failed"}` to the submission's `/retry` endpoint.

    **Args:**
    - submission_id (str): The ID of the submission to retry.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    retry_url = f"{RAWLS_LINK}/workspaces/{workspace_path}/submissions/{submission_id}/retry"
    logging.info(f"Retrying failed submission: '{submission_id}' in workspace {workspace_path}")
    return self.request_util.run_request(
        uri=retry_url,
        method=POST,
        content_type=APPLICATION_JSON,
        data=json.dumps({"retryType": "Failed"}),
    )
Retry a failed submission in Terra.
Args:
- submission_id (str): The ID of the submission to retry.
Returns:
- requests.Response: The response from the request.
def get_submission_status(self, submission_id: str) -> requests.Response:
    """
    Fetch the status of a single Terra submission.

    **Args:**
    - submission_id (str): The ID of the submission.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for submission: '{submission_id}' in workspace {workspace_path}")
    return self.request_util.run_request(
        uri=f"{RAWLS_LINK}/workspaces/{workspace_path}/submissions/{submission_id}",
        method=GET,
    )
Get the status of a submission in Terra.
Args:
- submission_id (str): The ID of the submission.
Returns:
- requests.Response: The response from the request.
def get_workspace_submission_status(self) -> requests.Response:
    """
    Fetch the status of every submission in the Terra workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_path = f"{self.billing_project}/{self.workspace_name}"
    logging.info(f"Getting status for all submissions in workspace {workspace_path}")
    submissions_url = f"{RAWLS_LINK}/workspaces/{workspace_path}/submissions"
    return self.request_util.run_request(uri=submissions_url, method=GET)
Get the status of all submissions in a Terra workspace.
Returns:
- requests.Response: The response from the request.
def get_workflow_status(
        self,
        submission_id: str,
        workflow_id: str,
        expand_sub_workflow_metadata: bool = False) -> requests.Response:
    """
    Get the status of a workflow in a submission in Terra.

    **Args:**
    - submission_id (str): The ID of the submission.
    - workflow_id (str): The ID of the workflow.
    - expand_sub_workflow_metadata (bool, optional): Whether to request expanded sub-workflow
        metadata (appends `?expandSubWorkflows=true` to the request). Defaults to `False`.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workflow_url = (
        f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
        f"/submissions/{submission_id}/workflows/{workflow_id}"
    )
    if expand_sub_workflow_metadata:
        workflow_url += "?expandSubWorkflows=true"
    logging.info(
        f"Getting status for workflow: '{workflow_id}' in submission: '{submission_id}' "
        f"in workspace {self.billing_project}/{self.workspace_name}"
    )
    return self.request_util.run_request(uri=workflow_url, method=GET)
Get the status of a workflow in a submission in Terra.
Args:
- submission_id (str): The ID of the submission.
- workflow_id (str): The ID of the workflow.
- expand_sub_workflow_metadata (bool, optional): Whether to expand sub-workflow metadata.
Defaults to `False`.
Returns:
- requests.Response: The response from the request.
def get_workspace_submission_stats(
        self, method_name: Optional[str] = None, retrieve_running_ids: bool = True
) -> dict:
    """
    Summarize workflow outcomes across the submissions of a Terra workspace.

    Workflow counts from each submission's `workflowStatuses` are summed into the keys
    `submitted`, `queued`, `running`, `aborting`, `aborted`, `failed` and `succeeded`
    (status names are lower-cased; any other status is ignored).

    When `retrieve_running_ids` is true, every submission whose status is not "Done" or
    "Aborted" is fetched individually. The entity names of its Running/Submitted/Queued
    workflows are then collected into `id_still_running`.
    `success_rate` is `succeeded / (succeeded + failed)` rounded to 2 places, or 0.0 when
    neither count is positive.

    **Args:**
    - method_name (str, optional): Only count submissions whose `methodConfigurationName`
        equals this value. Defaults to None (all submissions).
    - retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
        When false, `id_still_running` is the string "NA". Defaults to `True`.

    **Returns:**
    - dict: The per-status counts plus `id_still_running` and `success_rate`.
    """
    all_submissions = self.get_workspace_submission_status().json()
    if method_name:
        submissions = [sub for sub in all_submissions if sub["methodConfigurationName"] == method_name]
    else:
        submissions = list(all_submissions)

    method_append = f"with method name '{method_name}'" if method_name else ""
    logging.info(
        f"{len(submissions)} submissions in "
        f"{self.billing_project}/{self.workspace_name} {method_append}"
    )

    counted_states = ("submitted", "queued", "running", "aborting", "aborted", "failed", "succeeded")
    stats: dict = dict.fromkeys(counted_states, 0)
    stats["id_still_running"] = [] if retrieve_running_ids else "NA"
    pending_states = ("Running", "Submitted", "Queued")

    for submission in submissions:
        for status, count in submission["workflowStatuses"].items():
            key = status.lower()
            if key in stats:
                stats[key] += count
        # Drill into a submission only when IDs were requested and it may still have live workflows
        if not retrieve_running_ids or submission['status'] in ("Done", "Aborted"):
            continue
        details = self.get_submission_status(submission_id=submission["submissionId"]).json()
        stats["id_still_running"].extend(
            wf["workflowEntity"]["entityName"]
            for wf in details["workflows"]
            if wf["status"] in pending_states
        )

    running_count = stats['running'] + stats['submitted'] + stats['queued']
    if retrieve_running_ids:
        found = len(stats['id_still_running'])
        if found != running_count:
            logging.warning(
                f"Discrepancy found between total running/pending workflows, {running_count}, "
                f"and the count of ids still running/pending, {found}. "
                "Workflows may have completed between API calls."
            )

    finished = stats['succeeded'] + stats['failed']
    stats['success_rate'] = round(stats['succeeded'] / finished, 2) if finished > 0 else 0.0
    return stats
Get submission statistics for a Terra workspace, optionally filtered by method name.
Args:
- method_name (str, optional): The name of the method to filter statistics by. Defaults to None.
- retrieve_running_ids (bool, optional): Whether to retrieve the IDs of workflows that are still running.
Defaults to `True`.
Returns:
- dict: A dictionary containing submission statistics, including counts of workflows in various states
def get_workspace_details(self, terra_google_project_id: str) -> requests.Response:
    """
    Fetch details of the Terra workspace, passing the given Google project as `userProject`.

    **Args:**
    - terra_google_project_id (str): The Google project ID of the Terra workspace.

    **Returns:**
    - requests.Response: The response from the request.
    """
    workspace_url = f"{RAWLS_LINK}/workspaces/{self.billing_project}/{self.workspace_name}"
    query = f"?userProject={terra_google_project_id}"
    logging.info(
        f"Getting workspace details for workspace '{self.workspace_name}' using Terra Google"
        f" project ID: '{terra_google_project_id}'"
    )
    return self.request_util.run_request(uri=workspace_url + query, method=GET)
Get details of a Terra workspace using the Google project ID.
Args:
- terra_google_project_id (str): The Google project ID of the Terra workspace.
Returns:
- requests.Response: The response from the request.