ops_utils.tdr_utils.renaming_util

Module for renaming files in TDR and updating metadata.

"""Module for renaming files in TDR and updating metadata."""
import logging
import os
from typing import Optional, Tuple
from .tdr_ingest_utils import StartAndMonitorIngest
from .tdr_api_utils import TDR
from ..gcp_utils import GCPCloudFunctions
from ..vars import ARG_DEFAULTS


class GetRowAndFileInfoForReingest:
    """Class to obtain file and row information for re-ingesting files in TDR."""

    def __init__(
            self,
            table_schema_info: dict,
            files_info: dict,
            table_metrics: list[dict],
            original_column: str,
            new_column: str,
            row_identifier: str,
            temp_bucket: str,
            update_original_column: bool = False,
            column_update_only: bool = False
    ):
        """
        Initialize the GetRowAndFileInfoForReingest class.

        **Args:**
        - table_schema_info (dict): Schema information of the table.
        - files_info (dict): A dictionary where the key is the file UUID and the value is the file metadata.
        - table_metrics (list[dict]): Metrics of the TDR table to update.
        - original_column (str): The column name with the original value.
        - new_column (str): The column name with the new value.
        - row_identifier (str): The identifier for the row. Should be the primary key.
        - temp_bucket (str): The temporary bucket for storing files.
        - update_original_column (bool, optional): Whether to also update the original column with the new value.
                If `False`, only file paths are updated. Defaults to `False`.
        - column_update_only (bool, optional): Whether to update only the column and
                not the file paths. Defaults to `False`.
        """
        self.table_schema_info = table_schema_info
        """@private"""
        self.files_info = files_info
        """@private"""
        self.table_metrics = table_metrics
        """@private"""
        self.original_column = original_column
        """@private"""
        self.new_column = new_column
        """@private"""
        self.row_identifier = row_identifier
        """@private"""
        self.total_files_to_reingest = 0
        """@private"""
        self.temp_bucket = temp_bucket
        """@private"""
        self.update_original_column = update_original_column
        """@private"""
        self.column_update_only = column_update_only
        """@private"""

    def _create_paths(self, file_info: dict, og_basename: str, new_basename: str) -> Tuple[str, str, str]:
        """
        Create paths for the file in TDR, updated TDR metadata, and temporary storage.

        Args:
            file_info (dict): Information about the file returned from TDR.
            og_basename (str): The original basename of the file.
            new_basename (str): The new basename of the file.

        Returns:
            Tuple[str, str, str]: Paths for temporary storage, updated TDR metadata, and access URL.
        """
        # The access URL is the full gs:// path to the file in TDR
        access_url = file_info["fileDetail"]["accessUrl"]
        # Get the basename of the file
        file_name = os.path.basename(access_url)
        # Replace spaces in the new basename with underscores
        file_safe_new_basename = new_basename.replace(" ", "_")
        # Swap the original basename for the new, file-safe basename
        new_file_name = file_name.replace(
            f'{og_basename}.', f'{file_safe_new_basename}.')
        # Get the TDR path. This is metadata only, not a physical location
        tdr_file_path = file_info["path"]
        # Create the full path for the updated TDR metadata
        updated_tdr_metadata_path = os.path.join(
            os.path.dirname(tdr_file_path), new_file_name)
        access_url_without_bucket = access_url.split('gs://')[1]
        temp_path = os.path.join(self.temp_bucket, os.path.dirname(
            access_url_without_bucket), new_file_name)
        return temp_path, updated_tdr_metadata_path, access_url

    def _create_row_dict(
            self, row_dict: dict, file_ref_columns: list[str]
    ) -> Tuple[Optional[dict], Optional[list[dict]]]:
        """
        Go through each row and check whether each cell is a file reference that needs to be re-ingested.

        If so, create a new row dict with the new file path.

        Args:
            row_dict (dict): The original row dictionary.
            file_ref_columns (list[str]): List of columns that are file references.

        Returns:
            Tuple[Optional[dict], Optional[list[dict]]]: New row dictionary and list of files to copy,
                or None if no re-ingestion is needed.
        """
        reingest_row = False
        # Create a new ingest dictionary holding just the row identifier so it can be merged with the right row later
        new_row_dict = {self.row_identifier: row_dict[self.row_identifier]}
        # Create a list of all files to copy to the temp location
        temp_copy_list = []
        # Get the basename to replace
        og_basename = row_dict[self.original_column]
        new_basename = row_dict[self.new_column]
        # If the new basename is the same as the old one, don't do anything
        if og_basename == new_basename:
            return None, None
        for column_name in row_dict:
            # Check that the column is a fileref, the cell is not empty, and the update is not for columns only
            if column_name in file_ref_columns and row_dict[column_name] and not self.column_update_only:
                # Get the full file info for that cell
                file_info = self.files_info.get(row_dict[column_name])
                # Get the potential temp path, updated TDR metadata path, and access URL for the file
                temp_path, updated_tdr_metadata_path, access_url = self._create_paths(
                    file_info, og_basename, new_basename  # type: ignore[arg-type]
                )
                # Check if the file name starts with the original basename followed by a '.'
                if os.path.basename(access_url).startswith(f"{og_basename}."):
                    self.total_files_to_reingest += 1
                    # Add to the ingest row dict to ingest from the temp location with the updated name
                    new_row_dict[column_name] = {
                        # temp_path is the full path to the renamed file in the temp bucket
                        "sourcePath": temp_path,
                        # New target path with the updated basename
                        "targetPath": updated_tdr_metadata_path,
                    }
                    # Add to the copy list for copying and renaming the file currently in TDR
                    temp_copy_list.append(
                        {
                            "source_file": access_url,
                            "full_destination_path": temp_path
                        }
                    )
                    # Set reingest_row to True because files need to be updated
                    reingest_row = True
            # If the original column should be updated and this is that column,
            # update the new row dict with the new file basename
            elif self.update_original_column and column_name == self.original_column:
                new_row_dict[column_name] = row_dict[self.new_column]
                reingest_row = True
        if reingest_row:
            return new_row_dict, temp_copy_list
        else:
            return None, None

    def get_new_copy_and_ingest_list(self) -> Tuple[list[dict], list[list]]:
        """
        Get the list of rows to re-ingest and files to copy to temporary storage.

        Iterates through the table metrics, identifies the rows and files that need to be re-ingested,
        and prepares lists of these rows and files.

        **Returns:**
        - Tuple[list[dict], list[list]]: A tuple containing a list of rows to re-ingest and a list of files to copy.
        """
        rows_to_reingest = []
        files_to_copy_to_temp = []
        # Get all columns in the table that are file references
        file_ref_columns = [
            col['name'] for col in self.table_schema_info['columns'] if col['datatype'] == 'fileref']
        for row_dict in self.table_metrics:
            new_row_dict, temp_copy_list = self._create_row_dict(row_dict, file_ref_columns)
            # If there is something to copy and update
            if new_row_dict and temp_copy_list:
                rows_to_reingest.append(new_row_dict)
                files_to_copy_to_temp.append(temp_copy_list)
        logging.info(f"Total rows to re-ingest: {len(rows_to_reingest)}")
        logging.info(f"Total files to copy and re-ingest: {self.total_files_to_reingest}")
        return rows_to_reingest, files_to_copy_to_temp


class BatchCopyAndIngest:
    """Class to perform batch copy and ingest of files to TDR."""

    def __init__(
            self,
            rows_to_ingest: list[dict],
            tdr: TDR,
            target_table_name: str,
            update_strategy: str,
            dataset_id: str,
            row_files_to_copy: list[list[dict]],
            copy_and_ingest_batch_size: int = ARG_DEFAULTS["file_ingest_batch_size"],  # type: ignore[assignment]
            workers: int = ARG_DEFAULTS["multithread_workers"],  # type: ignore[assignment]
            wait_time_to_poll: int = ARG_DEFAULTS['waiting_time_to_poll']  # type: ignore[assignment]
    ) -> None:
        """
        Initialize the BatchCopyAndIngest class.

        **Args:**
        - rows_to_ingest (list[dict]): The list of rows to ingest.
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API.
        - target_table_name (str): The name of the target table.
        - update_strategy (str): Strategy for updating the data.
        - dataset_id (str): The dataset ID.
        - row_files_to_copy (list[list[dict]]): List of files to copy for each row.
        - copy_and_ingest_batch_size (int, optional): Size of each batch for copying and ingesting. Defaults to `500`.
        - workers (int, optional): Number of workers used to copy files to the temp location in parallel.
                Defaults to `10`.
        - wait_time_to_poll (int, optional): Time to wait between polling for ingest status. Defaults to `90`.
        """
        self.rows_to_ingest = rows_to_ingest
        """@private"""
        self.tdr = tdr
        """@private"""
        self.target_table_name = target_table_name
        """@private"""
        self.update_strategy = update_strategy
        """@private"""
        self.dataset_id = dataset_id
        """@private"""
        self.row_files_to_copy = row_files_to_copy
        """@private"""
        self.copy_and_ingest_batch_size = copy_and_ingest_batch_size
        """@private"""
        self.workers = workers
        """@private"""
        self.wait_time_to_poll = wait_time_to_poll
        """@private"""

    def run(self) -> None:
        """
        Run the batch copy and ingest process.

        Batches the rows, copies their files to the temp location, ingests them into the dataset,
        and deletes the temporary files after each batch is ingested.
        """
        # Batch through the rows to copy and ingest so that if the script fails partway through a large
        # copy and ingest, some of the files will already have been copied over and ingested
        logging.info(
            f"Batching {len(self.rows_to_ingest)} total rows into batches of {self.copy_and_ingest_batch_size} "
            "for copying to temp location and ingest"
        )
        # Ceiling division so the batch count is correct when the row count is an exact multiple of the batch size
        total_batches = (
            len(self.rows_to_ingest) + self.copy_and_ingest_batch_size - 1
        ) // self.copy_and_ingest_batch_size
        gcp_functions = GCPCloudFunctions()
        for i in range(0, len(self.rows_to_ingest), self.copy_and_ingest_batch_size):
            batch_number = i // self.copy_and_ingest_batch_size + 1
            logging.info(
                f"Starting batch {batch_number} of {total_batches} for copy to temp and ingest")
            ingest_metadata_batch = self.rows_to_ingest[i:i + self.copy_and_ingest_batch_size]
            files_to_copy_batch = self.row_files_to_copy[i:i + self.copy_and_ingest_batch_size]
            # files_to_copy_batch is a list of lists of dicts, so flatten it
            files_to_copy = [
                file_dict for sublist in files_to_copy_batch for file_dict in sublist]

            # Copy the renamed files to the temp bucket if there is anything to copy
            if files_to_copy:
                gcp_functions.multithread_copy_of_files_with_validation(
                    files_to_copy=files_to_copy,
                    workers=self.workers,
                    max_retries=5
                )

            logging.info(
                f"Batch {batch_number} of {total_batches} being ingested to dataset. "
                f"{len(ingest_metadata_batch)} total rows in current ingest."
            )
            # Ingest the renamed files into the dataset
            StartAndMonitorIngest(
                tdr=self.tdr,
                ingest_records=ingest_metadata_batch,
                target_table_name=self.target_table_name,
                dataset_id=self.dataset_id,
                load_tag=f"{self.target_table_name}_re-ingest",
                bulk_mode=False,
                update_strategy=self.update_strategy,
                waiting_time_to_poll=self.wait_time_to_poll,
            ).run()

            # Delete the files from the temp bucket. The full destination path is the temp location from the copy step.
            files_to_delete = [file_dict['full_destination_path']
                               for file_dict in files_to_copy]
            gcp_functions.delete_multiple_files(
                files_to_delete=files_to_delete,
                workers=self.workers,
            )

class GetRowAndFileInfoForReingest:

Class to obtain file and row information for re-ingesting files in TDR.

GetRowAndFileInfoForReingest(table_schema_info: dict, files_info: dict, table_metrics: list[dict], original_column: str, new_column: str, row_identifier: str, temp_bucket: str, update_original_column: bool = False, column_update_only: bool = False)

Initialize the GetRowAndFileInfoForReingest class.

Args:

  • table_schema_info (dict): Schema information of the table.
  • files_info (dict): A dictionary where the key is the file UUID and the value is the file metadata.
  • table_metrics (list[dict]): Metrics of the TDR table to update.
  • original_column (str): The column name with the original value.
  • new_column (str): The column name with the new value.
  • row_identifier (str): The identifier for the row. Should be the primary key.
  • temp_bucket (str): The temporary bucket for storing files.
  • update_original_column (bool, optional): Whether to also update the original column with the new value. If False, only file paths are updated. Defaults to False.
  • column_update_only (bool, optional): Whether to update only the column and not the file paths. Defaults to False.
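
A minimal construction sketch follows. The schema, file metadata, and row metrics here are hand-written stand-ins for data that would normally be pulled from TDR, and every table, column, UUID, and bucket name is hypothetical.

# Hypothetical inputs standing in for data normally fetched from TDR.
table_schema_info = {
    "name": "sample",
    "columns": [
        {"name": "sample_id", "datatype": "string"},
        {"name": "old_alias", "datatype": "string"},
        {"name": "new_alias", "datatype": "string"},
        {"name": "cram_path", "datatype": "fileref"},
    ],
}
files_info = {
    # key: file UUID, value: file metadata as returned by TDR
    "1111-aaaa": {
        "path": "/sample/old_name.cram",
        "fileDetail": {"accessUrl": "gs://tdr-bucket/sample/old_name.cram"},
    },
}
table_metrics = [
    {
        "sample_id": "SM-1",
        "old_alias": "old_name",   # basename currently used in the file names
        "new_alias": "new_name",   # desired basename
        "cram_path": "1111-aaaa",  # fileref cell holding the file UUID
    },
]

row_info_getter = GetRowAndFileInfoForReingest(
    table_schema_info=table_schema_info,
    files_info=files_info,
    table_metrics=table_metrics,
    original_column="old_alias",
    new_column="new_alias",
    row_identifier="sample_id",
    temp_bucket="gs://my-temp-bucket",
)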
def get_new_copy_and_ingest_list(self) -> Tuple[list[dict], list[list]]:

Get the list of rows to re-ingest and files to copy to temporary storage.

Iterates through the table metrics, identifies the rows and files that need to be re-ingested, and prepares lists of these rows and files.

Returns:

  • Tuple[list[dict], list[list]]: A tuple containing a list of rows to re-ingest and a list of files to copy.
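
Continuing the hypothetical inputs sketched above, the two returned lists would look roughly like this (paths and UUIDs are illustrative only):

rows_to_reingest, files_to_copy_to_temp = row_info_getter.get_new_copy_and_ingest_list()

# rows_to_reingest: one dict per row, keyed by the row identifier, with each matching
# fileref cell rewritten as a sourcePath/targetPath pair for re-ingest, e.g.
# [{"sample_id": "SM-1",
#   "cram_path": {"sourcePath": "gs://my-temp-bucket/tdr-bucket/sample/new_name.cram",
#                 "targetPath": "/sample/new_name.cram"}}]
#
# files_to_copy_to_temp: one list per row of copy instructions for the temp bucket, e.g.
# [[{"source_file": "gs://tdr-bucket/sample/old_name.cram",
#    "full_destination_path": "gs://my-temp-bucket/tdr-bucket/sample/new_name.cram"}]]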
class BatchCopyAndIngest:

Class to perform batch copy and ingest of files to TDR.

BatchCopyAndIngest(rows_to_ingest: list[dict], tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, target_table_name: str, update_strategy: str, dataset_id: str, row_files_to_copy: list[list[dict]], copy_and_ingest_batch_size: int = 500, workers: int = 10, wait_time_to_poll: int = 90)

Initialize the BatchCopyAndIngest class.

Args:

  • rows_to_ingest (list[dict]): The list of rows to ingest.
  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): TDR instance for interacting with the TDR API.
  • target_table_name (str): The name of the target table.
  • update_strategy (str): Strategy for updating the data.
  • dataset_id (str): The dataset ID.
  • row_files_to_copy (list[list[dict]]): List of files to copy for each row.
  • copy_and_ingest_batch_size (int, optional): Size of each batch for copying and ingesting. Defaults to 500.
  • workers (int, optional): Number of workers used to copy files to the temp location in parallel. Defaults to 10.
  • wait_time_to_poll (int, optional): Time to wait between polling for ingest status. Defaults to 90.
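
A construction sketch, assuming the two lists come from GetRowAndFileInfoForReingest.get_new_copy_and_ingest_list() above and that tdr is an already-configured ops_utils TDR client; the table name, dataset ID, and update strategy shown are only examples.

# rows_to_reingest and files_to_copy_to_temp as returned by get_new_copy_and_ingest_list();
# `tdr` is an already-configured TDR client. All identifiers below are illustrative.
batch_copy_and_ingest = BatchCopyAndIngest(
    rows_to_ingest=rows_to_reingest,
    tdr=tdr,
    target_table_name="sample",
    update_strategy="merge",
    dataset_id="00000000-0000-0000-0000-000000000000",
    row_files_to_copy=files_to_copy_to_temp,
)
batch_copy_and_ingest.run()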
def run(self) -> None:

Run the batch copy and ingest process.

Batches the rows, copies their files to the temp location, ingests them into the dataset, and deletes the temporary files after each batch is ingested.
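
As a toy illustration of the batching run() performs (the numbers are arbitrary), each batch slices both lists with the same indices and flattens the per-row copy lists before copying:

rows = list(range(7))                      # stand-in for rows_to_ingest
row_files = [[f"file_{i}"] for i in rows]  # stand-in for row_files_to_copy
batch_size = 3

total_batches = (len(rows) + batch_size - 1) // batch_size  # ceiling division -> 3
for i in range(0, len(rows), batch_size):
    batch_number = i // batch_size + 1
    ingest_metadata_batch = rows[i:i + batch_size]
    files_to_copy_batch = row_files[i:i + batch_size]
    # Flatten the per-row lists into one copy list for this batch
    files_to_copy = [f for sublist in files_to_copy_batch for f in sublist]
    print(batch_number, ingest_metadata_batch, files_to_copy)
# 1 [0, 1, 2] ['file_0', 'file_1', 'file_2']
# 2 [3, 4, 5] ['file_3', 'file_4', 'file_5']
# 3 [6] ['file_6']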