Skip to content

Commit

Permalink
Reset reasons (#374)
Browse files Browse the repository at this point in the history
* add some reset reasons

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* another test for iterations and data

* more error handling for gsiftp transfers

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
dsschult and github-actions authored Oct 2, 2024
1 parent 1b7f264 commit 064cf31
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 13 deletions.
9 changes: 8 additions & 1 deletion iceprod/server/data/condor_transfer_plugins/gsiftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ def setup_env(self):
proxies = glob.glob(os.path.join(os.getcwd(), 'x509up_*'))
if proxies:
os.environ['X509_USER_PROXY'] = proxies[0]
else:
raise RuntimeError('X509_USER_PROXY does not exist')

if not os.path.exists('/cvmfs/icecube.opensciencegrid.org/iceprod/v2.7.1/env-shell.sh'):
raise RuntimeError('CVMFS does not exist')

def _do_transfer(self, inpath, outpath):
try:
Expand All @@ -129,9 +134,11 @@ def _do_transfer(self, inpath, outpath):
], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
if e.output:
for line in e.output.decode('utf-8').split('\n'):
output = e.output.decode('utf-8')
for line in output.split('\n'):
if line.lower().startswith('error'):
raise RuntimeError('globus-url-copy failed: '+line)
raise RuntimeError('Generic subprocess failure: '+output)
raise

def download_file(self, url, local_file_path):
Expand Down
7 changes: 4 additions & 3 deletions iceprod/server/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,9 @@ async def task_reset(self, task: GridTask, reason: str | None = None):
"""
Tell IceProd API a task should be reset back to the "waiting" status.
This is for non-payload errors.
Args:
task: IceProd task info
reason: A reason for resetting
reason: A reason for failure
"""
if not task.task_id or not task.instance_id:
raise RuntimeError("Either task_id or instance_id is empty")
Expand All @@ -324,6 +322,9 @@ async def task_reset(self, task: GridTask, reason: str | None = None):
except requests.exceptions.HTTPError as e:
if e.response.status_code != 404:
raise
else:
if reason:
await self._upload_log(task, 'stdlog', reason)

async def task_failure(self, task: GridTask, reason: str | None = None, stats: dict | None = None, stdout: Path | None = None, stderr: Path | None = None):
"""
Expand Down
21 changes: 18 additions & 3 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, UTC
import enum
import importlib
import logging
Expand Down Expand Up @@ -69,6 +69,14 @@ class JobStatus(enum.Enum):
}


# Substrings of a job's failure reason that indicate the task should be reset
# rather than marked as failed.  `finish` checks each entry with a
# case-sensitive `text in reason` test and calls `task_reset` on the first
# match; if nothing matches, it calls `task_failure` instead.
RESET_REASONS = [
    'SIGTERM',
    'killed',
    'Transfer input files failure',
    'Transfer output files failure',
]


def parse_usage(usage: str) -> int:
"""
Parse HTCondor usage expression
Expand Down Expand Up @@ -469,7 +477,7 @@ def get_current_JEL(self) -> Path:
Returns:
Path: filename to current JEL
"""
day = datetime.utcnow().date().isoformat()
day = datetime.now(UTC).date().isoformat()
day_submit_dir = self.submit_dir / day
if not day_submit_dir.exists():
day_submit_dir.mkdir(mode=0o700, parents=True)
Expand Down Expand Up @@ -623,9 +631,16 @@ async def finish(self, job_id: CondorJobId, success: bool = True, resources: dic
if success:
await self.task_success(job, stats=stats, stdout=stdout, stderr=stderr)
else:
future = None
if reason:
stats['error_summary'] = reason
await self.task_failure(job, stats=stats, reason=reason, stdout=stdout, stderr=stderr)
for text in RESET_REASONS:
if text in reason:
future = self.task_reset(job, reason=reason)
break
if future is None:
future = self.task_failure(job, stats=stats, reason=reason, stdout=stdout, stderr=stderr)
await future
except Exception:
logger.warning('failed to update REST', exc_info=True)

Expand Down
4 changes: 2 additions & 2 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ attrs==24.2.0
# referencing
babel==2.16.0
# via sphinx
boto3==1.35.31
boto3==1.35.32
# via iceprod (setup.py)
botocore==1.35.31
botocore==1.35.32
# via
# boto3
# s3transfer
Expand Down
4 changes: 2 additions & 2 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ attrs==24.2.0
# referencing
beautifulsoup4==4.12.3
# via iceprod (setup.py)
boto3==1.35.31
boto3==1.35.32
# via
# iceprod (setup.py)
# moto
botocore==1.35.31
botocore==1.35.32
# via
# boto3
# moto
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ attrs==24.2.0
# via
# jsonschema
# referencing
boto3==1.35.31
boto3==1.35.32
# via iceprod (setup.py)
botocore==1.35.31
botocore==1.35.32
# via
# boto3
# s3transfer
Expand Down
38 changes: 38 additions & 0 deletions tests/core/exe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,41 @@ async def test_write_to_script_data_dups(tmp_path):
script = open(scriptpath).read()
lines = [line for line in script.split('\n') if not (not line.strip() or line.startswith('#') or line.startswith('set '))]
assert lines == ['python foo.py']


async def test_write_to_script_data_iterations(tmp_path):
    """
    Check steering expansion with a nested ``$(iter)`` inside a data URL.

    The ``test`` steering parameter is ``foo_bar_$steering(foo$(iter))`` and
    the only ``foo*`` parameter defined is ``foo0 = 'baz'``, so the expected
    input file ``https://foo.bar/foo_bar_baz`` implies that ``$(iter)`` is
    expanded first (presumably to ``0``) and the result is then looked up in
    steering.

    Args:
        tmp_path: pytest temporary directory used as the script workdir
    """
    t = get_task({
        'options': {
            'job_temp': 'https://foo.bar',
        },
        'steering': {
            'parameters': {
                'foo0': 'baz',
                'test': 'foo_bar_$steering(foo$(iter))',
                'remote': 'https://foo.bar',
            }
        },
        'tasks': [{
            'name': 'foo',
            'trays': [{
                'modules': [{
                    'env_clear': False,
                    'src': 'foo.py'
                }],
                'data': [{
                    'movement': 'input',
                    'type': 'permanent',
                    'remote': '$steering(remote)/$steering(test)',
                }]
            }]
        }]
    })

    ws = iceprod.core.exe.WriteToScript(t, workdir=tmp_path, logger=logger)
    scriptpath = await ws.convert()

    # the single permanent input must be fully expanded; nothing is uploaded
    assert ws.infiles == {Data('https://foo.bar/foo_bar_baz', 'foo_bar_baz', Transfer.TRUE)}
    assert ws.outfiles == set()

    # read via a context manager so the file handle is closed deterministically
    with open(scriptpath) as f:
        script = f.read()

    # ignore blank lines, comments, and shell `set ` option lines so that only
    # the actual commands are compared
    lines = [
        line for line in script.split('\n')
        if line.strip() and not line.startswith(('#', 'set '))
    ]
    assert lines == ['python foo.py']
60 changes: 60 additions & 0 deletions tests/server/plugins/condor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,63 @@ async def test_Grid_check_oldjob(schedd, i3prod_path):
assert dirs == {daydir.name: [p]}
assert g.submitter.remove.call_count == 1

async def test_reset_task(schedd, i3prod_path):
    """
    Check which REST update ``Grid.finish`` issues for a finished job.

    Three scenarios are exercised against mocked ``task_success``,
    ``task_reset`` and ``task_failure`` coroutines:

    1. failure without a reason -> ``task_failure``
    2. success -> ``task_success``
    3. failure whose reason contains an entry of ``RESET_REASONS``
       -> ``task_reset``

    Args:
        schedd: pytest fixture (defined outside this block)
        i3prod_path: pytest fixture (defined outside this block)
    """
    override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10']
    cfg = iceprod.server.config.IceProdConfig(save=False, override=override)

    rc = MagicMock()
    g = iceprod.server.plugins.condor.Grid(cfg=cfg, rest_client=rc, cred_client=None)

    g.submitter.remove = MagicMock()

    # create a submit dir under today's JEL directory, back-dated by 25 seconds
    jel = g.get_current_JEL()
    daydir = jel.parent
    p = daydir / 'olddir'
    p.mkdir()
    t = time.time() - 25
    os.utime(p, (t, t))
    logging.info('set time to %d', t)

    JobStatus = iceprod.server.plugins.condor.JobStatus
    CondorJob = iceprod.server.plugins.condor.CondorJob
    CondorJobId = iceprod.server.plugins.condor.CondorJobId

    # normal failure: no reason given, so nothing can match a reset reason
    jobid = CondorJobId(cluster_id=1, proc_id=0)
    g.jobs[jobid] = CondorJob(status=JobStatus.IDLE, submit_dir=p)

    g.task_success = AsyncMock()
    g.task_reset = AsyncMock()
    g.task_failure = AsyncMock()

    await g.finish(jobid, success=False)

    assert g.task_success.call_count == 0
    assert g.task_reset.call_count == 0
    assert g.task_failure.call_count == 1

    # success
    g.jobs[jobid] = CondorJob(status=JobStatus.IDLE, submit_dir=p)

    g.task_success = AsyncMock()
    g.task_reset = AsyncMock()
    g.task_failure = AsyncMock()

    await g.finish(jobid, success=True)

    assert g.task_success.call_count == 1
    assert g.task_reset.call_count == 0
    assert g.task_failure.call_count == 0

    # failure with a reset reason: the task must be reset, not failed
    g.jobs[jobid] = CondorJob(status=JobStatus.IDLE, submit_dir=p)

    g.task_success = AsyncMock()
    g.task_reset = AsyncMock()
    g.task_failure = AsyncMock()

    reset_reason = iceprod.server.plugins.condor.RESET_REASONS[0]
    await g.finish(jobid, success=False, reason=reset_reason)

    assert g.task_success.call_count == 0
    assert g.task_reset.call_count == 1
    assert g.task_failure.call_count == 0
    # the reason is forwarded so it can be attached to the task's log
    assert g.task_reset.call_args.kwargs['reason'] == reset_reason

0 comments on commit 064cf31

Please sign in to comment.