Skip to content

Commit

Permalink
add a dataset age factor to priority (#406)
Browse files Browse the repository at this point in the history
* add a dataset age factor to priority

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* smaller adjustment to handle small date ranges better

* fix test

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
dsschult and github-actions authored Nov 22, 2024
1 parent fe2cf4b commit 2ca9271
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 55 deletions.
11 changes: 10 additions & 1 deletion iceprod/server/priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
"""
import argparse
import asyncio
from datetime import datetime, UTC
import logging

from iceprod.client_auth import add_auth_to_argparse, create_rest_client
from iceprod.roles_groups import GROUP_PRIORITIES
from iceprod.server.util import str2datetime

logger = logging.getLogger('priority')

Expand All @@ -20,7 +22,7 @@ def __init__(self, rest_client):

async def _populate_dataset_cache(self):
args = {
'keys': 'dataset_id|priority|jobs_submitted|tasks_submitted|group|username',
'keys': 'dataset_id|priority|jobs_submitted|tasks_submitted|group|username|start_date',
'status': 'processing',
}
self.dataset_cache = await self.rest_client.request('GET', '/datasets', args)
Expand Down Expand Up @@ -135,6 +137,7 @@ async def get_dataset_prio(self, dataset_id):
return 0.

dataset_prio = dataset['priority']
dataset_age = (datetime.now(UTC) - str2datetime(dataset['start_date'])).total_seconds() / 86400.
user = dataset['username']
group = dataset['group']
logger.debug(f'{dataset_id} dataset_prio: {dataset_prio}')
Expand Down Expand Up @@ -178,6 +181,12 @@ async def get_dataset_prio(self, dataset_id):
priority *= group_prio
logger.info(f'{dataset_id} after group adjustment: {priority}')

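# bias towards older datasets: dataset_age is in days, so the factor grows as (age in hours)**0.05;
# the -0.17 offset keeps the factor ~1.0 at the one-day threshold (24**0.05 is about 1.172)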
logger.debug(f'{dataset_id} age: {dataset_age}')
factor = (dataset_age*24)**.05 - .17 if dataset_age > 1 else 1.
priority *= factor
logger.info(f'{dataset_id} after age adjustment: {priority}')

# bias against large datasets
factor = (10000. / num_dataset_jobs)**.05 if num_dataset_jobs > 0 else 1.
if factor > 1:
Expand Down
6 changes: 3 additions & 3 deletions requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ attrs==24.2.0
# referencing
babel==2.16.0
# via sphinx
boto3==1.35.66
boto3==1.35.68
# via iceprod (setup.py)
botocore==1.35.66
botocore==1.35.68
# via
# boto3
# s3transfer
Expand Down Expand Up @@ -157,7 +157,7 @@ statsd==4.0.1
# via iceprod (setup.py)
tomli==2.1.0
# via sphinx
tornado==6.4.1
tornado==6.4.2
# via
# iceprod (setup.py)
# wipac-rest-tools
Expand Down
6 changes: 3 additions & 3 deletions requirements-tests.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ attrs==24.2.0
# referencing
beautifulsoup4==4.12.3
# via iceprod (setup.py)
boto3==1.35.66
boto3==1.35.68
# via
# iceprod (setup.py)
# moto
botocore==1.35.66
botocore==1.35.68
# via
# boto3
# moto
Expand Down Expand Up @@ -195,7 +195,7 @@ tomli==2.1.0
# via
# coverage
# pytest
tornado==6.4.1
tornado==6.4.2
# via
# iceprod (setup.py)
# wipac-rest-tools
Expand Down
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ attrs==24.2.0
# via
# jsonschema
# referencing
boto3==1.35.66
boto3==1.35.68
# via iceprod (setup.py)
botocore==1.35.66
botocore==1.35.68
# via
# boto3
# s3transfer
Expand Down Expand Up @@ -122,7 +122,7 @@ sniffio==1.3.1
# httpx
statsd==4.0.1
# via iceprod (setup.py)
tornado==6.4.1
tornado==6.4.2
# via
# iceprod (setup.py)
# wipac-rest-tools
Expand Down
2 changes: 1 addition & 1 deletion tests/scheduled_tasks/update_task_priority_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def client(method, url, args=None):
elif url == '/tasks':
return {'tasks':[{'task_id':'bar','dataset_id':'foo'}]}
elif url == '/datasets':
return {'foo':{'dataset_id':'foo','username':'a','group':'g','tasks_submitted':200,'jobs_submitted':100,'priority':1.}}
return {'foo':{'dataset_id':'foo','start_date':'2024-01-01T01:00:00','username':'a','group':'g','tasks_submitted':200,'jobs_submitted':100,'priority':1.}}
elif url == '/datasets/foo/tasks':
return {'bar':{'task_id':'bar','dataset_id':'foo','task_index':0,'job_index':12}}
elif url == '/datasets/foo/tasks/bar':
Expand Down
98 changes: 54 additions & 44 deletions tests/server/priority_test.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
"""
Test script for priority
"""
import datetime
import time
import logging
from unittest.mock import MagicMock

from __future__ import absolute_import, division, print_function
import pytest
import iceprod.server
from iceprod.server import priority

from tests.util import unittest_reporter, glob_tests

import logging
logger = logging.getLogger('priority_test')

import os, sys, time
import shutil
import tempfile
import random
import unittest

from tornado.testing import AsyncTestCase
@pytest.fixture(autouse=True)
def set_time(monkeypatch):
    """Freeze the clock at 2024-01-01T01:10:00 UTC for every test.

    Replaces the ``datetime`` name inside ``iceprod.server.priority`` with a
    mock whose ``now()`` returns the fixed instant, and patches ``time.time``
    to return the matching POSIX timestamp.

    Yields:
        The frozen timezone-aware ``datetime``.
    """
    now = datetime.datetime(2024, 1, 1, 1, 10, 0, 0, datetime.UTC)
    mock = MagicMock()
    mock.now = MagicMock(return_value=now)
    monkeypatch.setattr(iceprod.server.priority, 'datetime', mock)
    # ``now`` is timezone-aware, so timestamp() yields the true epoch value.
    # time.mktime(now.utctimetuple()) would interpret the UTC fields as local
    # time and shift the result by the host's UTC offset.
    tnow = now.timestamp()
    tmock = MagicMock(return_value=tnow)
    monkeypatch.setattr(time, 'time', tmock)
    yield now

import iceprod.server
from iceprod.server import priority

def prio_setup():
datasets = {
Expand All @@ -27,6 +33,7 @@ def prio_setup():
'jobs_submitted': 10,
'tasks_submitted': 20,
'priority': 1,
'start_date': '2024-01-01T01:00:00',
'group': 'users',
'username': 'u_a',
'tasks': {
Expand All @@ -39,6 +46,7 @@ def prio_setup():
'jobs_submitted': 10,
'tasks_submitted': 20,
'priority': 1,
'start_date': '2024-01-01T01:00:00',
'group': 'users',
'username': 'u_b',
'tasks': {
Expand All @@ -51,13 +59,27 @@ def prio_setup():
'jobs_submitted': 1000,
'tasks_submitted': 20000,
'priority': 1,
'start_date': '2024-01-01T01:00:00',
'group': 'simprod',
'username': 'u_a',
'tasks': {
't4': {'task_id': 't4', 'task_index': 0, 'job_index': 400},
't5': {'task_id': 't5', 'task_index': 1, 'job_index': 400},
}
},
'd3': {
'dataset_id': 'd3',
'jobs_submitted': 1000,
'tasks_submitted': 20000,
'priority': 1,
'start_date': '2023-10-01T01:00:00',
'group': 'simprod',
'username': 'u_a',
'tasks': {
't6': {'task_id': 't6', 'task_index': 0, 'job_index': 400},
't7': {'task_id': 't7', 'task_index': 1, 'job_index': 400},
}
},
}
users = {
'u_a': {'username': 'u_a', 'priority': 1.},
Expand All @@ -69,42 +91,30 @@ def prio_setup():
p.user_cache = users
return p

class priority_test(AsyncTestCase):
def setUp(self):
super(priority_test,self).setUp()
self.test_dir = tempfile.mkdtemp(dir=os.getcwd())
def cleanup():
shutil.rmtree(self.test_dir)
self.addCleanup(cleanup)

@unittest_reporter
async def test_10_get_dataset_prio(self):
"""Test get_dataset_prio"""
p = prio_setup()
prio1 = await p.get_dataset_prio('d0')
prio2 = await p.get_dataset_prio('d1')
self.assertLess(prio2, prio1)
async def test_10_get_dataset_prio():
"""Test get_dataset_prio"""
p = prio_setup()
prio1 = await p.get_dataset_prio('d0')
prio2 = await p.get_dataset_prio('d1')
assert prio2 < prio1

prio3 = await p.get_dataset_prio('d2')
self.assertLess(prio3, prio1)
prio3 = await p.get_dataset_prio('d2')
assert prio3 < prio1

@unittest_reporter
async def test_20_get_task_prio(self):
"""Test get_task_prio"""
p = prio_setup()
prio1 = await p.get_task_prio('d0', 't0')
prio2 = await p.get_task_prio('d1', 't2')
# equal at 1.0 because of boost
self.assertEqual(prio2, prio1)
prio4 = await p.get_dataset_prio('d3')
assert prio4 > prio3

prio3 = await p.get_task_prio('d2', 't4')
prio4 = await p.get_task_prio('d2', 't5')
self.assertLess(prio3, prio1) #
self.assertLess(prio3, prio4) # ordering of tasks in a job

async def test_20_get_task_prio():
"""Test get_task_prio"""
p = prio_setup()
prio1 = await p.get_task_prio('d0', 't0')
prio2 = await p.get_task_prio('d1', 't2')
# equal at 1.0 because of boost
assert prio2 == prio1

def load_tests(loader, tests, pattern):
suite = unittest.TestSuite()
alltests = glob_tests(loader.getTestCaseNames(priority_test))
suite.addTests(loader.loadTestsFromNames(alltests,priority_test))
return suite
prio3 = await p.get_task_prio('d2', 't4')
prio4 = await p.get_task_prio('d2', 't5')
assert prio3 < prio1
assert prio3 < prio4 # ordering of tasks in a job

0 comments on commit 2ca9271

Please sign in to comment.