ops_utils.gcp_utils
Module for GCP utilities.
MOVE = "move"
Variable to be used for the "action" when files should be moved.

COPY = "copy"
Variable to be used for the "action" when files should be copied.

MD5_HEX = "hex"
Variable to be used when the generated md5 should be of the hex type.

MD5_BASE64 = "base64"
Variable to be used when the generated md5 should be of the base64 type.
class GCPCloudFunctions
Class to handle GCP Cloud Functions.
def __init__(self, project: Optional[str] = None, service_account_json: Optional[str] = None) -> None
Initialize the GCPCloudFunctions class.
Authenticates with the service account JSON key if provided, otherwise with default credentials, and sets up the Storage Client.

Args:
- project (str, optional): The GCP project ID. If not provided, the project from the service account or the default credentials is used.
- service_account_json (str, optional): Path to a service account JSON key file. If provided, these credentials are used.
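As an illustrative sketch (not part of the original docs), the client can be constructed with application-default credentials or with a service account key; the project ID and key path below are placeholders.

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Application-default credentials; the project is inferred if not given
    gcp = GCPCloudFunctions()

    # Or authenticate with a service account key and an explicit project
    gcp_sa = GCPCloudFunctions(
        project="my-gcp-project",
        service_account_json="/path/to/service_account.json",
    )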
def load_blob_from_full_path(self, full_path: str) -> Blob
Load a GCS blob object from a full GCS path.
Args:
- full_path (str): The full GCS path.
Returns:
- google.cloud.storage.blob.Blob: The GCS blob object.
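A minimal usage sketch with a placeholder gs:// path; size, md5_hash, and content_type are attributes of the underlying google-cloud-storage Blob.

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()

    # Metadata is reloaded automatically when the object exists
    blob = gcp.load_blob_from_full_path("gs://example-bucket/data/sample.txt")
    if blob.exists():
        print(blob.size, blob.md5_hash, blob.content_type)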
def check_file_exists(self, full_path: str) -> bool
Check if a file exists in GCS.
Args:
- full_path (str): The full GCS path.
Returns:
- bool: True if the file exists, False otherwise.
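A short example with a placeholder path, guarding downstream work on an object that may be missing.

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()

    path = "gs://example-bucket/results/output.vcf.gz"
    if gcp.check_file_exists(path):
        print(f"{path} is present")
    else:
        print(f"{path} was not found")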
def list_bucket_contents(self, bucket_name: str, prefix: Optional[str] = None, file_extensions_to_ignore: list[str] = [], file_strings_to_ignore: list[str] = [], file_extensions_to_include: list[str] = [], file_name_only: bool = False) -> list[dict]
List contents of a GCS bucket and return a list of dictionaries with file information.
Args:
- bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
- prefix (str, optional): The prefix to filter the blobs. Defaults to None.
- file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
- file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
- file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
- file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
Returns:
- list[dict]: A list of dictionaries containing file information.
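A usage sketch with hypothetical bucket, prefix, and filter values; unless file_name_only is set, each returned dictionary carries keys such as path, size_in_bytes, and md5_hash:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
# Restrict to CRAM files under a prefix and skip anything containing "temp"
files = gcp.list_bucket_contents(
    bucket_name="example-bucket",
    prefix="submissions/",
    file_extensions_to_include=[".cram"],
    file_strings_to_ignore=["temp"],
)
for file_info in files:
    print(file_info["path"], file_info["size_in_bytes"])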
    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
        """
        Copy a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
        """
        try:
            src_blob = self.load_blob_from_full_path(src_cloud_path)
            dest_blob = self.load_blob_from_full_path(full_destination_path)

            # Use rewrite so no timeouts
            rewrite_token = False

            while True:
                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
                    src_blob, token=rewrite_token
                )
                if verbose:
                    logging.info(
                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
                    )
                if not rewrite_token:
                    break

        except Exception as e:
            logging.error(
                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
            )
            raise
Copy a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
- verbose (bool, optional): Whether to log progress. Defaults to `False`.
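A copy sketch with placeholder source and destination paths; progress is logged when verbose is set:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.copy_cloud_file(
    src_cloud_path="gs://source-bucket/data/sample.txt",
    full_destination_path="gs://dest-bucket/backup/sample.txt",
    verbose=True,
)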
    def delete_cloud_file(self, full_cloud_path: str) -> None:
        """
        Delete a file from GCS.

        **Args:**
        - full_cloud_path (str): The GCS path of the file to delete.
        """
        blob = self.load_blob_from_full_path(full_cloud_path)
        blob.delete()
Delete a file from GCS.
Args:
- full_cloud_path (str): The GCS path of the file to delete.
    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
        """
        Move a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        """
        self.copy_cloud_file(src_cloud_path, full_destination_path)
        self.delete_cloud_file(src_cloud_path)
Move a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
    def get_filesize(self, target_path: str) -> int:
        """
        Get the size of a file in GCS.

        **Args:**
        - target_path (str): The GCS path of the file.

        **Returns:**
        - int: The size of the file in bytes.
        """
        blob = self.load_blob_from_full_path(target_path)
        return blob.size
Get the size of a file in GCS.
Args:
- target_path (str): The GCS path of the file.
Returns:
- int: The size of the file in bytes.
    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
        """
        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - dest_cloud_path (str): The destination GCS path.

        **Returns:**
        - bool: `True` if the files are identical, `False` otherwise.
        """
        src_blob = self.load_blob_from_full_path(src_cloud_path)
        dest_blob = self.load_blob_from_full_path(dest_cloud_path)

        # If either blob is None or does not exist
        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
            return False
        # If the MD5 hashes exist
        if src_blob.md5_hash and dest_blob.md5_hash:
            # And are the same return True
            if src_blob.md5_hash == dest_blob.md5_hash:
                return True
        else:
            # If md5 do not exist (for larger files they may not) check size matches
            if src_blob.size == dest_blob.size:
                return True
        # Otherwise, return False
        return False
Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
Args:
- src_cloud_path (str): The source GCS path.
- dest_cloud_path (str): The destination GCS path.
Returns:
- bool: `True` if the files are identical, `False` otherwise.
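A validation sketch with placeholder paths; note the size-based fallback when MD5 hashes are missing:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
if gcp.validate_files_are_same(
    "gs://source-bucket/data/sample.txt",
    "gs://dest-bucket/backup/sample.txt",
):
    print("Copy verified")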
    def delete_multiple_files(
            self,
            files_to_delete: list[str],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            job_complete_for_logging: int = 500
    ) -> None:
        """
        Delete multiple cloud files in parallel using multi-threading.

        **Args:**
        - files_to_delete (list[str]): List of GCS paths of the files to delete.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
        """
        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]

        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self.delete_cloud_file,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=job_complete_for_logging
        )
Delete multiple cloud files in parallel using multi-threading.
Args:
- files_to_delete (list[str]): List of GCS paths of the files to delete.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries. Defaults to `5`.
- verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
- job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
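A deletion sketch with placeholder paths; duplicates in the input list are removed before the jobs are queued:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.delete_multiple_files(
    files_to_delete=[
        "gs://example-bucket/tmp/a.txt",
        "gs://example-bucket/tmp/b.txt",
    ],
    workers=4,
)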
    def loop_and_log_validation_files_multithreaded(
            self,
            files_to_validate: list[dict],
            log_difference: bool,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            job_complete_for_logging: int = 500
    ) -> list[dict]:
        """
        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

        **Args:**
        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
            this at the start of a copy/move operation to check if files are already copied.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Returns:**
        - list[dict]: List of dictionaries containing files that are **not** identical.
        """
        logging.info(f"Validating if {len(files_to_validate)} files are identical")

        # Prepare jobs: pass the necessary arguments to each validation
        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]

        # Use multithreaded job runner to validate the files
        checked_files = MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self._validate_file_pair,
            list_of_jobs_args_list=jobs,
            collect_output=True,
            max_retries=max_retries,
            jobs_complete_for_logging=job_complete_for_logging
        )
        # If any files failed to load, raise an exception
        if files_to_validate and None in checked_files:  # type: ignore[operator]
            logging.error("Failed to validate all files, could not load some blobs")
            raise Exception("Failed to validate all files")

        # Get all files that are not identical
        not_identical_files = [
            file_dict
            for file_dict in checked_files  # type: ignore[operator, union-attr]
            if not file_dict['identical']
        ]
        if not_identical_files:
            if log_difference:
                for file_dict in not_identical_files:
                    logging.warning(
                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
                    )
        logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
        return not_identical_files
Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
Args:
- files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
- log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running this at the start of a copy/move operation to check if files are already copied.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
- job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
Returns:
- list[dict]: List of dictionaries containing files that are not identical.
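A validation sketch; the paths are placeholders and each dictionary must provide the source_file and full_destination_path keys used in the listing above:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
to_check = [
    {
        "source_file": "gs://source-bucket/data/sample.txt",
        "full_destination_path": "gs://dest-bucket/backup/sample.txt",
    }
]
mismatched = gcp.loop_and_log_validation_files_multithreaded(to_check, log_difference=True)
print(f"{len(mismatched)} file(s) differ")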
    def multithread_copy_of_files_with_validation(
            self,
            files_to_copy: list[dict],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            skip_check_if_already_copied: bool = False
    ) -> None:
        """
        Copy multiple files in parallel with validation.

        **Args:**
        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
            Dictionary should have keys `source_file` and `full_destination_path`
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`
        - skip_check_if_already_copied (bool, optional): Whether to skip checking
            if files are already copied and start copying right away. Defaults to `False`.
        """
        if skip_check_if_already_copied:
            logging.info("Skipping check if files are already copied")
            updated_files_to_move = files_to_copy
        else:
            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
                files_to_copy,
                log_difference=False,
                workers=workers,
                max_retries=max_retries
            )
        # If all files are already copied, return
        if not updated_files_to_move:
            logging.info("All files are already copied")
            return None
        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
        # Validate that all files were copied successfully
        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
            files_to_copy,
            workers=workers,
            log_difference=True,
            max_retries=max_retries
        )
        if files_not_moved_successfully:
            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
            raise Exception("Failed to copy all files")
        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
        return None
Copy multiple files in parallel with validation.
Args:
- files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
Dictionary should have keys `source_file` and `full_destination_path`.
- workers (int): Number of worker threads. Defaults to `10`.
- max_retries (int): Maximum number of retries. Defaults to `5`.
- skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to `False`.
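A copy-with-validation sketch using placeholder paths; files already present and identical at the destination are skipped unless skip_check_if_already_copied is set:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.multithread_copy_of_files_with_validation(
    files_to_copy=[
        {
            "source_file": "gs://source-bucket/data/sample.txt",
            "full_destination_path": "gs://dest-bucket/backup/sample.txt",
        }
    ],
    workers=4,
    max_retries=3,
)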
    def move_or_copy_multiple_files(
            self, files_to_move: list[dict],
            action: str,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            jobs_complete_for_logging: int = 500
    ) -> None:
        """
        Move or copy multiple files in parallel.

        **Args:**
        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
            or `ops_utils.gcp_utils.COPY`).
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Raises:**
        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
        """
        if action == MOVE:
            cloud_function = self.move_cloud_file
        elif action == COPY:
            cloud_function = self.copy_cloud_file
        else:
            raise ValueError("Must either select move or copy")

        list_of_jobs_args_list = [
            [
                file_dict['source_file'], file_dict['full_destination_path']
            ]
            for file_dict in files_to_move
        ]
        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=cloud_function,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=jobs_complete_for_logging
        )
Move or copy multiple files in parallel.
Args:
- files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
- action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`).
- workers (int): Number of worker threads. Defaults to `10`.
- max_retries (int): Maximum number of retries. Defaults to `5`.
- verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
- jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
Raises:
- ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
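A sketch using the module-level COPY constant and placeholder paths; passing MOVE instead deletes each source after a successful copy:

from ops_utils.gcp_utils import COPY, GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.move_or_copy_multiple_files(
    files_to_move=[
        {
            "source_file": "gs://source-bucket/data/sample.txt",
            "full_destination_path": "gs://dest-bucket/backup/sample.txt",
        }
    ],
    action=COPY,
)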
    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
        """
        Read the content of a file from GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to read.
        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.

        **Returns:**
        - str: The content of the file, decoded using the given encoding.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        # Download the file content as bytes
        content_bytes = blob.download_as_bytes()
        # Convert bytes to string
        content_str = content_bytes.decode(encoding)
        return content_str
Read the content of a file from GCS.
Args:
- cloud_path (str): The GCS path of the file to read.
- encoding (str, optional): The encoding to use. Defaults to `utf-8`.
Returns:
- str: The content of the file, decoded using the given encoding.
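A read sketch with a placeholder path:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
# Returns the decoded text, not raw bytes
manifest_text = gcp.read_file("gs://example-bucket/manifests/manifest.tsv")
print(manifest_text.splitlines()[0])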
    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
        """
        Upload a file to GCS.

        **Args:**
        - destination_path (str): The destination GCS path.
        - source_file (str): The source file path.
        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
        """
        blob = self.load_blob_from_full_path(destination_path)
        if custom_metadata:
            blob.metadata = custom_metadata
        blob.upload_from_filename(source_file)
Upload a file to GCS.
Args:
- destination_path (str): The destination GCS path.
- source_file (str): The source file path.
- custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
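An upload sketch; the local file, destination path, and metadata values are placeholders:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.upload_blob(
    destination_path="gs://example-bucket/uploads/report.txt",
    source_file="/tmp/report.txt",
    custom_metadata={"pipeline": "demo"},
)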
    def get_object_md5(
            self,
            file_path: str,
            # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
            chunk_size: int = parse_size("256 KB"),
            logging_bytes: int = parse_size("1 GB"),
            returned_md5_format: str = "hex"
    ) -> str:
        """
        Calculate the MD5 checksum of a file in GCS.

        **Args:**
        - file_path (str): The GCS path of the file.
        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
            Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.

        **Returns:**
        - str: The MD5 checksum of the file.

        **Raises:**
        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
            or `ops_utils.gcp_utils.MD5_BASE64`
        """
        if returned_md5_format not in ["hex", "base64"]:
            raise ValueError("returned_md5_format must be 'hex' or 'base64'")

        blob = self.load_blob_from_full_path(file_path)

        # Create an MD5 hash object
        md5_hash = hashlib.md5()

        blob_size_str = format_size(blob.size)
        logging.info(f"Streaming {file_path} which is {blob_size_str}")
        total_bytes_streamed = 0
        # Keep track of the last logged size for progress logging
        last_logged = 0

        # Stream the blob in chunks and update the hash as each chunk arrives
        with blob.open("rb") as source_stream:
            while True:
                chunk = source_stream.read(chunk_size)
                if not chunk:
                    break
                md5_hash.update(chunk)
                total_bytes_streamed += len(chunk)
                # Log progress each time another `logging_bytes` has been read
                if total_bytes_streamed - last_logged >= logging_bytes:
                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
                    last_logged = total_bytes_streamed

        if returned_md5_format == "hex":
            md5 = md5_hash.hexdigest()
            logging.info(f"MD5 (hex) for {file_path}: {md5}")
        elif returned_md5_format == "base64":
            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
            logging.info(f"MD5 (base64) for {file_path}: {md5}")
        return md5
Calculate the MD5 checksum of a file in GCS.
Args:
- file_path (str): The GCS path of the file.
- chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
- logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
- returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
Returns:
- str: The MD5 checksum of the file.
Raises:
- ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
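A checksum sketch with a placeholder path; the base64 form matches the md5Hash metadata that GCS records for objects:

from ops_utils.gcp_utils import MD5_BASE64, GCPCloudFunctions

gcp = GCPCloudFunctions()
md5 = gcp.get_object_md5(
    file_path="gs://example-bucket/data/large_file.bam",
    returned_md5_format=MD5_BASE64,
)
print(md5)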
    def set_acl_public_read(self, cloud_path: str) -> None:
        """
        Set the file in the bucket to be publicly readable.

        **Args:**
        - cloud_path (str): The GCS path of the file to be set as public readable.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.all().grant_read()
        blob.acl.save()
Set the file in the bucket to be publicly readable.
Args:
- cloud_path (str): The GCS path of the file to be set as public readable.
    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
        """
        Set the file in the bucket to grant OWNER permission to a specific group.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - group_email (str): The email of the group to grant OWNER permission
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.group(group_email).grant_owner()
        blob.acl.save()
Set the file in the bucket to grant OWNER permission to a specific group.
Args:
- cloud_path (str): The GCS path of the file.
- group_email (str): The email of the group to grant OWNER permission
    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
        """
        Set Cache-Control metadata for a file.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - cache_control (str): The Cache-Control metadata to set.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.cache_control = cache_control
        blob.patch()
Set Cache-Control metadata for a file.
Args:
- cloud_path (str): The GCS path of the file.
- cache_control (str): The Cache-Control metadata to set.
    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]:
        """
        Get the most recent blob in the bucket.

        If the blob with the most recent timestamp doesn't have
        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
        recent file until a log with useful information is encountered. This is useful when combing through
        GCP activity logs for Terra workspace buckets.

        **Args:**
        - bucket_name (str): The GCS bucket name.

        **Returns:**
        - Optional tuple of the blob found and the file contents from the blob
        """
        blobs = sorted(
            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
        )
        for blob in blobs:
            # Download the file contents as a string
            file_contents = blob.download_as_text()

            # Check if the content matches the undesired format
            lines = file_contents.splitlines()
            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
                continue

            # If it doesn't match the undesired format, return its content
            logging.info(f"Found valid file: {blob.name}")
            return blob, file_contents

        logging.info("No valid files found.")
        return None
Get the most recent blob in the bucket.
If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.
Args:
- bucket_name (str): The GCS bucket name.
Returns:
- Optional tuple of the blob found and the file contents from the blob
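A sketch with a placeholder bucket name; the method returns None when no useful log is found:

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
result = gcp.get_file_contents_of_most_recent_blob_in_bucket("example-logs-bucket")
if result:
    blob, contents = result
    print(f"Most recent useful log: {blob.name}")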
    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
        """
        Write content to a file in GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to write.
        - file_contents (str): The content to write.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.upload_from_string(file_contents)
        logging.info(f"Successfully wrote content to {cloud_path}")
Write content to a file in GCS.
Args:
- cloud_path (str): The GCS path of the file to write.
- file_contents (str): The content to write.
    @staticmethod
    def get_active_gcloud_account() -> str:
        """
        Get the active GCP email for the current account.

        **Returns:**
        - str: The active GCP account email.
        """
        result = subprocess.run(
            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()
Get the active GCP email for the current account.
Returns:
- str: The active GCP account email.
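A sketch that assumes the gcloud CLI is installed and authenticated:

from ops_utils.gcp_utils import GCPCloudFunctions

# Static method; no client or credentials object is needed
print(GCPCloudFunctions.get_active_gcloud_account())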