ops_utils.tdr_utils.tdr_job_utils
Utilities for interacting with TDR jobs.
"""Utilities for interacting with TDR jobs."""
import json
import logging
import time
from typing import Callable, Optional, Any

from ..vars import ARG_DEFAULTS


class MonitorTDRJob:
    """A class to monitor the status of a TDR job until completion."""

    def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool):
        """
        Initialize the MonitorTDRJob class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - job_id (str): The ID of the job to be monitored.
        - check_interval (int): The interval in seconds to wait between status checks.
        - return_json (bool): Whether to get and return the result of the job as json.
        """
        self.tdr = tdr
        """@private"""
        self.job_id = job_id
        """@private"""
        self.check_interval = check_interval
        """@private"""
        self.return_json = return_json
        """@private"""

    def _raise_for_failed_job(self) -> None:
        """
        Raise an error with useful information if the job has failed.

        The job result is requested with `expect_failure=True` and its status code
        and body text become the error message.

        **Raises:**
        - ValueError: Always; the message is `Status code <code>: <body text>`.
        """
        job_result = self.tdr.get_job_result(self.job_id, expect_failure=True)
        # ValueError matches the documented contract and is still caught by
        # callers that handle the broader `Exception`.
        raise ValueError(
            f"Status code {job_result.status_code}: {job_result.text}")

    def run(self) -> Optional[dict]:
        """
        Monitor the job until completion.

        The job status is polled every `check_interval` seconds for as long as the
        status request answers with HTTP 202.

        **Returns:**
        - dict: The job results parsed from JSON, when `return_json` is `True`.
        - None: When the job succeeded and `return_json` is `False`.

        **Raises:**
        - ValueError: If the job did not succeed or the status request returned an
          unexpected status code.
        """
        while True:
            ingest_response = self.tdr.get_job_status(self.job_id)
            if ingest_response.status_code == 202:
                logging.info(f"TDR job {self.job_id} is still running")
                # Check every x seconds if ingest is still running
                time.sleep(self.check_interval)
            elif ingest_response.status_code == 200:
                response_json = json.loads(ingest_response.text)
                if response_json["job_status"] == "succeeded":
                    logging.info(f"TDR job {self.job_id} succeeded")
                    if self.return_json:
                        request = self.tdr.get_job_result(self.job_id)
                        return json.loads(request.text)
                    # If not returning json, return None
                    return None
                else:
                    logging.error(f"TDR job {self.job_id} failed")
                    self._raise_for_failed_job()
            else:
                logging.error(f"TDR job {self.job_id} failed")
                self._raise_for_failed_job()


class SubmitAndMonitorMultipleJobs:
    """A class to batch submit and monitor TDR jobs."""

    def __init__(
            self, tdr: Any,
            job_function: Callable,
            job_args_list: list[tuple],
            batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
            check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
            verbose: bool = False
    ):
        """
        Initialize the SubmitAndMonitorMultipleJobs class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - job_function (Callable): The function to submit a job.
        - job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
        - batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`.
        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
        """
        self.tdr = tdr
        """@private"""
        self.job_function = job_function
        """@private"""
        self.job_args_list = job_args_list
        """@private"""
        self.batch_size = batch_size
        """@private"""
        self.check_interval = check_interval
        """@private"""
        self.verbose = verbose
        """@private"""

    def run(self) -> None:
        """
        Run the process to submit and monitor multiple jobs in batches.

        Every job of a batch is submitted before any of them is monitored; the jobs
        are then monitored one after another. Logs the progress and status of each
        batch and job.

        **Raises:**
        - ValueError: If `batch_size` is smaller than 1.
        """
        # A negative step would make the range below empty, so nothing would be
        # submitted while success is still reported; reject it up front.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")

        # functools.partial objects and callable instances carry no __name__.
        target = getattr(self.job_function, "func", self.job_function)
        job_name = getattr(target, "__name__", type(target).__name__)

        total_jobs = len(self.job_args_list)
        logging.info(f"Processing {total_jobs} {job_name} jobs in batches of {self.batch_size}")

        # Process jobs in batches
        batch_starts = range(0, total_jobs, self.batch_size)
        for batch_number, start in enumerate(batch_starts, start=1):
            job_ids = []
            current_batch = self.job_args_list[start:start + self.batch_size]
            logging.info(
                f"Submitting jobs for batch {batch_number} with {len(current_batch)} jobs."
            )

            # Submit jobs for the current batch
            for job_args in current_batch:
                # The submit response must expose .json() containing the job "id"
                job_id = self.job_function(*job_args).json()["id"]
                if self.verbose:
                    logging.info(f"Submitted job {job_id} with args {job_args}")
                job_ids.append(job_id)

            # Monitor jobs for the current batch
            logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_number}")
            for job_id in job_ids:
                MonitorTDRJob(
                    tdr=self.tdr,
                    job_id=job_id,
                    check_interval=self.check_interval,
                    return_json=False
                ).run()

            logging.info(f"Completed batch {batch_number} with {len(current_batch)} jobs.")

        logging.info(f"Successfully processed {total_jobs} jobs.")
class
MonitorTDRJob:
class MonitorTDRJob:
    """A class to monitor the status of a TDR job until completion."""

    def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool):
        """
        Initialize the MonitorTDRJob class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - job_id (str): The ID of the job to be monitored.
        - check_interval (int): The interval in seconds to wait between status checks.
        - return_json (bool): Whether to get and return the result of the job as json.
        """
        self.tdr = tdr
        """@private"""
        self.job_id = job_id
        """@private"""
        self.check_interval = check_interval
        """@private"""
        self.return_json = return_json
        """@private"""

    def _raise_for_failed_job(self) -> None:
        """
        Raise an error with useful information if the job has failed.

        The job result is requested with `expect_failure=True` and its status code
        and body text become the error message.

        **Raises:**
        - ValueError: Always; the message is `Status code <code>: <body text>`.
        """
        job_result = self.tdr.get_job_result(self.job_id, expect_failure=True)
        # ValueError matches the documented contract and is still caught by
        # callers that handle the broader `Exception`.
        raise ValueError(
            f"Status code {job_result.status_code}: {job_result.text}")

    def run(self) -> Optional[dict]:
        """
        Monitor the job until completion.

        The job status is polled every `check_interval` seconds for as long as the
        status request answers with HTTP 202.

        **Returns:**
        - dict: The job results parsed from JSON, when `return_json` is `True`.
        - None: When the job succeeded and `return_json` is `False`.

        **Raises:**
        - ValueError: If the job did not succeed or the status request returned an
          unexpected status code.
        """
        while True:
            ingest_response = self.tdr.get_job_status(self.job_id)
            if ingest_response.status_code == 202:
                logging.info(f"TDR job {self.job_id} is still running")
                # Check every x seconds if ingest is still running
                time.sleep(self.check_interval)
            elif ingest_response.status_code == 200:
                response_json = json.loads(ingest_response.text)
                if response_json["job_status"] == "succeeded":
                    logging.info(f"TDR job {self.job_id} succeeded")
                    if self.return_json:
                        request = self.tdr.get_job_result(self.job_id)
                        return json.loads(request.text)
                    # If not returning json, return None
                    return None
                else:
                    logging.error(f"TDR job {self.job_id} failed")
                    self._raise_for_failed_job()
            else:
                logging.error(f"TDR job {self.job_id} failed")
                self._raise_for_failed_job()
A class to monitor the status of a TDR job until completion.
MonitorTDRJob(tdr: Any, job_id: str, check_interval: int, return_json: bool)
def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool):
    """
    Store the settings used to monitor a single TDR job.

    **Args:**
    - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance used for the job requests.
    - job_id (str): The ID of the job to monitor.
    - check_interval (int): Seconds to wait between status checks.
    - return_json (bool): Whether the job result should be fetched and returned as json.
    """
    # Each value is kept unchanged on the instance; the bare "@private"
    # strings mark the attributes for the generated documentation.
    self.tdr = tdr
    """@private"""
    self.job_id = job_id
    """@private"""
    self.check_interval = check_interval
    """@private"""
    self.return_json = return_json
    """@private"""
Initialize the MonitorTDRJob class.
Args:
- tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
- job_id (str): The ID of the job to be monitored.
- check_interval (int): The interval in seconds to wait between status checks.
- return_json (bool): Whether to get and return the result of the job as json.
def
run(self) -> Optional[dict]:
def run(self) -> Optional[dict]:
    """
    Poll the job status until the job finishes.

    **Returns:**
    - dict: The job results parsed from JSON, when `return_json` is set.
    - None: When the job succeeded and `return_json` is not set.
    """
    while True:
        status_response = self.tdr.get_job_status(self.job_id)
        status_code = status_response.status_code

        # HTTP 202: the job is still in progress, so wait and poll again.
        if status_code == 202:
            logging.info(f"TDR job {self.job_id} is still running")
            time.sleep(self.check_interval)
            continue

        # HTTP 200 carries the final job status in the response body.
        if status_code == 200 and json.loads(status_response.text)["job_status"] == "succeeded":
            logging.info(f"TDR job {self.job_id} succeeded")
            if not self.return_json:
                return None
            result_response = self.tdr.get_job_result(self.job_id)
            return json.loads(result_response.text)

        # Any other outcome (unsuccessful job or unexpected status code) is an error.
        logging.error(f"TDR job {self.job_id} failed")
        self._raise_for_failed_job()
Monitor the job until completion.
Returns:
- dict: The job results parsed from JSON when return_json is True; otherwise None is returned once the job has succeeded.
class
SubmitAndMonitorMultipleJobs:
class SubmitAndMonitorMultipleJobs:
    """A class to batch submit and monitor TDR jobs."""

    def __init__(
            self, tdr: Any,
            job_function: Callable,
            job_args_list: list[tuple],
            batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
            check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
            verbose: bool = False
    ):
        """
        Initialize the SubmitAndMonitorMultipleJobs class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - job_function (Callable): The function to submit a job.
        - job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
        - batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`.
        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
        """
        self.tdr = tdr
        """@private"""
        self.job_function = job_function
        """@private"""
        self.job_args_list = job_args_list
        """@private"""
        self.batch_size = batch_size
        """@private"""
        self.check_interval = check_interval
        """@private"""
        self.verbose = verbose
        """@private"""

    def run(self) -> None:
        """
        Run the process to submit and monitor multiple jobs in batches.

        Every job of a batch is submitted before any of them is monitored; the jobs
        are then monitored one after another. Logs the progress and status of each
        batch and job.

        **Raises:**
        - ValueError: If `batch_size` is smaller than 1.
        """
        # A negative step would make the range below empty, so nothing would be
        # submitted while success is still reported; reject it up front.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")

        # functools.partial objects and callable instances carry no __name__.
        target = getattr(self.job_function, "func", self.job_function)
        job_name = getattr(target, "__name__", type(target).__name__)

        total_jobs = len(self.job_args_list)
        logging.info(f"Processing {total_jobs} {job_name} jobs in batches of {self.batch_size}")

        # Process jobs in batches
        batch_starts = range(0, total_jobs, self.batch_size)
        for batch_number, start in enumerate(batch_starts, start=1):
            job_ids = []
            current_batch = self.job_args_list[start:start + self.batch_size]
            logging.info(
                f"Submitting jobs for batch {batch_number} with {len(current_batch)} jobs."
            )

            # Submit jobs for the current batch
            for job_args in current_batch:
                # The submit response must expose .json() containing the job "id"
                job_id = self.job_function(*job_args).json()["id"]
                if self.verbose:
                    logging.info(f"Submitted job {job_id} with args {job_args}")
                job_ids.append(job_id)

            # Monitor jobs for the current batch
            logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_number}")
            for job_id in job_ids:
                MonitorTDRJob(
                    tdr=self.tdr,
                    job_id=job_id,
                    check_interval=self.check_interval,
                    return_json=False
                ).run()

            logging.info(f"Completed batch {batch_number} with {len(current_batch)} jobs.")

        logging.info(f"Successfully processed {total_jobs} jobs.")
A class to batch submit and monitor TDR jobs.
SubmitAndMonitorMultipleJobs( tdr: Any, job_function: Callable, job_args_list: list[tuple], batch_size: int = 500, check_interval: int = 90, verbose: bool = False)
def __init__(
        self, tdr: Any,
        job_function: Callable,
        job_args_list: list[tuple],
        batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
        check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
        verbose: bool = False
):
    """
    Store the settings used to submit and monitor jobs in batches.

    **Args:**
    - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): The TDR instance used to monitor the jobs.
    - job_function (Callable): The function called to submit one job.
    - job_args_list (list[tuple]): One tuple of positional arguments per job to submit.
    - batch_size (int, optional): How many jobs make up one batch. Defaults to `500`.
    - check_interval (int, optional): Seconds to wait between status checks. Defaults to `90`.
    - verbose (bool, optional): Whether each submitted job is logged individually. Defaults to `False`.
    """
    # Each value is kept unchanged on the instance; the bare "@private"
    # strings mark the attributes for the generated documentation.
    self.tdr = tdr
    """@private"""
    self.job_function = job_function
    """@private"""
    self.job_args_list = job_args_list
    """@private"""
    self.batch_size = batch_size
    """@private"""
    self.check_interval = check_interval
    """@private"""
    self.verbose = verbose
    """@private"""
Initialize the SubmitAndMonitorMultipleJobs class.
Args:
- tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
- job_function (Callable): The function to submit a job.
- job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
- batch_size (int, optional): The number of jobs to process in each batch. Defaults to 500.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 90.
- verbose (bool, optional): Whether to log detailed information about each job. Defaults to False.
def
run(self) -> None:
def run(self) -> None:
    """
    Run the process to submit and monitor multiple jobs in batches.

    Every job of a batch is submitted before any of them is monitored; the jobs
    are then monitored one after another. Logs the progress and status of each
    batch and job.

    **Raises:**
    - ValueError: If `batch_size` is smaller than 1.
    """
    # A negative step would make the range below empty, so nothing would be
    # submitted while success is still reported; reject it up front.
    if self.batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")

    # functools.partial objects and callable instances carry no __name__.
    target = getattr(self.job_function, "func", self.job_function)
    job_name = getattr(target, "__name__", type(target).__name__)

    total_jobs = len(self.job_args_list)
    logging.info(f"Processing {total_jobs} {job_name} jobs in batches of {self.batch_size}")

    # Process jobs in batches
    batch_starts = range(0, total_jobs, self.batch_size)
    for batch_number, start in enumerate(batch_starts, start=1):
        job_ids = []
        current_batch = self.job_args_list[start:start + self.batch_size]
        logging.info(
            f"Submitting jobs for batch {batch_number} with {len(current_batch)} jobs."
        )

        # Submit jobs for the current batch
        for job_args in current_batch:
            # The submit response must expose .json() containing the job "id"
            job_id = self.job_function(*job_args).json()["id"]
            if self.verbose:
                logging.info(f"Submitted job {job_id} with args {job_args}")
            job_ids.append(job_id)

        # Monitor jobs for the current batch
        logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_number}")
        for job_id in job_ids:
            MonitorTDRJob(
                tdr=self.tdr,
                job_id=job_id,
                check_interval=self.check_interval,
                return_json=False
            ).run()

        logging.info(f"Completed batch {batch_number} with {len(current_batch)} jobs.")

    logging.info(f"Successfully processed {total_jobs} jobs.")
Run the process to submit and monitor multiple jobs in batches.
Logs the progress and status of each batch and job.