ops_utils.gcp_utils

Module for GCP utilities.
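
A minimal usage sketch (assuming application default credentials are already configured; the bucket and object paths are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions

    # Authenticate with application default credentials and the default project
    gcp = GCPCloudFunctions()

    # Placeholder path used for illustration only
    path = "gs://my-bucket/data/sample.txt"
    if gcp.check_file_exists(path):
        print(gcp.read_file(path))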

  1"""Module for GCP utilities."""
  2import os
  3import logging
  4import time
  5import io
  6import json
  7import hashlib
  8import base64
  9import subprocess
 10
 11from humanfriendly import format_size, parse_size
 12from mimetypes import guess_type
 13from typing import Optional, Any
 14from google.cloud.storage.blob import Blob
 15from google.api_core.exceptions import Forbidden, GoogleAPICallError
 16from google.oauth2 import service_account
 17from google.cloud import storage
 18from google.auth import default
 19
 20from .vars import ARG_DEFAULTS
 21from .thread_pool_executor_util import MultiThreadedJobs
 22
 23MOVE = "move"
 24"""Variable to be used for the "action" when files should be moved."""
 25COPY = "copy"
 26"""Variable to be used for the "action" when files should be copied."""
 27MD5_HEX = "hex"
 28"""Variable to be used when the generated `md5` should be of the `hex` type."""
 29MD5_BASE64 = "base64"
 30"""Variable to be used when the generated `md5` should be of the `base64` type."""
 31
 32
 33class GCPCloudFunctions:
 34    """Class to handle GCP Cloud Functions."""
 35
 36    def __init__(
 37            self,
 38            project: Optional[str] = None,
 39            service_account_json: Optional[str] = None
 40    ) -> None:
 41        """
 42        Initialize the GCPCloudFunctions class.
 43
 44        Authenticates using service account JSON if provided or default credentials,
 45        and sets up the Storage Client.
 46
 47        Args:
 48            project: Optional[str] = None
 49                The GCP project ID. If not provided, will use project from service account or default.
 50            service_account_json: Optional[str] = None
 51                Path to service account JSON key file. If provided, will use these credentials.
 52        """
 53        # Initialize credentials and project
 54        credentials = None
 55        default_project = None
 56
 57        if service_account_json:
 58            credentials = service_account.Credentials.from_service_account_file(service_account_json)
 59            # Extract project from service account if not specified
 60            if not project:
 61                with open(service_account_json, 'r') as f:
 62                    sa_info = json.load(f)
 63                    project = sa_info.get('project_id')
 64        else:
 65            # Use default credentials
 66            credentials, default_project = default()
 67
 68        # Set project if not already set
 69        if not project:
 70            project = default_project
 71
 72        self.client = storage.Client(credentials=credentials, project=project)
 73        """@private"""
 74
 75    @staticmethod
 76    def _process_cloud_path(cloud_path: str) -> dict:
 77        """
 78        Process a GCS cloud path into its components.
 79
 80        Args:
 81            cloud_path (str): The GCS cloud path.
 82
 83        Returns:
 84            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
 85        """
 86        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
 87        bucket_name = str.split(remaining_url, sep="/")[0]
 88        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
 89        path_components = {
 90            "platform_prefix": platform_prefix,
 91            "bucket": bucket_name,
 92            "blob_url": blob_name
 93        }
 94        return path_components
 95
 96    def load_blob_from_full_path(self, full_path: str) -> Blob:
 97        """
 98        Load a GCS blob object from a full GCS path.
 99
100        **Args:**
101        - full_path (str): The full GCS path.
102
103        **Returns:**
104        - google.cloud.storage.blob.Blob: The GCS blob object.
105        """
106        file_path_components = self._process_cloud_path(full_path)
107
108        # Specify the billing project
109        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
110        blob = bucket.blob(file_path_components["blob_url"])
111
112        # If the blob exists in GCS, reload it so its metadata is populated
113        if blob.exists():
114            blob.reload()
115        return blob
116
117    def check_file_exists(self, full_path: str) -> bool:
118        """
119        Check if a file exists in GCS.
120
121        **Args:**
122        - full_path (str): The full GCS path.
123
124        **Returns:**
125        - bool: `True` if the file exists, `False` otherwise.
126        """
127        blob = self.load_blob_from_full_path(full_path)
128        return blob.exists()
129
130    @staticmethod
131    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
132        """
133        Create a dictionary containing file information.
134
135        Args:
136            bucket_name (str): The name of the GCS bucket.
137            blob (Any): The GCS blob object.
138            file_name_only (bool): Whether to return only the file path for each blob.
139
140        Returns:
141            dict: A dictionary containing file information.
142        """
143        if file_name_only:
144            return {
145                "path": f"gs://{bucket_name}/{blob.name}"
146            }
147        return {
148            "name": os.path.basename(blob.name),
149            "path": f"gs://{bucket_name}/{blob.name}",
150            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
151            "file_extension": os.path.splitext(blob.name)[1],
152            "size_in_bytes": blob.size,
153            "md5_hash": blob.md5_hash
154        }
155
156    @staticmethod
157    def _validate_include_blob(
158            blob: Any,
159            bucket_name: str,
160            file_extensions_to_ignore: list[str] = [],
161            file_strings_to_ignore: list[str] = [],
162            file_extensions_to_include: list[str] = [],
163            verbose: bool = False
164    ) -> bool:
165        """
166        Validate if a blob should be included based on its file extension.
167
168        Args:
169            file_extensions_to_include (list[str]): List of file extensions to include.
170            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
171            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
172            blob (Any): The GCS blob object.
173            verbose (bool): Whether to log files not being included.
174
175        Returns:
176            bool: True if the blob should be included, False otherwise.
177        """
178        file_path = f"gs://{bucket_name}/{blob.name}"
179        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
180            if verbose:
181                logging.info(f"Skipping {file_path} as it has an extension to ignore")
182            return False
183        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
184            if verbose:
185                logging.info(f"Skipping {file_path} as it does not have an extension to include")
186            return False
187        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
188            if verbose:
189                logging.info(f"Skipping {file_path} as it has a string to ignore")
190            return False
191        return True
192
193    def list_bucket_contents(
194            self,
195            bucket_name: str,
196            prefix: Optional[str] = None,
197            file_extensions_to_ignore: list[str] = [],
198            file_strings_to_ignore: list[str] = [],
199            file_extensions_to_include: list[str] = [],
200            file_name_only: bool = False
201    ) -> list[dict]:
202        """
203        List contents of a GCS bucket and return a list of dictionaries with file information.
204
205        **Args:**
206        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
207        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
208        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
209        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
210        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
211        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
212
213        **Returns:**
214        - list[dict]: A list of dictionaries containing file information.
215        """
216        # If the bucket name starts with gs://, remove it
217        if bucket_name.startswith("gs://"):
218            bucket_name = bucket_name.split("/")[2].strip()
219
220        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
221
222        # Get the bucket object and set user_project for Requester Pays
223        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
224
225        # List blobs within the bucket
226        blobs = bucket.list_blobs(prefix=prefix)
227        logging.info("Finished listing blobs. Processing files now.")
228
229        # Create a list of dictionaries containing file information
230        file_list = [
231            self._create_bucket_contents_dict(
232                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
233            )
234            for blob in blobs
235            if self._validate_include_blob(
236                blob=blob,
237                file_extensions_to_ignore=file_extensions_to_ignore,
238                file_strings_to_ignore=file_strings_to_ignore,
239                file_extensions_to_include=file_extensions_to_include,
240                bucket_name=bucket_name
241            ) and not blob.name.endswith("/")
242        ]
243        logging.info(f"Found {len(file_list)} files in bucket")
244        return file_list
245
246    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
247        """
248        Copy a file from one GCS location to another.
249
250        **Args:**
251        - src_cloud_path (str): The source GCS path.
252        - full_destination_path (str): The destination GCS path.
253        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
254        """
255        try:
256            src_blob = self.load_blob_from_full_path(src_cloud_path)
257            dest_blob = self.load_blob_from_full_path(full_destination_path)
258
259            # Use rewrite (rather than copy) so that large objects do not time out
260            rewrite_token = False
261
262            while True:
263                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
264                    src_blob, token=rewrite_token
265                )
266                if verbose:
267                    logging.info(
268                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
269                    )
270                if not rewrite_token:
271                    break
272
273        except Exception as e:
274            logging.error(
275                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
276                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
277            )
278            raise
279
280    def delete_cloud_file(self, full_cloud_path: str) -> None:
281        """
282        Delete a file from GCS.
283
284        **Args:**
285        - full_cloud_path (str): The GCS path of the file to delete.
286        """
287        blob = self.load_blob_from_full_path(full_cloud_path)
288        blob.delete()
289
290    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
291        """
292        Move a file from one GCS location to another.
293
294        **Args:**
295        - src_cloud_path (str): The source GCS path.
296        - full_destination_path (str): The destination GCS path.
297        """
298        self.copy_cloud_file(src_cloud_path, full_destination_path)
299        self.delete_cloud_file(src_cloud_path)
300
301    def get_filesize(self, target_path: str) -> int:
302        """
303        Get the size of a file in GCS.
304
305        **Args:**
306        - target_path (str): The GCS path of the file.
307
308        **Returns:**
309        - int: The size of the file in bytes.
310        """
311        blob = self.load_blob_from_full_path(target_path)
312        return blob.size
313
314    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
315        """
316        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
317
318        **Args:**
319        - src_cloud_path (str): The source GCS path.
320        - dest_cloud_path (str): The destination GCS path.
321
322        **Returns:**
323        - bool: `True` if the files are identical, `False` otherwise.
324        """
325        src_blob = self.load_blob_from_full_path(src_cloud_path)
326        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
327
328        # If either blob is None or does not exist
329        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
330            return False
331        # If the MD5 hashes exist
332        if src_blob.md5_hash and dest_blob.md5_hash:
333            # And are the same return True
334            if src_blob.md5_hash == dest_blob.md5_hash:
335                return True
336        else:
337            # If MD5 hashes do not exist (they may not for larger files), check that the sizes match
338            if src_blob.size == dest_blob.size:
339                return True
340        # Otherwise, return False
341        return False
342
343    def delete_multiple_files(
344            self,
345            files_to_delete: list[str],
346            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
347            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
348            verbose: bool = False,
349            job_complete_for_logging: int = 500
350    ) -> None:
351        """
352        Delete multiple cloud files in parallel using multi-threading.
353
354        **Args:**
355        - files_to_delete (list[str]): List of GCS paths of the files to delete.
356        - workers (int, optional): Number of worker threads. Defaults to `10`.
357        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
358        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
359        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
360        """
361        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
362
363        MultiThreadedJobs().run_multi_threaded_job(
364            workers=workers,
365            function=self.delete_cloud_file,
366            list_of_jobs_args_list=list_of_jobs_args_list,
367            max_retries=max_retries,
368            fail_on_error=True,
369            verbose=verbose,
370            collect_output=False,
371            jobs_complete_for_logging=job_complete_for_logging
372        )
373
374    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
375        """
376        Validate if source and destination files are identical.
377
378        **Args:**
379        - source_file (str): The source file path.
380        - full_destination_path (str): The destination file path.
381
382        **Returns:**
383            dict: The file dictionary of the files with a boolean indicating if they are identical.
384        """
385        if self.validate_files_are_same(source_file, full_destination_path):
386            identical = True
387        else:
388            identical = False
389        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
390
391    def loop_and_log_validation_files_multithreaded(
392            self,
393            files_to_validate: list[dict],
394            log_difference: bool,
395            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
396            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
397            job_complete_for_logging: int = 500
398    ) -> list[dict]:
399        """
400        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
401
402        **Args:**
403        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
404        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
405                                   this at the start of a copy/move operation to check if files are already copied.
406        - workers (int, optional): Number of worker threads. Defaults to `10`.
407        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
408        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
409
410        **Returns:**
411        - list[dict]: List of dictionaries containing files that are **not** identical.
412        """
413        logging.info(f"Validating if {len(files_to_validate)} files are identical")
414
415        # Prepare jobs: pass the necessary arguments to each validation
416        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
417
418        # Use multithreaded job runner to validate the files
419        checked_files = MultiThreadedJobs().run_multi_threaded_job(
420            workers=workers,
421            function=self._validate_file_pair,
422            list_of_jobs_args_list=jobs,
423            collect_output=True,
424            max_retries=max_retries,
425            jobs_complete_for_logging=job_complete_for_logging
426        )
427        # If any files failed to load, raise an exception
428        if files_to_validate and None in checked_files:  # type: ignore[operator]
429            logging.error("Failed to validate all files, could not load some blobs")
430            raise Exception("Failed to validate all files")
431
432        # Get all files that are not identical
433        not_identical_files = [
434            file_dict
435            for file_dict in checked_files  # type: ignore[operator, union-attr]
436            if not file_dict['identical']
437        ]
438        if not_identical_files:
439            if log_difference:
440                for file_dict in not_identical_files:
441                    logging.warning(
442                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
443                    )
444            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
445        return not_identical_files
446
447    def multithread_copy_of_files_with_validation(
448            self,
449            files_to_copy: list[dict],
450            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
451            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
452            skip_check_if_already_copied: bool = False
453    ) -> None:
454        """
455        Copy multiple files in parallel with validation.
456
457        **Args:**
458        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
459                Dictionary should have keys `source_file` and `full_destination_path`
460        - workers (int): Number of worker threads. Defaults to `10`.
461        - max_retries (int): Maximum number of retries. Defaults to `5`
462        - skip_check_if_already_copied (bool, optional): Whether to skip checking
463                if files are already copied and start copying right away. Defaults to `False`.
464        """
465        if skip_check_if_already_copied:
466            logging.info("Skipping check if files are already copied")
467            updated_files_to_move = files_to_copy
468        else:
469            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
470                files_to_copy,
471                log_difference=False,
472                workers=workers,
473                max_retries=max_retries
474            )
475        # If all files are already copied, return
476        if not updated_files_to_move:
477            logging.info("All files are already copied")
478            return None
479        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
480        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
481        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
482        # Validate that all files were copied successfully
483        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
484            files_to_copy,
485            workers=workers,
486            log_difference=True,
487            max_retries=max_retries
488        )
489        if files_not_moved_successfully:
490            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
491            raise Exception("Failed to copy all files")
492        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
493        return None
494
495    def move_or_copy_multiple_files(
496            self, files_to_move: list[dict],
497            action: str,
498            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
499            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
500            verbose: bool = False,
501            jobs_complete_for_logging: int = 500
502    ) -> None:
503        """
504        Move or copy multiple files in parallel.
505
506        **Args:**
507        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
508        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
509        or `ops_utils.gcp_utils.COPY`).
510        - workers (int): Number of worker threads. Defaults to `10`.
511        - max_retries (int): Maximum number of retries. Defaults to `5`.
512        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
513        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
514
515        **Raises:**
516        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
517        """
518        if action == MOVE:
519            cloud_function = self.move_cloud_file
520        elif action == COPY:
521            cloud_function = self.copy_cloud_file
522        else:
523            raise ValueError("Must either select move or copy")
524
525        list_of_jobs_args_list = [
526            [
527                file_dict['source_file'], file_dict['full_destination_path']
528            ]
529            for file_dict in files_to_move
530        ]
531        MultiThreadedJobs().run_multi_threaded_job(
532            workers=workers,
533            function=cloud_function,
534            list_of_jobs_args_list=list_of_jobs_args_list,
535            max_retries=max_retries,
536            fail_on_error=True,
537            verbose=verbose,
538            collect_output=False,
539            jobs_complete_for_logging=jobs_complete_for_logging
540        )
541
542    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
543        """
544        Read the content of a file from GCS.
545
546        **Args:**
547        - cloud_path (str): The GCS path of the file to read.
548        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
549
550        **Returns:**
551        - str: The content of the file, decoded as a string.
552        """
553        blob = self.load_blob_from_full_path(cloud_path)
554        # Download the file content as bytes
555        content_bytes = blob.download_as_bytes()
556        # Convert bytes to string
557        content_str = content_bytes.decode(encoding)
558        return content_str
559
560    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
561        """
562        Upload a file to GCS.
563
564        **Args:**
565        - destination_path (str): The destination GCS path.
566        - source_file (str): The source file path.
567        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
568        """
569        blob = self.load_blob_from_full_path(destination_path)
570        if custom_metadata:
571            blob.metadata = custom_metadata
572        blob.upload_from_filename(source_file)
573
574    def get_object_md5(
575        self,
576        file_path: str,
577        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
578        chunk_size: int = parse_size("256 KB"),
579        logging_bytes: int = parse_size("1 GB"),
580        returned_md5_format: str = "hex"
581    ) -> str:
582        """
583        Calculate the MD5 checksum of a file in GCS.
584
585        **Args:**
586        - file_path (str): The GCS path of the file.
587        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
588        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
589        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
590                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
591
592        **Returns:**
593        - str: The MD5 checksum of the file.
594
595        **Raises:**
596        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
597        or `ops_utils.gcp_utils.MD5_BASE64`
598        """
599        if returned_md5_format not in ["hex", "base64"]:
600            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
601
602        blob = self.load_blob_from_full_path(file_path)
603
604        # Create an MD5 hash object
605        md5_hash = hashlib.md5()
606
607        blob_size_str = format_size(blob.size)
608        logging.info(f"Streaming {file_path} which is {blob_size_str}")
609        # Stream the blob in chunks and update the running MD5 hash
610        buffer = io.BytesIO()
611        total_bytes_streamed = 0
612        # Keep track of the last logged size for data logging
613        last_logged = 0
614
615        with blob.open("rb") as source_stream:
616            while True:
617                chunk = source_stream.read(chunk_size)
618                if not chunk:
619                    break
620                md5_hash.update(chunk)
621                buffer.write(chunk)
622                total_bytes_streamed += len(chunk)
623                # Log progress after every `logging_bytes` streamed (1 GB by default)
624                if total_bytes_streamed - last_logged >= logging_bytes:
625                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
626                    last_logged = total_bytes_streamed
627
628        if returned_md5_format == "hex":
629            md5 = md5_hash.hexdigest()
630            logging.info(f"MD5 (hex) for {file_path}: {md5}")
631        elif returned_md5_format == "base64":
632            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
633            logging.info(f"MD5 (base64) for {file_path}: {md5}")
634        return md5
635
636    def set_acl_public_read(self, cloud_path: str) -> None:
637        """
638        Set the file in the bucket to be publicly readable.
639
640        **Args:**
641        - cloud_path (str): The GCS path of the file to be set as public readable.
642        """
643        blob = self.load_blob_from_full_path(cloud_path)
644        blob.acl.all().grant_read()
645        blob.acl.save()
646
647    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
648        """
649        Set the file in the bucket to grant OWNER permission to a specific group.
650
651        **Args:**
652        - cloud_path (str): The GCS path of the file.
653        - group_email (str): The email of the group to grant OWNER permission
654        """
655        blob = self.load_blob_from_full_path(cloud_path)
656        blob.acl.group(group_email).grant_owner()
657        blob.acl.save()
658
659    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
660        """
661        Set Cache-Control metadata for a file.
662
663        **Args:**
664        - cloud_path (str): The GCS path of the file.
665        - cache_control (str): The Cache-Control metadata to set.
666        """
667        blob = self.load_blob_from_full_path(cloud_path)
668        blob.cache_control = cache_control
669        blob.patch()
670
671    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
672        """
673        Get the most recent blob in the bucket.
674        
675        If the blob with the most recent timestamp doesn't have
676        any logging besides the basic "storage_byte_hours" logging, it will continue to look at the next most
677        recent file until a log with useful information is encountered. This is useful when combing through
678        GCP activity logs for Terra workspace buckets.
679
680        **Args:**
681        - bucket_name (str): The GCS bucket name.
682
683        **Returns:**
684        - Optional[tuple[Blob, str]]: The blob found and its file contents, or `None` if no valid file was found.
685        """
686        blobs = sorted(
687            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
688        )
689        for blob in blobs:
690            # Download the file contents as a string
691            file_contents = blob.download_as_text()
692
693            # Check if the content matches the undesired format
694            lines = file_contents.splitlines()
695            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
696                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
697                continue
698
699            # If it doesn't match the undesired format, return its content
700            logging.info(f"Found valid file: {blob.name}")
701            return blob, file_contents
702
703        logging.info("No valid files found.")
704        return None
705
706    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
707        """
708        Write content to a file in GCS.
709
710        **Args:**
711        - cloud_path (str): The GCS path of the file to write.
712        - file_contents (str): The content to write.
713        """
714        blob = self.load_blob_from_full_path(cloud_path)
715        blob.upload_from_string(file_contents)
716        logging.info(f"Successfully wrote content to {cloud_path}")
717
718    @staticmethod
719    def get_active_gcloud_account() -> str:
720        """
721        Get the email address of the currently active gcloud account.
722
723        **Returns:**
724        - str: The active GCP account email.
725        """
726        result = subprocess.run(
727            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
728            capture_output=True,
729            text=True,
730            check=True
731        )
732        return result.stdout.strip()
733
734    def has_write_permission(self, cloud_path: str) -> bool:
735        """
736        Check if the current user has permission to write to a GCP path.
737
738        This method tests write access by attempting to update the metadata
739        of an existing blob or create a zero-byte temporary file if the blob
740        doesn't exist. The temporary file is deleted immediately if created.
741
742        **Args:**
743        - cloud_path (str): The GCS path to check for write permissions.
744
745        **Returns:**
746        - bool: True if the user has write permission, False otherwise.
747        """
748        if not cloud_path.startswith("gs://"):
749            raise ValueError("cloud_path must start with 'gs://'")
750        if cloud_path.endswith("/"):
751            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
752            cloud_path = f"{cloud_path}permission_test_temp"
753        try:
754            blob = self.load_blob_from_full_path(cloud_path)
755            if blob.exists():
756                # Try updating metadata (doesn't change the content)
757                original_metadata = blob.metadata or {}
758                test_metadata = original_metadata.copy()
759                test_metadata["_write_permission_test"] = "true"
760
761                blob.metadata = test_metadata
762                blob.patch()
763
764                # Restore the original metadata
765                blob.metadata = original_metadata
766                blob.patch()
767
768                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
769                return True
770            else:
771                # Try writing a temporary file to the bucket
772                blob.upload_from_string("")
773
774                # Clean up the test file
775                blob.delete()
776                logging.info(f"Write permission confirmed for {cloud_path}")
777                return True
778        except Forbidden:
779            logging.warning(f"No write permission on path {cloud_path}")
780            return False
781        except GoogleAPICallError as e:
782            logging.warning(f"Error testing write access to {cloud_path}: {e}")
783            return False
784
785    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
786        """
787        Wait for write permissions on a GCP path, checking at regular intervals.
788
789        This method will periodically check if the user has write permission on the specified cloud path.
790        It will continue checking until either write permission is granted or the maximum wait time is reached.
791
792        **Args:**
793        - cloud_path (str): The GCS path to check for write permissions.
794        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
795        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
796
797        **Raises:**
798        - PermissionError: If write permission is not granted within `max_wait_time_minutes`.
799        """
800        if not cloud_path.startswith("gs://"):
801            raise ValueError("cloud_path must start with 'gs://'")
802
803        # Convert minutes to seconds for the sleep function
804        interval_seconds = interval_wait_time_minutes * 60
805        max_wait_seconds = max_wait_time_minutes * 60
806
807        start_time = time.time()
808        attempt_number = 1
809
810        logging.info(
811            f"Starting to check for write permissions on {cloud_path}. Will check "
812            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")
813
814        # First check immediately
815        if self.has_write_permission(cloud_path):
816            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
817            return
818
819        # If first check fails, start periodic checks
820        while time.time() - start_time < max_wait_seconds:
821            elapsed_minutes = (time.time() - start_time) / 60
822            remaining_minutes = max_wait_time_minutes - elapsed_minutes
823
824            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
825                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
826                         f"Time remaining: {remaining_minutes:.1f} minute(s).")
827
828            # Sleep for the interval duration
829            time.sleep(interval_seconds)
830
831            attempt_number += 1
832            logging.info(f"Checking write permissions (attempt {attempt_number})...")
833
834            if self.has_write_permission(cloud_path):
835                elapsed_minutes = (time.time() - start_time) / 60
836                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
837                return
838
839        # If we get here, we've exceeded the maximum wait time
840        raise PermissionError(
841            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
842            f"{cloud_path} after {attempt_number} attempts.")
MOVE = 'move'

Variable to be used for the "action" when files should be moved.

COPY = 'copy'

Variable to be used for the "action" when files should be copied.

MD5_HEX = 'hex'

Variable to be used when the generated md5 should be of the hex type.

MD5_BASE64 = 'base64'

Variable to be used when the generated md5 should be of the base64 type.
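
A short sketch of how these constants are intended to be passed to the methods above (all paths are placeholders):

    from ops_utils.gcp_utils import GCPCloudFunctions, COPY, MD5_BASE64

    gcp = GCPCloudFunctions()

    # Each dict needs the `source_file` and `full_destination_path` keys
    files = [
        {
            "source_file": "gs://source-bucket/a.txt",
            "full_destination_path": "gs://dest-bucket/a.txt",
        },
    ]
    gcp.move_or_copy_multiple_files(files_to_move=files, action=COPY)

    # Compute an md5 in the same base64 form GCS stores in blob metadata
    md5 = gcp.get_object_md5("gs://dest-bucket/a.txt", returned_md5_format=MD5_BASE64)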

class GCPCloudFunctions:
 34class GCPCloudFunctions:
 35    """Class to handle GCP Cloud Functions."""
 36
 37    def __init__(
 38            self,
 39            project: Optional[str] = None,
 40            service_account_json: Optional[str] = None
 41    ) -> None:
 42        """
 43        Initialize the GCPCloudFunctions class.
 44
 45        Authenticates using service account JSON if provided or default credentials,
 46        and sets up the Storage Client.
 47
 48        Args:
 49            project: Optional[str] = None
 50                The GCP project ID. If not provided, will use project from service account or default.
 51            service_account_json: Optional[str] = None
 52                Path to service account JSON key file. If provided, will use these credentials.
 53        """
 54        # Initialize credentials and project
 55        credentials = None
 56        default_project = None
 57
 58        if service_account_json:
 59            credentials = service_account.Credentials.from_service_account_file(service_account_json)
 60            # Extract project from service account if not specified
 61            if not project:
 62                with open(service_account_json, 'r') as f:
 63                    sa_info = json.load(f)
 64                    project = sa_info.get('project_id')
 65        else:
 66            # Use default credentials
 67            credentials, default_project = default()
 68
 69        # Set project if not already set
 70        if not project:
 71            project = default_project
 72
 73        self.client = storage.Client(credentials=credentials, project=project)
 74        """@private"""
 75
 76    @staticmethod
 77    def _process_cloud_path(cloud_path: str) -> dict:
 78        """
 79        Process a GCS cloud path into its components.
 80
 81        Args:
 82            cloud_path (str): The GCS cloud path.
 83
 84        Returns:
 85            dict: A dictionary containing the platform prefix, bucket name, and blob URL.
 86        """
 87        platform_prefix, remaining_url = str.split(str(cloud_path), sep="//", maxsplit=1)
 88        bucket_name = str.split(remaining_url, sep="/")[0]
 89        blob_name = "/".join(str.split(remaining_url, sep="/")[1:])
 90        path_components = {
 91            "platform_prefix": platform_prefix,
 92            "bucket": bucket_name,
 93            "blob_url": blob_name
 94        }
 95        return path_components
 96
 97    def load_blob_from_full_path(self, full_path: str) -> Blob:
 98        """
 99        Load a GCS blob object from a full GCS path.
100
101        **Args:**
102        - full_path (str): The full GCS path.
103
104        **Returns:**
105        - google.cloud.storage.blob.Blob: The GCS blob object.
106        """
107        file_path_components = self._process_cloud_path(full_path)
108
109        # Specify the billing project
110        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
111        blob = bucket.blob(file_path_components["blob_url"])
112
113        # If blob exists in GCS reload it so metadata is there
114        if blob.exists():
115            blob.reload()
116        return blob
117
118    def check_file_exists(self, full_path: str) -> bool:
119        """
120        Check if a file exists in GCS.
121
122        **Args:**
123        - full_path (str): The full GCS path.
124
125        **Returns:**
126        - bool: `True` if the file exists, `False` otherwise.
127        """
128        blob = self.load_blob_from_full_path(full_path)
129        return blob.exists()
130
131    @staticmethod
132    def _create_bucket_contents_dict(bucket_name: str, blob: Any, file_name_only: bool) -> dict:
133        """
134        Create a dictionary containing file information.
135
136        Args:
137            bucket_name (str): The name of the GCS bucket.
138            blob (Any): The GCS blob object.
139            file_name_only (bool): Whether to return only the file list.
140
141        Returns:
142            dict: A dictionary containing file information.
143        """
144        if file_name_only:
145            return {
146                "path": f"gs://{bucket_name}/{blob.name}"
147            }
148        return {
149            "name": os.path.basename(blob.name),
150            "path": f"gs://{bucket_name}/{blob.name}",
151            "content_type": blob.content_type or guess_type(blob.name)[0] or "application/octet-stream",
152            "file_extension": os.path.splitext(blob.name)[1],
153            "size_in_bytes": blob.size,
154            "md5_hash": blob.md5_hash
155        }
156
157    @staticmethod
158    def _validate_include_blob(
159            blob: Any,
160            bucket_name: str,
161            file_extensions_to_ignore: list[str] = [],
162            file_strings_to_ignore: list[str] = [],
163            file_extensions_to_include: list[str] = [],
164            verbose: bool = False
165    ) -> bool:
166        """
167        Validate if a blob should be included based on its file extension.
168
169        Args:
170            file_extensions_to_include (list[str]): List of file extensions to include.
171            file_extensions_to_ignore (list[str]): List of file extensions to ignore.
172            file_strings_to_ignore (list[str]): List of file name substrings to ignore.
173            blob (Any): The GCS blob object.
174            verbose (bool): Whether to log files not being included.
175
176        Returns:
177            bool: True if the blob should be included, False otherwise.
178        """
179        file_path = f"gs://{bucket_name}/{blob.name}"
180        if file_extensions_to_ignore and file_path.endswith(tuple(file_extensions_to_ignore)):
181            if verbose:
182                logging.info(f"Skipping {file_path} as it has an extension to ignore")
183            return False
184        if file_extensions_to_include and not file_path.endswith(tuple(file_extensions_to_include)):
185            if verbose:
186                logging.info(f"Skipping {file_path} as it does not have an extension to include")
187            return False
188        if file_strings_to_ignore and any(file_string in file_path for file_string in file_strings_to_ignore):
189            if verbose:
190                logging.info(f"Skipping {file_path} as it has a string to ignore")
191            return False
192        return True
193
194    def list_bucket_contents(
195            self,
196            bucket_name: str,
197            prefix: Optional[str] = None,
198            file_extensions_to_ignore: list[str] = [],
199            file_strings_to_ignore: list[str] = [],
200            file_extensions_to_include: list[str] = [],
201            file_name_only: bool = False
202    ) -> list[dict]:
203        """
204        List contents of a GCS bucket and return a list of dictionaries with file information.
205
206        **Args:**
207        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
208        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
209        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
210        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
211        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
212        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
213
214        **Returns:**
215        - list[dict]: A list of dictionaries containing file information.
216        """
217        # If the bucket name starts with gs://, remove it
218        if bucket_name.startswith("gs://"):
219            bucket_name = bucket_name.split("/")[2].strip()
220
221        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
222
223        # Get the bucket object and set user_project for Requester Pays
224        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
225
226        # List blobs within the bucket
227        blobs = bucket.list_blobs(prefix=prefix)
228        logging.info("Finished listing blobs. Processing files now.")
229
230        # Create a list of dictionaries containing file information
231        file_list = [
232            self._create_bucket_contents_dict(
233                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
234            )
235            for blob in blobs
236            if self._validate_include_blob(
237                blob=blob,
238                file_extensions_to_ignore=file_extensions_to_ignore,
239                file_strings_to_ignore=file_strings_to_ignore,
240                file_extensions_to_include=file_extensions_to_include,
241                bucket_name=bucket_name
242            ) and not blob.name.endswith("/")
243        ]
244        logging.info(f"Found {len(file_list)} files in bucket")
245        return file_list
246
247    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
248        """
249        Copy a file from one GCS location to another.
250
251        **Args:**
252        - src_cloud_path (str): The source GCS path.
253        - full_destination_path (str): The destination GCS path.
254        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
255        """
256        try:
257            src_blob = self.load_blob_from_full_path(src_cloud_path)
258            dest_blob = self.load_blob_from_full_path(full_destination_path)
259
260            # Use rewrite so no timeouts
261            rewrite_token = False
262
263            while True:
264                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
265                    src_blob, token=rewrite_token
266                )
267                if verbose:
268                    logging.info(
269                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
270                    )
271                if not rewrite_token:
272                    break
273
274        except Exception as e:
275            logging.error(
276                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
277                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
278            )
279            raise
280
281    def delete_cloud_file(self, full_cloud_path: str) -> None:
282        """
283        Delete a file from GCS.
284
285        **Args:**
286        - full_cloud_path (str): The GCS path of the file to delete.
287        """
288        blob = self.load_blob_from_full_path(full_cloud_path)
289        blob.delete()
290
291    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
292        """
293        Move a file from one GCS location to another.
294
295        **Args:**
296        - src_cloud_path (str): The source GCS path.
297        - full_destination_path (str): The destination GCS path.
298        """
299        self.copy_cloud_file(src_cloud_path, full_destination_path)
300        self.delete_cloud_file(src_cloud_path)
301
302    def get_filesize(self, target_path: str) -> int:
303        """
304        Get the size of a file in GCS.
305
306        **Args:**
307        - target_path (str): The GCS path of the file.
308
309        **Returns:**
310        - int: The size of the file in bytes.
311        """
312        blob = self.load_blob_from_full_path(target_path)
313        return blob.size
314
315    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
316        """
317        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
318
319        **Args:**
320        - src_cloud_path (str): The source GCS path.
321        - dest_cloud_path (str): The destination GCS path.
322
323        **Returns:**
324        - bool: `True` if the files are identical, `False` otherwise.
325        """
326        src_blob = self.load_blob_from_full_path(src_cloud_path)
327        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
328
329        # If either blob is None or does not exist
330        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
331            return False
332        # If the MD5 hashes exist
333        if src_blob.md5_hash and dest_blob.md5_hash:
334            # And are the same return True
335            if src_blob.md5_hash == dest_blob.md5_hash:
336                return True
337        else:
338            # If md5 do not exist (for larger files they may not) check size matches
339            if src_blob.size == dest_blob.size:
340                return True
341        # Otherwise, return False
342        return False
343
344    def delete_multiple_files(
345            self,
346            files_to_delete: list[str],
347            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
348            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
349            verbose: bool = False,
350            job_complete_for_logging: int = 500
351    ) -> None:
352        """
353        Delete multiple cloud files in parallel using multi-threading.
354
355        **Args:**
356        - files_to_delete (list[str]): List of GCS paths of the files to delete.
357        - workers (int, optional): Number of worker threads. Defaults to `10`.
358        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
359        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
360        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
361        """
362        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
363
364        MultiThreadedJobs().run_multi_threaded_job(
365            workers=workers,
366            function=self.delete_cloud_file,
367            list_of_jobs_args_list=list_of_jobs_args_list,
368            max_retries=max_retries,
369            fail_on_error=True,
370            verbose=verbose,
371            collect_output=False,
372            jobs_complete_for_logging=job_complete_for_logging
373        )
374
375    def _validate_file_pair(self, source_file: str, full_destination_path: str) -> dict:
376        """
377        Validate if source and destination files are identical.
378
379        **Args:**
380        - source_file (str): The source file path.
381        - full_destination_path (str): The destination file path.
382
383        **Returns:**
384            dict: The file dictionary of the files with a boolean indicating if they are identical.
385        """
386        if self.validate_files_are_same(source_file, full_destination_path):
387            identical = True
388        else:
389            identical = False
390        return {"source_file": source_file, "full_destination_path": full_destination_path, "identical": identical}
391
392    def loop_and_log_validation_files_multithreaded(
393            self,
394            files_to_validate: list[dict],
395            log_difference: bool,
396            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
397            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
398            job_complete_for_logging: int = 500
399    ) -> list[dict]:
400        """
401        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
402
403        **Args:**
404        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
405        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
406                                   this at the start of a copy/move operation to check if files are already copied.
407        - workers (int, optional): Number of worker threads. Defaults to `10`.
408        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
409        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
410
411        **Returns:**
412        - list[dict]: List of dictionaries containing files that are **not** identical.
413        """
414        logging.info(f"Validating if {len(files_to_validate)} files are identical")
415
416        # Prepare jobs: pass the necessary arguments to each validation
417        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
418
419        # Use multithreaded job runner to validate the files
420        checked_files = MultiThreadedJobs().run_multi_threaded_job(
421            workers=workers,
422            function=self._validate_file_pair,
423            list_of_jobs_args_list=jobs,
424            collect_output=True,
425            max_retries=max_retries,
426            jobs_complete_for_logging=job_complete_for_logging
427        )
428        # If any files failed to load, raise an exception
429        if files_to_validate and None in checked_files:  # type: ignore[operator]
430            logging.error("Failed to validate all files, could not load some blobs")
431            raise Exception("Failed to validate all files")
432
433        # Get all files that are not identical
434        not_identical_files = [
435            file_dict
436            for file_dict in checked_files  # type: ignore[operator, union-attr]
437            if not file_dict['identical']
438        ]
439        if not_identical_files:
440            if log_difference:
441                for file_dict in not_identical_files:
442                    logging.warning(
443                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
444                    )
445            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
446        return not_identical_files
447
448    def multithread_copy_of_files_with_validation(
449            self,
450            files_to_copy: list[dict],
451            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
452            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
453            skip_check_if_already_copied: bool = False
454    ) -> None:
455        """
456        Copy multiple files in parallel with validation.
457
458        **Args:**
459        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
460                Dictionary should have keys `source_file` and `full_destination_path`
461        - workers (int): Number of worker threads. Defaults to `10`.
462        - max_retries (int): Maximum number of retries. Defaults to `5`
463        - skip_check_if_already_copied (bool, optional): Whether to skip checking
464                if files are already copied and start copying right away. Defaults to `False`.
465        """
466        if skip_check_if_already_copied:
467            logging.info("Skipping check if files are already copied")
468            updated_files_to_move = files_to_copy
469        else:
470            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
471                files_to_copy,
472                log_difference=False,
473                workers=workers,
474                max_retries=max_retries
475            )
476        # If all files are already copied, return
477        if not updated_files_to_move:
478            logging.info("All files are already copied")
479            return None
480        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
481        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
482        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
483        # Validate that all files were copied successfully
484        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
485            files_to_copy,
486            workers=workers,
487            log_difference=True,
488            max_retries=max_retries
489        )
490        if files_not_moved_successfully:
491            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
492            raise Exception("Failed to copy all files")
493        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
494        return None
495
496    def move_or_copy_multiple_files(
497            self, files_to_move: list[dict],
498            action: str,
499            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
500            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
501            verbose: bool = False,
502            jobs_complete_for_logging: int = 500
503    ) -> None:
504        """
505        Move or copy multiple files in parallel.
506
507        **Args:**
508        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
509        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
510        or `ops_utils.gcp_utils.COPY`).
511        - workers (int): Number of worker threads. Defaults to `10`.
512        - max_retries (int): Maximum number of retries. Defaults to `5`.
513        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
514        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
515
516        **Raises:**
517        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
518        """
519        if action == MOVE:
520            cloud_function = self.move_cloud_file
521        elif action == COPY:
522            cloud_function = self.copy_cloud_file
523        else:
524            raise ValueError("action must be one of 'move' or 'copy'")
525
526        list_of_jobs_args_list = [
527            [
528                file_dict['source_file'], file_dict['full_destination_path']
529            ]
530            for file_dict in files_to_move
531        ]
532        MultiThreadedJobs().run_multi_threaded_job(
533            workers=workers,
534            function=cloud_function,
535            list_of_jobs_args_list=list_of_jobs_args_list,
536            max_retries=max_retries,
537            fail_on_error=True,
538            verbose=verbose,
539            collect_output=False,
540            jobs_complete_for_logging=jobs_complete_for_logging
541        )
542
543    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
544        """
545        Read the content of a file from GCS.
546
547        **Args:**
548        - cloud_path (str): The GCS path of the file to read.
549        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
550
551        **Returns:**
552        - str: The content of the file, decoded as a string using the given encoding.
553        """
554        blob = self.load_blob_from_full_path(cloud_path)
555        # Download the file content as bytes
556        content_bytes = blob.download_as_bytes()
557        # Convert bytes to string
558        content_str = content_bytes.decode(encoding)
559        return content_str
560
561    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
562        """
563        Upload a file to GCS.
564
565        **Args:**
566        - destination_path (str): The destination GCS path.
567        - source_file (str): The source file path.
568        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
569        """
570        blob = self.load_blob_from_full_path(destination_path)
571        if custom_metadata:
572            blob.metadata = custom_metadata
573        blob.upload_from_filename(source_file)
574
575    def get_object_md5(
576        self,
577        file_path: str,
578        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
579        chunk_size: int = parse_size("256 KB"),
580        logging_bytes: int = parse_size("1 GB"),
581        returned_md5_format: str = "hex"
582    ) -> str:
583        """
584        Calculate the MD5 checksum of a file in GCS.
585
586        **Args:**
587        - file_path (str): The GCS path of the file.
588        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
589        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
590        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
591                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
592
593        **Returns:**
594        - str: The MD5 checksum of the file.
595
596        **Raises:**
597        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
598        or `ops_utils.gcp_utils.MD5_BASE64`
599        """
600        if returned_md5_format not in ["hex", "base64"]:
601            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
602
603        blob = self.load_blob_from_full_path(file_path)
604
605        # Create an MD5 hash object
606        md5_hash = hashlib.md5()
607
608        blob_size_str = format_size(blob.size)
609        logging.info(f"Streaming {file_path} which is {blob_size_str}")
610        # Buffer the streamed chunks; the MD5 itself is updated incrementally from each chunk
611        buffer = io.BytesIO()
612        total_bytes_streamed = 0
613        # Keep track of the last logged size for data logging
614        last_logged = 0
615
616        with blob.open("rb") as source_stream:
617            while True:
618                chunk = source_stream.read(chunk_size)
619                if not chunk:
620                    break
621                md5_hash.update(chunk)
622                buffer.write(chunk)
623                total_bytes_streamed += len(chunk)
624                # Log progress after every logging_bytes bytes streamed
625                if total_bytes_streamed - last_logged >= logging_bytes:
626                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
627                    last_logged = total_bytes_streamed
628
629        if returned_md5_format == "hex":
630            md5 = md5_hash.hexdigest()
631            logging.info(f"MD5 (hex) for {file_path}: {md5}")
632        elif returned_md5_format == "base64":
633            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
634            logging.info(f"MD5 (base64) for {file_path}: {md5}")
635        return md5
636
637    def set_acl_public_read(self, cloud_path: str) -> None:
638        """
639        Set the file in the bucket to be publicly readable.
640
641        **Args:**
642        - cloud_path (str): The GCS path of the file to be set as public readable.
643        """
644        blob = self.load_blob_from_full_path(cloud_path)
645        blob.acl.all().grant_read()
646        blob.acl.save()
647
648    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
649        """
650        Set the file in the bucket to grant OWNER permission to a specific group.
651
652        **Args:**
653        - cloud_path (str): The GCS path of the file.
654        - group_email (str): The email of the group to grant OWNER permission
655        """
656        blob = self.load_blob_from_full_path(cloud_path)
657        blob.acl.group(group_email).grant_owner()
658        blob.acl.save()
659
660    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
661        """
662        Set Cache-Control metadata for a file.
663
664        **Args:**
665        - cloud_path (str): The GCS path of the file.
666        - cache_control (str): The Cache-Control metadata to set.
667        """
668        blob = self.load_blob_from_full_path(cloud_path)
669        blob.cache_control = cache_control
670        blob.patch()
671
672    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
673        """
674        Get the most recent blob in the bucket.
675        
676        If the blob with the most recent timestamp doesn't have
677        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
678        recent file until a log with useful information is encountered. This is useful when combing through
679        GCP activity logs for Terra workspace buckets.
680
681        **Args:**
682        - bucket_name (str): The GCS bucket name.
683
684        **Returns:**
685        - Optional tuple of the blob found and the file contents from the blob
686        """
687        blobs = sorted(
688            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
689        )
690        for blob in blobs:
691            # Download the file contents as a string
692            file_contents = blob.download_as_text()
693
694            # Check if the content matches the undesired format
695            lines = file_contents.splitlines()
696            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
697                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
698                continue
699
700            # If it doesn't match the undesired format, return its content
701            logging.info(f"Found valid file: {blob.name}")
702            return blob, file_contents
703
704        logging.info("No valid files found.")
705        return None
706
707    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
708        """
709        Write content to a file in GCS.
710
711        **Args:**
712        - cloud_path (str): The GCS path of the file to write.
713        - file_contents (str): The content to write.
714        """
715        blob = self.load_blob_from_full_path(cloud_path)
716        blob.upload_from_string(file_contents)
717        logging.info(f"Successfully wrote content to {cloud_path}")
718
719    @staticmethod
720    def get_active_gcloud_account() -> str:
721        """
722        Get the active GCP email for the current account.
723
724        **Returns:**
725        - str: The active GCP account email.
726        """
727        result = subprocess.run(
728            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
729            capture_output=True,
730            text=True,
731            check=True
732        )
733        return result.stdout.strip()
734
735    def has_write_permission(self, cloud_path: str) -> bool:
736        """
737        Check if the current user has permission to write to a GCP path.
738
739        This method tests write access by attempting to update the metadata
740        of an existing blob or create a zero-byte temporary file if the blob
741        doesn't exist. The temporary file is deleted immediately if created.
742
743        **Args:**
744        - cloud_path (str): The GCS path to check for write permissions.
745
746        **Returns:**
747        - bool: True if the user has write permission, False otherwise.
748        """
749        if not cloud_path.startswith("gs://"):
750            raise ValueError("cloud_path must start with 'gs://'")
751        if cloud_path.endswith("/"):
752            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
753            cloud_path = f"{cloud_path}permission_test_temp"
754        try:
755            blob = self.load_blob_from_full_path(cloud_path)
756            if blob.exists():
757                # Try updating metadata (doesn't change the content)
758                original_metadata = blob.metadata or {}
759                test_metadata = original_metadata.copy()
760                test_metadata["_write_permission_test"] = "true"
761
762                blob.metadata = test_metadata
763                blob.patch()
764
765                # Restore the original metadata
766                blob.metadata = original_metadata
767                blob.patch()
768
769                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
770                return True
771            else:
772                # Try writing a temporary file to the bucket
773                blob.upload_from_string("")
774
775                # Clean up the test file
776                blob.delete()
777                logging.info(f"Write permission confirmed for {cloud_path}")
778                return True
779        except Forbidden:
780            logging.warning(f"No write permission on path {cloud_path}")
781            return False
782        except GoogleAPICallError as e:
783            logging.warning(f"Error testing write access to {cloud_path}: {e}")
784            return False
785
786    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
787        """
788        Wait for write permissions on a GCP path, checking at regular intervals.
789
790        This method will periodically check if the user has write permission on the specified cloud path.
791        It will continue checking until either write permission is granted or the maximum wait time is reached.
792
793        **Args:**
794        - cloud_path (str): The GCS path to check for write permissions.
795        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
796        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
797
798        **Raises:**
799        - PermissionError: If write permission is not granted within `max_wait_time_minutes`.
800        """
801        if not cloud_path.startswith("gs://"):
802            raise ValueError("cloud_path must start with 'gs://'")
803
804        # Convert minutes to seconds for the sleep function
805        interval_seconds = interval_wait_time_minutes * 60
806        max_wait_seconds = max_wait_time_minutes * 60
807
808        start_time = time.time()
809        attempt_number = 1
810
811        logging.info(
812            f"Starting to check for write permissions on {cloud_path}. Will check "
813            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")
814
815        # First check immediately
816        if self.has_write_permission(cloud_path):
817            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
818            return
819
820        # If first check fails, start periodic checks
821        while time.time() - start_time < max_wait_seconds:
822            elapsed_minutes = (time.time() - start_time) / 60
823            remaining_minutes = max_wait_time_minutes - elapsed_minutes
824
825            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
826                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
827                         f"Time remaining: {remaining_minutes:.1f} minute(s).")
828
829            # Sleep for the interval duration
830            time.sleep(interval_seconds)
831
832            attempt_number += 1
833            logging.info(f"Checking write permissions (attempt {attempt_number})...")
834
835            if self.has_write_permission(cloud_path):
836                elapsed_minutes = (time.time() - start_time) / 60
837                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
838                return
839
840        # If we get here, we've exceeded the maximum wait time
841        raise PermissionError(
842            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
843            f"{cloud_path} after {attempt_number} attempts.")

Class to handle GCP Cloud Functions.

GCPCloudFunctions( project: Optional[str] = None, service_account_json: Optional[str] = None)
37    def __init__(
38            self,
39            project: Optional[str] = None,
40            service_account_json: Optional[str] = None
41    ) -> None:
42        """
43        Initialize the GCPCloudFunctions class.
44
45        Authenticates using service account JSON if provided or default credentials,
46        and sets up the Storage Client.
47
48        Args:
49            project: Optional[str] = None
50                The GCP project ID. If not provided, will use project from service account or default.
51            service_account_json: Optional[str] = None
52                Path to service account JSON key file. If provided, will use these credentials.
53        """
54        # Initialize credentials and project
55        credentials = None
56        default_project = None
57
58        if service_account_json:
59            credentials = service_account.Credentials.from_service_account_file(service_account_json)
60            # Extract project from service account if not specified
61            if not project:
62                with open(service_account_json, 'r') as f:
63                    sa_info = json.load(f)
64                    project = sa_info.get('project_id')
65        else:
66            # Use default credentials
67            credentials, default_project = default()
68
69        # Set project if not already set
70        if not project:
71            project = default_project
72
73        self.client = storage.Client(credentials=credentials, project=project)
74        """@private"""

Initialize the GCPCloudFunctions class.

Authenticates using service account JSON if provided or default credentials, and sets up the Storage Client.

Args: project: Optional[str] = None The GCP project ID. If not provided, will use project from service account or default. service_account_json: Optional[str] = None Path to service account JSON key file. If provided, will use these credentials.
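
A minimal usage sketch; the project ID and key-file path below are placeholders, not values from this module:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

# Application-default credentials (e.g. after running `gcloud auth application-default login`)
gcp = GCPCloudFunctions()

# Or authenticate explicitly with a service account key (placeholder values)
gcp_sa = GCPCloudFunctions(
    project="my-gcp-project",
    service_account_json="/path/to/service-account.json",
)
```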

def load_blob_from_full_path(self, full_path: str) -> google.cloud.storage.blob.Blob:
 97    def load_blob_from_full_path(self, full_path: str) -> Blob:
 98        """
 99        Load a GCS blob object from a full GCS path.
100
101        **Args:**
102        - full_path (str): The full GCS path.
103
104        **Returns:**
105        - google.cloud.storage.blob.Blob: The GCS blob object.
106        """
107        file_path_components = self._process_cloud_path(full_path)
108
109        # Specify the billing project
110        bucket = self.client.bucket(file_path_components["bucket"], user_project=self.client.project)
111        blob = bucket.blob(file_path_components["blob_url"])
112
113        # If blob exists in GCS reload it so metadata is there
114        if blob.exists():
115            blob.reload()
116        return blob

Load a GCS blob object from a full GCS path.

Args:

  • full_path (str): The full GCS path.

Returns:

  • google.cloud.storage.blob.Blob: The GCS blob object.
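
A short sketch of loading a blob and inspecting it; the `gs://` path is a placeholder:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
# Returns a google.cloud.storage.blob.Blob (reloaded with metadata if it exists)
blob = gcp.load_blob_from_full_path("gs://my-bucket/path/to/file.txt")
if blob.exists():
    print(blob.size, blob.md5_hash)
```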
def check_file_exists(self, full_path: str) -> bool:
118    def check_file_exists(self, full_path: str) -> bool:
119        """
120        Check if a file exists in GCS.
121
122        **Args:**
123        - full_path (str): The full GCS path.
124
125        **Returns:**
126        - bool: `True` if the file exists, `False` otherwise.
127        """
128        blob = self.load_blob_from_full_path(full_path)
129        return blob.exists()

Check if a file exists in GCS.

Args:

  • full_path (str): The full GCS path.

Returns:

  • bool: True if the file exists, False otherwise.
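
For example (placeholder path), to branch on whether an object is already present:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
if gcp.check_file_exists("gs://my-bucket/path/to/file.txt"):
    print("Object already exists")
```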
def list_bucket_contents( self, bucket_name: str, prefix: Optional[str] = None, file_extensions_to_ignore: list[str] = [], file_strings_to_ignore: list[str] = [], file_extensions_to_include: list[str] = [], file_name_only: bool = False) -> list[dict]:
194    def list_bucket_contents(
195            self,
196            bucket_name: str,
197            prefix: Optional[str] = None,
198            file_extensions_to_ignore: list[str] = [],
199            file_strings_to_ignore: list[str] = [],
200            file_extensions_to_include: list[str] = [],
201            file_name_only: bool = False
202    ) -> list[dict]:
203        """
204        List contents of a GCS bucket and return a list of dictionaries with file information.
205
206        **Args:**
207        - bucket_name (str): The name of the GCS bucket. If includes `gs://`, it will be removed.
208        - prefix (str, optional): The prefix to filter the blobs. Defaults to None.
209        - file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
210        - file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
211        - file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
212        - file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to `False`.
213
214        **Returns:**
215        - list[dict]: A list of dictionaries containing file information.
216        """
217        # If the bucket name starts with gs://, remove it
218        if bucket_name.startswith("gs://"):
219            bucket_name = bucket_name.split("/")[2].strip()
220
221        logging.info(f"Accessing bucket: {bucket_name} with project: {self.client.project}")
222
223        # Get the bucket object and set user_project for Requester Pays
224        bucket = self.client.bucket(bucket_name, user_project=self.client.project)
225
226        # List blobs within the bucket
227        blobs = bucket.list_blobs(prefix=prefix)
228        logging.info("Finished listing blobs. Processing files now.")
229
230        # Create a list of dictionaries containing file information
231        file_list = [
232            self._create_bucket_contents_dict(
233                blob=blob, bucket_name=bucket_name, file_name_only=file_name_only
234            )
235            for blob in blobs
236            if self._validate_include_blob(
237                blob=blob,
238                file_extensions_to_ignore=file_extensions_to_ignore,
239                file_strings_to_ignore=file_strings_to_ignore,
240                file_extensions_to_include=file_extensions_to_include,
241                bucket_name=bucket_name
242            ) and not blob.name.endswith("/")
243        ]
244        logging.info(f"Found {len(file_list)} files in bucket")
245        return file_list

List contents of a GCS bucket and return a list of dictionaries with file information.

Args:

  • bucket_name (str): The name of the GCS bucket. If includes gs://, it will be removed.
  • prefix (str, optional): The prefix to filter the blobs. Defaults to None.
  • file_extensions_to_ignore (list[str], optional): List of file extensions to ignore. Defaults to [].
  • file_strings_to_ignore (list[str], optional): List of file name substrings to ignore. Defaults to [].
  • file_extensions_to_include (list[str], optional): List of file extensions to include. Defaults to [].
  • file_name_only (bool, optional): Whether to return only the file list and no extra info. Defaults to False.

Returns:

  • list[dict]: A list of dictionaries containing file information.
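
A sketch of listing a bucket with the optional filters; the bucket name, prefix, and extensions are placeholders, and since the keys of each returned dictionary depend on the internal file-info helper, the example only counts results:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
files = gcp.list_bucket_contents(
    bucket_name="my-bucket",              # "gs://my-bucket" is also accepted
    prefix="submissions/2024/",           # placeholder prefix
    file_extensions_to_ignore=[".log"],
    file_extensions_to_include=[".cram", ".crai"],
)
print(f"Found {len(files)} matching files")
```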
def copy_cloud_file( self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
247    def copy_cloud_file(self, src_cloud_path: str, full_destination_path: str, verbose: bool = False) -> None:
248        """
249        Copy a file from one GCS location to another.
250
251        **Args:**
252        - src_cloud_path (str): The source GCS path.
253        - full_destination_path (str): The destination GCS path.
254        - verbose (bool, optional): Whether to log progress. Defaults to `False`.
255        """
256        try:
257            src_blob = self.load_blob_from_full_path(src_cloud_path)
258            dest_blob = self.load_blob_from_full_path(full_destination_path)
259
260            # Use rewrite (with a token loop) so large copies do not time out
261            rewrite_token = False
262
263            while True:
264                rewrite_token, bytes_rewritten, bytes_to_rewrite = dest_blob.rewrite(
265                    src_blob, token=rewrite_token
266                )
267                if verbose:
268                    logging.info(
269                        f"{full_destination_path}: Progress so far: {bytes_rewritten}/{bytes_to_rewrite} bytes."
270                    )
271                if not rewrite_token:
272                    break
273
274        except Exception as e:
275            logging.error(
276                f"Encountered the following error while attempting to copy file from '{src_cloud_path}' to "
277                f"'{full_destination_path}': {e}. If this is a retryable error, it will be re-attempted"
278            )
279            raise

Copy a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
  • verbose (bool, optional): Whether to log progress. Defaults to False.
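
A single-file copy sketch with placeholder paths; the rewrite loop inside the method handles large objects, so the call itself is just:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.copy_cloud_file(
    src_cloud_path="gs://source-bucket/data/sample.cram",
    full_destination_path="gs://dest-bucket/data/sample.cram",
    verbose=True,  # log rewrite progress per chunk
)
```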
def delete_cloud_file(self, full_cloud_path: str) -> None:
281    def delete_cloud_file(self, full_cloud_path: str) -> None:
282        """
283        Delete a file from GCS.
284
285        **Args:**
286        - full_cloud_path (str): The GCS path of the file to delete.
287        """
288        blob = self.load_blob_from_full_path(full_cloud_path)
289        blob.delete()

Delete a file from GCS.

Args:

  • full_cloud_path (str): The GCS path of the file to delete.
def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
291    def move_cloud_file(self, src_cloud_path: str, full_destination_path: str) -> None:
292        """
293        Move a file from one GCS location to another.
294
295        **Args:**
296        - src_cloud_path (str): The source GCS path.
297        - full_destination_path (str): The destination GCS path.
298        """
299        self.copy_cloud_file(src_cloud_path, full_destination_path)
300        self.delete_cloud_file(src_cloud_path)

Move a file from one GCS location to another.

Args:

  • src_cloud_path (str): The source GCS path.
  • full_destination_path (str): The destination GCS path.
def get_filesize(self, target_path: str) -> int:
302    def get_filesize(self, target_path: str) -> int:
303        """
304        Get the size of a file in GCS.
305
306        **Args:**
307        - target_path (str): The GCS path of the file.
308
309        **Returns:**
310        - int: The size of the file in bytes.
311        """
312        blob = self.load_blob_from_full_path(target_path)
313        return blob.size

Get the size of a file in GCS.

Args:

  • target_path (str): The GCS path of the file.

Returns:

  • int: The size of the file in bytes.
def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
315    def validate_files_are_same(self, src_cloud_path: str, dest_cloud_path: str) -> bool:
316        """
317        Validate if two cloud files (source and destination) are identical based on their MD5 hashes.
318
319        **Args:**
320        - src_cloud_path (str): The source GCS path.
321        - dest_cloud_path (str): The destination GCS path.
322
323        **Returns:**
324        - bool: `True` if the files are identical, `False` otherwise.
325        """
326        src_blob = self.load_blob_from_full_path(src_cloud_path)
327        dest_blob = self.load_blob_from_full_path(dest_cloud_path)
328
329        # If either blob is None or does not exist
330        if not src_blob or not dest_blob or not src_blob.exists() or not dest_blob.exists():
331            return False
332        # If the MD5 hashes exist
333        if src_blob.md5_hash and dest_blob.md5_hash:
334            # And are the same return True
335            if src_blob.md5_hash == dest_blob.md5_hash:
336                return True
337        else:
338            # If md5 do not exist (for larger files they may not) check size matches
339            if src_blob.size == dest_blob.size:
340                return True
341        # Otherwise, return False
342        return False

Validate if two cloud files (source and destination) are identical based on their MD5 hashes.

Args:

  • src_cloud_path (str): The source GCS path.
  • dest_cloud_path (str): The destination GCS path.

Returns:

  • bool: True if the files are identical, False otherwise.
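
A sketch comparing a source and destination object (placeholder paths):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
identical = gcp.validate_files_are_same(
    "gs://source-bucket/data/sample.cram",
    "gs://dest-bucket/data/sample.cram",
)
print("Identical" if identical else "Different or missing")
```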
def delete_multiple_files( self, files_to_delete: list[str], workers: int = 10, max_retries: int = 5, verbose: bool = False, job_complete_for_logging: int = 500) -> None:
344    def delete_multiple_files(
345            self,
346            files_to_delete: list[str],
347            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
348            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
349            verbose: bool = False,
350            job_complete_for_logging: int = 500
351    ) -> None:
352        """
353        Delete multiple cloud files in parallel using multi-threading.
354
355        **Args:**
356        - files_to_delete (list[str]): List of GCS paths of the files to delete.
357        - workers (int, optional): Number of worker threads. Defaults to `10`.
358        - max_retries (int, optional): Maximum number of retries. Defaults to `5`.
359        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
360        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
361        """
362        list_of_jobs_args_list = [[file_path] for file_path in set(files_to_delete)]
363
364        MultiThreadedJobs().run_multi_threaded_job(
365            workers=workers,
366            function=self.delete_cloud_file,
367            list_of_jobs_args_list=list_of_jobs_args_list,
368            max_retries=max_retries,
369            fail_on_error=True,
370            verbose=verbose,
371            collect_output=False,
372            jobs_complete_for_logging=job_complete_for_logging
373        )

Delete multiple cloud files in parallel using multi-threading.

Args:

  • files_to_delete (list[str]): List of GCS paths of the files to delete.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.
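
A sketch deleting a small batch of objects in parallel (placeholder paths); duplicate paths in the input list are de-duplicated before the jobs are queued:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.delete_multiple_files(
    files_to_delete=[
        "gs://my-bucket/tmp/part-0000.txt",
        "gs://my-bucket/tmp/part-0001.txt",
    ],
    workers=4,
    verbose=True,
)
```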
def loop_and_log_validation_files_multithreaded( self, files_to_validate: list[dict], log_difference: bool, workers: int = 10, max_retries: int = 5, job_complete_for_logging: int = 500) -> list[dict]:
392    def loop_and_log_validation_files_multithreaded(
393            self,
394            files_to_validate: list[dict],
395            log_difference: bool,
396            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
397            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
398            job_complete_for_logging: int = 500
399    ) -> list[dict]:
400        """
401        Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.
402
403        **Args:**
404        - files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
405        - log_difference (bool): Whether to log differences if files are not identical. Set `False` if you are running
406                                   this at the start of a copy/move operation to check if files are already copied.
407        - workers (int, optional): Number of worker threads. Defaults to `10`.
408        - max_retries (int, optional): Maximum number of retries for all jobs. Defaults to `5`.
409        - job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
410
411        **Returns:**
412        - list[dict]: List of dictionaries containing files that are **not** identical.
413        """
414        logging.info(f"Validating if {len(files_to_validate)} files are identical")
415
416        # Prepare jobs: pass the necessary arguments to each validation
417        jobs = [(file_dict['source_file'], file_dict['full_destination_path']) for file_dict in files_to_validate]
418
419        # Use multithreaded job runner to validate the files
420        checked_files = MultiThreadedJobs().run_multi_threaded_job(
421            workers=workers,
422            function=self._validate_file_pair,
423            list_of_jobs_args_list=jobs,
424            collect_output=True,
425            max_retries=max_retries,
426            jobs_complete_for_logging=job_complete_for_logging
427        )
428        # If any files failed to load, raise an exception
429        if files_to_validate and None in checked_files:  # type: ignore[operator]
430            logging.error("Failed to validate all files, could not load some blobs")
431            raise Exception("Failed to validate all files")
432
433        # Get all files that are not identical
434        not_identical_files = [
435            file_dict
436            for file_dict in checked_files  # type: ignore[operator, union-attr]
437            if not file_dict['identical']
438        ]
439        if not_identical_files:
440            if log_difference:
441                for file_dict in not_identical_files:
442                    logging.warning(
443                        f"File {file_dict['source_file']} and {file_dict['full_destination_path']} are not identical"
444                    )
445            logging.info(f"Validation complete. {len(not_identical_files)} files are not identical.")
446        return not_identical_files

Validate if multiple cloud files are identical based on their MD5 hashes using multithreading.

Args:

  • files_to_validate (list[dict]): List of dictionaries containing source and destination file paths.
  • log_difference (bool): Whether to log differences if files are not identical. Set False if you are running this at the start of a copy/move operation to check if files are already copied.
  • workers (int, optional): Number of worker threads. Defaults to 10.
  • max_retries (int, optional): Maximum number of retries for all jobs. Defaults to 5.
  • job_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Returns:

  • list[dict]: List of dictionaries containing files that are not identical.
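
Each input dictionary needs `source_file` and `full_destination_path` keys; a sketch with placeholder paths:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
pairs = [
    {
        "source_file": "gs://source-bucket/data/sample.cram",
        "full_destination_path": "gs://dest-bucket/data/sample.cram",
    },
]
# Returns only the pairs that are NOT identical
mismatched = gcp.loop_and_log_validation_files_multithreaded(pairs, log_difference=True)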
def multithread_copy_of_files_with_validation( self, files_to_copy: list[dict], workers: int = 10, max_retries: int = 5, skip_check_if_already_copied: bool = False) -> None:
448    def multithread_copy_of_files_with_validation(
449            self,
450            files_to_copy: list[dict],
451            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
452            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
453            skip_check_if_already_copied: bool = False
454    ) -> None:
455        """
456        Copy multiple files in parallel with validation.
457
458        **Args:**
459        - files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
460                Dictionary should have keys `source_file` and `full_destination_path`
461        - workers (int): Number of worker threads. Defaults to `10`.
462        - max_retries (int): Maximum number of retries. Defaults to `5`
463        - skip_check_if_already_copied (bool, optional): Whether to skip checking
464                if files are already copied and start copying right away. Defaults to `False`.
465        """
466        if skip_check_if_already_copied:
467            logging.info("Skipping check if files are already copied")
468            updated_files_to_move = files_to_copy
469        else:
470            updated_files_to_move = self.loop_and_log_validation_files_multithreaded(
471                files_to_copy,
472                log_difference=False,
473                workers=workers,
474                max_retries=max_retries
475            )
476        # If all files are already copied, return
477        if not updated_files_to_move:
478            logging.info("All files are already copied")
479            return None
480        logging.info(f"Attempting to {COPY} {len(updated_files_to_move)} files")
481        self.move_or_copy_multiple_files(updated_files_to_move, COPY, workers, max_retries)
482        logging.info(f"Validating all {len(updated_files_to_move)} new files are identical to original")
483        # Validate that all files were copied successfully
484        files_not_moved_successfully = self.loop_and_log_validation_files_multithreaded(
485            files_to_copy,
486            workers=workers,
487            log_difference=True,
488            max_retries=max_retries
489        )
490        if files_not_moved_successfully:
491            logging.error(f"Failed to copy {len(files_not_moved_successfully)} files")
492            raise Exception("Failed to copy all files")
493        logging.info(f"Successfully copied {len(updated_files_to_move)} files")
494        return None

Copy multiple files in parallel with validation.

Args:

  • files_to_copy (list[dict]): List of dictionaries containing source and destination file paths. Dictionary should have keys source_file and full_destination_path
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5
  • skip_check_if_already_copied (bool, optional): Whether to skip checking if files are already copied and start copying right away. Defaults to False.
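
A sketch of a validated bulk copy; the bucket names and paths are placeholders, and each dictionary uses the required `source_file` / `full_destination_path` keys:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
files_to_copy = [
    {
        "source_file": "gs://source-bucket/data/sample.cram",
        "full_destination_path": "gs://dest-bucket/data/sample.cram",
    },
]
# Skips files already identical at the destination, copies the rest,
# then re-validates; raises if any copy fails validation.
gcp.multithread_copy_of_files_with_validation(files_to_copy, workers=4)
```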
def move_or_copy_multiple_files( self, files_to_move: list[dict], action: str, workers: int = 10, max_retries: int = 5, verbose: bool = False, jobs_complete_for_logging: int = 500) -> None:
496    def move_or_copy_multiple_files(
497            self, files_to_move: list[dict],
498            action: str,
499            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
500            max_retries: int = ARG_DEFAULTS["max_retries"],  # type: ignore[assignment]
501            verbose: bool = False,
502            jobs_complete_for_logging: int = 500
503    ) -> None:
504        """
505        Move or copy multiple files in parallel.
506
507        **Args:**
508        - files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
509        - action (str): The action to perform (should be one of `ops_utils.gcp_utils.MOVE`
510        or `ops_utils.gcp_utils.COPY`).
511        - workers (int): Number of worker threads. Defaults to `10`.
512        - max_retries (int): Maximum number of retries. Defaults to `5`.
513        - verbose (bool, optional): Whether to log each job's success. Defaults to `False`.
514        - jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to `500`.
515
516        **Raises:**
517        - ValueError: If the action is not one of `ops_utils.gcp_utils.MOVE` or `ops_utils.gcp_utils.COPY`.
518        """
519        if action == MOVE:
520            cloud_function = self.move_cloud_file
521        elif action == COPY:
522            cloud_function = self.copy_cloud_file
523        else:
524            raise ValueError("action must be one of 'move' or 'copy'")
525
526        list_of_jobs_args_list = [
527            [
528                file_dict['source_file'], file_dict['full_destination_path']
529            ]
530            for file_dict in files_to_move
531        ]
532        MultiThreadedJobs().run_multi_threaded_job(
533            workers=workers,
534            function=cloud_function,
535            list_of_jobs_args_list=list_of_jobs_args_list,
536            max_retries=max_retries,
537            fail_on_error=True,
538            verbose=verbose,
539            collect_output=False,
540            jobs_complete_for_logging=jobs_complete_for_logging
541        )

Move or copy multiple files in parallel.

Args:

  • files_to_move (list[dict]): List of dictionaries containing source and destination file paths.
  • action (str): The action to perform (should be one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY).
  • workers (int): Number of worker threads. Defaults to 10.
  • max_retries (int): Maximum number of retries. Defaults to 5.
  • verbose (bool, optional): Whether to log each job's success. Defaults to False.
  • jobs_complete_for_logging (int, optional): The number of jobs to complete before logging. Defaults to 500.

Raises:

  • ValueError: If the action is not one of ops_utils.gcp_utils.MOVE or ops_utils.gcp_utils.COPY.
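
A sketch using the module-level MOVE constant (paths are placeholders); unlike multithread_copy_of_files_with_validation, this call performs no checksum validation afterwards:

```python
from ops_utils.gcp_utils import MOVE, GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.move_or_copy_multiple_files(
    files_to_move=[
        {
            "source_file": "gs://staging-bucket/data/sample.vcf.gz",
            "full_destination_path": "gs://archive-bucket/data/sample.vcf.gz",
        },
    ],
    action=MOVE,
    workers=4,
)
```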
def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
543    def read_file(self, cloud_path: str, encoding: str = 'utf-8') -> str:
544        """
545        Read the content of a file from GCS.
546
547        **Args:**
548        - cloud_path (str): The GCS path of the file to read.
549        - encoding (str, optional): The encoding to use. Defaults to `utf-8`.
550
551        **Returns:**
552        - str: The content of the file, decoded as a string using the given encoding.
553        """
554        blob = self.load_blob_from_full_path(cloud_path)
555        # Download the file content as bytes
556        content_bytes = blob.download_as_bytes()
557        # Convert bytes to string
558        content_str = content_bytes.decode(encoding)
559        return content_str

Read the content of a file from GCS.

Args:

  • cloud_path (str): The GCS path of the file to read.
  • encoding (str, optional): The encoding to use. Defaults to utf-8.

Returns:

  • str: The content of the file, decoded as a string using the given encoding.
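
For example (placeholder path), reading a small JSON object directly into memory:

```python
import json

from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
config = json.loads(gcp.read_file("gs://my-bucket/config/settings.json"))
```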
def upload_blob( self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
561    def upload_blob(self, destination_path: str, source_file: str, custom_metadata: Optional[dict] = None) -> None:
562        """
563        Upload a file to GCS.
564
565        **Args:**
566        - destination_path (str): The destination GCS path.
567        - source_file (str): The source file path.
568        - custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
569        """
570        blob = self.load_blob_from_full_path(destination_path)
571        if custom_metadata:
572            blob.metadata = custom_metadata
573        blob.upload_from_filename(source_file)

Upload a file to GCS.

Args:

  • destination_path (str): The destination GCS path.
  • source_file (str): The source file path.
  • custom_metadata (dict, optional): A dictionary of custom metadata to attach to the blob. Defaults to None.
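
A sketch uploading a local file with optional custom metadata; all names and values below are placeholders:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.upload_blob(
    destination_path="gs://my-bucket/uploads/report.tsv",
    source_file="/tmp/report.tsv",
    custom_metadata={"pipeline": "example-run", "version": "1"},
)
```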
def get_object_md5( self, file_path: str, chunk_size: int = 256000, logging_bytes: int = 1000000000, returned_md5_format: str = 'hex') -> str:
575    def get_object_md5(
576        self,
577        file_path: str,
578        # https://jbrojbrojbro.medium.com/finding-the-optimal-download-size-with-gcs-259dc7f26ad2
579        chunk_size: int = parse_size("256 KB"),
580        logging_bytes: int = parse_size("1 GB"),
581        returned_md5_format: str = "hex"
582    ) -> str:
583        """
584        Calculate the MD5 checksum of a file in GCS.
585
586        **Args:**
587        - file_path (str): The GCS path of the file.
588        - chunk_size (int, optional): The size of each chunk to read. Defaults to `256 KB`.
589        - logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to `1 GB`.
590        - returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to `hex`.
591                Options are `ops_utils.gcp_utils.MD5_HEX` or `ops_utils.gcp_utils.MD5_BASE64`.
592
593        **Returns:**
594        - str: The MD5 checksum of the file.
595
596        **Raises:**
597        - ValueError: If the `returned_md5_format` is not one of `ops_utils.gcp_utils.MD5_HEX`
598        or `ops_utils.gcp_utils.MD5_BASE64`
599        """
600        if returned_md5_format not in ["hex", "base64"]:
601            raise ValueError("returned_md5_format must be 'hex' or 'base64'")
602
603        blob = self.load_blob_from_full_path(file_path)
604
605        # Create an MD5 hash object
606        md5_hash = hashlib.md5()
607
608        blob_size_str = format_size(blob.size)
609        logging.info(f"Streaming {file_path} which is {blob_size_str}")
610        # Buffer the streamed chunks; the MD5 itself is updated incrementally from each chunk
611        buffer = io.BytesIO()
612        total_bytes_streamed = 0
613        # Keep track of the last logged size for data logging
614        last_logged = 0
615
616        with blob.open("rb") as source_stream:
617            while True:
618                chunk = source_stream.read(chunk_size)
619                if not chunk:
620                    break
621                md5_hash.update(chunk)
622                buffer.write(chunk)
623                total_bytes_streamed += len(chunk)
624                # Log progress after every logging_bytes bytes streamed
625                if total_bytes_streamed - last_logged >= logging_bytes:
626                    logging.info(f"Streamed {format_size(total_bytes_streamed)} / {blob_size_str} so far")
627                    last_logged = total_bytes_streamed
628
629        if returned_md5_format == "hex":
630            md5 = md5_hash.hexdigest()
631            logging.info(f"MD5 (hex) for {file_path}: {md5}")
632        elif returned_md5_format == "base64":
633            md5 = base64.b64encode(md5_hash.digest()).decode("utf-8")
634            logging.info(f"MD5 (base64) for {file_path}: {md5}")
635        return md5

Calculate the MD5 checksum of a file in GCS.

Args:

  • file_path (str): The GCS path of the file.
  • chunk_size (int, optional): The size of each chunk to read. Defaults to 256 KB.
  • logging_bytes (int, optional): The number of bytes to read before logging progress. Defaults to 1 GB.
  • returned_md5_format (str, optional): The format of the MD5 checksum to return. Defaults to hex. Options are ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64.

Returns:

  • str: The MD5 checksum of the file.

Raises:

  • ValueError: If the returned_md5_format is not one of ops_utils.gcp_utils.MD5_HEX or ops_utils.gcp_utils.MD5_BASE64
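
A sketch computing a base64 MD5 (the base64 form matches what GCS reports in a blob's md5_hash field); the path is a placeholder:

```python
from ops_utils.gcp_utils import MD5_BASE64, GCPCloudFunctions

gcp = GCPCloudFunctions()
md5 = gcp.get_object_md5(
    file_path="gs://my-bucket/data/large-file.bam",
    returned_md5_format=MD5_BASE64,
)
```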
def set_acl_public_read(self, cloud_path: str) -> None:
637    def set_acl_public_read(self, cloud_path: str) -> None:
638        """
639        Set the file in the bucket to be publicly readable.
640
641        **Args:**
642        - cloud_path (str): The GCS path of the file to be set as public readable.
643        """
644        blob = self.load_blob_from_full_path(cloud_path)
645        blob.acl.all().grant_read()
646        blob.acl.save()

Set the file in the bucket to be publicly readable.

Args:

  • cloud_path (str): The GCS path of the file to be set as public readable.
def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
648    def set_acl_group_owner(self, cloud_path: str, group_email: str) -> None:
649        """
650        Set the file in the bucket to grant OWNER permission to a specific group.
651
652        **Args:**
653        - cloud_path (str): The GCS path of the file.
654        - group_email (str): The email of the group to grant OWNER permission
655        """
656        blob = self.load_blob_from_full_path(cloud_path)
657        blob.acl.group(group_email).grant_owner()
658        blob.acl.save()

Set the file in the bucket to grant OWNER permission to a specific group.

Args:

  • cloud_path (str): The GCS path of the file.
  • group_email (str): The email of the group to grant OWNER permission
def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
660    def set_metadata_cache_control(self, cloud_path: str, cache_control: str) -> None:
661        """
662        Set Cache-Control metadata for a file.
663
664        **Args:**
665        - cloud_path (str): The GCS path of the file.
666        - cache_control (str): The Cache-Control metadata to set.
667        """
668        blob = self.load_blob_from_full_path(cloud_path)
669        blob.cache_control = cache_control
670        blob.patch()

Set Cache-Control metadata for a file.

Args:

  • cloud_path (str): The GCS path of the file.
  • cache_control (str): The Cache-Control metadata to set.
def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[google.cloud.storage.blob.Blob, str]]:
672    def get_file_contents_of_most_recent_blob_in_bucket(self, bucket_name: str) -> Optional[tuple[Blob, str]]:
673        """
674        Get the most recent blob in the bucket.
675        
676        If the blob with the most recent timestamp doesn't have
677        any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most
678        recent file until a log with useful information is encountered. This is useful when combing through
679        GCP activity logs for Terra workspace buckets.
680
681        **Args:**
682        - bucket_name (str): The GCS bucket name.
683
684        **Returns:**
685        - Optional tuple of the blob found and the file contents from the blob
686        """
687        blobs = sorted(
688            self.client.list_blobs(bucket_name), key=lambda blob: blob.updated, reverse=True
689        )
690        for blob in blobs:
691            # Download the file contents as a string
692            file_contents = blob.download_as_text()
693
694            # Check if the content matches the undesired format
695            lines = file_contents.splitlines()
696            if len(lines) > 1 and lines[0] == '"bucket","storage_byte_hours"':
697                logging.info(f"Skipping file {blob.name} as it matches the undesired format.")
698                continue
699
700            # If it doesn't match the undesired format, return its content
701            logging.info(f"Found valid file: {blob.name}")
702            return blob, file_contents
703
704        logging.info("No valid files found.")
705        return None

Get the most recent blob in the bucket.

If the blob with the most recent timestamp doesn't have any logging besides the basic "storage_byte_hours" logging, will continue to look at the next most recent file until a log with useful information is encountered. This is useful when combing through GCP activity logs for Terra workspace buckets.

Args:

  • bucket_name (str): The GCS bucket name.

Returns:

  • Optional tuple of the blob found and the file contents from the blob
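
A sketch with a placeholder bucket name; the method returns None when every log it inspects contains only the storage_byte_hours summary:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
result = gcp.get_file_contents_of_most_recent_blob_in_bucket("my-logs-bucket")
if result:
    blob, contents = result
    print(f"Most recent useful log: {blob.name}")
```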
def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
707    def write_to_gcp_file(self, cloud_path: str, file_contents: str) -> None:
708        """
709        Write content to a file in GCS.
710
711        **Args:**
712        - cloud_path (str): The GCS path of the file to write.
713        - file_contents (str): The content to write.
714        """
715        blob = self.load_blob_from_full_path(cloud_path)
716        blob.upload_from_string(file_contents)
717        logging.info(f"Successfully wrote content to {cloud_path}")

Write content to a file in GCS.

Args:

  • cloud_path (str): The GCS path of the file to write.
  • file_contents (str): The content to write.
@staticmethod
def get_active_gcloud_account() -> str:
719    @staticmethod
720    def get_active_gcloud_account() -> str:
721        """
722        Get the active GCP email for the current account.
723
724        **Returns:**
725        - str: The active GCP account email.
726        """
727        result = subprocess.run(
728            args=["gcloud", "auth", "list", "--filter=status:ACTIVE", "--format=value(account)"],
729            capture_output=True,
730            text=True,
731            check=True
732        )
733        return result.stdout.strip()

Get the active GCP email for the current account.

Returns:

  • str: The active GCP account email.
def has_write_permission(self, cloud_path: str) -> bool:
735    def has_write_permission(self, cloud_path: str) -> bool:
736        """
737        Check if the current user has permission to write to a GCP path.
738
739        This method tests write access by attempting to update the metadata
740        of an existing blob or create a zero-byte temporary file if the blob
741        doesn't exist. The temporary file is deleted immediately if created.
742
743        **Args:**
744        - cloud_path (str): The GCS path to check for write permissions.
745
746        **Returns:**
747        - bool: True if the user has write permission, False otherwise.
748        """
749        if not cloud_path.startswith("gs://"):
750            raise ValueError("cloud_path must start with 'gs://'")
751        if cloud_path.endswith("/"):
752            logging.warning(f"Provided cloud path {cloud_path} is a directory, will check {cloud_path}permission_test_temp")
753            cloud_path = f"{cloud_path}permission_test_temp"
754        try:
755            blob = self.load_blob_from_full_path(cloud_path)
756            if blob.exists():
757                # Try updating metadata (doesn't change the content)
758                original_metadata = blob.metadata or {}
759                test_metadata = original_metadata.copy()
760                test_metadata["_write_permission_test"] = "true"
761
762                blob.metadata = test_metadata
763                blob.patch()
764
765                # Restore the original metadata
766                blob.metadata = original_metadata
767                blob.patch()
768
769                logging.info(f"Write permission confirmed for existing blob {cloud_path}")
770                return True
771            else:
772                # Try writing a temporary file to the bucket
773                blob.upload_from_string("")
774
775                # Clean up the test file
776                blob.delete()
777                logging.info(f"Write permission confirmed for {cloud_path}")
778                return True
779        except Forbidden:
780            logging.warning(f"No write permission on path {cloud_path}")
781            return False
782        except GoogleAPICallError as e:
783            logging.warning(f"Error testing write access to {cloud_path}: {e}")
784            return False

Check if the current user has permission to write to a GCP path.

This method tests write access by attempting to update the metadata of an existing blob or create a zero-byte temporary file if the blob doesn't exist. The temporary file is deleted immediately if created.

Args:

  • cloud_path (str): The GCS path to check for write permissions.

Returns:

  • bool: True if the user has write permission, False otherwise.
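
A sketch checking write access before starting a large transfer (placeholder path):

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
if not gcp.has_write_permission("gs://dest-bucket/deliveries/"):
    raise SystemExit("Current credentials cannot write to the destination")
```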
def wait_for_write_permission( self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
786    def wait_for_write_permission(self, cloud_path: str, interval_wait_time_minutes: int, max_wait_time_minutes: int) -> None:
787        """
788        Wait for write permissions on a GCP path, checking at regular intervals.
789
790        This method will periodically check if the user has write permission on the specified cloud path.
791        It will continue checking until either write permission is granted or the maximum wait time is reached.
792
793        **Args:**
794        - cloud_path (str): The GCS path to check for write permissions.
795        - interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
796        - max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.
797
798        **Raises:**
799        - PermissionError: If write permission is not granted within `max_wait_time_minutes`.
800        """
801        if not cloud_path.startswith("gs://"):
802            raise ValueError("cloud_path must start with 'gs://'")
803
804        # Convert minutes to seconds for the sleep function
805        interval_seconds = interval_wait_time_minutes * 60
806        max_wait_seconds = max_wait_time_minutes * 60
807
808        start_time = time.time()
809        attempt_number = 1
810
811        logging.info(
812            f"Starting to check for write permissions on {cloud_path}. Will check "
813            f"every {interval_wait_time_minutes} minute(s) for up to {max_wait_time_minutes} minute(s).")
814
815        # First check immediately
816        if self.has_write_permission(cloud_path):
817            logging.info(f"Write permission confirmed on initial check for {cloud_path}")
818            return
819
820        # If first check fails, start periodic checks
821        while time.time() - start_time < max_wait_seconds:
822            elapsed_minutes = (time.time() - start_time) / 60
823            remaining_minutes = max_wait_time_minutes - elapsed_minutes
824
825            logging.info(f"Waiting {interval_wait_time_minutes} minute(s) before next permission check. "
826                         f"Time elapsed: {elapsed_minutes:.1f} minute(s). "
827                         f"Time remaining: {remaining_minutes:.1f} minute(s).")
828
829            # Sleep for the interval duration
830            time.sleep(interval_seconds)
831
832            attempt_number += 1
833            logging.info(f"Checking write permissions (attempt {attempt_number})...")
834
835            if self.has_write_permission(cloud_path):
836                elapsed_minutes = (time.time() - start_time) / 60
837                logging.info(f"Write permission confirmed after {elapsed_minutes:.1f} minute(s) on attempt {attempt_number}")
838                return
839
840        # If we get here, we've exceeded the maximum wait time
841        raise PermissionError(
842            f"Maximum wait time of {max_wait_time_minutes} minute(s) exceeded. Write permission was not granted for "
843            f"{cloud_path} after {attempt_number} attempts.")

Wait for write permissions on a GCP path, checking at regular intervals.

This method will periodically check if the user has write permission on the specified cloud path. It will continue checking until either write permission is granted or the maximum wait time is reached.

Args:

  • cloud_path (str): The GCS path to check for write permissions.
  • interval_wait_time_minutes (int): Time in minutes to wait between permission checks.
  • max_wait_time_minutes (int): Maximum time in minutes to wait for permissions.

Raises:

  • PermissionError: If write permission is not granted within max_wait_time_minutes.
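
A sketch that polls every 5 minutes for up to an hour (placeholder path); a PermissionError is raised if access is never granted:

```python
from ops_utils.gcp_utils import GCPCloudFunctions

gcp = GCPCloudFunctions()
gcp.wait_for_write_permission(
    cloud_path="gs://dest-bucket/deliveries/",
    interval_wait_time_minutes=5,
    max_wait_time_minutes=60,
)
```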