ops_utils.gcp_utils
Module for GCP utilities.
1"""Module for GCP utilities.""" 2import os 3import logging 4import io 5import hashlib 6import base64 7import subprocess 8 9from humanfriendly import format_size, parse_size 10from mimetypes import guess_type 11from typing import Optional, Any 12from google.cloud.storage.blob import Blob 13 14from .vars import ARG_DEFAULTS 15from .thread_pool_executor_util import MultiThreadedJobs 16 17MOVE = "move" 18"""Variable to be used for the "action" when files should be moved.""" 19COPY = "copy" 20"""Variable to be used for the "action" when files should be copied.""" 21MD5_HEX = "hex" 22"""Variable to be used when the generated `md5` should be of the `hex` type.""" 23MD5_BASE64 = "base64" 24"""Variable to be used when the generated `md5` should be of the `base64` type.""" 25 26 27class GCPCloudFunctions: 28 """Class to handle GCP Cloud Functions.""" 29 30 def __init__(self, project: Optional[str] = None) -> None: 31 """ 32 Initialize the GCPCloudFunctions class. 33 34 Authenticates using the default credentials and sets up the Storage Client. 35 Uses the `project_id` if provided, otherwise utilizes the default project set. 36 37 **Args:** 38 - project (str, optional): The GCP project ID 39 """ 40 from google.cloud import storage # type: ignore[attr-defined] 41 from google.auth import default 42 credentials, default_project = default() 43 if not project: 44 project = default_project 45 self.client = storage.Client(credentials=credentials, project=project) 46 """@private""" 47 48 @staticmethod 49 def _process_cloud_path(cloud_path: str) -> dict: 50 """ 51 Process a GCS cloud path into its components. 52 53 Args: 54 cloud_path (str): The GCS cloud path. 55 56 Returns: 57 dict: A dictionary containing the platform prefix, bucket name, and blob URL. 58 """ 59 platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1) 60 bucket_name = str.split(remaining_url, sep="/")[0] 61 blob_name = "/".join(str.split(remaining_url, sep="/")[1:]) 62 path_components = { 63 "platform_prefix": platform_prefix, 64 "bucket": bucket_name, 65 "blob_url": blob_name 66 } 67 return path_components 68 69 def load_blob_from_full_path(self, full_path: str) -> Blob: 70 """ 71 Load a GCS blob object from a full GCS path. 72 73 **Args:** 74 - full_path (str): The full GCS path. 75 76 **Returns:** 77 - google.cloud.storage.blob.Blob: The GCS blob object. 78 """ 79 file_path_components = self._process_cloud_path(full_path) 80 81 # Specify the billing project 82 bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project) 83 blob = bucket.blob(file_path_components["blob_url"]) 84 85 # If blob exists in GCS reload it so metadata is there 86 if blob.exists(): 87 blob.reload() 88 return blob 89 90 def check_file_exists(self, full_path: str) -> bool: 91 """ 92 Check if a file exists in GCS. 93 94 **Args:** 95 - full_path (str): The full GCS path. 96 97 **Returns:** 98 - bool: `True` if the file exists, `False` otherwise. 99 """ 100 blob = self.load_blob_from_full_path(full_path) 101 return blob.exists() 102 103 @staticmethod 104 def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict: 105 """ 106 Create a dictionary containing file information. 107 108 Args: 109 bucket_name (str): The name of the GCS bucket. 110 blob (Any): The GCS blob object. 111 file_name_only (bool): Whether to return only the file list. 112 113 Returns: 114 dict: A dictionary containing file information. 
115 """ 116 if file_name_only: 117 return { 118 "path": f"gs://{bucket_name}/{blob.name}" 119 } 120 return { 121 "name": os.path.basename(blob.name), 122 "path": f"gs://{bucket_name}/{blob.name}", 123 "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream", 124 "file_extension": os.path.splitext(blob.name)[1], 125 "size_in_bytes": blob.size, 126 "md5_hash": blob.md5_hash 127 } 128 129 @staticmethod 130 def _validate_include_blob( 131 blob: Any, 132 bucket_name: str, 133 file_extensions_to_ignore: list[str] = [], 134 file_strings_to_ignore: list[str] = [], 135 file_extensions_to_include: list[str] = [], 136 verbose: bool = False 137 ) -> bool: 138 """ 139 Validate if a blob should be included based on its file extension. 140 141 Args: 142 file_extensions_to_include (list[str]): List of file extensions to include. 143 file_extensions_to_ignore (list[str]): List of file extensions to ignore. 144 file_strings_to_ignore (list[str]): List of file name substrings to ignore. 145 blob (Any): The GCS blob object. 146 verbose (bool): Whether to log files not being included. 147 148 Returns: 149 bool: True if the blob should be included, False otherwise. 150 """ 151 file_path = f"gs://{bucket_name}/{blob.name}" 152 if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)): 153 if verbose: 154 logging.info(f"Skipping {file_path} as it has an extension to ignore") 155 return False 156 if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)): 157 if verbose: 158 logging.info(f"Skipping {file_path} as it does not have an extension to include") 159 return False 160 if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore): 161 if verbose: 162 logging.info(f"Skipping {file_path} as it has a string to ignore") 163 return False 164 return True 165 166 def list_bucket_contents( 167 self, 168 bucket_name: str, 169 prefix: Optional[str] = None, 170 file_extensions_to_ignore: list[str] = [], 171 file_strings_to_ignore: list[str] = [], 172 file_extensions_to_include: list[str] = [], 173 file_name_only: bool = False 174 ) -> list[dict]: 175 """ 176 List contents of a GCS bucket and return a list of dictionaries with file information. 177 178 **Args:** 179 - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed. 180 - prefix (str, optional): The prefix to filter the blobs. Defaults to None. 181 - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to []. 182 - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to []. 183 - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to []. 184 - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`. 185 186 **Returns:** 187 - list[dict]: A list of dictionaries containing file information. 188 """ 189 # If the bucket name starts with gs://, remove it 190 if bucket_name.startswith("gs://"): 191 bucket_name = bucket_name.split("/")[2].strip() 192 193 logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}") 194 195 # Get the bucket object and set user_project for Requester Pays 196 bucket = self.client.bucket(bucket_name, user_project=self.client.project) 197 198 # List blobs within the bucket 199 blobs = bucket.list_blobs(prefix=prefix) 200 logging.info("Finished listing blobs. 
Processing files now.") 201 202 # Create a list of dictionaries containing file information 203 file_list = [ 204 self._create_bucket_contents_dict( 205 blob=blob, bucket_name=bucket_name, file_name_only=file_name_only 206 ) 207 for blob in blobs 208 if self._validate_include_blob( 209 blob=blob, 210 file_extensions_to_ignore=file_extensions_to_ignore, 211 file_strings_to_ignore=file_strings_to_ignore, 212 file_extensions_to_include=file_extensions_to_include, 213 bucket_name=bucket_name 214 ) and not blob.name.endswith("/") 215 ] 216 logging.info(f"Found {len(file_list)} files in bucket") 217 return file_list 218 219 def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None: 220 """ 221 Copy a file from one GCS location to another. 222 223 **Args:** 224 - src_cloud_path (str): The source GCS path. 225 - full_destination_path (str): The destination GCS path. 226 - verbose (bool, optional): Whether to log progress. Defaults to `False`. 227 """ 228 try: 229 src_blob = self.load_blob_from_full_path(src_cloud_path) 230 dest_blob = self.load_blob_from_full_path(full_destination_path) 231 232 # Use rewrite so no timeouts 233 rewrite_token = False 234 235 while True: 236 rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite( 237 src_blob, token=rewrite_token 238 ) 239 if verbose: 240 logging.info( 241 f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes." 242 ) 243 if not rewrite_token: 244 break 245 246 except Exception as e: 247 logging.error( 248 f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to " 249 f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted" 250 ) 251 raise 252 253 def delete_cloud_file(self, full_cloud_path: str) -> None: 254 """ 255 Delete a file from GCS. 256 257 **Args:** 258 - full_cloud_path (str): The GCS path of the file to delete. 259 """ 260 blob = self.load_blob_from_full_path(full_cloud_path) 261 blob.delete() 262 263 def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None: 264 """ 265 Move a file from one GCS location to another. 266 267 **Args:** 268 - src_cloud_path (str): The source GCS path. 269 - full_destination_path (str): The destination GCS path. 270 """ 271 self.copy_cloud_file(src_cloud_path, full_destination_path) 272 self.delete_cloud_file(src_cloud_path) 273 274 def get_filesize(self, target_path: str) -> int: 275 """ 276 Get the size of a file in GCS. 277 278 **Args:** 279 - target_path (str): The GCS path of the file. 280 281 **Returns:** 282 - int: The size of the file in bytes. 283 """ 284 blob = self.load_blob_from_full_path(target_path) 285 return blob.size 286 287 def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool: 288 """ 289 Validate if two cloud files (source and destination) are identical based on their MD5 hashes. 290 291 **Args:** 292 - src_cloud_path (str): The source GCS path. 293 - dest_cloud_path (str): The destination GCS path. 294 295 **Returns:** 296 - bool: `True` if the files are identical, `False` otherwise. 
297 """ 298 src_blob = self.load_blob_from_full_path(src_cloud_path) 299 dest_blob = self.load_blob_from_full_path(dest_cloud_path) 300 301 # If either blob is None or does not exist 302 if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists(): 303 return False 304 # If the MD5 hashes exist 305 if src_blob.md5_hash and dest_blob.md5_hash: 306 # And are the same return True 307 if src_blob.md5_hash == dest_blob.md5_hash: 308 return True 309 else: 310 # If md5 do not exist (for larger files they may not) check size matches 311 if src_blob.size == dest_blob.size: 312 return True 313 # Otherwise, return False 314 return False 315 316 def delete_multiple_files( 317 self, 318 files_to_delete: list[str], 319 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 320 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 321 verbose: bool = False, 322 job_complete_for_logging: int = 500 323 ) -> None: 324 """ 325 Delete multiple cloud files in parallel using multi-threading. 326 327 **Args:** 328 - files_to_delete (list[str]): List of GCS paths of the files to delete. 329 - workers (int, optional): Number of worker threads. Defaults to `10`. 330 - max_retries (int, optional): Maximum number of retries. Defaults to `5`. 331 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 332 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 333 """ 334 list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)] 335 336 MultiThreadedJobs().run_multi_threaded_job( 337 workers=workers, 338 function=self.delete_cloud_file, 339 list_of_jobs_args_list=list_of_jobs_args_list, 340 max_retries=max_retries, 341 fail_on_error=True, 342 verbose=verbose, 343 collect_output=False, 344 jobs_complete_for_logging=job_complete_for_logging 345 ) 346 347 def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict: 348 """ 349 Validate if source and destination files are identical. 350 351 **Args:** 352 - source_file (str): The source file path. 353 - full_destination_path (str): The destination file path. 354 355 **Returns:** 356 dict: The file dictionary of the files with a boolean indicating if they are identical. 357 """ 358 if self.validate_files_are_same(source_file, full_destination_path): 359 identical = True 360 else: 361 identical = False 362 return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical} 363 364 def loop_and_log_validation_files_multithreaded( 365 self, 366 files_to_validate: list[dict], 367 log_difference: bool, 368 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 369 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 370 job_complete_for_logging: int = 500 371 ) -> list[dict]: 372 """ 373 Validate if multiple cloud files are identical based on their MD5 hashes using multithreading. 374 375 **Args:** 376 - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths. 377 - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running 378 this at the start of a copy/move operation to check if files are already copied. 379 - workers (int, optional): Number of worker threads. Defaults to `10`. 380 - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`. 
381 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 382 383 **Returns:** 384 - list[dict]: List of dictionaries containing files that are **not** identical. 385 """ 386 logging.info(f"Validating if {len(files_to_validate)} files are identical") 387 388 # Prepare jobs: pass the necessary arguments to each validation 389 jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate] 390 391 # Use multithreaded job runner to validate the files 392 checked_files = MultiThreadedJobs().run_multi_threaded_job( 393 workers=workers, 394 function=self._validate_file_pair, 395 list_of_jobs_args_list=jobs, 396 collect_output=True, 397 max_retries=max_retries, 398 jobs_complete_for_logging=job_complete_for_logging 399 ) 400 # If any files failed to load, raise an exception 401 if files_to_validate and None in checked_files: # type: ignore[operator] 402 logging.error("Failed to validate all files, could not load some blobs") 403 raise Exception("Failed to validate all files") 404 405 # Get all files that are not identical 406 not_identical_files = [ 407 file_dict 408 for file_dict in checked_files # type: ignore[operator, union-attr] 409 if not file_dict['identical'] 410 ] 411 if not_identical_files: 412 if log_difference: 413 for file_dict in not_identical_files: 414 logging.warning( 415 f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical" 416 ) 417 logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.") 418 return not_identical_files 419 420 def multithread_copy_of_files_with_validation( 421 self, 422 files_to_copy: list[dict], 423 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 424 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 425 skip_check_if_already_copied: bool = False 426 ) -> None: 427 """ 428 Copy multiple files in parallel with validation. 429 430 **Args:** 431 - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. 432 Dictionary should have keys `source_file` and `full_destination_path` 433 - workers (int): Number of worker threads. Defaults to `10`. 434 - max_retries (int): Maximum number of retries. Defaults to `5` 435 - skip_check_if_already_copied (bool, optional): Whether to skip checking 436 if files are already copied and start copying right away. Defaults to `False`. 
437 """ 438 if skip_check_if_already_copied: 439 logging.info("Skipping check if files are already copied") 440 updated_files_to_move = files_to_copy 441 else: 442 updated_files_to_move = self.loop_and_log_validation_files_multithreaded( 443 files_to_copy, 444 log_difference=False, 445 workers=workers, 446 max_retries=max_retries 447 ) 448 # If all files are already copied, return 449 if not updated_files_to_move: 450 logging.info("All files are already copied") 451 return None 452 logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files") 453 self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries) 454 logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original") 455 # Validate that all files were copied successfully 456 files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded( 457 files_to_copy, 458 workers=workers, 459 log_difference=True, 460 max_retries=max_retries 461 ) 462 if files_not_moved_successfully: 463 logging.error(f"Failed to copy {len(files_not_moved_successfully)} files") 464 raise Exception("Failed to copy all files") 465 logging.info(f"Successfully copied {len(updated_files_to_move)} files") 466 return None 467 468 def move_or_copy_multiple_files( 469 self, files_to_move: list[dict], 470 action: str, 471 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 472 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 473 verbose: bool = False, 474 jobs_complete_for_logging: int = 500 475 ) -> None: 476 """ 477 Move or copy multiple files in parallel. 478 479 **Args:** 480 - files_to_move (list[dict]): List of dictionaries containing source and destination file paths. 481 - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` 482 or `ops_utils.gcp_utils.COPY`). 483 - workers (int): Number of worker threads. Defaults to `10`. 484 - max_retries (int): Maximum number of retries. Defaults to `5`. 485 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 486 - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 487 488 **Raises:** 489 - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`. 490 """ 491 if action == MOVE: 492 cloud_function = self.move_cloud_file 493 elif action == COPY: 494 cloud_function = self.copy_cloud_file 495 else: 496 raise ValueError("Must either select move or copy") 497 498 list_of_jobs_args_list = [ 499 [ 500 file_dict['source_file'], file_dict['full_destination_path'] 501 ] 502 for file_dict in files_to_move 503 ] 504 MultiThreadedJobs().run_multi_threaded_job( 505 workers=workers, 506 function=cloud_function, 507 list_of_jobs_args_list=list_of_jobs_args_list, 508 max_retries=max_retries, 509 fail_on_error=True, 510 verbose=verbose, 511 collect_output=False, 512 jobs_complete_for_logging=jobs_complete_for_logging 513 ) 514 515 def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str: 516 """ 517 Read the content of a file from GCS. 518 519 **Args:** 520 - cloud_path (str): The GCS path of the file to read. 521 - encoding (str, optional): The encoding to use. Defaults to `utf-8`. 522 523 **Returns:** 524 - bytes: The content of the file as bytes. 
525 """ 526 blob = self.load_blob_from_full_path(cloud_path) 527 # Download the file content as bytes 528 content_bytes = blob.download_as_bytes() 529 # Convert bytes to string 530 content_str = content_bytes.decode(encoding) 531 return content_str 532 533 def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None: 534 """ 535 Upload a file to GCS. 536 537 **Args:** 538 - destination_path (str): The destination GCS path. 539 - source_file (str): The source file path. 540 - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None. 541 """ 542 blob = self.load_blob_from_full_path(destination_path) 543 if custom_metadata: 544 blob.metadata = custom_metadata 545 blob.upload_from_filename(source_file) 546 547 def get_object_md5( 548 self, 549 file_path: str, 550 # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2 551 chunk_size: int = parse_size("256 KB"), 552 logging_bytes: int = parse_size("1 GB"), 553 returned_md5_format: str = "hex" 554 ) -> str: 555 """ 556 Calculate the MD5 checksum of a file in GCS. 557 558 **Args:** 559 - file_path (str): The GCS path of the file. 560 - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`. 561 - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`. 562 - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. 563 Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`. 564 565 **Returns:** 566 - str: The MD5 checksum of the file. 567 568 **Raises:** 569 - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` 570 or `ops_utils.gcp_utils.MD5_BASE64` 571 """ 572 if returned_md5_format not in ["hex", "base64"]: 573 raise ValueError("returned_md5_format must be 'hex' or 'base64'") 574 575 blob = self.load_blob_from_full_path(file_path) 576 577 # Create an MD5 hash object 578 md5_hash = hashlib.md5() 579 580 blob_size_str = format_size(blob.size) 581 logging.info(f"Streaming {file_path} which is {blob_size_str}") 582 # Use a BytesIO stream to collect data in chunks and upload it 583 buffer = io.BytesIO() 584 total_bytes_streamed = 0 585 # Keep track of the last logged size for data logging 586 last_logged = 0 587 588 with blob.open("rb") as source_stream: 589 while True: 590 chunk = source_stream.read(chunk_size) 591 if not chunk: 592 break 593 md5_hash.update(chunk) 594 buffer.write(chunk) 595 total_bytes_streamed += len(chunk) 596 # Log progress every 1 gb if verbose used 597 if total_bytes_streamed - last_logged >= logging_bytes: 598 logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far") 599 last_logged = total_bytes_streamed 600 601 if returned_md5_format == "hex": 602 md5 = md5_hash.hexdigest() 603 logging.info(f"MD5 (hex) for {file_path}: {md5}") 604 elif returned_md5_format == "base64": 605 md5 = base64.b64encode(md5_hash.digest()).decode("utf-8") 606 logging.info(f"MD5 (base64) for {file_path}: {md5}") 607 return md5 608 609 def set_acl_public_read(self, cloud_path: str) -> None: 610 """ 611 Set the file in the bucket to be publicly readable. 612 613 **Args:** 614 - cloud_path (str): The GCS path of the file to be set as public readable. 
615 """ 616 blob = self.load_blob_from_full_path(cloud_path) 617 blob.acl.all().grant_read() 618 blob.acl.save() 619 620 def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None: 621 """ 622 Set the file in the bucket to grant OWNER permission to a specific group. 623 624 **Args:** 625 - cloud_path (str): The GCS path of the file. 626 - group_email (str): The email of the group to grant OWNER permission 627 """ 628 blob = self.load_blob_from_full_path(cloud_path) 629 blob.acl.group(group_email).grant_owner() 630 blob.acl.save() 631 632 def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None: 633 """ 634 Set Cache-Control metadata for a file. 635 636 **Args:** 637 - cloud_path (str): The GCS path of the file. 638 - cache_control (str): The Cache-Control metadata to set. 639 """ 640 blob = self.load_blob_from_full_path(cloud_path) 641 blob.cache_control = cache_control 642 blob.patch() 643 644 def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]: 645 """ 646 Get the most recent blob in the bucket. 647 648 If the blob with the most recent timestamp doesn't have 649 any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most 650 recent file until a log with useful information is encountered. This is useful when combing through 651 GCP activity logs for Terra workspace buckets. 652 653 **Args:** 654 - bucket_name (str): The GCS bucket name. 655 656 **Returns:** 657 - Optional tuple of the blob found and the file contents from the blob 658 """ 659 blobs = sorted( 660 self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True 661 ) 662 for blob in blobs: 663 # Download the file contents as a string 664 file_contents = blob.download_as_text() 665 666 # Check if the content matches the undesired format 667 lines = file_contents.splitlines() 668 if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"': 669 logging.info(f"Skipping file {blob.name} as it matches the undesired format.") 670 continue 671 672 # If it doesn't match the undesired format, return its content 673 logging.info(f"Found valid file: {blob.name}") 674 return blob, file_contents 675 676 logging.info("No valid files found.") 677 return None 678 679 def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None: 680 """ 681 Write content to a file in GCS. 682 683 **Args:** 684 - cloud_path (str): The GCS path of the file to write. 685 - file_contents (str): The content to write. 686 """ 687 blob = self.load_blob_from_full_path(cloud_path) 688 blob.upload_from_string(file_contents) 689 logging.info(f"Successfully wrote content to {cloud_path}") 690 691 @staticmethod 692 def get_active_gcloud_account() -> str: 693 """ 694 Get the active GCP email for the current account. 695 696 **Returns:** 697 - str: The active GCP account email. 698 """ 699 result = subprocess.run( 700 args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"], 701 capture_output=True, 702 text=True, 703 check=True 704 ) 705 return result.stdout.strip() 706 707
MOVE = "move"
Variable to be used for the "action" when files should be moved.

COPY = "copy"
Variable to be used for the "action" when files should be copied.

MD5_HEX = "hex"
Variable to be used when the generated md5 should be of the hex type.

MD5_BASE64 = "base64"
Variable to be used when the generated md5 should be of the base64 type.
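For illustration, a minimal sketch of where these constants are consumed; `move_or_copy_multiple_files` and `get_object_md5` are defined on `GCPCloudFunctions` below, and the bucket and object paths are placeholders, not real resources:

```python
from ops_utils.gcp_utils import GCPCloudFunctions, COPY, MD5_HEX

gcp = GCPCloudFunctions()  # uses default credentials and project

# COPY (or MOVE) selects the "action" for the bulk move/copy helper.
gcp.move_or_copy_multiple_files(
    files_to_move=[
        {
            "source_file": "gs://example-src-bucket/data/sample.vcf.gz",             # placeholder
            "full_destination_path": "gs://example-dst-bucket/data/sample.vcf.gz",   # placeholder
        }
    ],
    action=COPY,
)

# MD5_HEX (or MD5_BASE64) selects the format of the returned checksum.
md5 = gcp.get_object_md5(
    "gs://example-dst-bucket/data/sample.vcf.gz",  # placeholder
    returned_md5_format=MD5_HEX,
)
```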
class GCPCloudFunctions

Class to handle GCP Cloud Functions.
GCPCloudFunctions(project: Optional[str] = None)

Initialize the GCPCloudFunctions class.

Authenticates using the default credentials and sets up the Storage Client. Uses the project if provided; otherwise falls back to the default project of the active credentials.

Args:
- project (str, optional): The GCP project ID.
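A minimal construction sketch; "my-gcp-project" is a placeholder project ID, not a real resource:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

# Use the default project attached to the application-default credentials
gcp = GCPCloudFunctions()

# Or pin the billing/quota project explicitly
gcp_with_project = GCPCloudFunctions(project="my-gcp-project")  # placeholder project ID
```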
load_blob_from_full_path(full_path: str) -> Blob

Load a GCS blob object from a full GCS path.

Args:
- full_path (str): The full GCS path.

Returns:
- google.cloud.storage.blob.Blob: The GCS blob object.
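A short sketch of loading a blob and inspecting its metadata; the path is a placeholder:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
blob = gcp.load_blob_from_full_path("gs://example-bucket/path/to/file.txt")  # placeholder path

# Metadata is populated because the blob is reloaded when it exists
if blob.exists():
    print(blob.size, blob.content_type, blob.md5_hash)
```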
check_file_exists(full_path: str) -> bool

Check if a file exists in GCS.

Args:
- full_path (str): The full GCS path.

Returns:
- bool: True if the file exists, False otherwise.
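For example, guarding a pipeline step on an expected input; the path is a placeholder:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
path = "gs://example-bucket/inputs/sample.cram"  # placeholder path
if not gcp.check_file_exists(path):
    raise FileNotFoundError(f"Expected input is missing: {path}")
```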
list_bucket_contents(bucket_name: str, prefix: Optional[str] = None, file_extensions_to_ignore: list[str] = [], file_strings_to_ignore: list[str] = [], file_extensions_to_include: list[str] = [], file_name_only: bool = False) -> list[dict]

List contents of a GCS bucket and return a list of dictionaries with file information.

Args:
- bucket_name (str): The name of the GCS bucket. If it includes gs://, the prefix is stripped.
- prefix (str, optional): The prefix to filter the blobs. Defaults to None.
- file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
- file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
- file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
- file_name_only (bool, optional): Whether to return only the file path and no extra info. Defaults to False.

Returns:
- list[dict]: A list of dictionaries containing file information.
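A usage sketch with the filtering options; the bucket name, prefix, and extensions are placeholders chosen for illustration:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
files = gcp.list_bucket_contents(
    bucket_name="example-bucket",        # placeholder; "gs://example-bucket" is also accepted
    prefix="submissions/2024/",          # placeholder prefix
    file_extensions_to_include=[".cram", ".crai"],
    file_strings_to_ignore=["tmp/"],
)
for file_info in files:
    print(file_info["path"], file_info["size_in_bytes"], file_info["md5_hash"])
```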
Copy a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
- verbose (bool, optional): Whether to log progress. Defaults to `False`.
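Example (a minimal sketch; both paths are placeholders):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.copy_cloud_file(
    src_cloud_path="gs://source-bucket/data/sample.vcf.gz",
    full_destination_path="gs://dest-bucket/archive/sample.vcf.gz",
    verbose=True,  # log rewrite progress, useful for large objects
)
```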
254 def delete_cloud_file(self, full_cloud_path: str) -> None: 255 """ 256 Delete a file from GCS. 257 258 **Args:** 259 - full_cloud_path (str): The GCS path of the file to delete. 260 """ 261 blob = self.load_blob_from_full_path(full_cloud_path) 262 blob.delete()
Delete a file from GCS.
Args:
- full_cloud_path (str): The GCS path of the file to delete.
264 def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None: 265 """ 266 Move a file from one GCS location to another. 267 268 **Args:** 269 - src_cloud_path (str): The source GCS path. 270 - full_destination_path (str): The destination GCS path. 271 """ 272 self.copy_cloud_file(src_cloud_path, full_destination_path) 273 self.delete_cloud_file(src_cloud_path)
Move a file from one GCS location to another.
Args:
- src_cloud_path (str): The source GCS path.
- full_destination_path (str): The destination GCS path.
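Example (a minimal sketch; paths are placeholders). The source object is deleted only after the copy succeeds:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.move_cloud_file(
    src_cloud_path="gs://source-bucket/tmp/report.txt",
    full_destination_path="gs://dest-bucket/reports/report.txt",
)
```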
275 def get_filesize(self, target_path: str) -> int: 276 """ 277 Get the size of a file in GCS. 278 279 **Args:** 280 - target_path (str): The GCS path of the file. 281 282 **Returns:** 283 - int: The size of the file in bytes. 284 """ 285 blob = self.load_blob_from_full_path(target_path) 286 return blob.size
Get the size of a file in GCS.
Args:
- target_path (str): The GCS path of the file.
Returns:
- int: The size of the file in bytes.
288 def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool: 289 """ 290 Validate if two cloud files (source and destination) are identical based on their MD5 hashes. 291 292 **Args:** 293 - src_cloud_path (str): The source GCS path. 294 - dest_cloud_path (str): The destination GCS path. 295 296 **Returns:** 297 - bool: `True` if the files are identical, `False` otherwise. 298 """ 299 src_blob = self.load_blob_from_full_path(src_cloud_path) 300 dest_blob = self.load_blob_from_full_path(dest_cloud_path) 301 302 # If either blob is None or does not exist 303 if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists(): 304 return False 305 # If the MD5 hashes exist 306 if src_blob.md5_hash and dest_blob.md5_hash: 307 # And are the same return True 308 if src_blob.md5_hash == dest_blob.md5_hash: 309 return True 310 else: 311 # If md5 do not exist (for larger files they may not) check size matches 312 if src_blob.size == dest_blob.size: 313 return True 314 # Otherwise, return False 315 return False
Validate whether two cloud files (source and destination) are identical, comparing MD5 hashes when both are available and falling back to a file size comparison when they are not.
Args:
- src_cloud_path (str): The source GCS path.
- dest_cloud_path (str): The destination GCS path.
Returns:
- bool: `True` if the files are identical, `False` otherwise.
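Example (a minimal sketch; paths are placeholders):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
identical = gcp.validate_files_are_same(
    src_cloud_path="gs://source-bucket/data/sample.bam",
    dest_cloud_path="gs://dest-bucket/backup/sample.bam",
)
if identical:
    print("Destination matches source; safe to clean up the original")
```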
317 def delete_multiple_files( 318 self, 319 files_to_delete: list[str], 320 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 321 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 322 verbose: bool = False, 323 job_complete_for_logging: int = 500 324 ) -> None: 325 """ 326 Delete multiple cloud files in parallel using multi-threading. 327 328 **Args:** 329 - files_to_delete (list[str]): List of GCS paths of the files to delete. 330 - workers (int, optional): Number of worker threads. Defaults to `10`. 331 - max_retries (int, optional): Maximum number of retries. Defaults to `5`. 332 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 333 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 334 """ 335 list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)] 336 337 MultiThreadedJobs().run_multi_threaded_job( 338 workers=workers, 339 function=self.delete_cloud_file, 340 list_of_jobs_args_list=list_of_jobs_args_list, 341 max_retries=max_retries, 342 fail_on_error=True, 343 verbose=verbose, 344 collect_output=False, 345 jobs_complete_for_logging=job_complete_for_logging 346 )
Delete multiple cloud files in parallel using multi-threading.
Args:
- files_to_delete (list[str]): List of GCS paths of the files to delete.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries. Defaults to `5`.
- verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
- job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
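Example (a minimal sketch; paths are placeholders). Duplicate paths are removed before the delete jobs are queued:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.delete_multiple_files(
    files_to_delete=[
        "gs://my-example-bucket/tmp/part-0000.txt",
        "gs://my-example-bucket/tmp/part-0001.txt",
    ],
    workers=4,
)
```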
365 def loop_and_log_validation_files_multithreaded( 366 self, 367 files_to_validate: list[dict], 368 log_difference: bool, 369 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 370 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 371 job_complete_for_logging: int = 500 372 ) -> list[dict]: 373 """ 374 Validate if multiple cloud files are identical based on their MD5 hashes using multithreading. 375 376 **Args:** 377 - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths. 378 - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running 379 this at the start of a copy/move operation to check if files are already copied. 380 - workers (int, optional): Number of worker threads. Defaults to `10`. 381 - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`. 382 - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 383 384 **Returns:** 385 - list[dict]: List of dictionaries containing files that are **not** identical. 386 """ 387 logging.info(f"Validating if {len(files_to_validate)} files are identical") 388 389 # Prepare jobs: pass the necessary arguments to each validation 390 jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate] 391 392 # Use multithreaded job runner to validate the files 393 checked_files = MultiThreadedJobs().run_multi_threaded_job( 394 workers=workers, 395 function=self._validate_file_pair, 396 list_of_jobs_args_list=jobs, 397 collect_output=True, 398 max_retries=max_retries, 399 jobs_complete_for_logging=job_complete_for_logging 400 ) 401 # If any files failed to load, raise an exception 402 if files_to_validate and None in checked_files: # type: ignore[operator] 403 logging.error("Failed to validate all files, could not load some blobs") 404 raise Exception("Failed to validate all files") 405 406 # Get all files that are not identical 407 not_identical_files = [ 408 file_dict 409 for file_dict in checked_files # type: ignore[operator, union-attr] 410 if not file_dict['identical'] 411 ] 412 if not_identical_files: 413 if log_difference: 414 for file_dict in not_identical_files: 415 logging.warning( 416 f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical" 417 ) 418 logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.") 419 return not_identical_files
Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
Args:
- files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
- log_difference (bool): Whether to log differences if files are not identical. Set to `False` if you are running this at the start of a copy/move operation to check whether files are already copied.
- workers (int, optional): Number of worker threads. Defaults to `10`.
- max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
- job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
Returns:
- list[dict]: List of dictionaries containing files that are not identical.
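Example (a minimal sketch; paths are placeholders). Each dictionary must use the `source_file` and `full_destination_path` keys:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
pairs = [
    {
        "source_file": "gs://source-bucket/data/sample1.cram",
        "full_destination_path": "gs://dest-bucket/data/sample1.cram",
    },
]
# Returns only the pairs that are NOT identical
mismatched = gcp.loop_and_log_validation_files_multithreaded(pairs, log_difference=True)
```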
421 def multithread_copy_of_files_with_validation( 422 self, 423 files_to_copy: list[dict], 424 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 425 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 426 skip_check_if_already_copied: bool = False 427 ) -> None: 428 """ 429 Copy multiple files in parallel with validation. 430 431 **Args:** 432 - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. 433 Dictionary should have keys `source_file` and `full_destination_path` 434 - workers (int): Number of worker threads. Defaults to `10`. 435 - max_retries (int): Maximum number of retries. Defaults to `5` 436 - skip_check_if_already_copied (bool, optional): Whether to skip checking 437 if files are already copied and start copying right away. Defaults to `False`. 438 """ 439 if skip_check_if_already_copied: 440 logging.info("Skipping check if files are already copied") 441 updated_files_to_move = files_to_copy 442 else: 443 updated_files_to_move = self.loop_and_log_validation_files_multithreaded( 444 files_to_copy, 445 log_difference=False, 446 workers=workers, 447 max_retries=max_retries 448 ) 449 # If all files are already copied, return 450 if not updated_files_to_move: 451 logging.info("All files are already copied") 452 return None 453 logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files") 454 self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries) 455 logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original") 456 # Validate that all files were copied successfully 457 files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded( 458 files_to_copy, 459 workers=workers, 460 log_difference=True, 461 max_retries=max_retries 462 ) 463 if files_not_moved_successfully: 464 logging.error(f"Failed to copy {len(files_not_moved_successfully)} files") 465 raise Exception("Failed to copy all files") 466 logging.info(f"Successfully copied {len(updated_files_to_move)} files") 467 return None
Copy multiple files in parallel with validation.
Args:
- files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
Each dictionary should have the keys `source_file` and `full_destination_path`.
- workers (int): Number of worker threads. Defaults to `10`.
- max_retries (int): Maximum number of retries. Defaults to `5`.
- skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to `False`.
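Example (a minimal sketch; paths are placeholders). Files that already match are skipped, the rest are copied, and everything is re-validated afterwards:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
files_to_copy = [
    {
        "source_file": "gs://source-bucket/data/sample1.cram",
        "full_destination_path": "gs://dest-bucket/data/sample1.cram",
    },
]
gcp.multithread_copy_of_files_with_validation(files_to_copy, workers=4, max_retries=3)
```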
469 def move_or_copy_multiple_files( 470 self, files_to_move: list[dict], 471 action: str, 472 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 473 max_retries: int = ARG_DEFAULTS["max_retries"], # type: ignore[assignment] 474 verbose: bool = False, 475 jobs_complete_for_logging: int = 500 476 ) -> None: 477 """ 478 Move or copy multiple files in parallel. 479 480 **Args:** 481 - files_to_move (list[dict]): List of dictionaries containing source and destination file paths. 482 - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` 483 or `ops_utils.gcp_utils.COPY`). 484 - workers (int): Number of worker threads. Defaults to `10`. 485 - max_retries (int): Maximum number of retries. Defaults to `5`. 486 - verbose (bool, optional): Whether to log each job's success. Defaults to `False`. 487 - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`. 488 489 **Raises:** 490 - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`. 491 """ 492 if action == MOVE: 493 cloud_function = self.move_cloud_file 494 elif action == COPY: 495 cloud_function = self.copy_cloud_file 496 else: 497 raise ValueError("Must either select move or copy") 498 499 list_of_jobs_args_list = [ 500 [ 501 file_dict['source_file'], file_dict['full_destination_path'] 502 ] 503 for file_dict in files_to_move 504 ] 505 MultiThreadedJobs().run_multi_threaded_job( 506 workers=workers, 507 function=cloud_function, 508 list_of_jobs_args_list=list_of_jobs_args_list, 509 max_retries=max_retries, 510 fail_on_error=True, 511 verbose=verbose, 512 collect_output=False, 513 jobs_complete_for_logging=jobs_complete_for_logging 514 )
Move or copy multiple files in parallel.
Args:
- files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
- action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`).
- workers (int): Number of worker threads. Defaults to `10`.
- max_retries (int): Maximum number of retries. Defaults to `5`.
- verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
- jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
Raises:
- ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
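Example (a minimal sketch; paths are placeholders). Use the module-level `MOVE` or `COPY` constants for the `action` argument:

```python
from ops_utils.gcp_utils import GCPCloudFunctions, MOVE

gcp = GCPCloudFunctions()
gcp.move_or_copy_multiple_files(
    files_to_move=[
        {
            "source_file": "gs://source-bucket/tmp/run1.log",
            "full_destination_path": "gs://dest-bucket/logs/run1.log",
        },
    ],
    action=MOVE,  # or ops_utils.gcp_utils.COPY
)
```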
516 def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str: 517 """ 518 Read the content of a file from GCS. 519 520 **Args:** 521 - cloud_path (str): The GCS path of the file to read. 522 - encoding (str, optional): The encoding to use. Defaults to `utf-8`. 523 524 **Returns:** 525 - bytes: The content of the file as bytes. 526 """ 527 blob = self.load_blob_from_full_path(cloud_path) 528 # Download the file content as bytes 529 content_bytes = blob.download_as_bytes() 530 # Convert bytes to string 531 content_str = content_bytes.decode(encoding) 532 return content_str
Read the content of a file from GCS.
Args:
- cloud_path (str): The GCS path of the file to read.
- encoding (str, optional): The encoding to use. Defaults to `utf-8`.
Returns:
- str: The content of the file decoded as a string.
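Example (a minimal sketch; the path is a placeholder):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
manifest = gcp.read_file("gs://my-example-bucket/metadata/manifest.tsv")
header = manifest.splitlines()[0]
```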
534 def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None: 535 """ 536 Upload a file to GCS. 537 538 **Args:** 539 - destination_path (str): The destination GCS path. 540 - source_file (str): The source file path. 541 - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None. 542 """ 543 blob = self.load_blob_from_full_path(destination_path) 544 if custom_metadata: 545 blob.metadata = custom_metadata 546 blob.upload_from_filename(source_file)
Upload a file to GCS.
Args:
- destination_path (str): The destination GCS path.
- source_file (str): The source file path.
- custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
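Example (a minimal sketch; the GCS path, local path, and metadata values are placeholders):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.upload_blob(
    destination_path="gs://my-example-bucket/results/summary.tsv",
    source_file="/tmp/summary.tsv",
    custom_metadata={"pipeline": "demo", "run_id": "1234"},
)
```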
548 def get_object_md5( 549 self, 550 file_path: str, 551 # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2 552 chunk_size: int = parse_size("256 KB"), 553 logging_bytes: int = parse_size("1 GB"), 554 returned_md5_format: str = "hex" 555 ) -> str: 556 """ 557 Calculate the MD5 checksum of a file in GCS. 558 559 **Args:** 560 - file_path (str): The GCS path of the file. 561 - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`. 562 - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`. 563 - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. 564 Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`. 565 566 **Returns:** 567 - str: The MD5 checksum of the file. 568 569 **Raises:** 570 - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` 571 or `ops_utils.gcp_utils.MD5_BASE64` 572 """ 573 if returned_md5_format not in ["hex", "base64"]: 574 raise ValueError("returned_md5_format must be 'hex' or 'base64'") 575 576 blob = self.load_blob_from_full_path(file_path) 577 578 # Create an MD5 hash object 579 md5_hash = hashlib.md5() 580 581 blob_size_str = format_size(blob.size) 582 logging.info(f"Streaming {file_path} which is {blob_size_str}") 583 # Use a BytesIO stream to collect data in chunks and upload it 584 buffer = io.BytesIO() 585 total_bytes_streamed = 0 586 # Keep track of the last logged size for data logging 587 last_logged = 0 588 589 with blob.open("rb") as source_stream: 590 while True: 591 chunk = source_stream.read(chunk_size) 592 if not chunk: 593 break 594 md5_hash.update(chunk) 595 buffer.write(chunk) 596 total_bytes_streamed += len(chunk) 597 # Log progress every 1 gb if verbose used 598 if total_bytes_streamed - last_logged >= logging_bytes: 599 logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far") 600 last_logged = total_bytes_streamed 601 602 if returned_md5_format == "hex": 603 md5 = md5_hash.hexdigest() 604 logging.info(f"MD5 (hex) for {file_path}: {md5}") 605 elif returned_md5_format == "base64": 606 md5 = base64.b64encode(md5_hash.digest()).decode("utf-8") 607 logging.info(f"MD5 (base64) for {file_path}: {md5}") 608 return md5
Calculate the MD5 checksum of a file in GCS.
Args:
- file_path (str): The GCS path of the file.
- chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
- logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
- returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`. Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
Returns:
- str: The MD5 checksum of the file.
Raises:
- ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
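Example (a minimal sketch; the path is a placeholder). Streaming the object is useful when GCS has not stored an MD5 for it, e.g. for composite objects:

```python
from ops_utils.gcp_utils import GCPCloudFunctions, MD5_BASE64

gcp = GCPCloudFunctions()
md5 = gcp.get_object_md5(
    file_path="gs://my-example-bucket/data/large_file.bam",
    returned_md5_format=MD5_BASE64,  # base64 matches the format GCS stores in blob.md5_hash
)
```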
610 def set_acl_public_read(self, cloud_path: str) -> None: 611 """ 612 Set the file in the bucket to be publicly readable. 613 614 **Args:** 615 - cloud_path (str): The GCS path of the file to be set as public readable. 616 """ 617 blob = self.load_blob_from_full_path(cloud_path) 618 blob.acl.all().grant_read() 619 blob.acl.save()
Set the file in the bucket to be publicly readable.
Args:
- cloud_path (str): The GCS path of the file to make publicly readable.
621 def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None: 622 """ 623 Set the file in the bucket to grant OWNER permission to a specific group. 624 625 **Args:** 626 - cloud_path (str): The GCS path of the file. 627 - group_email (str): The email of the group to grant OWNER permission 628 """ 629 blob = self.load_blob_from_full_path(cloud_path) 630 blob.acl.group(group_email).grant_owner() 631 blob.acl.save()
Set the file in the bucket to grant OWNER permission to a specific group.
Args:
- cloud_path (str): The GCS path of the file.
- group_email (str): The email of the group to grant OWNER permission to.
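Example (a minimal sketch; the path and group email are placeholders). Note that object ACLs only apply to buckets that do not use uniform bucket-level access:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.set_acl_group_owner(
    cloud_path="gs://my-example-bucket/shared/results.tsv",
    group_email="my-team@example.org",
)
```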
633 def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None: 634 """ 635 Set Cache-Control metadata for a file. 636 637 **Args:** 638 - cloud_path (str): The GCS path of the file. 639 - cache_control (str): The Cache-Control metadata to set. 640 """ 641 blob = self.load_blob_from_full_path(cloud_path) 642 blob.cache_control = cache_control 643 blob.patch()
Set Cache-Control metadata for a file.
Args:
- cloud_path (str): The GCS path of the file.
- cache_control (str): The Cache-Control metadata to set.
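Example (a minimal sketch; the path is a placeholder):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
# Prevent downstream caching of a frequently updated object
gcp.set_metadata_cache_control(
    cloud_path="gs://my-example-bucket/status/latest.json",
    cache_control="no-cache, max-age=0",
)
```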
645 def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]: 646 """ 647 Get the most recent blob in the bucket. 648 649 If the blob with the most recent timestamp doesn't have 650 any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most 651 recent file until a log with useful information is encountered. This is useful when combing through 652 GCP activity logs for Terra workspace buckets. 653 654 **Args:** 655 - bucket_name (str): The GCS bucket name. 656 657 **Returns:** 658 - Optional tuple of the blob found and the file contents from the blob 659 """ 660 blobs = sorted( 661 self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True 662 ) 663 for blob in blobs: 664 # Download the file contents as a string 665 file_contents = blob.download_as_text() 666 667 # Check if the content matches the undesired format 668 lines = file_contents.splitlines() 669 if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"': 670 logging.info(f"Skipping file {blob.name} as it matches the undesired format.") 671 continue 672 673 # If it doesn't match the undesired format, return its content 674 logging.info(f"Found valid file: {blob.name}") 675 return blob, file_contents 676 677 logging.info("No valid files found.") 678 return None
Get the most recent blob in the bucket.
If the blob with the most recent timestamp contains nothing besides the basic "storage_byte_hours" logging, the method continues to the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.
Args:
- bucket_name (str): The GCS bucket name.
Returns:
- Optional[tuple]: The blob found and its file contents, or None if no valid file was found.
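Example (a minimal sketch; the bucket name is a placeholder):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
result = gcp.get_file_contents_of_most_recent_blob_in_bucket("my-terra-workspace-bucket")
if result:
    blob, contents = result
    print(f"Most recent useful log: {blob.name}")
```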
680 def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None: 681 """ 682 Write content to a file in GCS. 683 684 **Args:** 685 - cloud_path (str): The GCS path of the file to write. 686 - file_contents (str): The content to write. 687 """ 688 blob = self.load_blob_from_full_path(cloud_path) 689 blob.upload_from_string(file_contents) 690 logging.info(f"Successfully wrote content to {cloud_path}")
Write content to a file in GCS.
Args:
- cloud_path (str): The GCS path of the file to write.
- file_contents (str): The content to write.
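Example (a minimal sketch; the path is a placeholder). The string becomes the object's entire contents:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.write_to_gcp_file(
    cloud_path="gs://my-example-bucket/status/done.txt",
    file_contents="pipeline finished\n",
)
```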
692 @staticmethod 693 def get_active_gcloud_account() -> str: 694 """ 695 Get the active GCP email for the current account. 696 697 **Returns:** 698 - str: The active GCP account email. 699 """ 700 result = subprocess.run( 701 args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"], 702 capture_output=True, 703 text=True, 704 check=True 705 ) 706 return result.stdout.strip()
Get the email address of the currently active gcloud account.
Returns:
- str: The active GCP account email.
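Example (a minimal sketch). The method shells out to `gcloud`, so the CLI must be installed and authenticated:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

# Static method, so no client instance is required
print(GCPCloudFunctions.get_active_gcloud_account())
```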