ops_utils.gcp_utils

Module for GCP utilities.

   1"""Module for GCP utilities."""
   2import os
   3import logging
   4import time
   5import io
   6import json
   7import hashlib
   8import base64
   9import subprocess
  10
  11from humanfriendly import format_size, parse_size
  12from mimetypes import guess_type
  13from typing import Optional, Any
  14from google.cloud.storage.blob import Blob
  15from google.api_core.exceptions import Forbidden, GoogleAPICallError
  16from google.oauth2 import service_account
  17from google.cloud import storage
  18from google.auth import default
  19
  20from .vars import ARG_DEFAULTS
  21from .thread_pool_executor_util import MultiThreadedJobs
  22
  23MOVE = "move"
  24"""Variable to be used for the "action" when files should be moved."""
  25COPY = "copy"
  26"""Variable to be used for the "action" when files should be copied."""
  27MD5_HEX = "hex"
  28"""Variable to be used when the generated `md5` should be of the `hex` type."""
  29MD5_BASE64 = "base64"
  30"""Variable to be used when the generated `md5` should be of the `base64` type."""
  31
  32
  33class GCPCloudFunctions:
  34    """Class to handle GCP Cloud Functions."""
  35
  36    def __init__(
  37            self,
  38            project: Optional[str] = None,
  39            service_account_json: Optional[str] = None
  40    ) -> None:
  41        """
  42        Initialize the GCPCloudFunctions class.
  43
  44        Authenticates using service account JSON if provided or default credentials,
  45        and sets up the Storage Client.
  46
  47        Args:
  48            project: Optional[str] = None
  49                The GCP project ID. If not provided, will use project from service account or default.
  50            service_account_json: Optional[str] = None
  51                Path to service account JSON key file. If provided, will use these credentials.
  52        """
  53        # Initialize credentials and project
  54        credentials = None
  55        default_project = None
  56
  57        if service_account_json:
  58            credentials = service_account.Credentials.from_service_account_file(service_account_json)
  59            # Extract project from service account if not specified
  60            if not project:
  61                with open(service_account_json, 'r') as f:
  62                    sa_info = json.load(f)
  63                    project = sa_info.get('project_id')
  64        else:
  65            # Use default credentials
  66            credentials, default_project = default()
  67
  68        # Set project if not already set
  69        if not project:
  70            project = default_project
  71
  72        self.client = storage.Client(credentials=credentials, project=project)
  73        """@private"""
  74
  75    @staticmethod
  76    def _process_cloud_path(cloud_path: str) -> dict:
  77        """
  78        Process a GCS cloud path into its components.
  79
  80        Args:
  81            cloud_path (str): The GCS cloud path.
  82
  83        Returns:
  84            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
  85        """
  86        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
  87        bucket_name = str.split(remaining_url, sep="/")[0]
  88        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
  89        path_components = {
  90            "platform_prefix": platform_prefix,
  91            "bucket": bucket_name,
  92            "blob_url": blob_name
  93        }
  94        return path_components
  95
  96    def load_blob_from_full_path(self, full_path: str) -> Blob:
  97        """
  98        Load a GCS blob object from a full GCS path.
  99
 100        **Args:**
 101        - full_path (str): The full GCS path.
 102
 103        **Returns:**
 104        - google.cloud.storage.blob.Blob: The GCS blob object.
 105        """
 106        file_path_components = self._process_cloud_path(full_path)
 107
 108        # Specify the billing project
 109        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
 110        blob = bucket.blob(file_path_components["blob_url"])
 111
 112        # If blob exists in GCS reload it so metadata is there
 113        if blob.exists():
 114            blob.reload()
 115        return blob
 116
 117    def check_file_exists(self, full_path: str) -> bool:
 118        """
 119        Check if a file exists in GCS.
 120
 121        **Args:**
 122        - full_path (str): The full GCS path.
 123
 124        **Returns:**
 125        - bool: `True` if the file exists, `False` otherwise.
 126        """
 127        blob = self.load_blob_from_full_path(full_path)
 128        return blob.exists()
 129
 130    @staticmethod
 131    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
 132        """
 133        Create a dictionary containing file information.
 134
 135        Args:
 136            bucket_name (str): The name of the GCS bucket.
 137            blob (Any): The GCS blob object.
 138            file_name_only (bool): Whether to return only the file list.
 139
 140        Returns:
 141            dict: A dictionary containing file information.
 142        """
 143        if file_name_only:
 144            return {
 145                "path": f"gs://{bucket_name}/{blob.name}"
 146            }
 147        return {
 148            "name": os.path.basename(blob.name),
 149            "path": f"gs://{bucket_name}/{blob.name}",
 150            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
 151            "file_extension": os.path.splitext(blob.name)[1],
 152            "size_in_bytes": blob.size,
 153            "md5_hash": blob.md5_hash,
 154            "time_created": blob.time_created.isoformat() if blob.time_created else None,
 155            "last_modified": blob.updated.isoformat() if blob.updated else None,
 156        }
 157
 158    @staticmethod
 159    def _validate_include_blob(
 160            blob: Any,
 161            bucket_name: str,
 162            file_extensions_to_ignore: list[str] = [],
 163            file_strings_to_ignore: list[str] = [],
 164            file_extensions_to_include: list[str] = [],
 165            verbose: bool = False
 166    ) -> bool:
 167        """
 168        Validate if a blob should be included based on its file extension.
 169
 170        Args:
 171            file_extensions_to_include (list[str]): List of file extensions to include.
 172            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
 173            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
 174            blob (Any): The GCS blob object.
 175            verbose (bool): Whether to log files not being included.
 176
 177        Returns:
 178            bool: True if the blob should be included, False otherwise.
 179        """
 180        file_path = f"gs://{bucket_name}/{blob.name}"
 181        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
 182            if verbose:
 183                logging.info(f"Skipping {file_path} as it has an extension to ignore")
 184            return False
 185        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
 186            if verbose:
 187                logging.info(f"Skipping {file_path} as it does not have an extension to include")
 188            return False
 189        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
 190            if verbose:
 191                logging.info(f"Skipping {file_path} as it has a string to ignore")
 192            return False
 193        return True
 194
 195    def list_bucket_contents(
 196            self,
 197            bucket_name: str,
 198            prefix: Optional[str] = None,
 199            file_extensions_to_ignore: list[str] = [],
 200            file_strings_to_ignore: list[str] = [],
 201            file_extensions_to_include: list[str] = [],
 202            file_name_only: bool = False,
 203            verbose: bool = False,
 204            log_progress_interval: int = 10000
 205    ) -> list[dict]:
 206        """
 207        List contents of a GCS bucket and return a list of dictionaries with file information.
 208
 209        **Args:**
 210        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
 211        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
 212        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
 213        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
 214        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
 215        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
 216        - verbose (bool, optional): Whether to log files not being included. Defaults to `False`.
 217        - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to `10000`.
 218
 219        **Returns:**
 220        - list[dict]: A list of dictionaries containing file information.
 221        """
 222        # If the bucket name starts with gs://, remove it
 223        if bucket_name.startswith("gs://"):
 224            bucket_name = bucket_name.split("/")[2].strip()
 225
 226        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
 227
 228        # Get the bucket object and set user_project for Requester Pays
 229        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
 230
 231        # list_blobs returns a lazy iterator — no network calls happen until we iterate.
 232        # page_size=1000 is the GCS maximum, minimising the number of paginated round-trips.
 233        prefix_msg = f" with prefix '{prefix}'" if prefix else ""
 234        logging.info(f"Starting to page through blobs in bucket '{bucket_name}'{prefix_msg}...")
 235        blobs = bucket.list_blobs(prefix=prefix, page_size=1000)
 236
 237        # Iterate with progress logging so large buckets don't appear stuck
 238        file_list = []
 239        for blob in blobs:
 240            if blob.name.endswith("/"):
 241                continue
 242            if not self._validate_include_blob(
 243                blob=blob,
 244                file_extensions_to_ignore=file_extensions_to_ignore,
 245                file_strings_to_ignore=file_strings_to_ignore,
 246                file_extensions_to_include=file_extensions_to_include,
 247                bucket_name=bucket_name
 248            ):
 249                continue
 250            file_list.append(
 251                self._create_bucket_contents_dict(
 252                    blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
 253                )
 254            )
 255            if verbose and len(file_list) % log_progress_interval == 0:
 256                logging.info(f"Processed {len(file_list):,} files so far...")
 257
 258        logging.info(f"Found {len(file_list):,} files in bucket")
 259        return file_list
 260
 261    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
 262        """
 263        Copy a file from one GCS location to another.
 264
 265        **Args:**
 266        - src_cloud_path (str): The source GCS path.
 267        - full_destination_path (str): The destination GCS path.
 268        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
 269        """
 270        try:
 271            src_blob = self.load_blob_from_full_path(src_cloud_path)
 272            dest_blob = self.load_blob_from_full_path(full_destination_path)
 273
 274            # Use rewrite so no timeouts
 275            rewrite_token = False
 276
 277            while True:
 278                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
 279                    src_blob, token=rewrite_token
 280                )
 281                if verbose:
 282                    logging.info(
 283                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
 284                    )
 285                if not rewrite_token:
 286                    break
 287
 288        except Exception as e:
 289            logging.error(
 290                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
 291                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
 292            )
 293            raise
 294
 295    def delete_cloud_file(self, full_cloud_path: str) -> None:
 296        """
 297        Delete a file from GCS.
 298
 299        **Args:**
 300        - full_cloud_path (str): The GCS path of the file to delete.
 301        """
 302        blob = self.load_blob_from_full_path(full_cloud_path)
 303        blob.delete()
 304
 305    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
 306        """
 307        Move a file from one GCS location to another.
 308
 309        **Args:**
 310        - src_cloud_path (str): The source GCS path.
 311        - full_destination_path (str): The destination GCS path.
 312        """
 313        self.copy_cloud_file(src_cloud_path, full_destination_path)
 314        self.delete_cloud_file(src_cloud_path)
 315
 316    def get_filesize(self, target_path: str) -> int:
 317        """
 318        Get the size of a file in GCS.
 319
 320        **Args:**
 321        - target_path (str): The GCS path of the file.
 322
 323        **Returns:**
 324        - int: The size of the file in bytes.
 325        """
 326        blob = self.load_blob_from_full_path(target_path)
 327        return blob.size
 328
 329    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
 330        """
 331        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
 332
 333        **Args:**
 334        - src_cloud_path (str): The source GCS path.
 335        - dest_cloud_path (str): The destination GCS path.
 336
 337        **Returns:**
 338        - bool: `True` if the files are identical, `False` otherwise.
 339        """
 340        src_blob = self.load_blob_from_full_path(src_cloud_path)
 341        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
 342
 343        # If either blob is None or does not exist
 344        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
 345            return False
 346        # If the MD5 hashes exist
 347        if src_blob.md5_hash and dest_blob.md5_hash:
 348            # And are the same return True
 349            if src_blob.md5_hash == dest_blob.md5_hash:
 350                return True
 351        else:
 352            # If md5 do not exist (for larger files they may not) check size matches
 353            if src_blob.size == dest_blob.size:
 354                return True
 355        # Otherwise, return False
 356        return False
 357
 358    def delete_multiple_files(
 359            self,
 360            files_to_delete: list[str],
 361            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 362            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 363            verbose: bool = False,
 364            job_complete_for_logging: int = 500
 365    ) -> None:
 366        """
 367        Delete multiple cloud files in parallel using multi-threading.
 368
 369        **Args:**
 370        - files_to_delete (list[str]): List of GCS paths of the files to delete.
 371        - workers (int, optional): Number of worker threads. Defaults to `10`.
 372        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
 373        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
 374        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
 375        """
 376        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
 377
 378        MultiThreadedJobs().run_multi_threaded_job(
 379            workers=workers,
 380            function=self.delete_cloud_file,
 381            list_of_jobs_args_list=list_of_jobs_args_list,
 382            max_retries=max_retries,
 383            fail_on_error=True,
 384            verbose=verbose,
 385            collect_output=False,
 386            jobs_complete_for_logging=job_complete_for_logging
 387        )
 388
 389    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
 390        """
 391        Validate if source and destination files are identical.
 392
 393        **Args:**
 394        - source_file (str): The source file path.
 395        - full_destination_path (str): The destination file path.
 396
 397        **Returns:**
 398            dict: The file dictionary of the files with a boolean indicating if they are identical.
 399        """
 400        if self.validate_files_are_same(source_file, full_destination_path):
 401            identical = True
 402        else:
 403            identical = False
 404        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
 405
 406    def loop_and_log_validation_files_multithreaded(
 407            self,
 408            files_to_validate: list[dict],
 409            log_difference: bool,
 410            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 411            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 412            job_complete_for_logging: int = 500
 413    ) -> list[dict]:
 414        """
 415        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
 416
 417        **Args:**
 418        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
 419        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
 420                                   this at the start of a copy/move operation to check if files are already copied.
 421        - workers (int, optional): Number of worker threads. Defaults to `10`.
 422        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
 423        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
 424
 425        **Returns:**
 426        - list[dict]: List of dictionaries containing files that are **not** identical.
 427        """
 428        logging.info(f"Validating if {len(files_to_validate)} files are identical")
 429
 430        # Prepare jobs: pass the necessary arguments to each validation
 431        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
 432
 433        # Use multithreaded job runner to validate the files
 434        checked_files = MultiThreadedJobs().run_multi_threaded_job(
 435            workers=workers,
 436            function=self._validate_file_pair,
 437            list_of_jobs_args_list=jobs,
 438            collect_output=True,
 439            max_retries=max_retries,
 440            jobs_complete_for_logging=job_complete_for_logging
 441        )
 442        # If any files failed to load, raise an exception
 443        if files_to_validate and None in checked_files:  # type: ignore[operator]
 444            logging.error("Failed to validate all files, could not load some blobs")
 445            raise Exception("Failed to validate all files")
 446
 447        # Get all files that are not identical
 448        not_identical_files = [
 449            file_dict
 450            for file_dict in checked_files  # type: ignore[operator, union-attr]
 451            if not file_dict['identical']
 452        ]
 453        if not_identical_files:
 454            if log_difference:
 455                for file_dict in not_identical_files:
 456                    logging.warning(
 457                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
 458                    )
 459            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
 460        return not_identical_files
 461
 462    def multithread_copy_of_files_with_validation(
 463            self,
 464            files_to_copy: list[dict],
 465            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 466            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 467            skip_check_if_already_copied: bool = False
 468    ) -> None:
 469        """
 470        Copy multiple files in parallel with validation.
 471
 472        **Args:**
 473        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
 474                Dictionary should have keys `source_file` and `full_destination_path`
 475        - workers (int): Number of worker threads. Defaults to `10`.
 476        - max_retries (int): Maximum number of retries. Defaults to `5`
 477        - skip_check_if_already_copied (bool, optional): Whether to skip checking
 478                if files are already copied and start copying right away. Defaults to `False`.
 479        """
 480        if skip_check_if_already_copied:
 481            logging.info("Skipping check if files are already copied")
 482            updated_files_to_move = files_to_copy
 483        else:
 484            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
 485                files_to_copy,
 486                log_difference=False,
 487                workers=workers,
 488                max_retries=max_retries
 489            )
 490        # If all files are already copied, return
 491        if not updated_files_to_move:
 492            logging.info("All files are already copied")
 493            return None
 494        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
 495        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
 496        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
 497        # Validate that all files were copied successfully
 498        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
 499            files_to_copy,
 500            workers=workers,
 501            log_difference=True,
 502            max_retries=max_retries
 503        )
 504        if files_not_moved_successfully:
 505            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
 506            raise Exception("Failed to copy all files")
 507        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
 508        return None
 509
 510    def move_or_copy_multiple_files(
 511            self, files_to_move: list[dict],
 512            action: str,
 513            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 514            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 515            verbose: bool = False,
 516            jobs_complete_for_logging: int = 500
 517    ) -> None:
 518        """
 519        Move or copy multiple files in parallel.
 520
 521        **Args:**
 522        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
 523        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
 524        or `ops_utils.gcp_utils.COPY`).
 525        - workers (int): Number of worker threads. Defaults to `10`.
 526        - max_retries (int): Maximum number of retries. Defaults to `5`.
 527        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
 528        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
 529
 530        **Raises:**
 531        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
 532        """
 533        if action == MOVE:
 534            cloud_function = self.move_cloud_file
 535        elif action == COPY:
 536            cloud_function = self.copy_cloud_file
 537        else:
 538            raise ValueError("Must either select move or copy")
 539
 540        list_of_jobs_args_list = [
 541            [
 542                file_dict['source_file'], file_dict['full_destination_path']
 543            ]
 544            for file_dict in files_to_move
 545        ]
 546        MultiThreadedJobs().run_multi_threaded_job(
 547            workers=workers,
 548            function=cloud_function,
 549            list_of_jobs_args_list=list_of_jobs_args_list,
 550            max_retries=max_retries,
 551            fail_on_error=True,
 552            verbose=verbose,
 553            collect_output=False,
 554            jobs_complete_for_logging=jobs_complete_for_logging
 555        )
 556
 557    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
 558        """
 559        Read the content of a file from GCS.
 560
 561        **Args:**
 562        - cloud_path (str): The GCS path of the file to read.
 563        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
 564
 565        **Returns:**
 566        - bytes: The content of the file as bytes.
 567        """
 568        blob = self.load_blob_from_full_path(cloud_path)
 569        # Download the file content as bytes
 570        content_bytes = blob.download_as_bytes()
 571        # Convert bytes to string
 572        content_str = content_bytes.decode(encoding)
 573        return content_str
 574
 575    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
 576        """
 577        Upload a file to GCS.
 578
 579        **Args:**
 580        - destination_path (str): The destination GCS path.
 581        - source_file (str): The source file path.
 582        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
 583        """
 584        blob = self.load_blob_from_full_path(destination_path)
 585        if custom_metadata:
 586            blob.metadata = custom_metadata
 587        blob.upload_from_filename(source_file)
 588
 589    def get_object_md5(
 590        self,
 591        file_path: str,
 592        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
 593        chunk_size: int = parse_size("256 KB"),
 594        logging_bytes: int = parse_size("1 GB"),
 595        returned_md5_format: str = "hex"
 596    ) -> str:
 597        """
 598        Calculate the MD5 checksum of a file in GCS.
 599
 600        **Args:**
 601        - file_path (str): The GCS path of the file.
 602        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
 603        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
 604        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
 605                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
 606
 607        **Returns:**
 608        - str: The MD5 checksum of the file.
 609
 610        **Raises:**
 611        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
 612        or `ops_utils.gcp_utils.MD5_BASE64`
 613        """
 614        if returned_md5_format not in ["hex", "base64"]:
 615            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
 616
 617        blob = self.load_blob_from_full_path(file_path)
 618
 619        # Create an MD5 hash object
 620        md5_hash = hashlib.md5()
 621
 622        blob_size_str = format_size(blob.size)
 623        logging.info(f"Streaming {file_path} which is {blob_size_str}")
 624        # Use a BytesIO stream to collect data in chunks and upload it
 625        buffer = io.BytesIO()
 626        total_bytes_streamed = 0
 627        # Keep track of the last logged size for data logging
 628        last_logged = 0
 629
 630        with blob.open("rb") as source_stream:
 631            while True:
 632                chunk = source_stream.read(chunk_size)
 633                if not chunk:
 634                    break
 635                md5_hash.update(chunk)
 636                buffer.write(chunk)
 637                total_bytes_streamed += len(chunk)
 638                # Log progress every 1 gb if verbose used
 639                if total_bytes_streamed - last_logged >= logging_bytes:
 640                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
 641                    last_logged = total_bytes_streamed
 642
 643        if returned_md5_format == "hex":
 644            md5 = md5_hash.hexdigest()
 645            logging.info(f"MD5 (hex) for {file_path}: {md5}")
 646        elif returned_md5_format == "base64":
 647            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
 648            logging.info(f"MD5 (base64) for {file_path}: {md5}")
 649        return md5
 650
 651    def set_acl_public_read(self, cloud_path: str) -> None:
 652        """
 653        Set the file in the bucket to be publicly readable.
 654
 655        **Args:**
 656        - cloud_path (str): The GCS path of the file to be set as public readable.
 657        """
 658        blob = self.load_blob_from_full_path(cloud_path)
 659        blob.acl.all().grant_read()
 660        blob.acl.save()
 661
 662    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
 663        """
 664        Set the file in the bucket to grant OWNER permission to a specific group.
 665
 666        **Args:**
 667        - cloud_path (str): The GCS path of the file.
 668        - group_email (str): The email of the group to grant OWNER permission
 669        """
 670        blob = self.load_blob_from_full_path(cloud_path)
 671        blob.acl.group(group_email).grant_owner()
 672        blob.acl.save()
 673
 674    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
 675        """
 676        Set Cache-Control metadata for a file.
 677
 678        **Args:**
 679        - cloud_path (str): The GCS path of the file.
 680        - cache_control (str): The Cache-Control metadata to set.
 681        """
 682        blob = self.load_blob_from_full_path(cloud_path)
 683        blob.cache_control = cache_control
 684        blob.patch()
 685
 686    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]:
 687        """
 688        Get the most recent blob in the bucket.
 689        
 690        If the blob with the most recent timestamp doesn't have
 691        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
 692        recent file until a log with useful information is encountered. This is useful when combing through
 693        GCP activity logs for Terra workspace buckets.
 694
 695        **Args:**
 696        - bucket_name (str): The GCS bucket name.
 697
 698        **Returns:**
 699        - Optional tuple of the blob found and the file contents from the blob
 700        """
 701        blobs = sorted(
 702            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
 703        )
 704        for blob in blobs:
 705            # Download the file contents as a string
 706            file_contents = blob.download_as_text()
 707
 708            # Check if the content matches the undesired format
 709            lines = file_contents.splitlines()
 710            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
 711                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
 712                continue
 713
 714            # If it doesn't match the undesired format, return its content
 715            logging.info(f"Found valid file: {blob.name}")
 716            return blob, file_contents
 717
 718        logging.info("No valid files found.")
 719        return None
 720
 721    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
 722        """
 723        Write content to a file in GCS.
 724
 725        **Args:**
 726        - cloud_path (str): The GCS path of the file to write.
 727        - file_contents (str): The content to write.
 728        """
 729        blob = self.load_blob_from_full_path(cloud_path)
 730        blob.upload_from_string(file_contents)
 731        logging.info(f"Successfully wrote content to {cloud_path}")
 732
 733    @staticmethod
 734    def get_active_gcloud_account() -> str:
 735        """
 736        Get the active GCP email for the current account.
 737
 738        **Returns:**
 739        - str: The active GCP account email.
 740        """
 741        result = subprocess.run(
 742            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
 743            capture_output=True,
 744            text=True,
 745            check=True
 746        )
 747        return result.stdout.strip()
 748
 749    def has_write_permission(self, cloud_path: str) -> bool:
 750        """
 751        Check if the current user has permission to write to a GCP path.
 752
 753        This method tests write access by attempting to update the metadata
 754        of an existing blob or create a zero-byte temporary file if the blob
 755        doesn't exist. The temporary file is deleted immediately if created.
 756
 757        **Args:**
 758        - cloud_path (str): The GCS path to check for write permissions.
 759
 760        **Returns:**
 761        - bool: True if the user has write permission, False otherwise.
 762        """
 763        if not cloud_path.startswith("gs://"):
 764            raise ValueError("cloud_path must start with 'gs://'")
 765        if cloud_path.endswith("/"):
 766            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
 767            cloud_path = f"{cloud_path}permission_test_temp"
 768        try:
 769            blob = self.load_blob_from_full_path(cloud_path)
 770            if blob.exists():
 771                # Try updating metadata (doesn't change the content)
 772                original_metadata = blob.metadata or {}
 773                test_metadata = original_metadata.copy()
 774                test_metadata["_write_permission_test"] = "true"
 775
 776                blob.metadata = test_metadata
 777                blob.patch()
 778
 779                # Restore the original metadata
 780                blob.metadata = original_metadata
 781                blob.patch()
 782
 783                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
 784                return True
 785            else:
 786                # Try writing a temporary file to the bucket
 787                blob.upload_from_string("")
 788
 789                # Clean up the test file
 790                blob.delete()
 791                logging.info(f"Write permission confirmed for {cloud_path}")
 792                return True
 793        except Forbidden:
 794            logging.warning(f"No write permission on path {cloud_path}")
 795            return False
 796        except GoogleAPICallError as e:
 797            logging.warning(f"Error testing write access to {cloud_path}: {e}")
 798            return False
 799
    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
        """
        Wait for write permissions on a GCP path, checking at regular intervals.

        This method will periodically check if the user has write permission on the specified cloud path.
        It will continue checking until either write permission is granted or the maximum wait time is reached.

        **Args:**
        - cloud_path (str): The GCS path to check for write permissions.
        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.

        **Returns:**
        - None. Returns as soon as write permission is confirmed.

        **Raises:**
        - ValueError: If ``cloud_path`` does not start with ``gs://``.
        - PermissionError: If write permission is not granted before ``max_wait_time_minutes`` elapses.
        """
        if not cloud_path.startswith("gs://"):
            raise ValueError("cloud_path must start with 'gs://'")

        # Convert minutes to seconds for the sleep function
        interval_seconds = interval_wait_time_minutes * 60
        max_wait_seconds = max_wait_time_minutes * 60

        start_time = time.time()
        attempt_number = 1

        logging.info(
            f"Starting to check for write permissions on {cloud_path}. Will check "
            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")

        # First check immediately
        if self.has_write_permission(cloud_path):
            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
            return

        # If first check fails, start periodic checks
        while time.time() - start_time < max_wait_seconds:
            elapsed_minutes = (time.time() - start_time) / 60
            remaining_minutes = max_wait_time_minutes - elapsed_minutes

            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
                         f"Time remaining: {remaining_minutes:.1f} minute(s).")

            # Sleep for the interval duration
            time.sleep(interval_seconds)

            attempt_number += 1
            logging.info(f"Checking write permissions (attempt {attempt_number})...")

            if self.has_write_permission(cloud_path):
                elapsed_minutes = (time.time() - start_time) / 60
                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
                return

        # If we get here, we've exceeded the maximum wait time
        raise PermissionError(
            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
            f"{cloud_path} after {attempt_number} attempts.")
 858
 859    def _load_blob_full_path_to_bucket_contents_dict(self, full_path: str, file_name_only: bool) -> dict:
 860        """Load a blob from a full GCS path and convert it to a bucket-contents dict.
 861
 862        This is intended for use with multithreaded batch operations.
 863
 864        Raises:
 865            ValueError: If the blob does not exist.
 866        """
 867        file_path_components = self._process_cloud_path(full_path)
 868        blob = self.load_blob_from_full_path(full_path)
 869
 870        # load_blob_from_full_path reloads metadata when exists() is True.
 871        # Ensure the object exists so downstream consumers always get a valid dict.
 872        if not blob.exists():
 873            raise ValueError(f"Blob does not exist: {full_path}")
 874
 875        return self._create_bucket_contents_dict(
 876            bucket_name=file_path_components["bucket"],
 877            blob=blob,
 878            file_name_only=file_name_only
 879        )
 880
 881    def check_files_exist_multithreaded(
 882            self,
 883            full_paths: list[str],
 884            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 885            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 886            job_complete_for_logging: int = 500
 887    ) -> dict[str, bool]:
 888        """Check existence of multiple GCS files in parallel.
 889
 890        **Args:**
 891        - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to check.
 892        - workers (int, optional): Number of worker threads. Defaults to `10`.
 893        - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
 894        - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.
 895
 896        **Returns:**
 897        - dict[str, bool]: A dictionary where each key is a GCS path and the value is ``True`` if the file exists,
 898          ``False`` otherwise.
 899        """
 900        # _check_file_exists_in_gcs is needed because check_file_exists returns a plain bool, which would make it
 901        # impossible to associate each result back to its path after the threads complete. By wrapping
 902        # it here we return both the path and the result together, so the final dict comprehension
 903        # can correctly map path -> bool regardless of the order results come back from the thread pool.
 904        def _check_file_exists_in_gcs(path: str) -> dict:
 905            return {"path": path, "exists": self.check_file_exists(path)}
 906
 907        jobs = [[path] for path in full_paths]
 908
 909        results = MultiThreadedJobs().run_multi_threaded_job(
 910            workers=workers,
 911            function=_check_file_exists_in_gcs,
 912            list_of_jobs_args_list=jobs,
 913            collect_output=True,
 914            max_retries=max_retries,
 915            fail_on_error=True,
 916            jobs_complete_for_logging=job_complete_for_logging
 917        )
 918        return {item["path"]: item["exists"] for item in results}  # type: ignore[union-attr]
 919
 920    def read_files_multithreaded(
 921            self,
 922            full_paths: list[str],
 923            encoding: str = "utf-8",
 924            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 925            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 926            job_complete_for_logging: int = 500
 927    ) -> dict[str, str]:
 928        """Read the contents of multiple GCS files in parallel.
 929
 930        **Args:**
 931        - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to read.
 932        - encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`.
 933        - workers (int, optional): Number of worker threads. Defaults to `10`.
 934        - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
 935        - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.
 936
 937        **Returns:**
 938        - dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string.
 939        """
 940        # _read_file_contents_from_gcs is needed because read_file returns a plain str, which would make it
 941        # impossible to associate each result back to its path after the threads complete. By wrapping
 942        # it here we return both the path and the contents together, so the final dict comprehension
 943        # can correctly map path -> contents regardless of the order results come back from the thread pool.
 944        def _read_file_contents_from_gcs(path: str) -> dict:
 945            return {"path": path, "contents": self.read_file(path, encoding=encoding)}
 946
 947        jobs = [[path] for path in full_paths]
 948
 949        results = MultiThreadedJobs().run_multi_threaded_job(
 950            workers=workers,
 951            function=_read_file_contents_from_gcs,
 952            list_of_jobs_args_list=jobs,
 953            collect_output=True,
 954            max_retries=max_retries,
 955            fail_on_error=True,
 956            jobs_complete_for_logging=job_complete_for_logging
 957        )
 958        return {item["path"]: item["contents"] for item in results}  # type: ignore[union-attr]
 959
 960    def load_blobs_from_full_paths_multithreaded(
 961            self,
 962            full_paths: list[str],
 963            file_name_only: bool = False,
 964            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 965            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 966            job_complete_for_logging: int = 500
 967    ) -> list[dict]:
 968        """Load multiple blobs in parallel from a list of full GCS paths.
 969
 970        Args:
 971            full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object).
 972            file_name_only: Whether to return only the file path in each dict.
 973            workers: Number of worker threads.
 974            max_retries: Maximum number of retries per job.
 975            job_complete_for_logging: Emit progress logs every N completed jobs.
 976
 977        Returns:
 978            List of dictionaries shaped like :meth:`_create_bucket_contents_dict`.
 979
 980        Raises:
 981            Exception: If any blob cannot be loaded / does not exist / returns no output.
 982        """
 983        if not full_paths:
 984            return []
 985
 986        # De-duplicate input paths and keep order
 987        unique_paths = list(dict.fromkeys(full_paths))
 988        jobs = [(path, file_name_only) for path in unique_paths]
 989
 990        results = MultiThreadedJobs().run_multi_threaded_job(
 991            workers=workers,
 992            function=self._load_blob_full_path_to_bucket_contents_dict,
 993            list_of_jobs_args_list=jobs,
 994            collect_output=True,
 995            max_retries=max_retries,
 996            fail_on_error=True,
 997            jobs_complete_for_logging=job_complete_for_logging
 998        )
 999
1000        # If any jobs failed, MultiThreadedJobs would have raised. Still defensively validate output.
1001        if results is None or (unique_paths and (None in results)):  # type: ignore[operator]
1002            raise Exception("Failed to load all blobs")
1003
1004        if len(results) != len(unique_paths) or any(not item for item in results):  # type: ignore[arg-type]
1005            raise Exception("Failed to load all blobs")
1006
1007        return results  # type: ignore[return-value]
MOVE = 'move'

Variable to be used for the "action" when files should be moved.

COPY = 'copy'

Variable to be used for the "action" when files should be copied.

MD5_HEX = 'hex'

Variable to be used when the generated md5 should be of the hex type.

MD5_BASE64 = 'base64'

Variable to be used when the generated md5 should be of the base64 type.

class GCPCloudFunctions:
  34class GCPCloudFunctions:
  35    """Class to handle GCP Cloud Functions."""
  36
  37    def __init__(
  38            self,
  39            project: Optional[str] = None,
  40            service_account_json: Optional[str] = None
  41    ) -> None:
  42        """
  43        Initialize the GCPCloudFunctions class.
  44
  45        Authenticates using service account JSON if provided or default credentials,
  46        and sets up the Storage Client.
  47
  48        Args:
  49            project: Optional[str] = None
  50                The GCP project ID. If not provided, will use project from service account or default.
  51            service_account_json: Optional[str] = None
  52                Path to service account JSON key file. If provided, will use these credentials.
  53        """
  54        # Initialize credentials and project
  55        credentials = None
  56        default_project = None
  57
  58        if service_account_json:
  59            credentials = service_account.Credentials.from_service_account_file(service_account_json)
  60            # Extract project from service account if not specified
  61            if not project:
  62                with open(service_account_json, 'r') as f:
  63                    sa_info = json.load(f)
  64                    project = sa_info.get('project_id')
  65        else:
  66            # Use default credentials
  67            credentials, default_project = default()
  68
  69        # Set project if not already set
  70        if not project:
  71            project = default_project
  72
  73        self.client = storage.Client(credentials=credentials, project=project)
  74        """@private"""
  75
  76    @staticmethod
  77    def _process_cloud_path(cloud_path: str) -> dict:
  78        """
  79        Process a GCS cloud path into its components.
  80
  81        Args:
  82            cloud_path (str): The GCS cloud path.
  83
  84        Returns:
  85            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
  86        """
  87        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
  88        bucket_name = str.split(remaining_url, sep="/")[0]
  89        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
  90        path_components = {
  91            "platform_prefix": platform_prefix,
  92            "bucket": bucket_name,
  93            "blob_url": blob_name
  94        }
  95        return path_components
  96
  97    def load_blob_from_full_path(self, full_path: str) -> Blob:
  98        """
  99        Load a GCS blob object from a full GCS path.
 100
 101        **Args:**
 102        - full_path (str): The full GCS path.
 103
 104        **Returns:**
 105        - google.cloud.storage.blob.Blob: The GCS blob object.
 106        """
 107        file_path_components = self._process_cloud_path(full_path)
 108
 109        # Specify the billing project
 110        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
 111        blob = bucket.blob(file_path_components["blob_url"])
 112
 113        # If blob exists in GCS reload it so metadata is there
 114        if blob.exists():
 115            blob.reload()
 116        return blob
 117
 118    def check_file_exists(self, full_path: str) -> bool:
 119        """
 120        Check if a file exists in GCS.
 121
 122        **Args:**
 123        - full_path (str): The full GCS path.
 124
 125        **Returns:**
 126        - bool: `True` if the file exists, `False` otherwise.
 127        """
 128        blob = self.load_blob_from_full_path(full_path)
 129        return blob.exists()
 130
 131    @staticmethod
 132    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
 133        """
 134        Create a dictionary containing file information.
 135
 136        Args:
 137            bucket_name (str): The name of the GCS bucket.
 138            blob (Any): The GCS blob object.
 139            file_name_only (bool): Whether to return only the file list.
 140
 141        Returns:
 142            dict: A dictionary containing file information.
 143        """
 144        if file_name_only:
 145            return {
 146                "path": f"gs://{bucket_name}/{blob.name}"
 147            }
 148        return {
 149            "name": os.path.basename(blob.name),
 150            "path": f"gs://{bucket_name}/{blob.name}",
 151            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
 152            "file_extension": os.path.splitext(blob.name)[1],
 153            "size_in_bytes": blob.size,
 154            "md5_hash": blob.md5_hash,
 155            "time_created": blob.time_created.isoformat() if blob.time_created else None,
 156            "last_modified": blob.updated.isoformat() if blob.updated else None,
 157        }
 158
 159    @staticmethod
 160    def _validate_include_blob(
 161            blob: Any,
 162            bucket_name: str,
 163            file_extensions_to_ignore: list[str] = [],
 164            file_strings_to_ignore: list[str] = [],
 165            file_extensions_to_include: list[str] = [],
 166            verbose: bool = False
 167    ) -> bool:
 168        """
 169        Validate if a blob should be included based on its file extension.
 170
 171        Args:
 172            file_extensions_to_include (list[str]): List of file extensions to include.
 173            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
 174            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
 175            blob (Any): The GCS blob object.
 176            verbose (bool): Whether to log files not being included.
 177
 178        Returns:
 179            bool: True if the blob should be included, False otherwise.
 180        """
 181        file_path = f"gs://{bucket_name}/{blob.name}"
 182        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
 183            if verbose:
 184                logging.info(f"Skipping {file_path} as it has an extension to ignore")
 185            return False
 186        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
 187            if verbose:
 188                logging.info(f"Skipping {file_path} as it does not have an extension to include")
 189            return False
 190        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
 191            if verbose:
 192                logging.info(f"Skipping {file_path} as it has a string to ignore")
 193            return False
 194        return True
 195
 196    def list_bucket_contents(
 197            self,
 198            bucket_name: str,
 199            prefix: Optional[str] = None,
 200            file_extensions_to_ignore: list[str] = [],
 201            file_strings_to_ignore: list[str] = [],
 202            file_extensions_to_include: list[str] = [],
 203            file_name_only: bool = False,
 204            verbose: bool = False,
 205            log_progress_interval: int = 10000
 206    ) -> list[dict]:
 207        """
 208        List contents of a GCS bucket and return a list of dictionaries with file information.
 209
 210        **Args:**
 211        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
 212        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
 213        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
 214        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
 215        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
 216        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
 217        - verbose (bool, optional): Whether to log files not being included. Defaults to `False`.
 218        - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to `10000`.
 219
 220        **Returns:**
 221        - list[dict]: A list of dictionaries containing file information.
 222        """
 223        # If the bucket name starts with gs://, remove it
 224        if bucket_name.startswith("gs://"):
 225            bucket_name = bucket_name.split("/")[2].strip()
 226
 227        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
 228
 229        # Get the bucket object and set user_project for Requester Pays
 230        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
 231
 232        # list_blobs returns a lazy iterator — no network calls happen until we iterate.
 233        # page_size=1000 is the GCS maximum, minimising the number of paginated round-trips.
 234        prefix_msg = f" with prefix '{prefix}'" if prefix else ""
 235        logging.info(f"Starting to page through blobs in bucket '{bucket_name}'{prefix_msg}...")
 236        blobs = bucket.list_blobs(prefix=prefix, page_size=1000)
 237
 238        # Iterate with progress logging so large buckets don't appear stuck
 239        file_list = []
 240        for blob in blobs:
 241            if blob.name.endswith("/"):
 242                continue
 243            if not self._validate_include_blob(
 244                blob=blob,
 245                file_extensions_to_ignore=file_extensions_to_ignore,
 246                file_strings_to_ignore=file_strings_to_ignore,
 247                file_extensions_to_include=file_extensions_to_include,
 248                bucket_name=bucket_name
 249            ):
 250                continue
 251            file_list.append(
 252                self._create_bucket_contents_dict(
 253                    blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
 254                )
 255            )
 256            if verbose and len(file_list) % log_progress_interval == 0:
 257                logging.info(f"Processed {len(file_list):,} files so far...")
 258
 259        logging.info(f"Found {len(file_list):,} files in bucket")
 260        return file_list
 261
 262    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
 263        """
 264        Copy a file from one GCS location to another.
 265
 266        **Args:**
 267        - src_cloud_path (str): The source GCS path.
 268        - full_destination_path (str): The destination GCS path.
 269        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
 270        """
 271        try:
 272            src_blob = self.load_blob_from_full_path(src_cloud_path)
 273            dest_blob = self.load_blob_from_full_path(full_destination_path)
 274
 275            # Use rewrite so no timeouts
 276            rewrite_token = False
 277
 278            while True:
 279                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
 280                    src_blob, token=rewrite_token
 281                )
 282                if verbose:
 283                    logging.info(
 284                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
 285                    )
 286                if not rewrite_token:
 287                    break
 288
 289        except Exception as e:
 290            logging.error(
 291                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
 292                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
 293            )
 294            raise
 295
 296    def delete_cloud_file(self, full_cloud_path: str) -> None:
 297        """
 298        Delete a file from GCS.
 299
 300        **Args:**
 301        - full_cloud_path (str): The GCS path of the file to delete.
 302        """
 303        blob = self.load_blob_from_full_path(full_cloud_path)
 304        blob.delete()
 305
 306    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
 307        """
 308        Move a file from one GCS location to another.
 309
 310        **Args:**
 311        - src_cloud_path (str): The source GCS path.
 312        - full_destination_path (str): The destination GCS path.
 313        """
 314        self.copy_cloud_file(src_cloud_path, full_destination_path)
 315        self.delete_cloud_file(src_cloud_path)
 316
 317    def get_filesize(self, target_path: str) -> int:
 318        """
 319        Get the size of a file in GCS.
 320
 321        **Args:**
 322        - target_path (str): The GCS path of the file.
 323
 324        **Returns:**
 325        - int: The size of the file in bytes.
 326        """
 327        blob = self.load_blob_from_full_path(target_path)
 328        return blob.size
 329
 330    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
 331        """
 332        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
 333
 334        **Args:**
 335        - src_cloud_path (str): The source GCS path.
 336        - dest_cloud_path (str): The destination GCS path.
 337
 338        **Returns:**
 339        - bool: `True` if the files are identical, `False` otherwise.
 340        """
 341        src_blob = self.load_blob_from_full_path(src_cloud_path)
 342        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
 343
 344        # If either blob is None or does not exist
 345        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
 346            return False
 347        # If the MD5 hashes exist
 348        if src_blob.md5_hash and dest_blob.md5_hash:
 349            # And are the same return True
 350            if src_blob.md5_hash == dest_blob.md5_hash:
 351                return True
 352        else:
 353            # If md5 do not exist (for larger files they may not) check size matches
 354            if src_blob.size == dest_blob.size:
 355                return True
 356        # Otherwise, return False
 357        return False
 358
 359    def delete_multiple_files(
 360            self,
 361            files_to_delete: list[str],
 362            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 363            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 364            verbose: bool = False,
 365            job_complete_for_logging: int = 500
 366    ) -> None:
 367        """
 368        Delete multiple cloud files in parallel using multi-threading.
 369
 370        **Args:**
 371        - files_to_delete (list[str]): List of GCS paths of the files to delete.
 372        - workers (int, optional): Number of worker threads. Defaults to `10`.
 373        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
 374        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
 375        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
 376        """
 377        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
 378
 379        MultiThreadedJobs().run_multi_threaded_job(
 380            workers=workers,
 381            function=self.delete_cloud_file,
 382            list_of_jobs_args_list=list_of_jobs_args_list,
 383            max_retries=max_retries,
 384            fail_on_error=True,
 385            verbose=verbose,
 386            collect_output=False,
 387            jobs_complete_for_logging=job_complete_for_logging
 388        )
 389
 390    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
 391        """
 392        Validate if source and destination files are identical.
 393
 394        **Args:**
 395        - source_file (str): The source file path.
 396        - full_destination_path (str): The destination file path.
 397
 398        **Returns:**
 399            dict: The file dictionary of the files with a boolean indicating if they are identical.
 400        """
 401        if self.validate_files_are_same(source_file, full_destination_path):
 402            identical = True
 403        else:
 404            identical = False
 405        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
 406
 407    def loop_and_log_validation_files_multithreaded(
 408            self,
 409            files_to_validate: list[dict],
 410            log_difference: bool,
 411            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 412            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 413            job_complete_for_logging: int = 500
 414    ) -> list[dict]:
 415        """
 416        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
 417
 418        **Args:**
 419        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
 420        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
 421                                   this at the start of a copy/move operation to check if files are already copied.
 422        - workers (int, optional): Number of worker threads. Defaults to `10`.
 423        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
 424        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
 425
 426        **Returns:**
 427        - list[dict]: List of dictionaries containing files that are **not** identical.
 428        """
 429        logging.info(f"Validating if {len(files_to_validate)} files are identical")
 430
 431        # Prepare jobs: pass the necessary arguments to each validation
 432        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
 433
 434        # Use multithreaded job runner to validate the files
 435        checked_files = MultiThreadedJobs().run_multi_threaded_job(
 436            workers=workers,
 437            function=self._validate_file_pair,
 438            list_of_jobs_args_list=jobs,
 439            collect_output=True,
 440            max_retries=max_retries,
 441            jobs_complete_for_logging=job_complete_for_logging
 442        )
 443        # If any files failed to load, raise an exception
 444        if files_to_validate and None in checked_files:  # type: ignore[operator]
 445            logging.error("Failed to validate all files, could not load some blobs")
 446            raise Exception("Failed to validate all files")
 447
 448        # Get all files that are not identical
 449        not_identical_files = [
 450            file_dict
 451            for file_dict in checked_files  # type: ignore[operator, union-attr]
 452            if not file_dict['identical']
 453        ]
 454        if not_identical_files:
 455            if log_difference:
 456                for file_dict in not_identical_files:
 457                    logging.warning(
 458                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
 459                    )
 460            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
 461        return not_identical_files
 462
 463    def multithread_copy_of_files_with_validation(
 464            self,
 465            files_to_copy: list[dict],
 466            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 467            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 468            skip_check_if_already_copied: bool = False
 469    ) -> None:
 470        """
 471        Copy multiple files in parallel with validation.
 472
 473        **Args:**
 474        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
 475                Dictionary should have keys `source_file` and `full_destination_path`
 476        - workers (int): Number of worker threads. Defaults to `10`.
 477        - max_retries (int): Maximum number of retries. Defaults to `5`
 478        - skip_check_if_already_copied (bool, optional): Whether to skip checking
 479                if files are already copied and start copying right away. Defaults to `False`.
 480        """
 481        if skip_check_if_already_copied:
 482            logging.info("Skipping check if files are already copied")
 483            updated_files_to_move = files_to_copy
 484        else:
 485            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
 486                files_to_copy,
 487                log_difference=False,
 488                workers=workers,
 489                max_retries=max_retries
 490            )
 491        # If all files are already copied, return
 492        if not updated_files_to_move:
 493            logging.info("All files are already copied")
 494            return None
 495        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
 496        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
 497        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
 498        # Validate that all files were copied successfully
 499        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
 500            files_to_copy,
 501            workers=workers,
 502            log_difference=True,
 503            max_retries=max_retries
 504        )
 505        if files_not_moved_successfully:
 506            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
 507            raise Exception("Failed to copy all files")
 508        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
 509        return None
 510
 511    def move_or_copy_multiple_files(
 512            self, files_to_move: list[dict],
 513            action: str,
 514            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 515            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 516            verbose: bool = False,
 517            jobs_complete_for_logging: int = 500
 518    ) -> None:
 519        """
 520        Move or copy multiple files in parallel.
 521
 522        **Args:**
 523        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
 524        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
 525        or `ops_utils.gcp_utils.COPY`).
 526        - workers (int): Number of worker threads. Defaults to `10`.
 527        - max_retries (int): Maximum number of retries. Defaults to `5`.
 528        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
 529        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
 530
 531        **Raises:**
 532        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
 533        """
 534        if action == MOVE:
 535            cloud_function = self.move_cloud_file
 536        elif action == COPY:
 537            cloud_function = self.copy_cloud_file
 538        else:
 539            raise ValueError("Must either select move or copy")
 540
 541        list_of_jobs_args_list = [
 542            [
 543                file_dict['source_file'], file_dict['full_destination_path']
 544            ]
 545            for file_dict in files_to_move
 546        ]
 547        MultiThreadedJobs().run_multi_threaded_job(
 548            workers=workers,
 549            function=cloud_function,
 550            list_of_jobs_args_list=list_of_jobs_args_list,
 551            max_retries=max_retries,
 552            fail_on_error=True,
 553            verbose=verbose,
 554            collect_output=False,
 555            jobs_complete_for_logging=jobs_complete_for_logging
 556        )
 557
 558    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
 559        """
 560        Read the content of a file from GCS.
 561
 562        **Args:**
 563        - cloud_path (str): The GCS path of the file to read.
 564        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
 565
 566        **Returns:**
 567        - bytes: The content of the file as bytes.
 568        """
 569        blob = self.load_blob_from_full_path(cloud_path)
 570        # Download the file content as bytes
 571        content_bytes = blob.download_as_bytes()
 572        # Convert bytes to string
 573        content_str = content_bytes.decode(encoding)
 574        return content_str
 575
 576    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
 577        """
 578        Upload a file to GCS.
 579
 580        **Args:**
 581        - destination_path (str): The destination GCS path.
 582        - source_file (str): The source file path.
 583        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
 584        """
 585        blob = self.load_blob_from_full_path(destination_path)
 586        if custom_metadata:
 587            blob.metadata = custom_metadata
 588        blob.upload_from_filename(source_file)
 589
 590    def get_object_md5(
 591        self,
 592        file_path: str,
 593        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
 594        chunk_size: int = parse_size("256 KB"),
 595        logging_bytes: int = parse_size("1 GB"),
 596        returned_md5_format: str = "hex"
 597    ) -> str:
 598        """
 599        Calculate the MD5 checksum of a file in GCS.
 600
 601        **Args:**
 602        - file_path (str): The GCS path of the file.
 603        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
 604        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
 605        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
 606                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
 607
 608        **Returns:**
 609        - str: The MD5 checksum of the file.
 610
 611        **Raises:**
 612        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
 613        or `ops_utils.gcp_utils.MD5_BASE64`
 614        """
 615        if returned_md5_format not in ["hex", "base64"]:
 616            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
 617
 618        blob = self.load_blob_from_full_path(file_path)
 619
 620        # Create an MD5 hash object
 621        md5_hash = hashlib.md5()
 622
 623        blob_size_str = format_size(blob.size)
 624        logging.info(f"Streaming {file_path} which is {blob_size_str}")
 625        # Use a BytesIO stream to collect data in chunks and upload it
 626        buffer = io.BytesIO()
 627        total_bytes_streamed = 0
 628        # Keep track of the last logged size for data logging
 629        last_logged = 0
 630
 631        with blob.open("rb") as source_stream:
 632            while True:
 633                chunk = source_stream.read(chunk_size)
 634                if not chunk:
 635                    break
 636                md5_hash.update(chunk)
 637                buffer.write(chunk)
 638                total_bytes_streamed += len(chunk)
 639                # Log progress every 1 gb if verbose used
 640                if total_bytes_streamed - last_logged >= logging_bytes:
 641                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
 642                    last_logged = total_bytes_streamed
 643
 644        if returned_md5_format == "hex":
 645            md5 = md5_hash.hexdigest()
 646            logging.info(f"MD5 (hex) for {file_path}: {md5}")
 647        elif returned_md5_format == "base64":
 648            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
 649            logging.info(f"MD5 (base64) for {file_path}: {md5}")
 650        return md5
 651
 652    def set_acl_public_read(self, cloud_path: str) -> None:
 653        """
 654        Set the file in the bucket to be publicly readable.
 655
 656        **Args:**
 657        - cloud_path (str): The GCS path of the file to be set as public readable.
 658        """
 659        blob = self.load_blob_from_full_path(cloud_path)
 660        blob.acl.all().grant_read()
 661        blob.acl.save()
 662
 663    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
 664        """
 665        Set the file in the bucket to grant OWNER permission to a specific group.
 666
 667        **Args:**
 668        - cloud_path (str): The GCS path of the file.
 669        - group_email (str): The email of the group to grant OWNER permission
 670        """
 671        blob = self.load_blob_from_full_path(cloud_path)
 672        blob.acl.group(group_email).grant_owner()
 673        blob.acl.save()
 674
 675    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
 676        """
 677        Set Cache-Control metadata for a file.
 678
 679        **Args:**
 680        - cloud_path (str): The GCS path of the file.
 681        - cache_control (str): The Cache-Control metadata to set.
 682        """
 683        blob = self.load_blob_from_full_path(cloud_path)
 684        blob.cache_control = cache_control
 685        blob.patch()
 686
 687    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]:
 688        """
 689        Get the most recent blob in the bucket.
 690        
 691        If the blob with the most recent timestamp doesn't have
 692        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
 693        recent file until a log with useful information is encountered. This is useful when combing through
 694        GCP activity logs for Terra workspace buckets.
 695
 696        **Args:**
 697        - bucket_name (str): The GCS bucket name.
 698
 699        **Returns:**
 700        - Optional tuple of the blob found and the file contents from the blob
 701        """
 702        blobs = sorted(
 703            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
 704        )
 705        for blob in blobs:
 706            # Download the file contents as a string
 707            file_contents = blob.download_as_text()
 708
 709            # Check if the content matches the undesired format
 710            lines = file_contents.splitlines()
 711            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
 712                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
 713                continue
 714
 715            # If it doesn't match the undesired format, return its content
 716            logging.info(f"Found valid file: {blob.name}")
 717            return blob, file_contents
 718
 719        logging.info("No valid files found.")
 720        return None
 721
 722    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
 723        """
 724        Write content to a file in GCS.
 725
 726        **Args:**
 727        - cloud_path (str): The GCS path of the file to write.
 728        - file_contents (str): The content to write.
 729        """
 730        blob = self.load_blob_from_full_path(cloud_path)
 731        blob.upload_from_string(file_contents)
 732        logging.info(f"Successfully wrote content to {cloud_path}")
 733
 734    @staticmethod
 735    def get_active_gcloud_account() -> str:
 736        """
 737        Get the active GCP email for the current account.
 738
 739        **Returns:**
 740        - str: The active GCP account email.
 741        """
 742        result = subprocess.run(
 743            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
 744            capture_output=True,
 745            text=True,
 746            check=True
 747        )
 748        return result.stdout.strip()
 749
 750    def has_write_permission(self, cloud_path: str) -> bool:
 751        """
 752        Check if the current user has permission to write to a GCP path.
 753
 754        This method tests write access by attempting to update the metadata
 755        of an existing blob or create a zero-byte temporary file if the blob
 756        doesn't exist. The temporary file is deleted immediately if created.
 757
 758        **Args:**
 759        - cloud_path (str): The GCS path to check for write permissions.
 760
 761        **Returns:**
 762        - bool: True if the user has write permission, False otherwise.
 763        """
 764        if not cloud_path.startswith("gs://"):
 765            raise ValueError("cloud_path must start with 'gs://'")
 766        if cloud_path.endswith("/"):
 767            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
 768            cloud_path = f"{cloud_path}permission_test_temp"
 769        try:
 770            blob = self.load_blob_from_full_path(cloud_path)
 771            if blob.exists():
 772                # Try updating metadata (doesn't change the content)
 773                original_metadata = blob.metadata or {}
 774                test_metadata = original_metadata.copy()
 775                test_metadata["_write_permission_test"] = "true"
 776
 777                blob.metadata = test_metadata
 778                blob.patch()
 779
 780                # Restore the original metadata
 781                blob.metadata = original_metadata
 782                blob.patch()
 783
 784                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
 785                return True
 786            else:
 787                # Try writing a temporary file to the bucket
 788                blob.upload_from_string("")
 789
 790                # Clean up the test file
 791                blob.delete()
 792                logging.info(f"Write permission confirmed for {cloud_path}")
 793                return True
 794        except Forbidden:
 795            logging.warning(f"No write permission on path {cloud_path}")
 796            return False
 797        except GoogleAPICallError as e:
 798            logging.warning(f"Error testing write access to {cloud_path}: {e}")
 799            return False
 800
 801    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
 802        """
 803        Wait for write permissions on a GCP path, checking at regular intervals.
 804
 805        This method will periodically check if the user has write permission on the specified cloud path.
 806        It will continue checking until either write permission is granted or the maximum wait time is reached.
 807
 808        **Args:**
 809        - cloud_path (str): The GCS path to check for write permissions.
 810        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
 811        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
 812
 813        **Returns:**
 814        - bool: True if write permission is granted within the wait time, False otherwise.
 815        """
 816        if not cloud_path.startswith("gs://"):
 817            raise ValueError("cloud_path must start with 'gs://'")
 818
 819        # Convert minutes to seconds for the sleep function
 820        interval_seconds = interval_wait_time_minutes * 60
 821        max_wait_seconds = max_wait_time_minutes * 60
 822
 823        start_time = time.time()
 824        attempt_number = 1
 825
 826        logging.info(
 827            f"Starting to check for write permissions on {cloud_path}. Will check "
 828            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")
 829
 830        # First check immediately
 831        if self.has_write_permission(cloud_path):
 832            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
 833            return
 834
 835        # If first check fails, start periodic checks
 836        while time.time() - start_time < max_wait_seconds:
 837            elapsed_minutes = (time.time() - start_time) / 60
 838            remaining_minutes = max_wait_time_minutes - elapsed_minutes
 839
 840            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
 841                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
 842                         f"Time remaining: {remaining_minutes:.1f} minute(s).")
 843
 844            # Sleep for the interval duration
 845            time.sleep(interval_seconds)
 846
 847            attempt_number += 1
 848            logging.info(f"Checking write permissions (attempt {attempt_number})...")
 849
 850            if self.has_write_permission(cloud_path):
 851                elapsed_minutes = (time.time() - start_time) / 60
 852                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
 853                return
 854
 855        # If we get here, we've exceeded the maximum wait time
 856        raise PermissionError(
 857            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
 858            f"{cloud_path} after {attempt_number} attempts.")
 859
 860    def _load_blob_full_path_to_bucket_contents_dict(self, full_path: str, file_name_only: bool) -> dict:
 861        """Load a blob from a full GCS path and convert it to a bucket-contents dict.
 862
 863        This is intended for use with multithreaded batch operations.
 864
 865        Raises:
 866            ValueError: If the blob does not exist.
 867        """
 868        file_path_components = self._process_cloud_path(full_path)
 869        blob = self.load_blob_from_full_path(full_path)
 870
 871        # load_blob_from_full_path reloads metadata when exists() is True.
 872        # Ensure the object exists so downstream consumers always get a valid dict.
 873        if not blob.exists():
 874            raise ValueError(f"Blob does not exist: {full_path}")
 875
 876        return self._create_bucket_contents_dict(
 877            bucket_name=file_path_components["bucket"],
 878            blob=blob,
 879            file_name_only=file_name_only
 880        )
 881
 882    def check_files_exist_multithreaded(
 883            self,
 884            full_paths: list[str],
 885            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 886            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 887            job_complete_for_logging: int = 500
 888    ) -> dict[str, bool]:
 889        """Check existence of multiple GCS files in parallel.
 890
 891        **Args:**
 892        - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to check.
 893        - workers (int, optional): Number of worker threads. Defaults to `10`.
 894        - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
 895        - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.
 896
 897        **Returns:**
 898        - dict[str, bool]: A dictionary where each key is a GCS path and the value is ``True`` if the file exists,
 899          ``False`` otherwise.
 900        """
 901        # _check_file_exists_in_gcs is needed because check_file_exists returns a plain bool, which would make it
 902        # impossible to associate each result back to its path after the threads complete. By wrapping
 903        # it here we return both the path and the result together, so the final dict comprehension
 904        # can correctly map path -> bool regardless of the order results come back from the thread pool.
 905        def _check_file_exists_in_gcs(path: str) -> dict:
 906            return {"path": path, "exists": self.check_file_exists(path)}
 907
 908        jobs = [[path] for path in full_paths]
 909
 910        results = MultiThreadedJobs().run_multi_threaded_job(
 911            workers=workers,
 912            function=_check_file_exists_in_gcs,
 913            list_of_jobs_args_list=jobs,
 914            collect_output=True,
 915            max_retries=max_retries,
 916            fail_on_error=True,
 917            jobs_complete_for_logging=job_complete_for_logging
 918        )
 919        return {item["path"]: item["exists"] for item in results}  # type: ignore[union-attr]
 920
 921    def read_files_multithreaded(
 922            self,
 923            full_paths: list[str],
 924            encoding: str = "utf-8",
 925            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 926            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 927            job_complete_for_logging: int = 500
 928    ) -> dict[str, str]:
 929        """Read the contents of multiple GCS files in parallel.
 930
 931        **Args:**
 932        - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to read.
 933        - encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`.
 934        - workers (int, optional): Number of worker threads. Defaults to `10`.
 935        - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
 936        - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.
 937
 938        **Returns:**
 939        - dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string.
 940        """
 941        # _read_file_contents_from_gcs is needed because read_file returns a plain str, which would make it
 942        # impossible to associate each result back to its path after the threads complete. By wrapping
 943        # it here we return both the path and the contents together, so the final dict comprehension
 944        # can correctly map path -> contents regardless of the order results come back from the thread pool.
 945        def _read_file_contents_from_gcs(path: str) -> dict:
 946            return {"path": path, "contents": self.read_file(path, encoding=encoding)}
 947
 948        jobs = [[path] for path in full_paths]
 949
 950        results = MultiThreadedJobs().run_multi_threaded_job(
 951            workers=workers,
 952            function=_read_file_contents_from_gcs,
 953            list_of_jobs_args_list=jobs,
 954            collect_output=True,
 955            max_retries=max_retries,
 956            fail_on_error=True,
 957            jobs_complete_for_logging=job_complete_for_logging
 958        )
 959        return {item["path"]: item["contents"] for item in results}  # type: ignore[union-attr]
 960
 961    def load_blobs_from_full_paths_multithreaded(
 962            self,
 963            full_paths: list[str],
 964            file_name_only: bool = False,
 965            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 966            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 967            job_complete_for_logging: int = 500
 968    ) -> list[dict]:
 969        """Load multiple blobs in parallel from a list of full GCS paths.
 970
 971        Args:
 972            full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object).
 973            file_name_only: Whether to return only the file path in each dict.
 974            workers: Number of worker threads.
 975            max_retries: Maximum number of retries per job.
 976            job_complete_for_logging: Emit progress logs every N completed jobs.
 977
 978        Returns:
 979            List of dictionaries shaped like :meth:`_create_bucket_contents_dict`.
 980
 981        Raises:
 982            Exception: If any blob cannot be loaded / does not exist / returns no output.
 983        """
 984        if not full_paths:
 985            return []
 986
 987        # De-duplicate input paths and keep order
 988        unique_paths = list(dict.fromkeys(full_paths))
 989        jobs = [(path, file_name_only) for path in unique_paths]
 990
 991        results = MultiThreadedJobs().run_multi_threaded_job(
 992            workers=workers,
 993            function=self._load_blob_full_path_to_bucket_contents_dict,
 994            list_of_jobs_args_list=jobs,
 995            collect_output=True,
 996            max_retries=max_retries,
 997            fail_on_error=True,
 998            jobs_complete_for_logging=job_complete_for_logging
 999        )
1000
1001        # If any jobs failed, MultiThreadedJobs would have raised. Still defensively validate output.
1002        if results is None or (unique_paths and (None in results)):  # type: ignore[operator]
1003            raise Exception("Failed to load all blobs")
1004
1005        if len(results) != len(unique_paths) or any(not item for item in results):  # type: ignore[arg-type]
1006            raise Exception("Failed to load all blobs")
1007
1008        return results  # type: ignore[return-value]

Class to handle GCP Cloud Functions.

GCPCloudFunctions( project: Optional[str] = None, service_account_json: Optional[str] = None)
37    def __init__(
38            self,
39            project: Optional[str] = None,
40            service_account_json: Optional[str] = None
41    ) -> None:
42        """
43        Initialize the GCPCloudFunctions class.
44
45        Authenticates using service account JSON if provided or default credentials,
46        and sets up the Storage Client.
47
48        Args:
49            project: Optional[str] = None
50                The GCP project ID. If not provided, will use project from service account or default.
51            service_account_json: Optional[str] = None
52                Path to service account JSON key file. If provided, will use these credentials.
53        """
54        # Initialize credentials and project
55        credentials = None
56        default_project = None
57
58        if service_account_json:
59            credentials = service_account.Credentials.from_service_account_file(service_account_json)
60            # Extract project from service account if not specified
61            if not project:
62                with open(service_account_json, 'r') as f:
63                    sa_info = json.load(f)
64                    project = sa_info.get('project_id')
65        else:
66            # Use default credentials
67            credentials, default_project = default()
68
69        # Set project if not already set
70        if not project:
71            project = default_project
72
73        self.client = storage.Client(credentials=credentials, project=project)
74        """@private"""

Initialize the GCPCloudFunctions class.

Authenticates using service account JSON if provided or default credentials, and sets up the Storage Client.

Args: project: Optional[str] = None The GCP project ID. If not provided, will use project from service account or default. service_account_json: Optional[str] = None Path to service account JSON key file. If provided, will use these credentials.

def load_blob_from_full_path(self, full_path: str) -> google.cloud.storage.blob.Blob:
 97    def load_blob_from_full_path(self, full_path: str) -> Blob:
 98        """
 99        Load a GCS blob object from a full GCS path.
100
101        **Args:**
102        - full_path (str): The full GCS path.
103
104        **Returns:**
105        - google.cloud.storage.blob.Blob: The GCS blob object.
106        """
107        file_path_components = self._process_cloud_path(full_path)
108
109        # Specify the billing project
110        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
111        blob = bucket.blob(file_path_components["blob_url"])
112
113        # If blob exists in GCS reload it so metadata is there
114        if blob.exists():
115            blob.reload()
116        return blob

Load a GCS blob object from a full GCS path.

Args:

  • full_path (str): The full GCS path.

Returns:

  • google.cloud.storage.blob.Blob: The GCS blob object.
def check_file_exists(self, full_path: str) -> bool:
118    def check_file_exists(self, full_path: str) -> bool:
119        """
120        Check if a file exists in GCS.
121
122        **Args:**
123        - full_path (str): The full GCS path.
124
125        **Returns:**
126        - bool: `True` if the file exists, `False` otherwise.
127        """
128        blob = self.load_blob_from_full_path(full_path)
129        return blob.exists()

Check if a file exists in GCS.

Args:

  • full_path (str): The full GCS path.

Returns:

  • bool: True if the file exists, False otherwise.
def list_bucket_contents( self, bucket_name: str, prefix: Optional[str] = None, file_extensions_to_ignore: list[str] = [], file_strings_to_ignore: list[str] = [], file_extensions_to_include: list[str] = [], file_name_only: bool = False, verbose: bool = False, log_progress_interval: int = 10000) -> list[dict]:
196    def list_bucket_contents(
197            self,
198            bucket_name: str,
199            prefix: Optional[str] = None,
200            file_extensions_to_ignore: list[str] = [],
201            file_strings_to_ignore: list[str] = [],
202            file_extensions_to_include: list[str] = [],
203            file_name_only: bool = False,
204            verbose: bool = False,
205            log_progress_interval: int = 10000
206    ) -> list[dict]:
207        """
208        List contents of a GCS bucket and return a list of dictionaries with file information.
209
210        **Args:**
211        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
212        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
213        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
214        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
215        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
216        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
217        - verbose (bool, optional): Whether to log files not being included. Defaults to `False`.
218        - log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to `10000`.
219
220        **Returns:**
221        - list[dict]: A list of dictionaries containing file information.
222        """
223        # If the bucket name starts with gs://, remove it
224        if bucket_name.startswith("gs://"):
225            bucket_name = bucket_name.split("/")[2].strip()
226
227        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
228
229        # Get the bucket object and set user_project for Requester Pays
230        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
231
232        # list_blobs returns a lazy iterator — no network calls happen until we iterate.
233        # page_size=1000 is the GCS maximum, minimising the number of paginated round-trips.
234        prefix_msg = f" with prefix '{prefix}'" if prefix else ""
235        logging.info(f"Starting to page through blobs in bucket '{bucket_name}'{prefix_msg}...")
236        blobs = bucket.list_blobs(prefix=prefix, page_size=1000)
237
238        # Iterate with progress logging so large buckets don't appear stuck
239        file_list = []
240        for blob in blobs:
241            if blob.name.endswith("/"):
242                continue
243            if not self._validate_include_blob(
244                blob=blob,
245                file_extensions_to_ignore=file_extensions_to_ignore,
246                file_strings_to_ignore=file_strings_to_ignore,
247                file_extensions_to_include=file_extensions_to_include,
248                bucket_name=bucket_name
249            ):
250                continue
251            file_list.append(
252                self._create_bucket_contents_dict(
253                    blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
254                )
255            )
256            if verbose and len(file_list) % log_progress_interval == 0:
257                logging.info(f"Processed {len(file_list):,} files so far...")
258
259        logging.info(f"Found {len(file_list):,} files in bucket")
260        return file_list

List contents of a GCS bucket and return a list of dictionaries with file information.

Args:

  • bucket_name (str): The name of the GCS bucket. If includes gs://, it will be removed.
  • prefix (str, optional): The prefix to filter the blobs. Defaults to None.
  • file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
  • file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
  • file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
  • file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to False.
  • verbose (bool, optional): Whether to log files not being included. Defaults to False.
  • log_progress_interval (int, optional): Log a progress message every N files processed. Defaults to 10000.

Returns:

  • list[dict]: A list of dictionaries containing file information.
def copy_cloud_file( self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
262    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
263        """
264        Copy a file from one GCS location to another.
265
266        **Args:**
267        - src_cloud_path (str): The source GCS path.
268        - full_destination_path (str): The destination GCS path.
269        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
270        """
271        try:
272            src_blob = self.load_blob_from_full_path(src_cloud_path)
273            dest_blob = self.load_blob_from_full_path(full_destination_path)
274
275            # Use rewrite so no timeouts
276            rewrite_token = False
277
278            while True:
279                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
280                    src_blob, token=rewrite_token
281                )
282                if verbose:
283                    logging.info(
284                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
285                    )
286                if not rewrite_token:
287                    break
288
289        except Exception as e:
290            logging.error(
291                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
292                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
293            )
294            raise

Copy a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
  • verbose (bool, optional): Whether to log progress. Defaults to False.
def delete_cloud_file(self, full_cloud_path: str) -> None:
296    def delete_cloud_file(self, full_cloud_path: str) -> None:
297        """
298        Delete a file from GCS.
299
300        **Args:**
301        - full_cloud_path (str): The GCS path of the file to delete.
302        """
303        blob = self.load_blob_from_full_path(full_cloud_path)
304        blob.delete()

Delete a file from GCS.

Args:

  • full_cloud_path (str): The GCS path of the file to delete.
def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
306    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
307        """
308        Move a file from one GCS location to another.
309
310        **Args:**
311        - src_cloud_path (str): The source GCS path.
312        - full_destination_path (str): The destination GCS path.
313        """
314        self.copy_cloud_file(src_cloud_path, full_destination_path)
315        self.delete_cloud_file(src_cloud_path)

Move a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
def get_filesize(self, target_path: str) -> int:
317    def get_filesize(self, target_path: str) -> int:
318        """
319        Get the size of a file in GCS.
320
321        **Args:**
322        - target_path (str): The GCS path of the file.
323
324        **Returns:**
325        - int: The size of the file in bytes.
326        """
327        blob = self.load_blob_from_full_path(target_path)
328        return blob.size

Get the size of a file in GCS.

Args:

  • target_path (str): The GCS path of the file.

Returns:

  • int: The size of the file in bytes.
def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
330    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
331        """
332        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
333
334        **Args:**
335        - src_cloud_path (str): The source GCS path.
336        - dest_cloud_path (str): The destination GCS path.
337
338        **Returns:**
339        - bool: `True` if the files are identical, `False` otherwise.
340        """
341        src_blob = self.load_blob_from_full_path(src_cloud_path)
342        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
343
344        # If either blob is None or does not exist
345        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
346            return False
347        # If the MD5 hashes exist
348        if src_blob.md5_hash and dest_blob.md5_hash:
349            # And are the same return True
350            if src_blob.md5_hash == dest_blob.md5_hash:
351                return True
352        else:
353            # If md5 do not exist (for larger files they may not) check size matches
354            if src_blob.size == dest_blob.size:
355                return True
356        # Otherwise, return False
357        return False

Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

Args:

  • src_cloud_path (str): The source GCS path.
  • dest_cloud_path (str): The destination GCS path.

Returns:

  • bool: True if the files are identical, False otherwise.
def delete_multiple_files( self, files_to_delete: list[str], workers: int = 10, max_retries: int = 5, verbose: bool = False, job_complete_for_logging: int = 500) -> None:
359    def delete_multiple_files(
360            self,
361            files_to_delete: list[str],
362            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
363            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
364            verbose: bool = False,
365            job_complete_for_logging: int = 500
366    ) -> None:
367        """
368        Delete multiple cloud files in parallel using multi-threading.
369
370        **Args:**
371        - files_to_delete (list[str]): List of GCS paths of the files to delete.
372        - workers (int, optional): Number of worker threads. Defaults to `10`.
373        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
374        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
375        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
376        """
377        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
378
379        MultiThreadedJobs().run_multi_threaded_job(
380            workers=workers,
381            function=self.delete_cloud_file,
382            list_of_jobs_args_list=list_of_jobs_args_list,
383            max_retries=max_retries,
384            fail_on_error=True,
385            verbose=verbose,
386            collect_output=False,
387            jobs_complete_for_logging=job_complete_for_logging
388        )

Delete multiple cloud files in parallel using multi-threading.

Args:

  • files_to_delete (list[str]): List of GCS paths of the files to delete.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.
def loop_and_log_validation_files_multithreaded( self, files_to_validate: list[dict], log_difference: bool, workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> list[dict]:
407    def loop_and_log_validation_files_multithreaded(
408            self,
409            files_to_validate: list[dict],
410            log_difference: bool,
411            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
412            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
413            job_complete_for_logging: int = 500
414    ) -> list[dict]:
415        """
416        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
417
418        **Args:**
419        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
420        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
421                                   this at the start of a copy/move operation to check if files are already copied.
422        - workers (int, optional): Number of worker threads. Defaults to `10`.
423        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
424        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
425
426        **Returns:**
427        - list[dict]: List of dictionaries containing files that are **not** identical.
428        """
429        logging.info(f"Validating if {len(files_to_validate)} files are identical")
430
431        # Prepare jobs: pass the necessary arguments to each validation
432        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
433
434        # Use multithreaded job runner to validate the files
435        checked_files = MultiThreadedJobs().run_multi_threaded_job(
436            workers=workers,
437            function=self._validate_file_pair,
438            list_of_jobs_args_list=jobs,
439            collect_output=True,
440            max_retries=max_retries,
441            jobs_complete_for_logging=job_complete_for_logging
442        )
443        # If any files failed to load, raise an exception
444        if files_to_validate and None in checked_files:  # type: ignore[operator]
445            logging.error("Failed to validate all files, could not load some blobs")
446            raise Exception("Failed to validate all files")
447
448        # Get all files that are not identical
449        not_identical_files = [
450            file_dict
451            for file_dict in checked_files  # type: ignore[operator, union-attr]
452            if not file_dict['identical']
453        ]
454        if not_identical_files:
455            if log_difference:
456                for file_dict in not_identical_files:
457                    logging.warning(
458                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
459                    )
460            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
461        return not_identical_files

Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

Args:

  • files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
  • log_difference (bool): Whether to log differences if files are not identical. Set False if you are running this at the start of a copy/move operation to check if files are already copied.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries for all jobs. Defaults to 5.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Returns:

  • list[dict]: List of dictionaries containing files that are not identical.
def multithread_copy_of_files_with_validation( self, files_to_copy: list[dict], workers: int = 10, max_retries: int = 5, skip_check_if_already_copied: bool = False) -> None:
463    def multithread_copy_of_files_with_validation(
464            self,
465            files_to_copy: list[dict],
466            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
467            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
468            skip_check_if_already_copied: bool = False
469    ) -> None:
470        """
471        Copy multiple files in parallel with validation.
472
473        **Args:**
474        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
475                Dictionary should have keys `source_file` and `full_destination_path`
476        - workers (int): Number of worker threads. Defaults to `10`.
477        - max_retries (int): Maximum number of retries. Defaults to `5`
478        - skip_check_if_already_copied (bool, optional): Whether to skip checking
479                if files are already copied and start copying right away. Defaults to `False`.
480        """
481        if skip_check_if_already_copied:
482            logging.info("Skipping check if files are already copied")
483            updated_files_to_move = files_to_copy
484        else:
485            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
486                files_to_copy,
487                log_difference=False,
488                workers=workers,
489                max_retries=max_retries
490            )
491        # If all files are already copied, return
492        if not updated_files_to_move:
493            logging.info("All files are already copied")
494            return None
495        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
496        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
497        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
498        # Validate that all files were copied successfully
499        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
500            files_to_copy,
501            workers=workers,
502            log_difference=True,
503            max_retries=max_retries
504        )
505        if files_not_moved_successfully:
506            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
507            raise Exception("Failed to copy all files")
508        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
509        return None

Copy multiple files in parallel with validation.

Args:

  • files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. Dictionary should have keys source_file and full_destination_path
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5
  • skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to False.
def move_or_copy_multiple_files( self, files_to_move: list[dict], action: str, workers: int = 10, max_retries: int = 5, verbose: bool = False, jobs_complete_for_logging: int = 500) -> None:
511    def move_or_copy_multiple_files(
512            self, files_to_move: list[dict],
513            action: str,
514            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
515            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
516            verbose: bool = False,
517            jobs_complete_for_logging: int = 500
518    ) -> None:
519        """
520        Move or copy multiple files in parallel.
521
522        **Args:**
523        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
524        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
525        or `ops_utils.gcp_utils.COPY`).
526        - workers (int): Number of worker threads. Defaults to `10`.
527        - max_retries (int): Maximum number of retries. Defaults to `5`.
528        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
529        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
530
531        **Raises:**
532        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
533        """
534        if action == MOVE:
535            cloud_function = self.move_cloud_file
536        elif action == COPY:
537            cloud_function = self.copy_cloud_file
538        else:
539            raise ValueError("Must either select move or copy")
540
541        list_of_jobs_args_list = [
542            [
543                file_dict['source_file'], file_dict['full_destination_path']
544            ]
545            for file_dict in files_to_move
546        ]
547        MultiThreadedJobs().run_multi_threaded_job(
548            workers=workers,
549            function=cloud_function,
550            list_of_jobs_args_list=list_of_jobs_args_list,
551            max_retries=max_retries,
552            fail_on_error=True,
553            verbose=verbose,
554            collect_output=False,
555            jobs_complete_for_logging=jobs_complete_for_logging
556        )

Move or copy multiple files in parallel.

Args:

  • files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
  • action (str): The action to perform (should be one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY).
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Raises:

  • ValueError: If the action is not one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY.

def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
558    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
559        """
560        Read the content of a file from GCS.
561
562        **Args:**
563        - cloud_path (str): The GCS path of the file to read.
564        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
565
566        **Returns:**
567        - bytes: The content of the file as bytes.
568        """
569        blob = self.load_blob_from_full_path(cloud_path)
570        # Download the file content as bytes
571        content_bytes = blob.download_as_bytes()
572        # Convert bytes to string
573        content_str = content_bytes.decode(encoding)
574        return content_str

Read the content of a file from GCS.

Args:

  • cloud_path (str): The GCS path of the file to read.
  • encoding (str, optional): The encoding to use. Defaults to utf-8.

Returns:

  • str: The content of the file decoded as a string.
def upload_blob( self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
576    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
577        """
578        Upload a file to GCS.
579
580        **Args:**
581        - destination_path (str): The destination GCS path.
582        - source_file (str): The source file path.
583        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
584        """
585        blob = self.load_blob_from_full_path(destination_path)
586        if custom_metadata:
587            blob.metadata = custom_metadata
588        blob.upload_from_filename(source_file)

Upload a file to GCS.

Args:

  • destination_path (str): The destination GCS path.
  • source_file (str): The source file path.
  • custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
def get_object_md5( self, file_path: str, chunk_size: int = 256000, logging_bytes: int = 1000000000, returned_md5_format: str = 'hex') -> str:
590    def get_object_md5(
591        self,
592        file_path: str,
593        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
594        chunk_size: int = parse_size("256 KB"),
595        logging_bytes: int = parse_size("1 GB"),
596        returned_md5_format: str = "hex"
597    ) -> str:
598        """
599        Calculate the MD5 checksum of a file in GCS.
600
601        **Args:**
602        - file_path (str): The GCS path of the file.
603        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
604        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
605        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
606                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
607
608        **Returns:**
609        - str: The MD5 checksum of the file.
610
611        **Raises:**
612        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
613        or `ops_utils.gcp_utils.MD5_BASE64`
614        """
615        if returned_md5_format not in ["hex", "base64"]:
616            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
617
618        blob = self.load_blob_from_full_path(file_path)
619
620        # Create an MD5 hash object
621        md5_hash = hashlib.md5()
622
623        blob_size_str = format_size(blob.size)
624        logging.info(f"Streaming {file_path} which is {blob_size_str}")
625        # Use a BytesIO stream to collect data in chunks and upload it
626        buffer = io.BytesIO()
627        total_bytes_streamed = 0
628        # Keep track of the last logged size for data logging
629        last_logged = 0
630
631        with blob.open("rb") as source_stream:
632            while True:
633                chunk = source_stream.read(chunk_size)
634                if not chunk:
635                    break
636                md5_hash.update(chunk)
637                buffer.write(chunk)
638                total_bytes_streamed += len(chunk)
639                # Log progress every 1 gb if verbose used
640                if total_bytes_streamed - last_logged >= logging_bytes:
641                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
642                    last_logged = total_bytes_streamed
643
644        if returned_md5_format == "hex":
645            md5 = md5_hash.hexdigest()
646            logging.info(f"MD5 (hex) for {file_path}: {md5}")
647        elif returned_md5_format == "base64":
648            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
649            logging.info(f"MD5 (base64) for {file_path}: {md5}")
650        return md5

Calculate the MD5 checksum of a file in GCS.

Args:

  • file_path (str): The GCS path of the file.
  • chunk_size (int, optional): The size of each chunk to read. Defaults to 256 KB.
  • logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to 1 GB.
  • returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to hex. Options are ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.

Returns:

  • str: The MD5 checksum of the file.

Raises:

  • ValueError: If the returned_md5_format is not one of ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.

def set_acl_public_read(self, cloud_path: str) -> None:
652    def set_acl_public_read(self, cloud_path: str) -> None:
653        """
654        Set the file in the bucket to be publicly readable.
655
656        **Args:**
657        - cloud_path (str): The GCS path of the file to be set as public readable.
658        """
659        blob = self.load_blob_from_full_path(cloud_path)
660        blob.acl.all().grant_read()
661        blob.acl.save()

Set the file in the bucket to be publicly readable.

Args:

  • cloud_path (str): The GCS path of the file to be set as public readable.
def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
663    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
664        """
665        Set the file in the bucket to grant OWNER permission to a specific group.
666
667        **Args:**
668        - cloud_path (str): The GCS path of the file.
669        - group_email (str): The email of the group to grant OWNER permission
670        """
671        blob = self.load_blob_from_full_path(cloud_path)
672        blob.acl.group(group_email).grant_owner()
673        blob.acl.save()

Set the file in the bucket to grant OWNER permission to a specific group.

Args:

  • cloud_path (str): The GCS path of the file.
  • group_email (str): The email of the group to grant OWNER permission.
def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
675    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
676        """
677        Set Cache-Control metadata for a file.
678
679        **Args:**
680        - cloud_path (str): The GCS path of the file.
681        - cache_control (str): The Cache-Control metadata to set.
682        """
683        blob = self.load_blob_from_full_path(cloud_path)
684        blob.cache_control = cache_control
685        blob.patch()

Set Cache-Control metadata for a file.

Args:

  • cloud_path (str): The GCS path of the file.
  • cache_control (str): The Cache-Control metadata to set.
def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]:
687    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[str, str]]:
688        """
689        Get the most recent blob in the bucket.
690        
691        If the blob with the most recent timestamp doesn't have
692        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
693        recent file until a log with useful information is encountered. This is useful when combing through
694        GCP activity logs for Terra workspace buckets.
695
696        **Args:**
697        - bucket_name (str): The GCS bucket name.
698
699        **Returns:**
700        - Optional tuple of the blob found and the file contents from the blob
701        """
702        blobs = sorted(
703            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
704        )
705        for blob in blobs:
706            # Download the file contents as a string
707            file_contents = blob.download_as_text()
708
709            # Check if the content matches the undesired format
710            lines = file_contents.splitlines()
711            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
712                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
713                continue
714
715            # If it doesn't match the undesired format, return its content
716            logging.info(f"Found valid file: {blob.name}")
717            return blob, file_contents
718
719        logging.info("No valid files found.")
720        return None

Get the most recent blob in the bucket.

If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.

Args:

  • bucket_name (str): The GCS bucket name.

Returns:

  • Optional tuple of the blob found and the file contents from the blob
def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
722    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
723        """
724        Write content to a file in GCS.
725
726        **Args:**
727        - cloud_path (str): The GCS path of the file to write.
728        - file_contents (str): The content to write.
729        """
730        blob = self.load_blob_from_full_path(cloud_path)
731        blob.upload_from_string(file_contents)
732        logging.info(f"Successfully wrote content to {cloud_path}")

Write content to a file in GCS.

Args:

  • cloud_path (str): The GCS path of the file to write.
  • file_contents (str): The content to write.
@staticmethod
def get_active_gcloud_account() -> str:
734    @staticmethod
735    def get_active_gcloud_account() -> str:
736        """
737        Get the active GCP email for the current account.
738
739        **Returns:**
740        - str: The active GCP account email.
741        """
742        result = subprocess.run(
743            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
744            capture_output=True,
745            text=True,
746            check=True
747        )
748        return result.stdout.strip()

Get the active GCP email for the current account.

Returns:

  • str: The active GCP account email.
def has_write_permission(self, cloud_path: str) -> bool:
750    def has_write_permission(self, cloud_path: str) -> bool:
751        """
752        Check if the current user has permission to write to a GCP path.
753
754        This method tests write access by attempting to update the metadata
755        of an existing blob or create a zero-byte temporary file if the blob
756        doesn't exist. The temporary file is deleted immediately if created.
757
758        **Args:**
759        - cloud_path (str): The GCS path to check for write permissions.
760
761        **Returns:**
762        - bool: True if the user has write permission, False otherwise.
763        """
764        if not cloud_path.startswith("gs://"):
765            raise ValueError("cloud_path must start with 'gs://'")
766        if cloud_path.endswith("/"):
767            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
768            cloud_path = f"{cloud_path}permission_test_temp"
769        try:
770            blob = self.load_blob_from_full_path(cloud_path)
771            if blob.exists():
772                # Try updating metadata (doesn't change the content)
773                original_metadata = blob.metadata or {}
774                test_metadata = original_metadata.copy()
775                test_metadata["_write_permission_test"] = "true"
776
777                blob.metadata = test_metadata
778                blob.patch()
779
780                # Restore the original metadata
781                blob.metadata = original_metadata
782                blob.patch()
783
784                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
785                return True
786            else:
787                # Try writing a temporary file to the bucket
788                blob.upload_from_string("")
789
790                # Clean up the test file
791                blob.delete()
792                logging.info(f"Write permission confirmed for {cloud_path}")
793                return True
794        except Forbidden:
795            logging.warning(f"No write permission on path {cloud_path}")
796            return False
797        except GoogleAPICallError as e:
798            logging.warning(f"Error testing write access to {cloud_path}: {e}")
799            return False

Check if the current user has permission to write to a GCP path.

This method tests write access by attempting to update the metadata of an existing blob or create a zero-byte temporary file if the blob doesn't exist. The temporary file is deleted immediately if created.

Args:

  • cloud_path (str): The GCS path to check for write permissions.

Returns:

  • bool: True if the user has write permission, False otherwise.
def wait_for_write_permission( self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
801    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
802        """
803        Wait for write permissions on a GCP path, checking at regular intervals.
804
805        This method will periodically check if the user has write permission on the specified cloud path.
806        It will continue checking until either write permission is granted or the maximum wait time is reached.
807
808        **Args:**
809        - cloud_path (str): The GCS path to check for write permissions.
810        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
811        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
812
813        **Returns:**
814        - bool: True if write permission is granted within the wait time, False otherwise.
815        """
816        if not cloud_path.startswith("gs://"):
817            raise ValueError("cloud_path must start with 'gs://'")
818
819        # Convert minutes to seconds for the sleep function
820        interval_seconds = interval_wait_time_minutes * 60
821        max_wait_seconds = max_wait_time_minutes * 60
822
823        start_time = time.time()
824        attempt_number = 1
825
826        logging.info(
827            f"Starting to check for write permissions on {cloud_path}. Will check "
828            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")
829
830        # First check immediately
831        if self.has_write_permission(cloud_path):
832            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
833            return
834
835        # If first check fails, start periodic checks
836        while time.time() - start_time < max_wait_seconds:
837            elapsed_minutes = (time.time() - start_time) / 60
838            remaining_minutes = max_wait_time_minutes - elapsed_minutes
839
840            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
841                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
842                         f"Time remaining: {remaining_minutes:.1f} minute(s).")
843
844            # Sleep for the interval duration
845            time.sleep(interval_seconds)
846
847            attempt_number += 1
848            logging.info(f"Checking write permissions (attempt {attempt_number})...")
849
850            if self.has_write_permission(cloud_path):
851                elapsed_minutes = (time.time() - start_time) / 60
852                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
853                return
854
855        # If we get here, we've exceeded the maximum wait time
856        raise PermissionError(
857            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
858            f"{cloud_path} after {attempt_number} attempts.")

Wait for write permissions on a GCP path, checking at regular intervals.

This method will periodically check if the user has write permission on the specified cloud path. It will continue checking until either write permission is granted or the maximum wait time is reached.

Args:

  • cloud_path (str): The GCS path to check for write permissions.
  • interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
  • max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.

Returns:

  • bool: True if write permission is granted within the wait time, False otherwise.
def check_files_exist_multithreaded( self, full_paths: list[str], workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> dict[str, bool]:
882    def check_files_exist_multithreaded(
883            self,
884            full_paths: list[str],
885            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
886            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
887            job_complete_for_logging: int = 500
888    ) -> dict[str, bool]:
889        """Check existence of multiple GCS files in parallel.
890
891        **Args:**
892        - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to check.
893        - workers (int, optional): Number of worker threads. Defaults to `10`.
894        - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
895        - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.
896
897        **Returns:**
898        - dict[str, bool]: A dictionary where each key is a GCS path and the value is ``True`` if the file exists,
899          ``False`` otherwise.
900        """
901        # _check_file_exists_in_gcs is needed because check_file_exists returns a plain bool, which would make it
902        # impossible to associate each result back to its path after the threads complete. By wrapping
903        # it here we return both the path and the result together, so the final dict comprehension
904        # can correctly map path -> bool regardless of the order results come back from the thread pool.
905        def _check_file_exists_in_gcs(path: str) -> dict:
906            return {"path": path, "exists": self.check_file_exists(path)}
907
908        jobs = [[path] for path in full_paths]
909
910        results = MultiThreadedJobs().run_multi_threaded_job(
911            workers=workers,
912            function=_check_file_exists_in_gcs,
913            list_of_jobs_args_list=jobs,
914            collect_output=True,
915            max_retries=max_retries,
916            fail_on_error=True,
917            jobs_complete_for_logging=job_complete_for_logging
918        )
919        return {item["path"]: item["exists"] for item in results}  # type: ignore[union-attr]

Check existence of multiple GCS files in parallel.

Args:

  • full_paths (list[str]): List of full GCS paths (e.g. gs://bucket/path/to/object) to check.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries per job. Defaults to 5.
  • job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to 500.

Returns:

  • dict[str, bool]: A dictionary where each key is a GCS path and the value is True if the file exists, False otherwise.
def read_files_multithreaded( self, full_paths: list[str], encoding: str = 'utf-8', workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> dict[str, str]:
921    def read_files_multithreaded(
922            self,
923            full_paths: list[str],
924            encoding: str = "utf-8",
925            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
926            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
927            job_complete_for_logging: int = 500
928    ) -> dict[str, str]:
929        """Read the contents of multiple GCS files in parallel.
930
931        **Args:**
932        - full_paths (list[str]): List of full GCS paths (e.g. ``gs://bucket/path/to/object``) to read.
933        - encoding (str, optional): The encoding to use when decoding file contents. Defaults to `utf-8`.
934        - workers (int, optional): Number of worker threads. Defaults to `10`.
935        - max_retries (int, optional): Maximum number of retries per job. Defaults to `5`.
936        - job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to `500`.
937
938        **Returns:**
939        - dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string.
940        """
941        # _read_file_contents_from_gcs is needed because read_file returns a plain str, which would make it
942        # impossible to associate each result back to its path after the threads complete. By wrapping
943        # it here we return both the path and the contents together, so the final dict comprehension
944        # can correctly map path -> contents regardless of the order results come back from the thread pool.
945        def _read_file_contents_from_gcs(path: str) -> dict:
946            return {"path": path, "contents": self.read_file(path, encoding=encoding)}
947
948        jobs = [[path] for path in full_paths]
949
950        results = MultiThreadedJobs().run_multi_threaded_job(
951            workers=workers,
952            function=_read_file_contents_from_gcs,
953            list_of_jobs_args_list=jobs,
954            collect_output=True,
955            max_retries=max_retries,
956            fail_on_error=True,
957            jobs_complete_for_logging=job_complete_for_logging
958        )
959        return {item["path"]: item["contents"] for item in results}  # type: ignore[union-attr]

Read the contents of multiple GCS files in parallel.

Args:

  • full_paths (list[str]): List of full GCS paths (e.g. gs://bucket/path/to/object) to read.
  • encoding (str, optional): The encoding to use when decoding file contents. Defaults to utf-8.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries per job. Defaults to 5.
  • job_complete_for_logging (int, optional): Emit progress logs every N completed jobs. Defaults to 500.

Returns:

  • dict[str, str]: A dictionary where each key is a GCS path and the value is the file's contents as a string.
def load_blobs_from_full_paths_multithreaded( self, full_paths: list[str], file_name_only: bool = False, workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> list[dict]:
 961    def load_blobs_from_full_paths_multithreaded(
 962            self,
 963            full_paths: list[str],
 964            file_name_only: bool = False,
 965            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
 966            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
 967            job_complete_for_logging: int = 500
 968    ) -> list[dict]:
 969        """Load multiple blobs in parallel from a list of full GCS paths.
 970
 971        Args:
 972            full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object).
 973            file_name_only: Whether to return only the file path in each dict.
 974            workers: Number of worker threads.
 975            max_retries: Maximum number of retries per job.
 976            job_complete_for_logging: Emit progress logs every N completed jobs.
 977
 978        Returns:
 979            List of dictionaries shaped like :meth:`_create_bucket_contents_dict`.
 980
 981        Raises:
 982            Exception: If any blob cannot be loaded / does not exist / returns no output.
 983        """
 984        if not full_paths:
 985            return []
 986
 987        # De-duplicate input paths and keep order
 988        unique_paths = list(dict.fromkeys(full_paths))
 989        jobs = [(path, file_name_only) for path in unique_paths]
 990
 991        results = MultiThreadedJobs().run_multi_threaded_job(
 992            workers=workers,
 993            function=self._load_blob_full_path_to_bucket_contents_dict,
 994            list_of_jobs_args_list=jobs,
 995            collect_output=True,
 996            max_retries=max_retries,
 997            fail_on_error=True,
 998            jobs_complete_for_logging=job_complete_for_logging
 999        )
1000
1001        # If any jobs failed, MultiThreadedJobs would have raised. Still defensively validate output.
1002        if results is None or (unique_paths and (None in results)):  # type: ignore[operator]
1003            raise Exception("Failed to load all blobs")
1004
1005        if len(results) != len(unique_paths) or any(not item for item in results):  # type: ignore[arg-type]
1006            raise Exception("Failed to load all blobs")
1007
1008        return results  # type: ignore[return-value]

Load multiple blobs in parallel from a list of full GCS paths.

Args: full_paths: List of full GCS paths (e.g. gs://bucket/path/to/object). file_name_only: Whether to return only the file path in each dict. workers: Number of worker threads. max_retries: Maximum number of retries per job. job_complete_for_logging: Emit progress logs every N completed jobs.

Returns: List of dictionaries shaped like _create_bucket_contents_dict().

Raises: Exception: If any blob cannot be loaded / does not exist / returns no output.