ops_utils.tdr_utils.tdr_job_utils

Utilities for interacting with TDR jobs.

  1"""Utilities for interacting with TDR jobs."""
  2import json
  3import logging
  4import time
  5from typing import Callable, Optional, Any
  6
  7from ..vars import ARG_DEFAULTS
  8
  9
 10class MonitorTDRJob:
 11    """A class to monitor the status of a TDR job until completion."""
 12
 13    def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool):
 14        """
 15        Initialize the MonitorTDRJob class.
 16
 17        **Args:**
 18        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
 19        - job_id (str): The ID of the job to be monitored.
 20        - check_interval (int): The interval in seconds to wait between status checks.
 21        - return_json (bool): Whether to get and return the result of the job as json.
 22        """
 23        self.tdr = tdr
 24        """@private"""
 25        self.job_id = job_id
 26        """@private"""
 27        self.check_interval = check_interval
 28        """@private"""
 29        self.return_json = return_json
 30        """@private"""
 31
 32    def _raise_for_failed_job(self) -> None:
 33        """
 34        Raise an error with useful information if the job has failed.
 35
 36        Raises:
 37            ValueError: If the job has failed.
 38        """
 39        job_result = self.tdr.get_job_result(self.job_id, expect_failure=True)
 40        raise Exception(
 41            f"Status code {job_result.status_code}: {job_result.text}")
 42
 43    def run(self) -> Optional[dict]:
 44        """
 45        Monitor the job until completion.
 46
 47        **Returns:**
 48        - dict: The job results
 49        """
 50        while True:
 51            ingest_response = self.tdr.get_job_status(self.job_id)
 52            if ingest_response.status_code == 202:
 53                logging.info(f"TDR job {self.job_id} is still running")
 54                # Check every x seconds if ingest is still running
 55                time.sleep(self.check_interval)
 56            elif ingest_response.status_code == 200:
 57                response_json = json.loads(ingest_response.text)
 58                if response_json["job_status"] == "succeeded":
 59                    logging.info(f"TDR job {self.job_id} succeeded")
 60                    if self.return_json:
 61                        request = self.tdr.get_job_result(self.job_id)
 62                        return json.loads(request.text)
 63                    # If not returning json, return None
 64                    return None
 65                else:
 66                    logging.error(f"TDR job {self.job_id} failed")
 67                    self._raise_for_failed_job()
 68            else:
 69                logging.error(f"TDR job {self.job_id} failed")
 70                self._raise_for_failed_job()
 71
 72
 73class SubmitAndMonitorMultipleJobs:
 74    """A class to batch submit and monitor TDR jobs."""
 75
 76    def __init__(
 77            self, tdr: Any,
 78            job_function: Callable,
 79            job_args_list: list[tuple],
 80            batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
 81            check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
 82            verbose: bool = False
 83    ):
 84        """
 85        Initialize the SubmitAndMonitorMultipleJobs class.
 86
 87        **Args:**
 88        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
 89        - job_function (Callable): The function to submit a job.
 90        - job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
 91        - batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`.
 92        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`.
 93        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 94        """
 95        self.tdr = tdr
 96        """@private"""
 97        self.job_function = job_function
 98        """@private"""
 99        self.job_args_list = job_args_list
100        """@private"""
101        self.batch_size = batch_size
102        """@private"""
103        self.check_interval = check_interval
104        """@private"""
105        self.verbose = verbose
106        """@private"""
107
108    def run(self) -> None:
109        """
110        Run the process to submit and monitor multiple jobs in batches.
111
112        Logs the progress and status of each batch and job.
113        """
114        total_jobs = len(self.job_args_list)
115        logging.info(f"Processing {total_jobs} {self.job_function.__name__} jobs in batches of {self.batch_size}")
116
117        # Process jobs in batches
118        for i in range(0, total_jobs, self.batch_size):
119            job_ids = []
120            current_batch = self.job_args_list[i:i + self.batch_size]
121            logging.info(
122                f"Submitting jobs for batch {i // self.batch_size + 1} with {len(current_batch)} jobs."
123            )
124
125            # Submit jobs for the current batch
126            for job_args in current_batch:
127                # Submit job with arguments and store the job ID
128                job_id = self.job_function(*job_args).json()["id"]
129                if self.verbose:
130                    logging.info(f"Submitted job {job_id} with args {job_args}")
131                job_ids.append(job_id)
132
133            # Monitor jobs for the current batch
134            logging.info(f"Monitoring {len(current_batch)} jobs in batch {i // self.batch_size + 1}")
135            for job_id in job_ids:
136                MonitorTDRJob(
137                    tdr=self.tdr,
138                    job_id=job_id,
139                    check_interval=self.check_interval,
140                    return_json=False
141                ).run()
142
143            logging.info(f"Completed batch {i // self.batch_size + 1} with {len(current_batch)} jobs.")
144
145        logging.info(f"Successfully processed {total_jobs} jobs.")
class MonitorTDRJob:
    """A class to monitor the status of a TDR job until completion."""

    def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool):
        """
        Initialize the MonitorTDRJob class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - job_id (str): The ID of the job to be monitored.
        - check_interval (int): The interval in seconds to wait between status checks.
        - return_json (bool): Whether to get and return the result of the job as json.
        """
        self.tdr = tdr
        """@private"""
        self.job_id = job_id
        """@private"""
        self.check_interval = check_interval
        """@private"""
        self.return_json = return_json
        """@private"""

    def _raise_for_failed_job(self) -> None:
        """
        Raise an error with useful information about a failed job.

        Fetches the job result with `expect_failure=True` and puts its status code
        and body into the error message. This method never returns normally.

        **Raises:**
        - ValueError: Always, describing the failed job's result.
        """
        job_result = self.tdr.get_job_result(self.job_id, expect_failure=True)
        # ValueError matches the documented contract and is still an Exception
        # subclass, so existing `except Exception` handlers keep working.
        raise ValueError(
            f"Status code {job_result.status_code}: {job_result.text}")

    def run(self) -> Optional[dict]:
        """
        Monitor the job until completion.

        Polls `tdr.get_job_status` while it returns HTTP 202, sleeping
        `check_interval` seconds between polls. An HTTP 200 response means the job
        has finished, and its `job_status` field decides whether it succeeded.

        **Returns:**
        - dict | None: The job result parsed from JSON when `return_json` is True;
          otherwise `None`.

        **Raises:**
        - ValueError: If the job did not succeed, the finished status has no
          `job_status` field, or the status request returns any code other than
          200 or 202.
        """
        while True:
            status_response = self.tdr.get_job_status(self.job_id)
            if status_response.status_code == 202:
                logging.info(f"TDR job {self.job_id} is still running")
                # Check every x seconds if the job is still running
                time.sleep(self.check_interval)
                continue

            if status_response.status_code == 200:
                # .get() so a body without "job_status" is reported through the
                # failure path below instead of surfacing as a bare KeyError
                job_status = json.loads(status_response.text).get("job_status")
                if job_status == "succeeded":
                    logging.info(f"TDR job {self.job_id} succeeded")
                    if not self.return_json:
                        return None
                    job_result = self.tdr.get_job_result(self.job_id)
                    return json.loads(job_result.text)

            # Either the job finished unsuccessfully or the status request itself failed
            logging.error(f"TDR job {self.job_id} failed")
            self._raise_for_failed_job()

A class to monitor the status of a TDR job until completion.

MonitorTDRJob(tdr: Any, job_id: str, check_interval: int, return_json: bool)
def __init__(self, tdr: Any, job_id: str, check_interval: int, return_json: bool):
    """
    Set up a monitor for a single TDR job.

    **Args:**
    - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): TDR client used to query the job.
    - job_id (str): Identifier of the job to watch.
    - check_interval (int): Number of seconds to sleep between status polls.
    - return_json (bool): When True, the job result is fetched and returned as parsed JSON.
    """
    # Each assignment is followed by an "@private" marker string so the API-doc
    # generator treats the attribute as private.
    self.return_json = return_json
    """@private"""
    self.check_interval = check_interval
    """@private"""
    self.job_id = job_id
    """@private"""
    self.tdr = tdr
    """@private"""

Initialize the MonitorTDRJob class.

Args:

  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
  • job_id (str): The ID of the job to be monitored.
  • check_interval (int): The interval in seconds to wait between status checks.
  • return_json (bool): Whether to get and return the result of the job as json.
def run(self) -> Optional[dict]:
    """
    Monitor the job until completion.

    Polls `tdr.get_job_status` while it returns HTTP 202, sleeping
    `check_interval` seconds between polls. An HTTP 200 response means the job
    has finished, and its `job_status` field decides whether it succeeded.

    **Returns:**
    - dict | None: The job result parsed from JSON when `return_json` is True;
      otherwise `None`.

    **Raises:**
    - Whatever `self._raise_for_failed_job()` raises, when the job did not
      succeed, the finished status has no `job_status` field, or the status
      request returns any code other than 200 or 202.
    """
    while True:
        status_response = self.tdr.get_job_status(self.job_id)
        if status_response.status_code == 202:
            logging.info(f"TDR job {self.job_id} is still running")
            # Check every x seconds if the job is still running
            time.sleep(self.check_interval)
            continue

        if status_response.status_code == 200:
            # .get() so a body without "job_status" is reported through the
            # failure path below instead of surfacing as a bare KeyError
            job_status = json.loads(status_response.text).get("job_status")
            if job_status == "succeeded":
                logging.info(f"TDR job {self.job_id} succeeded")
                if not self.return_json:
                    return None
                job_result = self.tdr.get_job_result(self.job_id)
                return json.loads(job_result.text)

        # Either the job finished unsuccessfully or the status request itself failed
        logging.error(f"TDR job {self.job_id} failed")
        self._raise_for_failed_job()

Monitor the job until completion.

Returns:

  • dict | None: The job result parsed from JSON when return_json is True; otherwise None.
class SubmitAndMonitorMultipleJobs:
    """A class to batch submit and monitor TDR jobs."""

    def __init__(
            self, tdr: Any,
            job_function: Callable,
            job_args_list: list[tuple],
            batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
            check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
            verbose: bool = False
    ):
        """
        Initialize the SubmitAndMonitorMultipleJobs class.

        **Args:**
        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
        - job_function (Callable): The function to submit a job.
        - job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
        - batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`.
        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`.
        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
        """
        self.tdr = tdr
        """@private"""
        self.job_function = job_function
        """@private"""
        self.job_args_list = job_args_list
        """@private"""
        self.batch_size = batch_size
        """@private"""
        self.check_interval = check_interval
        """@private"""
        self.verbose = verbose
        """@private"""

    def run(self) -> None:
        """
        Run the process to submit and monitor multiple jobs in batches.

        For each batch, every job is submitted by calling `job_function(*job_args)`,
        and its ID is read from the `"id"` key of the response's JSON body. All jobs
        in the batch are then monitored with `MonitorTDRJob` before the next batch
        is submitted. Progress is logged per batch.

        **Raises:**
        - ValueError: If `batch_size` is less than 1.
        - Any exception raised by `MonitorTDRJob.run` when a job fails is propagated.
        """
        if self.batch_size < 1:
            # A negative step would make range() yield nothing and silently skip every job
            raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")

        total_jobs = len(self.job_args_list)
        # functools.partial objects and callable instances have no __name__
        job_name = getattr(self.job_function, "__name__", repr(self.job_function))
        logging.info(f"Processing {total_jobs} {job_name} jobs in batches of {self.batch_size}")

        # Process jobs in batches
        for start in range(0, total_jobs, self.batch_size):
            batch_number = start // self.batch_size + 1
            current_batch = self.job_args_list[start:start + self.batch_size]
            logging.info(
                f"Submitting jobs for batch {batch_number} with {len(current_batch)} jobs."
            )

            # Submit jobs for the current batch
            job_ids = []
            for job_args in current_batch:
                job_id = self.job_function(*job_args).json()["id"]
                if self.verbose:
                    logging.info(f"Submitted job {job_id} with args {job_args}")
                job_ids.append(job_id)

            # Monitor jobs for the current batch
            logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_number}")
            for job_id in job_ids:
                MonitorTDRJob(
                    tdr=self.tdr,
                    job_id=job_id,
                    check_interval=self.check_interval,
                    return_json=False
                ).run()

            logging.info(f"Completed batch {batch_number} with {len(current_batch)} jobs.")

        logging.info(f"Successfully processed {total_jobs} jobs.")

A class to batch submit and monitor TDR jobs.

SubmitAndMonitorMultipleJobs( tdr: Any, job_function: Callable, job_args_list: list[tuple], batch_size: int = 500, check_interval: int = 90, verbose: bool = False)
 77    def __init__(
 78            self, tdr: Any,
 79            job_function: Callable,
 80            job_args_list: list[tuple],
 81            batch_size: int = ARG_DEFAULTS["batch_size"],  # type: ignore[assignment]
 82            check_interval: int = ARG_DEFAULTS["waiting_time_to_poll"],  # type: ignore[assignment]
 83            verbose: bool = False
 84    ):
 85        """
 86        Initialize the SubmitAndMonitorMultipleJobs class.
 87
 88        **Args:**
 89        - tdr (`ops_utils.tdr_utils.tdr_api_utils.TDR`): An instance of the TDR class.
 90        - job_function (Callable): The function to submit a job.
 91        - job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
 92        - batch_size (int, optional): The number of jobs to process in each batch. Defaults to `500`.
 93        - check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to `90`.
 94        - verbose (bool, optional): Whether to log detailed information about each job. Defaults to `False`.
 95        """
 96        self.tdr = tdr
 97        """@private"""
 98        self.job_function = job_function
 99        """@private"""
100        self.job_args_list = job_args_list
101        """@private"""
102        self.batch_size = batch_size
103        """@private"""
104        self.check_interval = check_interval
105        """@private"""
106        self.verbose = verbose
107        """@private"""

Initialize the SubmitAndMonitorMultipleJobs class.

Args:

  • tdr (ops_utils.tdr_utils.tdr_api_utils.TDR): An instance of the TDR class.
  • job_function (Callable): The function to submit a job.
  • job_args_list (list[tuple]): A list of tuples containing the arguments for each job.
  • batch_size (int, optional): The number of jobs to process in each batch. Defaults to 500.
  • check_interval (int, optional): The interval in seconds to wait between status checks. Defaults to 90.
  • verbose (bool, optional): Whether to log detailed information about each job. Defaults to False.
def run(self) -> None:
    """
    Run the process to submit and monitor multiple jobs in batches.

    For each batch, every job is submitted by calling `job_function(*job_args)`,
    and its ID is read from the `"id"` key of the response's JSON body. All jobs
    in the batch are then monitored with `MonitorTDRJob` before the next batch
    is submitted. Progress is logged per batch.

    **Raises:**
    - ValueError: If `batch_size` is less than 1.
    - Any exception raised by `MonitorTDRJob.run` when a job fails is propagated.
    """
    if self.batch_size < 1:
        # A negative step would make range() yield nothing and silently skip every job
        raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")

    total_jobs = len(self.job_args_list)
    # functools.partial objects and callable instances have no __name__
    job_name = getattr(self.job_function, "__name__", repr(self.job_function))
    logging.info(f"Processing {total_jobs} {job_name} jobs in batches of {self.batch_size}")

    # Process jobs in batches
    for start in range(0, total_jobs, self.batch_size):
        batch_number = start // self.batch_size + 1
        current_batch = self.job_args_list[start:start + self.batch_size]
        logging.info(
            f"Submitting jobs for batch {batch_number} with {len(current_batch)} jobs."
        )

        # Submit jobs for the current batch
        job_ids = []
        for job_args in current_batch:
            job_id = self.job_function(*job_args).json()["id"]
            if self.verbose:
                logging.info(f"Submitted job {job_id} with args {job_args}")
            job_ids.append(job_id)

        # Monitor jobs for the current batch
        logging.info(f"Monitoring {len(current_batch)} jobs in batch {batch_number}")
        for job_id in job_ids:
            MonitorTDRJob(
                tdr=self.tdr,
                job_id=job_id,
                check_interval=self.check_interval,
                return_json=False
            ).run()

        logging.info(f"Completed batch {batch_number} with {len(current_batch)} jobs.")

    logging.info(f"Successfully processed {total_jobs} jobs.")

Run the process to submit and monitor multiple jobs in batches.

Logs the progress and status of each batch and job.