# ops_utils.gcp_utils
# Module for GCP utilities.
1"""Module for GCP utilities.""" 2import os 3import logging 4import time 5import io 6import json 7import hashlib 8import base64 9import subprocess 10 11from humanfriendly import format_size, parse_size 12from mimetypes import guess_type 13from typing import Optional, Any 14from google.cloud.storage.blob import Blob 15from google.api_core.exceptions import Forbidden, GoogleAPICallError 16from google.oauth2 import service_account 17from google.cloud import storage 18from google.auth import default 19 20from .vars import ARG_DEFAULTS 21from .thread_pool_executor_util import MultiThreadedJobs 22 23MOVE = "move" 24"""Variable to be used for the "action" when files should be moved.""" 25COPY = "copy" 26"""Variable to be used for the "action" when files should be copied.""" 27MD5_HEX = "hex" 28"""Variable to be used when the generated `md5` should be of the `hex` type.""" 29MD5_BASE64 = "base64" 30"""Variable to be used when the generated `md5` should be of the `base64` type.""" 31 32 33class GCPCloudFunctions: 34 """Class to handle GCP Cloud Functions.""" 35 36 def __init__( 37 self, 38 project: Optional[str] = None, 39 service_account_json: Optional[str] = None 40 ) -> None: 41 """ 42 Initialize the GCPCloudFunctions class. 43 44 Authenticates using service account JSON if provided or default credentials, 45 and sets up the Storage Client. 46 47 Args: 48 project: Optional[str] = None 49 The GCP project ID. If not provided, will use project from service account or default. 50 service_account_json: Optional[str] = None 51 Path to service account JSON key file. If provided, will use these credentials. 
52 """ 53 # Initialize credentials and project 54 credentials = None 55 default_project = None 56 57 if service_account_json: 58 credentials = service_account.Credentials.from_service_account_file(service_account_json) 59 # Extract project from service account if not specified 60 if not project: 61 with open(service_account_json, 'r') as f: 62 sa_info = json.load(f) 63 project = sa_info.get('project_id') 64 else: 65 # Use default credentials 66 credentials, default_project = default() 67 68 # Set project if not already set 69 if not project: 70 project = default_project 71 72 self.client = storage.Client(credentials=credentials, project=project) 73 """@private""" 74 75 @staticmethod 76 def _process_cloud_path(cloud_path: str) -> dict: 77 """ 78 Process a GCS cloud path into its components. 79 80 Args: 81 cloud_path (str): The GCS cloud path. 82 83 Returns: 84 dict: A dictionary containing the platform prefix, bucket name, and blob URL. 85 """ 86 platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1) 87 bucket_name = str.split(remaining_url, sep="/")[0] 88 blob_name = "/".join(str.split(remaining_url, sep="/")[1:]) 89 path_components = { 90 "platform_prefix": platform_prefix, 91 "bucket": bucket_name, 92 "blob_url": blob_name 93 } 94 return path_components 95 96 def load_blob_from_full_path(self, full_path: str) -> Blob: 97 """ 98 Load a GCS blob object from a full GCS path. 99 100 **Args:** 101 - full_path (str): The full GCS path. 102 103 **Returns:** 104 - google.cloud.storage.blob.Blob: The GCS blob object. 
105 """ 106 file_path_components = self._process_cloud_path(full_path) 107 108 # Specify the billing project 109 bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project) 110 blob = bucket.blob(file_path_components["blob_url"]) 111 112 # If blob exists in GCS reload it so metadata is there 113 if blob.exists(): 114 blob.reload() 115 return blob 116 117 def check_file_exists(self, full_path: str) -> bool: 118 """ 119 Check if a file exists in GCS. 120 121 **Args:** 122 - full_path (str): The full GCS path. 123 124 **Returns:** 125 - bool: `True` if the file exists, `False` otherwise. 126 """ 127 blob = self.load_blob_from_full_path(full_path) 128 return blob.exists() 129 130 @staticmethod 131 def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict: 132 """ 133 Create a dictionary containing file information. 134 135 Args: 136 bucket_name (str): The name of the GCS bucket. 137 blob (Any): The GCS blob object. 138 file_name_only (bool): Whether to return only the file list. 139 140 Returns: 141 dict: A dictionary containing file information. 
142 """ 143 if file_name_only: 144 return { 145 "path": f"gs://{bucket_name}/{blob.name}" 146 } 147 return { 148 "name": os.path.basename(blob.name), 149 "path": f"gs://{bucket_name}/{blob.name}", 150 "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream", 151 "file_extension": os.path.splitext(blob.name)[1], 152 "size_in_bytes": blob.size, 153 "md5_hash": blob.md5_hash, 154 "time_created": blob.time_created.isoformat() if blob.time_created else None, 155 "last_modified": blob.updated.isoformat() if blob.updated else None, 156 } 157 158 @staticmethod 159 def _validate_include_blob( 160 blob: Any, 161 bucket_name: str, 162 file_extensions_to_ignore: list[str] = [], 163 file_strings_to_ignore: list[str] = [], 164 file_extensions_to_include: list[str] = [], 165 verbose: bool = False 166 ) -> bool: 167 """ 168 Validate if a blob should be included based on its file extension. 169 170 Args: 171 file_extensions_to_include (list[str]): List of file extensions to include. 172 file_extensions_to_ignore (list[str]): List of file extensions to ignore. 173 file_strings_to_ignore (list[str]): List of file name substrings to ignore. 174 blob (Any): The GCS blob object. 175 verbose (bool): Whether to log files not being included. 176 177 Returns: 178 bool: True if the blob should be included, False otherwise. 
179 """ 180 file_path = f"gs://{bucket_name}/{blob.name}" 181 if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)): 182 if verbose: 183 logging.info(f"Skipping {file_path} as it has an extension to ignore") 184 return False 185 if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)): 186 if verbose: 187 logging.info(f"Skipping {file_path} as it does not have an extension to include") 188 return False 189 if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore): 190 if verbose: 191 logging.info(f"Skipping {file_path} as it has a string to ignore") 192 return False 193 return True 194 195 def list_bucket_contents( 196 self, 197 bucket_name: str, 198 prefix: Optional[str] = None, 199 file_extensions_to_ignore: list[str] = [], 200 file_strings_to_ignore: list[str] = [], 201 file_extensions_to_include: list[str] = [], 202 file_name_only: bool = False, 203 verbose: bool = False, 204 log_progress_interval: int = 10000 205 ) -> list[dict]: 206 """ 207 List contents of a GCS bucket and return a list of dictionaries with file information. 208 209 **Args:** 210 - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed. 211 - prefix (str, optional): The prefix to filter the blobs. Defaults to None. 212 - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to []. 213 - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to []. 214 - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to []. 215 - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`. 216 - verbose (bool, optional): Whether to log files not being included. Defaults to `False`. 217 - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to `10000`. 
218 219 **Returns:** 220 - list[dict]: A list of dictionaries containing file information. 221 """ 222 # If the bucket name starts with gs://, remove it 223 if bucket_name.startswith("gs://"): 224 bucket_name = bucket_name.split("/")[2].strip() 225 226 logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}") 227 228 # Get the bucket object and set user_project for Requester Pays 229 bucket = self.client.bucket(bucket_name, user_project=self.client.project) 230 231 # list_blobs returns a lazy iterator — no network calls happen until we iterate. 232 # page_size=1000 is the GCS maximum, minimising the number of paginated round-trips. 233 prefix_msg = f" with prefix '{prefix}'" if prefix else "" 234 logging.info(f"Starting to page through blobs in bucket '{bucket_name}'{prefix_msg}...") 235 blobs = bucket.list_blobs(prefix=prefix, page_size=1000) 236 237 # Iterate with progress logging so large buckets don't appear stuck 238 file_list = [] 239 for blob in blobs: 240 if blob.name.endswith("/"): 241 continue 242 if not self._validate_include_blob( 243 blob=blob, 244 file_extensions_to_ignore=file_extensions_to_ignore, 245 file_strings_to_ignore=file_strings_to_ignore, 246 file_extensions_to_include=file_extensions_to_include, 247 bucket_name=bucket_name 248 ): 249 continue 250 file_list.append( 251 self._create_bucket_contents_dict( 252 blob=blob, bucket_name=bucket_name, file_name_only=file_name_only 253 ) 254 ) 255 if verbose and len(file_list) % log_progress_interval == 0: 256 logging.info(f"Processed {len(file_list):,} files so far...") 257 258 logging.info(f"Found {len(file_list):,} files in bucket") 259 return file_list 260 261 def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None: 262 """ 263 Copy a file from one GCS location to another. 264 265 **Args:** 266 - src_cloud_path (str): The source GCS path. 267 - full_destination_path (str): The destination GCS path. 
268 - verbose (bool, optional): Whether to log progress. Defaults to `False`. 269 """ 270 try: 271 src_blob = self.load_blob_from_full_path(src_cloud_path) 272 dest_blob = self.load_blob_from_full_path(full_destination_path) 273 274 # Use rewrite so no timeouts 275 rewrite_token = False 276 277 while True: 278 rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite( 279 src_blob, token=rewrite_token 280 ) 281 if verbose: 282 logging.info( 283 f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes." 284 ) 285 if not rewrite_token: 286 break 287 288 except Exception as e: 289 logging.error( 290 f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to " 291 f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted" 292 ) 293 raise 294 295 def delete_cloud_file(self, full_cloud_path: str) -> None: 296 """ 297 Delete a file from GCS. 298 299 **Args:** 300 - full_cloud_path (str): The GCS path of the file to delete. 301 """ 302 blob = self.load_blob_from_full_path(full_cloud_path) 303 blob.delete() 304 305 def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None: 306 """ 307 Move a file from one GCS location to another. 308 309 **Args:** 310 - src_cloud_path (str): The source GCS path. 311 - full_destination_path (str): The destination GCS path. 312 """ 313 self.copy_cloud_file(src_cloud_path, full_destination_path) 314 self.delete_cloud_file(src_cloud_path) 315 316 def get_filesize(self, target_path: str) -> int: 317 """ 318 Get the size of a file in GCS. 319 320 **Args:** 321 - target_path (str): The GCS path of the file. 322 323 **Returns:** 324 - int: The size of the file in bytes. 
325 """ 326 blob = self.load_blob_from_full_path(target_path) 327 return blob.size 328 329 def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool: 330 """ 331 Validate if two cloud files (source and destination) are identical based on their MD5 hashes. 332 333 **Args:** 334 - src_cloud_path (str): The source GCS path. 335 - dest_cloud_path (str): The destination GCS path. 336 337 **Returns:** 338 - bool: `True` if the files are identical, `False` otherwise. 339 """ 340 src_blob = self.load_blob_from_full_path(src_cloud_path) 341 dest_blob = self.load_blob_from_full_path(dest_cloud_path) 342 343 # If either blob is None or does not exist 344 if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists(): 345 return False 346 # If the MD5 hashes exist 347 if src_blob.md5_hash and dest_blob.md5_hash: 348 # And are the same return True 349 if src_blob.md5_hash == dest_blob.md5_hash: 350 return True 351 else: 352 # If md5 do not exist (for larger files they may not) check size matches 353 if src_blob.size == dest_blob.size: 354 return True 355 # Otherwise, return False 356 return False 357 358 def delete_multiple_files( 359 self, 360 files_to_delete: list[str], 361 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 362 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 363 verbose: bool = False, 364 job_complete_for_logging: int = 500 365 ) -> None: 366 """ 367 Delete multiple cloud files in parallel using multi-threading. 368 369 **Args:** 370 - files_to_delete (list[str]): List of GCS paths of the files to delete. 371 - workers (int, optional): Number of worker threads. Defaults to `10`. 372 - max_retries (int, optional): Maximum number of retries. Defaults to `5`. 373 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 374 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 
375 """ 376 list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)] 377 378 MultiThreadedJobs().run_multi_threaded_job( 379 workers=workers, 380 function=self.delete_cloud_file, 381 list_of_jobs_args_list=list_of_jobs_args_list, 382 max_retries=max_retries, 383 fail_on_error=True, 384 verbose=verbose, 385 collect_output=False, 386 jobs_complete_for_logging=job_complete_for_logging 387 ) 388 389 def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict: 390 """ 391 Validate if source and destination files are identical. 392 393 **Args:** 394 - source_file (str): The source file path. 395 - full_destination_path (str): The destination file path. 396 397 **Returns:** 398 dict: The file dictionary of the files with a boolean indicating if they are identical. 399 """ 400 if self.validate_files_are_same(source_file, full_destination_path): 401 identical = True 402 else: 403 identical = False 404 return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical} 405 406 def loop_and_log_validation_files_multithreaded( 407 self, 408 files_to_validate: list[dict], 409 log_difference: bool, 410 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 411 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 412 job_complete_for_logging: int = 500 413 ) -> list[dict]: 414 """ 415 Validate if multiple cloud files are identical based on their MD5 hashes using multithreading. 416 417 **Args:** 418 - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths. 419 - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running 420 this at the start of a copy/move operation to check if files are already copied. 421 - workers (int, optional): Number of worker threads. Defaults to `10`. 422 - max_retries (int, optional): Maximum number of retries for all jobs. 
Defaults to `5`. 423 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 424 425 **Returns:** 426 - list[dict]: List of dictionaries containing files that are **not** identical. 427 """ 428 logging.info(f"Validating if {len(files_to_validate)} files are identical") 429 430 # Prepare jobs: pass the necessary arguments to each validation 431 jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate] 432 433 # Use multithreaded job runner to validate the files 434 checked_files = MultiThreadedJobs().run_multi_threaded_job( 435 workers=workers, 436 function=self._validate_file_pair, 437 list_of_jobs_args_list=jobs, 438 collect_output=True, 439 max_retries=max_retries, 440 jobs_complete_for_logging=job_complete_for_logging 441 ) 442 # If any files failed to load, raise an exception 443 if files_to_validate and None in checked_files: # type: ignore[operator] 444 logging.error("Failed to validate all files, could not load some blobs") 445 raise Exception("Failed to validate all files") 446 447 # Get all files that are not identical 448 not_identical_files = [ 449 file_dict 450 for file_dict in checked_files # type: ignore[operator, union-attr] 451 if not file_dict['identical'] 452 ] 453 if not_identical_files: 454 if log_difference: 455 for file_dict in not_identical_files: 456 logging.warning( 457 f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical" 458 ) 459 logging.info(f"Validation complete. 
{len(not_identical_files)} files are not identical.") 460 return not_identical_files 461 462 def multithread_copy_of_files_with_validation( 463 self, 464 files_to_copy: list[dict], 465 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 466 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 467 skip_check_if_already_copied: bool = False 468 ) -> None: 469 """ 470 Copy multiple files in parallel with validation. 471 472 **Args:** 473 - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. 474 Dictionary should have keys `source_file` and `full_destination_path` 475 - workers (int): Number of worker threads. Defaults to `10`. 476 - max_retries (int): Maximum number of retries. Defaults to `5` 477 - skip_check_if_already_copied (bool, optional): Whether to skip checking 478 if files are already copied and start copying right away. Defaults to `False`. 479 """ 480 if skip_check_if_already_copied: 481 logging.info("Skipping check if files are already copied") 482 updated_files_to_move = files_to_copy 483 else: 484 updated_files_to_move = self.loop_and_log_validation_files_multithreaded( 485 files_to_copy, 486 log_difference=False, 487 workers=workers, 488 max_retries=max_retries 489 ) 490 # If all files are already copied, return 491 if not updated_files_to_move: 492 logging.info("All files are already copied") 493 return None 494 logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files") 495 self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries) 496 logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original") 497 # Validate that all files were copied successfully 498 files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded( 499 files_to_copy, 500 workers=workers, 501 log_difference=True, 502 max_retries=max_retries 503 ) 504 if files_not_moved_successfully: 505 
logging.error(f"Failed to copy {len(files_not_moved_successfully)} files") 506 raise Exception("Failed to copy all files") 507 logging.info(f"Successfully copied {len(updated_files_to_move)} files") 508 return None 509 510 def move_or_copy_multiple_files( 511 self, files_to_move: list[dict], 512 action: str, 513 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 514 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 515 verbose: bool = False, 516 jobs_complete_for_logging: int = 500 517 ) -> None: 518 """ 519 Move or copy multiple files in parallel. 520 521 **Args:** 522 - files_to_move (list[dict]): List of dictionaries containing source and destination file paths. 523 - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` 524 or `ops_utils.gcp_utils.COPY`). 525 - workers (int): Number of worker threads. Defaults to `10`. 526 - max_retries (int): Maximum number of retries. Defaults to `5`. 527 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 528 - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 529 530 **Raises:** 531 - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`. 
532 """ 533 if action == MOVE: 534 cloud_function = self.move_cloud_file 535 elif action == COPY: 536 cloud_function = self.copy_cloud_file 537 else: 538 raise ValueError("Must either select move or copy") 539 540 list_of_jobs_args_list = [ 541 [ 542 file_dict['source_file'], file_dict['full_destination_path'] 543 ] 544 for file_dict in files_to_move 545 ] 546 MultiThreadedJobs().run_multi_threaded_job( 547 workers=workers, 548 function=cloud_function, 549 list_of_jobs_args_list=list_of_jobs_args_list, 550 max_retries=max_retries, 551 fail_on_error=True, 552 verbose=verbose, 553 collect_output=False, 554 jobs_complete_for_logging=jobs_complete_for_logging 555 ) 556 557 def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str: 558 """ 559 Read the content of a file from GCS. 560 561 **Args:** 562 - cloud_path (str): The GCS path of the file to read. 563 - encoding (str, optional): The encoding to use. Defaults to `utf-8`. 564 565 **Returns:** 566 - bytes: The content of the file as bytes. 567 """ 568 blob = self.load_blob_from_full_path(cloud_path) 569 # Download the file content as bytes 570 content_bytes = blob.download_as_bytes() 571 # Convert bytes to string 572 content_str = content_bytes.decode(encoding) 573 return content_str 574 575 def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None: 576 """ 577 Upload a file to GCS. 578 579 **Args:** 580 - destination_path (str): The destination GCS path. 581 - source_file (str): The source file path. 582 - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None. 
583 """ 584 blob = self.load_blob_from_full_path(destination_path) 585 if custom_metadata: 586 blob.metadata = custom_metadata 587 blob.upload_from_filename(source_file) 588 589 def get_object_md5( 590 self, 591 file_path: str, 592 # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2 593 chunk_size: int = parse_size("256 KB"), 594 logging_bytes: int = parse_size("1 GB"), 595 returned_md5_format: str = "hex" 596 ) -> str: 597 """ 598 Calculate the MD5 checksum of a file in GCS. 599 600 **Args:** 601 - file_path (str): The GCS path of the file. 602 - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`. 603 - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`. 604 - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. 605 Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`. 606 607 **Returns:** 608 - str: The MD5 checksum of the file. 
609 610 **Raises:** 611 - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` 612 or `ops_utils.gcp_utils.MD5_BASE64` 613 """ 614 if returned_md5_format not in ["hex", "base64"]: 615 raise ValueError("returned_md5_format must be 'hex' or 'base64'") 616 617 blob = self.load_blob_from_full_path(file_path) 618 619 # Create an MD5 hash object 620 md5_hash = hashlib.md5() 621 622 blob_size_str = format_size(blob.size) 623 logging.info(f"Streaming {file_path} which is {blob_size_str}") 624 # Use a BytesIO stream to collect data in chunks and upload it 625 buffer = io.BytesIO() 626 total_bytes_streamed = 0 627 # Keep track of the last logged size for data logging 628 last_logged = 0 629 630 with blob.open("rb") as source_stream: 631 while True: 632 chunk = source_stream.read(chunk_size) 633 if not chunk: 634 break 635 md5_hash.update(chunk) 636 buffer.write(chunk) 637 total_bytes_streamed += len(chunk) 638 # Log progress every 1 gb if verbose used 639 if total_bytes_streamed - last_logged >= logging_bytes: 640 logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far") 641 last_logged = total_bytes_streamed 642 643 if returned_md5_format == "hex": 644 md5 = md5_hash.hexdigest() 645 logging.info(f"MD5 (hex) for {file_path}: {md5}") 646 elif returned_md5_format == "base64": 647 md5 = base64.b64encode(md5_hash.digest()).decode("utf-8") 648 logging.info(f"MD5 (base64) for {file_path}: {md5}") 649 return md5 650 651 def set_acl_public_read(self, cloud_path: str) -> None: 652 """ 653 Set the file in the bucket to be publicly readable. 654 655 **Args:** 656 - cloud_path (str): The GCS path of the file to be set as public readable. 657 """ 658 blob = self.load_blob_from_full_path(cloud_path) 659 blob.acl.all().grant_read() 660 blob.acl.save() 661 662 def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None: 663 """ 664 Set the file in the bucket to grant OWNER permission to a specific group. 
665 666 **Args:** 667 - cloud_path (str): The GCS path of the file. 668 - group_email (str): The email of the group to grant OWNER permission 669 """ 670 blob = self.load_blob_from_full_path(cloud_path) 671 blob.acl.group(group_email).grant_owner() 672 blob.acl.save() 673 674 def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None: 675 """ 676 Set Cache-Control metadata for a file. 677 678 **Args:** 679 - cloud_path (str): The GCS path of the file. 680 - cache_control (str): The Cache-Control metadata to set. 681 """ 682 blob = self.load_blob_from_full_path(cloud_path) 683 blob.cache_control = cache_control 684 blob.patch() 685 686 def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]: 687 """ 688 Get the most recent blob in the bucket. 689 690 If the blob with the most recent timestamp doesn't have 691 any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most 692 recent file until a log with useful information is encountered. This is useful when combing through 693 GCP activity logs for Terra workspace buckets. 694 695 **Args:** 696 - bucket_name (str): The GCS bucket name. 
697 698 **Returns:** 699 - Optional tuple of the blob found and the file contents from the blob 700 """ 701 blobs = sorted( 702 self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True 703 ) 704 for blob in blobs: 705 # Download the file contents as a string 706 file_contents = blob.download_as_text() 707 708 # Check if the content matches the undesired format 709 lines = file_contents.splitlines() 710 if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"': 711 logging.info(f"Skipping file {blob.name} as it matches the undesired format.") 712 continue 713 714 # If it doesn't match the undesired format, return its content 715 logging.info(f"Found valid file: {blob.name}") 716 return blob, file_contents 717 718 logging.info("No valid files found.") 719 return None 720 721 def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None: 722 """ 723 Write content to a file in GCS. 724 725 **Args:** 726 - cloud_path (str): The GCS path of the file to write. 727 - file_contents (str): The content to write. 728 """ 729 blob = self.load_blob_from_full_path(cloud_path) 730 blob.upload_from_string(file_contents) 731 logging.info(f"Successfully wrote content to {cloud_path}") 732 733 @staticmethod 734 def get_active_gcloud_account() -> str: 735 """ 736 Get the active GCP email for the current account. 737 738 **Returns:** 739 - str: The active GCP account email. 740 """ 741 result = subprocess.run( 742 args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"], 743 capture_output=True, 744 text=True, 745 check=True 746 ) 747 return result.stdout.strip() 748 749 def has_write_permission(self, cloud_path: str) -> bool: 750 """ 751 Check if the current user has permission to write to a GCP path. 752 753 This method tests write access by attempting to update the metadata 754 of an existing blob or create a zero-byte temporary file if the blob 755 doesn't exist. 
The temporary file is deleted immediately if created. 756 757 **Args:** 758 - cloud_path (str): The GCS path to check for write permissions. 759 760 **Returns:** 761 - bool: True if the user has write permission, False otherwise. 762 """ 763 if not cloud_path.startswith("gs://"): 764 raise ValueError("cloud_path must start with 'gs://'") 765 if cloud_path.endswith("/"): 766 logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp") 767 cloud_path = f"{cloud_path}permission_test_temp" 768 try: 769 blob = self.load_blob_from_full_path(cloud_path) 770 if blob.exists(): 771 # Try updating metadata (doesn't change the content) 772 original_metadata = blob.metadata or {} 773 test_metadata = original_metadata.copy() 774 test_metadata["_write_permission_test"] = "true" 775 776 blob.metadata = test_metadata 777 blob.patch() 778 779 # Restore the original metadata 780 blob.metadata = original_metadata 781 blob.patch() 782 783 logging.info(f"Write permission confirmed for existing blob {cloud_path}") 784 return True 785 else: 786 # Try writing a temporary file to the bucket 787 blob.upload_from_string("") 788 789 # Clean up the test file 790 blob.delete() 791 logging.info(f"Write permission confirmed for {cloud_path}") 792 return True 793 except Forbidden: 794 logging.warning(f"No write permission on path {cloud_path}") 795 return False 796 except GoogleAPICallError as e: 797 logging.warning(f"Error testing write access to {cloud_path}: {e}") 798 return False 799 800 def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None: 801 """ 802 Wait for write permissions on a GCP path, checking at regular intervals. 803 804 This method will periodically check if the user has write permission on the specified cloud path. 805 It will continue checking until either write permission is granted or the maximum wait time is reached. 
806 807 **Args:** 808 - cloud_path (str): The GCS path to check for write permissions. 809 - interval_wait_time_minutes (int): Time in minutes to wait between permission checks. 810 - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions. 811 812 **Returns:** 813 - bool: True if write permission is granted within the wait time, False otherwise. 814 """ 815 if not cloud_path.startswith("gs://"): 816 raise ValueError("cloud_path must start with 'gs://'") 817 818 # Convert minutes to seconds for the sleep function 819 interval_seconds = interval_wait_time_minutes * 60 820 max_wait_seconds = max_wait_time_minutes * 60 821 822 start_time = time.time() 823 attempt_number = 1 824 825 logging.info( 826 f"Starting to check for write permissions on {cloud_path}. Will check " 827 f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).") 828 829 # First check immediately 830 if self.has_write_permission(cloud_path): 831 logging.info(f"Write permission confirmed on initial check for {cloud_path}") 832 return 833 834 # If first check fails, start periodic checks 835 while time.time() - start_time < max_wait_seconds: 836 elapsed_minutes = (time.time() - start_time) / 60 837 remaining_minutes = max_wait_time_minutes - elapsed_minutes 838 839 logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. " 840 f"Time elapsed: {elapsed_minutes:.1f} minute(s). 
" 841 f"Time remaining: {remaining_minutes:.1f} minute(s).") 842 843 # Sleep for the interval duration 844 time.sleep(interval_seconds) 845 846 attempt_number += 1 847 logging.info(f"Checking write permissions (attempt {attempt_number})...") 848 849 if self.has_write_permission(cloud_path): 850 elapsed_minutes = (time.time() - start_time) / 60 851 logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}") 852 return 853 854 # If we get here, we've exceeded the maximum wait time 855 raise PermissionError( 856 f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for " 857 f"{cloud_path} after {attempt_number} attempts.") 858 859 def _load_blob_full_path_to_bucket_contents_dict(self, full_path: str, file_name_only: bool) -> dict: 860 """Load a blob from a full GCS path and convert it to a bucket-contents dict. 861 862 This is intended for use with multithreaded batch operations. 863 864 Raises: 865 ValueError: If the blob does not exist. 866 """ 867 file_path_components = self._process_cloud_path(full_path) 868 blob = self.load_blob_from_full_path(full_path) 869 870 # load_blob_from_full_path reloads metadata when exists() is True. 871 # Ensure the object exists so downstream consumers always get a valid dict. 872 if not blob.exists(): 873 raise ValueError(f"Blob does not exist: {full_path}") 874 875 return self._create_bucket_contents_dict( 876 bucket_name=file_path_components["bucket"], 877 blob=blob, 878 file_name_only=file_name_only 879 ) 880 881 def check_files_exist_multithreaded( 882 self, 883 full_paths: list[str], 884 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 885 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 886 job_complete_for_logging: int = 500 887 ) -> dict[str, bool]: 888 """Check existence of multiple GCS files in parallel. 
889 890 **Args:** 891 - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to check. 892 - workers (int, optional): Number of worker threads. Defaults to `10`. 893 - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`. 894 - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`. 895 896 **Returns:** 897 - dict[str, bool]: A dictionary where each key is a GCS path and the value is ``True`` if the file exists, 898 ``False`` otherwise. 899 """ 900 # _check_file_exists_in_gcs is needed because check_file_exists returns a plain bool, which would make it 901 # impossible to associate each result back to its path after the threads complete. By wrapping 902 # it here we return both the path and the result together, so the final dict comprehension 903 # can correctly map path -> bool regardless of the order results come back from the thread pool. 904 def _check_file_exists_in_gcs(path: str) -> dict: 905 return {"path": path, "exists": self.check_file_exists(path)} 906 907 jobs = [[path] for path in full_paths] 908 909 results = MultiThreadedJobs().run_multi_threaded_job( 910 workers=workers, 911 function=_check_file_exists_in_gcs, 912 list_of_jobs_args_list=jobs, 913 collect_output=True, 914 max_retries=max_retries, 915 fail_on_error=True, 916 jobs_complete_for_logging=job_complete_for_logging 917 ) 918 return {item["path"]: item["exists"] for item in results} # type: ignore[union-attr] 919 920 def read_files_multithreaded( 921 self, 922 full_paths: list[str], 923 encoding: str = "utf-8", 924 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 925 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 926 job_complete_for_logging: int = 500 927 ) -> dict[str, str]: 928 """Read the contents of multiple GCS files in parallel. 929 930 **Args:** 931 - full_paths (list[str]): List of full GCS paths (e.g. 
``gs://bucket/path/to/object``) to read. 932 - encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`. 933 - workers (int, optional): Number of worker threads. Defaults to `10`. 934 - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`. 935 - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`. 936 937 **Returns:** 938 - dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string. 939 """ 940 # _read_file_contents_from_gcs is needed because read_file returns a plain str, which would make it 941 # impossible to associate each result back to its path after the threads complete. By wrapping 942 # it here we return both the path and the contents together, so the final dict comprehension 943 # can correctly map path -> contents regardless of the order results come back from the thread pool. 944 def _read_file_contents_from_gcs(path: str) -> dict: 945 return {"path": path, "contents": self.read_file(path, encoding=encoding)} 946 947 jobs = [[path] for path in full_paths] 948 949 results = MultiThreadedJobs().run_multi_threaded_job( 950 workers=workers, 951 function=_read_file_contents_from_gcs, 952 list_of_jobs_args_list=jobs, 953 collect_output=True, 954 max_retries=max_retries, 955 fail_on_error=True, 956 jobs_complete_for_logging=job_complete_for_logging 957 ) 958 return {item["path"]: item["contents"] for item in results} # type: ignore[union-attr] 959 960 def load_blobs_from_full_paths_multithreaded( 961 self, 962 full_paths: list[str], 963 file_name_only: bool = False, 964 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 965 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 966 job_complete_for_logging: int = 500 967 ) -> list[dict]: 968 """Load multiple blobs in parallel from a list of full GCS paths. 
969 970 Args: 971 full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object). 972 file_name_only: Whether to return only the file path in each dict. 973 workers: Number of worker threads. 974 max_retries: Maximum number of retries per job. 975 job_complete_for_logging: Emit progress logs every N completed jobs. 976 977 Returns: 978 List of dictionaries shaped like :meth:`_create_bucket_contents_dict`. 979 980 Raises: 981 Exception: If any blob cannot be loaded / does not exist / returns no output. 982 """ 983 if not full_paths: 984 return [] 985 986 # De-duplicate input paths and keep order 987 unique_paths = list(dict.fromkeys(full_paths)) 988 jobs = [(path, file_name_only) for path in unique_paths] 989 990 results = MultiThreadedJobs().run_multi_threaded_job( 991 workers=workers, 992 function=self._load_blob_full_path_to_bucket_contents_dict, 993 list_of_jobs_args_list=jobs, 994 collect_output=True, 995 max_retries=max_retries, 996 fail_on_error=True, 997 jobs_complete_for_logging=job_complete_for_logging 998 ) 999 1000 # If any jobs failed, MultiThreadedJobs would have raised. Still defensively validate output. 1001 if results is None or (unique_paths and (None in results)): # type: ignore[operator] 1002 raise Exception("Failed to load all blobs") 1003 1004 if len(results) != len(unique_paths) or any(not item for item in results): # type: ignore[arg-type] 1005 raise Exception("Failed to load all blobs") 1006 1007 return results # type: ignore[return-value]
Variable to be used for the "action" when files should be moved.
Variable to be used for the "action" when files should be copied.
Variable to be used when the generated md5 should be of the hex type.
Variable to be used when the generated md5 should be of the base64 type.
34class GCPCloudFunctions: 35 """Class to handle GCP Cloud Functions.""" 36 37 def __init__( 38 self, 39 project: Optional[str] = None, 40 service_account_json: Optional[str] = None 41 ) -> None: 42 """ 43 Initialize the GCPCloudFunctions class. 44 45 Authenticates using service account JSON if provided or default credentials, 46 and sets up the Storage Client. 47 48 Args: 49 project: Optional[str] = None 50 The GCP project ID. If not provided, will use project from service account or default. 51 service_account_json: Optional[str] = None 52 Path to service account JSON key file. If provided, will use these credentials. 53 """ 54 # Initialize credentials and project 55 credentials = None 56 default_project = None 57 58 if service_account_json: 59 credentials = service_account.Credentials.from_service_account_file(service_account_json) 60 # Extract project from service account if not specified 61 if not project: 62 with open(service_account_json, 'r') as f: 63 sa_info = json.load(f) 64 project = sa_info.get('project_id') 65 else: 66 # Use default credentials 67 credentials, default_project = default() 68 69 # Set project if not already set 70 if not project: 71 project = default_project 72 73 self.client = storage.Client(credentials=credentials, project=project) 74 """@private""" 75 76 @staticmethod 77 def _process_cloud_path(cloud_path: str) -> dict: 78 """ 79 Process a GCS cloud path into its components. 80 81 Args: 82 cloud_path (str): The GCS cloud path. 83 84 Returns: 85 dict: A dictionary containing the platform prefix, bucket name, and blob URL. 
86 """ 87 platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1) 88 bucket_name = str.split(remaining_url, sep="/")[0] 89 blob_name = "/".join(str.split(remaining_url, sep="/")[1:]) 90 path_components = { 91 "platform_prefix": platform_prefix, 92 "bucket": bucket_name, 93 "blob_url": blob_name 94 } 95 return path_components 96 97 def load_blob_from_full_path(self, full_path: str) -> Blob: 98 """ 99 Load a GCS blob object from a full GCS path. 100 101 **Args:** 102 - full_path (str): The full GCS path. 103 104 **Returns:** 105 - google.cloud.storage.blob.Blob: The GCS blob object. 106 """ 107 file_path_components = self._process_cloud_path(full_path) 108 109 # Specify the billing project 110 bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project) 111 blob = bucket.blob(file_path_components["blob_url"]) 112 113 # If blob exists in GCS reload it so metadata is there 114 if blob.exists(): 115 blob.reload() 116 return blob 117 118 def check_file_exists(self, full_path: str) -> bool: 119 """ 120 Check if a file exists in GCS. 121 122 **Args:** 123 - full_path (str): The full GCS path. 124 125 **Returns:** 126 - bool: `True` if the file exists, `False` otherwise. 127 """ 128 blob = self.load_blob_from_full_path(full_path) 129 return blob.exists() 130 131 @staticmethod 132 def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict: 133 """ 134 Create a dictionary containing file information. 135 136 Args: 137 bucket_name (str): The name of the GCS bucket. 138 blob (Any): The GCS blob object. 139 file_name_only (bool): Whether to return only the file list. 140 141 Returns: 142 dict: A dictionary containing file information. 
143 """ 144 if file_name_only: 145 return { 146 "path": f"gs://{bucket_name}/{blob.name}" 147 } 148 return { 149 "name": os.path.basename(blob.name), 150 "path": f"gs://{bucket_name}/{blob.name}", 151 "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream", 152 "file_extension": os.path.splitext(blob.name)[1], 153 "size_in_bytes": blob.size, 154 "md5_hash": blob.md5_hash, 155 "time_created": blob.time_created.isoformat() if blob.time_created else None, 156 "last_modified": blob.updated.isoformat() if blob.updated else None, 157 } 158 159 @staticmethod 160 def _validate_include_blob( 161 blob: Any, 162 bucket_name: str, 163 file_extensions_to_ignore: list[str] = [], 164 file_strings_to_ignore: list[str] = [], 165 file_extensions_to_include: list[str] = [], 166 verbose: bool = False 167 ) -> bool: 168 """ 169 Validate if a blob should be included based on its file extension. 170 171 Args: 172 file_extensions_to_include (list[str]): List of file extensions to include. 173 file_extensions_to_ignore (list[str]): List of file extensions to ignore. 174 file_strings_to_ignore (list[str]): List of file name substrings to ignore. 175 blob (Any): The GCS blob object. 176 verbose (bool): Whether to log files not being included. 177 178 Returns: 179 bool: True if the blob should be included, False otherwise. 
180 """ 181 file_path = f"gs://{bucket_name}/{blob.name}" 182 if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)): 183 if verbose: 184 logging.info(f"Skipping {file_path} as it has an extension to ignore") 185 return False 186 if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)): 187 if verbose: 188 logging.info(f"Skipping {file_path} as it does not have an extension to include") 189 return False 190 if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore): 191 if verbose: 192 logging.info(f"Skipping {file_path} as it has a string to ignore") 193 return False 194 return True 195 196 def list_bucket_contents( 197 self, 198 bucket_name: str, 199 prefix: Optional[str] = None, 200 file_extensions_to_ignore: list[str] = [], 201 file_strings_to_ignore: list[str] = [], 202 file_extensions_to_include: list[str] = [], 203 file_name_only: bool = False, 204 verbose: bool = False, 205 log_progress_interval: int = 10000 206 ) -> list[dict]: 207 """ 208 List contents of a GCS bucket and return a list of dictionaries with file information. 209 210 **Args:** 211 - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed. 212 - prefix (str, optional): The prefix to filter the blobs. Defaults to None. 213 - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to []. 214 - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to []. 215 - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to []. 216 - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`. 217 - verbose (bool, optional): Whether to log files not being included. Defaults to `False`. 218 - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to `10000`. 
219 220 **Returns:** 221 - list[dict]: A list of dictionaries containing file information. 222 """ 223 # If the bucket name starts with gs://, remove it 224 if bucket_name.startswith("gs://"): 225 bucket_name = bucket_name.split("/")[2].strip() 226 227 logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}") 228 229 # Get the bucket object and set user_project for Requester Pays 230 bucket = self.client.bucket(bucket_name, user_project=self.client.project) 231 232 # list_blobs returns a lazy iterator — no network calls happen until we iterate. 233 # page_size=1000 is the GCS maximum, minimising the number of paginated round-trips. 234 prefix_msg = f" with prefix '{prefix}'" if prefix else "" 235 logging.info(f"Starting to page through blobs in bucket '{bucket_name}'{prefix_msg}...") 236 blobs = bucket.list_blobs(prefix=prefix, page_size=1000) 237 238 # Iterate with progress logging so large buckets don't appear stuck 239 file_list = [] 240 for blob in blobs: 241 if blob.name.endswith("/"): 242 continue 243 if not self._validate_include_blob( 244 blob=blob, 245 file_extensions_to_ignore=file_extensions_to_ignore, 246 file_strings_to_ignore=file_strings_to_ignore, 247 file_extensions_to_include=file_extensions_to_include, 248 bucket_name=bucket_name 249 ): 250 continue 251 file_list.append( 252 self._create_bucket_contents_dict( 253 blob=blob, bucket_name=bucket_name, file_name_only=file_name_only 254 ) 255 ) 256 if verbose and len(file_list) % log_progress_interval == 0: 257 logging.info(f"Processed {len(file_list):,} files so far...") 258 259 logging.info(f"Found {len(file_list):,} files in bucket") 260 return file_list 261 262 def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None: 263 """ 264 Copy a file from one GCS location to another. 265 266 **Args:** 267 - src_cloud_path (str): The source GCS path. 268 - full_destination_path (str): The destination GCS path. 
269 - verbose (bool, optional): Whether to log progress. Defaults to `False`. 270 """ 271 try: 272 src_blob = self.load_blob_from_full_path(src_cloud_path) 273 dest_blob = self.load_blob_from_full_path(full_destination_path) 274 275 # Use rewrite so no timeouts 276 rewrite_token = False 277 278 while True: 279 rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite( 280 src_blob, token=rewrite_token 281 ) 282 if verbose: 283 logging.info( 284 f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes." 285 ) 286 if not rewrite_token: 287 break 288 289 except Exception as e: 290 logging.error( 291 f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to " 292 f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted" 293 ) 294 raise 295 296 def delete_cloud_file(self, full_cloud_path: str) -> None: 297 """ 298 Delete a file from GCS. 299 300 **Args:** 301 - full_cloud_path (str): The GCS path of the file to delete. 302 """ 303 blob = self.load_blob_from_full_path(full_cloud_path) 304 blob.delete() 305 306 def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None: 307 """ 308 Move a file from one GCS location to another. 309 310 **Args:** 311 - src_cloud_path (str): The source GCS path. 312 - full_destination_path (str): The destination GCS path. 313 """ 314 self.copy_cloud_file(src_cloud_path, full_destination_path) 315 self.delete_cloud_file(src_cloud_path) 316 317 def get_filesize(self, target_path: str) -> int: 318 """ 319 Get the size of a file in GCS. 320 321 **Args:** 322 - target_path (str): The GCS path of the file. 323 324 **Returns:** 325 - int: The size of the file in bytes. 
326 """ 327 blob = self.load_blob_from_full_path(target_path) 328 return blob.size 329 330 def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool: 331 """ 332 Validate if two cloud files (source and destination) are identical based on their MD5 hashes. 333 334 **Args:** 335 - src_cloud_path (str): The source GCS path. 336 - dest_cloud_path (str): The destination GCS path. 337 338 **Returns:** 339 - bool: `True` if the files are identical, `False` otherwise. 340 """ 341 src_blob = self.load_blob_from_full_path(src_cloud_path) 342 dest_blob = self.load_blob_from_full_path(dest_cloud_path) 343 344 # If either blob is None or does not exist 345 if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists(): 346 return False 347 # If the MD5 hashes exist 348 if src_blob.md5_hash and dest_blob.md5_hash: 349 # And are the same return True 350 if src_blob.md5_hash == dest_blob.md5_hash: 351 return True 352 else: 353 # If md5 do not exist (for larger files they may not) check size matches 354 if src_blob.size == dest_blob.size: 355 return True 356 # Otherwise, return False 357 return False 358 359 def delete_multiple_files( 360 self, 361 files_to_delete: list[str], 362 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 363 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 364 verbose: bool = False, 365 job_complete_for_logging: int = 500 366 ) -> None: 367 """ 368 Delete multiple cloud files in parallel using multi-threading. 369 370 **Args:** 371 - files_to_delete (list[str]): List of GCS paths of the files to delete. 372 - workers (int, optional): Number of worker threads. Defaults to `10`. 373 - max_retries (int, optional): Maximum number of retries. Defaults to `5`. 374 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 375 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 
376 """ 377 list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)] 378 379 MultiThreadedJobs().run_multi_threaded_job( 380 workers=workers, 381 function=self.delete_cloud_file, 382 list_of_jobs_args_list=list_of_jobs_args_list, 383 max_retries=max_retries, 384 fail_on_error=True, 385 verbose=verbose, 386 collect_output=False, 387 jobs_complete_for_logging=job_complete_for_logging 388 ) 389 390 def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict: 391 """ 392 Validate if source and destination files are identical. 393 394 **Args:** 395 - source_file (str): The source file path. 396 - full_destination_path (str): The destination file path. 397 398 **Returns:** 399 dict: The file dictionary of the files with a boolean indicating if they are identical. 400 """ 401 if self.validate_files_are_same(source_file, full_destination_path): 402 identical = True 403 else: 404 identical = False 405 return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical} 406 407 def loop_and_log_validation_files_multithreaded( 408 self, 409 files_to_validate: list[dict], 410 log_difference: bool, 411 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 412 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 413 job_complete_for_logging: int = 500 414 ) -> list[dict]: 415 """ 416 Validate if multiple cloud files are identical based on their MD5 hashes using multithreading. 417 418 **Args:** 419 - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths. 420 - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running 421 this at the start of a copy/move operation to check if files are already copied. 422 - workers (int, optional): Number of worker threads. Defaults to `10`. 423 - max_retries (int, optional): Maximum number of retries for all jobs. 
Defaults to `5`. 424 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 425 426 **Returns:** 427 - list[dict]: List of dictionaries containing files that are **not** identical. 428 """ 429 logging.info(f"Validating if {len(files_to_validate)} files are identical") 430 431 # Prepare jobs: pass the necessary arguments to each validation 432 jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate] 433 434 # Use multithreaded job runner to validate the files 435 checked_files = MultiThreadedJobs().run_multi_threaded_job( 436 workers=workers, 437 function=self._validate_file_pair, 438 list_of_jobs_args_list=jobs, 439 collect_output=True, 440 max_retries=max_retries, 441 jobs_complete_for_logging=job_complete_for_logging 442 ) 443 # If any files failed to load, raise an exception 444 if files_to_validate and None in checked_files: # type: ignore[operator] 445 logging.error("Failed to validate all files, could not load some blobs") 446 raise Exception("Failed to validate all files") 447 448 # Get all files that are not identical 449 not_identical_files = [ 450 file_dict 451 for file_dict in checked_files # type: ignore[operator, union-attr] 452 if not file_dict['identical'] 453 ] 454 if not_identical_files: 455 if log_difference: 456 for file_dict in not_identical_files: 457 logging.warning( 458 f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical" 459 ) 460 logging.info(f"Validation complete. 
{len(not_identical_files)} files are not identical.") 461 return not_identical_files 462 463 def multithread_copy_of_files_with_validation( 464 self, 465 files_to_copy: list[dict], 466 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 467 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 468 skip_check_if_already_copied: bool = False 469 ) -> None: 470 """ 471 Copy multiple files in parallel with validation. 472 473 **Args:** 474 - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. 475 Dictionary should have keys `source_file` and `full_destination_path` 476 - workers (int): Number of worker threads. Defaults to `10`. 477 - max_retries (int): Maximum number of retries. Defaults to `5` 478 - skip_check_if_already_copied (bool, optional): Whether to skip checking 479 if files are already copied and start copying right away. Defaults to `False`. 480 """ 481 if skip_check_if_already_copied: 482 logging.info("Skipping check if files are already copied") 483 updated_files_to_move = files_to_copy 484 else: 485 updated_files_to_move = self.loop_and_log_validation_files_multithreaded( 486 files_to_copy, 487 log_difference=False, 488 workers=workers, 489 max_retries=max_retries 490 ) 491 # If all files are already copied, return 492 if not updated_files_to_move: 493 logging.info("All files are already copied") 494 return None 495 logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files") 496 self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries) 497 logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original") 498 # Validate that all files were copied successfully 499 files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded( 500 files_to_copy, 501 workers=workers, 502 log_difference=True, 503 max_retries=max_retries 504 ) 505 if files_not_moved_successfully: 506 
logging.error(f"Failed to copy {len(files_not_moved_successfully)} files") 507 raise Exception("Failed to copy all files") 508 logging.info(f"Successfully copied {len(updated_files_to_move)} files") 509 return None 510 511 def move_or_copy_multiple_files( 512 self, files_to_move: list[dict], 513 action: str, 514 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 515 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 516 verbose: bool = False, 517 jobs_complete_for_logging: int = 500 518 ) -> None: 519 """ 520 Move or copy multiple files in parallel. 521 522 **Args:** 523 - files_to_move (list[dict]): List of dictionaries containing source and destination file paths. 524 - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` 525 or `ops_utils.gcp_utils.COPY`). 526 - workers (int): Number of worker threads. Defaults to `10`. 527 - max_retries (int): Maximum number of retries. Defaults to `5`. 528 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 529 - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 530 531 **Raises:** 532 - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`. 
533 """ 534 if action == MOVE: 535 cloud_function = self.move_cloud_file 536 elif action == COPY: 537 cloud_function = self.copy_cloud_file 538 else: 539 raise ValueError("Must either select move or copy") 540 541 list_of_jobs_args_list = [ 542 [ 543 file_dict['source_file'], file_dict['full_destination_path'] 544 ] 545 for file_dict in files_to_move 546 ] 547 MultiThreadedJobs().run_multi_threaded_job( 548 workers=workers, 549 function=cloud_function, 550 list_of_jobs_args_list=list_of_jobs_args_list, 551 max_retries=max_retries, 552 fail_on_error=True, 553 verbose=verbose, 554 collect_output=False, 555 jobs_complete_for_logging=jobs_complete_for_logging 556 ) 557 558 def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str: 559 """ 560 Read the content of a file from GCS. 561 562 **Args:** 563 - cloud_path (str): The GCS path of the file to read. 564 - encoding (str, optional): The encoding to use. Defaults to `utf-8`. 565 566 **Returns:** 567 - bytes: The content of the file as bytes. 568 """ 569 blob = self.load_blob_from_full_path(cloud_path) 570 # Download the file content as bytes 571 content_bytes = blob.download_as_bytes() 572 # Convert bytes to string 573 content_str = content_bytes.decode(encoding) 574 return content_str 575 576 def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None: 577 """ 578 Upload a file to GCS. 579 580 **Args:** 581 - destination_path (str): The destination GCS path. 582 - source_file (str): The source file path. 583 - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None. 
584 """ 585 blob = self.load_blob_from_full_path(destination_path) 586 if custom_metadata: 587 blob.metadata = custom_metadata 588 blob.upload_from_filename(source_file) 589 590 def get_object_md5( 591 self, 592 file_path: str, 593 # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2 594 chunk_size: int = parse_size("256 KB"), 595 logging_bytes: int = parse_size("1 GB"), 596 returned_md5_format: str = "hex" 597 ) -> str: 598 """ 599 Calculate the MD5 checksum of a file in GCS. 600 601 **Args:** 602 - file_path (str): The GCS path of the file. 603 - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`. 604 - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`. 605 - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. 606 Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`. 607 608 **Returns:** 609 - str: The MD5 checksum of the file. 
610 611 **Raises:** 612 - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` 613 or `ops_utils.gcp_utils.MD5_BASE64` 614 """ 615 if returned_md5_format not in ["hex", "base64"]: 616 raise ValueError("returned_md5_format must be 'hex' or 'base64'") 617 618 blob = self.load_blob_from_full_path(file_path) 619 620 # Create an MD5 hash object 621 md5_hash = hashlib.md5() 622 623 blob_size_str = format_size(blob.size) 624 logging.info(f"Streaming {file_path} which is {blob_size_str}") 625 # Use a BytesIO stream to collect data in chunks and upload it 626 buffer = io.BytesIO() 627 total_bytes_streamed = 0 628 # Keep track of the last logged size for data logging 629 last_logged = 0 630 631 with blob.open("rb") as source_stream: 632 while True: 633 chunk = source_stream.read(chunk_size) 634 if not chunk: 635 break 636 md5_hash.update(chunk) 637 buffer.write(chunk) 638 total_bytes_streamed += len(chunk) 639 # Log progress every 1 gb if verbose used 640 if total_bytes_streamed - last_logged >= logging_bytes: 641 logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far") 642 last_logged = total_bytes_streamed 643 644 if returned_md5_format == "hex": 645 md5 = md5_hash.hexdigest() 646 logging.info(f"MD5 (hex) for {file_path}: {md5}") 647 elif returned_md5_format == "base64": 648 md5 = base64.b64encode(md5_hash.digest()).decode("utf-8") 649 logging.info(f"MD5 (base64) for {file_path}: {md5}") 650 return md5 651 652 def set_acl_public_read(self, cloud_path: str) -> None: 653 """ 654 Set the file in the bucket to be publicly readable. 655 656 **Args:** 657 - cloud_path (str): The GCS path of the file to be set as public readable. 658 """ 659 blob = self.load_blob_from_full_path(cloud_path) 660 blob.acl.all().grant_read() 661 blob.acl.save() 662 663 def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None: 664 """ 665 Set the file in the bucket to grant OWNER permission to a specific group. 
666 667 **Args:** 668 - cloud_path (str): The GCS path of the file. 669 - group_email (str): The email of the group to grant OWNER permission 670 """ 671 blob = self.load_blob_from_full_path(cloud_path) 672 blob.acl.group(group_email).grant_owner() 673 blob.acl.save() 674 675 def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None: 676 """ 677 Set Cache-Control metadata for a file. 678 679 **Args:** 680 - cloud_path (str): The GCS path of the file. 681 - cache_control (str): The Cache-Control metadata to set. 682 """ 683 blob = self.load_blob_from_full_path(cloud_path) 684 blob.cache_control = cache_control 685 blob.patch() 686 687 def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]: 688 """ 689 Get the most recent blob in the bucket. 690 691 If the blob with the most recent timestamp doesn't have 692 any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most 693 recent file until a log with useful information is encountered. This is useful when combing through 694 GCP activity logs for Terra workspace buckets. 695 696 **Args:** 697 - bucket_name (str): The GCS bucket name. 
698 699 **Returns:** 700 - Optional tuple of the blob found and the file contents from the blob 701 """ 702 blobs = sorted( 703 self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True 704 ) 705 for blob in blobs: 706 # Download the file contents as a string 707 file_contents = blob.download_as_text() 708 709 # Check if the content matches the undesired format 710 lines = file_contents.splitlines() 711 if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"': 712 logging.info(f"Skipping file {blob.name} as it matches the undesired format.") 713 continue 714 715 # If it doesn't match the undesired format, return its content 716 logging.info(f"Found valid file: {blob.name}") 717 return blob, file_contents 718 719 logging.info("No valid files found.") 720 return None 721 722 def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None: 723 """ 724 Write content to a file in GCS. 725 726 **Args:** 727 - cloud_path (str): The GCS path of the file to write. 728 - file_contents (str): The content to write. 729 """ 730 blob = self.load_blob_from_full_path(cloud_path) 731 blob.upload_from_string(file_contents) 732 logging.info(f"Successfully wrote content to {cloud_path}") 733 734 @staticmethod 735 def get_active_gcloud_account() -> str: 736 """ 737 Get the active GCP email for the current account. 738 739 **Returns:** 740 - str: The active GCP account email. 741 """ 742 result = subprocess.run( 743 args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"], 744 capture_output=True, 745 text=True, 746 check=True 747 ) 748 return result.stdout.strip() 749 750 def has_write_permission(self, cloud_path: str) -> bool: 751 """ 752 Check if the current user has permission to write to a GCP path. 753 754 This method tests write access by attempting to update the metadata 755 of an existing blob or create a zero-byte temporary file if the blob 756 doesn't exist. 
The temporary file is deleted immediately if created. 757 758 **Args:** 759 - cloud_path (str): The GCS path to check for write permissions. 760 761 **Returns:** 762 - bool: True if the user has write permission, False otherwise. 763 """ 764 if not cloud_path.startswith("gs://"): 765 raise ValueError("cloud_path must start with 'gs://'") 766 if cloud_path.endswith("/"): 767 logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp") 768 cloud_path = f"{cloud_path}permission_test_temp" 769 try: 770 blob = self.load_blob_from_full_path(cloud_path) 771 if blob.exists(): 772 # Try updating metadata (doesn't change the content) 773 original_metadata = blob.metadata or {} 774 test_metadata = original_metadata.copy() 775 test_metadata["_write_permission_test"] = "true" 776 777 blob.metadata = test_metadata 778 blob.patch() 779 780 # Restore the original metadata 781 blob.metadata = original_metadata 782 blob.patch() 783 784 logging.info(f"Write permission confirmed for existing blob {cloud_path}") 785 return True 786 else: 787 # Try writing a temporary file to the bucket 788 blob.upload_from_string("") 789 790 # Clean up the test file 791 blob.delete() 792 logging.info(f"Write permission confirmed for {cloud_path}") 793 return True 794 except Forbidden: 795 logging.warning(f"No write permission on path {cloud_path}") 796 return False 797 except GoogleAPICallError as e: 798 logging.warning(f"Error testing write access to {cloud_path}: {e}") 799 return False 800 801 def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None: 802 """ 803 Wait for write permissions on a GCP path, checking at regular intervals. 804 805 This method will periodically check if the user has write permission on the specified cloud path. 806 It will continue checking until either write permission is granted or the maximum wait time is reached. 
807 808 **Args:** 809 - cloud_path (str): The GCS path to check for write permissions. 810 - interval_wait_time_minutes (int): Time in minutes to wait between permission checks. 811 - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions. 812 813 **Returns:** 814 - bool: True if write permission is granted within the wait time, False otherwise. 815 """ 816 if not cloud_path.startswith("gs://"): 817 raise ValueError("cloud_path must start with 'gs://'") 818 819 # Convert minutes to seconds for the sleep function 820 interval_seconds = interval_wait_time_minutes * 60 821 max_wait_seconds = max_wait_time_minutes * 60 822 823 start_time = time.time() 824 attempt_number = 1 825 826 logging.info( 827 f"Starting to check for write permissions on {cloud_path}. Will check " 828 f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).") 829 830 # First check immediately 831 if self.has_write_permission(cloud_path): 832 logging.info(f"Write permission confirmed on initial check for {cloud_path}") 833 return 834 835 # If first check fails, start periodic checks 836 while time.time() - start_time < max_wait_seconds: 837 elapsed_minutes = (time.time() - start_time) / 60 838 remaining_minutes = max_wait_time_minutes - elapsed_minutes 839 840 logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. " 841 f"Time elapsed: {elapsed_minutes:.1f} minute(s). 
" 842 f"Time remaining: {remaining_minutes:.1f} minute(s).") 843 844 # Sleep for the interval duration 845 time.sleep(interval_seconds) 846 847 attempt_number += 1 848 logging.info(f"Checking write permissions (attempt {attempt_number})...") 849 850 if self.has_write_permission(cloud_path): 851 elapsed_minutes = (time.time() - start_time) / 60 852 logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}") 853 return 854 855 # If we get here, we've exceeded the maximum wait time 856 raise PermissionError( 857 f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for " 858 f"{cloud_path} after {attempt_number} attempts.") 859 860 def _load_blob_full_path_to_bucket_contents_dict(self, full_path: str, file_name_only: bool) -> dict: 861 """Load a blob from a full GCS path and convert it to a bucket-contents dict. 862 863 This is intended for use with multithreaded batch operations. 864 865 Raises: 866 ValueError: If the blob does not exist. 867 """ 868 file_path_components = self._process_cloud_path(full_path) 869 blob = self.load_blob_from_full_path(full_path) 870 871 # load_blob_from_full_path reloads metadata when exists() is True. 872 # Ensure the object exists so downstream consumers always get a valid dict. 873 if not blob.exists(): 874 raise ValueError(f"Blob does not exist: {full_path}") 875 876 return self._create_bucket_contents_dict( 877 bucket_name=file_path_components["bucket"], 878 blob=blob, 879 file_name_only=file_name_only 880 ) 881 882 def check_files_exist_multithreaded( 883 self, 884 full_paths: list[str], 885 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 886 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 887 job_complete_for_logging: int = 500 888 ) -> dict[str, bool]: 889 """Check existence of multiple GCS files in parallel. 
890 891 **Args:** 892 - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to check. 893 - workers (int, optional): Number of worker threads. Defaults to `10`. 894 - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`. 895 - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`. 896 897 **Returns:** 898 - dict[str, bool]: A dictionary where each key is a GCS path and the value is ``True`` if the file exists, 899 ``False`` otherwise. 900 """ 901 # _check_file_exists_in_gcs is needed because check_file_exists returns a plain bool, which would make it 902 # impossible to associate each result back to its path after the threads complete. By wrapping 903 # it here we return both the path and the result together, so the final dict comprehension 904 # can correctly map path -> bool regardless of the order results come back from the thread pool. 905 def _check_file_exists_in_gcs(path: str) -> dict: 906 return {"path": path, "exists": self.check_file_exists(path)} 907 908 jobs = [[path] for path in full_paths] 909 910 results = MultiThreadedJobs().run_multi_threaded_job( 911 workers=workers, 912 function=_check_file_exists_in_gcs, 913 list_of_jobs_args_list=jobs, 914 collect_output=True, 915 max_retries=max_retries, 916 fail_on_error=True, 917 jobs_complete_for_logging=job_complete_for_logging 918 ) 919 return {item["path"]: item["exists"] for item in results} # type: ignore[union-attr] 920 921 def read_files_multithreaded( 922 self, 923 full_paths: list[str], 924 encoding: str = "utf-8", 925 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 926 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 927 job_complete_for_logging: int = 500 928 ) -> dict[str, str]: 929 """Read the contents of multiple GCS files in parallel. 930 931 **Args:** 932 - full_paths (list[str]): List of full GCS paths (e.g. 
``gs://bucket/path/to/object``) to read. 933 - encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`. 934 - workers (int, optional): Number of worker threads. Defaults to `10`. 935 - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`. 936 - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`. 937 938 **Returns:** 939 - dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string. 940 """ 941 # _read_file_contents_from_gcs is needed because read_file returns a plain str, which would make it 942 # impossible to associate each result back to its path after the threads complete. By wrapping 943 # it here we return both the path and the contents together, so the final dict comprehension 944 # can correctly map path -> contents regardless of the order results come back from the thread pool. 945 def _read_file_contents_from_gcs(path: str) -> dict: 946 return {"path": path, "contents": self.read_file(path, encoding=encoding)} 947 948 jobs = [[path] for path in full_paths] 949 950 results = MultiThreadedJobs().run_multi_threaded_job( 951 workers=workers, 952 function=_read_file_contents_from_gcs, 953 list_of_jobs_args_list=jobs, 954 collect_output=True, 955 max_retries=max_retries, 956 fail_on_error=True, 957 jobs_complete_for_logging=job_complete_for_logging 958 ) 959 return {item["path"]: item["contents"] for item in results} # type: ignore[union-attr] 960 961 def load_blobs_from_full_paths_multithreaded( 962 self, 963 full_paths: list[str], 964 file_name_only: bool = False, 965 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 966 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 967 job_complete_for_logging: int = 500 968 ) -> list[dict]: 969 """Load multiple blobs in parallel from a list of full GCS paths. 
970 971 Args: 972 full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object). 973 file_name_only: Whether to return only the file path in each dict. 974 workers: Number of worker threads. 975 max_retries: Maximum number of retries per job. 976 job_complete_for_logging: Emit progress logs every N completed jobs. 977 978 Returns: 979 List of dictionaries shaped like :meth:`_create_bucket_contents_dict`. 980 981 Raises: 982 Exception: If any blob cannot be loaded / does not exist / returns no output. 983 """ 984 if not full_paths: 985 return [] 986 987 # De-duplicate input paths and keep order 988 unique_paths = list(dict.fromkeys(full_paths)) 989 jobs = [(path, file_name_only) for path in unique_paths] 990 991 results = MultiThreadedJobs().run_multi_threaded_job( 992 workers=workers, 993 function=self._load_blob_full_path_to_bucket_contents_dict, 994 list_of_jobs_args_list=jobs, 995 collect_output=True, 996 max_retries=max_retries, 997 fail_on_error=True, 998 jobs_complete_for_logging=job_complete_for_logging 999 ) 1000 1001 # If any jobs failed, MultiThreadedJobs would have raised. Still defensively validate output. 1002 if results is None or (unique_paths and (None in results)): # type: ignore[operator] 1003 raise Exception("Failed to load all blobs") 1004 1005 if len(results) != len(unique_paths) or any(not item for item in results): # type: ignore[arg-type] 1006 raise Exception("Failed to load all blobs") 1007 1008 return results # type: ignore[return-value]
Class to handle GCP Cloud Functions.
37 def __init__( 38 self, 39 project: Optional[str] = None, 40 service_account_json: Optional[str] = None 41 ) -> None: 42 """ 43 Initialize the GCPCloudFunctions class. 44 45 Authenticates using service account JSON if provided or default credentials, 46 and sets up the Storage Client. 47 48 Args: 49 project: Optional[str] = None 50 The GCP project ID. If not provided, will use project from service account or default. 51 service_account_json: Optional[str] = None 52 Path to service account JSON key file. If provided, will use these credentials. 53 """ 54 # Initialize credentials and project 55 credentials = None 56 default_project = None 57 58 if service_account_json: 59 credentials = service_account.Credentials.from_service_account_file(service_account_json) 60 # Extract project from service account if not specified 61 if not project: 62 with open(service_account_json, 'r') as f: 63 sa_info = json.load(f) 64 project = sa_info.get('project_id') 65 else: 66 # Use default credentials 67 credentials, default_project = default() 68 69 # Set project if not already set 70 if not project: 71 project = default_project 72 73 self.client = storage.Client(credentials=credentials, project=project) 74 """@private"""
Initialize the GCPCloudFunctions class.
Authenticates using service account JSON if provided or default credentials, and sets up the Storage Client.
Args: project: Optional[str] = None The GCP project ID. If not provided, will use project from service account or default. service_account_json: Optional[str] = None Path to service account JSON key file. If provided, will use these credentials.
97 def load_blob_from_full_path(self, full_path: str) -> Blob: 98 """ 99 Load a GCS blob object from a full GCS path. 100 101 **Args:** 102 - full_path (str): The full GCS path. 103 104 **Returns:** 105 - google.cloud.storage.blob.Blob: The GCS blob object. 106 """ 107 file_path_components = self._process_cloud_path(full_path) 108 109 # Specify the billing project 110 bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project) 111 blob = bucket.blob(file_path_components["blob_url"]) 112 113 # If blob exists in GCS reload it so metadata is there 114 if blob.exists(): 115 blob.reload() 116 return blob
Load a GCS blob object from a full GCS path.
Args:
- full_path (str): The full GCS path.
Returns:
- google.cloud.storage.blob.Blob: The GCS blob object.
118 def check_file_exists(self, full_path: str) -> bool: 119 """ 120 Check if a file exists in GCS. 121 122 **Args:** 123 - full_path (str): The full GCS path. 124 125 **Returns:** 126 - bool: `True` if the file exists, `False` otherwise. 127 """ 128 blob = self.load_blob_from_full_path(full_path) 129 return blob.exists()
Check if a file exists in GCS.
Args:
- full_path (str): The full GCS path.
Returns:
- bool:
`True` if the file exists, `False` otherwise.
196 def list_bucket_contents( 197 self, 198 bucket_name: str, 199 prefix: Optional[str] = None, 200 file_extensions_to_ignore: list[str] = [], 201 file_strings_to_ignore: list[str] = [], 202 file_extensions_to_include: list[str] = [], 203 file_name_only: bool = False, 204 verbose: bool = False, 205 log_progress_interval: int = 10000 206 ) -> list[dict]: 207 """ 208 List contents of a GCS bucket and return a list of dictionaries with file information. 209 210 **Args:** 211 - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed. 212 - prefix (str, optional): The prefix to filter the blobs. Defaults to None. 213 - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to []. 214 - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to []. 215 - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to []. 216 - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`. 217 - verbose (bool, optional): Whether to log files not being included. Defaults to `False`. 218 - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to `10000`. 219 220 **Returns:** 221 - list[dict]: A list of dictionaries containing file information. 222 """ 223 # If the bucket name starts with gs://, remove it 224 if bucket_name.startswith("gs://"): 225 bucket_name = bucket_name.split("/")[2].strip() 226 227 logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}") 228 229 # Get the bucket object and set user_project for Requester Pays 230 bucket = self.client.bucket(bucket_name, user_project=self.client.project) 231 232 # list_blobs returns a lazy iterator — no network calls happen until we iterate. 233 # page_size=1000 is the GCS maximum, minimising the number of paginated round-trips. 
234 prefix_msg = f" with prefix '{prefix}'" if prefix else "" 235 logging.info(f"Starting to page through blobs in bucket '{bucket_name}'{prefix_msg}...") 236 blobs = bucket.list_blobs(prefix=prefix, page_size=1000) 237 238 # Iterate with progress logging so large buckets don't appear stuck 239 file_list = [] 240 for blob in blobs: 241 if blob.name.endswith("/"): 242 continue 243 if not self._validate_include_blob( 244 blob=blob, 245 file_extensions_to_ignore=file_extensions_to_ignore, 246 file_strings_to_ignore=file_strings_to_ignore, 247 file_extensions_to_include=file_extensions_to_include, 248 bucket_name=bucket_name 249 ): 250 continue 251 file_list.append( 252 self._create_bucket_contents_dict( 253 blob=blob, bucket_name=bucket_name, file_name_only=file_name_only 254 ) 255 ) 256 if verbose and len(file_list) % log_progress_interval == 0: 257 logging.info(f"Processed {len(file_list):,} files so far...") 258 259 logging.info(f"Found {len(file_list):,} files in bucket") 260 return file_list
List contents of a GCS bucket and return a list of dictionaries with file information.
Args:
- bucket_name (str): The name of the GCS bucket. If includes
gs://, it will be removed. - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
- file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
- file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
- file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
- file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to
False. - verbose (bool, optional): Whether to log files not being included. Defaults to
False. - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to
10000.
Returns:
- list[dict]: A list of dictionaries containing file information.
262 def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None: 263 """ 264 Copy a file from one GCS location to another. 265 266 **Args:** 267 - src_cloud_path (str): The source GCS path. 268 - full_destination_path (str): The destination GCS path. 269 - verbose (bool, optional): Whether to log progress. Defaults to `False`. 270 """ 271 try: 272 src_blob = self.load_blob_from_full_path(src_cloud_path) 273 dest_blob = self.load_blob_from_full_path(full_destination_path) 274 275 # Use rewrite so no timeouts 276 rewrite_token = False 277 278 while True: 279 rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite( 280 src_blob, token=rewrite_token 281 ) 282 if verbose: 283 logging.info( 284 f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes." 285 ) 286 if not rewrite_token: 287 break 288 289 except Exception as e: 290 logging.error( 291 f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to " 292 f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted" 293 ) 294 raise
Copy a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
- verbose (bool, optional): Whether to log progress. Defaults to
False.
296 def delete_cloud_file(self, full_cloud_path: str) -> None: 297 """ 298 Delete a file from GCS. 299 300 **Args:** 301 - full_cloud_path (str): The GCS path of the file to delete. 302 """ 303 blob = self.load_blob_from_full_path(full_cloud_path) 304 blob.delete()
Delete a file from GCS.
Args:
- full_cloud_path (str): The GCS path of the file to delete.
306 def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None: 307 """ 308 Move a file from one GCS location to another. 309 310 **Args:** 311 - src_cloud_path (str): The source GCS path. 312 - full_destination_path (str): The destination GCS path. 313 """ 314 self.copy_cloud_file(src_cloud_path, full_destination_path) 315 self.delete_cloud_file(src_cloud_path)
Move a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
317 def get_filesize(self, target_path: str) -> int: 318 """ 319 Get the size of a file in GCS. 320 321 **Args:** 322 - target_path (str): The GCS path of the file. 323 324 **Returns:** 325 - int: The size of the file in bytes. 326 """ 327 blob = self.load_blob_from_full_path(target_path) 328 return blob.size
Get the size of a file in GCS.
Args:
- target_path (str): The GCS path of the file.
Returns:
- int: The size of the file in bytes.
330 def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool: 331 """ 332 Validate if two cloud files (source and destination) are identical based on their MD5 hashes. 333 334 **Args:** 335 - src_cloud_path (str): The source GCS path. 336 - dest_cloud_path (str): The destination GCS path. 337 338 **Returns:** 339 - bool: `True` if the files are identical, `False` otherwise. 340 """ 341 src_blob = self.load_blob_from_full_path(src_cloud_path) 342 dest_blob = self.load_blob_from_full_path(dest_cloud_path) 343 344 # If either blob is None or does not exist 345 if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists(): 346 return False 347 # If the MD5 hashes exist 348 if src_blob.md5_hash and dest_blob.md5_hash: 349 # And are the same return True 350 if src_blob.md5_hash == dest_blob.md5_hash: 351 return True 352 else: 353 # If md5 do not exist (for larger files they may not) check size matches 354 if src_blob.size == dest_blob.size: 355 return True 356 # Otherwise, return False 357 return False
Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
Args:
- src_cloud_path (str): The source GCS path.
- dest_cloud_path (str): The destination GCS path.
Returns:
- bool:
`True` if the files are identical, `False` otherwise.
359 def delete_multiple_files( 360 self, 361 files_to_delete: list[str], 362 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 363 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 364 verbose: bool = False, 365 job_complete_for_logging: int = 500 366 ) -> None: 367 """ 368 Delete multiple cloud files in parallel using multi-threading. 369 370 **Args:** 371 - files_to_delete (list[str]): List of GCS paths of the files to delete. 372 - workers (int, optional): Number of worker threads. Defaults to `10`. 373 - max_retries (int, optional): Maximum number of retries. Defaults to `5`. 374 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 375 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 376 """ 377 list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)] 378 379 MultiThreadedJobs().run_multi_threaded_job( 380 workers=workers, 381 function=self.delete_cloud_file, 382 list_of_jobs_args_list=list_of_jobs_args_list, 383 max_retries=max_retries, 384 fail_on_error=True, 385 verbose=verbose, 386 collect_output=False, 387 jobs_complete_for_logging=job_complete_for_logging 388 )
Delete multiple cloud files in parallel using multi-threading.
Args:
- files_to_delete (list[str]): List of GCS paths of the files to delete.
- workers (int, optional): Number of worker threads. Defaults to
10. - max_retries (int, optional): Maximum number of retries. Defaults to
5. - verbose (bool, optional): Whether to log each job's success. Defaults to
False. - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to
500.
    def loop_and_log_validation_files_multithreaded(
            self,
            files_to_validate: list[dict],
            log_difference: bool,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            job_complete_for_logging: int = 500
    ) -> list[dict]:
        """
        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

        **Args:**
        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
          Each dict must have `source_file` and `full_destination_path` keys.
        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
          this at the start of a copy/move operation to check if files are already copied.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Returns:**
        - list[dict]: List of dictionaries containing files that are **not** identical.

        **Raises:**
        - Exception: If any validation job produced no result (e.g. a blob could not be loaded).
        """
        logging.info(f"Validating if {len(files_to_validate)} files are identical")

        # Prepare jobs: pass the necessary arguments to each validation
        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]

        # Use multithreaded job runner to validate the files.
        # NOTE(review): fail_on_error is not passed here, so failed jobs surface
        # as None entries in the collected output rather than raising directly.
        checked_files = MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self._validate_file_pair,
            list_of_jobs_args_list=jobs,
            collect_output=True,
            max_retries=max_retries,
            jobs_complete_for_logging=job_complete_for_logging
        )
        # If any files failed to load, raise an exception
        if files_to_validate and None in checked_files:  # type: ignore[operator]
            logging.error("Failed to validate all files, could not load some blobs")
            raise Exception("Failed to validate all files")

        # Get all files that are not identical
        not_identical_files = [
            file_dict
            for file_dict in checked_files  # type: ignore[operator, union-attr]
            if not file_dict['identical']
        ]
        if not_identical_files:
            if log_difference:
                for file_dict in not_identical_files:
                    logging.warning(
                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
                    )
        logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
        return not_identical_files
Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
Args:
- files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
- log_difference (bool): Whether to log differences if files are not identical. Set
`False` if you are running this at the start of a copy/move operation to check if files are already copied. - workers (int, optional): Number of worker threads. Defaults to
10. - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to
5. - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to
500.
Returns:
- list[dict]: List of dictionaries containing files that are not identical.
463 def multithread_copy_of_files_with_validation( 464 self, 465 files_to_copy: list[dict], 466 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 467 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 468 skip_check_if_already_copied: bool = False 469 ) -> None: 470 """ 471 Copy multiple files in parallel with validation. 472 473 **Args:** 474 - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. 475 Dictionary should have keys `source_file` and `full_destination_path` 476 - workers (int): Number of worker threads. Defaults to `10`. 477 - max_retries (int): Maximum number of retries. Defaults to `5` 478 - skip_check_if_already_copied (bool, optional): Whether to skip checking 479 if files are already copied and start copying right away. Defaults to `False`. 480 """ 481 if skip_check_if_already_copied: 482 logging.info("Skipping check if files are already copied") 483 updated_files_to_move = files_to_copy 484 else: 485 updated_files_to_move = self.loop_and_log_validation_files_multithreaded( 486 files_to_copy, 487 log_difference=False, 488 workers=workers, 489 max_retries=max_retries 490 ) 491 # If all files are already copied, return 492 if not updated_files_to_move: 493 logging.info("All files are already copied") 494 return None 495 logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files") 496 self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries) 497 logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original") 498 # Validate that all files were copied successfully 499 files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded( 500 files_to_copy, 501 workers=workers, 502 log_difference=True, 503 max_retries=max_retries 504 ) 505 if files_not_moved_successfully: 506 logging.error(f"Failed to copy {len(files_not_moved_successfully)} files") 507 raise Exception("Failed to 
copy all files") 508 logging.info(f"Successfully copied {len(updated_files_to_move)} files") 509 return None
Copy multiple files in parallel with validation.
Args:
- files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
Dictionary should have keys
`source_file` and `full_destination_path` - workers (int): Number of worker threads. Defaults to
10. - max_retries (int): Maximum number of retries. Defaults to
5 - skip_check_if_already_copied (bool, optional): Whether to skip checking
if files are already copied and start copying right away. Defaults to
False.
511 def move_or_copy_multiple_files( 512 self, files_to_move: list[dict], 513 action: str, 514 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 515 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 516 verbose: bool = False, 517 jobs_complete_for_logging: int = 500 518 ) -> None: 519 """ 520 Move or copy multiple files in parallel. 521 522 **Args:** 523 - files_to_move (list[dict]): List of dictionaries containing source and destination file paths. 524 - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` 525 or `ops_utils.gcp_utils.COPY`). 526 - workers (int): Number of worker threads. Defaults to `10`. 527 - max_retries (int): Maximum number of retries. Defaults to `5`. 528 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 529 - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 530 531 **Raises:** 532 - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`. 533 """ 534 if action == MOVE: 535 cloud_function = self.move_cloud_file 536 elif action == COPY: 537 cloud_function = self.copy_cloud_file 538 else: 539 raise ValueError("Must either select move or copy") 540 541 list_of_jobs_args_list = [ 542 [ 543 file_dict['source_file'], file_dict['full_destination_path'] 544 ] 545 for file_dict in files_to_move 546 ] 547 MultiThreadedJobs().run_multi_threaded_job( 548 workers=workers, 549 function=cloud_function, 550 list_of_jobs_args_list=list_of_jobs_args_list, 551 max_retries=max_retries, 552 fail_on_error=True, 553 verbose=verbose, 554 collect_output=False, 555 jobs_complete_for_logging=jobs_complete_for_logging 556 )
Move or copy multiple files in parallel.
Args:
- files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
- action (str): The action to perform (should be one of
`ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`). - workers (int): Number of worker threads. Defaults to
10. - max_retries (int): Maximum number of retries. Defaults to
5. - verbose (bool, optional): Whether to log each job's success. Defaults to
False. - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to
500.
Raises:
- ValueError: If the action is not one of
`ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
558 def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str: 559 """ 560 Read the content of a file from GCS. 561 562 **Args:** 563 - cloud_path (str): The GCS path of the file to read. 564 - encoding (str, optional): The encoding to use. Defaults to `utf-8`. 565 566 **Returns:** 567 - bytes: The content of the file as bytes. 568 """ 569 blob = self.load_blob_from_full_path(cloud_path) 570 # Download the file content as bytes 571 content_bytes = blob.download_as_bytes() 572 # Convert bytes to string 573 content_str = content_bytes.decode(encoding) 574 return content_str
Read the content of a file from GCS.

Args:
- cloud_path (str): The GCS path of the file to read.
- encoding (str, optional): The encoding to use. Defaults to `utf-8`.

Returns:
- str: The content of the file as a decoded string.
576 def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None: 577 """ 578 Upload a file to GCS. 579 580 **Args:** 581 - destination_path (str): The destination GCS path. 582 - source_file (str): The source file path. 583 - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None. 584 """ 585 blob = self.load_blob_from_full_path(destination_path) 586 if custom_metadata: 587 blob.metadata = custom_metadata 588 blob.upload_from_filename(source_file)
Upload a file to GCS.
Args:
- destination_path (str): The destination GCS path.
- source_file (str): The source file path.
- custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
590 def get_object_md5( 591 self, 592 file_path: str, 593 # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2 594 chunk_size: int = parse_size("256 KB"), 595 logging_bytes: int = parse_size("1 GB"), 596 returned_md5_format: str = "hex" 597 ) -> str: 598 """ 599 Calculate the MD5 checksum of a file in GCS. 600 601 **Args:** 602 - file_path (str): The GCS path of the file. 603 - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`. 604 - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`. 605 - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. 606 Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`. 607 608 **Returns:** 609 - str: The MD5 checksum of the file. 610 611 **Raises:** 612 - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` 613 or `ops_utils.gcp_utils.MD5_BASE64` 614 """ 615 if returned_md5_format not in ["hex", "base64"]: 616 raise ValueError("returned_md5_format must be 'hex' or 'base64'") 617 618 blob = self.load_blob_from_full_path(file_path) 619 620 # Create an MD5 hash object 621 md5_hash = hashlib.md5() 622 623 blob_size_str = format_size(blob.size) 624 logging.info(f"Streaming {file_path} which is {blob_size_str}") 625 # Use a BytesIO stream to collect data in chunks and upload it 626 buffer = io.BytesIO() 627 total_bytes_streamed = 0 628 # Keep track of the last logged size for data logging 629 last_logged = 0 630 631 with blob.open("rb") as source_stream: 632 while True: 633 chunk = source_stream.read(chunk_size) 634 if not chunk: 635 break 636 md5_hash.update(chunk) 637 buffer.write(chunk) 638 total_bytes_streamed += len(chunk) 639 # Log progress every 1 gb if verbose used 640 if total_bytes_streamed - last_logged >= logging_bytes: 641 logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far") 
642 last_logged = total_bytes_streamed 643 644 if returned_md5_format == "hex": 645 md5 = md5_hash.hexdigest() 646 logging.info(f"MD5 (hex) for {file_path}: {md5}") 647 elif returned_md5_format == "base64": 648 md5 = base64.b64encode(md5_hash.digest()).decode("utf-8") 649 logging.info(f"MD5 (base64) for {file_path}: {md5}") 650 return md5
Calculate the MD5 checksum of a file in GCS.

Args:
- file_path (str): The GCS path of the file.
- chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
- logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
- returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.

Returns:
- str: The MD5 checksum of the file.

Raises:
- ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
652 def set_acl_public_read(self, cloud_path: str) -> None: 653 """ 654 Set the file in the bucket to be publicly readable. 655 656 **Args:** 657 - cloud_path (str): The GCS path of the file to be set as public readable. 658 """ 659 blob = self.load_blob_from_full_path(cloud_path) 660 blob.acl.all().grant_read() 661 blob.acl.save()
Set the file in the bucket to be publicly readable.
Args:
- cloud_path (str): The GCS path of the file to be set as public readable.
663 def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None: 664 """ 665 Set the file in the bucket to grant OWNER permission to a specific group. 666 667 **Args:** 668 - cloud_path (str): The GCS path of the file. 669 - group_email (str): The email of the group to grant OWNER permission 670 """ 671 blob = self.load_blob_from_full_path(cloud_path) 672 blob.acl.group(group_email).grant_owner() 673 blob.acl.save()
Set the file in the bucket to grant OWNER permission to a specific group.
Args:
- cloud_path (str): The GCS path of the file.
- group_email (str): The email of the group to grant OWNER permission
675 def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None: 676 """ 677 Set Cache-Control metadata for a file. 678 679 **Args:** 680 - cloud_path (str): The GCS path of the file. 681 - cache_control (str): The Cache-Control metadata to set. 682 """ 683 blob = self.load_blob_from_full_path(cloud_path) 684 blob.cache_control = cache_control 685 blob.patch()
Set Cache-Control metadata for a file.
Args:
- cloud_path (str): The GCS path of the file.
- cache_control (str): The Cache-Control metadata to set.
687 def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]: 688 """ 689 Get the most recent blob in the bucket. 690 691 If the blob with the most recent timestamp doesn't have 692 any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most 693 recent file until a log with useful information is encountered. This is useful when combing through 694 GCP activity logs for Terra workspace buckets. 695 696 **Args:** 697 - bucket_name (str): The GCS bucket name. 698 699 **Returns:** 700 - Optional tuple of the blob found and the file contents from the blob 701 """ 702 blobs = sorted( 703 self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True 704 ) 705 for blob in blobs: 706 # Download the file contents as a string 707 file_contents = blob.download_as_text() 708 709 # Check if the content matches the undesired format 710 lines = file_contents.splitlines() 711 if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"': 712 logging.info(f"Skipping file {blob.name} as it matches the undesired format.") 713 continue 714 715 # If it doesn't match the undesired format, return its content 716 logging.info(f"Found valid file: {blob.name}") 717 return blob, file_contents 718 719 logging.info("No valid files found.") 720 return None
Get the most recent blob in the bucket.
If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.
Args:
- bucket_name (str): The GCS bucket name.
Returns:
- Optional tuple of the blob found and the file contents from the blob
722 def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None: 723 """ 724 Write content to a file in GCS. 725 726 **Args:** 727 - cloud_path (str): The GCS path of the file to write. 728 - file_contents (str): The content to write. 729 """ 730 blob = self.load_blob_from_full_path(cloud_path) 731 blob.upload_from_string(file_contents) 732 logging.info(f"Successfully wrote content to {cloud_path}")
Write content to a file in GCS.
Args:
- cloud_path (str): The GCS path of the file to write.
- file_contents (str): The content to write.
734 @staticmethod 735 def get_active_gcloud_account() -> str: 736 """ 737 Get the active GCP email for the current account. 738 739 **Returns:** 740 - str: The active GCP account email. 741 """ 742 result = subprocess.run( 743 args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"], 744 capture_output=True, 745 text=True, 746 check=True 747 ) 748 return result.stdout.strip()
Get the active GCP email for the current account.
Returns:
- str: The active GCP account email.
750 def has_write_permission(self, cloud_path: str) -> bool: 751 """ 752 Check if the current user has permission to write to a GCP path. 753 754 This method tests write access by attempting to update the metadata 755 of an existing blob or create a zero-byte temporary file if the blob 756 doesn't exist. The temporary file is deleted immediately if created. 757 758 **Args:** 759 - cloud_path (str): The GCS path to check for write permissions. 760 761 **Returns:** 762 - bool: True if the user has write permission, False otherwise. 763 """ 764 if not cloud_path.startswith("gs://"): 765 raise ValueError("cloud_path must start with 'gs://'") 766 if cloud_path.endswith("/"): 767 logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp") 768 cloud_path = f"{cloud_path}permission_test_temp" 769 try: 770 blob = self.load_blob_from_full_path(cloud_path) 771 if blob.exists(): 772 # Try updating metadata (doesn't change the content) 773 original_metadata = blob.metadata or {} 774 test_metadata = original_metadata.copy() 775 test_metadata["_write_permission_test"] = "true" 776 777 blob.metadata = test_metadata 778 blob.patch() 779 780 # Restore the original metadata 781 blob.metadata = original_metadata 782 blob.patch() 783 784 logging.info(f"Write permission confirmed for existing blob {cloud_path}") 785 return True 786 else: 787 # Try writing a temporary file to the bucket 788 blob.upload_from_string("") 789 790 # Clean up the test file 791 blob.delete() 792 logging.info(f"Write permission confirmed for {cloud_path}") 793 return True 794 except Forbidden: 795 logging.warning(f"No write permission on path {cloud_path}") 796 return False 797 except GoogleAPICallError as e: 798 logging.warning(f"Error testing write access to {cloud_path}: {e}") 799 return False
Check if the current user has permission to write to a GCP path.
This method tests write access by attempting to update the metadata of an existing blob or create a zero-byte temporary file if the blob doesn't exist. The temporary file is deleted immediately if created.
Args:
- cloud_path (str): The GCS path to check for write permissions.
Returns:
- bool: True if the user has write permission, False otherwise.
801 def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None: 802 """ 803 Wait for write permissions on a GCP path, checking at regular intervals. 804 805 This method will periodically check if the user has write permission on the specified cloud path. 806 It will continue checking until either write permission is granted or the maximum wait time is reached. 807 808 **Args:** 809 - cloud_path (str): The GCS path to check for write permissions. 810 - interval_wait_time_minutes (int): Time in minutes to wait between permission checks. 811 - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions. 812 813 **Returns:** 814 - bool: True if write permission is granted within the wait time, False otherwise. 815 """ 816 if not cloud_path.startswith("gs://"): 817 raise ValueError("cloud_path must start with 'gs://'") 818 819 # Convert minutes to seconds for the sleep function 820 interval_seconds = interval_wait_time_minutes * 60 821 max_wait_seconds = max_wait_time_minutes * 60 822 823 start_time = time.time() 824 attempt_number = 1 825 826 logging.info( 827 f"Starting to check for write permissions on {cloud_path}. Will check " 828 f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).") 829 830 # First check immediately 831 if self.has_write_permission(cloud_path): 832 logging.info(f"Write permission confirmed on initial check for {cloud_path}") 833 return 834 835 # If first check fails, start periodic checks 836 while time.time() - start_time < max_wait_seconds: 837 elapsed_minutes = (time.time() - start_time) / 60 838 remaining_minutes = max_wait_time_minutes - elapsed_minutes 839 840 logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. " 841 f"Time elapsed: {elapsed_minutes:.1f} minute(s). 
" 842 f"Time remaining: {remaining_minutes:.1f} minute(s).") 843 844 # Sleep for the interval duration 845 time.sleep(interval_seconds) 846 847 attempt_number += 1 848 logging.info(f"Checking write permissions (attempt {attempt_number})...") 849 850 if self.has_write_permission(cloud_path): 851 elapsed_minutes = (time.time() - start_time) / 60 852 logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}") 853 return 854 855 # If we get here, we've exceeded the maximum wait time 856 raise PermissionError( 857 f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for " 858 f"{cloud_path} after {attempt_number} attempts.")
Wait for write permissions on a GCP path, checking at regular intervals.
This method will periodically check if the user has write permission on the specified cloud path. It will continue checking until either write permission is granted or the maximum wait time is reached.
Args:
- cloud_path (str): The GCS path to check for write permissions.
- interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
- max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
Raises:
- PermissionError: If write permission is not granted within the maximum wait time.
- ValueError: If the cloud path does not start with `gs://`.
882 def check_files_exist_multithreaded( 883 self, 884 full_paths: list[str], 885 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 886 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 887 job_complete_for_logging: int = 500 888 ) -> dict[str, bool]: 889 """Check existence of multiple GCS files in parallel. 890 891 **Args:** 892 - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to check. 893 - workers (int, optional): Number of worker threads. Defaults to `10`. 894 - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`. 895 - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`. 896 897 **Returns:** 898 - dict[str, bool]: A dictionary where each key is a GCS path and the value is ``True`` if the file exists, 899 ``False`` otherwise. 900 """ 901 # _check_file_exists_in_gcs is needed because check_file_exists returns a plain bool, which would make it 902 # impossible to associate each result back to its path after the threads complete. By wrapping 903 # it here we return both the path and the result together, so the final dict comprehension 904 # can correctly map path -> bool regardless of the order results come back from the thread pool. 905 def _check_file_exists_in_gcs(path: str) -> dict: 906 return {"path": path, "exists": self.check_file_exists(path)} 907 908 jobs = [[path] for path in full_paths] 909 910 results = MultiThreadedJobs().run_multi_threaded_job( 911 workers=workers, 912 function=_check_file_exists_in_gcs, 913 list_of_jobs_args_list=jobs, 914 collect_output=True, 915 max_retries=max_retries, 916 fail_on_error=True, 917 jobs_complete_for_logging=job_complete_for_logging 918 ) 919 return {item["path"]: item["exists"] for item in results} # type: ignore[union-attr]
Check existence of multiple GCS files in parallel.

Args:
- full_paths (list[str]): List of full GCS paths (e.g. `gs://bucket/path/to/object`) to check.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
- job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.

Returns:
- dict[str, bool]: A dictionary where each key is a GCS path and the value is `True` if the file exists, `False` otherwise.
921 def read_files_multithreaded( 922 self, 923 full_paths: list[str], 924 encoding: str = "utf-8", 925 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 926 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 927 job_complete_for_logging: int = 500 928 ) -> dict[str, str]: 929 """Read the contents of multiple GCS files in parallel. 930 931 **Args:** 932 - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to read. 933 - encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`. 934 - workers (int, optional): Number of worker threads. Defaults to `10`. 935 - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`. 936 - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`. 937 938 **Returns:** 939 - dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string. 940 """ 941 # _read_file_contents_from_gcs is needed because read_file returns a plain str, which would make it 942 # impossible to associate each result back to its path after the threads complete. By wrapping 943 # it here we return both the path and the contents together, so the final dict comprehension 944 # can correctly map path -> contents regardless of the order results come back from the thread pool. 945 def _read_file_contents_from_gcs(path: str) -> dict: 946 return {"path": path, "contents": self.read_file(path, encoding=encoding)} 947 948 jobs = [[path] for path in full_paths] 949 950 results = MultiThreadedJobs().run_multi_threaded_job( 951 workers=workers, 952 function=_read_file_contents_from_gcs, 953 list_of_jobs_args_list=jobs, 954 collect_output=True, 955 max_retries=max_retries, 956 fail_on_error=True, 957 jobs_complete_for_logging=job_complete_for_logging 958 ) 959 return {item["path"]: item["contents"] for item in results} # type: ignore[union-attr]
Read the contents of multiple GCS files in parallel.

Args:
- full_paths (list[str]): List of full GCS paths (e.g. `gs://bucket/path/to/object`) to read.
- encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
- job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.

Returns:
- dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string.
961 def load_blobs_from_full_paths_multithreaded( 962 self, 963 full_paths: list[str], 964 file_name_only: bool = False, 965 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 966 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 967 job_complete_for_logging: int = 500 968 ) -> list[dict]: 969 """Load multiple blobs in parallel from a list of full GCS paths. 970 971 Args: 972 full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object). 973 file_name_only: Whether to return only the file path in each dict. 974 workers: Number of worker threads. 975 max_retries: Maximum number of retries per job. 976 job_complete_for_logging: Emit progress logs every N completed jobs. 977 978 Returns: 979 List of dictionaries shaped like :meth:`_create_bucket_contents_dict`. 980 981 Raises: 982 Exception: If any blob cannot be loaded / does not exist / returns no output. 983 """ 984 if not full_paths: 985 return [] 986 987 # De-duplicate input paths and keep order 988 unique_paths = list(dict.fromkeys(full_paths)) 989 jobs = [(path, file_name_only) for path in unique_paths] 990 991 results = MultiThreadedJobs().run_multi_threaded_job( 992 workers=workers, 993 function=self._load_blob_full_path_to_bucket_contents_dict, 994 list_of_jobs_args_list=jobs, 995 collect_output=True, 996 max_retries=max_retries, 997 fail_on_error=True, 998 jobs_complete_for_logging=job_complete_for_logging 999 ) 1000 1001 # If any jobs failed, MultiThreadedJobs would have raised. Still defensively validate output. 1002 if results is None or (unique_paths and (None in results)): # type: ignore[operator] 1003 raise Exception("Failed to load all blobs") 1004 1005 if len(results) != len(unique_paths) or any(not item for item in results): # type: ignore[arg-type] 1006 raise Exception("Failed to load all blobs") 1007 1008 return results # type: ignore[return-value]
Load multiple blobs in parallel from a list of full GCS paths.

Args:
- full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object).
- file_name_only: Whether to return only the file path in each dict.
- workers: Number of worker threads.
- max_retries: Maximum number of retries per job.
- job_complete_for_logging: Emit progress logs every N completed jobs.

Returns:
- List of dictionaries shaped like `_create_bucket_contents_dict()`.

Raises:
- Exception: If any blob cannot be loaded, does not exist, or returns no output.