simulators
Provides classes and utilities for managing simulation jobs, handling submission, monitoring, and status updates through various hardware interfaces. This module ensures consistent management of simulations with robust error handling and logging mechanisms.
Core Classes
SimulationsLog
Manages the logging of simulation jobs, including submission details, statuses, and outputs.
Supports querying and updating job records.
JobManager
Orchestrates the lifecycle of simulation jobs, including submission, monitoring, and cancellation.
Utilizes hardware interfaces for job execution.
Job Strategies
Defines strategies for handling jobs based on their current statuses. Each strategy dictates the actions to be taken for specific job states.
- CompletedJobStrategy – Handles jobs that have completed execution by recording results and updating statuses.
- FailedJobStrategy – Manages jobs that have failed, ensuring proper status updates and cleanup.
- FailedSubmitJobStrategy – Handles jobs that failed during submission, updating logs accordingly.
- RunningJobStrategy – Monitors jobs currently running to ensure their statuses are correctly reflected.
- SubmittedJobStrategy – Manages jobs that have been submitted but are not yet completed or failed.
- PendingSubmitJobStrategy – Attempts submission of jobs, handling retries and updating statuses in case of failures.
- PendingCancelJobStrategy – Handles cancellation requests, including retries and status updates.
Utilities
JobIDGenerator
Generates unique job IDs based on the current datetime, ensuring uniqueness even in concurrent environments.
Exceptions
SimulationsLogLookupError
Raised when a simulation log does not contain a particular record.
InvalidJobStatusError
Raised when a job's status is inappropriate for a specific action.
UnknownJobIdError
Raised when a provided job ID does not correspond to any known job.
Simulation = tuple[Input, Optional[Real]]
module-attribute
A type to represent a simulator input, possibly with corresponding simulator output.
CompletedJobStrategy
Bases: JobStrategy
Implements the strategy for handling jobs that have completed execution.
Upon invocation, this strategy retrieves the job's output from the simulation environment, updates the job's record in the simulations log to reflect its completion, and then removes the job from the JobManager's monitoring list.
Parameters:
- job (Job) – The job that has completed its execution.
- job_manager (JobManager) – The manager responsible for overseeing the job's lifecycle.
Source code in exauq/sim_management/simulators.py
FailedJobStrategy
Bases: JobStrategy
Strategy for handling jobs that have failed during execution.
This strategy updates the job's status in the simulations log to FAILED and removes the job from the JobManager's list of active jobs. It encapsulates the actions to be taken when a job does not complete successfully.
Parameters:
- job (Job) – The job that has failed.
- job_manager (JobManager) – The manager overseeing the job's lifecycle and responsible for its monitoring and logging.
Source code in exauq/sim_management/simulators.py
FailedSubmitJobStrategy
Bases: JobStrategy
Strategy for handling jobs that have failed to submit.
This strategy updates the job's status in the simulations log to FAILED_SUBMIT and removes the job from the JobManager's list of active jobs. It encapsulates the actions to be taken when a job fails to submit for execution.
Parameters:
- job (Job) – The job that has failed to submit.
- job_manager (JobManager) – The manager overseeing the job's lifecycle, including monitoring and logging.
Source code in exauq/sim_management/simulators.py
InvalidJobStatusError
Bases: Exception
Raised when the status of a job is not appropriate for some action.
Source code in exauq/sim_management/simulators.py
JobIDGenerator
A generator for unique job IDs, encapsulated within a JobId object, based on the current datetime down to the millisecond. This class provides a thread-safe mechanism to generate unique job IDs by ensuring that each ID corresponds to a unique point in time, formatted as 'YYYYMMDDHHMMSSfff', where 'fff' represents milliseconds.
In scenarios where multiple IDs are requested within the same millisecond, this generator will wait until the next millisecond to generate a new ID, ensuring the uniqueness of each ID without relying on additional counters.
Methods:
- generate_id – Generates a unique JobId object representing the job ID, formatted as 'YYYYMMDDHHMMSSfff', ensuring that each generated ID is unique to the millisecond.
Examples:
>>> id_generator = JobIDGenerator()
>>> job_id = id_generator.generate_id()
>>> print(job_id)
JobId('20240101123001005')
Source code in exauq/sim_management/simulators.py
__init__()
Initializes the JobIDGenerator, preparing it for generating unique job IDs.
generate_id()
Generates a unique job ID based on the current datetime down to the millisecond. If a request for a new ID occurs within the same millisecond as the previous ID, the method waits until the next millisecond to ensure uniqueness.
Returns:
- JobId – A JobId object encapsulating a unique job ID, formatted as 'YYYYMMDDHHMMSSfff', ensuring uniqueness to the millisecond.
Examples:
>>> id_generator = JobIDGenerator()
>>> job_id = id_generator.generate_id()
>>> print(job_id)
JobId('20240101123001005')
Source code in exauq/sim_management/simulators.py
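The wait-until-the-next-millisecond behaviour described for generate_id can be illustrated with a minimal sketch. This is not the library's implementation: the class name MillisecondIdGenerator is hypothetical, it returns plain strings rather than JobId objects, and the short sleep interval is an arbitrary choice made for the example.

```python
import threading
import time
from datetime import datetime


class MillisecondIdGenerator:
    """Hypothetical stand-in for JobIDGenerator; returns plain strings."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # serialises callers from different threads
        self._last_id = ""

    def generate_id(self) -> str:
        with self._lock:
            while True:
                # '%f' yields microseconds (6 digits); dropping the last three
                # leaves milliseconds, giving the 17-digit 'YYYYMMDDHHMMSSfff' form.
                candidate = datetime.now().strftime("%Y%m%d%H%M%S%f")[:-3]
                # Fixed-width digit strings compare in chronological order.
                if candidate > self._last_id:
                    self._last_id = candidate
                    return candidate
                # Same millisecond as the previous ID: wait briefly and retry.
                time.sleep(0.0005)


generator = MillisecondIdGenerator()
ids = [generator.generate_id() for _ in range(5)]
```

Holding the lock while waiting means concurrent callers queue up behind one another, which is what keeps two threads from receiving the same timestamp in this sketch.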
JobManager
Orchestrates the submission, monitoring, and status management of simulation jobs within a simulation environment. Utilizes a specified hardware interface for job execution and interacts with a simulations log for recording job activities.
This manager supports dynamic job status updates, retry strategies for submission failures, and employs a strategy pattern for handling different job statuses, making the system adaptable to various simulation requirements and hardware interfaces.
Parameters:
- simulations_log (SimulationsLog) – A log for recording and retrieving details of simulation jobs.
- interfaces (list[HardwareInterface]) – A list of abstract interfaces to the hardware or simulation environment where jobs are executed.
- polling_interval (int, default: 10) – Time interval, in seconds, for polling job statuses during monitoring. Defaults to 10 seconds.
- wait_for_pending (bool, default: False) – Specifies whether the manager should wait for all pending jobs to reach a conclusive status (e.g., COMPLETED or FAILED) upon initialization. Defaults to False.
Methods:
- submit – Submits a new simulation job based on the provided simulation input. Handles initial job logging and sets status to PENDING_SUBMIT.
- monitor – Initiates or resumes monitoring of job statuses in a separate background thread.
- cancel – Cancels a job with the given ID, if it has not yet reached a terminal status.
- get_interface – Retrieves the hardware interface with the given name.
- remove_job – Removes a job from the internal list of jobs being monitored.
- shutdown – Cleanly terminates the monitoring thread and releases all resources.
- simulations_log (property) – Provides read-only access to the simulations log object for job recording and retrieval.
Raises:
- SimulationsLogLookupError – If operations on the simulations log encounter inconsistencies, such as missing records or duplicate job IDs.
- UnknownJobIdError – If an attempt is made to cancel a job that does not exist in the simulations log.
- InvalidJobStatusError – If an attempt is made to cancel a job that has already reached a terminal status.
Examples:
>>> job_manager = JobManager(simulations_log, [hardware_interface])
>>> input_data = Input(0.0, 1.0)
>>> job = job_manager.submit(input_data)
>>> job_manager.shutdown()
The job manager handles the submission, monitors the job's progress, updates its status accordingly in the simulations log, and ensures proper shutdown of monitoring threads.
Source code in exauq/sim_management/simulators.py
interface_job_counts: dict[str, int]
property
Provides a thread-safe, read-only view of the job monitoring counts per interface.
Returns:
- dict[str, int] – A dictionary mapping interface names to the number of jobs being monitored.
simulations_log
property
(Read-only) The simulations log for job recording and retrieval.
cancel(job_id)
Cancels a job with the given ID.
This method attempts to cancel a job identified by the provided job ID. It first checks if the job is actively being monitored. If the job is found, it updates its status to PENDING_CANCEL, signaling that the cancellation process is underway.
If the job is not currently monitored, the method queries the simulations log to check its status. If the job has already reached a terminal state (e.g., COMPLETED, FAILED), an InvalidJobStatusError is raised as such jobs cannot be cancelled. If no job with the provided ID exists, an UnknownJobIdError is raised.
Parameters:
- job_id (JobId) – The unique identifier of the job to be cancelled.
Returns:
- Job – The job object representing the job that was marked for cancellation.
Raises:
- UnknownJobIdError – If the provided ID does not correspond to any job in the simulations log.
- InvalidJobStatusError – If the job has already reached a terminal status and cannot be cancelled.
Examples:
Cancel an active job with ID '12345':
>>> job_manager.cancel(JobId('12345'))
Attempt to cancel a job that has already completed:
>>> try:
... job_manager.cancel(JobId('67890'))
... except InvalidJobStatusError as e:
... print(f"Cannot cancel job: {e.status}")
Attempt to cancel a non-existent job:
>>> try:
... job_manager.cancel(JobId('00000'))
... except UnknownJobIdError:
... print("Job ID not found in the simulations log.")
Notes
- This method is thread-safe, ensuring consistency when accessed concurrently.
- Only jobs that have not yet reached a terminal status can be cancelled.
Source code in exauq/sim_management/simulators.py
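The branching described for cancel can be sketched with plain dictionaries and sets. Everything here is a stand-in: Status, UnknownJobId and InvalidJobStatus are hypothetical substitutes for the library's JobStatus, UnknownJobIdError and InvalidJobStatusError, and the treatment of a logged, non-terminal job that is not being monitored is an assumption, since the description above does not cover that case.

```python
from enum import Enum


class Status(Enum):
    """Hypothetical stand-in for the library's JobStatus enum."""

    RUNNING = "running"
    PENDING_CANCEL = "pending cancel"
    COMPLETED = "completed"
    FAILED = "failed"


TERMINAL = {Status.COMPLETED, Status.FAILED}


class UnknownJobId(Exception):
    """Stand-in for UnknownJobIdError."""


class InvalidJobStatus(Exception):
    """Stand-in for InvalidJobStatusError."""


def cancel(job_id: str, monitored: set, log: dict) -> str:
    """Mark a job for cancellation, mirroring the documented branches."""
    if job_id in monitored:
        # Actively monitored job: flag it so a later step can cancel it.
        log[job_id] = Status.PENDING_CANCEL
        return job_id
    if job_id not in log:
        raise UnknownJobId(job_id)
    if log[job_id] in TERMINAL:
        raise InvalidJobStatus(f"job {job_id} is already {log[job_id].value}")
    # Assumption: a logged, non-terminal job that is not monitored is also flagged.
    log[job_id] = Status.PENDING_CANCEL
    return job_id


log = {"12345": Status.RUNNING, "67890": Status.COMPLETED}
monitored = {"12345"}
cancelled = cancel("12345", monitored, log)
```

Checking the monitored set before the log keeps the common case cheap, and the two exceptions separate "no such job" from "job exists but can no longer be cancelled".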
get_interface(interface_name)
Get the hardware interface with the given name.
Parameters:
- interface_name (str) – The name of the hardware interface to retrieve.
Returns:
- HardwareInterface – The hardware interface with the given name.
Raises:
- ValueError – If no interface with the given name is found.
Source code in exauq/sim_management/simulators.py
monitor(jobs)
Initiates or resumes monitoring of the specified jobs for status updates.
Adds the provided list of jobs to the monitoring queue and starts or restarts the monitoring thread if it's not currently active. This ensures that all jobs are continuously monitored for status changes until they are completed or fail.
Parameters:
- jobs (Sequence[Job]) – A sequence of Job objects to be monitored.
Notes
This method is thread-safe and ensures that multiple calls to monitor jobs concurrently will not interfere with each other or duplicate monitoring efforts.
Example
job_manager.monitor([job1, job2])
Adds job1 and job2 to the monitoring queue and starts monitoring their statuses.
Source code in exauq/sim_management/simulators.py
remove_job(job)
Safely removes a job from the monitored jobs list and updates the interface job count.
This method ensures thread-safe removal of the specified job from the internal list of monitored jobs. It also decrements the count of jobs assigned to the job's associated hardware interface.
Parameters:
- job (Job) – The job instance to be removed from monitoring.
Examples:
Calling job_manager.remove_job(job) removes the given job from the JobManager's internal list, stopping its monitoring and updating the job count for its associated hardware interface.
Source code in exauq/sim_management/simulators.py
shutdown()
Cleanly terminates the monitoring thread and ensures all resources are properly released.
This method signals the monitoring thread to stop by setting a shutdown event. It waits for the monitoring thread to terminate, ensuring that the job manager is cleanly shut down. This is particularly useful to call before exiting an application to ensure that no threads remain running in the background.
Notes
If the monitoring thread is not active, this method will return immediately. It ensures thread-safe shutdown operations and can be called from any thread.
Examples:
Calling job_manager.shutdown() before exiting shuts down the JobManager's monitoring capabilities, so that the application can be closed without leaving orphaned threads.
Source code in exauq/sim_management/simulators.py
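The shutdown mechanism described above (set an event, then wait for the thread to finish) can be sketched with the standard threading module. The Monitor class, its polls counter and the short polling interval are all hypothetical; the sketch only illustrates the event-and-join pattern, not how JobManager is actually written.

```python
import threading
from typing import Optional


class Monitor:
    """Hypothetical background poller that stops via a shutdown event."""

    def __init__(self, polling_interval: float = 0.01) -> None:
        self._polling_interval = polling_interval
        self._shutdown_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self.polls = 0

    @property
    def is_running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()

    def start(self) -> None:
        # Start the thread only if one is not already active.
        if not self.is_running:
            self._shutdown_event.clear()
            self._thread = threading.Thread(target=self._run, daemon=True)
            self._thread.start()

    def _run(self) -> None:
        # Event.wait returns True as soon as the event is set, so the loop
        # ends promptly instead of sleeping out a full polling interval.
        while not self._shutdown_event.wait(self._polling_interval):
            self.polls += 1  # placeholder for checking job statuses

    def shutdown(self) -> None:
        self._shutdown_event.set()
        # If no thread is active there is nothing to wait for.
        if self.is_running:
            self._thread.join()


monitor = Monitor()
monitor.shutdown()  # no active thread: returns immediately
monitor.start()
was_running = monitor.is_running
monitor.shutdown()
```

Using Event.wait as the polling delay is the design choice that makes shutdown responsive even when the polling interval is long.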
submit(x, level=1)
Submits a new simulation job.
The job is created with a unique ID and recorded in the simulations log with a PENDING_SUBMIT status. It is then passed to a job handling strategy, which is tasked with submitting the job to the simulation hardware. The method returns the Job instance, allowing for further interaction or querying of its status.
Parameters:
- x (Input) – The input data for the simulation job.
- level (int, default: 1) – The level of the job. Defaults to 1.
Returns:
- Job – The initialised and logged Job object.
Examples:
Calling submit with an Input creates a job with the specified input parameters, logs it, and returns a Job from which its unique ID can be obtained. The job is then prepared for submission through the job handling strategies.
Source code in exauq/sim_management/simulators.py
JobStrategy
Bases: ABC
Defines a template for job handling strategies in the simulation job management system.
This abstract base class outlines the required interface for all job handling strategies. Concrete implementations of this class will define specific actions to be taken based on the job's current status.
Methods:
- handle – Executes the strategy's actions for a given job within the context of the provided job manager.
Source code in exauq/sim_management/simulators.py
handle(job, job_manager)
abstractmethod
staticmethod
Handle a job according to the strategy's specific actions.
This method should be implemented by subclasses to define how a job should be processed, based on its status or other criteria. It may involve submitting the job, updating its status, or performing cleanup actions.
Parameters:
- job (Job) – The job to be handled, which contains the necessary information for processing.
- job_manager (JobManager) – The job manager instance, providing context and access to job management functionalities.
Raises:
- NotImplementedError – If the subclass does not implement this method.
Source code in exauq/sim_management/simulators.py
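The strategy pattern described for JobStrategy, an abstract static handle method with one concrete class per job status, can be sketched as follows. The class names, the STRATEGIES table and the dispatch helper are hypothetical, and a plain dictionary stands in for the simulations log; how JobManager actually selects a strategy is not shown in this reference.

```python
from abc import ABC, abstractmethod


class Strategy(ABC):
    """Hypothetical stand-in for JobStrategy."""

    @staticmethod
    @abstractmethod
    def handle(job_id: str, log: dict) -> None:
        # Reached only if the base method is called directly.
        raise NotImplementedError


class CompletedStrategy(Strategy):
    @staticmethod
    def handle(job_id: str, log: dict) -> None:
        log[job_id] = "COMPLETED"


class FailedStrategy(Strategy):
    @staticmethod
    def handle(job_id: str, log: dict) -> None:
        log[job_id] = "FAILED"


# Assumed lookup table: one strategy class per reported status.
STRATEGIES = {"COMPLETED": CompletedStrategy, "FAILED": FailedStrategy}


def dispatch(job_id: str, reported_status: str, log: dict) -> None:
    """Pick the strategy for the reported status and let it act on the log."""
    STRATEGIES[reported_status].handle(job_id, log)


log = {}
dispatch("1", "COMPLETED", log)
dispatch("2", "FAILED", log)
```

Because handle is static, strategies carry no state of their own, so supporting a new status in this sketch only requires a new class and one more table entry.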
PendingCancelJobStrategy
Bases: JobStrategy
Strategy for handling jobs that are pending cancellation.
This strategy attempts to cancel the job with up to 5 retries, using exponential backoff and jitter to manage temporary issues like network congestion or service unavailability. If cancellation fails after all retries, the job's status remains unchanged.
As part of cancellation, the status of the job is checked from the hardware interface. If the job's status is not one of the TERMINAL_STATUSES then cancellation is attempted and, if successful, the simulations log of the supplied job_manager is updated to reflect the new CANCELLED status and the job is removed from the queue of monitored jobs within job_manager. On the other hand, if the job's status is found to be one of the TERMINAL_STATUSES then the job is not cancelled: instead, the simulations log of job_manager is updated to reflect the current status and the job is removed from the queue of monitored jobs.
Parameters:
- job (Job) – The job to be cancelled.
- job_manager (JobManager) – The manager overseeing the job's lifecycle, including its submission, monitoring, and status updates.
Source code in exauq/sim_management/simulators.py
PendingSubmitJobStrategy
Bases: JobStrategy
Strategy for handling jobs that have not yet been submitted.
This strategy attempts to submit the job with up to 5 retries, using exponential backoff and jitter to manage temporary issues like network congestion or service unavailability. If submission fails after all retries, the job's status is marked as FAILED_SUBMIT.
Parameters:
- job (Job) – The job to be submitted.
- job_manager (JobManager) – The manager responsible for job submission, monitoring, and logging.
Notes
This strategy uses exponential backoff to increase the delay between each retry attempt, and jitter to avoid thundering herd problems.
Source code in exauq/sim_management/simulators.py
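Retrying with exponential backoff and jitter, as described in the notes above, can be sketched in a few lines. The function name, the base delay of one second and the jitter range are assumptions made for illustration, as is the reading of "up to 5 retries" as one initial attempt followed by at most five further attempts; the library's actual delays are not stated here.

```python
import random
import time
from typing import Callable


def retry_with_backoff(
    action: Callable[[], None],
    max_retries: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
    rng: Callable[[], float] = random.random,
) -> bool:
    """Run action, retrying on failure; return True on success, False otherwise."""
    for attempt in range(max_retries + 1):
        try:
            action()
            return True
        except Exception:
            if attempt == max_retries:
                break  # retries exhausted; the caller would mark FAILED_SUBMIT
            # The delay doubles on every retry; the random jitter spreads out
            # clients that would otherwise all retry at the same instant.
            delay = base_delay * 2**attempt + rng() * base_delay
            sleep(delay)
    return False


def always_fails() -> None:
    raise ConnectionError("service unavailable")


# Record the delays instead of sleeping so the example finishes instantly.
recorded_delays = []
succeeded = retry_with_backoff(
    always_fails, sleep=recorded_delays.append, rng=lambda: 0.5
)
```

Passing sleep and rng as parameters is only a convenience for this example: it lets the delays be inspected without waiting for them.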
RunningJobStrategy
Bases: JobStrategy
Strategy for handling jobs that are currently running.
This strategy checks if a job's status is not already marked as RUNNING in the simulations log. If not, it updates the job's status to RUNNING. This ensures the job's current state is accurately reflected in the simulations log without unnecessarily updating the status of jobs already marked as running.
Parameters:
- job (Job) – The job that is currently executing.
- job_manager (JobManager) – The manager responsible for the job's lifecycle, including monitoring and logging.
Source code in exauq/sim_management/simulators.py
SimulationsLog
Bases: object
An interface to a log file containing details of simulations.
The log file is a csv file containing a record of simulations that have been submitted for computation; it will be created at the supplied file path upon initialisation. The input of each submission is recorded along with the simulator output, if this has been computed. Columns that give the input coordinates should have headings 'Input_n' where n is the index of the coordinate (starting at 1). The column giving the simulator output should have the heading 'Output'.
Parameters:
- file (FilePath) – A path to the underlying log file containing details of simulations.
- input_dim (int) – The number of coordinates needed to define an input to the simulator.
Source code in exauq/sim_management/simulators.py
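The column naming convention described above can be illustrated with the standard csv module. The helper log_header is hypothetical, and the sketch covers only the 'Input_n' and 'Output' headings; the real log file presumably also carries job metadata (such as job ID and status), which is omitted here.

```python
import csv
import io


def log_header(input_dim: int) -> list[str]:
    """Build the input and output headings for an input of the given dimension."""
    # Coordinates are numbered from 1, matching the 'Input_n' convention.
    return [f"Input_{n}" for n in range(1, input_dim + 1)] + ["Output"]


# Write to an in-memory buffer rather than a real file.
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(log_header(2))
writer.writerow([0.0, 1.0, ""])  # output left blank: not yet computed
text = buffer.getvalue()
```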
add_new_record(x, job_id, job_status=JobStatus.PENDING_SUBMIT, job_level=1, interface_name=None)
Record a new simulation job in the log file.
This method adds a new record for a simulation job with a given input, job ID, and job status. It ensures that the job ID is unique and not None, and that the input dimension matches the expected dimension.
Parameters:
- x (Input) – An input for the simulator to evaluate.
- job_id (Union[str, JobId, int]) – The ID for the job of evaluating the simulator at x. Must consist only of digits and cannot be None.
- job_status (JobStatus, default: PENDING_SUBMIT) – The status of the job to be recorded alongside the input x. Defaults to JobStatus.PENDING_SUBMIT.
- job_level (int, default: 1) – The level of the job. Defaults to 1.
- interface_name (Optional[str], default: None) – The name of the interface that the job is assigned to. Defaults to None.
Raises:
- ValueError – If job_id does not consist solely of digits or is None; if the input x does not have the expected number of coordinates; or if the job_id is already in use.
Source code in exauq/sim_management/simulators.py
get_job_status(job_id)
Retrieves the current status of a specified job from the simulations log.
This method queries the simulations log database for a job with the given ID and returns its current status. It is thread-safe, ensuring consistent reads even when accessed concurrently from multiple threads. If the job ID does not exist in the database it raises an exception.
Parameters:
- job_id (Union[str, JobId]) – The unique identifier of the job whose status is to be retrieved.
Returns:
- JobStatus – The current status of the job as an instance of the JobStatus enum.
Raises:
- SimulationsLogLookupError – If there isn't a log record having job ID job_id.
Examples:
Calling get_job_status('12345') returns JobStatus.RUNNING if the job with ID '12345' is currently running.
Notes
This method is particularly useful for monitoring the progress of jobs and handling them based on their current state. It enforces data integrity by ensuring that each job ID is unique and correctly mapped to a valid job status.
Source code in exauq/sim_management/simulators.py
get_non_terminated_jobs()
Return all jobs which don't have results and have a non-terminal status.
A job is considered non-terminal if it has one of the following statuses: RUNNING, SUBMITTED or PENDING_SUBMIT.
Returns:
- tuple[Job] – The Jobs that have a non-terminal status.
Source code in exauq/sim_management/simulators.py
get_records(job_ids=None, statuses=None)
Return records based on given job IDs and job status codes.
This method retrieves simulation job records from the simulations log based on specified job IDs and/or job status codes. If no filters are provided, all records are returned. The method ensures thread safety during record retrieval.
Parameters:
- job_ids (Sequence[Union[str, JobId, int]], default: None) – A sequence of job IDs to filter the records. If None, records are not filtered based on job IDs. Default is None.
- statuses (Sequence[JobStatus], default: None) – A sequence of JobStatus values to filter the records. If None, records are not filtered based on status. Default is None.
Returns:
- list[dict[str, Any]] – A list of dictionaries, where each dictionary represents a job record with the following keys:
  - 'job_id' (JobId): The unique identifier of the job.
  - 'status' (JobStatus): The current status of the job.
  - 'input' (Input): The input associated with the simulation job.
  - 'output' (Optional[Real]): The output of the simulation, or None if not yet available.
Examples:
Calling get_records() with no arguments retrieves all job records. Passing job_ids retrieves records for specific job IDs, passing statuses retrieves records with specific statuses, and passing both retrieves records with specific job IDs and statuses.
Notes
- This method is thread-safe, ensuring consistent results when accessed concurrently.
- If both job_ids and statuses are provided, records must match both filters to be included.
Source code in exauq/sim_management/simulators.py
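The filtering rule described above, where a None filter is ignored and supplying both filters requires a record to match both, can be sketched on plain dictionaries. The function filter_records and the sample records are hypothetical, with strings and tuples standing in for JobId, JobStatus and Input values.

```python
from typing import Any, Optional, Sequence


def filter_records(
    records: Sequence[dict[str, Any]],
    job_ids: Optional[Sequence[str]] = None,
    statuses: Optional[Sequence[str]] = None,
) -> list[dict[str, Any]]:
    """Keep the records that pass every filter that was actually supplied."""
    return [
        record
        for record in records
        if (job_ids is None or record["job_id"] in job_ids)
        and (statuses is None or record["status"] in statuses)
    ]


# Made-up records using the documented keys.
records = [
    {"job_id": "1", "status": "COMPLETED", "input": (0.0, 1.0), "output": 2.5},
    {"job_id": "2", "status": "RUNNING", "input": (0.5, 0.5), "output": None},
    {"job_id": "3", "status": "RUNNING", "input": (1.0, 0.0), "output": None},
]

all_records = filter_records(records)
running = filter_records(records, statuses=["RUNNING"])
both = filter_records(records, job_ids=["1", "2"], statuses=["RUNNING"])
```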
get_simulations()
Get all simulations contained in the log file.
This returns an immutable sequence of simulator inputs, outputs and their corresponding level. In the case where the simulator output is not available for the corresponding input, None is instead returned alongside the input.
Returns:
- tuple[tuple[Input, Optional[Real], int]] – A tuple of (x, y, z), where x is an Input, y is the simulation output, or None if this hasn't yet been computed, and z is the level of the simulation.
Source code in exauq/sim_management/simulators.py
get_unsubmitted_inputs()
Get all simulator inputs that have not been submitted as jobs.
Identifies inputs that are marked as 'PENDING_SUBMIT' in the simulation database, signaling they have not been dispatched for execution.
Returns:
- tuple[Input] – The inputs that have not been submitted as jobs.
Source code in exauq/sim_management/simulators.py
insert_result(job_id, result)
Insert the output of a simulation into a job record in the simulations log file.
Parameters:
- job_id (Union[str, JobId]) – The ID of the job that the result should be added to.
- result (Real) – The output of a simulation.
Raises:
- SimulationsLogLookupError – If there isn't a log record having job ID job_id.
Source code in exauq/sim_management/simulators.py
prepare_training_data()
Transform the simulations log into training data suitable for an mlgp.
This convenience function gives the user a direct route from the simulations log, on the job management side of the Toolbox, to a set of training data for fitting an mlgp.
Returns:
- MultiLevel[Sequence[TrainingDatum]] – The prepared training data for the mlgp.
Source code in exauq/sim_management/simulators.py
update_job_status(job_id, new_status)
Updates the status of a job in the simulations log.
This method updates the job status for a given job ID in the simulation log database. It ensures thread safety by locking the operation. If the job ID does not exist it raises a SimulationsLogLookupError.
Parameters:
- job_id (Union[str, JobId]) – The unique identifier of the job whose status is to be updated.
- new_status (JobStatus) – The new status to be assigned to the job. This must be an instance of the JobStatus enum.
Raises:
- SimulationsLogLookupError – If there isn't a log record having job ID job_id.
Examples:
Suppose we have a job with ID '12345' that we want to mark as completed. We would call update_job_status('12345', JobStatus.COMPLETED). If the job ID '12345' does not exist in the log, a SimulationsLogLookupError will be raised.
Notes
This method is thread-safe and can be called concurrently from multiple threads without causing data corruption or race conditions.
Source code in exauq/sim_management/simulators.py
SimulationsLogLookupError
SubmittedJobStrategy
Bases: JobStrategy
Strategy for handling jobs that have been submitted.
Upon handling, this strategy updates the job's status in the simulations log to SUBMITTED and initiates monitoring of the job. This ensures that once a job is submitted, its status is accurately recorded, and the job is actively monitored for completion or failure.
Parameters:
- job (Job) – The job that has been submitted for execution.
- job_manager (JobManager) – The manager overseeing the job's lifecycle, responsible for its submission, monitoring, and logging.
Source code in exauq/sim_management/simulators.py
UnknownJobIdError
Bases: Exception
Raised when a job ID does not correspond to a job.