ops_utils.gcp_utils
Module for GCP utilities.
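A minimal usage sketch of the `GCPCloudFunctions` client defined below; the project ID and object path are hypothetical placeholders:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

# Authenticate with application-default credentials
# (or pass service_account_json="path/to/key.json" instead)
gcp = GCPCloudFunctions(project="my-gcp-project")  # hypothetical project ID

path = "gs://my-bucket/some/file.txt"  # hypothetical object path
if gcp.check_file_exists(path):
    print(gcp.read_file(path))
```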
"""Module for GCP utilities."""
import os
import logging
import time
import json
import hashlib
import base64
import subprocess

from humanfriendly import format_size, parse_size
from mimetypes import guess_type
from typing import Optional, Any
from google.cloud.storage.blob import Blob
from google.api_core.exceptions import Forbidden, GoogleAPICallError
from google.oauth2 import service_account
from google.cloud import storage
from google.auth import default

from .vars import ARG_DEFAULTS
from .thread_pool_executor_util import MultiThreadedJobs

MOVE = "move"
"""Variable to be used for the "action" when files should be moved."""
COPY = "copy"
"""Variable to be used for the "action" when files should be copied."""
MD5_HEX = "hex"
"""Variable to be used when the generated `md5` should be of the `hex` type."""
MD5_BASE64 = "base64"
"""Variable to be used when the generated `md5` should be of the `base64` type."""


class GCPCloudFunctions:
    """Class to handle GCP Cloud Functions."""

    def __init__(
            self,
            project: Optional[str] = None,
            service_account_json: Optional[str] = None
    ) -> None:
        """
        Initialize the GCPCloudFunctions class.

        Authenticates using service account JSON if provided or default credentials,
        and sets up the Storage Client.

        Args:
            project: Optional[str] = None
                The GCP project ID. If not provided, will use the project from the service account or default.
            service_account_json: Optional[str] = None
                Path to a service account JSON key file. If provided, will use these credentials.
        """
        # Initialize credentials and project
        credentials = None
        default_project = None

        if service_account_json:
            credentials = service_account.Credentials.from_service_account_file(service_account_json)
            # Extract project from service account if not specified
            if not project:
                with open(service_account_json, 'r') as f:
                    sa_info = json.load(f)
                    project = sa_info.get('project_id')
        else:
            # Use default credentials
            credentials, default_project = default()

        # Set project if not already set
        if not project:
            project = default_project

        self.client = storage.Client(credentials=credentials, project=project)
        """@private"""

    @staticmethod
    def _process_cloud_path(cloud_path: str) -> dict:
        """
        Process a GCS cloud path into its components.

        Args:
            cloud_path (str): The GCS cloud path.

        Returns:
            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
        """
        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
        bucket_name = str.split(remaining_url, sep="/")[0]
        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
        path_components = {
            "platform_prefix": platform_prefix,
            "bucket": bucket_name,
            "blob_url": blob_name
        }
        return path_components

    def load_blob_from_full_path(self, full_path: str) -> Blob:
        """
        Load a GCS blob object from a full GCS path.

        **Args:**
        - full_path (str): The full GCS path.

        **Returns:**
        - google.cloud.storage.blob.Blob: The GCS blob object.
        """
        file_path_components = self._process_cloud_path(full_path)

        # Specify the billing project
        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
        blob = bucket.blob(file_path_components["blob_url"])

        # If the blob exists in GCS, reload it so metadata is populated
        if blob.exists():
            blob.reload()
        return blob

    def check_file_exists(self, full_path: str) -> bool:
        """
        Check if a file exists in GCS.

        **Args:**
        - full_path (str): The full GCS path.

        **Returns:**
        - bool: `True` if the file exists, `False` otherwise.
        """
        blob = self.load_blob_from_full_path(full_path)
        return blob.exists()

    @staticmethod
    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
        """
        Create a dictionary containing file information.

        Args:
            bucket_name (str): The name of the GCS bucket.
            blob (Any): The GCS blob object.
            file_name_only (bool): Whether to return only the file path and no extra info.

        Returns:
            dict: A dictionary containing file information.
        """
        if file_name_only:
            return {
                "path": f"gs://{bucket_name}/{blob.name}"
            }
        return {
            "name": os.path.basename(blob.name),
            "path": f"gs://{bucket_name}/{blob.name}",
            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
            "file_extension": os.path.splitext(blob.name)[1],
            "size_in_bytes": blob.size,
            "md5_hash": blob.md5_hash
        }

    @staticmethod
    def _validate_include_blob(
            blob: Any,
            bucket_name: str,
            file_extensions_to_ignore: list[str] = [],
            file_strings_to_ignore: list[str] = [],
            file_extensions_to_include: list[str] = [],
            verbose: bool = False
    ) -> bool:
        """
        Validate if a blob should be included based on its file extension.

        Args:
            file_extensions_to_include (list[str]): List of file extensions to include.
            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
            blob (Any): The GCS blob object.
            verbose (bool): Whether to log files not being included.

        Returns:
            bool: True if the blob should be included, False otherwise.
        """
        file_path = f"gs://{bucket_name}/{blob.name}"
        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
            if verbose:
                logging.info(f"Skipping {file_path} as it has an extension to ignore")
            return False
        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
            if verbose:
                logging.info(f"Skipping {file_path} as it does not have an extension to include")
            return False
        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
            if verbose:
                logging.info(f"Skipping {file_path} as it has a string to ignore")
            return False
        return True

    def list_bucket_contents(
            self,
            bucket_name: str,
            prefix: Optional[str] = None,
            file_extensions_to_ignore: list[str] = [],
            file_strings_to_ignore: list[str] = [],
            file_extensions_to_include: list[str] = [],
            file_name_only: bool = False
    ) -> list[dict]:
        """
        List contents of a GCS bucket and return a list of dictionaries with file information.

        **Args:**
        - bucket_name (str): The name of the GCS bucket. If it includes `gs://`, it will be removed.
        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.

        **Returns:**
        - list[dict]: A list of dictionaries containing file information.
        """
        # If the bucket name starts with gs://, remove it
        if bucket_name.startswith("gs://"):
            bucket_name = bucket_name.split("/")[2].strip()

        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")

        # Get the bucket object and set user_project for Requester Pays
        bucket = self.client.bucket(bucket_name, user_project=self.client.project)

        # List blobs within the bucket
        blobs = bucket.list_blobs(prefix=prefix)
        logging.info("Finished listing blobs. Processing files now.")

        # Create a list of dictionaries containing file information
        file_list = [
            self._create_bucket_contents_dict(
                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
            )
            for blob in blobs
            if self._validate_include_blob(
                blob=blob,
                file_extensions_to_ignore=file_extensions_to_ignore,
                file_strings_to_ignore=file_strings_to_ignore,
                file_extensions_to_include=file_extensions_to_include,
                bucket_name=bucket_name
            ) and not blob.name.endswith("/")
        ]
        logging.info(f"Found {len(file_list)} files in bucket")
        return file_list

    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
        """
        Copy a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
        """
        try:
            src_blob = self.load_blob_from_full_path(src_cloud_path)
            dest_blob = self.load_blob_from_full_path(full_destination_path)

            # Use rewrite so large copies do not time out
            rewrite_token = False

            while True:
                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
                    src_blob, token=rewrite_token
                )
                if verbose:
                    logging.info(
                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
                    )
                if not rewrite_token:
                    break

        except Exception as e:
            logging.error(
                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
            )
            raise

    def delete_cloud_file(self, full_cloud_path: str) -> None:
        """
        Delete a file from GCS.

        **Args:**
        - full_cloud_path (str): The GCS path of the file to delete.
        """
        blob = self.load_blob_from_full_path(full_cloud_path)
        blob.delete()

    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
        """
        Move a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        """
        self.copy_cloud_file(src_cloud_path, full_destination_path)
        self.delete_cloud_file(src_cloud_path)

    def get_filesize(self, target_path: str) -> int:
        """
        Get the size of a file in GCS.

        **Args:**
        - target_path (str): The GCS path of the file.

        **Returns:**
        - int: The size of the file in bytes.
        """
        blob = self.load_blob_from_full_path(target_path)
        return blob.size

    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
        """
        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - dest_cloud_path (str): The destination GCS path.

        **Returns:**
        - bool: `True` if the files are identical, `False` otherwise.
        """
        src_blob = self.load_blob_from_full_path(src_cloud_path)
        dest_blob = self.load_blob_from_full_path(dest_cloud_path)

        # If either blob is None or does not exist
        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
            return False
        # If the MD5 hashes exist
        if src_blob.md5_hash and dest_blob.md5_hash:
            # And they are the same, return True
            if src_blob.md5_hash == dest_blob.md5_hash:
                return True
        else:
            # If the MD5 hashes do not exist (for larger files they may not), check that the sizes match
            if src_blob.size == dest_blob.size:
                return True
        # Otherwise, return False
        return False

    def delete_multiple_files(
            self,
            files_to_delete: list[str],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            job_complete_for_logging: int = 500
    ) -> None:
        """
        Delete multiple cloud files in parallel using multi-threading.

        **Args:**
        - files_to_delete (list[str]): List of GCS paths of the files to delete.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
        """
        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]

        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self.delete_cloud_file,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=job_complete_for_logging
        )

    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
        """
        Validate if source and destination files are identical.

        **Args:**
        - source_file (str): The source file path.
        - full_destination_path (str): The destination file path.

        **Returns:**
        - dict: The file dictionary of the files with a boolean indicating if they are identical.
        """
        if self.validate_files_are_same(source_file, full_destination_path):
            identical = True
        else:
            identical = False
        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}

    def loop_and_log_validation_files_multithreaded(
            self,
            files_to_validate: list[dict],
            log_difference: bool,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            job_complete_for_logging: int = 500
    ) -> list[dict]:
        """
        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

        **Args:**
        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
              this at the start of a copy/move operation to check if files are already copied.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Returns:**
        - list[dict]: List of dictionaries containing files that are **not** identical.
        """
        logging.info(f"Validating if {len(files_to_validate)} files are identical")

        # Prepare jobs: pass the necessary arguments to each validation
        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]

        # Use the multithreaded job runner to validate the files
        checked_files = MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self._validate_file_pair,
            list_of_jobs_args_list=jobs,
            collect_output=True,
            max_retries=max_retries,
            jobs_complete_for_logging=job_complete_for_logging
        )
        # If any files failed to load, raise an exception
        if files_to_validate and None in checked_files:  # type: ignore[operator]
            logging.error("Failed to validate all files, could not load some blobs")
            raise Exception("Failed to validate all files")

        # Get all files that are not identical
        not_identical_files = [
            file_dict
            for file_dict in checked_files  # type: ignore[operator, union-attr]
            if not file_dict['identical']
        ]
        if not_identical_files:
            if log_difference:
                for file_dict in not_identical_files:
                    logging.warning(
                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
                    )
        logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
        return not_identical_files

    def multithread_copy_of_files_with_validation(
            self,
            files_to_copy: list[dict],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            skip_check_if_already_copied: bool = False
    ) -> None:
        """
        Copy multiple files in parallel with validation.

        **Args:**
        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
              Each dictionary should have the keys `source_file` and `full_destination_path`.
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`.
        - skip_check_if_already_copied (bool, optional): Whether to skip checking
              if files are already copied and start copying right away. Defaults to `False`.
        """
        if skip_check_if_already_copied:
            logging.info("Skipping check if files are already copied")
            updated_files_to_move = files_to_copy
        else:
            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
                files_to_copy,
                log_difference=False,
                workers=workers,
                max_retries=max_retries
            )
        # If all files are already copied, return
        if not updated_files_to_move:
            logging.info("All files are already copied")
            return None
        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
        # Validate that all files were copied successfully
        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
            files_to_copy,
            workers=workers,
            log_difference=True,
            max_retries=max_retries
        )
        if files_not_moved_successfully:
            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
            raise Exception("Failed to copy all files")
        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
        return None

    def move_or_copy_multiple_files(
            self, files_to_move: list[dict],
            action: str,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            jobs_complete_for_logging: int = 500
    ) -> None:
        """
        Move or copy multiple files in parallel.

        **Args:**
        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
              or `ops_utils.gcp_utils.COPY`).
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Raises:**
        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
        """
        if action == MOVE:
            cloud_function = self.move_cloud_file
        elif action == COPY:
            cloud_function = self.copy_cloud_file
        else:
            raise ValueError("Must either select move or copy")

        list_of_jobs_args_list = [
            [
                file_dict['source_file'], file_dict['full_destination_path']
            ]
            for file_dict in files_to_move
        ]
        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=cloud_function,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=jobs_complete_for_logging
        )

    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
        """
        Read the content of a file from GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to read.
        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.

        **Returns:**
        - str: The content of the file, decoded using the given encoding.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        # Download the file content as bytes
        content_bytes = blob.download_as_bytes()
        # Convert bytes to string
        content_str = content_bytes.decode(encoding)
        return content_str

    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
        """
        Upload a file to GCS.

        **Args:**
        - destination_path (str): The destination GCS path.
        - source_file (str): The source file path.
        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
        """
        blob = self.load_blob_from_full_path(destination_path)
        if custom_metadata:
            blob.metadata = custom_metadata
        blob.upload_from_filename(source_file)

    def get_object_md5(
            self,
            file_path: str,
            # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
            chunk_size: int = parse_size("256 KB"),
            logging_bytes: int = parse_size("1 GB"),
            returned_md5_format: str = "hex"
    ) -> str:
        """
        Calculate the MD5 checksum of a file in GCS.

        **Args:**
        - file_path (str): The GCS path of the file.
        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
              Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.

        **Returns:**
        - str: The MD5 checksum of the file.

        **Raises:**
        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
              or `ops_utils.gcp_utils.MD5_BASE64`.
        """
        if returned_md5_format not in ["hex", "base64"]:
            raise ValueError("returned_md5_format must be 'hex' or 'base64'")

        blob = self.load_blob_from_full_path(file_path)

        # Create an MD5 hash object
        md5_hash = hashlib.md5()

        blob_size_str = format_size(blob.size)
        logging.info(f"Streaming {file_path} which is {blob_size_str}")
        total_bytes_streamed = 0
        # Keep track of the last logged size for progress logging
        last_logged = 0

        # Stream the object in chunks and update the hash as data arrives
        with blob.open("rb") as source_stream:
            while True:
                chunk = source_stream.read(chunk_size)
                if not chunk:
                    break
                md5_hash.update(chunk)
                total_bytes_streamed += len(chunk)
                # Log progress every `logging_bytes` streamed
                if total_bytes_streamed - last_logged >= logging_bytes:
                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
                    last_logged = total_bytes_streamed

        if returned_md5_format == "hex":
            md5 = md5_hash.hexdigest()
            logging.info(f"MD5 (hex) for {file_path}: {md5}")
        elif returned_md5_format == "base64":
            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
            logging.info(f"MD5 (base64) for {file_path}: {md5}")
        return md5

    def set_acl_public_read(self, cloud_path: str) -> None:
        """
        Set the file in the bucket to be publicly readable.

        **Args:**
        - cloud_path (str): The GCS path of the file to be set as publicly readable.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.all().grant_read()
        blob.acl.save()

    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
        """
        Set the file in the bucket to grant OWNER permission to a specific group.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - group_email (str): The email of the group to grant OWNER permission.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.group(group_email).grant_owner()
        blob.acl.save()

    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
        """
        Set Cache-Control metadata for a file.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - cache_control (str): The Cache-Control metadata to set.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.cache_control = cache_control
        blob.patch()

    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
        """
        Get the most recent blob in the bucket.

        If the blob with the most recent timestamp doesn't have
        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
        recent file until a log with useful information is encountered. This is useful when combing through
        GCP activity logs for Terra workspace buckets.

        **Args:**
        - bucket_name (str): The GCS bucket name.

        **Returns:**
        - Optional tuple of the blob found and the file contents from the blob.
        """
        blobs = sorted(
            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
        )
        for blob in blobs:
            # Download the file contents as a string
            file_contents = blob.download_as_text()

            # Check if the content matches the undesired format
            lines = file_contents.splitlines()
            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
                continue

            # If it doesn't match the undesired format, return its content
            logging.info(f"Found valid file: {blob.name}")
            return blob, file_contents

        logging.info("No valid files found.")
        return None

    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
        """
        Write content to a file in GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to write.
        - file_contents (str): The content to write.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.upload_from_string(file_contents)
        logging.info(f"Successfully wrote content to {cloud_path}")

    @staticmethod
    def get_active_gcloud_account() -> str:
        """
        Get the active GCP email for the current account.

        **Returns:**
        - str: The active GCP account email.
        """
        result = subprocess.run(
            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()

    def has_write_permission(self, cloud_path: str) -> bool:
        """
        Check if the current user has permission to write to a GCP path.

        This method tests write access by attempting to update the metadata
        of an existing blob or create a zero-byte temporary file if the blob
        doesn't exist. The temporary file is deleted immediately if created.

        **Args:**
        - cloud_path (str): The GCS path to check for write permissions.

        **Returns:**
        - bool: True if the user has write permission, False otherwise.
        """
        if not cloud_path.startswith("gs://"):
            raise ValueError("cloud_path must start with 'gs://'")
        if cloud_path.endswith("/"):
            logging.warning(
                f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp"
            )
            cloud_path = f"{cloud_path}permission_test_temp"
        try:
            blob = self.load_blob_from_full_path(cloud_path)
            if blob.exists():
                # Try updating metadata (doesn't change the content)
                original_metadata = blob.metadata or {}
                test_metadata = original_metadata.copy()
                test_metadata["_write_permission_test"] = "true"

                blob.metadata = test_metadata
                blob.patch()

                # Restore the original metadata
                blob.metadata = original_metadata
                blob.patch()

                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
                return True
            else:
                # Try writing a temporary file to the bucket
                blob.upload_from_string("")

                # Clean up the test file
                blob.delete()
                logging.info(f"Write permission confirmed for {cloud_path}")
                return True
        except Forbidden:
            logging.warning(f"No write permission on path {cloud_path}")
            return False
        except GoogleAPICallError as e:
            logging.warning(f"Error testing write access to {cloud_path}: {e}")
            return False

    def wait_for_write_permission(
            self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int
    ) -> None:
        """
        Wait for write permissions on a GCP path, checking at regular intervals.

        This method will periodically check if the user has write permission on the specified cloud path.
        It will continue checking until either write permission is granted or the maximum wait time is reached.

        **Args:**
        - cloud_path (str): The GCS path to check for write permissions.
        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.

        **Raises:**
        - PermissionError: If write permission is not granted within the maximum wait time.
        """
        if not cloud_path.startswith("gs://"):
            raise ValueError("cloud_path must start with 'gs://'")

        # Convert minutes to seconds for the sleep function
        interval_seconds = interval_wait_time_minutes * 60
        max_wait_seconds = max_wait_time_minutes * 60

        start_time = time.time()
        attempt_number = 1

        logging.info(
            f"Starting to check for write permissions on {cloud_path}. Will check "
            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")

        # First check immediately
        if self.has_write_permission(cloud_path):
            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
            return

        # If the first check fails, start periodic checks
        while time.time() - start_time < max_wait_seconds:
            elapsed_minutes = (time.time() - start_time) / 60
            remaining_minutes = max_wait_time_minutes - elapsed_minutes

            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
                         f"Time remaining: {remaining_minutes:.1f} minute(s).")

            # Sleep for the interval duration
            time.sleep(interval_seconds)

            attempt_number += 1
            logging.info(f"Checking write permissions (attempt {attempt_number})...")

            if self.has_write_permission(cloud_path):
                elapsed_minutes = (time.time() - start_time) / 60
                logging.info(
                    f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}"
                )
                return

        # If we get here, we've exceeded the maximum wait time
        raise PermissionError(
            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
            f"{cloud_path} after {attempt_number} attempts.")
MOVE = "move"
    Variable to be used for the "action" when files should be moved.

COPY = "copy"
    Variable to be used for the "action" when files should be copied.

MD5_HEX = "hex"
    Variable to be used when the generated `md5` should be of the `hex` type.

MD5_BASE64 = "base64"
    Variable to be used when the generated `md5` should be of the `base64` type.
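These constants are the values expected by `move_or_copy_multiple_files` (for `action`) and `get_object_md5` (for `returned_md5_format`). A short sketch, with hypothetical bucket paths:

```python
from ops_utils.gcp_utils import GCPCloudFunctions, MOVE, MD5_BASE64

gcp = GCPCloudFunctions()

# Move (copy then delete the source) a batch of files in parallel
gcp.move_or_copy_multiple_files(
    files_to_move=[{
        "source_file": "gs://staging-bucket/reads.bam",            # hypothetical
        "full_destination_path": "gs://archive-bucket/reads.bam",  # hypothetical
    }],
    action=MOVE,
)

# Stream the moved object and compute its MD5 in base64 form,
# the same encoding GCS reports in blob.md5_hash
md5 = gcp.get_object_md5("gs://archive-bucket/reads.bam", returned_md5_format=MD5_BASE64)
```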
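For bulk transfers, `multithread_copy_of_files_with_validation` wraps the copy and MD5/size validation steps into one call; a sketch, assuming hypothetical source and destination buckets:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()

# Each dictionary must provide the keys `source_file` and `full_destination_path`
files_to_copy = [
    {
        "source_file": "gs://source-bucket/data/sample1.cram",          # hypothetical
        "full_destination_path": "gs://dest-bucket/data/sample1.cram",  # hypothetical
    },
]

# Skips files already identical at the destination, copies the rest in
# parallel, then re-validates and raises if any copy does not match.
gcp.multithread_copy_of_files_with_validation(files_to_copy=files_to_copy)
```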
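A sketch of gating a transfer on bucket access using `has_write_permission` and `wait_for_write_permission`; the destination path and wait times are illustrative:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
destination = "gs://shared-bucket/deliveries/"  # hypothetical path

if not gcp.has_write_permission(destination):
    # Poll every 5 minutes for up to an hour; raises PermissionError
    # if write access is still not granted when the hour is up.
    gcp.wait_for_write_permission(
        destination,
        interval_wait_time_minutes=5,
        max_wait_time_minutes=60,
    )
```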
742 743 **Args:** 744 - cloud_path (str): The GCS path to check for write permissions. 745 746 **Returns:** 747 - bool: True if the user has write permission, False otherwise. 748 """ 749 if not cloud_path.startswith("gs://"): 750 raise ValueError("cloud_path must start with 'gs://'") 751 if cloud_path.endswith("/"): 752 logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp") 753 cloud_path = f"{cloud_path}permission_test_temp" 754 try: 755 blob = self.load_blob_from_full_path(cloud_path) 756 if blob.exists(): 757 # Try updating metadata (doesn't change the content) 758 original_metadata = blob.metadata or {} 759 test_metadata = original_metadata.copy() 760 test_metadata["_write_permission_test"] = "true" 761 762 blob.metadata = test_metadata 763 blob.patch() 764 765 # Restore the original metadata 766 blob.metadata = original_metadata 767 blob.patch() 768 769 logging.info(f"Write permission confirmed for existing blob {cloud_path}") 770 return True 771 else: 772 # Try writing a temporary file to the bucket 773 blob.upload_from_string("") 774 775 # Clean up the test file 776 blob.delete() 777 logging.info(f"Write permission confirmed for {cloud_path}") 778 return True 779 except Forbidden: 780 logging.warning(f"No write permission on path {cloud_path}") 781 return False 782 except GoogleAPICallError as e: 783 logging.warning(f"Error testing write access to {cloud_path}: {e}") 784 return False 785 786 def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None: 787 """ 788 Wait for write permissions on a GCP path, checking at regular intervals. 789 790 This method will periodically check if the user has write permission on the specified cloud path. 791 It will continue checking until either write permission is granted or the maximum wait time is reached. 792 793 **Args:** 794 - cloud_path (str): The GCS path to check for write permissions. 795 - interval_wait_time_minutes (int): Time in minutes to wait between permission checks. 796 - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions. 797 798 **Returns:** 799 - bool: True if write permission is granted within the wait time, False otherwise. 800 """ 801 if not cloud_path.startswith("gs://"): 802 raise ValueError("cloud_path must start with 'gs://'") 803 804 # Convert minutes to seconds for the sleep function 805 interval_seconds = interval_wait_time_minutes * 60 806 max_wait_seconds = max_wait_time_minutes * 60 807 808 start_time = time.time() 809 attempt_number = 1 810 811 logging.info( 812 f"Starting to check for write permissions on {cloud_path}. Will check " 813 f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).") 814 815 # First check immediately 816 if self.has_write_permission(cloud_path): 817 logging.info(f"Write permission confirmed on initial check for {cloud_path}") 818 return 819 820 # If first check fails, start periodic checks 821 while time.time() - start_time < max_wait_seconds: 822 elapsed_minutes = (time.time() - start_time) / 60 823 remaining_minutes = max_wait_time_minutes - elapsed_minutes 824 825 logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. " 826 f"Time elapsed: {elapsed_minutes:.1f} minute(s). 
" 827 f"Time remaining: {remaining_minutes:.1f} minute(s).") 828 829 # Sleep for the interval duration 830 time.sleep(interval_seconds) 831 832 attempt_number += 1 833 logging.info(f"Checking write permissions (attempt {attempt_number})...") 834 835 if self.has_write_permission(cloud_path): 836 elapsed_minutes = (time.time() - start_time) / 60 837 logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}") 838 return 839 840 # If we get here, we've exceeded the maximum wait time 841 raise PermissionError( 842 f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for " 843 f"{cloud_path} after {attempt_number} attempts.")
Class to handle GCP Cloud Functions.
```python
    def __init__(
            self,
            project: Optional[str] = None,
            service_account_json: Optional[str] = None
    ) -> None:
        """
        Initialize the GCPCloudFunctions class.

        Authenticates using service account JSON if provided or default credentials,
        and sets up the Storage Client.

        Args:
            project: Optional[str] = None
                The GCP project ID. If not provided, will use project from service account or default.
            service_account_json: Optional[str] = None
                Path to service account JSON key file. If provided, will use these credentials.
        """
        # Initialize credentials and project
        credentials = None
        default_project = None

        if service_account_json:
            credentials = service_account.Credentials.from_service_account_file(service_account_json)
            # Extract project from service account if not specified
            if not project:
                with open(service_account_json, 'r') as f:
                    sa_info = json.load(f)
                    project = sa_info.get('project_id')
        else:
            # Use default credentials
            credentials, default_project = default()

        # Set project if not already set
        if not project:
            project = default_project

        self.client = storage.Client(credentials=credentials, project=project)
        """@private"""
```
Initialize the GCPCloudFunctions class.
Authenticates using service account JSON if provided or default credentials, and sets up the Storage Client.
Args:
- project (Optional[str], defaults to None): The GCP project ID. If not provided, will use project from service account or default.
- service_account_json (Optional[str], defaults to None): Path to service account JSON key file. If provided, will use these credentials.
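For orientation, a minimal construction sketch (not part of the library itself): the project ID and key-file path below are placeholders, and the import path is assumed from the module name `ops_utils.gcp_utils`.

```python
from ops_utils.gcp_utils import GCPCloudFunctions

# Application default credentials and default project
gcp = GCPCloudFunctions()

# Or pin a billing project and authenticate with a service account key (placeholder values)
gcp_sa = GCPCloudFunctions(
    project="my-gcp-project",
    service_account_json="/path/to/service-account.json",
)
```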
```python
    def load_blob_from_full_path(self, full_path: str) -> Blob:
        """
        Load a GCS blob object from a full GCS path.

        **Args:**
        - full_path (str): The full GCS path.

        **Returns:**
        - google.cloud.storage.blob.Blob: The GCS blob object.
        """
        file_path_components = self._process_cloud_path(full_path)

        # Specify the billing project
        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
        blob = bucket.blob(file_path_components["blob_url"])

        # If blob exists in GCS reload it so metadata is there
        if blob.exists():
            blob.reload()
        return blob
```
Load a GCS blob object from a full GCS path.
Args:
- full_path (str): The full GCS path.
Returns:
- google.cloud.storage.blob.Blob: The GCS blob object.
```python
    def check_file_exists(self, full_path: str) -> bool:
        """
        Check if a file exists in GCS.

        **Args:**
        - full_path (str): The full GCS path.

        **Returns:**
        - bool: `True` if the file exists, `False` otherwise.
        """
        blob = self.load_blob_from_full_path(full_path)
        return blob.exists()
```
Check if a file exists in GCS.
Args:
- full_path (str): The full GCS path.
Returns:
- bool: `True` if the file exists, `False` otherwise.
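A short sketch combining the two lookups above; the `gs://` path is a placeholder.

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
path = "gs://example-bucket/data/sample.txt"  # placeholder path

if gcp.check_file_exists(path):
    blob = gcp.load_blob_from_full_path(path)
    # Metadata is populated because an existing blob is reloaded on load
    print(blob.size, blob.md5_hash, blob.content_type)
```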
```python
    def list_bucket_contents(
            self,
            bucket_name: str,
            prefix: Optional[str] = None,
            file_extensions_to_ignore: list[str] = [],
            file_strings_to_ignore: list[str] = [],
            file_extensions_to_include: list[str] = [],
            file_name_only: bool = False
    ) -> list[dict]:
        """
        List contents of a GCS bucket and return a list of dictionaries with file information.

        **Args:**
        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.

        **Returns:**
        - list[dict]: A list of dictionaries containing file information.
        """
        # If the bucket name starts with gs://, remove it
        if bucket_name.startswith("gs://"):
            bucket_name = bucket_name.split("/")[2].strip()

        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")

        # Get the bucket object and set user_project for Requester Pays
        bucket = self.client.bucket(bucket_name, user_project=self.client.project)

        # List blobs within the bucket
        blobs = bucket.list_blobs(prefix=prefix)
        logging.info("Finished listing blobs. Processing files now.")

        # Create a list of dictionaries containing file information
        file_list = [
            self._create_bucket_contents_dict(
                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
            )
            for blob in blobs
            if self._validate_include_blob(
                blob=blob,
                file_extensions_to_ignore=file_extensions_to_ignore,
                file_strings_to_ignore=file_strings_to_ignore,
                file_extensions_to_include=file_extensions_to_include,
                bucket_name=bucket_name
            ) and not blob.name.endswith("/")
        ]
        logging.info(f"Found {len(file_list)} files in bucket")
        return file_list
```
List contents of a GCS bucket and return a list of dictionaries with file information.
Args:
- bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
- prefix (str, optional): The prefix to filter the blobs. Defaults to None.
- file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
- file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
- file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
- file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
Returns:
- list[dict]: A list of dictionaries containing file information.
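A hypothetical listing call with a few of the optional filters; the bucket name, prefix, and filter values are placeholders.

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
# Keep only CRAM files under a prefix, skipping anything whose path contains "tmp/"
files = gcp.list_bucket_contents(
    bucket_name="example-bucket",
    prefix="submissions/",
    file_extensions_to_include=[".cram"],
    file_strings_to_ignore=["tmp/"],
)
for f in files:
    print(f["path"], f["size_in_bytes"], f["md5_hash"])
```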
```python
    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
        """
        Copy a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
        """
        try:
            src_blob = self.load_blob_from_full_path(src_cloud_path)
            dest_blob = self.load_blob_from_full_path(full_destination_path)

            # Use rewrite so no timeouts
            rewrite_token = False

            while True:
                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
                    src_blob, token=rewrite_token
                )
                if verbose:
                    logging.info(
                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
                    )
                if not rewrite_token:
                    break

        except Exception as e:
            logging.error(
                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
            )
            raise
```
Copy a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
- verbose (bool, optional): Whether to log progress. Defaults to `False`.
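For example, a single server-side copy between two placeholder buckets might look like:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.copy_cloud_file(
    src_cloud_path="gs://source-bucket/data/sample.vcf.gz",
    full_destination_path="gs://dest-bucket/data/sample.vcf.gz",
    verbose=True,  # log rewrite progress, useful for large objects
)
```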
```python
    def delete_cloud_file(self, full_cloud_path: str) -> None:
        """
        Delete a file from GCS.

        **Args:**
        - full_cloud_path (str): The GCS path of the file to delete.
        """
        blob = self.load_blob_from_full_path(full_cloud_path)
        blob.delete()
```
Delete a file from GCS.
Args:
- full_cloud_path (str): The GCS path of the file to delete.
```python
    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
        """
        Move a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        """
        self.copy_cloud_file(src_cloud_path, full_destination_path)
        self.delete_cloud_file(src_cloud_path)
```
Move a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
```python
    def get_filesize(self, target_path: str) -> int:
        """
        Get the size of a file in GCS.

        **Args:**
        - target_path (str): The GCS path of the file.

        **Returns:**
        - int: The size of the file in bytes.
        """
        blob = self.load_blob_from_full_path(target_path)
        return blob.size
```
Get the size of a file in GCS.
Args:
- target_path (str): The GCS path of the file.
Returns:
- int: The size of the file in bytes.
```python
    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
        """
        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - dest_cloud_path (str): The destination GCS path.

        **Returns:**
        - bool: `True` if the files are identical, `False` otherwise.
        """
        src_blob = self.load_blob_from_full_path(src_cloud_path)
        dest_blob = self.load_blob_from_full_path(dest_cloud_path)

        # If either blob is None or does not exist
        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
            return False
        # If the MD5 hashes exist
        if src_blob.md5_hash and dest_blob.md5_hash:
            # And are the same return True
            if src_blob.md5_hash == dest_blob.md5_hash:
                return True
        else:
            # If md5 do not exist (for larger files they may not) check size matches
            if src_blob.size == dest_blob.size:
                return True
        # Otherwise, return False
        return False
```
Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
Args:
- src_cloud_path (str): The source GCS path.
- dest_cloud_path (str): The destination GCS path.
Returns:
- bool: `True` if the files are identical, `False` otherwise.
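A quick pre-flight check before re-copying, using placeholder paths:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
identical = gcp.validate_files_are_same(
    src_cloud_path="gs://source-bucket/data/sample.vcf.gz",
    dest_cloud_path="gs://dest-bucket/data/sample.vcf.gz",
)
if not identical:
    print("Files differ (or one is missing); consider re-copying")
```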
```python
    def delete_multiple_files(
            self,
            files_to_delete: list[str],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            job_complete_for_logging: int = 500
    ) -> None:
        """
        Delete multiple cloud files in parallel using multi-threading.

        **Args:**
        - files_to_delete (list[str]): List of GCS paths of the files to delete.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
        """
        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]

        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self.delete_cloud_file,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=job_complete_for_logging
        )
```
Delete multiple cloud files in parallel using multi-threading.
Args:
- files_to_delete (list[str]): List of GCS paths of the files to delete.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries. Defaults to `5`.
- verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
- job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
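A sketch of a parallel cleanup, again with placeholder paths; duplicate entries are removed before deletion.

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.delete_multiple_files(
    files_to_delete=[
        "gs://example-bucket/tmp/part-0001.txt",
        "gs://example-bucket/tmp/part-0002.txt",
    ],
    workers=4,
    verbose=True,
)
```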
```python
    def loop_and_log_validation_files_multithreaded(
            self,
            files_to_validate: list[dict],
            log_difference: bool,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            job_complete_for_logging: int = 500
    ) -> list[dict]:
        """
        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

        **Args:**
        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
            this at the start of a copy/move operation to check if files are already copied.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Returns:**
        - list[dict]: List of dictionaries containing files that are **not** identical.
        """
        logging.info(f"Validating if {len(files_to_validate)} files are identical")

        # Prepare jobs: pass the necessary arguments to each validation
        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]

        # Use multithreaded job runner to validate the files
        checked_files = MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self._validate_file_pair,
            list_of_jobs_args_list=jobs,
            collect_output=True,
            max_retries=max_retries,
            jobs_complete_for_logging=job_complete_for_logging
        )
        # If any files failed to load, raise an exception
        if files_to_validate and None in checked_files:  # type: ignore[operator]
            logging.error("Failed to validate all files, could not load some blobs")
            raise Exception("Failed to validate all files")

        # Get all files that are not identical
        not_identical_files = [
            file_dict
            for file_dict in checked_files  # type: ignore[operator, union-attr]
            if not file_dict['identical']
        ]
        if not_identical_files:
            if log_difference:
                for file_dict in not_identical_files:
                    logging.warning(
                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
                    )
        logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
        return not_identical_files
```
Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
Args:
- files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
- log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running this at the start of a copy/move operation to check if files are already copied.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
- job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
Returns:
- list[dict]: List of dictionaries containing files that are not identical.
```python
    def multithread_copy_of_files_with_validation(
            self,
            files_to_copy: list[dict],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            skip_check_if_already_copied: bool = False
    ) -> None:
        """
        Copy multiple files in parallel with validation.

        **Args:**
        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
            Dictionary should have keys `source_file` and `full_destination_path`
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`
        - skip_check_if_already_copied (bool, optional): Whether to skip checking
            if files are already copied and start copying right away. Defaults to `False`.
        """
        if skip_check_if_already_copied:
            logging.info("Skipping check if files are already copied")
            updated_files_to_move = files_to_copy
        else:
            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
                files_to_copy,
                log_difference=False,
                workers=workers,
                max_retries=max_retries
            )
        # If all files are already copied, return
        if not updated_files_to_move:
            logging.info("All files are already copied")
            return None
        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
        # Validate that all files were copied successfully
        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
            files_to_copy,
            workers=workers,
            log_difference=True,
            max_retries=max_retries
        )
        if files_not_moved_successfully:
            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
            raise Exception("Failed to copy all files")
        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
        return None
```
Copy multiple files in parallel with validation.
Args:
- files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. Dictionary should have keys `source_file` and `full_destination_path`.
- workers (int): Number of worker threads. Defaults to `10`.
- max_retries (int): Maximum number of retries. Defaults to `5`.
- skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to `False`.
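The expected shape of `files_to_copy` is easiest to see in a sketch (placeholder paths):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
files_to_copy = [
    {
        "source_file": "gs://source-bucket/data/sample1.cram",
        "full_destination_path": "gs://dest-bucket/data/sample1.cram",
    },
    {
        "source_file": "gs://source-bucket/data/sample2.cram",
        "full_destination_path": "gs://dest-bucket/data/sample2.cram",
    },
]
# Skips files that are already present and identical, copies the rest,
# then re-validates everything; raises if any copy cannot be verified.
gcp.multithread_copy_of_files_with_validation(files_to_copy=files_to_copy, workers=4, max_retries=3)
```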
```python
    def move_or_copy_multiple_files(
            self, files_to_move: list[dict],
            action: str,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            jobs_complete_for_logging: int = 500
    ) -> None:
        """
        Move or copy multiple files in parallel.

        **Args:**
        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
            or `ops_utils.gcp_utils.COPY`).
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Raises:**
        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
        """
        if action == MOVE:
            cloud_function = self.move_cloud_file
        elif action == COPY:
            cloud_function = self.copy_cloud_file
        else:
            raise ValueError("Must either select move or copy")

        list_of_jobs_args_list = [
            [
                file_dict['source_file'], file_dict['full_destination_path']
            ]
            for file_dict in files_to_move
        ]
        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=cloud_function,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=jobs_complete_for_logging
        )
```
Move or copy multiple files in parallel.
Args:
- files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
- action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`).
- workers (int): Number of worker threads. Defaults to `10`.
- max_retries (int): Maximum number of retries. Defaults to `5`.
- verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
- jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
Raises:
- ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
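The same job format works with the module-level `MOVE`/`COPY` constants, for example (placeholder paths):

```python
from ops_utils.gcp_utils import GCPCloudFunctions, MOVE

gcp = GCPCloudFunctions()
gcp.move_or_copy_multiple_files(
    files_to_move=[
        {
            "source_file": "gs://source-bucket/data/sample1.cram",
            "full_destination_path": "gs://archive-bucket/data/sample1.cram",
        },
    ],
    action=MOVE,  # or COPY; any other value raises ValueError
    workers=4,
)
```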
```python
    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
        """
        Read the content of a file from GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to read.
        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.

        **Returns:**
        - str: The content of the file, decoded using the given encoding.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        # Download the file content as bytes
        content_bytes = blob.download_as_bytes()
        # Convert bytes to string
        content_str = content_bytes.decode(encoding)
        return content_str
```
Read the content of a file from GCS.
Args:
- cloud_path (str): The GCS path of the file to read.
- encoding (str, optional): The encoding to use. Defaults to `utf-8`.
Returns:
- str: The content of the file, decoded using the given encoding.
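For instance, reading a small text object (placeholder path):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
text = gcp.read_file("gs://example-bucket/config/settings.json")
print(text)
```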
```python
    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
        """
        Upload a file to GCS.

        **Args:**
        - destination_path (str): The destination GCS path.
        - source_file (str): The source file path.
        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
        """
        blob = self.load_blob_from_full_path(destination_path)
        if custom_metadata:
            blob.metadata = custom_metadata
        blob.upload_from_filename(source_file)
```
Upload a file to GCS.
Args:
- destination_path (str): The destination GCS path.
- source_file (str): The source file path.
- custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
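An illustrative upload with custom metadata attached; the local path, destination, and metadata values are placeholders.

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.upload_blob(
    destination_path="gs://example-bucket/uploads/report.tsv",   # placeholder destination
    source_file="/tmp/report.tsv",                               # placeholder local file
    custom_metadata={"pipeline": "ops-demo", "run_id": "1234"},  # placeholder metadata
)
```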
```python
    def get_object_md5(
            self,
            file_path: str,
            # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
            chunk_size: int = parse_size("256 KB"),
            logging_bytes: int = parse_size("1 GB"),
            returned_md5_format: str = "hex"
    ) -> str:
        """
        Calculate the MD5 checksum of a file in GCS.

        **Args:**
        - file_path (str): The GCS path of the file.
        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
            Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.

        **Returns:**
        - str: The MD5 checksum of the file.

        **Raises:**
        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
            or `ops_utils.gcp_utils.MD5_BASE64`
        """
        if returned_md5_format not in ["hex", "base64"]:
            raise ValueError("returned_md5_format must be 'hex' or 'base64'")

        blob = self.load_blob_from_full_path(file_path)

        # Create an MD5 hash object
        md5_hash = hashlib.md5()

        blob_size_str = format_size(blob.size)
        logging.info(f"Streaming {file_path} which is {blob_size_str}")
        # Buffer that receives the streamed chunks
        buffer = io.BytesIO()
        total_bytes_streamed = 0
        # Keep track of the last logged size for data logging
        last_logged = 0

        with blob.open("rb") as source_stream:
            while True:
                chunk = source_stream.read(chunk_size)
                if not chunk:
                    break
                md5_hash.update(chunk)
                buffer.write(chunk)
                total_bytes_streamed += len(chunk)
                # Log progress each time another `logging_bytes` has been streamed
                if total_bytes_streamed - last_logged >= logging_bytes:
                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
                    last_logged = total_bytes_streamed

        if returned_md5_format == "hex":
            md5 = md5_hash.hexdigest()
            logging.info(f"MD5 (hex) for {file_path}: {md5}")
        elif returned_md5_format == "base64":
            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
            logging.info(f"MD5 (base64) for {file_path}: {md5}")
        return md5
```
Calculate the MD5 checksum of a file in GCS.
Args:
- file_path (str): The GCS path of the file.
- chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
- logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
- returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
Returns:
- str: The MD5 checksum of the file.
Raises:
- ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`
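A sketch that computes a base64 MD5, which can be compared against the base64-encoded `md5_hash` GCS stores on the blob (placeholder path):

```python
from ops_utils.gcp_utils import GCPCloudFunctions, MD5_BASE64

gcp = GCPCloudFunctions()
md5 = gcp.get_object_md5(
    file_path="gs://example-bucket/data/large-file.bam",  # placeholder path
    returned_md5_format=MD5_BASE64,
)
print(md5)
```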
```python
    def set_acl_public_read(self, cloud_path: str) -> None:
        """
        Set the file in the bucket to be publicly readable.

        **Args:**
        - cloud_path (str): The GCS path of the file to be set as public readable.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.all().grant_read()
        blob.acl.save()
```
Set the file in the bucket to be publicly readable.
Args:
- cloud_path (str): The GCS path of the file to be set as public readable.
```python
    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
        """
        Set the file in the bucket to grant OWNER permission to a specific group.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - group_email (str): The email of the group to grant OWNER permission
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.group(group_email).grant_owner()
        blob.acl.save()
```
Set the file in the bucket to grant OWNER permission to a specific group.
Args:
- cloud_path (str): The GCS path of the file.
- group_email (str): The email of the group to grant OWNER permission.
```python
    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
        """
        Set Cache-Control metadata for a file.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - cache_control (str): The Cache-Control metadata to set.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.cache_control = cache_control
        blob.patch()
```
Set Cache-Control metadata for a file.
Args:
- cloud_path (str): The GCS path of the file.
- cache_control (str): The Cache-Control metadata to set.
```python
    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
        """
        Get the most recent blob in the bucket.

        If the blob with the most recent timestamp doesn't have
        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
        recent file until a log with useful information is encountered. This is useful when combing through
        GCP activity logs for Terra workspace buckets.

        **Args:**
        - bucket_name (str): The GCS bucket name.

        **Returns:**
        - Optional tuple of the blob found and the file contents from the blob
        """
        blobs = sorted(
            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
        )
        for blob in blobs:
            # Download the file contents as a string
            file_contents = blob.download_as_text()

            # Check if the content matches the undesired format
            lines = file_contents.splitlines()
            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
                continue

            # If it doesn't match the undesired format, return its content
            logging.info(f"Found valid file: {blob.name}")
            return blob, file_contents

        logging.info("No valid files found.")
        return None
```
Get the most recent blob in the bucket.
If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, this method will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.
Args:
- bucket_name (str): The GCS bucket name.
Returns:
- Optional tuple of the blob found and its file contents; `None` if no valid log file is found.
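A hypothetical scan of a Terra workspace logging bucket (the bucket name is a placeholder):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
result = gcp.get_file_contents_of_most_recent_blob_in_bucket("example-logs-bucket")
if result:
    blob, contents = result
    print(f"Most recent useful log: {blob.name}")
    print(contents[:500])  # preview the first 500 characters
```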
```python
    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
        """
        Write content to a file in GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to write.
        - file_contents (str): The content to write.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.upload_from_string(file_contents)
        logging.info(f"Successfully wrote content to {cloud_path}")
```
Write content to a file in GCS.
Args:
- cloud_path (str): The GCS path of the file to write.
- file_contents (str): The content to write.
```python
    @staticmethod
    def get_active_gcloud_account() -> str:
        """
        Get the active GCP email for the current account.

        **Returns:**
        - str: The active GCP account email.
        """
        result = subprocess.run(
            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()
```
Get the active GCP email for the current account.
Returns:
- str: The active GCP account email.
```python
    def has_write_permission(self, cloud_path: str) -> bool:
        """
        Check if the current user has permission to write to a GCP path.

        This method tests write access by attempting to update the metadata
        of an existing blob or create a zero-byte temporary file if the blob
        doesn't exist. The temporary file is deleted immediately if created.

        **Args:**
        - cloud_path (str): The GCS path to check for write permissions.

        **Returns:**
        - bool: True if the user has write permission, False otherwise.
        """
        if not cloud_path.startswith("gs://"):
            raise ValueError("cloud_path must start with 'gs://'")
        if cloud_path.endswith("/"):
            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
            cloud_path = f"{cloud_path}permission_test_temp"
        try:
            blob = self.load_blob_from_full_path(cloud_path)
            if blob.exists():
                # Try updating metadata (doesn't change the content)
                original_metadata = blob.metadata or {}
                test_metadata = original_metadata.copy()
                test_metadata["_write_permission_test"] = "true"

                blob.metadata = test_metadata
                blob.patch()

                # Restore the original metadata
                blob.metadata = original_metadata
                blob.patch()

                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
                return True
            else:
                # Try writing a temporary file to the bucket
                blob.upload_from_string("")

                # Clean up the test file
                blob.delete()
                logging.info(f"Write permission confirmed for {cloud_path}")
                return True
        except Forbidden:
            logging.warning(f"No write permission on path {cloud_path}")
            return False
        except GoogleAPICallError as e:
            logging.warning(f"Error testing write access to {cloud_path}: {e}")
            return False
```
Check if the current user has permission to write to a GCP path.
This method tests write access by attempting to update the metadata of an existing blob or create a zero-byte temporary file if the blob doesn't exist. The temporary file is deleted immediately if created.
Args:
- cloud_path (str): The GCS path to check for write permissions.
Returns:
- bool: True if the user has write permission, False otherwise.
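A sketch of a pre-write check on a directory-style path (placeholder bucket). A trailing slash makes the check probe a temporary object named `permission_test_temp` under that path.

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
if gcp.has_write_permission("gs://example-bucket/outputs/"):
    print("Safe to write outputs")
```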
```python
    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
        """
        Wait for write permissions on a GCP path, checking at regular intervals.

        This method will periodically check if the user has write permission on the specified cloud path.
        It will continue checking until either write permission is granted or the maximum wait time is reached.

        **Args:**
        - cloud_path (str): The GCS path to check for write permissions.
        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.

        **Returns:**
        - None. Returns as soon as write permission is confirmed.

        **Raises:**
        - ValueError: If `cloud_path` does not start with `gs://`.
        - PermissionError: If write permission is not granted within `max_wait_time_minutes`.
        """
        if not cloud_path.startswith("gs://"):
            raise ValueError("cloud_path must start with 'gs://'")

        # Convert minutes to seconds for the sleep function
        interval_seconds = interval_wait_time_minutes * 60
        max_wait_seconds = max_wait_time_minutes * 60

        start_time = time.time()
        attempt_number = 1

        logging.info(
            f"Starting to check for write permissions on {cloud_path}. Will check "
            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")

        # First check immediately
        if self.has_write_permission(cloud_path):
            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
            return

        # If first check fails, start periodic checks
        while time.time() - start_time < max_wait_seconds:
            elapsed_minutes = (time.time() - start_time) / 60
            remaining_minutes = max_wait_time_minutes - elapsed_minutes

            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
                         f"Time remaining: {remaining_minutes:.1f} minute(s).")

            # Sleep for the interval duration
            time.sleep(interval_seconds)

            attempt_number += 1
            logging.info(f"Checking write permissions (attempt {attempt_number})...")

            if self.has_write_permission(cloud_path):
                elapsed_minutes = (time.time() - start_time) / 60
                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
                return

        # If we get here, we've exceeded the maximum wait time
        raise PermissionError(
            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
            f"{cloud_path} after {attempt_number} attempts.")
```
Wait for write permissions on a GCP path, checking at regular intervals.
This method will periodically check if the user has write permission on the specified cloud path. It will continue checking until either write permission is granted or the maximum wait time is reached.
Args:
- cloud_path (str): The GCS path to check for write permissions.
- interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
- max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
Returns:
- None. Returns as soon as write permission is confirmed.
Raises:
- ValueError: If `cloud_path` does not start with `gs://`.
- PermissionError: If write permission is not granted within `max_wait_time_minutes`.
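Putting it together, a caller that blocks until access is granted (placeholder path and illustrative timings):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
# Poll every 5 minutes for up to an hour; raises PermissionError if access never arrives
gcp.wait_for_write_permission(
    cloud_path="gs://example-bucket/outputs/",
    interval_wait_time_minutes=5,
    max_wait_time_minutes=60,
)
```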