ops_utils.tdr_utils.tdr_job_utils
Utilities for interacting with TDR jobs.
1"""Utilities for interacting with TDR jobs.""" 2import json 3import logging 4import time 5from typing import Callable, Optional, Any 6 7from ..vars import ARG_DEFAULTS 8 9 10class MonitorTDRJob: 11 """A class to monitor the status of a TDR job until completion.""" 12 13 def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool): 14 """ 15 Initialize the MonitorTDRJob class. 16 17 **Args:** 18 - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class. 19 - job_id (str): The ID of the job to be monitored. 20 - check_interval (int): The interval in seconds to wait between status checks. 21 - return_json (bool): Whether to get and return the result of the job as json. 22 """ 23 self.tdr = tdr 24 """@private""" 25 self.job_id = job_id 26 """@private""" 27 self.check_interval = check_interval 28 """@private""" 29 self.return_json = return_json 30 """@private""" 31 32 def _raise_for_failed_job(self) -> None: 33 """ 34 Raise an error with useful information if the job has failed. 35 36 Raises: 37 ValueError: If the job has failed. 38 """ 39 job_result = self.tdr.get_job_result(self.job_id, expect_failure=True) 40 raise Exception( 41 f"Status code {job_result.status_code}: {job_result.text}") 42 43 def run(self) -> Optional[dict]: 44 """ 45 Monitor the job until completion. 46 47 **Returns:** 48 - dict: The job results 49 """ 50 while True: 51 ingest_response = self.tdr.get_job_status(self.job_id) 52 if ingest_response.status_code == 202: 53 logging.info(f"TDR job {self.job_id} is still running") 54 # Check every x seconds if ingest is still running 55 time.sleep(self.check_interval) 56 elif ingest_response.status_code == 200: 57 response_json = json.loads(ingest_response.text) 58 if response_json["job_status"] == "succeeded": 59 logging.info(f"TDR job {self.job_id} succeeded") 60 if self.return_json: 61 request = self.tdr.get_job_result(self.job_id) 62 return json.loads(request.text) 63 # If not returning json, return None 64 return None 65 else: 66 logging.error(f"TDR job {self.job_id} failed") 67 self._raise_for_failed_job() 68 else: 69 logging.error(f"TDR job {self.job_id} failed") 70 self._raise_for_failed_job() 71 72 73class SubmitAndMonitorMultipleJobs: 74 """A class to batch submit and monitor TDR jobs.""" 75 76 def __init__( 77 self, tdr: Any, 78 job_function: Callable, 79 job_args_list: list[tuple], 80 batch_size: int = ARG_DEFAULTS["batch_size"], # type: ignore[assignment] 81 check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"], # type: ignore[assignment] 82 verbose: bool = False 83 ): 84 """ 85 Initialize the SubmitAndMonitorMultipleJobs class. 86 87 **Args:** 88 - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class. 89 - job_function (Callable): The function to submit a job. 90 - job_args_list (list[tuple]): A list of tuples containing the arguments for each job. 91 - batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`. 92 - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`. 93 - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`. 
94 """ 95 self.tdr = tdr 96 """@private""" 97 self.job_function = job_function 98 """@private""" 99 self.job_args_list = job_args_list 100 """@private""" 101 self.batch_size = batch_size 102 """@private""" 103 self.check_interval = check_interval 104 """@private""" 105 self.verbose = verbose 106 """@private""" 107 108 def run(self) -> None: 109 """ 110 Run the process to submit and monitor multiple jobs in batches. 111 112 Logs the progress and status of each batch and job. 113 114 Failed jobs are collected and printed out at the end of processing. 115 """ 116 failed_jobs = [] 117 total_jobs = len(self.job_args_list) 118 logging.info(f"Processing {total_jobs} {self.job_function.__name__} jobs in batches of {self.batch_size}") 119 120 # Process jobs in batches 121 for i in range(0, total_jobs, self.batch_size): 122 job_ids = [] 123 current_batch = self.job_args_list[i:i + self.batch_size] 124 logging.info( 125 f"Submitting jobs for batch {i // self.batch_size + 1} with {len(current_batch)} jobs." 126 ) 127 128 # Submit jobs for the current batch 129 for job_args in current_batch: 130 # Submit job with arguments and store the job ID 131 job_id = self.job_function(*job_args).json()["id"] 132 if self.verbose: 133 logging.info(f"Submitted job {job_id} with args {job_args}") 134 job_ids.append(job_id) 135 136 # Monitor jobs for the current batch 137 logging.info(f"Monitoring {len(current_batch)} jobs in batch {i // self.batch_size + 1}") 138 for job_id in job_ids: 139 try: 140 MonitorTDRJob( 141 tdr=self.tdr, 142 job_id=job_id, 143 check_interval=self.check_interval, 144 return_json=False 145 ).run() 146 except Exception as e: 147 logging.error(f"Job {job_id} failed: {e}") 148 failed_jobs.append(job_id) 149 150 logging.info(f"Completed batch {i // self.batch_size + 1} with {len(current_batch)} jobs.") 151 152 logging.info(f"Successfully processed {total_jobs - len(failed_jobs)} jobs.") 153 154 if len(failed_jobs) > 0: 155 raise Exception( 156 f"The following job IDs failed: {', '.join(failed_jobs)}")
class MonitorTDRJob:
A class to monitor the status of a TDR job until completion.
MonitorTDRJob(tdr: Any, job_id: str, check_interval: int, return_json: bool)
Initialize the MonitorTDRJob class.
Args:
- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
- job_id (str): The ID of the job to be monitored.
- check_interval (int): The interval in seconds to wait between status checks.
- return_json (bool): Whether to get and return the result of the job as JSON.
def run(self) -> Optional[dict]:
Monitor the job until completion.
Returns:
- dict: The job results, or `None` if `return_json` is `False`.
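A minimal, self-contained usage sketch follows. `StubTDR` is a stand-in written for this example: it implements only the two methods `MonitorTDRJob` calls (`get_job_status` and `get_job_result`) and returns canned responses, so the polling loop can be exercised without a real TDR deployment. In real use you would pass an authenticated `ops_utils.tdr_utils.tdr_api_utils.TDR` instance and a job ID returned by a prior submission.

```python
import json
from types import SimpleNamespace

from ops_utils.tdr_utils.tdr_job_utils import MonitorTDRJob


class StubTDR:
    """Stand-in for ops_utils.tdr_utils.tdr_api_utils.TDR used only in this sketch.

    Only the two methods MonitorTDRJob touches are implemented, and they
    return canned response-like objects instead of calling the TDR API.
    """

    def get_job_status(self, job_id):
        # A 200 with job_status "succeeded" ends the polling loop immediately.
        return SimpleNamespace(status_code=200, text=json.dumps({"job_status": "succeeded"}))

    def get_job_result(self, job_id, expect_failure=False):
        # Canned result payload for illustration only.
        return SimpleNamespace(status_code=200, text=json.dumps({"id": job_id, "rows": 42}))


# With a real TDR client, replace StubTDR() with your authenticated instance
# and "example-job-id" with the ID returned when the job was submitted.
result = MonitorTDRJob(
    tdr=StubTDR(),
    job_id="example-job-id",
    check_interval=5,    # seconds to sleep between status checks while the job runs
    return_json=True,    # fetch the job result and return it as a dict
).run()
print(result)  # {'id': 'example-job-id', 'rows': 42}
```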
class SubmitAndMonitorMultipleJobs:
A class to batch submit and monitor TDR jobs.
SubmitAndMonitorMultipleJobs(tdr: Any, job_function: Callable, job_args_list: list[tuple], batch_size: int = 500, check_interval: int = 90, verbose: bool = False)
Initialize the SubmitAndMonitorMultipleJobs class.
Args:
- tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
- job_function (Callable): The function to submit a job.
- job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
- batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`.
- check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`.
- verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
def run(self) -> None:
Run the process to submit and monitor multiple jobs in batches.
Logs the progress and status of each batch and job.
Failed job IDs are collected, and if any job fails, an exception listing the failed IDs is raised after all batches have been processed.
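A minimal, self-contained sketch of batch submission follows. `StubTDR` and `submit_stub_job` are stand-ins invented for this example; they mimic only the interfaces this class touches (a response-like object whose `.json()` contains an `"id"` from the job function, and `get_job_status` on the TDR client). In real use, `job_function` would be a submission method on an authenticated `TDR` instance and `job_args_list` would hold the argument tuples for each call.

```python
import json
import logging
from types import SimpleNamespace

from ops_utils.tdr_utils.tdr_job_utils import SubmitAndMonitorMultipleJobs

logging.basicConfig(level=logging.INFO)  # make the batch progress logs visible


class StubTDR:
    """Stand-in TDR client; every job immediately reports 'succeeded'."""

    def get_job_status(self, job_id):
        return SimpleNamespace(status_code=200, text=json.dumps({"job_status": "succeeded"}))


def submit_stub_job(dataset_id, table_name):
    """Placeholder job function; a real one would call a TDR submission
    endpoint and return its HTTP response."""
    return SimpleNamespace(json=lambda: {"id": f"job-{dataset_id}-{table_name}"})


SubmitAndMonitorMultipleJobs(
    tdr=StubTDR(),
    job_function=submit_stub_job,
    job_args_list=[("ds1", "sample"), ("ds1", "file"), ("ds2", "sample")],
    batch_size=2,        # submit two jobs, wait for both, then move to the next batch
    check_interval=5,    # seconds between status checks per job
    verbose=True,        # log each submitted job ID and its arguments
).run()
```

Each batch of `batch_size` submissions is fully monitored before the next batch is submitted, so a smaller `batch_size` bounds how many TDR jobs are in flight at once.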