From cfd92e2b724116c4b1f73cb55622429794b0483e Mon Sep 17 00:00:00 2001 From: David Schultz Date: Thu, 31 Oct 2024 15:44:27 -0500 Subject: [PATCH] fix materialization case where job gets created and service is killed (#401) * fix materialization case where job gets created but tasks are all missing * update requirements-docs.txt * update requirements-tests.txt * update requirements.txt --------- Co-authored-by: github-actions --- iceprod/materialization/materialize.py | 7 +- requirements-docs.txt | 6 +- requirements-tests.txt | 10 +- requirements.txt | 6 +- tests/materialization/test_materialize.py | 119 ++++++++++++++++++++++ 5 files changed, 133 insertions(+), 15 deletions(-) diff --git a/iceprod/materialization/materialize.py b/iceprod/materialization/materialize.py index 8918f3b36..136d12856 100644 --- a/iceprod/materialization/materialize.py +++ b/iceprod/materialization/materialize.py @@ -56,6 +56,7 @@ async def run_once(self, only_dataset=None, set_status=None, num=10000, dryrun=F # buffer for this dataset logger.warning('checking dataset %s', dataset_id) jobs = await self.rest_client.request('GET', '/datasets/{}/jobs'.format(dataset_id), {'keys': 'job_id|job_index'}) + job_index_id = {j['job_index']: j['job_id'] for j in jobs.values()} # check that last job was buffered correctly job_index = max(jobs[i]['job_index'] for i in jobs)+1 if jobs else 0 @@ -63,16 +64,14 @@ async def run_once(self, only_dataset=None, set_status=None, num=10000, dryrun=F logger.info('job_index: %d', job_index) logger.info('num_tasks: %d', num_tasks) logger.info('tasks_per_job: %d', dataset['tasks_per_job']) - while num_tasks % dataset['tasks_per_job'] != 0 and job_index > 0: + while job_index * dataset['tasks_per_job'] != num_tasks and job_index > 0: job_index -= 1 logger.info('a job must have failed to buffer, so check in reverse order. job_index=%d, num_tasks=%d', job_index, num_tasks) job_tasks = await self.rest_client.request('GET', f'/datasets/{dataset_id}/tasks', {'job_index': job_index, 'keys': 'task_id|job_id|task_index'}) if len(job_tasks) != dataset['tasks_per_job']: logger.info('fixing buffer of job %d for dataset %s', job_index, dataset_id) - ret = await self.rest_client.request('GET', f'/datasets/{dataset_id}/jobs', - {'job_index': job_index, 'keys': 'job_id'}) - job_id = list(ret.keys())[0] + job_id = job_index_id[job_index] logger.info(' fixing job_id %s, num existing tasks: %d', job_id, len(job_tasks)) tasks_buffered = await self.buffer_job(dataset, job_index, job_id=job_id, tasks=list(job_tasks.values()), diff --git a/requirements-docs.txt b/requirements-docs.txt index 3ca710b95..c57e35cb9 100644 --- a/requirements-docs.txt +++ b/requirements-docs.txt @@ -16,9 +16,9 @@ attrs==24.2.0 # referencing babel==2.16.0 # via sphinx -boto3==1.35.49 +boto3==1.35.53 # via iceprod (setup.py) -botocore==1.35.49 +botocore==1.35.53 # via # boto3 # s3transfer @@ -123,7 +123,7 @@ requests-futures==1.0.1 # wipac-rest-tools requests-toolbelt==1.0.0 # via iceprod (setup.py) -rpds-py==0.20.0 +rpds-py==0.20.1 # via # jsonschema # referencing diff --git a/requirements-tests.txt b/requirements-tests.txt index c5fc80fc7..9d720ca00 100644 --- a/requirements-tests.txt +++ b/requirements-tests.txt @@ -14,11 +14,11 @@ attrs==24.2.0 # referencing beautifulsoup4==4.12.3 # via iceprod (setup.py) -boto3==1.35.49 +boto3==1.35.53 # via # iceprod (setup.py) # moto -botocore==1.35.49 +botocore==1.35.53 # via # boto3 # moto @@ -131,7 +131,7 @@ pytest==8.0.2 # pytest-mock pytest-asyncio==0.23.8 # via iceprod (setup.py) -pytest-cov==5.0.0 +pytest-cov==6.0.0 # via iceprod (setup.py) pytest-mock==3.14.0 # via iceprod (setup.py) @@ -173,7 +173,7 @@ responses==0.25.3 # via moto respx==0.21.1 # via iceprod (setup.py) -rpds-py==0.20.0 +rpds-py==0.20.1 # via # jsonschema # referencing @@ -216,7 +216,7 @@ urllib3==2.2.3 # responses # types-requests # wipac-rest-tools -werkzeug==3.0.6 +werkzeug==3.1.0 # via moto wipac-dev-tools==1.13.0 # via diff --git a/requirements.txt b/requirements.txt index 4decbbd64..99f01d08e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,9 +12,9 @@ attrs==24.2.0 # via # jsonschema # referencing -boto3==1.35.49 +boto3==1.35.53 # via iceprod (setup.py) -botocore==1.35.49 +botocore==1.35.53 # via # boto3 # s3transfer @@ -106,7 +106,7 @@ requests-futures==1.0.1 # wipac-rest-tools requests-toolbelt==1.0.0 # via iceprod (setup.py) -rpds-py==0.20.0 +rpds-py==0.20.1 # via # jsonschema # referencing diff --git a/tests/materialization/test_materialize.py b/tests/materialization/test_materialize.py index d7cabed76..cc3a7a661 100644 --- a/tests/materialization/test_materialize.py +++ b/tests/materialization/test_materialize.py @@ -2,6 +2,7 @@ from rest_tools.client import RestClient +import iceprod.materialization.materialize from iceprod.materialization.materialize import Materialize def test_materialize_init(): @@ -66,3 +67,121 @@ async def test_materialize_buffer_job_no_depends(requests_mock): ret = await m.buffer_job(dataset, 0) assert ret == 1 + + +async def test_materialize_buffer_job_incomplete(monkeypatch, requests_mock): + rc = RestClient('http://test.iceprod') + m = Materialize(rc) + config = { + 'tasks': [ + { + 'name': 'foo', + }, + { + 'name': 'bar', + }, + { + 'name': 'baz', + } + ], + 'options': {} + } + m.get_config = AsyncMock(return_value=config) + prio_mock = MagicMock() + monkeypatch.setattr('iceprod.materialization.materialize.Priority', prio_mock) + prio_mock.return_value.get_task_prio = AsyncMock(return_value=1.) + + requests_mock.get('http://test.iceprod/datasets/did123', json={ + 'dataset_id': 'did123', + 'dataset': 123, + 'status': 'processing', + 'tasks_per_job': 3, + 'jobs_submitted': 10, + 'tasks_submitted': 10, + 'debug': False, + }) + + requests_mock.get('http://test.iceprod/datasets/did123/job_counts/status', json={ + 'processing': 1, + }) + requests_mock.get('http://test.iceprod/datasets/did123/task_counts/status', json={ + 'idle': 1, + }) + requests_mock.get('http://test.iceprod/datasets/did123/jobs', json={ + 'j123': { + 'job_id': 'j123', + 'job_index': 0, + }, + }) + requests_mock.get('http://test.iceprod/datasets/did123/tasks', json={ + 't0': { + 'task_id': 't0', + 'job_id': 'j123', + 'task_index': 0, + }, + }) + + requests_mock.post('http://test.iceprod/jobs', json={'result': 'j2'}) + requests_mock.post('http://test.iceprod/tasks', json={'result': 't0'}) + requests_mock.patch('http://test.iceprod/tasks/t0', json={}) + + ret = await m.run_once(only_dataset='did123', num=0) + + calls = [h for h in requests_mock.request_history if h.url == 'http://test.iceprod/tasks'] + assert len(calls) == 2 + + +async def test_materialize_buffer_job_no_tasks(monkeypatch, requests_mock): + rc = RestClient('http://test.iceprod') + m = Materialize(rc) + config = { + 'tasks': [ + { + 'name': 'foo', + }, + { + 'name': 'bar', + }, + { + 'name': 'baz', + } + ], + 'options': {} + } + m.get_config = AsyncMock(return_value=config) + prio_mock = MagicMock() + monkeypatch.setattr('iceprod.materialization.materialize.Priority', prio_mock) + prio_mock.return_value.get_task_prio = AsyncMock(return_value=1.) + + requests_mock.get('http://test.iceprod/datasets/did123', json={ + 'dataset_id': 'did123', + 'dataset': 123, + 'status': 'processing', + 'tasks_per_job': 3, + 'jobs_submitted': 10, + 'tasks_submitted': 10, + 'debug': False, + }) + + requests_mock.get('http://test.iceprod/datasets/did123/job_counts/status', json={ + 'processing': 1, + }) + requests_mock.get('http://test.iceprod/datasets/did123/task_counts/status', json={ + }) + requests_mock.get('http://test.iceprod/datasets/did123/jobs', json={ + 'j123': { + 'job_id': 'j123', + 'job_index': 0, + }, + }) + requests_mock.get('http://test.iceprod/datasets/did123/tasks', json={ + }) + + requests_mock.post('http://test.iceprod/jobs', json={'result': 'j2'}) + requests_mock.post('http://test.iceprod/tasks', json={'result': 't0'}) + requests_mock.patch('http://test.iceprod/tasks/t0', json={}) + + ret = await m.run_once(only_dataset='did123', num=0) + + calls = [h for h in requests_mock.request_history if h.url == 'http://test.iceprod/tasks'] + assert len(calls) == 3