ops_utils.gcp_utils

Module for GCP utilities.
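
A minimal usage sketch of the class documented below; the project ID, bucket names, and object paths are placeholders, and credentials are assumed to come from Application Default Credentials or a service account key.

from ops_utils.gcp_utils import GCPCloudFunctions

# Placeholder project and paths -- substitute real values.
gcp = GCPCloudFunctions(project="my-gcp-project")

# List files in a bucket, skipping log files.
files = gcp.list_bucket_contents(
    bucket_name="my-source-bucket",
    file_extensions_to_ignore=[".log"],
)

# Copy a single object and confirm the copy matches the source.
gcp.copy_cloud_file(
    src_cloud_path="gs://my-source-bucket/data/sample.txt",
    full_destination_path="gs://my-destination-bucket/data/sample.txt",
)
assert gcp.validate_files_are_same(
    "gs://my-source-bucket/data/sample.txt",
    "gs://my-destination-bucket/data/sample.txt",
)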

  1"""Module for GCP utilities."""
  2import os
  3import logging
  4import io
  5import json
  6import hashlib
  7import base64
  8import subprocess
  9
 10from humanfriendly import format_size, parse_size
 11from mimetypes import guess_type
 12from typing import Optional, Any
 13from google.cloud.storage.blob import Blob
 14from google.oauth2 import service_account
 15from google.cloud import storage
 16from google.auth import default
 17
 18from .vars import ARG_DEFAULTS
 19from .thread_pool_executor_util import MultiThreadedJobs
 20
 21MOVE = "move"
 22"""Variable to be used for the "action" when files should be moved."""
 23COPY = "copy"
 24"""Variable to be used for the "action" when files should be copied."""
 25MD5_HEX = "hex"
 26"""Variable to be used when the generated `md5` should be of the `hex` type."""
 27MD5_BASE64 = "base64"
 28"""Variable to be used when the generated `md5` should be of the `base64` type."""
 29
 30
 31class GCPCloudFunctions:
 32    """Class to handle GCP Cloud Storage (GCS) file operations."""
 33
 34    def __init__(
 35            self,
 36            project: Optional[str] = None,
 37            service_account_json: Optional[str] = None
 38    ) -> None:
 39        """
 40        Initialize the GCPCloudFunctions class.
 41
 42        Authenticates using service account JSON if provided or default credentials,
 43        and sets up the Storage Client.
 44
 45        Args:
 46            project (str, optional): The GCP project ID. If not provided, the project is taken
 47                from the service account key or from the default credentials.
 48            service_account_json (str, optional): Path to a service account JSON key file. If
 49                provided, these credentials are used instead of the default credentials.
 50        """
 51        # Initialize credentials and project
 52        credentials = None
 53        default_project = None
 54
 55        if service_account_json:
 56            credentials = service_account.Credentials.from_service_account_file(service_account_json)
 57            # Extract project from service account if not specified
 58            if not project:
 59                with open(service_account_json, 'r') as f:
 60                    sa_info = json.load(f)
 61                    project = sa_info.get('project_id')
 62        else:
 63            # Use default credentials
 64            credentials, default_project = default()
 65
 66        # Set project if not already set
 67        if not project:
 68            project = default_project
 69
 70        self.client = storage.Client(credentials=credentials, project=project)
 71        """@private"""
 72
 73    @staticmethod
 74    def _process_cloud_path(cloud_path: str) -> dict:
 75        """
 76        Process a GCS cloud path into its components.
 77
 78        Args:
 79            cloud_path (str): The GCS cloud path.
 80
 81        Returns:
 82            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
 83        """
 84        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
 85        bucket_name = str.split(remaining_url, sep="/")[0]
 86        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
 87        path_components = {
 88            "platform_prefix": platform_prefix,
 89            "bucket": bucket_name,
 90            "blob_url": blob_name
 91        }
 92        return path_components
 93
 94    def load_blob_from_full_path(self, full_path: str) -> Blob:
 95        """
 96        Load a GCS blob object from a full GCS path.
 97
 98        **Args:**
 99        - full_path (str): The full GCS path.
100
101        **Returns:**
102        - google.cloud.storage.blob.Blob: The GCS blob object.
103        """
104        file_path_components = self._process_cloud_path(full_path)
105
106        # Specify the billing project
107        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
108        blob = bucket.blob(file_path_components["blob_url"])
109
110        # If blob exists in GCS reload it so metadata is there
111        if blob.exists():
112            blob.reload()
113        return blob
114
115    def check_file_exists(self, full_path: str) -> bool:
116        """
117        Check if a file exists in GCS.
118
119        **Args:**
120        - full_path (str): The full GCS path.
121
122        **Returns:**
123        - bool: `True` if the file exists, `False` otherwise.
124        """
125        blob = self.load_blob_from_full_path(full_path)
126        return blob.exists()
127
128    @staticmethod
129    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
130        """
131        Create a dictionary containing file information.
132
133        Args:
134            bucket_name (str): The name of the GCS bucket.
135            blob (Any): The GCS blob object.
136            file_name_only (bool): Whether to return only the file path for each blob.
137
138        Returns:
139            dict: A dictionary containing file information.
140        """
141        if file_name_only:
142            return {
143                "path": f"gs://{bucket_name}/{blob.name}"
144            }
145        return {
146            "name": os.path.basename(blob.name),
147            "path": f"gs://{bucket_name}/{blob.name}",
148            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
149            "file_extension": os.path.splitext(blob.name)[1],
150            "size_in_bytes": blob.size,
151            "md5_hash": blob.md5_hash
152        }
153
154    @staticmethod
155    def _validate_include_blob(
156            blob: Any,
157            bucket_name: str,
158            file_extensions_to_ignore: list[str] = [],
159            file_strings_to_ignore: list[str] = [],
160            file_extensions_to_include: list[str] = [],
161            verbose: bool = False
162    ) -> bool:
163        """
164        Validate whether a blob should be included based on file extension and substring filters.
165
166        Args:
167            blob (Any): The GCS blob object.
168            bucket_name (str): The name of the GCS bucket containing the blob.
169            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
170            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
171            file_extensions_to_include (list[str]): List of file extensions to include.
172            verbose (bool): Whether to log files that are not included.
173        Returns:
174            bool: True if the blob should be included, False otherwise.
175        """
176        file_path = f"gs://{bucket_name}/{blob.name}"
177        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
178            if verbose:
179                logging.info(f"Skipping {file_path} as it has an extension to ignore")
180            return False
181        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
182            if verbose:
183                logging.info(f"Skipping {file_path} as it does not have an extension to include")
184            return False
185        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
186            if verbose:
187                logging.info(f"Skipping {file_path} as it has a string to ignore")
188            return False
189        return True
190
191    def list_bucket_contents(
192            self,
193            bucket_name: str,
194            prefix: Optional[str] = None,
195            file_extensions_to_ignore: list[str] = [],
196            file_strings_to_ignore: list[str] = [],
197            file_extensions_to_include: list[str] = [],
198            file_name_only: bool = False
199    ) -> list[dict]:
200        """
201        List contents of a GCS bucket and return a list of dictionaries with file information.
202
203        **Args:**
204        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
205        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
206        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
207        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
208        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
209        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
210
211        **Returns:**
212        - list[dict]: A list of dictionaries containing file information.
213        """
214        # If the bucket name starts with gs://, remove it
215        if bucket_name.startswith("gs://"):
216            bucket_name = bucket_name.split("/")[2].strip()
217
218        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
219
220        # Get the bucket object and set user_project for Requester Pays
221        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
222
223        # List blobs within the bucket
224        blobs = bucket.list_blobs(prefix=prefix)
225        logging.info("Finished listing blobs. Processing files now.")
226
227        # Create a list of dictionaries containing file information
228        file_list = [
229            self._create_bucket_contents_dict(
230                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
231            )
232            for blob in blobs
233            if self._validate_include_blob(
234                blob=blob,
235                file_extensions_to_ignore=file_extensions_to_ignore,
236                file_strings_to_ignore=file_strings_to_ignore,
237                file_extensions_to_include=file_extensions_to_include,
238                bucket_name=bucket_name
239            ) and not blob.name.endswith("/")
240        ]
241        logging.info(f"Found {len(file_list)} files in bucket")
242        return file_list
243
244    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
245        """
246        Copy a file from one GCS location to another.
247
248        **Args:**
249        - src_cloud_path (str): The source GCS path.
250        - full_destination_path (str): The destination GCS path.
251        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
252        """
253        try:
254            src_blob = self.load_blob_from_full_path(src_cloud_path)
255            dest_blob = self.load_blob_from_full_path(full_destination_path)
256
257            # Use the rewrite API (resumable via tokens) instead of copy to avoid timeouts on large objects
258            rewrite_token = None
259
260            while True:
261                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
262                    src_blob, token=rewrite_token
263                )
264                if verbose:
265                    logging.info(
266                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
267                    )
268                if not rewrite_token:
269                    break
270
271        except Exception as e:
272            logging.error(
273                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
274                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
275            )
276            raise
277
278    def delete_cloud_file(self, full_cloud_path: str) -> None:
279        """
280        Delete a file from GCS.
281
282        **Args:**
283        - full_cloud_path (str): The GCS path of the file to delete.
284        """
285        blob = self.load_blob_from_full_path(full_cloud_path)
286        blob.delete()
287
288    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
289        """
290        Move a file from one GCS location to another.
291
292        **Args:**
293        - src_cloud_path (str): The source GCS path.
294        - full_destination_path (str): The destination GCS path.
295        """
296        self.copy_cloud_file(src_cloud_path, full_destination_path)
297        self.delete_cloud_file(src_cloud_path)
298
299    def get_filesize(self, target_path: str) -> int:
300        """
301        Get the size of a file in GCS.
302
303        **Args:**
304        - target_path (str): The GCS path of the file.
305
306        **Returns:**
307        - int: The size of the file in bytes.
308        """
309        blob = self.load_blob_from_full_path(target_path)
310        return blob.size
311
312    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
313        """
314        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
315
316        **Args:**
317        - src_cloud_path (str): The source GCS path.
318        - dest_cloud_path (str): The destination GCS path.
319
320        **Returns:**
321        - bool: `True` if the files are identical, `False` otherwise.
322        """
323        src_blob = self.load_blob_from_full_path(src_cloud_path)
324        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
325
326        # If either blob is None or does not exist
327        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
328            return False
329        # If the MD5 hashes exist
330        if src_blob.md5_hash and dest_blob.md5_hash:
331            # And are the same return True
332            if src_blob.md5_hash == dest_blob.md5_hash:
333                return True
334        else:
335            # If md5 do not exist (for larger files they may not) check size matches
336            if src_blob.size == dest_blob.size:
337                return True
338        # Otherwise, return False
339        return False
340
341    def delete_multiple_files(
342            self,
343            files_to_delete: list[str],
344            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
345            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
346            verbose: bool = False,
347            job_complete_for_logging: int = 500
348    ) -> None:
349        """
350        Delete multiple cloud files in parallel using multi-threading.
351
352        **Args:**
353        - files_to_delete (list[str]): List of GCS paths of the files to delete.
354        - workers (int, optional): Number of worker threads. Defaults to `10`.
355        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
356        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
357        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
358        """
359        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
360
361        MultiThreadedJobs().run_multi_threaded_job(
362            workers=workers,
363            function=self.delete_cloud_file,
364            list_of_jobs_args_list=list_of_jobs_args_list,
365            max_retries=max_retries,
366            fail_on_error=True,
367            verbose=verbose,
368            collect_output=False,
369            jobs_complete_for_logging=job_complete_for_logging
370        )
371
372    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
373        """
374        Validate if source and destination files are identical.
375
376        **Args:**
377        - source_file (str): The source file path.
378        - full_destination_path (str): The destination file path.
379
380        **Returns:**
381        - dict: The file dictionary for the pair, with an `identical` boolean indicating whether the files match.
382        """
383        if self.validate_files_are_same(source_file, full_destination_path):
384            identical = True
385        else:
386            identical = False
387        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
388
389    def loop_and_log_validation_files_multithreaded(
390            self,
391            files_to_validate: list[dict],
392            log_difference: bool,
393            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
394            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
395            job_complete_for_logging: int = 500
396    ) -> list[dict]:
397        """
398        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
399
400        **Args:**
401        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
402        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
403                                   this at the start of a copy/move operation to check if files are already copied.
404        - workers (int, optional): Number of worker threads. Defaults to `10`.
405        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
406        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
407
408        **Returns:**
409        - list[dict]: List of dictionaries containing files that are **not** identical.
410        """
411        logging.info(f"Validating if {len(files_to_validate)} files are identical")
412
413        # Prepare jobs: pass the necessary arguments to each validation
414        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
415
416        # Use multithreaded job runner to validate the files
417        checked_files = MultiThreadedJobs().run_multi_threaded_job(
418            workers=workers,
419            function=self._validate_file_pair,
420            list_of_jobs_args_list=jobs,
421            collect_output=True,
422            max_retries=max_retries,
423            jobs_complete_for_logging=job_complete_for_logging
424        )
425        # If any files failed to load, raise an exception
426        if files_to_validate and None in checked_files:  # type: ignore[operator]
427            logging.error("Failed to validate all files, could not load some blobs")
428            raise Exception("Failed to validate all files")
429
430        # Get all files that are not identical
431        not_identical_files = [
432            file_dict
433            for file_dict in checked_files  # type: ignore[operator, union-attr]
434            if not file_dict['identical']
435        ]
436        if not_identical_files:
437            if log_difference:
438                for file_dict in not_identical_files:
439                    logging.warning(
440                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
441                    )
442            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
443        return not_identical_files
444
445    def multithread_copy_of_files_with_validation(
446            self,
447            files_to_copy: list[dict],
448            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
449            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
450            skip_check_if_already_copied: bool = False
451    ) -> None:
452        """
453        Copy multiple files in parallel with validation.
454
455        **Args:**
456        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
457                Each dictionary should have the keys `source_file` and `full_destination_path`.
458        - workers (int, optional): Number of worker threads. Defaults to `10`.
459        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
460        - skip_check_if_already_copied (bool, optional): Whether to skip checking
461                if files are already copied and start copying right away. Defaults to `False`.
462        """
463        if skip_check_if_already_copied:
464            logging.info("Skipping check if files are already copied")
465            updated_files_to_move = files_to_copy
466        else:
467            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
468                files_to_copy,
469                log_difference=False,
470                workers=workers,
471                max_retries=max_retries
472            )
473        # If all files are already copied, return
474        if not updated_files_to_move:
475            logging.info("All files are already copied")
476            return None
477        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
478        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
479        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
480        # Validate that all files were copied successfully
481        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
482            files_to_copy,
483            workers=workers,
484            log_difference=True,
485            max_retries=max_retries
486        )
487        if files_not_moved_successfully:
488            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
489            raise Exception("Failed to copy all files")
490        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
491        return None
492
493    def move_or_copy_multiple_files(
494            self, files_to_move: list[dict],
495            action: str,
496            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
497            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
498            verbose: bool = False,
499            jobs_complete_for_logging: int = 500
500    ) -> None:
501        """
502        Move or copy multiple files in parallel.
503
504        **Args:**
505        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
506        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
507        or `ops_utils.gcp_utils.COPY`).
508        - workers (int): Number of worker threads. Defaults to `10`.
509        - max_retries (int): Maximum number of retries. Defaults to `5`.
510        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
511        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
512
513        **Raises:**
514        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
515        """
516        if action == MOVE:
517            cloud_function = self.move_cloud_file
518        elif action == COPY:
519            cloud_function = self.copy_cloud_file
520        else:
521            raise ValueError("Must either select move or copy")
522
523        list_of_jobs_args_list = [
524            [
525                file_dict['source_file'], file_dict['full_destination_path']
526            ]
527            for file_dict in files_to_move
528        ]
529        MultiThreadedJobs().run_multi_threaded_job(
530            workers=workers,
531            function=cloud_function,
532            list_of_jobs_args_list=list_of_jobs_args_list,
533            max_retries=max_retries,
534            fail_on_error=True,
535            verbose=verbose,
536            collect_output=False,
537            jobs_complete_for_logging=jobs_complete_for_logging
538        )
539
540    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
541        """
542        Read the content of a file from GCS.
543
544        **Args:**
545        - cloud_path (str): The GCS path of the file to read.
546        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
547
548        **Returns:**
549        - str: The content of the file decoded as a string using the given `encoding`.
550        """
551        blob = self.load_blob_from_full_path(cloud_path)
552        # Download the file content as bytes
553        content_bytes = blob.download_as_bytes()
554        # Convert bytes to string
555        content_str = content_bytes.decode(encoding)
556        return content_str
557
558    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
559        """
560        Upload a file to GCS.
561
562        **Args:**
563        - destination_path (str): The destination GCS path.
564        - source_file (str): The source file path.
565        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
566        """
567        blob = self.load_blob_from_full_path(destination_path)
568        if custom_metadata:
569            blob.metadata = custom_metadata
570        blob.upload_from_filename(source_file)
571
572    def get_object_md5(
573        self,
574        file_path: str,
575        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
576        chunk_size: int = parse_size("256 KB"),
577        logging_bytes: int = parse_size("1 GB"),
578        returned_md5_format: str = "hex"
579    ) -> str:
580        """
581        Calculate the MD5 checksum of a file in GCS.
582
583        **Args:**
584        - file_path (str): The GCS path of the file.
585        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
586        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
587        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
588                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
589
590        **Returns:**
591        - str: The MD5 checksum of the file.
592
593        **Raises:**
594        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
595        or `ops_utils.gcp_utils.MD5_BASE64`
596        """
597        if returned_md5_format not in ["hex", "base64"]:
598            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
599
600        blob = self.load_blob_from_full_path(file_path)
601
602        # Create an MD5 hash object
603        md5_hash = hashlib.md5()
604
605        blob_size_str = format_size(blob.size)
606        logging.info(f"Streaming {file_path} which is {blob_size_str}")
607        # Use a BytesIO stream to collect data in chunks and upload it
608        buffer = io.BytesIO()
609        total_bytes_streamed = 0
610        # Keep track of the last logged size for data logging
611        last_logged = 0
612
613        with blob.open("rb") as source_stream:
614            while True:
615                chunk = source_stream.read(chunk_size)
616                if not chunk:
617                    break
618                md5_hash.update(chunk)
619                buffer.write(chunk)
620                total_bytes_streamed += len(chunk)
621                # Log progress every 1 gb if verbose used
622                if total_bytes_streamed - last_logged >= logging_bytes:
623                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
624                    last_logged = total_bytes_streamed
625
626        if returned_md5_format == "hex":
627            md5 = md5_hash.hexdigest()
628            logging.info(f"MD5 (hex) for {file_path}: {md5}")
629        elif returned_md5_format == "base64":
630            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
631            logging.info(f"MD5 (base64) for {file_path}: {md5}")
632        return md5
633
634    def set_acl_public_read(self, cloud_path: str) -> None:
635        """
636        Set the file in the bucket to be publicly readable.
637
638        **Args:**
639        - cloud_path (str): The GCS path of the file to be set as public readable.
640        """
641        blob = self.load_blob_from_full_path(cloud_path)
642        blob.acl.all().grant_read()
643        blob.acl.save()
644
645    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
646        """
647        Set the file in the bucket to grant OWNER permission to a specific group.
648
649        **Args:**
650        - cloud_path (str): The GCS path of the file.
651        - group_email (str): The email of the group to grant OWNER permission to.
652        """
653        blob = self.load_blob_from_full_path(cloud_path)
654        blob.acl.group(group_email).grant_owner()
655        blob.acl.save()
656
657    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
658        """
659        Set Cache-Control metadata for a file.
660
661        **Args:**
662        - cloud_path (str): The GCS path of the file.
663        - cache_control (str): The Cache-Control metadata to set.
664        """
665        blob = self.load_blob_from_full_path(cloud_path)
666        blob.cache_control = cache_control
667        blob.patch()
668
669    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
670        """
671        Get the most recent blob in the bucket.
672
673        If the blob with the most recent timestamp contains only the basic "storage_byte_hours"
674        logging, this method moves on to the next most recent blob until it finds a log with
675        useful information. This is useful when combing through GCP activity logs for Terra
676        workspace buckets.
677
678        **Args:**
679        - bucket_name (str): The GCS bucket name.
680
681        **Returns:**
682        - Optional[tuple[Blob, str]]: The blob that was found and its file contents, or `None` if no valid log file exists.
683        """
684        blobs = sorted(
685            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
686        )
687        for blob in blobs:
688            # Download the file contents as a string
689            file_contents = blob.download_as_text()
690
691            # Check if the content matches the undesired format
692            lines = file_contents.splitlines()
693            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
694                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
695                continue
696
697            # If it doesn't match the undesired format, return its content
698            logging.info(f"Found valid file: {blob.name}")
699            return blob, file_contents
700
701        logging.info("No valid files found.")
702        return None
703
704    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
705        """
706        Write content to a file in GCS.
707
708        **Args:**
709        - cloud_path (str): The GCS path of the file to write.
710        - file_contents (str): The content to write.
711        """
712        blob = self.load_blob_from_full_path(cloud_path)
713        blob.upload_from_string(file_contents)
714        logging.info(f"Successfully wrote content to {cloud_path}")
715
716    @staticmethod
717    def get_active_gcloud_account() -> str:
718        """
719        Get the active GCP email for the current account.
720
721        **Returns:**
722        - str: The active GCP account email.
723        """
724        result = subprocess.run(
725            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
726            capture_output=True,
727            text=True,
728            check=True
729        )
730        return result.stdout.strip()
731
732    
MOVE = 'move'

Variable to be used for the "action" when files should be moved.

COPY = 'copy'

Variable to be used for the "action" when files should be copied.

MD5_HEX = 'hex'

Variable to be used when the generated md5 should be of the hex type.

MD5_BASE64 = 'base64'

Variable to be used when the generated md5 should be of the base64 type.
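
A hedged sketch showing how these constants are passed to the GCPCloudFunctions methods documented in this module; the bucket and object paths are placeholders.

from ops_utils.gcp_utils import COPY, MD5_BASE64, GCPCloudFunctions

gcp = GCPCloudFunctions()

# Copy (rather than move) a batch of files; each dict needs the keys
# "source_file" and "full_destination_path".
gcp.move_or_copy_multiple_files(
    files_to_move=[
        {
            "source_file": "gs://my-source-bucket/a.txt",
            "full_destination_path": "gs://my-destination-bucket/a.txt",
        }
    ],
    action=COPY,
)

# Compute an md5 in the base64 form that GCS stores for objects.
md5 = gcp.get_object_md5(
    file_path="gs://my-destination-bucket/a.txt",
    returned_md5_format=MD5_BASE64,
)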

class GCPCloudFunctions:
 32class GCPCloudFunctions:
 33    """Class to handle GCP Cloud Functions."""
 34
 35    def __init__(
 36            self,
 37            project: Optional[str] = None,
 38            service_account_json: Optional[str] = None
 39    ) -> None:
 40        """
 41        Initialize the GCPCloudFunctions class.
 42
 43        Authenticates using service account JSON if provided or default credentials,
 44        and sets up the Storage Client.
 45
 46        Args:
 47            project: Optional[str] = None
 48                The GCP project ID. If not provided, will use project from service account or default.
 49            service_account_json: Optional[str] = None
 50                Path to service account JSON key file. If provided, will use these credentials.
 51        """
 52        # Initialize credentials and project
 53        credentials = None
 54        default_project = None
 55
 56        if service_account_json:
 57            credentials = service_account.Credentials.from_service_account_file(service_account_json)
 58            # Extract project from service account if not specified
 59            if not project:
 60                with open(service_account_json, 'r') as f:
 61                    sa_info = json.load(f)
 62                    project = sa_info.get('project_id')
 63        else:
 64            # Use default credentials
 65            credentials, default_project = default()
 66
 67        # Set project if not already set
 68        if not project:
 69            project = default_project
 70
 71        self.client = storage.Client(credentials=credentials, project=project)
 72        """@private"""
 73
 74    @staticmethod
 75    def _process_cloud_path(cloud_path: str) -> dict:
 76        """
 77        Process a GCS cloud path into its components.
 78
 79        Args:
 80            cloud_path (str): The GCS cloud path.
 81
 82        Returns:
 83            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
 84        """
 85        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
 86        bucket_name = str.split(remaining_url, sep="/")[0]
 87        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
 88        path_components = {
 89            "platform_prefix": platform_prefix,
 90            "bucket": bucket_name,
 91            "blob_url": blob_name
 92        }
 93        return path_components
 94
 95    def load_blob_from_full_path(self, full_path: str) -> Blob:
 96        """
 97        Load a GCS blob object from a full GCS path.
 98
 99        **Args:**
100        - full_path (str): The full GCS path.
101
102        **Returns:**
103        - google.cloud.storage.blob.Blob: The GCS blob object.
104        """
105        file_path_components = self._process_cloud_path(full_path)
106
107        # Specify the billing project
108        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
109        blob = bucket.blob(file_path_components["blob_url"])
110
111        # If blob exists in GCS reload it so metadata is there
112        if blob.exists():
113            blob.reload()
114        return blob
115
116    def check_file_exists(self, full_path: str) -> bool:
117        """
118        Check if a file exists in GCS.
119
120        **Args:**
121        - full_path (str): The full GCS path.
122
123        **Returns:**
124        - bool: `True` if the file exists, `False` otherwise.
125        """
126        blob = self.load_blob_from_full_path(full_path)
127        return blob.exists()
128
129    @staticmethod
130    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
131        """
132        Create a dictionary containing file information.
133
134        Args:
135            bucket_name (str): The name of the GCS bucket.
136            blob (Any): The GCS blob object.
137            file_name_only (bool): Whether to return only the file list.
138
139        Returns:
140            dict: A dictionary containing file information.
141        """
142        if file_name_only:
143            return {
144                "path": f"gs://{bucket_name}/{blob.name}"
145            }
146        return {
147            "name": os.path.basename(blob.name),
148            "path": f"gs://{bucket_name}/{blob.name}",
149            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
150            "file_extension": os.path.splitext(blob.name)[1],
151            "size_in_bytes": blob.size,
152            "md5_hash": blob.md5_hash
153        }
154
155    @staticmethod
156    def _validate_include_blob(
157            blob: Any,
158            bucket_name: str,
159            file_extensions_to_ignore: list[str] = [],
160            file_strings_to_ignore: list[str] = [],
161            file_extensions_to_include: list[str] = [],
162            verbose: bool = False
163    ) -> bool:
164        """
165        Validate if a blob should be included based on its file extension.
166
167        Args:
168            file_extensions_to_include (list[str]): List of file extensions to include.
169            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
170            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
171            blob (Any): The GCS blob object.
172            verbose (bool): Whether to log files not being included.
173
174        Returns:
175            bool: True if the blob should be included, False otherwise.
176        """
177        file_path = f"gs://{bucket_name}/{blob.name}"
178        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
179            if verbose:
180                logging.info(f"Skipping {file_path} as it has an extension to ignore")
181            return False
182        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
183            if verbose:
184                logging.info(f"Skipping {file_path} as it does not have an extension to include")
185            return False
186        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
187            if verbose:
188                logging.info(f"Skipping {file_path} as it has a string to ignore")
189            return False
190        return True
191
192    def list_bucket_contents(
193            self,
194            bucket_name: str,
195            prefix: Optional[str] = None,
196            file_extensions_to_ignore: list[str] = [],
197            file_strings_to_ignore: list[str] = [],
198            file_extensions_to_include: list[str] = [],
199            file_name_only: bool = False
200    ) -> list[dict]:
201        """
202        List contents of a GCS bucket and return a list of dictionaries with file information.
203
204        **Args:**
205        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
206        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
207        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
208        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
209        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
210        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
211
212        **Returns:**
213        - list[dict]: A list of dictionaries containing file information.
214        """
215        # If the bucket name starts with gs://, remove it
216        if bucket_name.startswith("gs://"):
217            bucket_name = bucket_name.split("/")[2].strip()
218
219        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
220
221        # Get the bucket object and set user_project for Requester Pays
222        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
223
224        # List blobs within the bucket
225        blobs = bucket.list_blobs(prefix=prefix)
226        logging.info("Finished listing blobs. Processing files now.")
227
228        # Create a list of dictionaries containing file information
229        file_list = [
230            self._create_bucket_contents_dict(
231                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
232            )
233            for blob in blobs
234            if self._validate_include_blob(
235                blob=blob,
236                file_extensions_to_ignore=file_extensions_to_ignore,
237                file_strings_to_ignore=file_strings_to_ignore,
238                file_extensions_to_include=file_extensions_to_include,
239                bucket_name=bucket_name
240            ) and not blob.name.endswith("/")
241        ]
242        logging.info(f"Found {len(file_list)} files in bucket")
243        return file_list
244
245    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
246        """
247        Copy a file from one GCS location to another.
248
249        **Args:**
250        - src_cloud_path (str): The source GCS path.
251        - full_destination_path (str): The destination GCS path.
252        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
253        """
254        try:
255            src_blob = self.load_blob_from_full_path(src_cloud_path)
256            dest_blob = self.load_blob_from_full_path(full_destination_path)
257
258            # Use rewrite so no timeouts
259            rewrite_token = False
260
261            while True:
262                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
263                    src_blob, token=rewrite_token
264                )
265                if verbose:
266                    logging.info(
267                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
268                    )
269                if not rewrite_token:
270                    break
271
272        except Exception as e:
273            logging.error(
274                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
275                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
276            )
277            raise
278
279    def delete_cloud_file(self, full_cloud_path: str) -> None:
280        """
281        Delete a file from GCS.
282
283        **Args:**
284        - full_cloud_path (str): The GCS path of the file to delete.
285        """
286        blob = self.load_blob_from_full_path(full_cloud_path)
287        blob.delete()
288
289    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
290        """
291        Move a file from one GCS location to another.
292
293        **Args:**
294        - src_cloud_path (str): The source GCS path.
295        - full_destination_path (str): The destination GCS path.
296        """
297        self.copy_cloud_file(src_cloud_path, full_destination_path)
298        self.delete_cloud_file(src_cloud_path)
299
300    def get_filesize(self, target_path: str) -> int:
301        """
302        Get the size of a file in GCS.
303
304        **Args:**
305        - target_path (str): The GCS path of the file.
306
307        **Returns:**
308        - int: The size of the file in bytes.
309        """
310        blob = self.load_blob_from_full_path(target_path)
311        return blob.size
312
313    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
314        """
315        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
316
317        **Args:**
318        - src_cloud_path (str): The source GCS path.
319        - dest_cloud_path (str): The destination GCS path.
320
321        **Returns:**
322        - bool: `True` if the files are identical, `False` otherwise.
323        """
324        src_blob = self.load_blob_from_full_path(src_cloud_path)
325        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
326
327        # If either blob is None or does not exist
328        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
329            return False
330        # If the MD5 hashes exist
331        if src_blob.md5_hash and dest_blob.md5_hash:
332            # And are the same return True
333            if src_blob.md5_hash == dest_blob.md5_hash:
334                return True
335        else:
336            # If md5 do not exist (for larger files they may not) check size matches
337            if src_blob.size == dest_blob.size:
338                return True
339        # Otherwise, return False
340        return False
341
342    def delete_multiple_files(
343            self,
344            files_to_delete: list[str],
345            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
346            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
347            verbose: bool = False,
348            job_complete_for_logging: int = 500
349    ) -> None:
350        """
351        Delete multiple cloud files in parallel using multi-threading.
352
353        **Args:**
354        - files_to_delete (list[str]): List of GCS paths of the files to delete.
355        - workers (int, optional): Number of worker threads. Defaults to `10`.
356        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
357        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
358        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
359        """
360        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
361
362        MultiThreadedJobs().run_multi_threaded_job(
363            workers=workers,
364            function=self.delete_cloud_file,
365            list_of_jobs_args_list=list_of_jobs_args_list,
366            max_retries=max_retries,
367            fail_on_error=True,
368            verbose=verbose,
369            collect_output=False,
370            jobs_complete_for_logging=job_complete_for_logging
371        )
372
373    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
374        """
375        Validate if source and destination files are identical.
376
377        **Args:**
378        - source_file (str): The source file path.
379        - full_destination_path (str): The destination file path.
380
381        **Returns:**
382            dict: The file dictionary of the files with a boolean indicating if they are identical.
383        """
384        if self.validate_files_are_same(source_file, full_destination_path):
385            identical = True
386        else:
387            identical = False
388        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
389
390    def loop_and_log_validation_files_multithreaded(
391            self,
392            files_to_validate: list[dict],
393            log_difference: bool,
394            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
395            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
396            job_complete_for_logging: int = 500
397    ) -> list[dict]:
398        """
399        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
400
401        **Args:**
402        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
403        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
404                                   this at the start of a copy/move operation to check if files are already copied.
405        - workers (int, optional): Number of worker threads. Defaults to `10`.
406        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
407        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
408
409        **Returns:**
410        - list[dict]: List of dictionaries containing files that are **not** identical.
411        """
412        logging.info(f"Validating if {len(files_to_validate)} files are identical")
413
414        # Prepare jobs: pass the necessary arguments to each validation
415        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
416
417        # Use multithreaded job runner to validate the files
418        checked_files = MultiThreadedJobs().run_multi_threaded_job(
419            workers=workers,
420            function=self._validate_file_pair,
421            list_of_jobs_args_list=jobs,
422            collect_output=True,
423            max_retries=max_retries,
424            jobs_complete_for_logging=job_complete_for_logging
425        )
426        # If any files failed to load, raise an exception
427        if files_to_validate and None in checked_files:  # type: ignore[operator]
428            logging.error("Failed to validate all files, could not load some blobs")
429            raise Exception("Failed to validate all files")
430
431        # Get all files that are not identical
432        not_identical_files = [
433            file_dict
434            for file_dict in checked_files  # type: ignore[operator, union-attr]
435            if not file_dict['identical']
436        ]
437        if not_identical_files:
438            if log_difference:
439                for file_dict in not_identical_files:
440                    logging.warning(
441                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
442                    )
443            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
444        return not_identical_files
445
446    def multithread_copy_of_files_with_validation(
447            self,
448            files_to_copy: list[dict],
449            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
450            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
451            skip_check_if_already_copied: bool = False
452    ) -> None:
453        """
454        Copy multiple files in parallel with validation.
455
456        **Args:**
457        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
458                Dictionary should have keys `source_file` and `full_destination_path`
459        - workers (int): Number of worker threads. Defaults to `10`.
460        - max_retries (int): Maximum number of retries. Defaults to `5`
461        - skip_check_if_already_copied (bool, optional): Whether to skip checking
462                if files are already copied and start copying right away. Defaults to `False`.
463        """
464        if skip_check_if_already_copied:
465            logging.info("Skipping check if files are already copied")
466            updated_files_to_move = files_to_copy
467        else:
468            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
469                files_to_copy,
470                log_difference=False,
471                workers=workers,
472                max_retries=max_retries
473            )
474        # If all files are already copied, return
475        if not updated_files_to_move:
476            logging.info("All files are already copied")
477            return None
478        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
479        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
480        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
481        # Validate that all files were copied successfully
482        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
483            files_to_copy,
484            workers=workers,
485            log_difference=True,
486            max_retries=max_retries
487        )
488        if files_not_moved_successfully:
489            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
490            raise Exception("Failed to copy all files")
491        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
492        return None
493
494    def move_or_copy_multiple_files(
495            self, files_to_move: list[dict],
496            action: str,
497            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
498            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
499            verbose: bool = False,
500            jobs_complete_for_logging: int = 500
501    ) -> None:
502        """
503        Move or copy multiple files in parallel.
504
505        **Args:**
506        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
507        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
508        or `ops_utils.gcp_utils.COPY`).
509        - workers (int): Number of worker threads. Defaults to `10`.
510        - max_retries (int): Maximum number of retries. Defaults to `5`.
511        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
512        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
513
514        **Raises:**
515        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
516        """
517        if action == MOVE:
518            cloud_function = self.move_cloud_file
519        elif action == COPY:
520            cloud_function = self.copy_cloud_file
521        else:
522            raise ValueError("Must either select move or copy")
523
524        list_of_jobs_args_list = [
525            [
526                file_dict['source_file'], file_dict['full_destination_path']
527            ]
528            for file_dict in files_to_move
529        ]
530        MultiThreadedJobs().run_multi_threaded_job(
531            workers=workers,
532            function=cloud_function,
533            list_of_jobs_args_list=list_of_jobs_args_list,
534            max_retries=max_retries,
535            fail_on_error=True,
536            verbose=verbose,
537            collect_output=False,
538            jobs_complete_for_logging=jobs_complete_for_logging
539        )
540
541    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
542        """
543        Read the content of a file from GCS.
544
545        **Args:**
546        - cloud_path (str): The GCS path of the file to read.
547        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
548
549        **Returns:**
550        - bytes: The content of the file as bytes.
551        """
552        blob = self.load_blob_from_full_path(cloud_path)
553        # Download the file content as bytes
554        content_bytes = blob.download_as_bytes()
555        # Convert bytes to string
556        content_str = content_bytes.decode(encoding)
557        return content_str
558
559    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
560        """
561        Upload a file to GCS.
562
563        **Args:**
564        - destination_path (str): The destination GCS path.
565        - source_file (str): The source file path.
566        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
567        """
568        blob = self.load_blob_from_full_path(destination_path)
569        if custom_metadata:
570            blob.metadata = custom_metadata
571        blob.upload_from_filename(source_file)
572
573    def get_object_md5(
574        self,
575        file_path: str,
576        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
577        chunk_size: int = parse_size("256 KB"),
578        logging_bytes: int = parse_size("1 GB"),
579        returned_md5_format: str = "hex"
580    ) -> str:
581        """
582        Calculate the MD5 checksum of a file in GCS.
583
584        **Args:**
585        - file_path (str): The GCS path of the file.
586        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
587        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
588        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
589                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
590
591        **Returns:**
592        - str: The MD5 checksum of the file.
593
594        **Raises:**
595        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
596        or `ops_utils.gcp_utils.MD5_BASE64`
597        """
598        if returned_md5_format not in ["hex", "base64"]:
599            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
600
601        blob = self.load_blob_from_full_path(file_path)
602
603        # Create an MD5 hash object
604        md5_hash = hashlib.md5()
605
606        blob_size_str = format_size(blob.size)
607        logging.info(f"Streaming {file_path} which is {blob_size_str}")
608        # Use a BytesIO buffer to collect the data as it is streamed in chunks
609        buffer = io.BytesIO()
610        total_bytes_streamed = 0
611        # Keep track of the last logged size for data logging
612        last_logged = 0
613
614        with blob.open("rb") as source_stream:
615            while True:
616                chunk = source_stream.read(chunk_size)
617                if not chunk:
618                    break
619                md5_hash.update(chunk)
620                buffer.write(chunk)
621                total_bytes_streamed += len(chunk)
622                # Log progress after every logging_bytes bytes streamed (1 GB by default)
623                if total_bytes_streamed - last_logged >= logging_bytes:
624                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
625                    last_logged = total_bytes_streamed
626
627        if returned_md5_format == "hex":
628            md5 = md5_hash.hexdigest()
629            logging.info(f"MD5 (hex) for {file_path}: {md5}")
630        elif returned_md5_format == "base64":
631            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
632            logging.info(f"MD5 (base64) for {file_path}: {md5}")
633        return md5
634
635    def set_acl_public_read(self, cloud_path: str) -> None:
636        """
637        Set the file in the bucket to be publicly readable.
638
639        **Args:**
640        - cloud_path (str): The GCS path of the file to be set as public readable.
641        """
642        blob = self.load_blob_from_full_path(cloud_path)
643        blob.acl.all().grant_read()
644        blob.acl.save()
645
646    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
647        """
648        Set the file in the bucket to grant OWNER permission to a specific group.
649
650        **Args:**
651        - cloud_path (str): The GCS path of the file.
652        - group_email (str): The email of the group to grant OWNER permission
653        """
654        blob = self.load_blob_from_full_path(cloud_path)
655        blob.acl.group(group_email).grant_owner()
656        blob.acl.save()
657
658    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
659        """
660        Set Cache-Control metadata for a file.
661
662        **Args:**
663        - cloud_path (str): The GCS path of the file.
664        - cache_control (str): The Cache-Control metadata to set.
665        """
666        blob = self.load_blob_from_full_path(cloud_path)
667        blob.cache_control = cache_control
668        blob.patch()
669
670    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
671        """
672        Get the most recent blob in the bucket.
673        
674        If the blob with the most recent timestamp doesn't have
675        any logging besides the basic "storage_byte_hours" logging, this method will continue to look at the next
676        most recent file until a log with useful information is encountered. This is useful when combing through
677        GCP activity logs for Terra workspace buckets.
678
679        **Args:**
680        - bucket_name (str): The GCS bucket name.
681
682        **Returns:**
683        - Optional tuple of the blob found and the file contents from the blob
684        """
685        blobs = sorted(
686            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
687        )
688        for blob in blobs:
689            # Download the file contents as a string
690            file_contents = blob.download_as_text()
691
692            # Check if the content matches the undesired format
693            lines = file_contents.splitlines()
694            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
695                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
696                continue
697
698            # If it doesn't match the undesired format, return its content
699            logging.info(f"Found valid file: {blob.name}")
700            return blob, file_contents
701
702        logging.info("No valid files found.")
703        return None
704
705    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
706        """
707        Write content to a file in GCS.
708
709        **Args:**
710        - cloud_path (str): The GCS path of the file to write.
711        - file_contents (str): The content to write.
712        """
713        blob = self.load_blob_from_full_path(cloud_path)
714        blob.upload_from_string(file_contents)
715        logging.info(f"Successfully wrote content to {cloud_path}")
716
717    @staticmethod
718    def get_active_gcloud_account() -> str:
719        """
720        Get the active GCP email for the current account.
721
722        **Returns:**
723        - str: The active GCP account email.
724        """
725        result = subprocess.run(
726            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
727            capture_output=True,
728            text=True,
729            check=True
730        )
731        return result.stdout.strip()

Class to handle GCP Cloud Functions.

GCPCloudFunctions( project: Optional[str] = None, service_account_json: Optional[str] = None)
35    def __init__(
36            self,
37            project: Optional[str] = None,
38            service_account_json: Optional[str] = None
39    ) -> None:
40        """
41        Initialize the GCPCloudFunctions class.
42
43        Authenticates using service account JSON if provided or default credentials,
44        and sets up the Storage Client.
45
46        Args:
47            project: Optional[str] = None
48                The GCP project ID. If not provided, will use project from service account or default.
49            service_account_json: Optional[str] = None
50                Path to service account JSON key file. If provided, will use these credentials.
51        """
52        # Initialize credentials and project
53        credentials = None
54        default_project = None
55
56        if service_account_json:
57            credentials = service_account.Credentials.from_service_account_file(service_account_json)
58            # Extract project from service account if not specified
59            if not project:
60                with open(service_account_json, 'r') as f:
61                    sa_info = json.load(f)
62                    project = sa_info.get('project_id')
63        else:
64            # Use default credentials
65            credentials, default_project = default()
66
67        # Set project if not already set
68        if not project:
69            project = default_project
70
71        self.client = storage.Client(credentials=credentials, project=project)
72        """@private"""

Initialize the GCPCloudFunctions class.

Authenticates using service account JSON if provided or default credentials, and sets up the Storage Client.

Args:

  • project (str, optional): The GCP project ID. If not provided, will use project from service account or default.
  • service_account_json (str, optional): Path to service account JSON key file. If provided, will use these credentials.
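
Example (a minimal sketch; the key file path below is a placeholder, and application default credentials are assumed to be configured for the first form):

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Use application default credentials and the default project
    gcp = GCPCloudFunctions()

    # Or authenticate explicitly with a service account key file (placeholder path)
    gcp = GCPCloudFunctions(service_account_json="/path/to/service-account-key.json")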

def load_blob_from_full_path(self, full_path: str) -> google.cloud.storage.blob.Blob:
 95    def load_blob_from_full_path(self, full_path: str) -> Blob:
 96        """
 97        Load a GCS blob object from a full GCS path.
 98
 99        **Args:**
100        - full_path (str): The full GCS path.
101
102        **Returns:**
103        - google.cloud.storage.blob.Blob: The GCS blob object.
104        """
105        file_path_components = self._process_cloud_path(full_path)
106
107        # Specify the billing project
108        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
109        blob = bucket.blob(file_path_components["blob_url"])
110
111        # If blob exists in GCS reload it so metadata is there
112        if blob.exists():
113            blob.reload()
114        return blob

Load a GCS blob object from a full GCS path.

Args:

  • full_path (str): The full GCS path.

Returns:

  • google.cloud.storage.blob.Blob: The GCS blob object.
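
Example (a minimal sketch; the bucket and object names are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder GCS path
    blob = gcp.load_blob_from_full_path("gs://example-bucket/data/sample.txt")
    if blob.exists():
        print(blob.size, blob.md5_hash)
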
def check_file_exists(self, full_path: str) -> bool:
116    def check_file_exists(self, full_path: str) -> bool:
117        """
118        Check if a file exists in GCS.
119
120        **Args:**
121        - full_path (str): The full GCS path.
122
123        **Returns:**
124        - bool: `True` if the file exists, `False` otherwise.
125        """
126        blob = self.load_blob_from_full_path(full_path)
127        return blob.exists()

Check if a file exists in GCS.

Args:

  • full_path (str): The full GCS path.

Returns:

  • bool: True if the file exists, False otherwise.
def list_bucket_contents( self, bucket_name: str, prefix: Optional[str] = None, file_extensions_to_ignore: list[str] = [], file_strings_to_ignore: list[str] = [], file_extensions_to_include: list[str] = [], file_name_only: bool = False) -> list[dict]:
192    def list_bucket_contents(
193            self,
194            bucket_name: str,
195            prefix: Optional[str] = None,
196            file_extensions_to_ignore: list[str] = [],
197            file_strings_to_ignore: list[str] = [],
198            file_extensions_to_include: list[str] = [],
199            file_name_only: bool = False
200    ) -> list[dict]:
201        """
202        List contents of a GCS bucket and return a list of dictionaries with file information.
203
204        **Args:**
205        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
206        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
207        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
208        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
209        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
210        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
211
212        **Returns:**
213        - list[dict]: A list of dictionaries containing file information.
214        """
215        # If the bucket name starts with gs://, remove it
216        if bucket_name.startswith("gs://"):
217            bucket_name = bucket_name.split("/")[2].strip()
218
219        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
220
221        # Get the bucket object and set user_project for Requester Pays
222        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
223
224        # List blobs within the bucket
225        blobs = bucket.list_blobs(prefix=prefix)
226        logging.info("Finished listing blobs. Processing files now.")
227
228        # Create a list of dictionaries containing file information
229        file_list = [
230            self._create_bucket_contents_dict(
231                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
232            )
233            for blob in blobs
234            if self._validate_include_blob(
235                blob=blob,
236                file_extensions_to_ignore=file_extensions_to_ignore,
237                file_strings_to_ignore=file_strings_to_ignore,
238                file_extensions_to_include=file_extensions_to_include,
239                bucket_name=bucket_name
240            ) and not blob.name.endswith("/")
241        ]
242        logging.info(f"Found {len(file_list)} files in bucket")
243        return file_list

List contents of a GCS bucket and return a list of dictionaries with file information.

Args:

  • bucket_name (str): The name of the GCS bucket. If includes gs://, it will be removed.
  • prefix (str, optional): The prefix to filter the blobs. Defaults to None.
  • file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
  • file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
  • file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
  • file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to False.

Returns:

  • list[dict]: A list of dictionaries containing file information.
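
Example (a minimal sketch; the bucket name, prefix, and extensions are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder bucket name and filters
    file_list = gcp.list_bucket_contents(
        bucket_name="example-bucket",
        prefix="submissions/",
        file_extensions_to_ignore=[".log"],
        file_extensions_to_include=[".cram", ".crai"],
    )
    for file_info in file_list:
        print(file_info)
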
def copy_cloud_file( self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
245    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
246        """
247        Copy a file from one GCS location to another.
248
249        **Args:**
250        - src_cloud_path (str): The source GCS path.
251        - full_destination_path (str): The destination GCS path.
252        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
253        """
254        try:
255            src_blob = self.load_blob_from_full_path(src_cloud_path)
256            dest_blob = self.load_blob_from_full_path(full_destination_path)
257
258            # Use rewrite so no timeouts
259            rewrite_token = False
260
261            while True:
262                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
263                    src_blob, token=rewrite_token
264                )
265                if verbose:
266                    logging.info(
267                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
268                    )
269                if not rewrite_token:
270                    break
271
272        except Exception as e:
273            logging.error(
274                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
275                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
276            )
277            raise

Copy a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
  • verbose (bool, optional): Whether to log progress. Defaults to False.
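
Example (a minimal sketch; both paths are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder source and destination paths
    gcp.copy_cloud_file(
        src_cloud_path="gs://source-bucket/sample.bam",
        full_destination_path="gs://destination-bucket/sample.bam",
        verbose=True,
    )
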
def delete_cloud_file(self, full_cloud_path: str) -> None:
279    def delete_cloud_file(self, full_cloud_path: str) -> None:
280        """
281        Delete a file from GCS.
282
283        **Args:**
284        - full_cloud_path (str): The GCS path of the file to delete.
285        """
286        blob = self.load_blob_from_full_path(full_cloud_path)
287        blob.delete()

Delete a file from GCS.

Args:

  • full_cloud_path (str): The GCS path of the file to delete.
def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
289    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
290        """
291        Move a file from one GCS location to another.
292
293        **Args:**
294        - src_cloud_path (str): The source GCS path.
295        - full_destination_path (str): The destination GCS path.
296        """
297        self.copy_cloud_file(src_cloud_path, full_destination_path)
298        self.delete_cloud_file(src_cloud_path)

Move a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
def get_filesize(self, target_path: str) -> int:
300    def get_filesize(self, target_path: str) -> int:
301        """
302        Get the size of a file in GCS.
303
304        **Args:**
305        - target_path (str): The GCS path of the file.
306
307        **Returns:**
308        - int: The size of the file in bytes.
309        """
310        blob = self.load_blob_from_full_path(target_path)
311        return blob.size

Get the size of a file in GCS.

Args:

  • target_path (str): The GCS path of the file.

Returns:

  • int: The size of the file in bytes.
def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
313    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
314        """
315        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
316
317        **Args:**
318        - src_cloud_path (str): The source GCS path.
319        - dest_cloud_path (str): The destination GCS path.
320
321        **Returns:**
322        - bool: `True` if the files are identical, `False` otherwise.
323        """
324        src_blob = self.load_blob_from_full_path(src_cloud_path)
325        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
326
327        # If either blob is None or does not exist
328        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
329            return False
330        # If the MD5 hashes exist
331        if src_blob.md5_hash and dest_blob.md5_hash:
332            # And are the same return True
333            if src_blob.md5_hash == dest_blob.md5_hash:
334                return True
335        else:
336            # If md5 do not exist (for larger files they may not) check size matches
337            if src_blob.size == dest_blob.size:
338                return True
339        # Otherwise, return False
340        return False

Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

Args:

  • src_cloud_path (str): The source GCS path.
  • dest_cloud_path (str): The destination GCS path.

Returns:

  • bool: True if the files are identical, False otherwise.
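
Example (a minimal sketch; both paths are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder source and destination paths
    if gcp.validate_files_are_same(
        src_cloud_path="gs://source-bucket/data.csv",
        dest_cloud_path="gs://backup-bucket/data.csv",
    ):
        print("Source and destination are identical")
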
def delete_multiple_files( self, files_to_delete: list[str], workers: int = 10, max_retries: int = 5, verbose: bool = False, job_complete_for_logging: int = 500) -> None:
342    def delete_multiple_files(
343            self,
344            files_to_delete: list[str],
345            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
346            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
347            verbose: bool = False,
348            job_complete_for_logging: int = 500
349    ) -> None:
350        """
351        Delete multiple cloud files in parallel using multi-threading.
352
353        **Args:**
354        - files_to_delete (list[str]): List of GCS paths of the files to delete.
355        - workers (int, optional): Number of worker threads. Defaults to `10`.
356        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
357        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
358        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
359        """
360        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
361
362        MultiThreadedJobs().run_multi_threaded_job(
363            workers=workers,
364            function=self.delete_cloud_file,
365            list_of_jobs_args_list=list_of_jobs_args_list,
366            max_retries=max_retries,
367            fail_on_error=True,
368            verbose=verbose,
369            collect_output=False,
370            jobs_complete_for_logging=job_complete_for_logging
371        )

Delete multiple cloud files in parallel using multi-threading.

Args:

  • files_to_delete (list[str]): List of GCS paths of the files to delete.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.
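
Example (a minimal sketch; the file paths are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder file paths
    gcp.delete_multiple_files(
        files_to_delete=[
            "gs://example-bucket/tmp/part-0.txt",
            "gs://example-bucket/tmp/part-1.txt",
        ],
        workers=4,
    )
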
def loop_and_log_validation_files_multithreaded( self, files_to_validate: list[dict], log_difference: bool, workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> list[dict]:
390    def loop_and_log_validation_files_multithreaded(
391            self,
392            files_to_validate: list[dict],
393            log_difference: bool,
394            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
395            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
396            job_complete_for_logging: int = 500
397    ) -> list[dict]:
398        """
399        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
400
401        **Args:**
402        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
403        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
404                                   this at the start of a copy/move operation to check if files are already copied.
405        - workers (int, optional): Number of worker threads. Defaults to `10`.
406        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
407        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
408
409        **Returns:**
410        - list[dict]: List of dictionaries containing files that are **not** identical.
411        """
412        logging.info(f"Validating if {len(files_to_validate)} files are identical")
413
414        # Prepare jobs: pass the necessary arguments to each validation
415        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
416
417        # Use multithreaded job runner to validate the files
418        checked_files = MultiThreadedJobs().run_multi_threaded_job(
419            workers=workers,
420            function=self._validate_file_pair,
421            list_of_jobs_args_list=jobs,
422            collect_output=True,
423            max_retries=max_retries,
424            jobs_complete_for_logging=job_complete_for_logging
425        )
426        # If any files failed to load, raise an exception
427        if files_to_validate and None in checked_files:  # type: ignore[operator]
428            logging.error("Failed to validate all files, could not load some blobs")
429            raise Exception("Failed to validate all files")
430
431        # Get all files that are not identical
432        not_identical_files = [
433            file_dict
434            for file_dict in checked_files  # type: ignore[operator, union-attr]
435            if not file_dict['identical']
436        ]
437        if not_identical_files:
438            if log_difference:
439                for file_dict in not_identical_files:
440                    logging.warning(
441                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
442                    )
443            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
444        return not_identical_files

Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

Args:

  • files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
  • log_difference (bool): Whether to log differences if files are not identical. Set False if you are running this at the start of a copy/move operation to check if files are already copied.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries for all jobs. Defaults to 5.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Returns:

  • list[dict]: List of dictionaries containing files that are not identical.
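
Example (a minimal sketch; the paths are placeholders, and each dictionary uses the source_file and full_destination_path keys this method expects):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder source and destination paths
    files_to_validate = [
        {
            "source_file": "gs://source-bucket/sample.cram",
            "full_destination_path": "gs://destination-bucket/sample.cram",
        },
    ]
    mismatched = gcp.loop_and_log_validation_files_multithreaded(
        files_to_validate=files_to_validate,
        log_difference=True,
    )
    print(f"{len(mismatched)} file pair(s) are not identical")
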
def multithread_copy_of_files_with_validation( self, files_to_copy: list[dict], workers: int = 10, max_retries: int = 5, skip_check_if_already_copied: bool = False) -> None:
446    def multithread_copy_of_files_with_validation(
447            self,
448            files_to_copy: list[dict],
449            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
450            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
451            skip_check_if_already_copied: bool = False
452    ) -> None:
453        """
454        Copy multiple files in parallel with validation.
455
456        **Args:**
457        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
458                Dictionary should have keys `source_file` and `full_destination_path`
459        - workers (int): Number of worker threads. Defaults to `10`.
460        - max_retries (int): Maximum number of retries. Defaults to `5`
461        - skip_check_if_already_copied (bool, optional): Whether to skip checking
462                if files are already copied and start copying right away. Defaults to `False`.
463        """
464        if skip_check_if_already_copied:
465            logging.info("Skipping check if files are already copied")
466            updated_files_to_move = files_to_copy
467        else:
468            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
469                files_to_copy,
470                log_difference=False,
471                workers=workers,
472                max_retries=max_retries
473            )
474        # If all files are already copied, return
475        if not updated_files_to_move:
476            logging.info("All files are already copied")
477            return None
478        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
479        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
480        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
481        # Validate that all files were copied successfully
482        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
483            files_to_copy,
484            workers=workers,
485            log_difference=True,
486            max_retries=max_retries
487        )
488        if files_not_moved_successfully:
489            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
490            raise Exception("Failed to copy all files")
491        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
492        return None

Copy multiple files in parallel with validation.

Args:

  • files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. Dictionary should have keys source_file and full_destination_path
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5
  • skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to False.
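
Example (a minimal sketch; the paths are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder source and destination paths
    gcp.multithread_copy_of_files_with_validation(
        files_to_copy=[
            {
                "source_file": "gs://source-bucket/sample.fastq.gz",
                "full_destination_path": "gs://destination-bucket/sample.fastq.gz",
            },
        ],
        workers=8,
        max_retries=3,
    )
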
def move_or_copy_multiple_files( self, files_to_move: list[dict], action: str, workers: int = 10, max_retries: int = 5, verbose: bool = False, jobs_complete_for_logging: int = 500) -> None:
494    def move_or_copy_multiple_files(
495            self, files_to_move: list[dict],
496            action: str,
497            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
498            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
499            verbose: bool = False,
500            jobs_complete_for_logging: int = 500
501    ) -> None:
502        """
503        Move or copy multiple files in parallel.
504
505        **Args:**
506        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
507        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
508        or `ops_utils.gcp_utils.COPY`).
509        - workers (int): Number of worker threads. Defaults to `10`.
510        - max_retries (int): Maximum number of retries. Defaults to `5`.
511        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
512        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
513
514        **Raises:**
515        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
516        """
517        if action == MOVE:
518            cloud_function = self.move_cloud_file
519        elif action == COPY:
520            cloud_function = self.copy_cloud_file
521        else:
522            raise ValueError("Must either select move or copy")
523
524        list_of_jobs_args_list = [
525            [
526                file_dict['source_file'], file_dict['full_destination_path']
527            ]
528            for file_dict in files_to_move
529        ]
530        MultiThreadedJobs().run_multi_threaded_job(
531            workers=workers,
532            function=cloud_function,
533            list_of_jobs_args_list=list_of_jobs_args_list,
534            max_retries=max_retries,
535            fail_on_error=True,
536            verbose=verbose,
537            collect_output=False,
538            jobs_complete_for_logging=jobs_complete_for_logging
539        )

Move or copy multiple files in parallel.

Args:

  • files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
  • action (str): The action to perform (should be one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY).
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Raises:

  • ValueError: If the action is not one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY.
def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
541    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
542        """
543        Read the content of a file from GCS.
544
545        **Args:**
546        - cloud_path (str): The GCS path of the file to read.
547        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
548
549        **Returns:**
550        - str: The content of the file decoded as a string.
551        """
552        blob = self.load_blob_from_full_path(cloud_path)
553        # Download the file content as bytes
554        content_bytes = blob.download_as_bytes()
555        # Convert bytes to string
556        content_str = content_bytes.decode(encoding)
557        return content_str

Read the content of a file from GCS.

Args:

  • cloud_path (str): The GCS path of the file to read.
  • encoding (str, optional): The encoding to use. Defaults to utf-8.

Returns:

  • str: The content of the file decoded as a string.
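
Example (a minimal sketch; the path is a placeholder):

    import json

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder path to a JSON file in GCS
    config = json.loads(gcp.read_file("gs://example-bucket/config/settings.json"))
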
def upload_blob( self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
559    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
560        """
561        Upload a file to GCS.
562
563        **Args:**
564        - destination_path (str): The destination GCS path.
565        - source_file (str): The source file path.
566        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
567        """
568        blob = self.load_blob_from_full_path(destination_path)
569        if custom_metadata:
570            blob.metadata = custom_metadata
571        blob.upload_from_filename(source_file)

Upload a file to GCS.

Args:

  • destination_path (str): The destination GCS path.
  • source_file (str): The source file path.
  • custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
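
Example (a minimal sketch; the paths and metadata values are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder destination path, local file, and metadata
    gcp.upload_blob(
        destination_path="gs://example-bucket/uploads/report.tsv",
        source_file="/tmp/report.tsv",
        custom_metadata={"pipeline": "qc", "version": "1.0"},
    )
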
def get_object_md5( self, file_path: str, chunk_size: int = 256000, logging_bytes: int = 1000000000, returned_md5_format: str = 'hex') -> str:
573    def get_object_md5(
574        self,
575        file_path: str,
576        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
577        chunk_size: int = parse_size("256 KB"),
578        logging_bytes: int = parse_size("1 GB"),
579        returned_md5_format: str = "hex"
580    ) -> str:
581        """
582        Calculate the MD5 checksum of a file in GCS.
583
584        **Args:**
585        - file_path (str): The GCS path of the file.
586        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
587        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
588        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
589                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
590
591        **Returns:**
592        - str: The MD5 checksum of the file.
593
594        **Raises:**
595        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
596        or `ops_utils.gcp_utils.MD5_BASE64`
597        """
598        if returned_md5_format not in ["hex", "base64"]:
599            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
600
601        blob = self.load_blob_from_full_path(file_path)
602
603        # Create an MD5 hash object
604        md5_hash = hashlib.md5()
605
606        blob_size_str = format_size(blob.size)
607        logging.info(f"Streaming {file_path} which is {blob_size_str}")
608        # Use a BytesIO buffer to collect the data as it is streamed in chunks
609        buffer = io.BytesIO()
610        total_bytes_streamed = 0
611        # Keep track of the last logged size for data logging
612        last_logged = 0
613
614        with blob.open("rb") as source_stream:
615            while True:
616                chunk = source_stream.read(chunk_size)
617                if not chunk:
618                    break
619                md5_hash.update(chunk)
620                buffer.write(chunk)
621                total_bytes_streamed += len(chunk)
622                # Log progress after every logging_bytes bytes streamed (1 GB by default)
623                if total_bytes_streamed - last_logged >= logging_bytes:
624                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
625                    last_logged = total_bytes_streamed
626
627        if returned_md5_format == "hex":
628            md5 = md5_hash.hexdigest()
629            logging.info(f"MD5 (hex) for {file_path}: {md5}")
630        elif returned_md5_format == "base64":
631            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
632            logging.info(f"MD5 (base64) for {file_path}: {md5}")
633        return md5

Calculate the MD5 checksum of a file in GCS.

Args:

  • file_path (str): The GCS path of the file.
  • chunk_size (int, optional): The size of each chunk to read. Defaults to 256 KB.
  • logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to 1 GB.
  • returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to hex. Options are ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.

Returns:

  • str: The MD5 checksum of the file.

Raises:

  • ValueError: If the returned_md5_format is not one of ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.
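
Example (a minimal sketch; the path is a placeholder):

    from ops_utils.gcp_utils import MD5_BASE64, GCPCloudFunctions

    gcp = GCPCloudFunctions()
    # Placeholder GCS path
    md5_b64 = gcp.get_object_md5(
        file_path="gs://example-bucket/large-file.bam",
        returned_md5_format=MD5_BASE64,
    )
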
def set_acl_public_read(self, cloud_path: str) -> None:
635    def set_acl_public_read(self, cloud_path: str) -> None:
636        """
637        Set the file in the bucket to be publicly readable.
638
639        **Args:**
640        - cloud_path (str): The GCS path of the file to be set as public readable.
641        """
642        blob = self.load_blob_from_full_path(cloud_path)
643        blob.acl.all().grant_read()
644        blob.acl.save()

Set the file in the bucket to be publicly readable.

Args:

  • cloud_path (str): The GCS path of the file to be set as public readable.
def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
646    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
647        """
648        Set the file in the bucket to grant OWNER permission to a specific group.
649
650        **Args:**
651        - cloud_path (str): The GCS path of the file.
652        - group_email (str): The email of the group to grant OWNER permission
653        """
654        blob = self.load_blob_from_full_path(cloud_path)
655        blob.acl.group(group_email).grant_owner()
656        blob.acl.save()

Set the file in the bucket to grant OWNER permission to a specific group.

Args:

  • cloud_path (str): The GCS path of the file.
  • group_email (str): The email of the group to grant OWNER permission
def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
658    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
659        """
660        Set Cache-Control metadata for a file.
661
662        **Args:**
663        - cloud_path (str): The GCS path of the file.
664        - cache_control (str): The Cache-Control metadata to set.
665        """
666        blob = self.load_blob_from_full_path(cloud_path)
667        blob.cache_control = cache_control
668        blob.patch()

Set Cache-Control metadata for a file.

Args:

  • cloud_path (str): The GCS path of the file.
  • cache_control (str): The Cache-Control metadata to set.
def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[google.cloud.storage.blob.Blob, str]]:
670    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
671        """
672        Get the most recent blob in the bucket.
673        
674        If the blob with the most recent timestamp doesn't have
675        any logging besides the basic "storage_byte_hours" logging, this method will continue to look at the next
676        most recent file until a log with useful information is encountered. This is useful when combing through
677        GCP activity logs for Terra workspace buckets.
678
679        **Args:**
680        - bucket_name (str): The GCS bucket name.
681
682        **Returns:**
683        - Optional tuple of the blob found and the file contents from the blob
684        """
685        blobs = sorted(
686            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
687        )
688        for blob in blobs:
689            # Download the file contents as a string
690            file_contents = blob.download_as_text()
691
692            # Check if the content matches the undesired format
693            lines = file_contents.splitlines()
694            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
695                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
696                continue
697
698            # If it doesn't match the undesired format, return its content
699            logging.info(f"Found valid file: {blob.name}")
700            return blob, file_contents
701
702        logging.info("No valid files found.")
703        return None

Get the most recent blob in the bucket.

If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, this method will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.

Args:

  • bucket_name (str): The GCS bucket name.

Returns:

  • Optional tuple of the blob found and the file contents from the blob
def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
705    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
706        """
707        Write content to a file in GCS.
708
709        **Args:**
710        - cloud_path (str): The GCS path of the file to write.
711        - file_contents (str): The content to write.
712        """
713        blob = self.load_blob_from_full_path(cloud_path)
714        blob.upload_from_string(file_contents)
715        logging.info(f"Successfully wrote content to {cloud_path}")

Write content to a file in GCS.

Args:

  • cloud_path (str): The GCS path of the file to write.
  • file_contents (str): The content to write.
@staticmethod
def get_active_gcloud_account() -> str:
717    @staticmethod
718    def get_active_gcloud_account() -> str:
719        """
720        Get the active GCP email for the current account.
721
722        **Returns:**
723        - str: The active GCP account email.
724        """
725        result = subprocess.run(
726            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
727            capture_output=True,
728            text=True,
729            check=True
730        )
731        return result.stdout.strip()

Get the active GCP email for the current account.

Returns:

  • str: The active GCP account email.
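
Example (a minimal sketch; requires the gcloud CLI to be installed and authenticated):

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Static method, so no client instance is needed
    print(GCPCloudFunctions.get_active_gcloud_account())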