Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore condor log error #394

Merged
2 commits merged on Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions iceprod/core/exe.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,6 @@ async def convert(self, transfer=False):
print('# set some env vars for expansion', file=f)
print('OS_ARCH=$(/cvmfs/icecube.opensciencegrid.org/py3-v4.3.0/os_arch.sh)', file=f)
print('', file=f)
print('# debugging', file=f)
print('echo "dir contents:"', file=f)
print('ls -al', file=f)
print('', file=f)
with scope_env(self.cfgparser, self.task.dataset.config['steering'], logger=self.logger) as globalenv:
task = self.task.get_task_config()
if self.task.task_files:
Expand Down
145 changes: 74 additions & 71 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,77 +528,80 @@ async def wait(self, timeout):

while True:
for filename, events in self.jels.items():
for event in events:
if float(event.timestamp) < self.last_event_timestamp:
continue
self.last_event_timestamp = event.timestamp

job_id = CondorJobId(cluster_id=event.cluster, proc_id=event.proc)

if event.type == htcondor.JobEventType.SUBMIT:
self.jobs[job_id] = CondorJob()
continue
elif job_id not in self.jobs:
logger.debug('reloaded job %s', job_id)
self.jobs[job_id] = CondorJob()

job = self.jobs[job_id]

if event.type == htcondor.JobEventType.JOB_AD_INFORMATION:
if not job.dataset_id:
job.dataset_id = event['IceProdDatasetId']
job.task_id = event['IceProdTaskId']
job.instance_id = event['IceProdTaskInstanceId']
job.submit_dir = Path(event['Iwd'])

type_ = event['TriggerEventTypeNumber']
if type_ == htcondor.JobEventType.JOB_TERMINATED:
logger.info("job %s %s.%s exited on its own", job_id, job.dataset_id, job.task_id)

# get stats
cpu = event.get('CpusUsage', None)
if not cpu:
cpu = parse_usage(event.get('RunRemoteUsage', ''))
gpu = event.get('GpusUsage', None)
memory = event.get('MemoryUsage', None) # MB
disk = event.get('DiskUsage', None) # KB
time_ = event.get('LastRemoteWallClockTime', None) # seconds
# data_in = event['ReceivedBytes'] # KB
# data_out = event['SentBytes'] # KB

resources = {}
if cpu is not None:
resources['cpu'] = cpu
if gpu is not None:
resources['gpu'] = gpu
if memory is not None:
resources['memory'] = memory/1000.
if disk is not None:
resources['disk'] = disk/1000000.
if time_ is not None:
resources['time'] = time_/3600.

success = event.get('ReturnValue', 1) == 0
job.status = JobStatus.COMPLETED if success else JobStatus.FAILED

# finish job
await self.finish(job_id, success=success, resources=resources)

elif type_ == htcondor.JobEventType.JOB_ABORTED:
job.status = JobStatus.FAILED
reason = event.get('Reason', None)
logger.info("job %s %s.%s removed: %r", job_id, job.dataset_id, job.task_id, reason)
await self.finish(job_id, success=False, reason=reason)

else:
# update status
new_status = JOB_EVENT_STATUS_TRANSITIONS.get(type_, None)
if new_status is not None and job.status != new_status:
job.status = new_status
if new_status == JobStatus.FAILED:
self.submitter.remove(job_id, reason=event.get('HoldReason', None))
else:
await self.job_update(job)
try:
for event in events:
if float(event.timestamp) < self.last_event_timestamp:
continue
self.last_event_timestamp = event.timestamp

job_id = CondorJobId(cluster_id=event.cluster, proc_id=event.proc)

if event.type == htcondor.JobEventType.SUBMIT:
self.jobs[job_id] = CondorJob()
continue
elif job_id not in self.jobs:
logger.debug('reloaded job %s', job_id)
self.jobs[job_id] = CondorJob()

job = self.jobs[job_id]

if event.type == htcondor.JobEventType.JOB_AD_INFORMATION:
if not job.dataset_id:
job.dataset_id = event['IceProdDatasetId']
job.task_id = event['IceProdTaskId']
job.instance_id = event['IceProdTaskInstanceId']
job.submit_dir = Path(event['Iwd'])

type_ = event['TriggerEventTypeNumber']
if type_ == htcondor.JobEventType.JOB_TERMINATED:
logger.info("job %s %s.%s exited on its own", job_id, job.dataset_id, job.task_id)

# get stats
cpu = event.get('CpusUsage', None)
if not cpu:
cpu = parse_usage(event.get('RunRemoteUsage', ''))
gpu = event.get('GpusUsage', None)
memory = event.get('MemoryUsage', None) # MB
disk = event.get('DiskUsage', None) # KB
time_ = event.get('LastRemoteWallClockTime', None) # seconds
# data_in = event['ReceivedBytes'] # KB
# data_out = event['SentBytes'] # KB

resources = {}
if cpu is not None:
resources['cpu'] = cpu
if gpu is not None:
resources['gpu'] = gpu
if memory is not None:
resources['memory'] = memory/1000.
if disk is not None:
resources['disk'] = disk/1000000.
if time_ is not None:
resources['time'] = time_/3600.

success = event.get('ReturnValue', 1) == 0
job.status = JobStatus.COMPLETED if success else JobStatus.FAILED

# finish job
await self.finish(job_id, success=success, resources=resources)

elif type_ == htcondor.JobEventType.JOB_ABORTED:
job.status = JobStatus.FAILED
reason = event.get('Reason', None)
logger.info("job %s %s.%s removed: %r", job_id, job.dataset_id, job.task_id, reason)
await self.finish(job_id, success=False, reason=reason)

else:
# update status
new_status = JOB_EVENT_STATUS_TRANSITIONS.get(type_, None)
if new_status is not None and job.status != new_status:
job.status = new_status
if new_status == JobStatus.FAILED:
self.submitter.remove(job_id, reason=event.get('HoldReason', None))
else:
await self.job_update(job)
except Exception:
logger.warning('error processing condor log', exc_info=True)

if time.monotonic() - start >= timeout:
break
Expand Down
Loading