Condor Plugin

The Condor plugin allows IceProd to submit jobs to HTCondor.

Note: Condor was renamed to HTCondor in 2012.

iceprod.server.plugins.condor.check_call_clean_env(*args, **kwargs)[source]
iceprod.server.plugins.condor.check_output_clean_env(*args, **kwargs)[source]
class iceprod.server.plugins.condor.JobStatus(value)[source]
IDLE = 1
RUNNING = 2
FAILED = 3
COMPLETED = 4
static from_condor_status(num)[source]
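
As a rough illustration of what from_condor_status might do, the sketch below maps HTCondor's integer JobStatus codes (1 Idle, 2 Running, 3 Removed, 4 Completed, 5 Held, 6 Transferring Output, 7 Suspended) onto the four states listed above. The enum members come from this page; the stand-in name JobStatusSketch, the particular mapping, and the ValueError for unknown codes are assumptions and may differ from the plugin's actual behavior.

```python
import enum


class JobStatusSketch(enum.Enum):
    """Stand-in for JobStatus, using the member values listed above."""
    IDLE = 1
    RUNNING = 2
    FAILED = 3
    COMPLETED = 4

    @staticmethod
    def from_condor_status(num):
        # Assumed mapping from HTCondor JobStatus codes to coarse states.
        mapping = {
            1: JobStatusSketch.IDLE,       # Idle
            2: JobStatusSketch.RUNNING,    # Running
            3: JobStatusSketch.FAILED,     # Removed
            4: JobStatusSketch.COMPLETED,  # Completed
            5: JobStatusSketch.FAILED,     # Held
            6: JobStatusSketch.RUNNING,    # Transferring output
            7: JobStatusSketch.IDLE,       # Suspended
        }
        try:
            return mapping[int(num)]
        except KeyError:
            raise ValueError(f"unknown HTCondor JobStatus code: {num}") from None
```

Collapsing seven scheduler states into four keeps downstream logic simple, at the cost of losing the distinction between, for example, held and removed jobs.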
iceprod.server.plugins.condor.parse_usage(usage: str) → int[source]

Parse HTCondor usage expression

Example input: “Usr 0 00:00:00, Sys 0 00:00:00”

Parameters:

usage – usage expression

Returns:

usage sum in seconds
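
A minimal sketch of such a parser follows, assuming each component of the expression has the form "days HH:MM:SS" (as in the example input) and that the result is the sum of all components. The name parse_usage_sketch and the regular expression are illustrative and are not taken from the plugin source.

```python
import re

# One "days HH:MM:SS" group, e.g. the "0 00:00:00" in "Usr 0 00:00:00".
_USAGE_PART = re.compile(r"(\d+)\s+(\d+):(\d+):(\d+)")


def parse_usage_sketch(usage: str) -> int:
    """Sum every 'days HH:MM:SS' group in the expression, in seconds."""
    total = 0
    for days, hours, minutes, seconds in _USAGE_PART.findall(usage):
        total += (int(days) * 86400 + int(hours) * 3600
                  + int(minutes) * 60 + int(seconds))
    return total
```

With this sketch, the example input above yields 0, because both the user and system components are zero.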

class iceprod.server.plugins.condor.CondorJob(*, dataset_id: str | None = None, task_id: str | None = None, instance_id: str | None = None, submit_dir: Path | None = None, status: JobStatus = JobStatus.IDLE, extra: dict | None = None)[source]

Holds the job states for an HTCondor cluster.

dataset_id: str | None
task_id: str | None
instance_id: str | None
submit_dir: Path | None
status: JobStatus
extra: dict | None
class iceprod.server.plugins.condor.CondorJobId(cluster_id: int, proc_id: int)[source]

Represents an HTCondor job id

Create new instance of CondorJobId(cluster_id, proc_id)

cluster_id: int

Alias for field number 0

proc_id: int

Alias for field number 1
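
The "Alias for field number" entries indicate a named tuple, so an id can be unpacked, indexed, and used as a dictionary key. The stand-in below illustrates that. The "cluster.proc" string form and the parse helper are assumptions, motivated only by HTCondor's usual way of printing job ids; they are not documented on this page.

```python
from typing import NamedTuple


class CondorJobIdSketch(NamedTuple):
    """Stand-in for CondorJobId with the two documented fields."""
    cluster_id: int
    proc_id: int

    def __str__(self) -> str:
        # Assumed display form, e.g. "1234.0".
        return f"{self.cluster_id}.{self.proc_id}"

    @classmethod
    def parse(cls, text: str) -> "CondorJobIdSketch":
        # Hypothetical helper: split "cluster.proc" back into integers.
        cluster, proc = text.split(".", 1)
        return cls(int(cluster), int(proc))


job_id = CondorJobIdSketch(1234, 0)
cluster, proc = job_id            # tuple unpacking
jobs = {job_id: "some job info"}  # hashable, so usable as a dict key
```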

class iceprod.server.plugins.condor.CondorSubmit(cfg: IceProdConfig, submit_dir: Path, credentials_dir: Path)[source]

Factory for submitting HTCondor jobs

AD_DEFAULTS = {'+OriginalTime': 3600, 'PreArguments': 'UNDEFINED', 'PreCmd': 'UNDEFINED', 'request_cpus': 1, 'request_disk': 1000000, 'request_gpus': 'UNDEFINED', 'request_memory': 1000, 'requirements': '', 'transfer_input_files': [], 'transfer_output_files': [], 'transfer_output_remaps': []}
AD_INFO = ['RemotePool', 'RemoteHost', 'HoldReason', 'RemoveReason', 'Reason', 'MachineAttrGLIDEIN_Site0', 'Iwd', 'IceProdDatasetId', 'IceProdTaskId', 'IceProdTaskInstanceId', 'MATCH_EXP_JOBGLIDEIN_ResourceName']
AD_PROJECTION_QUEUE = ['JobStatus', 'RemotePool', 'RemoteHost', 'Iwd', 'IceProdDatasetId', 'IceProdTaskId', 'IceProdTaskInstanceId', 'MATCH_EXP_JOBGLIDEIN_ResourceName']
AD_PROJECTION_HISTORY = ['JobStatus', 'ExitCode', 'RemoveReason', 'LastHoldReason', 'CpusUsage', 'RemoteSysCpu', 'RemoteUserCpu', 'GpusUsage', 'ResidentSetSize_RAW', 'DiskUsage_RAW', 'LastRemoteWallClockTime', 'LastRemoteHost', 'LastRemotePool', 'MachineAttrGLIDEIN_Site0', 'Iwd', 'IceProdDatasetId', 'IceProdTaskId', 'IceProdTaskInstanceId', 'MATCH_EXP_JOBGLIDEIN_ResourceName']
condor_plugin_discovery()[source]

Find all available HTCondor transfer plugins, and copy them to the submit_dir

static condor_os_container(os_arch)[source]

Convert from OS_ARCH to container image

static condor_resource_reqs(task: Task)[source]

Convert from Task requirements to HTCondor requirements

condor_infiles(infiles)[source]

Convert from set[Data] to HTCondor classads for input files

condor_outfiles(outfiles)[source]

Convert from set[Data] to HTCondor classads for output files

create_submit_dir(task: Task, jel_dir: Path) → Path[source]

Create the submit dir

async submit(tasks: list[Task], jel: Path) → dict[CondorJobId, CondorJob][source]

Submit multiple jobs to Condor as a single batch.

Assumes that all tasks in the batch have identical resource requirements.

Parameters:
  • tasks – IceProd Tasks to submit

  • jel – common job event log

Returns:

dict of new jobs
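
Because a batch is assumed to share one set of resource requirements, a caller would presumably need to partition tasks before calling submit. The sketch below shows one way to do that. It represents tasks as plain dicts with a "requirements" entry, which is a simplification for illustration; real Task objects and the function name group_by_requirements are not taken from IceProd.

```python
from collections import defaultdict


def group_by_requirements(tasks):
    """Group task dicts so that each batch shares one set of requirements."""
    batches = defaultdict(list)
    for task in tasks:
        # Sort the items so the key does not depend on dict insertion order.
        key = tuple(sorted(task["requirements"].items()))
        batches[key].append(task)
    return list(batches.values())


tasks = [
    {"task_id": "a", "requirements": {"cpu": 1, "memory": 2}},
    {"task_id": "b", "requirements": {"memory": 2, "cpu": 1}},
    {"task_id": "c", "requirements": {"cpu": 4, "memory": 8}},
]
batches = group_by_requirements(tasks)
```

Each resulting batch could then be passed to a single submit call.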

get_jobs() → dict[CondorJobId, CondorJob][source]

Get all jobs currently on the condor queue.

get_history(since=None) → dict[CondorJobId, CondorJob][source]

Get all jobs currently in the condor history.

remove(job_id: str | CondorJobId, reason: str | None = None)[source]

Remove a job from condor.

Parameters:
  • job_id – condor job id

  • reason – reason for removal

class iceprod.server.plugins.condor.Grid(*args, **kwargs)[source]

HTCondor grid plugin

load_timestamp()[source]
save_timestamp()[source]
async run(forever=True)[source]

Override the ActiveJobs and JobActions batch submission / monitoring classes

async submit()[source]
get_queue_num() → int[source]

Determine how many tasks to queue.

get_current_JEL() → Path[source]

Get the current Job Event Log, possibly creating a new one for every hour.

Returns:

filename to current JEL

Return type:

Path

async wait(timeout)[source]

Wait for jobs to complete from the Job Event Logs.

Parameters:

timeout – wait up to N seconds

async job_update(job: CondorJob)[source]

Send updated info from the batch system to the IceProd API.

Must handle duplicate calls.

async finish(job_id: CondorJobId, success: bool = True, resources: dict | None = None, reason: str | None = None, stats: dict | None = None)[source]

Run cleanup actions after a batch job completes.

Must handle duplicate calls.
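
The requirement to tolerate repeated calls can be met by making the cleanup idempotent, for example by removing the job from the tracking dict first and treating a missing entry as "already finished". The sketch below shows that pattern with a stand-in class; FinishSketch, its reported list (standing in for calls to the IceProd API), and the boolean return value are hypothetical.

```python
import asyncio


class FinishSketch:
    def __init__(self):
        self.jobs = {}      # job id -> job info
        self.reported = []  # stands in for updates sent to the IceProd API

    async def finish(self, job_id, success=True, reason=None):
        # pop() makes the call idempotent: only the first call finds the job.
        job = self.jobs.pop(job_id, None)
        if job is None:
            return False  # duplicate call, nothing left to do
        self.reported.append((job_id, success, reason))
        return True


async def main():
    grid = FinishSketch()
    grid.jobs[(1234, 0)] = {"task_id": "t1"}
    first = await grid.finish((1234, 0), success=False, reason="held")
    second = await grid.finish((1234, 0), success=False, reason="held")
    return first, second, grid.reported


first, second, reported = asyncio.run(main())
```

Here the second call returns without reporting anything, so the job is reported exactly once.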

async check()[source]

Cross-check self.jobs against the submit dir and the IceProd API.

async check_history()[source]

Check condor_history

async check_iceprod()[source]

Sync with iceprod server status.

async check_submit_dir()[source]

Return directory paths that should be cleaned up.
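
As an illustration of how such a scan might select directories, the sketch below returns subdirectories that are not tied to an active job and have not been modified for longer than a given age. The selection criteria, the parameters active and max_age, and the name stale_dirs_sketch are assumptions; the plugin's actual rules are not described on this page.

```python
import time
from pathlib import Path


def stale_dirs_sketch(submit_dir: Path, active: set[Path],
                      max_age: float) -> list[Path]:
    """List subdirectories that are inactive and older than max_age seconds."""
    cutoff = time.time() - max_age
    stale = []
    for path in sorted(submit_dir.iterdir()):
        # Skip plain files and directories still used by a tracked job.
        if not path.is_dir() or path in active:
            continue
        if path.stat().st_mtime < cutoff:
            stale.append(path)
    return stale
```

Returning the paths rather than deleting them directly leaves the decision about removal to the caller.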