ops_utils.gcp_utils

Module for GCP utilities.

  1"""Module for GCP utilities."""
  2import os
  3import logging
  4import io
  5import hashlib
  6import base64
  7import subprocess
  8
  9from humanfriendly import format_size, parse_size
 10from mimetypes import guess_type
 11from typing import Optional, Any
 12from google.cloud.storage.blob import Blob
 13
 14from .vars import ARG_DEFAULTS
 15from .thread_pool_executor_util import MultiThreadedJobs
 16
 17MOVE = "move"
 18"""Variable to be used for the "action" when files should be moved."""
 19COPY = "copy"
 20"""Variable to be used for the "action" when files should be copied."""
 21MD5_HEX = "hex"
 22"""Variable to be used when the generated `md5` should be of the `hex` type."""
 23MD5_BASE64 = "base64"
 24"""Variable to be used when the generated `md5` should be of the `base64` type."""
 25
 26
 27class GCPCloudFunctions:
 28    """Class to handle GCP Cloud Functions."""
 29
 30    def __init__(self, project: Optional[str] = None) -> None:
 31        """
 32        Initialize the GCPCloudFunctions class.
 33
 34        Authenticates using the default credentials and sets up the Storage Client.
 35        Uses the `project_id` if provided, otherwise utilizes the default project set.
 36
 37        **Args:**
 38        - project (str, optional): The GCP project ID
 39        """
 40        from google.cloud import storage  # type: ignore[attr-defined]
 41        from google.auth import default
 42        credentials, default_project = default()
 43        if not project:
 44            project = default_project
 45        self.client = storage.Client(credentials=credentials, project=project)
 46        """@private"""
 47
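    # Example (illustrative only; assumes application default credentials are already
    # configured, and "my-billing-project" is a placeholder project ID):
    #
    #   gcp = GCPCloudFunctions(project="my-billing-project")
    #   # Or rely on the default project from the environment:
    #   gcp = GCPCloudFunctions()
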
    @staticmethod
    def _process_cloud_path(cloud_path: str) -> dict:
        """
        Process a GCS cloud path into its components.

        Args:
            cloud_path (str): The GCS cloud path.

        Returns:
            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
        """
        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
        bucket_name = str.split(remaining_url, sep="/")[0]
        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
        path_components = {
            "platform_prefix": platform_prefix,
            "bucket": bucket_name,
            "blob_url": blob_name
        }
        return path_components

    def load_blob_from_full_path(self, full_path: str) -> Blob:
        """
        Load a GCS blob object from a full GCS path.

        **Args:**
        - full_path (str): The full GCS path.

        **Returns:**
        - google.cloud.storage.blob.Blob: The GCS blob object.
        """
        file_path_components = self._process_cloud_path(full_path)

        # Specify the billing project
        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
        blob = bucket.blob(file_path_components["blob_url"])

        # If blob exists in GCS reload it so metadata is there
        if blob.exists():
            blob.reload()
        return blob

    def check_file_exists(self, full_path: str) -> bool:
        """
        Check if a file exists in GCS.

        **Args:**
        - full_path (str): The full GCS path.

        **Returns:**
        - bool: `True` if the file exists, `False` otherwise.
        """
        blob = self.load_blob_from_full_path(full_path)
        return blob.exists()

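    # Example (illustrative only; the bucket and object names are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   if gcp.check_file_exists("gs://example-bucket/data/sample.vcf.gz"):
    #       blob = gcp.load_blob_from_full_path("gs://example-bucket/data/sample.vcf.gz")
    #       print(blob.size, blob.md5_hash)
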
    @staticmethod
    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
        """
        Create a dictionary containing file information.

        Args:
            bucket_name (str): The name of the GCS bucket.
            blob (Any): The GCS blob object.
            file_name_only (bool): Whether to return only the file list.

        Returns:
            dict: A dictionary containing file information.
        """
        if file_name_only:
            return {
                "path": f"gs://{bucket_name}/{blob.name}"
            }
        return {
            "name": os.path.basename(blob.name),
            "path": f"gs://{bucket_name}/{blob.name}",
            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
            "file_extension": os.path.splitext(blob.name)[1],
            "size_in_bytes": blob.size,
            "md5_hash": blob.md5_hash
        }

    @staticmethod
    def _validate_include_blob(
            blob: Any,
            bucket_name: str,
            file_extensions_to_ignore: list[str] = [],
            file_strings_to_ignore: list[str] = [],
            file_extensions_to_include: list[str] = [],
            verbose: bool = False
    ) -> bool:
        """
        Validate if a blob should be included based on its file extension and name.

        Args:
            file_extensions_to_include (list[str]): List of file extensions to include.
            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
            blob (Any): The GCS blob object.
            verbose (bool): Whether to log files not being included.

        Returns:
            bool: True if the blob should be included, False otherwise.
        """
        file_path = f"gs://{bucket_name}/{blob.name}"
        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
            if verbose:
                logging.info(f"Skipping {file_path} as it has an extension to ignore")
            return False
        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
            if verbose:
                logging.info(f"Skipping {file_path} as it does not have an extension to include")
            return False
        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
            if verbose:
                logging.info(f"Skipping {file_path} as it has a string to ignore")
            return False
        return True

    def list_bucket_contents(
            self,
            bucket_name: str,
            prefix: Optional[str] = None,
            file_extensions_to_ignore: list[str] = [],
            file_strings_to_ignore: list[str] = [],
            file_extensions_to_include: list[str] = [],
            file_name_only: bool = False
    ) -> list[dict]:
        """
        List contents of a GCS bucket and return a list of dictionaries with file information.

        **Args:**
        - bucket_name (str): The name of the GCS bucket. If it includes the `gs://` prefix, the prefix will be removed.
        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.

        **Returns:**
        - list[dict]: A list of dictionaries containing file information.
        """
        # If the bucket name starts with gs://, remove it
        if bucket_name.startswith("gs://"):
            bucket_name = bucket_name.split("/")[2].strip()

        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")

        # Get the bucket object and set user_project for Requester Pays
        bucket = self.client.bucket(bucket_name, user_project=self.client.project)

        # List blobs within the bucket
        blobs = bucket.list_blobs(prefix=prefix)
        logging.info("Finished listing blobs. Processing files now.")

        # Create a list of dictionaries containing file information
        file_list = [
            self._create_bucket_contents_dict(
                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
            )
            for blob in blobs
            if self._validate_include_blob(
                blob=blob,
                file_extensions_to_ignore=file_extensions_to_ignore,
                file_strings_to_ignore=file_strings_to_ignore,
                file_extensions_to_include=file_extensions_to_include,
                bucket_name=bucket_name
            ) and not blob.name.endswith("/")
        ]
        logging.info(f"Found {len(file_list)} files in bucket")
        return file_list

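    # Example (illustrative only; the bucket name, prefix, and extensions are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   crams = gcp.list_bucket_contents(
    #       bucket_name="example-bucket",
    #       prefix="submissions/",
    #       file_extensions_to_include=[".cram", ".crai"],
    #       file_strings_to_ignore=["tmp/"],
    #   )
    #   # Each entry contains name, path, content_type, file_extension, size_in_bytes, and md5_hash
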
    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
        """
        Copy a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
        """
        try:
            src_blob = self.load_blob_from_full_path(src_cloud_path)
            dest_blob = self.load_blob_from_full_path(full_destination_path)

            # Use rewrite so no timeouts
            rewrite_token = False

            while True:
                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
                    src_blob, token=rewrite_token
                )
                if verbose:
                    logging.info(
                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
                    )
                if not rewrite_token:
                    break

        except Exception as e:
            logging.error(
                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
            )
            raise

    def delete_cloud_file(self, full_cloud_path: str) -> None:
        """
        Delete a file from GCS.

        **Args:**
        - full_cloud_path (str): The GCS path of the file to delete.
        """
        blob = self.load_blob_from_full_path(full_cloud_path)
        blob.delete()

    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
        """
        Move a file from one GCS location to another.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - full_destination_path (str): The destination GCS path.
        """
        self.copy_cloud_file(src_cloud_path, full_destination_path)
        self.delete_cloud_file(src_cloud_path)

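    # Example (illustrative only; the paths are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   gcp.copy_cloud_file(
    #       src_cloud_path="gs://source-bucket/data/sample.txt",
    #       full_destination_path="gs://dest-bucket/data/sample.txt",
    #       verbose=True,
    #   )
    #   # move_cloud_file performs the same copy and then deletes the source object
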
    def get_filesize(self, target_path: str) -> int:
        """
        Get the size of a file in GCS.

        **Args:**
        - target_path (str): The GCS path of the file.

        **Returns:**
        - int: The size of the file in bytes.
        """
        blob = self.load_blob_from_full_path(target_path)
        return blob.size

    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
        """
        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

        **Args:**
        - src_cloud_path (str): The source GCS path.
        - dest_cloud_path (str): The destination GCS path.

        **Returns:**
        - bool: `True` if the files are identical, `False` otherwise.
        """
        src_blob = self.load_blob_from_full_path(src_cloud_path)
        dest_blob = self.load_blob_from_full_path(dest_cloud_path)

        # If either blob is None or does not exist
        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
            return False
        # If the MD5 hashes exist
        if src_blob.md5_hash and dest_blob.md5_hash:
            # And are the same return True
            if src_blob.md5_hash == dest_blob.md5_hash:
                return True
        else:
            # If md5 do not exist (for larger files they may not) check size matches
            if src_blob.size == dest_blob.size:
                return True
        # Otherwise, return False
        return False

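    # Example (illustrative only; the paths are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   if not gcp.validate_files_are_same(
    #       "gs://source-bucket/data/sample.txt", "gs://dest-bucket/data/sample.txt"
    #   ):
    #       logging.warning("Source and destination differ (or one is missing)")
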
    def delete_multiple_files(
            self,
            files_to_delete: list[str],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            job_complete_for_logging: int = 500
    ) -> None:
        """
        Delete multiple cloud files in parallel using multi-threading.

        **Args:**
        - files_to_delete (list[str]): List of GCS paths of the files to delete.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
        """
        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]

        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self.delete_cloud_file,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=job_complete_for_logging
        )

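    # Example (illustrative only; the paths are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   gcp.delete_multiple_files(
    #       files_to_delete=[
    #           "gs://example-bucket/stale/file_1.txt",
    #           "gs://example-bucket/stale/file_2.txt",
    #       ],
    #       workers=10,
    #   )
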
    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
        """
        Validate if source and destination files are identical.

        **Args:**
        - source_file (str): The source file path.
        - full_destination_path (str): The destination file path.

        **Returns:**
        - dict: A dictionary with the source and destination paths and a boolean indicating whether they are identical.
        """
        identical = self.validate_files_are_same(source_file, full_destination_path)
        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}

    def loop_and_log_validation_files_multithreaded(
            self,
            files_to_validate: list[dict],
            log_difference: bool,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            job_complete_for_logging: int = 500
    ) -> list[dict]:
        """
        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

        **Args:**
        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
                                   this at the start of a copy/move operation to check if files are already copied.
        - workers (int, optional): Number of worker threads. Defaults to `10`.
        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Returns:**
        - list[dict]: List of dictionaries containing files that are **not** identical.
        """
        logging.info(f"Validating if {len(files_to_validate)} files are identical")

        # Prepare jobs: pass the necessary arguments to each validation
        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]

        # Use multithreaded job runner to validate the files
        checked_files = MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=self._validate_file_pair,
            list_of_jobs_args_list=jobs,
            collect_output=True,
            max_retries=max_retries,
            jobs_complete_for_logging=job_complete_for_logging
        )
        # If any files failed to load, raise an exception
        if files_to_validate and None in checked_files:  # type: ignore[operator]
            logging.error("Failed to validate all files, could not load some blobs")
            raise Exception("Failed to validate all files")

        # Get all files that are not identical
        not_identical_files = [
            file_dict
            for file_dict in checked_files  # type: ignore[operator, union-attr]
            if not file_dict['identical']
        ]
        if not_identical_files:
            if log_difference:
                for file_dict in not_identical_files:
                    logging.warning(
                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
                    )
            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
        return not_identical_files

    def multithread_copy_of_files_with_validation(
            self,
            files_to_copy: list[dict],
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            skip_check_if_already_copied: bool = False
    ) -> None:
        """
        Copy multiple files in parallel with validation.

        **Args:**
        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
                Dictionary should have keys `source_file` and `full_destination_path`.
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`.
        - skip_check_if_already_copied (bool, optional): Whether to skip checking
                if files are already copied and start copying right away. Defaults to `False`.
        """
        if skip_check_if_already_copied:
            logging.info("Skipping check if files are already copied")
            updated_files_to_move = files_to_copy
        else:
            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
                files_to_copy,
                log_difference=False,
                workers=workers,
                max_retries=max_retries
            )
        # If all files are already copied, return
        if not updated_files_to_move:
            logging.info("All files are already copied")
            return None
        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
        # Validate that all files were copied successfully
        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
            files_to_copy,
            workers=workers,
            log_difference=True,
            max_retries=max_retries
        )
        if files_not_moved_successfully:
            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
            raise Exception("Failed to copy all files")
        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
        return None

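    # Example (illustrative only; the paths are placeholders). Each dictionary must use the
    # `source_file` and `full_destination_path` keys expected by this method:
    #
    #   gcp = GCPCloudFunctions()
    #   gcp.multithread_copy_of_files_with_validation(
    #       files_to_copy=[
    #           {
    #               "source_file": "gs://source-bucket/data/sample.txt",
    #               "full_destination_path": "gs://dest-bucket/data/sample.txt",
    #           },
    #       ],
    #       workers=10,
    #       max_retries=5,
    #   )
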
    def move_or_copy_multiple_files(
            self, files_to_move: list[dict],
            action: str,
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
            verbose: bool = False,
            jobs_complete_for_logging: int = 500
    ) -> None:
        """
        Move or copy multiple files in parallel.

        **Args:**
        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
        or `ops_utils.gcp_utils.COPY`).
        - workers (int): Number of worker threads. Defaults to `10`.
        - max_retries (int): Maximum number of retries. Defaults to `5`.
        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.

        **Raises:**
        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
        """
        if action == MOVE:
            cloud_function = self.move_cloud_file
        elif action == COPY:
            cloud_function = self.copy_cloud_file
        else:
            raise ValueError("Must either select move or copy")

        list_of_jobs_args_list = [
            [
                file_dict['source_file'], file_dict['full_destination_path']
            ]
            for file_dict in files_to_move
        ]
        MultiThreadedJobs().run_multi_threaded_job(
            workers=workers,
            function=cloud_function,
            list_of_jobs_args_list=list_of_jobs_args_list,
            max_retries=max_retries,
            fail_on_error=True,
            verbose=verbose,
            collect_output=False,
            jobs_complete_for_logging=jobs_complete_for_logging
        )

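    # Example (illustrative only; the paths are placeholders). Use the module-level MOVE or COPY
    # constants for the `action` argument:
    #
    #   gcp = GCPCloudFunctions()
    #   gcp.move_or_copy_multiple_files(
    #       files_to_move=[
    #           {
    #               "source_file": "gs://source-bucket/data/sample.txt",
    #               "full_destination_path": "gs://archive-bucket/data/sample.txt",
    #           },
    #       ],
    #       action=MOVE,
    #   )
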
    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
        """
        Read the content of a file from GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to read.
        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.

        **Returns:**
        - str: The content of the file decoded as a string.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        # Download the file content as bytes
        content_bytes = blob.download_as_bytes()
        # Convert bytes to string
        content_str = content_bytes.decode(encoding)
        return content_str

    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
        """
        Upload a file to GCS.

        **Args:**
        - destination_path (str): The destination GCS path.
        - source_file (str): The source file path.
        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
        """
        blob = self.load_blob_from_full_path(destination_path)
        if custom_metadata:
            blob.metadata = custom_metadata
        blob.upload_from_filename(source_file)

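    # Example (illustrative only; the paths and metadata are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   gcp.upload_blob(
    #       destination_path="gs://example-bucket/manifests/manifest.tsv",
    #       source_file="/tmp/manifest.tsv",
    #       custom_metadata={"pipeline": "example"},
    #   )
    #   manifest_text = gcp.read_file("gs://example-bucket/manifests/manifest.tsv")
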
    def get_object_md5(
        self,
        file_path: str,
        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
        chunk_size: int = parse_size("256 KB"),
        logging_bytes: int = parse_size("1 GB"),
        returned_md5_format: str = "hex"
    ) -> str:
        """
        Calculate the MD5 checksum of a file in GCS.

        **Args:**
        - file_path (str): The GCS path of the file.
        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.

        **Returns:**
        - str: The MD5 checksum of the file.

        **Raises:**
        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
        or `ops_utils.gcp_utils.MD5_BASE64`.
        """
        if returned_md5_format not in ["hex", "base64"]:
            raise ValueError("returned_md5_format must be 'hex' or 'base64'")

        blob = self.load_blob_from_full_path(file_path)

        # Create an MD5 hash object
        md5_hash = hashlib.md5()

        blob_size_str = format_size(blob.size)
        logging.info(f"Streaming {file_path} which is {blob_size_str}")
        # Use a BytesIO stream to collect the data as it is read in chunks
        buffer = io.BytesIO()
        total_bytes_streamed = 0
        # Keep track of the last logged size for progress logging
        last_logged = 0

        with blob.open("rb") as source_stream:
            while True:
                chunk = source_stream.read(chunk_size)
                if not chunk:
                    break
                md5_hash.update(chunk)
                buffer.write(chunk)
                total_bytes_streamed += len(chunk)
                # Log progress every `logging_bytes` streamed
                if total_bytes_streamed - last_logged >= logging_bytes:
                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
                    last_logged = total_bytes_streamed

        if returned_md5_format == "hex":
            md5 = md5_hash.hexdigest()
            logging.info(f"MD5 (hex) for {file_path}: {md5}")
        elif returned_md5_format == "base64":
            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
            logging.info(f"MD5 (base64) for {file_path}: {md5}")
        return md5

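    # Example (illustrative only; the path is a placeholder):
    #
    #   gcp = GCPCloudFunctions()
    #   md5_b64 = gcp.get_object_md5(
    #       file_path="gs://example-bucket/data/large_file.bam",
    #       returned_md5_format=MD5_BASE64,
    #   )
    #   # The base64 form can be compared directly against blob.md5_hash
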
    def set_acl_public_read(self, cloud_path: str) -> None:
        """
        Set the file in the bucket to be publicly readable.

        **Args:**
        - cloud_path (str): The GCS path of the file to be set as public readable.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.all().grant_read()
        blob.acl.save()

    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
        """
        Set the file in the bucket to grant OWNER permission to a specific group.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - group_email (str): The email of the group to grant OWNER permission.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.acl.group(group_email).grant_owner()
        blob.acl.save()

    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
        """
        Set Cache-Control metadata for a file.

        **Args:**
        - cloud_path (str): The GCS path of the file.
        - cache_control (str): The Cache-Control metadata to set.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.cache_control = cache_control
        blob.patch()

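    # Example (illustrative only; the path, group, and Cache-Control value are placeholders):
    #
    #   gcp = GCPCloudFunctions()
    #   gcp.set_acl_public_read("gs://example-bucket/public/index.html")
    #   gcp.set_acl_group_owner("gs://example-bucket/public/index.html", "example-group@example.org")
    #   gcp.set_metadata_cache_control("gs://example-bucket/public/index.html", "no-cache, max-age=0")
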
    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
        """
        Get the most recent blob in the bucket.

        If the blob with the most recent timestamp doesn't have
        any logging besides the basic "storage_byte_hours" logging, this method will continue to look at the next most
        recent file until a log with useful information is encountered. This is useful when combing through
        GCP activity logs for Terra workspace buckets.

        **Args:**
        - bucket_name (str): The GCS bucket name.

        **Returns:**
        - Optional tuple of the blob found and the file contents from the blob.
        """
        blobs = sorted(
            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
        )
        for blob in blobs:
            # Download the file contents as a string
            file_contents = blob.download_as_text()

            # Check if the content matches the undesired format
            lines = file_contents.splitlines()
            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
                continue

            # If it doesn't match the undesired format, return its content
            logging.info(f"Found valid file: {blob.name}")
            return blob, file_contents

        logging.info("No valid files found.")
        return None

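    # Example (illustrative only; the bucket name is a placeholder):
    #
    #   gcp = GCPCloudFunctions()
    #   result = gcp.get_file_contents_of_most_recent_blob_in_bucket("example-logging-bucket")
    #   if result:
    #       blob, contents = result
    #       print(f"Most recent useful log: {blob.name}")
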
    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
        """
        Write content to a file in GCS.

        **Args:**
        - cloud_path (str): The GCS path of the file to write.
        - file_contents (str): The content to write.
        """
        blob = self.load_blob_from_full_path(cloud_path)
        blob.upload_from_string(file_contents)
        logging.info(f"Successfully wrote content to {cloud_path}")

    @staticmethod
    def get_active_gcloud_account() -> str:
        """
        Get the email of the currently active gcloud account.

        **Returns:**
        - str: The active GCP account email.
        """
        result = subprocess.run(
            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
            capture_output=True,
            text=True,
            check=True
        )
        return result.stdout.strip()

707    
MOVE = 'move'

Variable to be used for the "action" when files should be moved.

COPY = 'copy'

Variable to be used for the "action" when files should be copied.

MD5_HEX = 'hex'

Variable to be used when the generated md5 should be of the hex type.

MD5_BASE64 = 'base64'

Variable to be used when the generated md5 should be of the base64 type.

class GCPCloudFunctions:
 28class GCPCloudFunctions:
 29    """Class to handle GCP Cloud Functions."""
 30
 31    def __init__(self, project: Optional[str] = None) -> None:
 32        """
 33        Initialize the GCPCloudFunctions class.
 34
 35        Authenticates using the default credentials and sets up the Storage Client.
 36        Uses the `project_id` if provided, otherwise utilizes the default project set.
 37
 38        **Args:**
 39        - project (str, optional): The GCP project ID
 40        """
 41        from google.cloud import storage  # type: ignore[attr-defined]
 42        from google.auth import default
 43        credentials, default_project = default()
 44        if not project:
 45            project = default_project
 46        self.client = storage.Client(credentials=credentials, project=project)
 47        """@private"""
 48
 49    @staticmethod
 50    def _process_cloud_path(cloud_path: str) -> dict:
 51        """
 52        Process a GCS cloud path into its components.
 53
 54        Args:
 55            cloud_path (str): The GCS cloud path.
 56
 57        Returns:
 58            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
 59        """
 60        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
 61        bucket_name = str.split(remaining_url, sep="/")[0]
 62        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
 63        path_components = {
 64            "platform_prefix": platform_prefix,
 65            "bucket": bucket_name,
 66            "blob_url": blob_name
 67        }
 68        return path_components
 69
 70    def load_blob_from_full_path(self, full_path: str) -> Blob:
 71        """
 72        Load a GCS blob object from a full GCS path.
 73
 74        **Args:**
 75        - full_path (str): The full GCS path.
 76
 77        **Returns:**
 78        - google.cloud.storage.blob.Blob: The GCS blob object.
 79        """
 80        file_path_components = self._process_cloud_path(full_path)
 81
 82        # Specify the billing project
 83        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
 84        blob = bucket.blob(file_path_components["blob_url"])
 85
 86        # If blob exists in GCS reload it so metadata is there
 87        if blob.exists():
 88            blob.reload()
 89        return blob
 90
 91    def check_file_exists(self, full_path: str) -> bool:
 92        """
 93        Check if a file exists in GCS.
 94
 95        **Args:**
 96        - full_path (str): The full GCS path.
 97
 98        **Returns:**
 99        - bool: `True` if the file exists, `False` otherwise.
100        """
101        blob = self.load_blob_from_full_path(full_path)
102        return blob.exists()
103
104    @staticmethod
105    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
106        """
107        Create a dictionary containing file information.
108
109        Args:
110            bucket_name (str): The name of the GCS bucket.
111            blob (Any): The GCS blob object.
112            file_name_only (bool): Whether to return only the file list.
113
114        Returns:
115            dict: A dictionary containing file information.
116        """
117        if file_name_only:
118            return {
119                "path": f"gs://{bucket_name}/{blob.name}"
120            }
121        return {
122            "name": os.path.basename(blob.name),
123            "path": f"gs://{bucket_name}/{blob.name}",
124            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
125            "file_extension": os.path.splitext(blob.name)[1],
126            "size_in_bytes": blob.size,
127            "md5_hash": blob.md5_hash
128        }
129
130    @staticmethod
131    def _validate_include_blob(
132            blob: Any,
133            bucket_name: str,
134            file_extensions_to_ignore: list[str] = [],
135            file_strings_to_ignore: list[str] = [],
136            file_extensions_to_include: list[str] = [],
137            verbose: bool = False
138    ) -> bool:
139        """
140        Validate if a blob should be included based on its file extension.
141
142        Args:
143            file_extensions_to_include (list[str]): List of file extensions to include.
144            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
145            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
146            blob (Any): The GCS blob object.
147            verbose (bool): Whether to log files not being included.
148
149        Returns:
150            bool: True if the blob should be included, False otherwise.
151        """
152        file_path = f"gs://{bucket_name}/{blob.name}"
153        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
154            if verbose:
155                logging.info(f"Skipping {file_path} as it has an extension to ignore")
156            return False
157        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
158            if verbose:
159                logging.info(f"Skipping {file_path} as it does not have an extension to include")
160            return False
161        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
162            if verbose:
163                logging.info(f"Skipping {file_path} as it has a string to ignore")
164            return False
165        return True
166
167    def list_bucket_contents(
168            self,
169            bucket_name: str,
170            prefix: Optional[str] = None,
171            file_extensions_to_ignore: list[str] = [],
172            file_strings_to_ignore: list[str] = [],
173            file_extensions_to_include: list[str] = [],
174            file_name_only: bool = False
175    ) -> list[dict]:
176        """
177        List contents of a GCS bucket and return a list of dictionaries with file information.
178
179        **Args:**
180        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
181        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
182        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
183        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
184        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
185        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
186
187        **Returns:**
188        - list[dict]: A list of dictionaries containing file information.
189        """
190        # If the bucket name starts with gs://, remove it
191        if bucket_name.startswith("gs://"):
192            bucket_name = bucket_name.split("/")[2].strip()
193
194        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
195
196        # Get the bucket object and set user_project for Requester Pays
197        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
198
199        # List blobs within the bucket
200        blobs = bucket.list_blobs(prefix=prefix)
201        logging.info("Finished listing blobs. Processing files now.")
202
203        # Create a list of dictionaries containing file information
204        file_list = [
205            self._create_bucket_contents_dict(
206                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
207            )
208            for blob in blobs
209            if self._validate_include_blob(
210                blob=blob,
211                file_extensions_to_ignore=file_extensions_to_ignore,
212                file_strings_to_ignore=file_strings_to_ignore,
213                file_extensions_to_include=file_extensions_to_include,
214                bucket_name=bucket_name
215            ) and not blob.name.endswith("/")
216        ]
217        logging.info(f"Found {len(file_list)} files in bucket")
218        return file_list
219
220    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
221        """
222        Copy a file from one GCS location to another.
223
224        **Args:**
225        - src_cloud_path (str): The source GCS path.
226        - full_destination_path (str): The destination GCS path.
227        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
228        """
229        try:
230            src_blob = self.load_blob_from_full_path(src_cloud_path)
231            dest_blob = self.load_blob_from_full_path(full_destination_path)
232
233            # Use rewrite so no timeouts
234            rewrite_token = False
235
236            while True:
237                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
238                    src_blob, token=rewrite_token
239                )
240                if verbose:
241                    logging.info(
242                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
243                    )
244                if not rewrite_token:
245                    break
246
247        except Exception as e:
248            logging.error(
249                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
250                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
251            )
252            raise
253
254    def delete_cloud_file(self, full_cloud_path: str) -> None:
255        """
256        Delete a file from GCS.
257
258        **Args:**
259        - full_cloud_path (str): The GCS path of the file to delete.
260        """
261        blob = self.load_blob_from_full_path(full_cloud_path)
262        blob.delete()
263
264    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
265        """
266        Move a file from one GCS location to another.
267
268        **Args:**
269        - src_cloud_path (str): The source GCS path.
270        - full_destination_path (str): The destination GCS path.
271        """
272        self.copy_cloud_file(src_cloud_path, full_destination_path)
273        self.delete_cloud_file(src_cloud_path)
274
275    def get_filesize(self, target_path: str) -> int:
276        """
277        Get the size of a file in GCS.
278
279        **Args:**
280        - target_path (str): The GCS path of the file.
281
282        **Returns:**
283        - int: The size of the file in bytes.
284        """
285        blob = self.load_blob_from_full_path(target_path)
286        return blob.size
287
288    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
289        """
290        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
291
292        **Args:**
293        - src_cloud_path (str): The source GCS path.
294        - dest_cloud_path (str): The destination GCS path.
295
296        **Returns:**
297        - bool: `True` if the files are identical, `False` otherwise.
298        """
299        src_blob = self.load_blob_from_full_path(src_cloud_path)
300        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
301
302        # If either blob is None or does not exist
303        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
304            return False
305        # If the MD5 hashes exist
306        if src_blob.md5_hash and dest_blob.md5_hash:
307            # And are the same return True
308            if src_blob.md5_hash == dest_blob.md5_hash:
309                return True
310        else:
311            # If md5 do not exist (for larger files they may not) check size matches
312            if src_blob.size == dest_blob.size:
313                return True
314        # Otherwise, return False
315        return False
316
317    def delete_multiple_files(
318            self,
319            files_to_delete: list[str],
320            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
321            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
322            verbose: bool = False,
323            job_complete_for_logging: int = 500
324    ) -> None:
325        """
326        Delete multiple cloud files in parallel using multi-threading.
327
328        **Args:**
329        - files_to_delete (list[str]): List of GCS paths of the files to delete.
330        - workers (int, optional): Number of worker threads. Defaults to `10`.
331        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
332        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
333        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
334        """
335        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
336
337        MultiThreadedJobs().run_multi_threaded_job(
338            workers=workers,
339            function=self.delete_cloud_file,
340            list_of_jobs_args_list=list_of_jobs_args_list,
341            max_retries=max_retries,
342            fail_on_error=True,
343            verbose=verbose,
344            collect_output=False,
345            jobs_complete_for_logging=job_complete_for_logging
346        )
347
348    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
349        """
350        Validate if source and destination files are identical.
351
352        **Args:**
353        - source_file (str): The source file path.
354        - full_destination_path (str): The destination file path.
355
356        **Returns:**
357            dict: The file dictionary of the files with a boolean indicating if they are identical.
358        """
359        if self.validate_files_are_same(source_file, full_destination_path):
360            identical = True
361        else:
362            identical = False
363        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
364
365    def loop_and_log_validation_files_multithreaded(
366            self,
367            files_to_validate: list[dict],
368            log_difference: bool,
369            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
370            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
371            job_complete_for_logging: int = 500
372    ) -> list[dict]:
373        """
374        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
375
376        **Args:**
377        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
378        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
379                                   this at the start of a copy/move operation to check if files are already copied.
380        - workers (int, optional): Number of worker threads. Defaults to `10`.
381        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
382        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
383
384        **Returns:**
385        - list[dict]: List of dictionaries containing files that are **not** identical.
386        """
387        logging.info(f"Validating if {len(files_to_validate)} files are identical")
388
389        # Prepare jobs: pass the necessary arguments to each validation
390        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
391
392        # Use multithreaded job runner to validate the files
393        checked_files = MultiThreadedJobs().run_multi_threaded_job(
394            workers=workers,
395            function=self._validate_file_pair,
396            list_of_jobs_args_list=jobs,
397            collect_output=True,
398            max_retries=max_retries,
399            jobs_complete_for_logging=job_complete_for_logging
400        )
401        # If any files failed to load, raise an exception
402        if files_to_validate and None in checked_files:  # type: ignore[operator]
403            logging.error("Failed to validate all files, could not load some blobs")
404            raise Exception("Failed to validate all files")
405
406        # Get all files that are not identical
407        not_identical_files = [
408            file_dict
409            for file_dict in checked_files  # type: ignore[operator, union-attr]
410            if not file_dict['identical']
411        ]
412        if not_identical_files:
413            if log_difference:
414                for file_dict in not_identical_files:
415                    logging.warning(
416                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
417                    )
418            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
419        return not_identical_files
420
421    def multithread_copy_of_files_with_validation(
422            self,
423            files_to_copy: list[dict],
424            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
425            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
426            skip_check_if_already_copied: bool = False
427    ) -> None:
428        """
429        Copy multiple files in parallel with validation.
430
431        **Args:**
432        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
433                Dictionary should have keys `source_file` and `full_destination_path`
434        - workers (int): Number of worker threads. Defaults to `10`.
435        - max_retries (int): Maximum number of retries. Defaults to `5`
436        - skip_check_if_already_copied (bool, optional): Whether to skip checking
437                if files are already copied and start copying right away. Defaults to `False`.
438        """
439        if skip_check_if_already_copied:
440            logging.info("Skipping check if files are already copied")
441            updated_files_to_move = files_to_copy
442        else:
443            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
444                files_to_copy,
445                log_difference=False,
446                workers=workers,
447                max_retries=max_retries
448            )
449        # If all files are already copied, return
450        if not updated_files_to_move:
451            logging.info("All files are already copied")
452            return None
453        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
454        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
455        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
456        # Validate that all files were copied successfully
457        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
458            files_to_copy,
459            workers=workers,
460            log_difference=True,
461            max_retries=max_retries
462        )
463        if files_not_moved_successfully:
464            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
465            raise Exception("Failed to copy all files")
466        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
467        return None
468
469    def move_or_copy_multiple_files(
470            self, files_to_move: list[dict],
471            action: str,
472            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
473            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
474            verbose: bool = False,
475            jobs_complete_for_logging: int = 500
476    ) -> None:
477        """
478        Move or copy multiple files in parallel.
479
480        **Args:**
481        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
482        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
483        or `ops_utils.gcp_utils.COPY`).
484        - workers (int): Number of worker threads. Defaults to `10`.
485        - max_retries (int): Maximum number of retries. Defaults to `5`.
486        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
487        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
488
489        **Raises:**
490        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
491        """
492        if action == MOVE:
493            cloud_function = self.move_cloud_file
494        elif action == COPY:
495            cloud_function = self.copy_cloud_file
496        else:
497            raise ValueError("Must either select move or copy")
498
499        list_of_jobs_args_list = [
500            [
501                file_dict['source_file'], file_dict['full_destination_path']
502            ]
503            for file_dict in files_to_move
504        ]
505        MultiThreadedJobs().run_multi_threaded_job(
506            workers=workers,
507            function=cloud_function,
508            list_of_jobs_args_list=list_of_jobs_args_list,
509            max_retries=max_retries,
510            fail_on_error=True,
511            verbose=verbose,
512            collect_output=False,
513            jobs_complete_for_logging=jobs_complete_for_logging
514        )
515
516    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
517        """
518        Read the content of a file from GCS.
519
520        **Args:**
521        - cloud_path (str): The GCS path of the file to read.
522        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
523
524        **Returns:**
525        - bytes: The content of the file as bytes.
526        """
527        blob = self.load_blob_from_full_path(cloud_path)
528        # Download the file content as bytes
529        content_bytes = blob.download_as_bytes()
530        # Convert bytes to string
531        content_str = content_bytes.decode(encoding)
532        return content_str
533
534    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
535        """
536        Upload a file to GCS.
537
538        **Args:**
539        - destination_path (str): The destination GCS path.
540        - source_file (str): The source file path.
541        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
542        """
543        blob = self.load_blob_from_full_path(destination_path)
544        if custom_metadata:
545            blob.metadata = custom_metadata
546        blob.upload_from_filename(source_file)
547
548    def get_object_md5(
549        self,
550        file_path: str,
551        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
552        chunk_size: int = parse_size("256 KB"),
553        logging_bytes: int = parse_size("1 GB"),
554        returned_md5_format: str = "hex"
555    ) -> str:
556        """
557        Calculate the MD5 checksum of a file in GCS.
558
559        **Args:**
560        - file_path (str): The GCS path of the file.
561        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
562        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
563        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
564                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
565
566        **Returns:**
567        - str: The MD5 checksum of the file.
568
569        **Raises:**
570        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
571        or `ops_utils.gcp_utils.MD5_BASE64`
572        """
573        if returned_md5_format not in ["hex", "base64"]:
574            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
575
576        blob = self.load_blob_from_full_path(file_path)
577
578        # Create an MD5 hash object
579        md5_hash = hashlib.md5()
580
581        blob_size_str = format_size(blob.size)
582        logging.info(f"Streaming {file_path} which is {blob_size_str}")
583        # Use a BytesIO stream to collect data in chunks and upload it
584        buffer = io.BytesIO()
585        total_bytes_streamed = 0
586        # Keep track of the last logged size for data logging
587        last_logged = 0
588
589        with blob.open("rb") as source_stream:
590            while True:
591                chunk = source_stream.read(chunk_size)
592                if not chunk:
593                    break
594                md5_hash.update(chunk)
595                buffer.write(chunk)
596                total_bytes_streamed += len(chunk)
597                # Log progress every logging_bytes bytes streamed
598                if total_bytes_streamed - last_logged >= logging_bytes:
599                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
600                    last_logged = total_bytes_streamed
601
602        if returned_md5_format == "hex":
603            md5 = md5_hash.hexdigest()
604            logging.info(f"MD5 (hex) for {file_path}: {md5}")
605        elif returned_md5_format == "base64":
606            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
607            logging.info(f"MD5 (base64) for {file_path}: {md5}")
608        return md5
609
610    def set_acl_public_read(self, cloud_path: str) -> None:
611        """
612        Set the file in the bucket to be publicly readable.
613
614        **Args:**
615        - cloud_path (str): The GCS path of the file to make publicly readable.
616        """
617        blob = self.load_blob_from_full_path(cloud_path)
618        blob.acl.all().grant_read()
619        blob.acl.save()
620
621    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
622        """
623        Set the file in the bucket to grant OWNER permission to a specific group.
624
625        **Args:**
626        - cloud_path (str): The GCS path of the file.
627        - group_email (str): The email of the group to grant OWNER permission
628        """
629        blob = self.load_blob_from_full_path(cloud_path)
630        blob.acl.group(group_email).grant_owner()
631        blob.acl.save()
632
633    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
634        """
635        Set Cache-Control metadata for a file.
636
637        **Args:**
638        - cloud_path (str): The GCS path of the file.
639        - cache_control (str): The Cache-Control metadata to set.
640        """
641        blob = self.load_blob_from_full_path(cloud_path)
642        blob.cache_control = cache_control
643        blob.patch()
644
645    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
646        """
647        Get the most recent blob in the bucket.
648
649        If the blob with the most recent timestamp doesn't have any logging besides the basic
650        "storage_byte_hours" logging, this method will continue to look at the next most recent file
651        until a log with useful information is encountered. This is useful when combing through GCP
652        activity logs for Terra workspace buckets.
653
654        **Args:**
655        - bucket_name (str): The GCS bucket name.
656
657        **Returns:**
658        - Optional[tuple[Blob, str]]: The blob found and its file contents, or None if no valid file is found.
659        """
660        blobs = sorted(
661            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
662        )
663        for blob in blobs:
664            # Download the file contents as a string
665            file_contents = blob.download_as_text()
666
667            # Check if the content matches the undesired format
668            lines = file_contents.splitlines()
669            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
670                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
671                continue
672
673            # If it doesn't match the undesired format, return its content
674            logging.info(f"Found valid file: {blob.name}")
675            return blob, file_contents
676
677        logging.info("No valid files found.")
678        return None
679
680    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
681        """
682        Write content to a file in GCS.
683
684        **Args:**
685        - cloud_path (str): The GCS path of the file to write.
686        - file_contents (str): The content to write.
687        """
688        blob = self.load_blob_from_full_path(cloud_path)
689        blob.upload_from_string(file_contents)
690        logging.info(f"Successfully wrote content to {cloud_path}")
691
692    @staticmethod
693    def get_active_gcloud_account() -> str:
694        """
695        Get the active GCP email for the current account.
696
697        **Returns:**
698        - str: The active GCP account email.
699        """
700        result = subprocess.run(
701            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
702            capture_output=True,
703            text=True,
704            check=True
705        )
706        return result.stdout.strip()

Class to handle GCP Cloud Functions.

GCPCloudFunctions(project: Optional[str] = None)
31    def __init__(self, project: Optional[str] = None) -> None:
32        """
33        Initialize the GCPCloudFunctions class.
34
35        Authenticates using the default credentials and sets up the Storage Client.
36        Uses the `project_id` if provided, otherwise utilizes the default project set.
37
38        **Args:**
39        - project (str, optional): The GCP project ID
40        """
41        from google.cloud import storage  # type: ignore[attr-defined]
42        from google.auth import default
43        credentials, default_project = default()
44        if not project:
45            project = default_project
46        self.client = storage.Client(credentials=credentials, project=project)
47        """@private"""

Initialize the GCPCloudFunctions class.

Authenticates using the default credentials and sets up the Storage Client. Uses the project_id if provided, otherwise utilizes the default project set.

Args:

  • project (str, optional): The GCP project ID
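
As a brief usage sketch (the project ID below is a placeholder, not something this module defines), the client can be created with or without an explicit project:

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Rely on the default credentials and their default project ...
    gcp = GCPCloudFunctions()

    # ... or bill Requester Pays access to an explicit project (placeholder ID)
    gcp = GCPCloudFunctions(project="my-billing-project")
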
def load_blob_from_full_path(self, full_path: str) -> google.cloud.storage.blob.Blob:
70    def load_blob_from_full_path(self, full_path: str) -> Blob:
71        """
72        Load a GCS blob object from a full GCS path.
73
74        **Args:**
75        - full_path (str): The full GCS path.
76
77        **Returns:**
78        - google.cloud.storage.blob.Blob: The GCS blob object.
79        """
80        file_path_components = self._process_cloud_path(full_path)
81
82        # Specify the billing project
83        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
84        blob = bucket.blob(file_path_components["blob_url"])
85
86        # If blob exists in GCS reload it so metadata is there
87        if blob.exists():
88            blob.reload()
89        return blob

Load a GCS blob object from a full GCS path.

Args:

  • full_path (str): The full GCS path.

Returns:

  • google.cloud.storage.blob.Blob: The GCS blob object.
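
For example, with a hypothetical bucket and object name, the returned blob exposes the usual google-cloud-storage metadata because it is reloaded when it exists:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    blob = gcp.load_blob_from_full_path("gs://example-bucket/data/sample.txt")
    if blob.exists():
        # Metadata is populated because the blob was reloaded on load
        print(blob.size, blob.md5_hash, blob.content_type)
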
def check_file_exists(self, full_path: str) -> bool:
 91    def check_file_exists(self, full_path: str) -> bool:
 92        """
 93        Check if a file exists in GCS.
 94
 95        **Args:**
 96        - full_path (str): The full GCS path.
 97
 98        **Returns:**
 99        - bool: `True` if the file exists, `False` otherwise.
100        """
101        blob = self.load_blob_from_full_path(full_path)
102        return blob.exists()

Check if a file exists in GCS.

Args:

  • full_path (str): The full GCS path.

Returns:

  • bool: True if the file exists, False otherwise.
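
A minimal sketch, again with a hypothetical path:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    if gcp.check_file_exists("gs://example-bucket/data/sample.txt"):
        print("File is present")
    else:
        print("File is missing")
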
def list_bucket_contents( self, bucket_name: str, prefix: Optional[str] = None, file_extensions_to_ignore: list[str] = [], file_strings_to_ignore: list[str] = [], file_extensions_to_include: list[str] = [], file_name_only: bool = False) -> list[dict]:
167    def list_bucket_contents(
168            self,
169            bucket_name: str,
170            prefix: Optional[str] = None,
171            file_extensions_to_ignore: list[str] = [],
172            file_strings_to_ignore: list[str] = [],
173            file_extensions_to_include: list[str] = [],
174            file_name_only: bool = False
175    ) -> list[dict]:
176        """
177        List contents of a GCS bucket and return a list of dictionaries with file information.
178
179        **Args:**
180        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
181        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
182        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
183        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
184        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
185        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
186
187        **Returns:**
188        - list[dict]: A list of dictionaries containing file information.
189        """
190        # If the bucket name starts with gs://, remove it
191        if bucket_name.startswith("gs://"):
192            bucket_name = bucket_name.split("/")[2].strip()
193
194        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
195
196        # Get the bucket object and set user_project for Requester Pays
197        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
198
199        # List blobs within the bucket
200        blobs = bucket.list_blobs(prefix=prefix)
201        logging.info("Finished listing blobs. Processing files now.")
202
203        # Create a list of dictionaries containing file information
204        file_list = [
205            self._create_bucket_contents_dict(
206                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
207            )
208            for blob in blobs
209            if self._validate_include_blob(
210                blob=blob,
211                file_extensions_to_ignore=file_extensions_to_ignore,
212                file_strings_to_ignore=file_strings_to_ignore,
213                file_extensions_to_include=file_extensions_to_include,
214                bucket_name=bucket_name
215            ) and not blob.name.endswith("/")
216        ]
217        logging.info(f"Found {len(file_list)} files in bucket")
218        return file_list

List contents of a GCS bucket and return a list of dictionaries with file information.

Args:

  • bucket_name (str): The name of the GCS bucket. If includes gs://, it will be removed.
  • prefix (str, optional): The prefix to filter the blobs. Defaults to None.
  • file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
  • file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
  • file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
  • file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to False.

Returns:

  • list[dict]: A list of dictionaries containing file information.
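
An illustrative call (bucket name, prefix, and filter values are placeholders) that keeps only .log files and skips anything containing "tmp":

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    files = gcp.list_bucket_contents(
        bucket_name="gs://example-bucket",        # leading gs:// is stripped
        prefix="submissions/",                    # optional path prefix
        file_extensions_to_include=[".log"],      # keep only .log files
        file_strings_to_ignore=["tmp"],           # skip names containing "tmp"
    )
    for file_info in files:
        print(file_info)
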
def copy_cloud_file( self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
220    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
221        """
222        Copy a file from one GCS location to another.
223
224        **Args:**
225        - src_cloud_path (str): The source GCS path.
226        - full_destination_path (str): The destination GCS path.
227        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
228        """
229        try:
230            src_blob = self.load_blob_from_full_path(src_cloud_path)
231            dest_blob = self.load_blob_from_full_path(full_destination_path)
232
233            # Use rewrite so no timeouts
234            rewrite_token = False
235
236            while True:
237                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
238                    src_blob, token=rewrite_token
239                )
240                if verbose:
241                    logging.info(
242                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
243                    )
244                if not rewrite_token:
245                    break
246
247        except Exception as e:
248            logging.error(
249                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
250                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
251            )
252            raise

Copy a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
  • verbose (bool, optional): Whether to log progress. Defaults to False.
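
A short sketch with hypothetical source and destination paths; verbose=True logs rewrite progress, which is mainly useful for large objects:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    gcp.copy_cloud_file(
        src_cloud_path="gs://source-bucket/data/sample.txt",
        full_destination_path="gs://dest-bucket/archive/sample.txt",
        verbose=True,
    )
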
def delete_cloud_file(self, full_cloud_path: str) -> None:
254    def delete_cloud_file(self, full_cloud_path: str) -> None:
255        """
256        Delete a file from GCS.
257
258        **Args:**
259        - full_cloud_path (str): The GCS path of the file to delete.
260        """
261        blob = self.load_blob_from_full_path(full_cloud_path)
262        blob.delete()

Delete a file from GCS.

Args:

  • full_cloud_path (str): The GCS path of the file to delete.
def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
264    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
265        """
266        Move a file from one GCS location to another.
267
268        **Args:**
269        - src_cloud_path (str): The source GCS path.
270        - full_destination_path (str): The destination GCS path.
271        """
272        self.copy_cloud_file(src_cloud_path, full_destination_path)
273        self.delete_cloud_file(src_cloud_path)

Move a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
def get_filesize(self, target_path: str) -> int:
275    def get_filesize(self, target_path: str) -> int:
276        """
277        Get the size of a file in GCS.
278
279        **Args:**
280        - target_path (str): The GCS path of the file.
281
282        **Returns:**
283        - int: The size of the file in bytes.
284        """
285        blob = self.load_blob_from_full_path(target_path)
286        return blob.size

Get the size of a file in GCS.

Args:

  • target_path (str): The GCS path of the file.

Returns:

  • int: The size of the file in bytes.
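
For example (hypothetical path), the raw byte count can be combined with humanfriendly.format_size for readable output:

    from humanfriendly import format_size

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    size_bytes = gcp.get_filesize("gs://example-bucket/data/sample.txt")
    print(f"sample.txt is {format_size(size_bytes)}")
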
def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
288    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
289        """
290        Validate if two cloud files (source and destination) are identical based on their MD5 hashes, falling back to file size when MD5s are unavailable.
291
292        **Args:**
293        - src_cloud_path (str): The source GCS path.
294        - dest_cloud_path (str): The destination GCS path.
295
296        **Returns:**
297        - bool: `True` if the files are identical, `False` otherwise.
298        """
299        src_blob = self.load_blob_from_full_path(src_cloud_path)
300        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
301
302        # If either blob is None or does not exist
303        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
304            return False
305        # If the MD5 hashes exist
306        if src_blob.md5_hash and dest_blob.md5_hash:
307            # And are the same return True
308            if src_blob.md5_hash == dest_blob.md5_hash:
309                return True
310        else:
311            # If md5 do not exist (for larger files they may not) check size matches
312            if src_blob.size == dest_blob.size:
313                return True
314        # Otherwise, return False
315        return False

Validate if two cloud files (source and destination) are identical based on their MD5 hashes, falling back to file size when MD5s are unavailable.

Args:

  • src_cloud_path (str): The source GCS path.
  • dest_cloud_path (str): The destination GCS path.

Returns:

  • bool: True if the files are identical, False otherwise.
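
A sketch, with placeholder paths, of the common pattern of re-copying only when the pair is not identical:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    identical = gcp.validate_files_are_same(
        src_cloud_path="gs://source-bucket/data/sample.txt",
        dest_cloud_path="gs://dest-bucket/archive/sample.txt",
    )
    if not identical:
        gcp.copy_cloud_file(
            "gs://source-bucket/data/sample.txt",
            "gs://dest-bucket/archive/sample.txt",
        )
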
def delete_multiple_files( self, files_to_delete: list[str], workers: int = 10, max_retries: int = 5, verbose: bool = False, job_complete_for_logging: int = 500) -> None:
317    def delete_multiple_files(
318            self,
319            files_to_delete: list[str],
320            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
321            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
322            verbose: bool = False,
323            job_complete_for_logging: int = 500
324    ) -> None:
325        """
326        Delete multiple cloud files in parallel using multi-threading.
327
328        **Args:**
329        - files_to_delete (list[str]): List of GCS paths of the files to delete.
330        - workers (int, optional): Number of worker threads. Defaults to `10`.
331        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
332        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
333        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
334        """
335        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
336
337        MultiThreadedJobs().run_multi_threaded_job(
338            workers=workers,
339            function=self.delete_cloud_file,
340            list_of_jobs_args_list=list_of_jobs_args_list,
341            max_retries=max_retries,
342            fail_on_error=True,
343            verbose=verbose,
344            collect_output=False,
345            jobs_complete_for_logging=job_complete_for_logging
346        )

Delete multiple cloud files in parallel using multi-threading.

Args:

  • files_to_delete (list[str]): List of GCS paths of the files to delete.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.
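
An illustrative bulk deletion (paths and tuning values are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    gcp.delete_multiple_files(
        files_to_delete=[
            "gs://example-bucket/staging/part-0000.tsv",
            "gs://example-bucket/staging/part-0001.tsv",
        ],
        workers=4,     # worker threads
        verbose=True,  # log each successful deletion
    )
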
def loop_and_log_validation_files_multithreaded( self, files_to_validate: list[dict], log_difference: bool, workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> list[dict]:
365    def loop_and_log_validation_files_multithreaded(
366            self,
367            files_to_validate: list[dict],
368            log_difference: bool,
369            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
370            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
371            job_complete_for_logging: int = 500
372    ) -> list[dict]:
373        """
374        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
375
376        **Args:**
377        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
378        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
379                                   this at the start of a copy/move operation to check if files are already copied.
380        - workers (int, optional): Number of worker threads. Defaults to `10`.
381        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
382        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
383
384        **Returns:**
385        - list[dict]: List of dictionaries containing files that are **not** identical.
386        """
387        logging.info(f"Validating if {len(files_to_validate)} files are identical")
388
389        # Prepare jobs: pass the necessary arguments to each validation
390        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
391
392        # Use multithreaded job runner to validate the files
393        checked_files = MultiThreadedJobs().run_multi_threaded_job(
394            workers=workers,
395            function=self._validate_file_pair,
396            list_of_jobs_args_list=jobs,
397            collect_output=True,
398            max_retries=max_retries,
399            jobs_complete_for_logging=job_complete_for_logging
400        )
401        # If any files failed to load, raise an exception
402        if files_to_validate and None in checked_files:  # type: ignore[operator]
403            logging.error("Failed to validate all files, could not load some blobs")
404            raise Exception("Failed to validate all files")
405
406        # Get all files that are not identical
407        not_identical_files = [
408            file_dict
409            for file_dict in checked_files  # type: ignore[operator, union-attr]
410            if not file_dict['identical']
411        ]
412        if not_identical_files:
413            if log_difference:
414                for file_dict in not_identical_files:
415                    logging.warning(
416                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
417                    )
418            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
419        return not_identical_files

Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

Args:

  • files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
  • log_difference (bool): Whether to log differences if files are not identical. Set False if you are running this at the start of a copy/move operation to check if files are already copied.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries for all jobs. Defaults to 5.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Returns:

  • list[dict]: List of dictionaries containing files that are not identical.
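
A sketch with a single placeholder pair; the dictionaries must use the source_file and full_destination_path keys:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    file_pairs = [
        {
            "source_file": "gs://source-bucket/data/sample.txt",
            "full_destination_path": "gs://dest-bucket/archive/sample.txt",
        },
    ]
    # log_difference=False: only checking what still needs to be copied
    still_to_copy = gcp.loop_and_log_validation_files_multithreaded(
        files_to_validate=file_pairs,
        log_difference=False,
    )
    print(f"{len(still_to_copy)} file(s) are not yet identical")
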
def multithread_copy_of_files_with_validation( self, files_to_copy: list[dict], workers: int = 10, max_retries: int = 5, skip_check_if_already_copied: bool = False) -> None:
421    def multithread_copy_of_files_with_validation(
422            self,
423            files_to_copy: list[dict],
424            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
425            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
426            skip_check_if_already_copied: bool = False
427    ) -> None:
428        """
429        Copy multiple files in parallel with validation.
430
431        **Args:**
432        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
433                Dictionary should have keys `source_file` and `full_destination_path`
434        - workers (int): Number of worker threads. Defaults to `10`.
435        - max_retries (int): Maximum number of retries. Defaults to `5`
436        - skip_check_if_already_copied (bool, optional): Whether to skip checking
437                if files are already copied and start copying right away. Defaults to `False`.
438        """
439        if skip_check_if_already_copied:
440            logging.info("Skipping check if files are already copied")
441            updated_files_to_move = files_to_copy
442        else:
443            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
444                files_to_copy,
445                log_difference=False,
446                workers=workers,
447                max_retries=max_retries
448            )
449        # If all files are already copied, return
450        if not updated_files_to_move:
451            logging.info("All files are already copied")
452            return None
453        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
454        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
455        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
456        # Validate that all files were copied successfully
457        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
458            files_to_copy,
459            workers=workers,
460            log_difference=True,
461            max_retries=max_retries
462        )
463        if files_not_moved_successfully:
464            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
465            raise Exception("Failed to copy all files")
466        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
467        return None

Copy multiple files in parallel with validation.

Args:

  • files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. Dictionary should have keys source_file and full_destination_path
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5
  • skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to False.
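
An illustrative end-to-end copy with validation (paths and worker counts are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    gcp.multithread_copy_of_files_with_validation(
        files_to_copy=[
            {
                "source_file": "gs://source-bucket/data/sample.txt",
                "full_destination_path": "gs://dest-bucket/archive/sample.txt",
            },
        ],
        workers=8,
        max_retries=3,
    )
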
def move_or_copy_multiple_files( self, files_to_move: list[dict], action: str, workers: int = 10, max_retries: int = 5, verbose: bool = False, jobs_complete_for_logging: int = 500) -> None:
469    def move_or_copy_multiple_files(
470            self, files_to_move: list[dict],
471            action: str,
472            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
473            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
474            verbose: bool = False,
475            jobs_complete_for_logging: int = 500
476    ) -> None:
477        """
478        Move or copy multiple files in parallel.
479
480        **Args:**
481        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
482        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
483        or `ops_utils.gcp_utils.COPY`).
484        - workers (int): Number of worker threads. Defaults to `10`.
485        - max_retries (int): Maximum number of retries. Defaults to `5`.
486        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
487        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
488
489        **Raises:**
490        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
491        """
492        if action == MOVE:
493            cloud_function = self.move_cloud_file
494        elif action == COPY:
495            cloud_function = self.copy_cloud_file
496        else:
497            raise ValueError("Must either select move or copy")
498
499        list_of_jobs_args_list = [
500            [
501                file_dict['source_file'], file_dict['full_destination_path']
502            ]
503            for file_dict in files_to_move
504        ]
505        MultiThreadedJobs().run_multi_threaded_job(
506            workers=workers,
507            function=cloud_function,
508            list_of_jobs_args_list=list_of_jobs_args_list,
509            max_retries=max_retries,
510            fail_on_error=True,
511            verbose=verbose,
512            collect_output=False,
513            jobs_complete_for_logging=jobs_complete_for_logging
514        )

Move or copy multiple files in parallel.

Args:

  • files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
  • action (str): The action to perform (should be one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY).
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Raises:

  • ValueError: If the action is not one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY.
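
A sketch using the module-level MOVE constant with placeholder paths:

    from ops_utils.gcp_utils import MOVE, GCPCloudFunctions

    gcp = GCPCloudFunctions()
    gcp.move_or_copy_multiple_files(
        files_to_move=[
            {
                "source_file": "gs://source-bucket/data/sample.txt",
                "full_destination_path": "gs://dest-bucket/archive/sample.txt",
            },
        ],
        action=MOVE,  # or COPY; any other value raises ValueError
    )
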
def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
516    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
517        """
518        Read the content of a file from GCS.
519
520        **Args:**
521        - cloud_path (str): The GCS path of the file to read.
522        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
523
524        **Returns:**
525        - str: The content of the file as a string, decoded using the specified encoding.
526        """
527        blob = self.load_blob_from_full_path(cloud_path)
528        # Download the file content as bytes
529        content_bytes = blob.download_as_bytes()
530        # Convert bytes to string
531        content_str = content_bytes.decode(encoding)
532        return content_str

Read the content of a file from GCS.

Args:

  • cloud_path (str): The GCS path of the file to read.
  • encoding (str, optional): The encoding to use. Defaults to utf-8.

Returns:

  • str: The content of the file as a string, decoded using the specified encoding.
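
For example (hypothetical paths), the decoded text is returned directly, and a non-default encoding can be passed when needed:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    text = gcp.read_file("gs://example-bucket/config/settings.json")
    print(text)

    # Non-UTF-8 files can be decoded with an explicit encoding
    legacy_text = gcp.read_file("gs://example-bucket/legacy/report.txt", encoding="latin-1")
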
def upload_blob( self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
534    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
535        """
536        Upload a file to GCS.
537
538        **Args:**
539        - destination_path (str): The destination GCS path.
540        - source_file (str): The source file path.
541        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
542        """
543        blob = self.load_blob_from_full_path(destination_path)
544        if custom_metadata:
545            blob.metadata = custom_metadata
546        blob.upload_from_filename(source_file)

Upload a file to GCS.

Args:

  • destination_path (str): The destination GCS path.
  • source_file (str): The source file path.
  • custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
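
A sketch with a placeholder local file and destination path, attaching optional custom metadata:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    gcp.upload_blob(
        destination_path="gs://example-bucket/uploads/results.csv",
        source_file="/tmp/results.csv",                 # local file path
        custom_metadata={"pipeline": "example-run-1"},  # optional blob metadata
    )
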
def get_object_md5( self, file_path: str, chunk_size: int = 256000, logging_bytes: int = 1000000000, returned_md5_format: str = 'hex') -> str:
548    def get_object_md5(
549        self,
550        file_path: str,
551        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
552        chunk_size: int = parse_size("256 KB"),
553        logging_bytes: int = parse_size("1 GB"),
554        returned_md5_format: str = "hex"
555    ) -> str:
556        """
557        Calculate the MD5 checksum of a file in GCS.
558
559        **Args:**
560        - file_path (str): The GCS path of the file.
561        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
562        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
563        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
564                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
565
566        **Returns:**
567        - str: The MD5 checksum of the file.
568
569        **Raises:**
570        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
571        or `ops_utils.gcp_utils.MD5_BASE64`
572        """
573        if returned_md5_format not in ["hex", "base64"]:
574            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
575
576        blob = self.load_blob_from_full_path(file_path)
577
578        # Create an MD5 hash object
579        md5_hash = hashlib.md5()
580
581        blob_size_str = format_size(blob.size)
582        logging.info(f"Streaming {file_path} which is {blob_size_str}")
583        # Collect the streamed chunks in an in-memory buffer while hashing
584        buffer = io.BytesIO()
585        total_bytes_streamed = 0
586        # Keep track of the last logged size for data logging
587        last_logged = 0
588
589        with blob.open("rb") as source_stream:
590            while True:
591                chunk = source_stream.read(chunk_size)
592                if not chunk:
593                    break
594                md5_hash.update(chunk)
595                buffer.write(chunk)
596                total_bytes_streamed += len(chunk)
597                # Log progress every logging_bytes bytes streamed
598                if total_bytes_streamed - last_logged >= logging_bytes:
599                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
600                    last_logged = total_bytes_streamed
601
602        if returned_md5_format == "hex":
603            md5 = md5_hash.hexdigest()
604            logging.info(f"MD5 (hex) for {file_path}: {md5}")
605        elif returned_md5_format == "base64":
606            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
607            logging.info(f"MD5 (base64) for {file_path}: {md5}")
608        return md5

Calculate the MD5 checksum of a file in GCS.

Args:

  • file_path (str): The GCS path of the file.
  • chunk_size (int, optional): The size of each chunk to read. Defaults to 256 KB.
  • logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to 1 GB.
  • returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to hex. Options are ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.

Returns:

  • str: The MD5 checksum of the file.

Raises:

  • ValueError: If the returned_md5_format is not one of ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.
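
An illustrative call with a placeholder path, requesting both supported digest formats:

    from ops_utils.gcp_utils import MD5_BASE64, GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Hex digest (the default)
    hex_md5 = gcp.get_object_md5("gs://example-bucket/data/large.bam")

    # Base64 digest, matching the encoding GCS uses for a blob's md5_hash property
    b64_md5 = gcp.get_object_md5(
        "gs://example-bucket/data/large.bam",
        returned_md5_format=MD5_BASE64,
    )
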
def set_acl_public_read(self, cloud_path: str) -> None:
610    def set_acl_public_read(self, cloud_path: str) -> None:
611        """
612        Set the file in the bucket to be publicly readable.
613
614        **Args:**
615        - cloud_path (str): The GCS path of the file to make publicly readable.
616        """
617        blob = self.load_blob_from_full_path(cloud_path)
618        blob.acl.all().grant_read()
619        blob.acl.save()

Set the file in the bucket to be publicly readable.

Args:

  • cloud_path (str): The GCS path of the file to make publicly readable.
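
A minimal sketch with a placeholder path; note that per-object ACLs only work on buckets that do not use uniform bucket-level access:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Make a single object world-readable
    gcp.set_acl_public_read("gs://example-bucket/public/report.html")
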
def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
621    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
622        """
623        Set the file in the bucket to grant OWNER permission to a specific group.
624
625        **Args:**
626        - cloud_path (str): The GCS path of the file.
627        - group_email (str): The email of the group to grant OWNER permission
628        """
629        blob = self.load_blob_from_full_path(cloud_path)
630        blob.acl.group(group_email).grant_owner()
631        blob.acl.save()

Set the file in the bucket to grant OWNER permission to a specific group.

Args:

  • cloud_path (str): The GCS path of the file.
  • group_email (str): The email of the group to grant OWNER permission
def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
633    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
634        """
635        Set Cache-Control metadata for a file.
636
637        **Args:**
638        - cloud_path (str): The GCS path of the file.
639        - cache_control (str): The Cache-Control metadata to set.
640        """
641        blob = self.load_blob_from_full_path(cloud_path)
642        blob.cache_control = cache_control
643        blob.patch()

Set Cache-Control metadata for a file.

Args:

  • cloud_path (str): The GCS path of the file.
  • cache_control (str): The Cache-Control metadata to set.
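
A short sketch with a placeholder path and an illustrative Cache-Control value:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Ask downstream caches and browsers not to cache this object
    gcp.set_metadata_cache_control(
        cloud_path="gs://example-bucket/web/index.html",
        cache_control="no-cache, max-age=0",
    )
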
def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[google.cloud.storage.blob.Blob, str]]:
645    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
646        """
647        Get the most recent blob in the bucket.
648
649        If the blob with the most recent timestamp doesn't have any logging besides the basic
650        "storage_byte_hours" logging, this method will continue to look at the next most recent file
651        until a log with useful information is encountered. This is useful when combing through GCP
652        activity logs for Terra workspace buckets.
653
654        **Args:**
655        - bucket_name (str): The GCS bucket name.
656
657        **Returns:**
658        - Optional[tuple[Blob, str]]: The blob found and its file contents, or None if no valid file is found.
659        """
660        blobs = sorted(
661            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
662        )
663        for blob in blobs:
664            # Download the file contents as a string
665            file_contents = blob.download_as_text()
666
667            # Check if the content matches the undesired format
668            lines = file_contents.splitlines()
669            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
670                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
671                continue
672
673            # If it doesn't match the undesired format, return its content
674            logging.info(f"Found valid file: {blob.name}")
675            return blob, file_contents
676
677        logging.info("No valid files found.")
678        return None

Get the most recent blob in the bucket.

If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, this method will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.

Args:

  • bucket_name (str): The GCS bucket name.

Returns:

  • Optional[tuple[Blob, str]]: The blob found and its file contents, or None if no valid file is found.
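
A sketch with a placeholder bucket name, unpacking the returned blob and its contents when a useful log is found:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    result = gcp.get_file_contents_of_most_recent_blob_in_bucket("example-logging-bucket")
    if result:
        blob, contents = result
        print(f"Most recent useful log: {blob.name}")
        print(contents[:500])
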
def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
680    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
681        """
682        Write content to a file in GCS.
683
684        **Args:**
685        - cloud_path (str): The GCS path of the file to write.
686        - file_contents (str): The content to write.
687        """
688        blob = self.load_blob_from_full_path(cloud_path)
689        blob.upload_from_string(file_contents)
690        logging.info(f"Successfully wrote content to {cloud_path}")

Write content to a file in GCS.

Args:

  • cloud_path (str): The GCS path of the file to write.
  • file_contents (str): The content to write.
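
A minimal sketch with a placeholder path and contents:

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    gcp.write_to_gcp_file(
        cloud_path="gs://example-bucket/manifests/run-manifest.json",
        file_contents='{"status": "complete"}',
    )
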
@staticmethod
def get_active_gcloud_account() -> str:
692    @staticmethod
693    def get_active_gcloud_account() -> str:
694        """
695        Get the active GCP email for the current account.
696
697        **Returns:**
698        - str: The active GCP account email.
699        """
700        result = subprocess.run(
701            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
702            capture_output=True,
703            text=True,
704            check=True
705        )
706        return result.stdout.strip()

Get the active GCP email for the current account.

Returns:

  • str: The active GCP account email.
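
A short sketch; this shells out to the gcloud CLI, which must be installed and authenticated:

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Static method, so no Storage Client is created
    account = GCPCloudFunctions.get_active_gcloud_account()
    print(f"Active gcloud account: {account}")
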