Skip to content

Commit

Permalink
clean up job submit dirs more rapidly after completion (#397)
Browse files Browse the repository at this point in the history
* be more aggressive about cleaning out jobs that have finished

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* need to flip the if condition

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* fix tests and add one more

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
dsschult and github-actions authored Oct 25, 2024
1 parent 2ee5ab0 commit 9546be8
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 13 deletions.
14 changes: 11 additions & 3 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def from_condor_status(num):
'sigterm',
'killed',
'bus error (core dumped)',
'segmentation fault (core dumped)',
'operation timed out',
'connection timed out',
]
Expand Down Expand Up @@ -898,25 +899,32 @@ async def check_submit_dir(self):
Return directory paths that should be cleaned up.
"""
# get time limits
queue_tasks = {j.task_id for j in self.jobs.values()}
queued_time = self.cfg['queue'].get('max_task_queued_time', 86400*2)
processing_time = self.cfg['queue'].get('max_task_processing_time', 86400*2)
suspend_time = self.cfg['queue'].get('suspend_submit_dir_time', 86400)
now = time.time()
job_clean_logs_time = now - suspend_time
job_old_time = now - (queued_time + processing_time)
dir_old_time = now - (queued_time + processing_time + suspend_time)
logger.debug('now: %r, job_old_time: %r, dir_old_time: %r', now, job_old_time, dir_old_time)
logger.debug('now: %r, job_clean_logs_time: %r, job_old_time: %r, dir_old_time: %r', now, job_clean_logs_time, job_old_time, dir_old_time)

for daydir in self.submit_dir.glob('[0-9][0-9][0-9][0-9]*'):
logger.debug('looking at daydir %s', daydir)
if daydir.is_dir():
empty = True
for path in daydir.iterdir():
logger.debug('looking at path %s', path)
job_active = path.name.split('_')[0] in queue_tasks
logger.debug('looking at path %s, active: %r', path, job_active)
st = path.lstat()
logger.debug('stat: %r', st)
if stat.S_ISDIR(st.st_mode):
empty = False
if st.st_mtime < job_old_time:
if not job_active:
if st.st_mtime < job_clean_logs_time:
logger.info('cleaning up submit dir %s', path)
shutil.rmtree(path)
elif st.st_mtime < job_old_time:
yield path
if st.st_mtime < dir_old_time:
logger.info('cleaning up submit dir %s', path)
Expand Down
4 changes: 2 additions & 2 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ attrs==24.2.0
# referencing
babel==2.16.0
# via sphinx
boto3==1.35.45
boto3==1.35.49
# via iceprod (setup.py)
botocore==1.35.45
botocore==1.35.49
# via
# boto3
# s3transfer
Expand Down
6 changes: 3 additions & 3 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ attrs==24.2.0
# referencing
beautifulsoup4==4.12.3
# via iceprod (setup.py)
boto3==1.35.45
boto3==1.35.49
# via
# iceprod (setup.py)
# moto
botocore==1.35.45
botocore==1.35.49
# via
# boto3
# moto
Expand Down Expand Up @@ -216,7 +216,7 @@ urllib3==2.2.3
# responses
# types-requests
# wipac-rest-tools
werkzeug==3.0.4
werkzeug==3.0.6
# via moto
wipac-dev-tools==1.13.0
# via
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ attrs==24.2.0
# via
# jsonschema
# referencing
boto3==1.35.45
boto3==1.35.49
# via iceprod (setup.py)
botocore==1.35.45
botocore==1.35.49
# via
# boto3
# s3transfer
Expand Down
34 changes: 31 additions & 3 deletions tests/server/plugins/condor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ async def test_Grid_check_delete_day(schedd, i3prod_path, set_time):
assert g.jels == {}


async def test_Grid_check_old(schedd, i3prod_path, set_time):
async def test_Grid_check_old_delete(schedd, i3prod_path, set_time):
override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10']
cfg = iceprod.server.config.IceProdConfig(save=False, override=override)

Expand All @@ -661,7 +661,7 @@ async def test_Grid_check_old(schedd, i3prod_path, set_time):
assert dirs == {daydir.name: []}


async def test_Grid_check_oldjob(schedd, i3prod_path, set_time):
async def test_Grid_check_oldjob_remove(schedd, i3prod_path, set_time):
override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10']
cfg = iceprod.server.config.IceProdConfig(save=False, override=override)

Expand All @@ -682,7 +682,7 @@ async def test_Grid_check_oldjob(schedd, i3prod_path, set_time):
os.utime(p, (t, t))
logging.info('set time to %d', t)

jobs[CondorJobId(cluster_id=1, proc_id=0)] = CondorJob(status=JobStatus.IDLE, submit_dir=p)
jobs[CondorJobId(cluster_id=1, proc_id=0)] = CondorJob(status=JobStatus.IDLE, submit_dir=p, task_id=p.name)

await g.check()

Expand All @@ -691,6 +691,34 @@ async def test_Grid_check_oldjob(schedd, i3prod_path, set_time):
assert g.submitter.remove.call_count == 1


async def test_Grid_check_oldjob_delete(schedd, i3prod_path, set_time):
    """Check that a stale submit dir with no queued job is deleted, not removed.

    The queue reports no jobs, and a directory named ``olddir`` is created
    in the current day directory with an mtime 15 seconds before
    ``set_time`` (the configured suspend time is 10 seconds).  After
    ``Grid.check()`` the day directory must contain no sub-directories and
    ``submitter.remove`` must not have been called.
    """
    cfg = iceprod.server.config.IceProdConfig(
        save=False,
        override=[
            'queue.type=htcondor',
            'queue.max_task_queued_time=10',
            'queue.max_task_processing_time=10',
            'queue.suspend_submit_dir_time=10',
        ],
    )
    g = iceprod.server.plugins.condor.Grid(
        cfg=cfg,
        rest_client=MagicMock(),
        cred_client=None,
    )

    # Nothing is in the condor queue or history, and no tasks are queued.
    g.submitter.get_jobs = MagicMock(return_value={})
    g.submitter.get_history = MagicMock(return_value={})
    g.submitter.remove = MagicMock()
    g.get_tasks_on_queue = AsyncMock(return_value=[])

    # Create a submit dir next to the current JEL and back-date it.
    daydir = g.get_current_JEL().parent
    old_path = daydir / 'olddir'
    old_path.mkdir()
    # must be older than suspend time
    t = time.mktime(set_time.utctimetuple()) - 15
    os.utime(old_path, (t, t))
    logging.info('set time to %d', t)

    await g.check()

    # Map each day directory to the sub-directories still present in it.
    day_pattern = '[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]'
    remaining = {
        day.name: [sub for sub in day.iterdir() if sub.is_dir()]
        for day in g.submit_dir.glob(day_pattern)
    }
    assert remaining == {daydir.name: []}
    assert g.submitter.remove.call_count == 0


@pytest.mark.parametrize('jel_jobs,queue_jobs,hist_jobs,remove_calls,finish_calls', [
({(1,0): JobStatus.IDLE},
{(1,0): JobStatus.IDLE},
Expand Down

0 comments on commit 9546be8

Please sign in to comment.