Commit

first attempt at a multi-level fair queue for sending tasks to pilots. for #238
dsschult committed Jul 17, 2017
1 parent 9d3a86d commit 0e2384a
Showing 5 changed files with 148 additions and 41 deletions.
10 changes: 10 additions & 0 deletions iceprod/server/data/etc/db_config.json
@@ -109,6 +109,8 @@
],
"task_lookup": [
["task_id", "str"],
["queue", "str"],
["insert_time", "float", "time.time() value"],
["req_cpu", "int"],
["req_gpu", "int"],
["req_memory", "float"],
@@ -264,6 +266,14 @@
["master_update_history_last", "str"]
]
},
"indices": {
"job":["dataset_id", "status,dataset_id"],
"task": ["task_rel_id"],
"task_rel": ["dataset_id"],
"task_lookup": ["queue"],
"task_log": ["task_id"],
"search": ["dataset_id", "task_status", "job_id"]
},
"archive_tables": [
"site", "node", "dataset", "dataset_notes",
"dataset_stat", "job", "job_stat", "task", "task_stat",
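The new queue and insert_time columns on task_lookup carry the fair-queue state, and the top-level "indices" block declares per-table indexes, including a composite index (status,dataset_id) on job. A minimal sketch of how modules/db.py (diff below) turns these entries into SQL, using the naming convention that commas become underscores; the table subset here is illustrative:

    indices = {
        "job": ["dataset_id", "status,dataset_id"],
        "task_lookup": ["queue"],
    }
    for table_name, cols in indices.items():
        for col in cols:
            name = col.replace(',', '_')
            print('CREATE INDEX IF NOT EXISTS ' + name + '_index'
                  ' ON ' + table_name + ' (' + col + ')')
    # CREATE INDEX IF NOT EXISTS dataset_id_index ON job (dataset_id)
    # CREATE INDEX IF NOT EXISTS status_dataset_id_index ON job (status,dataset_id)
    # CREATE INDEX IF NOT EXISTS queue_index ON task_lookup (queue)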
18 changes: 12 additions & 6 deletions iceprod/server/dbmethods/queue.py
@@ -19,6 +19,7 @@

from iceprod.server import GlobalID
from iceprod.server.dbmethods import _Methods_Base,datetime2str,str2datetime,nowstr
from iceprod.server import task_queue

logger = logging.getLogger('dbmethods.queue')

@@ -1037,21 +1038,26 @@ def queue_add_task_lookup(self, tasks):
Args:
tasks (dict): dict of {task_id: resources}
"""
now = time.time()
keys = tasks.values()[0]  # resource names from the first task define the req_* columns
sql = 'replace into task_lookup (task_id,'
sql = 'replace into task_lookup (task_id,queue,insert_time,'
sql += ','.join('req_'+k for k in keys)
sql += ') values (?,'
sql += ') values (?,?,?,'
sql += ','.join('?' for k in keys)+')'
bindings = [(task_id,)+tuple(tasks[task_id][k] for k in keys) for task_id in tasks]
bindings = []
for t in tasks:
    reqs = tasks[t]
    queue = task_queue.get_queue(reqs)
    bindings.append((t,queue,now)+tuple(reqs[k] for k in keys))
yield self.parent.db.query([sql for _ in bindings], bindings)

@tornado.gen.coroutine
def queue_get_task_lookup(self):
"""
Get all the tasks in the lookup.
Get the resources for all tasks in the lookup.
Returns:
dict: tasks
dict: {task_id: resources}
"""
with (yield self.parent.db.acquire_lock('queue')):
# get tasks from lookup
@@ -1062,7 +1068,7 @@ def queue_get_task_lookup(self):
for row in ret:
row = self._list_to_dict('task_lookup',row)
tid = row.pop('task_id')
task_ids[tid] = {k.replace('req_',''):row[k] for k in row}
task_ids[tid] = {k.replace('req_',''):row[k] for k in row if k.startswith('req_')}

# verify that they are valid
tasks = {}
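Two things change in the lookup round trip: each task is stamped with a queue name (from task_queue.get_queue) and an insert time on the way in, and the read side keeps only the req_* columns, since queue and insert_time now share the row. A minimal sketch of the read-side filtering, using a hypothetical row laid out like the task_lookup schema above:

    row = {'task_id': 'abc12', 'queue': 'gpu', 'insert_time': 1500249600.0,
           'req_cpu': 1, 'req_gpu': 1, 'req_memory': 2.0}
    tid = row.pop('task_id')
    # keep only the req_* columns as the task's resource requirements
    resources = {k.replace('req_', ''): row[k] for k in row
                 if k.startswith('req_')}
    assert resources == {'cpu': 1, 'gpu': 1, 'memory': 2.0}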
70 changes: 44 additions & 26 deletions iceprod/server/dbmethods/rpc.py
@@ -18,6 +18,7 @@
from iceprod.core.jsonUtil import json_encode,json_decode
from iceprod.server import GlobalID
from iceprod.server import dataset_prio
from iceprod.server import task_queue

from iceprod.server.dbmethods import _Methods_Base,datetime2str,str2datetime,nowstr

@@ -56,20 +57,21 @@ def rpc_new_task(self, gridspec=None, **kwargs):
self.parent.statsd.incr('new_task.domain.'+args['domain'].replace('.','_'))
yield self.parent.service['node_update'](**args)

# check resource requirements
reqs = {}
for k in Resources.defaults:
default = Resources.defaults[k]
if isinstance(default, list):
default = len(default)
if k in args and args[k] != default:
reqs[k] = args[k]
else:
reqs[k] = default
logger.info('new task for resources: %r', reqs)

logger.info('acquiring queue lock')
with (yield self.parent.db.acquire_lock('queue')):
logger.info('queue lock granted')
# check resource requirements
reqs = {}
for k in Resources.defaults:
default = Resources.defaults[k]
if isinstance(default, list):
default = len(default)
if k in args and args[k] != default:
reqs[k] = args[k]
else:
reqs[k] = default
logger.info('new task for resources: %r', reqs)

# get all the tasks
sql = 'select * from task_lookup '
@@ -79,22 +81,24 @@
else:
bindings = tuple()
ret = yield self.parent.db.query(sql, bindings)
tasks = []
tasks = defaultdict(list)
tasks['default'] = [] #make sure we have a default queue
task_ids = set()
for row in ret:
row = self._list_to_dict('task_lookup',row)
task_id = row.pop('task_id')
resources = {}
for k in row:
for k in Resources.defaults:
    resources[k] = row['req_'+k]
tasks.append((task_id,resources))
tasks[row['queue']].append((task_id,row['insert_time'],resources))
task_ids.add(task_id)
if not task_ids:
logger.info('no tasks found matching resources available')
raise tornado.gen.Return(None)

# check that these are still valid
sql = 'select * from search where task_id in (%s) and task_status = ?'
bindings = ('queued',)
task_ids = set(t[0] for t in tasks)
search = {}
for f in self._bulk_select(sql, task_ids, extra_bindings=bindings):
for row in (yield f):
Expand All @@ -108,21 +112,35 @@ def rpc_new_task(self, gridspec=None, **kwargs):
for f in self._bulk_select(sql, invalid_tasks):
yield f

# sort by largest resources
tasks.sort(key=lambda t:t[1]["memory"],
reverse=False) #DS: sort low to high to increase throughput
tasks.sort(key=lambda t:t[1]["gpu"], reverse=True)
# sort by priority
now = time.time()
for task_list in tasks.values():
task_list.sort(key=lambda t:task_queue.sched_prio(t[-1],now-t[1]))

# get only what can match
new_tasks = {}
for t in tasks:
for k in reqs:
if reqs[k] < t[1][k]:
while True:
match = False
queue = task_queue.get_queue(reqs)
logger.info('new task for queue: %s', queue)
if not tasks[queue]:
queue = 'default'
for i,t in enumerate(tasks[queue]):
task_id = t[0]
task_reqs = t[-1]
if any(reqs[k] < task_reqs[k] for k in reqs):
continue
else: # task passed
for k in reqs:
reqs[k] -= task_reqs[k]
new_tasks[task_id] = search[task_id]
new_tasks[task_id]['resources'] = task_reqs
match = True
# remove non-matching and matched task
tasks[queue] = tasks[queue][i+1:]
break
reqs[k] -= t[1][k]
else: # task passed
new_tasks[t[0]] = search[t[0]]
new_tasks[t[0]]['resources'] = t[1]
if not match:
break

if not new_tasks:
logger.info('error: no task to allocate')
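The loop above is the core of the multi-level fair queue: queued tasks are bucketed by queue name, each bucket is sorted by sched_prio (smaller requests and longer waits match first), and the pilot repeatedly takes the best task that fits from the queue named by its remaining resources, falling back to the default queue when the preferred one is empty. A self-contained sketch of the same logic, with sched_prio and get_queue inlined from task_queue.py and hypothetical pilot/task values:

    import time
    from collections import defaultdict

    def sched_prio(resources, time_in_queue=0):
        return max(0, sum(resources.values()) - time_in_queue/600)

    def get_queue(resources):
        if resources['gpu']:
            return 'gpu'
        elif resources['memory'] >= 4:
            return 'memory'
        return 'default'

    reqs = {'cpu': 4, 'gpu': 1, 'memory': 8.0}  # pilot's free resources
    now = time.time()
    tasks = defaultdict(list)
    tasks['default'] = [('t2', now-60, {'cpu': 1, 'gpu': 0, 'memory': 1.0})]
    tasks['gpu'] = [('t1', now-1200, {'cpu': 1, 'gpu': 1, 'memory': 2.0})]

    # sort each queue by priority; the lowest score is matched first
    for task_list in tasks.values():
        task_list.sort(key=lambda t: sched_prio(t[-1], now-t[1]))

    new_tasks = {}
    while True:
        match = False
        queue = get_queue(reqs)  # the queue named by what the pilot has free
        if not tasks[queue]:
            queue = 'default'    # fall back when the preferred queue is empty
        for i, t in enumerate(tasks[queue]):
            task_id, task_reqs = t[0], t[-1]
            if any(reqs[k] < task_reqs[k] for k in reqs):
                continue         # needs more than remains, skip it
            for k in reqs:
                reqs[k] -= task_reqs[k]  # reserve this task's share
            new_tasks[task_id] = task_reqs
            match = True
            tasks[queue] = tasks[queue][i+1:]  # drop skipped + matched tasks
            break
        if not match:
            break                # nothing left fits this pilot

    print(sorted(new_tasks))  # ['t1', 't2']: both fit

Discarding the skipped head of a queue (tasks[queue] = tasks[queue][i+1:]) only affects this call's local copy; the buckets are rebuilt from task_lookup on every rpc_new_task invocation.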
27 changes: 18 additions & 9 deletions iceprod/server/modules/db.py
@@ -117,6 +117,7 @@ class DBAPI(object):
# define tables
tables = read_db_conf('tables')
archive_tables = read_db_conf('archive_tables')
indices = read_db_conf('indices')
status_options = read_db_conf('status_options')

### basic functions ###
@@ -298,7 +299,6 @@ def _setup_tables(self):
for table_name in self.tables.keys():
sql_create = ' ('
sql_select = ' '
sql_index_create = []
sep = ''
cols = self.tables[table_name].keys()
for col in cols:
@@ -307,9 +307,12 @@
if sep == '':
sql_create += ' PRIMARY KEY' # make first column the primary key
sep = ', '
elif False:#col.endswith('_id'):
sql_index_create.append('CREATE INDEX IF NOT EXISTS '+col+'_index ON '+table_name+' ('+col+')')
sql_create += ') WITHOUT ROWID'
sql_index_create = []
if table_name in self.indices:
for col in self.indices[table_name]:
name = col.replace(',','_')
sql_index_create.append('CREATE INDEX IF NOT EXISTS '+name+'_index ON '+table_name+' ('+col+')')
scols = set(cols)
with (conn if table_name != 'setting' else self._inc_id_connection) as c:
cur = c.cursor()
@@ -321,8 +324,6 @@
# table does not exist
logger.info('create table '+table_name+sql_create)
cur.execute('create table '+table_name+sql_create)
for query in sql_index_create:
cur.execute(query)
elif curcols != scols:
# table not the same
logger.info('modify table '+table_name)
@@ -342,11 +343,12 @@
sql = 'alter table '+table_name+'_backup rename to '+table_name
logger.info(sql)
cur.execute(sql)
for query in sql_index_create:
cur.execute(query)
else:
# table is good
logger.info('table '+table_name+' already exists')
# try for indices
for query in sql_index_create:
cur.execute(query)
except apsw.Error:
# something went wrong
logger.warn('setup tables error', exc_info=True)
@@ -499,9 +501,12 @@ def _setup_tables(self):
if sep == '':
sql_create += ' PRIMARY KEY' # make first column the primary key
sep = ', '
elif False:#col.endswith('_id'):
sql_create += ' INDEX'
sql_create += ') CHARACTER SET utf8 COLLATE utf8_general_ci'
sql_index_create = []
if table_name in self.indices:
for col in self.indices[table_name]:
name = col.replace(',','_')
sql_index_create.append('CREATE INDEX '+name+'_index ON '+table_name+' ('+col+')')
scols = set(cols)
try:
cur = conn.cursor()
@@ -516,6 +521,8 @@
# table does not exist
logger.info('create table '+table_name)
cur.execute('create table '+table_name+sql_create)
for sql in sql_index_create:
cur.execute(sql)
elif curcols != scols:
# table not the same
logger.info('modify table '+table_name)
Expand All @@ -535,6 +542,8 @@ def _setup_tables(self):
sql = 'alter table '+table_name+'_backup rename to '+table_name
logger.info(sql)
cur.execute(sql)
for sql in sql_index_create:
cur.execute(sql)
else:
# table is good
logger.info('table '+table_name+' already exists')
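Note the asymmetry between the two backends in _setup_tables: the SQLite path now runs its CREATE INDEX IF NOT EXISTS statements on every startup, whether or not the table already existed, while the MySQL path issues a plain CREATE INDEX only when a table is created or rebuilt, since stock MySQL has no IF NOT EXISTS for indexes. If the MySQL branch ever needed the same always-run behavior, one possible approach (not part of this commit; ensure_index is a hypothetical helper assuming a DB-API cursor with MySQLdb-style %s parameters) is to consult information_schema first:

    def ensure_index(cur, table_name, col):
        """Create an index on a MySQL table only if it does not already exist."""
        name = col.replace(',', '_') + '_index'
        cur.execute(
            'select 1 from information_schema.statistics'
            ' where table_schema = database()'
            ' and table_name = %s and index_name = %s',
            (table_name, name))
        if not cur.fetchone():
            cur.execute('CREATE INDEX ' + name + ' ON ' + table_name + ' (' + col + ')')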
64 changes: 64 additions & 0 deletions iceprod/server/task_queue.py
@@ -0,0 +1,64 @@
"""
Task Queue
==========
Utilities for fairly queueing tasks based on resource usage.
"""

from __future__ import absolute_import, division, print_function

import logging


logger = logging.getLogger('task_queue')


def sched_prio(resources, time_in_queue=0):
"""
Weight the priority to prefer smaller resource requests.
As a task waits longer in the queue, its score drops, improving its priority.
The best priority is 0; worse priorities grow toward infinity.
Args:
resources (dict): resources dict
time_in_queue (float): seconds this task has spent in the queue
Returns:
float: priority
"""
# every 600 s (10 min) in the queue offsets one unit of requested
# resources; the max() clamp means aging saturates at priority 0
return max(0, sum(resources.values()) - time_in_queue/600)

def get_queue(resources):
"""
Determine which queue this task belongs in.
Args:
resources (dict): resources dict
Returns:
str: queue name
"""
if resources['gpu']:
return 'gpu'
elif resources['memory'] >= 4:
return 'memory'
else:
return 'default'

def get_queue_for_pilot(resources):
"""
Determine which queue this pilot resource belongs in.
Args:
resources (dict): resources dict
Returns:
str: queue name
"""
if resources['gpu']:
return 'gpu'
elif resources['memory'] >= 8:
return 'memory'
else:
return 'default'
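
Taken together, the helpers route work into three queues ('gpu', 'memory', 'default') and order tasks within a queue. The pilot-side memory threshold (8) is deliberately higher than the task-side one (4), presumably so a pilot classified into the memory queue has headroom for the tasks routed there. A quick usage sketch with hypothetical values:

    from iceprod.server import task_queue

    task = {'cpu': 1, 'gpu': 0, 'memory': 6.0}
    pilot = {'cpu': 8, 'gpu': 0, 'memory': 6.0}

    task_queue.get_queue(task)             # 'memory': requested memory >= 4
    task_queue.get_queue_for_pilot(pilot)  # 'default': pilot memory < 8

    # aging: every 10 minutes in the queue improves the score by 1
    task_queue.sched_prio(task)            # 7.0 at insert (1 + 0 + 6.0)
    task_queue.sched_prio(task, 600)       # 6.0 after ten minutes
    task_queue.sched_prio(task, 7200)      # 0: clamped, aging saturates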
