From 9546be8be07b91065530dca2b76b347a35683d28 Mon Sep 17 00:00:00 2001 From: David Schultz Date: Fri, 25 Oct 2024 15:31:32 -0500 Subject: [PATCH] clean up job submit dirs more rapidly after completion (#397) * be more aggressive about cleaning out jobs that have finished * update requirements-docs.txt * update requirements-tests.txt * update requirements.txt * need to flip if * update requirements-docs.txt * update requirements-tests.txt * update requirements.txt * fix tests and add one more --------- Co-authored-by: github-actions --- iceprod/server/plugins/condor.py | 14 +++++++++--- requirements-docs.txt | 4 ++-- requirements-tests.txt | 6 ++--- requirements.txt | 4 ++-- tests/server/plugins/condor_test.py | 34 ++++++++++++++++++++++++++--- 5 files changed, 49 insertions(+), 13 deletions(-) diff --git a/iceprod/server/plugins/condor.py b/iceprod/server/plugins/condor.py index 8ca8e0ca..6d5a2438 100644 --- a/iceprod/server/plugins/condor.py +++ b/iceprod/server/plugins/condor.py @@ -95,6 +95,7 @@ def from_condor_status(num): 'sigterm', 'killed', 'bus error (core dumped)', + 'segmentation fault (core dumped)', 'operation timed out', 'connection timed out', ] @@ -898,25 +899,32 @@ async def check_submit_dir(self): Return directory paths that should be cleaned up. """ # get time limits + queue_tasks = {j.task_id for j in self.jobs.values()} queued_time = self.cfg['queue'].get('max_task_queued_time', 86400*2) processing_time = self.cfg['queue'].get('max_task_processing_time', 86400*2) suspend_time = self.cfg['queue'].get('suspend_submit_dir_time', 86400) now = time.time() + job_clean_logs_time = now - suspend_time job_old_time = now - (queued_time + processing_time) dir_old_time = now - (queued_time + processing_time + suspend_time) - logger.debug('now: %r, job_old_time: %r, dir_old_time: %r', now, job_old_time, dir_old_time) + logger.debug('now: %r, job_clean_logs_time: %r, job_old_time: %r, dir_old_time: %r', now, job_clean_logs_time, job_old_time, dir_old_time) for daydir in self.submit_dir.glob('[0-9][0-9][0-9][0-9]*'): logger.debug('looking at daydir %s', daydir) if daydir.is_dir(): empty = True for path in daydir.iterdir(): - logger.debug('looking at path %s', path) + job_active = path.name.split('_')[0] in queue_tasks + logger.debug('looking at path %s, active: %r', path, job_active) st = path.lstat() logger.debug('stat: %r', st) if stat.S_ISDIR(st.st_mode): empty = False - if st.st_mtime < job_old_time: + if not job_active: + if st.st_mtime < job_clean_logs_time: + logger.info('cleaning up submit dir %s', path) + shutil.rmtree(path) + elif st.st_mtime < job_old_time: yield path if st.st_mtime < dir_old_time: logger.info('cleaning up submit dir %s', path) diff --git a/requirements-docs.txt b/requirements-docs.txt index 9e0aa9df..3ca710b9 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -16,9 +16,9 @@ attrs==24.2.0 # referencing babel==2.16.0 # via sphinx -boto3==1.35.45 +boto3==1.35.49 # via iceprod (setup.py) -botocore==1.35.45 +botocore==1.35.49 # via # boto3 # s3transfer diff --git a/requirements-tests.txt b/requirements-tests.txt index ab8a5b9b..c5fc80fc 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -14,11 +14,11 @@ attrs==24.2.0 # referencing beautifulsoup4==4.12.3 # via iceprod (setup.py) -boto3==1.35.45 +boto3==1.35.49 # via # iceprod (setup.py) # moto -botocore==1.35.45 +botocore==1.35.49 # via # boto3 # moto @@ -216,7 +216,7 @@ urllib3==2.2.3 # responses # types-requests # wipac-rest-tools -werkzeug==3.0.4 +werkzeug==3.0.6 # via moto wipac-dev-tools==1.13.0 # via diff --git a/requirements.txt b/requirements.txt index 260fb9b7..4decbbd6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,9 +12,9 @@ attrs==24.2.0 # via # jsonschema # referencing -boto3==1.35.45 +boto3==1.35.49 # via iceprod (setup.py) -botocore==1.35.45 +botocore==1.35.49 # via # boto3 # s3transfer diff --git a/tests/server/plugins/condor_test.py b/tests/server/plugins/condor_test.py index f163ecb8..b4b5db97 100644 --- a/tests/server/plugins/condor_test.py +++ b/tests/server/plugins/condor_test.py @@ -635,7 +635,7 @@ async def test_Grid_check_delete_day(schedd, i3prod_path, set_time): assert g.jels == {} -async def test_Grid_check_old(schedd, i3prod_path, set_time): +async def test_Grid_check_old_delete(schedd, i3prod_path, set_time): override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10'] cfg = iceprod.server.config.IceProdConfig(save=False, override=override) @@ -661,7 +661,7 @@ async def test_Grid_check_old(schedd, i3prod_path, set_time): assert dirs == {daydir.name: []} -async def test_Grid_check_oldjob(schedd, i3prod_path, set_time): +async def test_Grid_check_oldjob_remove(schedd, i3prod_path, set_time): override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10'] cfg = iceprod.server.config.IceProdConfig(save=False, override=override) @@ -682,7 +682,7 @@ async def test_Grid_check_oldjob(schedd, i3prod_path, set_time): os.utime(p, (t, t)) logging.info('set time to %d', t) - jobs[CondorJobId(cluster_id=1, proc_id=0)] = CondorJob(status=JobStatus.IDLE, submit_dir=p) + jobs[CondorJobId(cluster_id=1, proc_id=0)] = CondorJob(status=JobStatus.IDLE, submit_dir=p, task_id=p.name) await g.check() @@ -691,6 +691,34 @@ async def test_Grid_check_oldjob(schedd, i3prod_path, set_time): assert g.submitter.remove.call_count == 1 +async def test_Grid_check_oldjob_delete(schedd, i3prod_path, set_time): + override = ['queue.type=htcondor', 'queue.max_task_queued_time=10', 'queue.max_task_processing_time=10', 'queue.suspend_submit_dir_time=10'] + cfg = iceprod.server.config.IceProdConfig(save=False, override=override) + + rc = MagicMock() + g = iceprod.server.plugins.condor.Grid(cfg=cfg, rest_client=rc, cred_client=None) + + jobs = {} + g.submitter.get_jobs = MagicMock(return_value=jobs) + g.submitter.get_history = MagicMock(return_value={}) + g.submitter.remove = MagicMock() + g.get_tasks_on_queue = AsyncMock(return_value=[]) + + jel = g.get_current_JEL() + daydir = jel.parent + p = daydir / 'olddir' + p.mkdir() + t = time.mktime(set_time.utctimetuple()) - 15 # must be older than suspend time + os.utime(p, (t, t)) + logging.info('set time to %d', t) + + await g.check() + + dirs = {x.name: [x for x in x.iterdir() if x.is_dir()] for x in g.submit_dir.glob('[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]')} + assert dirs == {daydir.name: []} + assert g.submitter.remove.call_count == 0 + + @pytest.mark.parametrize('jel_jobs,queue_jobs,hist_jobs,remove_calls,finish_calls', [ ({(1,0): JobStatus.IDLE}, {(1,0): JobStatus.IDLE},