hardware

Provides classes and utilities for managing hardware interfaces and job execution, primarily over SSH. This module abstracts hardware interactions, enabling consistent APIs for job submission, monitoring, and control across various systems, including Unix servers.

Job Status (Enums)

JobStatus Represents various states a job can occupy, such as COMPLETED, FAILED, RUNNING, and more.

Hardware Interfaces (Abstract Base Classes)

HardwareInterface Abstract base class defining the interface for interacting with hardware resources.

SSHInterface Abstract class extending HardwareInterface to manage remote hardware via SSH connections.

Hardware Interfaces (Concrete Classes)

UnixServerScriptInterface Concrete implementation of SSHInterface for executing simulation scripts on Unix servers.

PENDING_STATUSES = {JobStatus.PENDING_CANCEL, JobStatus.PENDING_SUBMIT} module-attribute

Statuses of jobs that are waiting to be submitted or cancelled.

TERMINAL_STATUSES = {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.FAILED_SUBMIT, JobStatus.CANCELLED} module-attribute

Statuses of jobs that are no longer running (possibly due to an error).
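As an illustrative, hedged sketch (the interface variable and polling interval are hypothetical), these sets can be used to poll any hardware interface until a job finishes:

```
import time

from exauq.sim_management.hardware import TERMINAL_STATUSES


def wait_until_finished(interface, job_id, poll_seconds: float = 10.0):
    # Block until the job is no longer pending or running, then return its
    # final status. Works with any HardwareInterface implementation.
    while interface.get_job_status(job_id) not in TERMINAL_STATUSES:
        time.sleep(poll_seconds)
    return interface.get_job_status(job_id)
```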

HardwareInterface

Bases: ABC

Abstract base class for a hardware interface.

This class defines the abstract methods that any hardware interface should implement, providing a consistent API for interacting with different types of hardware resources.

The HardwareInterface class is not specific to any type of hardware or infrastructure. It can be extended to provide interfaces for various types of hardware resources, such as supercomputers, GPU clusters, servers, personal laptops, or even potatoes! Whether the hardware is local or remote is also abstracted away by this interface.

The goal is to provide a unified way to submit jobs, query job status, fetch job output, and cancel jobs across all types of hardware resources. This enables writing hardware-agnostic code for running simulations or performing other computational tasks.

Instances of this class must be initialised with two attributes: level and name. level is an integer that represents the level of the hardware interface, and name is a string that can be used to name or label the hardware interface. The level can indicate the complexity or hierarchy of the interface, and the name is typically expected to be unique amongst interfaces.

Implementations should provide the following methods:

  • submit_job
  • get_job_status
  • get_job_output
  • cancel_job

Attributes:

  • name (str) –

    The name of the hardware interface.

  • level (int, optional) –

    The level of the hardware interface, indicating its complexity or hierarchy. Defaults to 1.

Source code in exauq/sim_management/hardware.py
class HardwareInterface(ABC):
    """
    Abstract base class for a hardware interface.

    This class defines the abstract methods that any hardware interface should implement,
    providing a consistent API for interacting with different types of hardware resources.

    The HardwareInterface class is not specific to any type of hardware or infrastructure.
    It can be extended to provide interfaces for various types of hardware resources,
    such as supercomputers, GPU clusters, servers, personal laptops, or even potatoes!
    Whether the hardware is local or remote is also abstracted away by this interface.

    The goal is to provide a unified way to submit jobs, query job status, fetch job
    output, and cancel jobs across all types of hardware resources. This enables
    writing hardware-agnostic code for running simulations or performing other
    computational tasks.

    Instances of this class must be initialised with two attributes: `level` and `name`.
    `level` is an integer that represents the level of the hardware interface, and `name`
    is a string that can be used to name or label the hardware interface. The level can
    indicate the complexity or hierarchy of the interface, and the name is typically expected
    to be unique amongst interfaces.

    Implementations should provide the following methods:
    - submit_job
    - get_job_status
    - get_job_output
    - cancel_job

    Attributes:
    ----------
    name : str
        The name of the hardware interface.
    level : int, optional
        The level of the hardware interface, indicating its complexity or hierarchy.
        Defaults to ``1``.
    """

    def __init__(self, name: str, level: int = 1):
        self._name = name
        self._level = level

    @property
    def level(self) -> int:
        """(Read-only) The level of the hardware interface."""
        return self._level

    @property
    def name(self) -> Optional[str]:
        """(Read-only) The name of the hardware interface."""
        return self._name

    @abstractmethod
    def submit_job(self, job: Job):
        raise NotImplementedError

    @abstractmethod
    def get_job_status(self, job_id: JobId):
        raise NotImplementedError

    @abstractmethod
    def get_job_output(self, job_id: JobId):
        raise NotImplementedError

    @abstractmethod
    def cancel_job(self, job_id: JobId):
        raise NotImplementedError

level: int property

(Read-only) The level of the hardware interface.

name: Optional[str] property

(Read-only) The name of the hardware interface.
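As a hedged sketch (not part of the library), a concrete subclass only needs to implement the four abstract methods. The import path for Job and JobId below is an assumption; the 'hardware' here is a toy in-memory store that runs jobs instantly:

```
from typing import Optional

from exauq.sim_management.hardware import HardwareInterface, JobStatus
# Assumption: Job and JobId are importable from the package's jobs module.
from exauq.sim_management.jobs import Job, JobId


class InMemoryInterface(HardwareInterface):
    """Toy interface that 'runs' jobs instantly; for illustration only."""

    def __init__(self, name: str = "in-memory", level: int = 1):
        super().__init__(name, level)
        self._outputs: dict[JobId, float] = {}

    def submit_job(self, job: Job) -> None:
        # Pretend the job ran and produced the sum of its input data.
        self._outputs[job.id] = float(sum(job.data))

    def get_job_status(self, job_id: JobId) -> JobStatus:
        if job_id in self._outputs:
            return JobStatus.COMPLETED
        return JobStatus.PENDING_SUBMIT

    def get_job_output(self, job_id: JobId) -> Optional[float]:
        return self._outputs.get(job_id)

    def cancel_job(self, job_id: JobId) -> None:
        self._outputs.pop(job_id, None)
```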

HardwareInterfaceFailureError

Bases: Exception

Raised when an error was encountered when running a command or communicating with a machine.

Source code in exauq/sim_management/hardware.py
class HardwareInterfaceFailureError(Exception):
    """Raised when an error was encountered when running a command or communicating with a
    machine."""

    pass

JobStatus

Bases: Enum

Represents the statuses of jobs that can arise when running jobs.

It is possible for multiple statuses to hold at the same time. For example, a job can both have been submitted (SUBMITTED) and also have been run and exited with error (FAILED).

Source code in exauq/sim_management/hardware.py
class JobStatus(Enum):
    """Represents the statuses of jobs that can arise when running jobs.

    It is possible for multiple statuses to hold at the same time. For example, a job
    can both have been submitted (`SUBMITTED`) and also have been run and exited with
    error (`FAILED`).
    """

    COMPLETED = "Completed"
    """A job has run to completion on a remote machine without error. Has the value
    'Completed'."""

    FAILED = "Failed"
    """A job has been run on a remote machine but has exited with an error. Has the
    value 'Failed'."""

    RUNNING = "Running"
    """A job is running on a remote machine but has not yet finished. Has the value
    'Running'."""

    SUBMITTED = "Submitted"
    """A job has been successfully sent to a remote machine. Has the value 'Submitted'."""

    PENDING_SUBMIT = "Pending submit"
    """A job has been set up locally but has not yet been submitted to a remote
    machine. Has the value 'Pending submit'."""

    FAILED_SUBMIT = "Failed submit"
    """A job has been set up locally however submission to remote machine failed.
    Has the value 'Failed submit'."""

    CANCELLED = "Cancelled"
    """A job has been cancelled from a locally issued request or intervention. Has the
    value 'Cancelled'."""

    PENDING_CANCEL = "Pending cancel"
    """A job has been requested to be cancelled but has not yet been cancelled. Has the 
    value 'Pending cancel'."""

    FAILED_CANCEL = "Failed cancel"
    """A job has been requested to be cancelled but the cancellation failed. Has the
    value 'Failed cancel'."""

CANCELLED = 'Cancelled' class-attribute instance-attribute

A job has been cancelled from a locally issued request or intervention. Has the value 'Cancelled'.

COMPLETED = 'Completed' class-attribute instance-attribute

A job has run to completion on a remote machine without error. Has the value 'Completed'.

FAILED = 'Failed' class-attribute instance-attribute

A job has been run on a remote machine but has exited with an error. Has the value 'Failed'.

FAILED_CANCEL = 'Failed cancel' class-attribute instance-attribute

A job has been requested to be cancelled but the cancellation failed. Has the value 'Failed cancel'.

FAILED_SUBMIT = 'Failed submit' class-attribute instance-attribute

A job has been set up locally, but submission to the remote machine failed. Has the value 'Failed submit'.

PENDING_CANCEL = 'Pending cancel' class-attribute instance-attribute

A job has been requested to be cancelled but has not yet been cancelled. Has the value 'Pending cancel'.

PENDING_SUBMIT = 'Pending submit' class-attribute instance-attribute

A job has been set up locally but has not yet been submitted to a remote machine. Has the value 'Pending submit'.

RUNNING = 'Running' class-attribute instance-attribute

A job is running on a remote machine but has not yet finished. Has the value 'Running'.

SUBMITTED = 'Submitted' class-attribute instance-attribute

A job has been successfully sent to a remote machine. Has the value 'Submitted'.
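Because each member carries a human-readable string value, a status can be displayed or reconstructed from its value in the usual Enum way, for example:

```
from exauq.sim_management.hardware import JobStatus

status = JobStatus.FAILED_SUBMIT
print(status.value)            # "Failed submit"
print(JobStatus("Cancelled"))  # JobStatus.CANCELLED
```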

SSHInterface

Bases: HardwareInterface, ABC

SSH Interface to manage and submit jobs. Inherits from the HardwareInterface.

The SSHInterface class provides an interface for interacting with hardware over SSH. It can authenticate using either a key file, an SSH config path, or via an SSH agent. If none of these methods are provided, it will prompt for a password.

Parameters:

  • user (str) –

    The username to authenticate with the SSH server.

  • host (str) –

    The hostname or IP address of the SSH server.

  • name (str) –

    The name of the SSH interface.

  • level (int, default: 1 ) –

    The level of the SSH interface. Defaults to 1.

  • key_filename (FilePath, default: None ) –

    The path to the SSH private key file to authenticate with the SSH server.

  • ssh_config_path (FilePath, default: None ) –

    The path to the SSH configuration file.

  • use_ssh_agent (bool, default: False ) –

    If True, use SSH agent for authentication. Defaults to False.

  • max_attempts (int, default: 3 ) –

    The number of authentication attempts allowed. Defaults to 3.

Raises:

  • ValueError

    If more than one method of authentication is provided.

Source code in exauq/sim_management/hardware.py
class SSHInterface(HardwareInterface, ABC):
    """
    SSH Interface to manage and submit jobs. Inherits from the HardwareInterface.

    The SSHInterface class provides an interface for interacting with hardware over
    SSH. It can authenticate using either a key file, an SSH config path, or via an SSH
    agent. If none of these methods are provided, it will prompt for a password.

    Parameters
    ----------
    user : str
        The username to authenticate with the SSH server.
    host : str
        The hostname or IP address of the SSH server.
    name : str
        The name of the SSH interface.
    level : int, optional
        The level of the SSH interface. Defaults to ``1``.
    key_filename : exauq.sim_management.types.FilePath, optional
        The path to the SSH private key file to authenticate with the SSH server.
    ssh_config_path : exauq.sim_management.types.FilePath, optional
        The path to the SSH configuration file.
    use_ssh_agent : bool, optional
        If ``True``, use SSH agent for authentication. Defaults to ``False``.
    max_attempts : int, optional
        The number of authentication attempts allowed. Defaults to ``3``.

    Raises
    ------
    ValueError
        If more than one method of authentication is provided.
    """

    def __init__(
        self,
        user: str,
        host: str,
        name: str,
        level: int = 1,
        key_filename: Optional[FilePath] = None,
        ssh_config_path: Optional[FilePath] = None,
        use_ssh_agent: Optional[bool] = False,
        max_attempts: int = 3,
    ):
        super().__init__(name, level)
        self._user = user
        self._host = host
        self.max_attempts = max_attempts

        # Check if more than one method is provided
        if (
            sum(
                [
                    key_filename is not None,
                    ssh_config_path is not None,
                    use_ssh_agent,
                ]
            )
            > 1
        ):
            raise ValueError(
                "Only one method of authentication should be provided. Please specify either "
                "'key_filename', 'ssh_config_path' or set 'use_ssh_agent' to True."
            )

        if key_filename is not None:
            self._conn = Connection(
                f"{user}@{host}", connect_kwargs={"key_filename": str(key_filename)}
            )
        elif ssh_config_path is not None:
            ssh_config = Config(overrides={"ssh_config_path": str(ssh_config_path)})
            self._conn = Connection(host, config=ssh_config)
        elif use_ssh_agent:
            self._conn = Connection(f"{user}@{host}")
        else:
            self._init_with_password(user, host)

        self._check_connection()

    def _check_connection(self):
        try:
            self._conn.run('echo "Testing connection"', hide=True)
            print(f"Connection to {self._conn.original_host} established.")
            return

        except Exception as e:
            message = f"Could not connect to {self._conn.original_host}: {str(e)}"
            raise Exception(message) from None

    def _init_with_password(self, user: str, host: str):
        for attempt in range(1, self.max_attempts + 1):
            password = getpass.getpass(prompt=f"Password for {user}@{host}: ")
            try:
                self._conn = Connection(
                    f"{user}@{host}", connect_kwargs={"password": password}
                )
                # Check connection by running a simple command
                self._conn.run('echo "Testing connection"', hide=True)
                return

            except AuthenticationException:  # Catch the specific exception
                if attempt < self.max_attempts:  # Don't say this on the last attempt
                    print("Failed to authenticate. Please try again.")
                else:
                    print("Maximum number of attempts exceeded.")
                    raise
            except SSHException as e:
                print(f"Could not connect to {self._conn.original_host}: {str(e)}")
                raise  # Re-raise the exception

    @property
    def user(self) -> str:
        """(Read-only) The username to authenticate with the SSH server."""
        return self._user

    @property
    def host(self) -> str:
        """(Read-only) The hostname or IP address of the SSH server."""
        return self._host

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._conn.close()

host: str property

(Read-only) The hostname or IP address of the SSH server.

user: str property

(Read-only) The username to authenticate with the SSH server.
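Concrete subclasses build their job-management methods on top of the underlying connection (exposed here as the private _conn attribute, a Fabric Connection). The following is a hedged toy sketch rather than a pattern taken from the library; the remote commands and the Job/JobId import path are assumptions:

```
from typing import Optional

from exauq.sim_management.hardware import JobStatus, SSHInterface
# Assumption: Job and JobId are importable from the package's jobs module.
from exauq.sim_management.jobs import Job, JobId


class EchoServerInterface(SSHInterface):
    """Toy SSH-backed interface for illustration only."""

    def submit_job(self, job: Job) -> None:
        # Hypothetical: record the job's input data in a file named after its ID.
        data = ",".join(map(str, job.data))
        self._conn.run(f"echo '{data}' > job_{job.id}.csv", hide=True)

    def get_job_status(self, job_id: JobId) -> JobStatus:
        result = self._conn.run(f"test -e job_{job_id}.csv", hide=True, warn=True)
        return JobStatus.COMPLETED if result.ok else JobStatus.PENDING_SUBMIT

    def get_job_output(self, job_id: JobId) -> Optional[float]:
        return None  # this toy example produces no real output

    def cancel_job(self, job_id: JobId) -> None:
        self._conn.run(f"rm -f job_{job_id}.csv", hide=True)
```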

SimulatorOutputParsingError

Bases: Exception

Raised when the output from a simulator cannot be parsed as a floating point number.

Source code in exauq/sim_management/hardware.py
class SimulatorOutputParsingError(Exception):
    """Raised when the output from a simulator cannot be parsed as a floating point
    number."""

    pass

UnixServerScriptInterface

Bases: SSHInterface

Interface for running a simulation script on a Unix server over SSH.

This interface is designed to invoke simulation code on a remote server with the following command:

<program> <script_path> path/to/simulation-input.csv path/to/simulation-output.txt

Here, <program> is a program that can run the simulator script, such as bash, python, Rscript, etc. The first argument to the script is the path to a CSV file containing the simulation input data, while the second argument is the path to a text file that the script will write the simulation output to.
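For concreteness, a minimal conforming script might look like the following (the file name and the computation are hypothetical): it reads one row of comma-separated inputs from its first argument and writes a single number to its second argument, which is the form get_job_output expects.

```
# simulator.py (hypothetical) -- run as: python simulator.py input.csv output.txt
import sys


def main() -> None:
    input_path, output_path = sys.argv[1], sys.argv[2]
    with open(input_path) as f:
        inputs = [float(value) for value in f.read().strip().split(",")]
    result = sum(value**2 for value in inputs)  # stand-in for real simulation code
    with open(output_path, "w") as f:
        f.write(f"{result}\n")


if __name__ == "__main__":
    main()
```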

Objects of this class take care of creating the input CSV file from a Job and uploading it to a job-specific subdirectory of a remote 'workspace' directory. This job-specific directory is also where the output of the simulator script is written, along with a file capturing standard output and standard error from running the script. Note that any required intermediary directories are created on the server.

The remote workspace directory can be specified as part of initialisation. If a pre-existing directory is supplied, then the details of any existing jobs already recorded in the workspace directory will be retrieved and cached. If a workspace directory is not specified, then a new directory will be created alongside the main simulation script with name 'exauqXXXXX', where 'XXXXX' is a uniquely generated string of characters created via the mktemp command on Unix systems.

If key_filename and ssh_config_path are not provided and use_ssh_agent is False then a prompt for a password from standard input will be issued each time a connection is made to the server.

Parameters:

  • user (str) –

    The username to authenticate with the SSH server.

  • host (str) –

    The hostname or IP address of the SSH server.

  • program (str) –

    The program to run on the server.

  • script_path (FilePath) –

    The path to the script on the Unix server to run with program.

  • name (str) –

    The name of the interface.

  • level (int, default: 1 ) –

    (Default: 1) The level of the interface.

  • workspace_dir (FilePath, default: None ) –

    (Default: None) A path to a directory on the Unix server where job-specific subdirectories should be created. Relative paths will be relative to the default working directory for a new SSH session (usually the user's home directory). If None then a new directory will be created alongside the script defined by script_path.

  • key_filename (FilePath, default: None ) –

    (Default: None) The path to an SSH private key file to authenticate with the SSH server. The key file must be unencrypted.

  • ssh_config_path (FilePath, default: None ) –

    (Default: None) The path to an SSH configuration file.

  • use_ssh_agent (bool, default: False ) –

    (Default: False) If True, use a running SSH agent for authentication.

  • max_attempts (int, default: 3 ) –

    (Default: 3) The number of authentication attempts allowed.

Attributes:

  • workspace_dir (str, optional) –

    (Read-only) The directory within which details of jobs are recorded, or None if this is unknown at the time of calling.

Raises:

  • ValueError

    If more than one method of authentication is provided.

  • HardwareInterfaceFailureError

    If there were problems connecting to the server, establishing the existence of the workspace directory, or other such server-related issues.
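A hedged end-to-end sketch of typical usage follows. The host, credentials, paths and the Job construction are all hypothetical, and the import path and constructor signature for Job and JobId are assumptions:

```
import time

from exauq.sim_management.hardware import (
    TERMINAL_STATUSES,
    JobStatus,
    UnixServerScriptInterface,
)
# Assumption: Job and JobId are importable from the package's jobs module.
from exauq.sim_management.jobs import Job, JobId

interface = UnixServerScriptInterface(
    user="jdoe",                                 # hypothetical
    host="sim-server.example.com",               # hypothetical
    program="python",
    script_path="/home/jdoe/sims/simulator.py",  # hypothetical
    name="unix-sim",
    key_filename="/home/jdoe/.ssh/id_ed25519",   # must be an unencrypted key
)

job = Job(JobId(1), (0.1, 2.3, 4.5))  # constructor signature is an assumption

interface.submit_job(job)
while interface.get_job_status(job.id) not in TERMINAL_STATUSES:
    time.sleep(10)

if interface.get_job_status(job.id) == JobStatus.COMPLETED:
    print(interface.get_job_output(job.id))  # a single float
```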

Source code in exauq/sim_management/hardware.py
class UnixServerScriptInterface(SSHInterface):
    """Interface for running a simulation script on a Unix server over SSH.

    This interface is designed to invoke simulation code on a remote server with the
    following command:

    ```
    <program> <script_path> path/to/simulation-input.csv path/to/simulation-output.txt
    ```

    Here, ``<program>`` is a program that can run the simulator script, such as ``bash``,
    ``python``, ``Rscript``, etc. The first argument to the script is the path to a CSV
    file containing the simulation input data, while the second argument is the path to a
    text file that the script will write the simulation output to.

    Objects of this class will take care of creating the input CSV file from a `Job`,
    uploading this to a job-specific subdirectory of a remote 'workspace' directory. This
    job-specific directory is also where the output of the simulator script is written to,
    along with a file capturing standard output and standard error from running the
    script. Note that any required intermediary directories are created on the server.

    The remote workspace directory can be specified as part of initialisation. If a
    pre-existing directory is supplied, then the details of any existing jobs already
    recorded in the workspace directory will be retrieved and cached. If a workspace
    directory is not specified, then a new directory will be created alongside the main
    simulation script with name 'exauqXXXXX', where 'XXXXX' is a uniquely generated string
    of characters created via the ``mktemp`` command on Unix systems.

    If `key_filename` and `ssh_config_path` are not provided and `use_ssh_agent` is
    ``False`` then a prompt for a password from standard input will be issued each time a
    connection is made to the server.

    Parameters
    ----------
    user : str
        The username to authenticate with the SSH server.
    host : str
        The hostname or IP address of the SSH server.
    program : str
        The program to run on the server.
    script_path : exauq.sim_management.types.FilePath
        The path to the script on the Unix server to run with `program`.
    name : str
        The name of the interface.
    level : int, optional
        (Default: 1) The level of the interface.
    workspace_dir : exauq.sim_management.types.FilePath, optional
        (Default: None) A path to a directory on the Unix server where job-specific
        subdirectories should be created. Relative paths will be relative to the default
        working directory for a new SSH session (usually the user's home directory). If
        ``None`` then a new directory will be created alongside the script defined by
        `script_path`.
    key_filename : exauq.sim_management.types.FilePath, optional
        (Default: None) The path to an SSH private key file to authenticate with the SSH
        server. The key file must be unencrypted.
    ssh_config_path : exauq.sim_management.types.FilePath, optional
        (Default: None) The path to an SSH configuration file.
    use_ssh_agent : bool, optional
        (Default: False) If ``True``, use a running SSH agent for authentication.
    max_attempts : int, optional
        (Default: 3) The number of authentication attempts allowed.

    Attributes
    ----------
    workspace_dir : str, optional
        (Read-only) The directory within which details of jobs are recorded, or None
        if this is unknown at the time of calling.

    Raises
    ------
    ValueError
        If more than one method of authentication is provided.
    HardwareInterfaceFailureError
        If there were problems connecting to the server, establishing the existence of
        the workspace directory, or other such server-related issues.
    """

    _bash = "/bin/bash"
    _manager_script_name = "exauq_manager.sh"
    _runner_script_name = "runner.sh"

    def __init__(
        self,
        user: str,
        host: str,
        program: str,
        script_path: FilePath,
        name: str,
        level: int = 1,
        workspace_dir: Optional[FilePath] = None,
        key_filename: Optional[FilePath] = None,
        ssh_config_path: Optional[FilePath] = None,
        use_ssh_agent: Optional[bool] = False,
        max_attempts: int = 3,
    ):
        super().__init__(
            user,
            host,
            name,
            level,
            key_filename,
            ssh_config_path,
            use_ssh_agent,
            max_attempts,
        )
        self._user = user
        self._host = host
        self._user_at_host = f"{self._user}@{self._host}"
        self._program = program
        self._script_path = pathlib.PurePosixPath(script_path)
        self._workspace_dir = (
            pathlib.PurePosixPath(workspace_dir) if workspace_dir is not None else None
        )
        if not self._remote_dir_exists(workspace_dir):
            self._make_workspace_dir()
            self._job_log = dict()
        else:
            self._job_log = self._initialise_job_log_from_server()

    def _remote_dir_exists(self, path: Union[str, pathlib.PurePosixPath, None]) -> bool:
        """Whether a directory at the given path exists on the server."""

        if path is None:
            return False
        else:
            flag = "EXISTS"
            try:
                result = self._run_remote_command(
                    f"if [ -d {path} ]; then echo {flag}; fi"
                )
            except Exception as e:
                raise HardwareInterfaceFailureError(
                    f"Could not establish existence of workspace directory {self.workspace_dir} "
                    f"for {self._user_at_host}: {e}"
                )
            return flag == result

    def _initialise_job_log_from_server(self) -> dict[str, dict[str, Any]]:
        """Populate the job log with details of existing jobs that have been submitted to
        the server."""

        # Check whether there are any jobs already submitted
        job_ids = self._fetch_remote_job_ids()

        return {
            job_id: self._make_job_settings(job_id, status=JobStatus.SUBMITTED)
            for job_id in job_ids
        }

    def _fetch_remote_job_ids(self) -> tuple[JobId, ...]:
        """Get IDs of jobs that have been submitted to the server.

        Gathers the names of directories directly below the workspace directory that
        have integer names and contain a manager script.
        """
        # List paths to job manager scripts in directories directly below the workspace
        # directory
        no_job_ids_flag = "NO_JOBIDS"
        cmd = f"cd {self.workspace_dir} && find . | grep -G '^\\./[0-9]*/{self._manager_script_name}$' || echo {no_job_ids_flag}"
        try:
            job_manager_paths_str = self._run_remote_command(cmd)
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not fetch job IDs from workspace directory {self.workspace_dir} "
                f"for {self._user_at_host}: {e}"
            )

        if not job_manager_paths_str == no_job_ids_flag:
            # Extract the job IDs as names of directories containing the job manager scripts
            job_manager_paths = [
                pathlib.PurePosixPath(path) for path in job_manager_paths_str.split("\n")
            ]
            return tuple(JobId(path.parent.name) for path in job_manager_paths)
        else:
            return tuple()

    def _make_job_settings(
        self, job_id: JobId, status: JobStatus = JobStatus.PENDING_SUBMIT
    ) -> dict[str, Any]:
        """Make settings for specifying a job on the server.

        It is expected that the resulting dict will constitute an entry in the job log.
        """

        job_remote_dir = self._workspace_dir / str(job_id)
        job_manager_path = job_remote_dir / self._manager_script_name
        runner_path = job_remote_dir / self._runner_script_name
        input_data_path = pathlib.PurePosixPath(job_remote_dir, "input.csv")
        script_output_path = str(job_remote_dir / "output.txt")
        script_stdout_path = job_remote_dir / f"{self._script_path.name}.out"
        return {
            "status": status,
            "job_remote_dir": job_remote_dir,
            "runner": runner_path,
            "job_manager": job_manager_path,
            "input_data_path": input_data_path,
            "script_output_path": script_output_path,
            "script_stdout_path": script_stdout_path,
            "output": None,
        }

    @property
    def workspace_dir(self) -> Optional[str]:
        """(Read-only) The directory within which details of jobs are recorded, or None
        if this is unknown at the time of calling (e.g. because it hasn't been created
        yet)."""

        return str(self._workspace_dir) if self._workspace_dir is not None else None

    def submit_job(self, job: Job, resubmit: bool = False) -> None:
        """Submit a job for the simulation code.

        Upon submission, a new subdirectory of the remote workspace directory is created
        for the job, using the job's ID as the directory name. (The workspace directory
        will be created as well if it doesn't already exist.) A CSV file containing the
        simulation input data for the job is uploaded to this directory.

        A Bash script is also uploaded to the job's directory, which is responsible for
        managing the job; it is through this script that the job can be started, cancelled
        or its status retrieved.

        If a job with the same ID has already been submitted to the server and `resubmit`
        is ``False``, then an error will be raised. Only jobs that have completed, be it
        successfully or ending in failure, or that have been cancelled, may be
        resubmitted. In this case, calling with `resubmit` set to ``True`` will delete any
        existing remote-side data for the corresponding job directory and then submit the
        supplied `job`.

        Parameters
        ----------
        job : Job
            A job containing the data to run the simulation code with.
        resubmit : bool
            (Default: False) Whether the job is being resubmitted, i.e. whether to delete
            any existing remote-side data for job before submission.

        Raises
        ------
        ValueError
            If a job with the same ID has already been submitted and ``resubmit = False``,
            or if ``resubmit = True`` and a job with the same ID has not completed.
        HardwareInterfaceFailureError
            If there were problems connecting to the server, making files / directories on
            the server or other such server-related problems.
        """

        if resubmit:
            # Clear the artifacts from the remote side, as long as the job is not running
            # or marked as submitted.
            if (status := self.get_job_status(job.id)) not in {
                JobStatus.SUBMITTED,
                JobStatus.RUNNING,
            }:
                self.delete_remote_job_dir(job.id)
                del self._job_log[job.id]
            else:
                raise ValueError(
                    f"Cannot resubmit job with ID {job.id} as job status is '{status.value}'. "
                    "(Cancel the job before resubmitting.)"
                )

        elif self._job_has_been_submitted(job.id):
            raise ValueError(
                f"Cannot submit job with ID {job.id}: a job with the same ID has already "
                f"been submitted."
            )

        # Create the settings for the new job
        job_settings = self._make_job_settings(job.id)

        # Make job-specific remote workspace directory (will raise error if directory
        # already exists)
        self._make_directory_on_remote(job_settings["job_remote_dir"])

        # Put simulator input data onto server
        data_str = ",".join(map(str, job.data)) + "\n"
        self._make_text_file_on_remote(data_str, job_settings["input_data_path"])

        # Create runner script and manager script and put onto server
        runner_script = self._make_runner_script(
            job_settings["job_remote_dir"],
            job_settings["input_data_path"],
            job_settings["script_output_path"],
        )
        manager_script = self._make_manager_script(
            job_settings["job_remote_dir"],
            job_settings["runner"],
            job_settings["script_stdout_path"],
            job_settings["script_output_path"],
        )
        self._make_text_file_on_remote(runner_script, job_settings["runner"])
        self._make_text_file_on_remote(manager_script, job_settings["job_manager"])

        # Start job
        try:
            _ = self._run_remote_command(
                f"{self._bash} {job_settings['job_manager']} start"
            )
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not start job with id {job.id} on {self._user_at_host}: {e}"
            )

        # Mark job as submitted and store settings in job log
        job_settings["status"] = JobStatus.SUBMITTED
        self._job_log[job.id] = job_settings

        return None

    def _make_workspace_dir(self):
        """Make the server-side workspace directory.

        If the path to a directory was provided explicitly during object initialisation,
        then create that directory on the server. Otherwise, create a default directory
        alongside the simulator script. The name of the directory will be of the form
        'exauqXXXXX' where 'XXXXX' is a uniquely generated string of characters created
        via the ``mktemp`` command on Unix systems."""

        if self.workspace_dir is None:
            try:
                workspace_dir_str = self._run_remote_command(
                    f"mktemp -d -p {self._script_path.parent} exauqXXXXX"
                )
                self._workspace_dir = pathlib.PurePosixPath(workspace_dir_str)
            except Exception as e:
                raise HardwareInterfaceFailureError(
                    f"Could not create workspace directory in {self._script_path.parent} "
                    f"for {self._user_at_host}: {e}"
                )
            return None
        else:
            self._make_directory_on_remote(self._workspace_dir, make_parents=True)
            return None

    def _make_runner_script(
        self,
        job_remote_dir: Union[str, pathlib.PurePosixPath],
        input_path: Union[str, pathlib.PurePosixPath],
        output_path: Union[str, pathlib.PurePosixPath],
    ) -> str:
        """Create the text for a script that runs the simulation script."""
        template_str = r"""
        #!/bin/bash

        # Run script and create new COMPLETED flag file upon successful execution and
        # presence of output file.
        #PY_PROGRAM #PY_SCRIPT #PY_INPUT_PATH #PY_OUTPUT_PATH && if [ -e #PY_OUTPUT_PATH ]; then touch #PY_JOB_DIR/COMPLETED; fi

        """

        template_str = template_str[1:]  # remove leading newline character
        template = _Template(textwrap.dedent(template_str))
        return template.substitute(
            {
                "JOB_DIR": str(job_remote_dir),
                "SCRIPT": str(self._script_path),
                "PROGRAM": str(self._program),
                "INPUT_PATH": str(input_path),
                "OUTPUT_PATH": str(output_path),
            }
        )

    def _make_manager_script(
        self,
        job_remote_dir: Union[str, pathlib.PurePosixPath],
        runner_path: Union[str, pathlib.PurePosixPath],
        stdout_path: Union[str, pathlib.PurePosixPath],
        output_path: Union[str, pathlib.PurePosixPath],
    ) -> str:
        """Create the text for the server-side job management Bash program."""

        template_str = r"""
        #!/bin/bash

        # This script provides an interface for working with processes -- a kind of
        # very basic 'job' manager (where 'job' means a process and collection of
        # subprocesses, not a 'job' as would be worked with using e.g. the jobs program.)
        #
        # Arg 1: One of: start, status, stop.

        job_dir=#PY_JOB_DIR
        runner=#PY_RUNNER
        script_stout_sterr=#PY_STDOUT_PATH
        script_output=#PY_OUTPUT_PATH
        pid_file="${job_dir}/PID"
        pgid_file="${job_dir}/PGID"
        jobid_file="${job_dir}/JOBID"
        stopped_flag_file="${job_dir}/STOPPED"
        completed_flag_file="${job_dir}/COMPLETED"
        failed_flag_file="${job_dir}/FAILED"

        FAILED_JOBID=",,"

        # Print an error message and exit with nonzero status.
        # Arg 1: String containing details of the error.
        error() {
            echo -e "${0}: error: ${1}" >&2
            exit 1
        }

        # Check that the current shell is set up in the required way for this script.
        check_system() {
            if ! [[ "$SHELL" =~ /bash$ ]]
            then
                error "must be running in a Bash shell"
            elif ! which pkill > /dev/null 2>&1
            then
                error "required command 'pkill' not available on system"
            elif ! which mktemp > /dev/null 2>&1
            then
                error "required command 'mktemp' not available on system"
            fi
        }

        # Run a job in the background and capture the PID of the process
        run_job() {
            nohup /bin/bash "$runner" >& "$script_stout_sterr" < /dev/null &
            runner_pid=$!
            echo $runner_pid | tr -d '[:space:]' > $pid_file
            ps -p $runner_pid -o pgid= | tr -d '[:space:]' > $pgid_file
        }

        # Get a unique identifier for a process, utilising the PID, start time (long
        # format) and user.
        # Arg1: a pid
        get_process_identifier() {
            echo "$(ps -p "${1}" -o user=),$(ps -p "${1}" -o pid=),$(ps -p "${1}" -o lstart=)"
        }

        PENDING_SUBMIT="PENDING_SUBMIT"
        RUNNING="RUNNING"
        STOPPED="STOPPED"
        COMPLETED="COMPLETED"
        FAILED="FAILED"

        record() {
            case $1 in
            "$STOPPED")
                touch $stopped_flag_file;;
            "$COMPLETED")
                touch $completed_flag_file;;
            "$FAILED")
                touch $failed_flag_file;;
            *)
                error "in function record: unsupported arg '${1}'";;
            esac
        }

        # Start the job and capture an ID for it.
        start_job() {
            run_job
            job_pid=$(cat $pid_file)
            jobid=$(get_process_identifier "${job_pid}")

            # If identifier is empty this implies the process is no-longer running,
            # which almost certainly suggests there was an error.
            if [ "$jobid" = "$FAILED_JOBID" ] && [ ! -e "$script_output" ]
            then
                record $FAILED
                error "script failed to run:\n$(cat ${script_stout_sterr})"
            fi
            echo "$jobid" > $jobid_file
        }


        # Get the status of a job
        get_status() {
            if [ ! -e $pid_file ]
            then
                echo $PENDING_SUBMIT
            elif [ -e $stopped_flag_file ]
            then
                echo $STOPPED
            elif [ -e $completed_flag_file ]
            then
                echo $COMPLETED
            elif [ -e $failed_flag_file ] || [ ! -e $jobid_file ]
            then
                echo $FAILED
            else
                # If here then the job was last known to be running
                job_id=$(cat $jobid_file)
                job_pid=$(cat $pid_file)
                current_id=$(get_process_identifier "${job_pid}")
                if [ ! "$job_id" = "$FAILED_JOBID" ] && [ "$job_id" = "$current_id" ]
                then
                    echo $RUNNING
                elif [ -e "$script_output" ]
                then
                    # For a job to be completed, it must not be running and have an
                    # output file.
                    record $COMPLETED
                    echo $COMPLETED
                else
                    record $FAILED
                    echo $FAILED
                fi
            fi
        }

        # Stop (cancel) a job by killing all processes within its group.
        stop_job() {
            status=$(get_status)
            if [ "$status" = "$RUNNING" ]
            then
                if xargs pkill -g < $pgid_file
                then
                    record $STOPPED
                fi
            fi
        }

        # Dispatch on command line arg
        check_system
        case $1 in
        start)
            start_job;;
        stop)
            stop_job;;
        status)
            get_status;;
        *)
            error "unsupported arg '${1}'";;
        esac

        """

        template_str = template_str[1:]  # remove leading newline character
        template = _Template(textwrap.dedent(template_str))
        return template.substitute(
            {
                "JOB_DIR": str(job_remote_dir),
                "RUNNER": str(runner_path),
                "STDOUT_PATH": str(stdout_path),
                "OUTPUT_PATH": str(output_path),
            }
        )

    def _make_directory_on_remote(
        self, path: Union[str, pathlib.PurePosixPath], make_parents: bool = False
    ) -> None:
        """Make a directory at the given path on the remote machine.

        If the directory already exists, then this will be left untouched without error if
        `make_parents` is ``True``, whereas an error will be raised if `make_parents` is
        ``False``. If `make_parents` is ``True`` then intermediary directories will be
        created as required (by calling ``mkdir`` with the ``-p`` option).
        """

        mkdir_command = (
            f"mkdir {path}" if not make_parents else f"[ -d {path} ] || mkdir -p {path}"
        )
        try:
            _ = self._run_remote_command(mkdir_command)
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not make directory {path} for {self._user_at_host}: {e}"
            )
        return None

    def _make_text_file_on_remote(
        self, file_contents: str, target_path: Union[str, pathlib.PurePosixPath]
    ) -> None:
        """Make a text file on the remote machine with a given string as contents."""

        try:
            _ = self._conn.put(
                io.StringIO(file_contents),
                remote=str(target_path),
            )
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not create text file at {target_path} for "
                f"{self._user_at_host}: {e}"
            )
        return None

    def _run_remote_command(self, command: str) -> str:
        """Run a shell command and return the resulting contents of standard output.

        The contents of standard output is stripped of leading/trailing whitespace before
        returning."""

        res = self._conn.run(command, hide=True)
        return str(res.stdout).strip()

    def get_job_status(self, job_id: JobId) -> JobStatus:
        """Get the status of a job with given job ID.

        Any jobs that have not been submitted with `submit_job` will return a status of
        `JobStatus.PENDING_SUBMIT`.

        A job that has successfully been started on the server will have a status of
        `JobStatus.RUNNING` (which, in this case, is equivalent to `JobStatus.SUBMITTED`).
        The status will remain as `JobStatus.RUNNING` until the corresponding remote
        process has stopped, at which point the status is determined as follows:

        * If an output file from the simulator has been created, then the status is
          ``JobStatus.COMPLETED``.
        * If the job was cancelled before completion, then the status is
          ``JobStatus.CANCELLED``.
        * If the job was not cancelled but no output file was created, then the status
          is `JobStatus.FAILED`. In particular, note that the exit code of the
          simulator script is not taken into account when determining whether a job has
          finished successfully or not.

        Parameters
        ----------
        job_id : JobId
            The ID of the job to check the status of.

        Returns
        -------
        JobStatus
            The status of the job.

        Raises
        ------
        HardwareInterfaceFailureError
            If there were problems connecting to the server or retrieving the status of
            the job.
        """
        if not self._job_has_been_submitted(job_id):
            return JobStatus.PENDING_SUBMIT
        elif self._job_log[job_id]["status"] in {JobStatus.RUNNING, JobStatus.SUBMITTED}:
            self._update_status_from_remote(job_id)
            return self._job_log[job_id]["status"]
        else:
            return self._job_log[job_id]["status"]

    def _job_has_been_submitted(self, job_id: JobId) -> bool:
        """Whether a job with the given ID has been submitted."""

        return job_id in self._job_log

    def _update_status_from_remote(self, job_id: JobId) -> None:
        """Update the status of a job based on the status of the corresponding process on
        the server."""

        status = self._run_remote_command(
            f"{self._bash} {self._job_log[job_id]['job_manager']} status"
        )
        if status == "RUNNING":
            self._job_log[job_id]["status"] = JobStatus.RUNNING
        elif status == "COMPLETED":
            self._job_log[job_id]["status"] = JobStatus.COMPLETED
        elif status == "STOPPED":
            self._job_log[job_id]["status"] = JobStatus.CANCELLED
        else:
            self._job_log[job_id]["status"] = JobStatus.FAILED

        return None

    def _remote_job_is_running(self, job_id: JobId) -> bool:
        """Whether the remote process of a given job is running."""

        return self.get_job_status(job_id) == JobStatus.RUNNING

    def get_job_output(self, job_id: JobId) -> Optional[float]:
        """Get the simulator output for a job with the given ID.

        This is read from the contents of the simulator output file for the job, located
        in the job's remote directory. It is expected that the contents of this output
        file will be a single floating point number.

        Parameters
        ----------
        job_id : JobId
            The ID of the job to get the simulator output for.

        Returns
        -------
        Optional[float]
            The output of the simulator, if the job has completed successfully, or else
            ``None``.

        Raises
        ------
        HardwareInterfaceFailureError
            If there were problems connecting to the server or retrieving the simulator
            output.
        SimulatorOutputParsingError
            If the output of the simulator cannot be parsed as a single floating point
            number.
        """
        if not self._job_has_been_submitted(job_id):
            return None

        elif self._job_log[job_id]["output"] is not None:
            return self._job_log[job_id]["output"]

        elif self.get_job_status(job_id) == JobStatus.COMPLETED:
            output_path = self._job_log[job_id]["script_output_path"]
            output = self._retrieve_output(output_path)
            try:
                self._job_log[job_id]["output"] = (
                    float(output) if output is not None else output
                )
            except ValueError:
                raise SimulatorOutputParsingError(
                    f"Could not parse simulator output {output} for job ID {job_id} as a "
                    "float."
                )
            return self._job_log[job_id]["output"]
        else:
            return None

    def _retrieve_output(
        self, remote_path: Union[str, pathlib.PurePosixPath]
    ) -> Optional[str]:
        """Get the output of a simulation from the remote server."""

        with io.BytesIO() as buffer:
            try:
                _ = self._conn.get(str(remote_path), local=buffer)
            except FileNotFoundError:
                return None
            except Exception as e:
                raise HardwareInterfaceFailureError(
                    f"Could not retrieve output of script {self._script_path} from file "
                    f"{remote_path}: {e}"
                )

            contents = buffer.getvalue().decode(encoding="utf-8")

        return contents.strip()

    def cancel_job(self, job_id: JobId) -> None:
        """Cancel the job with a given job ID.

        Cancelling a job involves terminating the server-side simulator script process
        (and any subprocesses) associated with the job. If the job is not running (i.e.
        has completed, has failed or has already been cancelled) then this method will
        return without error.

        Parameters
        ----------
        job_id : JobId
            The ID of the job to cancel.

        Raises
        ------
        HardwareInterfaceFailureError
            If there were problems connecting to the server or otherwise cancelling the
            job.
        """
        if self.get_job_status(job_id) == JobStatus.RUNNING:
            try:
                self._run_remote_command(
                    f"{self._bash} {self._job_log[job_id]['job_manager']} stop"
                )
                self._job_log[job_id]["status"] = JobStatus.CANCELLED
            except Exception as e:
                raise HardwareInterfaceFailureError(
                    f"Could not cancel job with id {job_id}: {e}"
                )
            return None
        else:
            return None

    def delete_workspace(self) -> None:
        """Delete the entire workspace directory associated with this instance.

        Warning: this is an 'unsafe' deletion: it does not wait for any outstanding jobs
        to complete. This could result in server-side errors for any simulations that are
        still running when the workspace directory is deleted.

        Raises
        ------
        HardwareInterfaceFailureError
            If there were problems connecting to the server or deleting the directory.
        """
        try:
            _ = self._run_remote_command(f"rm -r {self.workspace_dir}")
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not delete workspace directory {self.workspace_dir} for "
                f"{self._user_at_host}: {e}"
            )
        return None

    def delete_remote_job_dir(self, job_id: JobId) -> None:
        """Delete the remote directory corresponding to a given job ID.

        This will recursively delete all the contents of the directory, invoking
        ``rm -r`` on it. Only submitted jobs that aren't currently running can have their
        remote directories deleted.

        Parameters
        ----------
        job_id : JobId
            The ID of the job whose remote directory should be deleted.

        Raises
        ------
        ValueError
            If the supplied job ID has not been submitted, or if the job is still running.
        HardwareInterfaceFailureError
            If there were problems connecting to the server or deleting the directory.
        """

        job_status = self.get_job_status(job_id)
        if job_status == JobStatus.PENDING_SUBMIT:
            raise ValueError(
                f"Cannot delete directory for job ID {job_id}: job has not been submitted."
            )

        elif job_status == JobStatus.RUNNING:
            raise ValueError(
                f"Cannot delete directory {self._job_log[job_id]['job_remote_dir']} for "
                f"job ID {job_id}: job is currently running."
            )
        else:
            job_remote_dir = self._job_log[job_id]["job_remote_dir"]
            deletion_cmd = f"rm -r {job_remote_dir}"
            try:
                _ = self._run_remote_command(deletion_cmd)
            except Exception as e:
                raise HardwareInterfaceFailureError(
                    f"Could not delete remote folder {job_remote_dir} for "
                    f"{self._user_at_host}: {e}"
                )

            return None

workspace_dir: Optional[str] property

(Read-only) The directory within which details of jobs are recorded, or None if this is unknown at the time of calling (e.g. because it hasn't been created yet).

cancel_job(job_id)

Cancel the job with a given job ID.

Cancelling a job involves terminating the server-side simulator script process (and any subprocesses) associated with the job. If the job is not running (i.e. has completed, has failed or has already been cancelled) then this method will return without error.

Parameters:

  • job_id (JobId) –

    The ID of the job to cancel.

Raises:

  • HardwareInterfaceFailureError

    If there were problems connecting to the server or otherwise cancelling the job.

Source code in exauq/sim_management/hardware.py
def cancel_job(self, job_id: JobId) -> None:
    """Cancel the job with a given job ID.

    Cancelling a job involves terminating the server-side simulator script process
    (and any subprocesses) associated with the job. If the job is not running (i.e.
    has completed, has failed or has already been cancelled) then this method will
    return without error.

    Parameters
    ----------
    job_id : JobId
        The ID of the job to cancel.

    Raises
    ------
    HardwareInterfaceFailureError
        If there were problems connecting to the server or otherwise cancelling the
        job.
    """
    if self.get_job_status(job_id) == JobStatus.RUNNING:
        try:
            self._run_remote_command(
                f"{self._bash} {self._job_log[job_id]['job_manager']} stop"
            )
            self._job_log[job_id]["status"] = JobStatus.CANCELLED
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not cancel job with id {job_id}: {e}"
            )
        return None
    else:
        return None
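
Example (illustrative sketch, not part of the library source): cancelling a job and checking the resulting status, assuming interface is a configured UnixServerScriptInterface and job_id is the JobId of a previously submitted job.

from exauq.sim_management.hardware import JobStatus

interface.cancel_job(job_id)  # returns without error if the job is not running

if interface.get_job_status(job_id) == JobStatus.CANCELLED:
    print(f"Job {job_id} was cancelled.")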

delete_remote_job_dir(job_id)

Delete the remote directory corresponding to a given job ID.

This will recursively delete all the contents of the directory, invoking rm -r on it. Only submitted jobs that aren't currently running can have their remote directories deleted.

Parameters:

  • job_id (JobId) –

    The ID of the job whose remote directory should be deleted.

Raises:

  • ValueError

    If the supplied job ID has not been submitted, or if the job is still running.

  • HardwareInterfaceFailureError

    If there were problems connecting to the server or deleting the directory.

Source code in exauq/sim_management/hardware.py
def delete_remote_job_dir(self, job_id: JobId) -> None:
    """Delete the remote directory corresponding to a given job ID.

    This will recursively delete all the contents of the directory, invoking
    ``rm -r`` on it. Only submitted jobs that aren't currently running can have their
    remote directories deleted.

    Parameters
    ----------
    job_id : JobId
        The ID of the job whose remote directory should be deleted.

    Raises
    ------
    ValueError
        If the supplied job ID has not been submitted, or if the job is still running.
    HardwareInterfaceFailureError
        If there were problems connecting to the server or deleting the directory.
    """

    job_status = self.get_job_status(job_id)
    if job_status == JobStatus.PENDING_SUBMIT:
        raise ValueError(
            f"Cannot delete directory for job ID {job_id}: job has not been submitted."
        )

    elif job_status == JobStatus.RUNNING:
        raise ValueError(
            f"Cannot delete directory {self._job_log[job_id]['job_remote_dir']} for "
            f"job ID {job_id}: job is currently running."
        )
    else:
        job_remote_dir = self._job_log[job_id]["job_remote_dir"]
        deletion_cmd = f"rm -r {job_remote_dir}"
        try:
            _ = self._run_remote_command(deletion_cmd)
        except Exception as e:
            raise HardwareInterfaceFailureError(
                f"Could not delete remote folder {job_remote_dir} for "
                f"{self._user_at_host}: {e}"
            )

        return None
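
Example (illustrative sketch, not part of the library source): cleaning up a job's remote directory once the job is no longer running, assuming interface and job_id as in the other examples.

from exauq.sim_management.hardware import JobStatus

# Only submitted jobs that are no longer running can have their directories deleted.
if interface.get_job_status(job_id) in {
    JobStatus.COMPLETED,
    JobStatus.FAILED,
    JobStatus.CANCELLED,
}:
    interface.delete_remote_job_dir(job_id)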

delete_workspace()

Delete the entire workspace directory associated with this instance.

Warning: this is an 'unsafe' deletion: it does not wait for any outstanding jobs to complete. This could result in server-side errors for any simulations that are still running when the workspace directory is deleted.

Raises:

  • HardwareInterfaceFailureError

    If there were problems connecting to the server or deleting the directory.

Source code in exauq/sim_management/hardware.py
def delete_workspace(self) -> None:
    """Delete the entire workspace directory associated with this instance.

    Warning: this is an 'unsafe' deletion: it does not wait for any outstanding jobs
    to complete. This could result in server-side errors for any simulations that are
    still running when the workspace directory is deleted.

    Raises
    ------
    HardwareInterfaceFailureError
        If there were problems connecting to the server or deleting the directory.
    """
    try:
        _ = self._run_remote_command(f"rm -r {self.workspace_dir}")
    except Exception as e:
        raise HardwareInterfaceFailureError(
            f"Could not delete workspace directory {self.workspace_dir} for "
            f"{self._user_at_host}: {e}"
        )
    return None
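
Example (illustrative sketch, not part of the library source): guarding the 'unsafe' deletion by first checking that no tracked jobs are still running; job_ids is a hypothetical collection of JobIds that were submitted through this interface.

from exauq.sim_management.hardware import JobStatus

if all(interface.get_job_status(jid) != JobStatus.RUNNING for jid in job_ids):
    # Safe to tear down: nothing is still running server-side.
    interface.delete_workspace()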

get_job_output(job_id)

Get the simulator output for a job with the given ID.

This is read from the contents of the simulator output file for the job, located in the job's remote directory. It is expected that the contents of this output file will be a single floating point number.

Parameters:

  • job_id (JobId) –

    The ID of the job to get the simulator output for.

Returns:

  • Optional[float]

    The output of the simulator, if the job has completed successfully, or else None.

Raises:

  • HardwareInterfaceFailureError

    If there were problems connecting to the server or retrieving the simulator output.

  • SimulatorOutputParsingError

    If the output of the simulator cannot be parsed as a single floating point number.

Source code in exauq/sim_management/hardware.py
def get_job_output(self, job_id: JobId) -> Optional[float]:
    """Get the simulator output for a job with the given ID.

    This is read from the contents of the simulator output file for the job, located
    in the job's remote directory. It is expected that the contents of this output
    file will be a single floating point number.

    Parameters
    ----------
    job_id : JobId
        The ID of the job to get the simulator output for.

    Returns
    -------
    Optional[float]
        The output of the simulator, if the job has completed successfully, or else
        ``None``.

    Raises
    ------
    HardwareInterfaceFailureError
        If there were problems connecting to the server or retrieving the simulator
        output.
    SimulatorOutputParsingError
        If the output of the simulator cannot be parsed as a single floating point
        number.
    """
    if not self._job_has_been_submitted(job_id):
        return None

    elif self._job_log[job_id]["output"] is not None:
        return self._job_log[job_id]["output"]

    elif self.get_job_status(job_id) == JobStatus.COMPLETED:
        output_path = self._job_log[job_id]["script_output_path"]
        output = self._retrieve_output(output_path)
        try:
            self._job_log[job_id]["output"] = (
                float(output) if output is not None else output
            )
        except ValueError:
            raise SimulatorOutputParsingError(
                f"Could not parse simulator output {output} for job ID {job_id} as a "
                "float."
            )
        return self._job_log[job_id]["output"]
    else:
        return None
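
Example (illustrative sketch, not part of the library source): reading the simulator output for a job, assuming interface and job_id as in the other examples.

output = interface.get_job_output(job_id)
if output is None:
    # The job has not been submitted, is still running, or did not complete successfully.
    print(f"No output available for job {job_id}.")
else:
    print(f"Simulator output for job {job_id}: {output}")  # a single float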

get_job_status(job_id)

Get the status of a job with given job ID.

Any jobs that have not been submitted with submit_job will return a status of JobStatus.PENDING_SUBMIT.

A job that has successfully been started on the server will have a status of JobStatus.RUNNING (which, in this case, is equivalent to JobStatus.SUBMITTED). The status will remain as JobStatus.RUNNING until the corresponding remote process has stopped, at which point the status is determined as follows:

  • If an output file from the simulator has been created, then the status is JobStatus.COMPLETED.
  • If the job was cancelled before completion, then the status is JobStatus.CANCELLED.
  • If the job was not cancelled but no output file was created, then the status is JobStatus.FAILED. In particular, note that the exit code of the simulator script is not taken into account when determining whether a job has finished successfully or not.

Parameters:

  • job_id (JobId) –

    The ID of the job to check the status of.

Returns:

  • JobStatus

    The status of the job.

Raises:

  • HardwareInterfaceFailureError

    If there were problems connecting to the server or retrieving the status of the job.

Source code in exauq/sim_management/hardware.py
def get_job_status(self, job_id: JobId) -> JobStatus:
    """Get the status of a job with given job ID.

    Any jobs that have not been submitted with `submit_job` will return a status of
    `JobStatus.PENDING_SUBMIT`.

    A job that has successfully been started on the server will have a status of
    `JobStatus.RUNNING` (which, in this case, is equivalent to `JobStatus.SUBMITTED`).
    The status will remain as `JobStatus.RUNNING` until the corresponding remote
    process has stopped, at which point the status is determined as follows:

    * If an output file from the simulator has been created, then the status is
      ``JobStatus.COMPLETED``.
    * If the job was cancelled before completion, then the status is
      ``JobStatus.CANCELLED``.
    * If the job was not cancelled but no output file was created, then the status
      is `JobStatus.FAILED`. In particular, note that the exit code of the
      simulator script is not taken into account when determining whether a job has
      finished successfully or not.

    Parameters
    ----------
    job_id : JobId
        The ID of the job to check the status of.

    Returns
    -------
    JobStatus
        The status of the job.

    Raises
    ------
    HardwareInterfaceFailureError
        If there were problems connecting to the server or retrieving the status of
        the job.
    """
    if not self._job_has_been_submitted(job_id):
        return JobStatus.PENDING_SUBMIT
    elif self._job_log[job_id]["status"] in {JobStatus.RUNNING, JobStatus.SUBMITTED}:
        self._update_status_from_remote(job_id)
        return self._job_log[job_id]["status"]
    else:
        return self._job_log[job_id]["status"]
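
Example (illustrative sketch, not part of the library source): polling a job until it reaches a terminal state, using the module-level TERMINAL_STATUSES set; the 10-second polling interval is an arbitrary choice for this sketch.

import time

from exauq.sim_management.hardware import TERMINAL_STATUSES

while interface.get_job_status(job_id) not in TERMINAL_STATUSES:
    time.sleep(10)  # wait before querying the remote server again

final_status = interface.get_job_status(job_id)
print(f"Job {job_id} finished with status '{final_status.value}'")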

submit_job(job, resubmit=False)

Submit a job for the simulation code.

Upon submission, a new subdirectory of the remote workspace directory is created for the job, using the job's ID as the directory name. (The workspace directory will be created as well if it doesn't already exist.) A CSV file containing the simulation input data for the job is uploaded to this directory.

A Bash script is also uploaded to the job's directory, which is responsible for managing the job; it is through this script that the job can be started, cancelled or its status retrieved.

If a job with the same ID has already been submitted to the server and resubmit is False, then an error will be raised. Only jobs that have finished, whether successfully or in failure, or that have been cancelled, may be resubmitted. In that case, calling with resubmit set to True will delete any existing remote-side data in the corresponding job directory and then submit the supplied job.

Parameters:

  • job (Job) –

    A job containing the data to run the simulation code with.

  • resubmit (bool, default: False ) –

    (Default: False) Whether the job is being resubmitted, i.e. whether to delete any existing remote-side data for the job before submission.

Raises:

  • ValueError

    If a job with the same ID has already been submitted and resubmit = False, or if resubmit = True and a job with the same ID has not completed.

  • HardwareInterfaceFailureError

    If there were problems connecting to the server, making files / directories on the server or other such server-related problems.

Source code in exauq/sim_management/hardware.py
def submit_job(self, job: Job, resubmit: bool = False) -> None:
    """Submit a job for the simulation code.

    Upon submission, a new subdirectory of the remote workspace directory is created
    for the job, using the job's ID as the directory name. (The workspace directory
    will be created as well if it doesn't already exist.) A CSV file containing the
    simulation input data for the job is uploaded to this directory.

    A Bash script is also uploaded to the job's directory, which is responsible for
    managing the job; it is through this script that the job can be started, cancelled
    or its status retrieved.

    If a job with the same ID has already been submitted to the server and `resubmit`
    is ``False``, then an error will be raised. Only jobs that have completed, be it
    successfully or ending in failure, or that have been cancelled, may be
    resubmitted. In this case, calling with `resubmit` set to ``True`` will delete any
    existing remote-side data for the corresponding job directory and then submit the
    supplied `job`.

    Parameters
    ----------
    job : Job
        A job containing the data to run the simulation code with.
    resubmit : bool
        (Default: False) Whether the job is being resubmitted, i.e. whether to delete
        any existing remote-side data for the job before submission.

    Raises
    ------
    ValueError
        If a job with the same ID has already been submitted and ``resubmit = False``,
        or if ``resubmit = True`` and a job with the same ID has not completed.
    HardwareInterfaceFailureError
        If there were problems connecting to the server, making files / directories on
        the server or other such server-related problems.
    """

    if resubmit:
        # Clear the artifacts from the remote side, as long as the job is not running
        # or marked as submitted.
        if (status := self.get_job_status(job.id)) not in {
            JobStatus.SUBMITTED,
            JobStatus.RUNNING,
        }:
            self.delete_remote_job_dir(job.id)
            del self._job_log[job.id]
        else:
            raise ValueError(
                f"Cannot resubmit job with ID {job.id} as job status is '{status.value}'. "
                "(Cancel the job before resubmitting.)"
            )

    elif self._job_has_been_submitted(job.id):
        raise ValueError(
            f"Cannot submit job with ID {job.id}: a job with the same ID has already "
            f"been submitted."
        )

    # Create the settings for the new job
    job_settings = self._make_job_settings(job.id)

    # Make job-specific remote workspace directory (will raise error if directory
    # already exists)
    self._make_directory_on_remote(job_settings["job_remote_dir"])

    # Put simulator input data onto server
    data_str = ",".join(map(str, job.data)) + "\n"
    self._make_text_file_on_remote(data_str, job_settings["input_data_path"])

    # Create runner script and manager script and put onto server
    runner_script = self._make_runner_script(
        job_settings["job_remote_dir"],
        job_settings["input_data_path"],
        job_settings["script_output_path"],
    )
    manager_script = self._make_manager_script(
        job_settings["job_remote_dir"],
        job_settings["runner"],
        job_settings["script_stdout_path"],
        job_settings["script_output_path"],
    )
    self._make_text_file_on_remote(runner_script, job_settings["runner"])
    self._make_text_file_on_remote(manager_script, job_settings["job_manager"])

    # Start job
    try:
        _ = self._run_remote_command(
            f"{self._bash} {job_settings['job_manager']} start"
        )
    except Exception as e:
        raise HardwareInterfaceFailureError(
            f"Could not start job with id {job.id} on {self._user_at_host}: {e}"
        )

    # Mark job as submitted and store settings in job log
    job_settings["status"] = JobStatus.SUBMITTED
    self._job_log[job.id] = job_settings

    return None
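
Example (illustrative sketch, not part of the library source): submitting a job and later resubmitting it, assuming interface is a configured UnixServerScriptInterface and job is a Job instance constructed elsewhere.

interface.submit_job(job)

# ... once the job has completed, failed or been cancelled, the same job can be
# resubmitted; this deletes its existing remote-side data before submitting again.
interface.submit_job(job, resubmit=True)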