fix materialization case where job gets created and service is killed (#401)

* fix materialization case where job gets created but tasks are all missing

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

---------

Co-authored-by: github-actions <[email protected]>
dsschult and github-actions authored Oct 31, 2024
1 parent 8c77f0a commit cfd92e2
Showing 5 changed files with 133 additions and 15 deletions.
7 changes: 3 additions & 4 deletions iceprod/materialization/materialize.py
@@ -56,23 +56,22 @@ async def run_once(self, only_dataset=None, set_status=None, num=10000, dryrun=F
             # buffer for this dataset
             logger.warning('checking dataset %s', dataset_id)
             jobs = await self.rest_client.request('GET', '/datasets/{}/jobs'.format(dataset_id), {'keys': 'job_id|job_index'})
+            job_index_id = {j['job_index']: j['job_id'] for j in jobs.values()}
 
             # check that last job was buffered correctly
             job_index = max(jobs[i]['job_index'] for i in jobs)+1 if jobs else 0
             num_tasks = sum(tasks.values())
             logger.info('job_index: %d', job_index)
             logger.info('num_tasks: %d', num_tasks)
             logger.info('tasks_per_job: %d', dataset['tasks_per_job'])
-            while num_tasks % dataset['tasks_per_job'] != 0 and job_index > 0:
+            while job_index * dataset['tasks_per_job'] != num_tasks and job_index > 0:
                 job_index -= 1
                 logger.info('a job must have failed to buffer, so check in reverse order. job_index=%d, num_tasks=%d', job_index, num_tasks)
                 job_tasks = await self.rest_client.request('GET', f'/datasets/{dataset_id}/tasks',
                                                            {'job_index': job_index, 'keys': 'task_id|job_id|task_index'})
                 if len(job_tasks) != dataset['tasks_per_job']:
                     logger.info('fixing buffer of job %d for dataset %s', job_index, dataset_id)
-                    ret = await self.rest_client.request('GET', f'/datasets/{dataset_id}/jobs',
-                                                         {'job_index': job_index, 'keys': 'job_id'})
-                    job_id = list(ret.keys())[0]
+                    job_id = job_index_id[job_index]
                     logger.info('  fixing job_id %s, num existing tasks: %d', job_id, len(job_tasks))
                     tasks_buffered = await self.buffer_job(dataset, job_index, job_id=job_id,
                                                            tasks=list(job_tasks.values()),
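
Why the new loop condition matters: if the service is killed right after a job record is created but before any of its tasks are buffered, the dataset's total task count is still a multiple of tasks_per_job, so the old modulo check never flags the empty job. A minimal standalone sketch (not part of the commit; the values are hypothetical) contrasting the two conditions for that case:

tasks_per_job = 3   # dataset['tasks_per_job']
job_index = 1       # max(job_index) + 1, i.e. one job record exists
num_tasks = 0       # the service died before any of the job's tasks were buffered

# Old check: 0 % 3 == 0, so the repair loop never runs and the empty job is skipped.
old_check = num_tasks % tasks_per_job != 0 and job_index > 0

# New check: 1 * 3 != 0, so the loop walks back to job_index 0 and re-buffers its tasks.
new_check = job_index * tasks_per_job != num_tasks and job_index > 0

print(old_check, new_check)  # False True
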
6 changes: 3 additions & 3 deletions requirements-docs.txt
@@ -16,9 +16,9 @@ attrs==24.2.0
     #   referencing
 babel==2.16.0
     # via sphinx
-boto3==1.35.49
+boto3==1.35.53
     # via iceprod (setup.py)
-botocore==1.35.49
+botocore==1.35.53
     # via
     #   boto3
     #   s3transfer
@@ -123,7 +123,7 @@ requests-futures==1.0.1
     #   wipac-rest-tools
 requests-toolbelt==1.0.0
     # via iceprod (setup.py)
-rpds-py==0.20.0
+rpds-py==0.20.1
     # via
     #   jsonschema
     #   referencing
10 changes: 5 additions & 5 deletions requirements-tests.txt
@@ -14,11 +14,11 @@ attrs==24.2.0
     #   referencing
 beautifulsoup4==4.12.3
     # via iceprod (setup.py)
-boto3==1.35.49
+boto3==1.35.53
     # via
     #   iceprod (setup.py)
     #   moto
-botocore==1.35.49
+botocore==1.35.53
     # via
     #   boto3
     #   moto
@@ -131,7 +131,7 @@ pytest==8.0.2
     #   pytest-mock
 pytest-asyncio==0.23.8
     # via iceprod (setup.py)
-pytest-cov==5.0.0
+pytest-cov==6.0.0
     # via iceprod (setup.py)
 pytest-mock==3.14.0
     # via iceprod (setup.py)
@@ -173,7 +173,7 @@ responses==0.25.3
     # via moto
 respx==0.21.1
     # via iceprod (setup.py)
-rpds-py==0.20.0
+rpds-py==0.20.1
     # via
     #   jsonschema
     #   referencing
@@ -216,7 +216,7 @@ urllib3==2.2.3
     #   responses
     #   types-requests
     #   wipac-rest-tools
-werkzeug==3.0.6
+werkzeug==3.1.0
     # via moto
 wipac-dev-tools==1.13.0
     # via
6 changes: 3 additions & 3 deletions requirements.txt
@@ -12,9 +12,9 @@ attrs==24.2.0
     # via
     #   jsonschema
     #   referencing
-boto3==1.35.49
+boto3==1.35.53
     # via iceprod (setup.py)
-botocore==1.35.49
+botocore==1.35.53
     # via
     #   boto3
     #   s3transfer
@@ -106,7 +106,7 @@ requests-futures==1.0.1
     #   wipac-rest-tools
 requests-toolbelt==1.0.0
     # via iceprod (setup.py)
-rpds-py==0.20.0
+rpds-py==0.20.1
     # via
     #   jsonschema
     #   referencing
119 changes: 119 additions & 0 deletions tests/materialization/test_materialize.py
@@ -2,6 +2,7 @@
 
 from rest_tools.client import RestClient
 
+import iceprod.materialization.materialize
 from iceprod.materialization.materialize import Materialize
 
 def test_materialize_init():
@@ -66,3 +67,121 @@ async def test_materialize_buffer_job_no_depends(requests_mock):
     ret = await m.buffer_job(dataset, 0)
 
     assert ret == 1
+
+
+async def test_materialize_buffer_job_incomplete(monkeypatch, requests_mock):
+    rc = RestClient('http://test.iceprod')
+    m = Materialize(rc)
+    config = {
+        'tasks': [
+            {
+                'name': 'foo',
+            },
+            {
+                'name': 'bar',
+            },
+            {
+                'name': 'baz',
+            }
+        ],
+        'options': {}
+    }
+    m.get_config = AsyncMock(return_value=config)
+    prio_mock = MagicMock()
+    monkeypatch.setattr('iceprod.materialization.materialize.Priority', prio_mock)
+    prio_mock.return_value.get_task_prio = AsyncMock(return_value=1.)
+
+    requests_mock.get('http://test.iceprod/datasets/did123', json={
+        'dataset_id': 'did123',
+        'dataset': 123,
+        'status': 'processing',
+        'tasks_per_job': 3,
+        'jobs_submitted': 10,
+        'tasks_submitted': 10,
+        'debug': False,
+    })
+
+    requests_mock.get('http://test.iceprod/datasets/did123/job_counts/status', json={
+        'processing': 1,
+    })
+    requests_mock.get('http://test.iceprod/datasets/did123/task_counts/status', json={
+        'idle': 1,
+    })
+    requests_mock.get('http://test.iceprod/datasets/did123/jobs', json={
+        'j123': {
+            'job_id': 'j123',
+            'job_index': 0,
+        },
+    })
+    requests_mock.get('http://test.iceprod/datasets/did123/tasks', json={
+        't0': {
+            'task_id': 't0',
+            'job_id': 'j123',
+            'task_index': 0,
+        },
+    })
+
+    requests_mock.post('http://test.iceprod/jobs', json={'result': 'j2'})
+    requests_mock.post('http://test.iceprod/tasks', json={'result': 't0'})
+    requests_mock.patch('http://test.iceprod/tasks/t0', json={})
+
+    ret = await m.run_once(only_dataset='did123', num=0)
+
+    calls = [h for h in requests_mock.request_history if h.url == 'http://test.iceprod/tasks']
+    assert len(calls) == 2
+
+
+async def test_materialize_buffer_job_no_tasks(monkeypatch, requests_mock):
+    rc = RestClient('http://test.iceprod')
+    m = Materialize(rc)
+    config = {
+        'tasks': [
+            {
+                'name': 'foo',
+            },
+            {
+                'name': 'bar',
+            },
+            {
+                'name': 'baz',
+            }
+        ],
+        'options': {}
+    }
+    m.get_config = AsyncMock(return_value=config)
+    prio_mock = MagicMock()
+    monkeypatch.setattr('iceprod.materialization.materialize.Priority', prio_mock)
+    prio_mock.return_value.get_task_prio = AsyncMock(return_value=1.)
+
+    requests_mock.get('http://test.iceprod/datasets/did123', json={
+        'dataset_id': 'did123',
+        'dataset': 123,
+        'status': 'processing',
+        'tasks_per_job': 3,
+        'jobs_submitted': 10,
+        'tasks_submitted': 10,
+        'debug': False,
+    })
+
+    requests_mock.get('http://test.iceprod/datasets/did123/job_counts/status', json={
+        'processing': 1,
+    })
+    requests_mock.get('http://test.iceprod/datasets/did123/task_counts/status', json={
+    })
+    requests_mock.get('http://test.iceprod/datasets/did123/jobs', json={
+        'j123': {
+            'job_id': 'j123',
+            'job_index': 0,
+        },
+    })
+    requests_mock.get('http://test.iceprod/datasets/did123/tasks', json={
+    })
+
+    requests_mock.post('http://test.iceprod/jobs', json={'result': 'j2'})
+    requests_mock.post('http://test.iceprod/tasks', json={'result': 't0'})
+    requests_mock.patch('http://test.iceprod/tasks/t0', json={})
+
+    ret = await m.run_once(only_dataset='did123', num=0)
+
+    calls = [h for h in requests_mock.request_history if h.url == 'http://test.iceprod/tasks']
+    assert len(calls) == 3

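The two new tests drive Materialize.run_once() end to end against a mocked REST API: the first leaves one of the job's three tasks already buffered (so it expects 2 POSTs to /tasks), the second leaves the job with no tasks at all (so it expects 3). A rough way to run just these tests locally, assuming the project's usual test dependencies (pytest, pytest-asyncio, requests-mock) are installed:

python -m pytest tests/materialization/test_materialize.py -k "incomplete or no_tasks" -v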