diff --git a/iceprod/core/config.py b/iceprod/core/config.py index 38999f2d..7d73ecaa 100644 --- a/iceprod/core/config.py +++ b/iceprod/core/config.py @@ -1,3 +1,4 @@ +import asyncio from dataclasses import dataclass import importlib.resources import json @@ -15,6 +16,7 @@ @dataclass class Dataset: + """IceProd Dataset config and basic attributes""" dataset_id: str dataset_num: int group: str @@ -23,7 +25,7 @@ class Dataset: @classmethod async def load_from_api(cls, dataset_id: str, rest_client: RestClient) -> Self: - dataset = await rest_client.request('GET', f'/dataset/{dataset_id}') + dataset = await rest_client.request('GET', f'/datasets/{dataset_id}') config = await rest_client.request('GET', f'/config/{dataset_id}') return cls(dataset_id, dataset['dataset'], dataset['group'], dataset['username'], config) @@ -50,4 +52,52 @@ def _fill_list(user, schema): def validate(self): jsonschema.validate(self.config, CONFIG_SCHEMA) - \ No newline at end of file + +@dataclass +class Job: + """IceProd Job instance""" + dataset: Dataset + job_id: str + job_index: int + status: str + + +@dataclass +class Task: + """ + IceProd Task instance, ready for running. + + Old task stats are not loaded by default, but can be loaded on request. + """ + dataset: Dataset + job: Job + task_id: str + task_index: int + name: str + depends: list + requirements: dict + status: str + site: str + stats: dict + + @classmethod + async def load_from_api(cls, dataset_id: str, task_id: str, rest_client: RestClient) -> Self: + dataset, config, task = await asyncio.gather( + rest_client.request('GET', f'/datasets/{dataset_id}'), + rest_client.request('GET', f'/config/{dataset_id}'), + rest_client.request('GET', f'/datasets/{dataset_id}/tasks/{task_id}') + ) + d = Dataset(dataset_id, dataset['dataset'], dataset['group'], dataset['username'], config) + job = await rest_client.request('GET', f'/datasets/{dataset_id}/jobs/{task["job_id"]}') + j = Job(d, task['job_id'], job['job_index'], job['status']) + return cls(d, j, task['task_id'], task['task_index'], task['name'], task['depends'], task['requirements'], task['status'], task['site'], {}) + + async def load_stats_from_api(self, rest_client: RestClient): + ret = await rest_client.request('GET', f'/datasets/{self.dataset.dataset_id}/tasks/{self.task_id}/task_stats', {'last': 'true'}) + if not ret: + raise Exception('No stats to load!') + # get first (only) result in ret + self.stats = next(iter(ret.values())) + + def get_task_config(self): + return self.dataset.config['tasks'][self.task_index] diff --git a/iceprod/core/data/dataset.schema.json b/iceprod/core/data/dataset.schema.json index f4a4d12b..9d7f663b 100644 --- a/iceprod/core/data/dataset.schema.json +++ b/iceprod/core/data/dataset.schema.json @@ -7,8 +7,8 @@ "properties": { "version": { "description": "Schema version", - "type": "integer", - "default": 4 + "type": "number", + "default": 3.1 }, "description": { "description": "Dataset description", diff --git a/tests/core/config_test.py b/tests/core/config_test.py index 01ba56b0..47208f21 100644 --- a/tests/core/config_test.py +++ b/tests/core/config_test.py @@ -1,12 +1,21 @@ -from iceprod.core.config import Dataset import pytest from rest_tools.client import RestClient +from iceprod.core.config import Dataset, Job, Task +from iceprod.server.util import nowstr + def test_dataset_dataclasses(): with pytest.raises(Exception): Dataset() + d = Dataset('did123', 123, 'grp', 'usr', {}) + assert d.dataset_id == 'did123' + assert d.dataset_num == 123 + assert d.group == 'grp' + assert d.user == 'usr' + assert d.config == {} + async def test_load_config(requests_mock): dataset_id = 'did123' @@ -16,7 +25,7 @@ async def test_load_config(requests_mock): 'group': 'g123', 'username': 'u123', } - requests_mock.get(f'http://test.iceprod/dataset/{dataset_id}', json=dataset_data) + requests_mock.get(f'http://test.iceprod/datasets/{dataset_id}', json=dataset_data) config_data = { 'my': 'config' } @@ -30,7 +39,7 @@ async def test_load_config(requests_mock): assert d.dataset_num == dataset_data['dataset'] assert d.group == dataset_data['group'] assert d.user == dataset_data['username'] - assert config_data == d.config + assert d.config == config_data async def test_defaults(): @@ -43,7 +52,7 @@ async def test_defaults(): config_data = {} d = Dataset(dataset_data['dataset_id'], dataset_data['dataset'], dataset_data['group'], dataset_data['username'], config_data) d.fill_defaults() - assert d.config['version'] == 4 + assert d.config['version'] == 3.1 async def test_validate_error(): @@ -79,3 +88,109 @@ async def test_validate_valid(): d = Dataset(dataset_data['dataset_id'], dataset_data['dataset'], dataset_data['group'], dataset_data['username'], config_data) d.fill_defaults() d.validate() + + +def test_job_dataclasses(): + with pytest.raises(Exception): + Job() + + d = Dataset('did123', 123, 'grp', 'usr', {}) + j = Job(d, 'j123', 1, 'processing') + + assert j.dataset == d + assert j.job_id == 'j123' + assert j.job_index == 1 + assert j.status == 'processing' + + +def test_task_dataclasses(): + with pytest.raises(Exception): + Task() + + d = Dataset('did123', 123, 'grp', 'usr', {}) + j = Job(d, 'j123', 1, 'processing') + t = Task(d, j, 't123', 0, 'foo', [], {}, 'waiting', '', {}) + + assert t.dataset == d + assert t.job == j + assert t.task_id == 't123' + assert t.task_index == 0 + assert t.name == 'foo' + assert t.depends == [] + assert t.requirements == {} + assert t.status == 'waiting' + assert t.site == '' + assert t.stats == {} + + +def test_task_config(): + d = Dataset('did123', 123, 'grp', 'usr', {'tasks':[1,2,3]}) + j = Job(d, 'j123', 1, 'processing') + t = Task(d, j, 't123', 0, 'foo', [], {}, 'waiting', '', {}) + + assert t.get_task_config() == 1 + + +async def test_task_load_from_api(requests_mock): + dataset_id = 'did123' + dataset_data = { + 'dataset': 123, + 'dataset_id': dataset_id, + 'group': 'g123', + 'username': 'u123', + } + requests_mock.get(f'http://test.iceprod/datasets/{dataset_id}', json=dataset_data) + config_data = { + 'my': 'config' + } + requests_mock.get(f'http://test.iceprod/config/{dataset_id}', json=config_data) + job_data = { + 'dataset_id': dataset_id, + 'job_id': 'j123', + 'job_index': 1, + 'status': 'processing', + } + requests_mock.get(f'http://test.iceprod/datasets/{dataset_id}/jobs/{job_data["job_id"]}', json=job_data) + task_data = job_data | { + 'task_id': 't123', + 'task_index': 0, + 'name': 'foo', + 'depends': [], + 'requirements': {'cpu': 1}, + 'status': 'waiting', + 'status_changed': nowstr(), + 'failures': 1, + 'evictions': 0, + 'walltime': 0.0, + 'walltime_err': 0.15, + 'walltime_err_n': 1, + 'site': 'CHTC', + } + requests_mock.get(f'http://test.iceprod/datasets/{dataset_id}/tasks/{task_data["task_id"]}', json=task_data) + + r = RestClient('http://test.iceprod') + t = await Task.load_from_api(dataset_id, task_data['task_id'], r) + + assert t.dataset.dataset_id == dataset_id + assert t.job.job_id == job_data['job_id'] + assert t.task_id == task_data['task_id'] + + +async def test_task_load_stats(requests_mock): + d = Dataset('did123', 123, 'grp', 'usr', {'tasks':[1,2,3]}) + j = Job(d, 'j123', 1, 'processing') + t = Task(d, j, 't123', 0, 'foo', [], {}, 'waiting', '', {}) + + stat_data = { + 'task_stat_id': 'ts123', + 'foo': 'bar', + } + requests_mock.get( + f'http://test.iceprod/datasets/did123/tasks/t123/task_stats', + json={stat_data['task_stat_id']: stat_data}, + ) + + r = RestClient('http://test.iceprod') + await t.load_stats_from_api(r) + + assert t.stats == stat_data