ops_utils.tdr_utils.renaming_util
Module for renaming files in TDR and updating metadata.
1"""Module for renaming files in TDR and updating metadata.""" 2import logging 3import os 4from typing import Optional, Tuple 5from .tdr_ingest_utils import StartAndMonitorIngest 6from .tdr_api_utils import TDR 7from ..gcp_utils import GCPCloudFunctions 8from ..vars import ARG_DEFAULTS 9 10 11class GetRowAndFileInfoForReingest: 12 """Class to obtain file and row information for re-ingesting files in TDR.""" 13 14 def __init__( 15 self, 16 table_schema_info: dict, 17 files_info: dict, 18 table_metrics: list[dict], 19 original_column: str, 20 new_column: str, 21 row_identifier: str, 22 temp_bucket: str, 23 update_original_column: bool = False, 24 column_update_only: bool = False 25 ): 26 """ 27 Initialize the GetRowAndFileInfoForReingest class. 28 29 **Args:** 30 - table_schema_info (dict): Schema information of the table 31 - files_info (dict): A dictionary where the key is the file UUID and the value is the file metadata. 32 - table_metrics (list[dict]): Metrics of the TDR table to update. 33 - original_column (str): The column name with the original value. 34 - new_column (str): The column name with the new value. 35 - row_identifier (str): The identifier for the row. Should be the primary key. 36 - temp_bucket (str): The temporary bucket for storing files. 37 - update_original_column (bool, optional): Whether to update the original column. 38 If not used will just update file paths. Defaults to `False` 39 - column_update_only (bool, optional): Whether to update only the column and 40 not update the file paths. Defaults to `False`. 41 """ 42 self.table_schema_info = table_schema_info 43 """@private""" 44 self.files_info = files_info 45 """@private""" 46 self.table_metrics = table_metrics 47 """@private""" 48 self.original_column = original_column 49 """@private""" 50 self.new_column = new_column 51 """@private""" 52 self.row_identifier = row_identifier 53 """@private""" 54 self.total_files_to_reingest = 0 55 """@private""" 56 self.temp_bucket = temp_bucket 57 """@private""" 58 self.update_original_column = update_original_column 59 """@private""" 60 self.column_update_only = column_update_only 61 """@private""" 62 63 def _create_paths(self, file_info: dict, og_basename: str, new_basename: str) -> Tuple[str, str, str]: 64 """ 65 Create paths for the file in TDR, updated TDR metadata, and temporary storage. 66 67 Args: 68 file_info (dict): Information about the file returned from TDR. 69 og_basename (str): The original basename of the file. 70 new_basename (str): The new basename of the file. 71 72 Returns: 73 Tuple[str, str, str]: Paths for temporary storage, updated TDR metadata, and access URL. 74 """ 75 # Access url is the full path to the file in TDR 76 access_url = file_info["fileDetail"]["accessUrl"] 77 # Get basename of file 78 file_name = os.path.basename(access_url) 79 file_safe_new_basename = new_basename.replace(" ", "_") 80 # Replace basename with new basename and replace spaces with underscores 81 new_file_name = file_name.replace( 82 f'{og_basename}.', f'{file_safe_new_basename}.') 83 # get tdr path. 
Not real path, just the metadata 84 tdr_file_path = file_info["path"] 85 # Create full path to updated tdr metadata file path 86 updated_tdr_metadata_path = os.path.join( 87 os.path.dirname(tdr_file_path), new_file_name) 88 access_url_without_bucket = access_url.split('gs://')[1] 89 temp_path = os.path.join(self.temp_bucket, os.path.dirname( 90 access_url_without_bucket), new_file_name) 91 return temp_path, updated_tdr_metadata_path, access_url 92 93 def _create_row_dict( 94 self, row_dict: dict, file_ref_columns: list[str] 95 ) -> Tuple[Optional[dict], Optional[list[dict]]]: 96 """ 97 Go through each row and check each cell if it is a file and if it needs to be re-ingested. 98 99 If so, create a new row dict with the new file path. 100 101 Args: 102 row_dict (dict): The original row dictionary. 103 file_ref_columns (list[str]): List of columns that are file references. 104 105 Returns: 106 Tuple[Optional[dict], Optional[list[dict]]]: New row dictionary and list of files to copy, 107 or None if no re-ingestion is needed. 108 """ 109 reingest_row = False 110 # Create new dictionary for ingest just the row identifier so can merge with right row later 111 new_row_dict = {self.row_identifier: row_dict[self.row_identifier]} 112 # Create list of all files for copy to temp location 113 temp_copy_list = [] 114 # Get basename to replace 115 og_basename = row_dict[self.original_column] 116 new_basename = row_dict[self.new_column] 117 # If the new basename is the same as the old one, don't do anything 118 if og_basename == new_basename: 119 return None, None 120 for column_name in row_dict: 121 # Check if column is a fileref, cell is not empty, and update is not for columns only (not files) 122 if column_name in file_ref_columns and row_dict[column_name] and not self.column_update_only: 123 # Get full file info for that cell 124 file_info = self.files_info.get(row_dict[column_name]) 125 # Get potential temp path, updated tdr metadata path, and access url for file 126 temp_path, updated_tdr_metadata_path, access_url = self._create_paths( 127 file_info, og_basename, new_basename # type: ignore[arg-type] 128 ) 129 # Check if access_url starts with og basename and then . 130 if os.path.basename(access_url).startswith(f"{og_basename}."): 131 self.total_files_to_reingest += 1 132 # Add to ingest row dict to ingest from temp location with updated name 133 new_row_dict[column_name] = { 134 # temp path is the full path to the renamed file in the temp bucket 135 "sourcePath": temp_path, 136 # New target path with updated basename 137 "targetPath": updated_tdr_metadata_path, 138 } 139 # Add to copy list for copying and renaming file currently in TDR 140 temp_copy_list.append( 141 { 142 "source_file": access_url, 143 "full_destination_path": temp_path 144 } 145 ) 146 # Set reingest row to True because files need to be updated 147 reingest_row = True 148 # If column to update is set and column name is the column to update 149 # then update the new row dict with the new file basename 150 elif self.update_original_column and column_name == self.original_column: 151 new_row_dict[column_name] = row_dict[self.new_column] 152 reingest_row = True 153 if reingest_row: 154 return new_row_dict, temp_copy_list 155 else: 156 return None, None 157 158 def get_new_copy_and_ingest_list(self) -> Tuple[list[dict], list[list]]: 159 """ 160 Get the list of rows to re-ingest and files to copy to temporary storage. 
161 162 Iterates through the table metrics, identifies the rows and files that need to be re-ingested, 163 and prepares lists of these rows and files. 164 165 **Returns:** 166 - Tuple[list[dict], list[list]]: A tuple containing a list of rows to re-ingest and a list of files to copy. 167 """ 168 rows_to_reingest = [] 169 files_to_copy_to_temp = [] 170 # Get all columns in the table that are file references 171 file_ref_columns = [ 172 col['name'] for col in self.table_schema_info['columns'] if col['datatype'] == 'fileref'] 173 for row_dict in self.table_metrics: 174 new_row_dict, temp_copy_list = self._create_row_dict(row_dict, file_ref_columns) 175 # If there is something to copy and update 176 if new_row_dict and temp_copy_list: 177 rows_to_reingest.append(new_row_dict) 178 files_to_copy_to_temp.append(temp_copy_list) 179 logging.info(f"Total rows to re-ingest: {len(rows_to_reingest)}") 180 logging.info(f"Total files to copy and re-ingest: {self.total_files_to_reingest}") 181 return rows_to_reingest, files_to_copy_to_temp 182 183 184class BatchCopyAndIngest: 185 """Class to perform batch copy and ingest of files to TDR.""" 186 187 def __init__( 188 self, 189 rows_to_ingest: list[dict], 190 tdr: TDR, 191 target_table_name: str, 192 update_strategy: str, 193 dataset_id: str, 194 row_files_to_copy: list[list[dict]], 195 copy_and_ingest_batch_size: int = ARG_DEFAULTS["file_ingest_batch_size"], # type: ignore[assignment] 196 workers: int = ARG_DEFAULTS["multithread_workers"], # type: ignore[assignment] 197 wait_time_to_poll: int = ARG_DEFAULTS['waiting_time_to_poll'] # type: ignore[assignment] 198 ) -> None: 199 """ 200 Initialize the BatchCopyAndIngest class. 201 202 **Args:** 203 - rows_to_ingest (list[dict]): The list of rows to ingest. 204 - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR instance for interacting with the TDR API. 205 - target_table_name (str): The name of the target table. 206 - update_strategy (str): Strategy for updating the data. 207 - dataset_id (str): The dataset ID. 208 - row_files_to_copy (list[list[dict]]): List of files to copy for each row. 209 - copy_and_ingest_batch_size (int): Size of each batch for copying and ingesting. Defaults to `500`. 210 - workers (int, optional): Number of workers for parallel processing copies of files to temp location. 211 Defaults to `10` 212 - wait_time_to_poll (int, optional): Time to wait between polling for ingest status. Defaults to `90`. 213 """ 214 self.rows_to_ingest = rows_to_ingest 215 """@private""" 216 self.tdr = tdr 217 """@private""" 218 self.target_table_name = target_table_name 219 """@private""" 220 self.update_strategy = update_strategy 221 """@private""" 222 self.dataset_id = dataset_id 223 """@private""" 224 self.row_files_to_copy = row_files_to_copy 225 """@private""" 226 self.copy_and_ingest_batch_size = copy_and_ingest_batch_size 227 """@private""" 228 self.workers = workers 229 """@private""" 230 self.wait_time_to_poll = wait_time_to_poll 231 """@private""" 232 233 def run(self) -> None: 234 """ 235 Run the batch copy and ingest process. 236 237 Batches the rows to copy files and ingests them into the dataset. Also deletes the temporary files after ingestion. 
238 """ 239 # Batch through rows to copy files down and ingest so if script fails partway through large 240 # copy and ingest it will have copied over and ingested some of the files already 241 logging.info( 242 f"Batching {len(self.rows_to_ingest)} total rows into batches of {self.copy_and_ingest_batch_size} " + 243 "for copying to temp location and ingest" 244 ) 245 total_batches = len(self.rows_to_ingest) // self.copy_and_ingest_batch_size + 1 246 gcp_functions = GCPCloudFunctions() 247 for i in range(0, len(self.rows_to_ingest), self.copy_and_ingest_batch_size): 248 batch_number = i // self.copy_and_ingest_batch_size + 1 249 logging.info( 250 f"Starting batch {batch_number} of {total_batches} for copy to temp and ingest") 251 ingest_metadata_batch = self.rows_to_ingest[i:i + 252 self.copy_and_ingest_batch_size] 253 files_to_copy_batch = self.row_files_to_copy[i:i + 254 self.copy_and_ingest_batch_size] 255 # files_to_copy_batch will be a list of lists of dicts, so flatten it 256 files_to_copy = [ 257 file_dict for sublist in files_to_copy_batch for file_dict in sublist] 258 259 # Copy files to temp bucket if anything to copy 260 if files_to_copy: 261 gcp_functions.multithread_copy_of_files_with_validation( 262 # Create dict with new names for copy of files to temp bucket 263 files_to_copy=files_to_copy, 264 workers=self.workers, 265 max_retries=5 266 ) 267 268 logging.info( 269 f"Batch {batch_number} of {total_batches} batches being ingested to dataset. " 270 f"{len(ingest_metadata_batch)} total rows in current ingest." 271 ) 272 # Ingest renamed files into dataset 273 StartAndMonitorIngest( 274 tdr=self.tdr, 275 ingest_records=ingest_metadata_batch, 276 target_table_name=self.target_table_name, 277 dataset_id=self.dataset_id, 278 load_tag=f"{self.target_table_name}_re-ingest", 279 bulk_mode=False, 280 update_strategy=self.update_strategy, 281 waiting_time_to_poll=self.wait_time_to_poll, 282 ).run() 283 284 # Delete files from temp bucket 285 # Create list of files in temp location to delete. Full destination path is the temp location from copy 286 files_to_delete = [file_dict['full_destination_path'] 287 for file_dict in files_to_copy] 288 gcp_functions.delete_multiple_files( 289 # Create list of files in temp location to delete. Full destination path is the temp location from copy 290 files_to_delete=files_to_delete, 291 workers=self.workers, 292 )
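The two classes are designed to be used together: GetRowAndFileInfoForReingest works out which rows and files need new names, and BatchCopyAndIngest copies the renamed files to a temporary bucket and re-ingests them. A minimal sketch of that flow, assuming tdr is an already-authenticated TDR client and that the schema, file, and row metadata have been fetched separately; all identifiers and values below are placeholders:

    # Sketch only: dataset id, table/column names, and bucket are placeholders, and
    # schema_info, files_info, and table_metrics are assumed to come from your own TDR lookups.
    row_finder = GetRowAndFileInfoForReingest(
        table_schema_info=schema_info,
        files_info=files_info,
        table_metrics=table_metrics,
        original_column="old_sample_id",
        new_column="new_sample_id",
        row_identifier="sample_id",
        temp_bucket="gs://my-temp-bucket/",
    )
    rows_to_reingest, files_to_copy = row_finder.get_new_copy_and_ingest_list()

    BatchCopyAndIngest(
        rows_to_ingest=rows_to_reingest,
        tdr=tdr,
        target_table_name="sample",
        update_strategy="merge",  # placeholder; use whichever update strategy your workflow requires
        dataset_id="11111111-2222-3333-4444-555555555555",
        row_files_to_copy=files_to_copy,
    ).run()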
class GetRowAndFileInfoForReingest:
Class to obtain file and row information for re-ingesting files in TDR.
GetRowAndFileInfoForReingest(table_schema_info: dict, files_info: dict, table_metrics: list[dict], original_column: str, new_column: str, row_identifier: str, temp_bucket: str, update_original_column: bool = False, column_update_only: bool = False)
Initialize the GetRowAndFileInfoForReingest class.
Args:
- table_schema_info (dict): Schema information of the table.
- files_info (dict): A dictionary where the key is the file UUID and the value is the file metadata.
- table_metrics (list[dict]): Metrics of the TDR table to update.
- original_column (str): The column name with the original value.
- new_column (str): The column name with the new value.
- row_identifier (str): The identifier for the row. Should be the primary key.
- temp_bucket (str): The temporary bucket for storing files.
- update_original_column (bool, optional): Whether to update the original column.
  If not used, only file paths are updated. Defaults to False.
- column_update_only (bool, optional): Whether to update only the column and
  not update the file paths. Defaults to False.
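As a rough illustration of the input shapes this constructor expects, based on the fields the class actually reads (fileDetail.accessUrl and path from files_info, name and datatype from the schema columns); all values are made up:

    files_info = {
        "file-uuid-1": {
            "fileDetail": {"accessUrl": "gs://tdr-bucket/data/old_name.cram"},
            "path": "/data/old_name.cram",
        },
    }
    table_metrics = [
        {
            "sample_id": "SM-1",          # row_identifier (primary key)
            "old_sample_id": "old_name",  # original_column: basename to replace
            "new_sample_id": "new_name",  # new_column: replacement basename
            "cram_file": "file-uuid-1",   # fileref column whose value keys into files_info
        },
    ]
    table_schema_info = {
        "columns": [
            {"name": "sample_id", "datatype": "string"},
            {"name": "cram_file", "datatype": "fileref"},
        ],
    }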
def get_new_copy_and_ingest_list(self) -> Tuple[list[dict], list[list]]:
Get the list of rows to re-ingest and files to copy to temporary storage.
Iterates through the table metrics, identifies the rows and files that need to be re-ingested, and prepares lists of these rows and files.
Returns:
- Tuple[list[dict], list[list]]: A tuple containing a list of rows to re-ingest and a list of files to copy.
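Continuing the illustrative inputs above, the returned pair would look roughly like this (the temp and metadata paths are derived the same way _create_paths builds them; the values remain made up):

    rows_to_reingest, files_to_copy_to_temp = row_finder.get_new_copy_and_ingest_list()
    # rows_to_reingest:
    # [{"sample_id": "SM-1",
    #   "cram_file": {"sourcePath": "gs://my-temp-bucket/tdr-bucket/data/new_name.cram",
    #                 "targetPath": "/data/new_name.cram"}}]
    # files_to_copy_to_temp:
    # [[{"source_file": "gs://tdr-bucket/data/old_name.cram",
    #    "full_destination_path": "gs://my-temp-bucket/tdr-bucket/data/new_name.cram"}]]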
class BatchCopyAndIngest:
Class to perform batch copy and ingest of files to TDR.
BatchCopyAndIngest(rows_to_ingest: list[dict], tdr: ops_utils.tdr_utils.tdr_api_utils.TDR, target_table_name: str, update_strategy: str, dataset_id: str, row_files_to_copy: list[list[dict]], copy_and_ingest_batch_size: int = 500, workers: int = 10, wait_time_to_poll: int = 90)
Initialize the BatchCopyAndIngest class.
Args:
- rows_to_ingest (list[dict]): The list of rows to ingest.
- tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): TDR instance for interacting with the TDR API.
- target_table_name (str): The name of the target table.
- update_strategy (str): Strategy for updating the data.
- dataset_id (str): The dataset ID.
- row_files_to_copy (list[list[dict]]): List of files to copy for each row.
- copy_and_ingest_batch_size (int): Size of each batch for copying and ingesting. Defaults to 500.
- workers (int, optional): Number of workers for parallel copying of files to the temp location.
  Defaults to 10.
- wait_time_to_poll (int, optional): Time to wait between polling for ingest status. Defaults to 90.
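A hedged sketch of wiring the outputs of GetRowAndFileInfoForReingest into this class with non-default batching; tdr is assumed to be an already-authenticated client, and the dataset ID, table name, and update strategy are placeholders:

    batch_job = BatchCopyAndIngest(
        rows_to_ingest=rows_to_reingest,
        tdr=tdr,
        target_table_name="sample",
        update_strategy="merge",  # placeholder; pick the strategy your ingest requires
        dataset_id="11111111-2222-3333-4444-555555555555",
        row_files_to_copy=files_to_copy_to_temp,
        copy_and_ingest_batch_size=100,  # smaller batches checkpoint progress more often
        workers=20,
        wait_time_to_poll=30,
    )
    batch_job.run()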
def run(self) -> None:
Run the batch copy and ingest process.
Batches the rows to copy files and ingests them into the dataset. Also deletes the temporary files after ingestion.
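For reference, the batching that run() applies internally is a plain slice-and-flatten loop; shown here on toy data rather than real rows:

    rows = list(range(7))
    batch_size = 3
    for i in range(0, len(rows), batch_size):
        batch_number = i // batch_size + 1
        batch = rows[i:i + batch_size]  # rows 0-2, then 3-5, then 6
        # Each row carries its own list of copy dicts; a batch of those lists is flattened
        # before the files are copied to the temp bucket.
        files_to_copy_batch = [[{"source_file": f"row{n}"}] for n in batch]
        files_to_copy = [d for sublist in files_to_copy_batch for d in sublist]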