Skip to content

Commit

Permalink
reset iceprod tasks if not present on the queue (#396)
Browse files Browse the repository at this point in the history
* reset iceprod tasks if not present on queue

* <bot> update requirements-docs.txt

* <bot> update requirements-tests.txt

* <bot> update requirements.txt

* get rid of old cruft

* fix tests

---------

Co-authored-by: github-actions <[email protected]>
  • Loading branch information
dsschult and github-actions authored Oct 21, 2024
1 parent ca1275d commit c3c95c8
Show file tree
Hide file tree
Showing 19 changed files with 203 additions and 479 deletions.
20 changes: 11 additions & 9 deletions iceprod/rest/handlers/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from collections import defaultdict
import json
import uuid
import logging
import math
from collections import defaultdict
import re
import uuid

import pymongo
import tornado.web
Expand Down Expand Up @@ -83,6 +84,7 @@ async def get(self):
Params (optional):
status: | separated list of task status to filter by
site: site to filter on
keys: | separated list of keys to return for each task
sort: | separated list of sort key=values, with values of 1 or -1
limit: number of tasks to return
Expand All @@ -92,13 +94,14 @@ async def get(self):
"""
filters = {}

status = self.get_argument('status', None)
if status:
if status := self.get_argument('status', None):
filters['status'] = {'$in': status.split('|')}

sort = self.get_argument('sort', None)
if site := self.get_argument('site', None):
filters['site'] = {'$regex': '^'+re.escape(site)}

mongo_sort = []
if sort:
if sort := self.get_argument('sort', None):
for s in sort.split('|'):
if '=' in s:
name, order = s.split('=', 1)
Expand All @@ -109,8 +112,7 @@ async def get(self):
else:
mongo_sort.append((s, pymongo.ASCENDING))

limit = self.get_argument('limit', 0)
if limit:
if limit := self.get_argument('limit', 0):
try:
limit = int(limit)
except Exception:
Expand Down
4 changes: 2 additions & 2 deletions iceprod/scheduled_tasks/job_temp_cleaning.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import asyncio
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from functools import partial
import logging
import os
Expand Down Expand Up @@ -87,7 +87,7 @@ async def run(rest_client, temp_dir, list_dirs, rmtree, dataset=None, debug=Fals
debug (bool): debug flag to propagate exceptions
"""
suspend_time = timedelta(days=90)
now = datetime.utcnow()
now = datetime.now(UTC)

try:
# get all the job_indexes currently in tmp
Expand Down
4 changes: 2 additions & 2 deletions iceprod/scheduled_tasks/log_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import argparse
import asyncio
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
import logging

from iceprod.client_auth import add_auth_to_argparse, create_rest_client
Expand All @@ -26,7 +26,7 @@ async def run(rest_client, debug=False):
debug (bool): debug flag to propagate exceptions
"""
async def delete_logs(name, days):
time_limit = datetime.utcnow() - timedelta(days=days)
time_limit = datetime.now(UTC) - timedelta(days=days)
args = {
'to': datetime2str(time_limit),
'name': name,
Expand Down
7 changes: 4 additions & 3 deletions iceprod/scheduled_tasks/non_active_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import argparse
import asyncio
from datetime import datetime
from datetime import datetime, UTC
import logging

from iceprod.client_auth import add_auth_to_argparse, create_rest_client
Expand Down Expand Up @@ -64,6 +64,7 @@ async def delete_pilot(pilot_id):

awaitables = set()
reset_pilots = set()
now = datetime.now(UTC)
for dataset_id in dataset_ids:
tasks = dataset_tasks[dataset_id]
if 'processing' in tasks:
Expand All @@ -72,7 +73,7 @@ async def delete_pilot(pilot_id):
args = {'keys': 'status|status_changed'}
task = await rest_client.request('GET', f'/datasets/{dataset_id}/tasks/{task_id}', args)
# check status, and that we haven't just changed status
if task['status'] == 'processing' and (datetime.utcnow()-str2datetime(task['status_changed'])).total_seconds() > 600:
if task['status'] == 'processing' and (now-str2datetime(task['status_changed'])).total_seconds() > 600:
logger.info('dataset %s reset task %s', dataset_id, task_id)
awaitables.add(reset(dataset_id,task_id))

Expand All @@ -82,7 +83,7 @@ async def delete_pilot(pilot_id):
args = {'keys': 'status|status_changed'}
task = await rest_client.request('GET', f'/datasets/{dataset_id}/tasks/{task_id}', args)
# check status, and that we haven't just changed status
if task['status'] in ('reset', 'waiting', 'failed', 'suspended') and (datetime.utcnow()-str2datetime(task['status_changed'])).total_seconds() > 600:
if task['status'] in ('reset', 'waiting', 'failed', 'suspended') and (now-str2datetime(task['status_changed'])).total_seconds() > 600:
reset_pilots.add(task_id)

for p in pilots.values():
Expand Down
57 changes: 0 additions & 57 deletions iceprod/scheduled_tasks/pilot_cleanup.py

This file was deleted.

19 changes: 19 additions & 0 deletions iceprod/server/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,25 @@ def _get_resources(task):
resource.update(values)
return resource

async def get_tasks_on_queue(self) -> list:
    """
    Get all tasks that are "assigned" to this queue.

    Sends ``GET /tasks`` to the REST API, asking for tasks in the
    ``queued`` or ``processing`` state filtered by ``self.site``, and
    requesting only the ``dataset_id``, ``task_id``, ``instance_id``,
    ``status`` and ``status_changed`` keys for each task.

    Returns:
        list of tasks; an empty list if no site is set or if the
        request fails with an HTTP error
    """
    if not self.site:
        # Without a site value the request carries no usable site filter,
        # so the answer could include tasks that belong to other queues.
        # Treat that as "nothing assigned here" instead of asking.
        logger.warning('cannot get tasks on queue: no site configured')
        return []

    args = {
        'status': 'queued|processing',
        'site': self.site,
        'keys': 'dataset_id|task_id|instance_id|status|status_changed',
    }
    try:
        ret = await self.rest_client.request('GET', '/tasks', args)
    except requests.exceptions.HTTPError:
        # Best effort: an empty result means callers take no action.
        logger.warning('cannot get tasks on queue', exc_info=True)
        return []
    return ret['tasks']

# Task Actions #

async def _upload_log(self, task: GridTask, name: str, data: str):
Expand Down
46 changes: 37 additions & 9 deletions iceprod/server/plugins/condor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, UTC
from datetime import datetime, timedelta, UTC
import enum
import importlib
import logging
Expand All @@ -25,6 +25,7 @@
from iceprod.core.exe import WriteToScript, Transfer
from iceprod.server.config import IceProdConfig
from iceprod.server import grid
from iceprod.server.util import str2datetime

logger = logging.getLogger('condor')

Expand Down Expand Up @@ -86,6 +87,7 @@ def from_condor_status(num):
'cgroup memory limit',
'local storage limit on worker node exceeded',
'execution time limit exceeded',
'exceeded max iceprod queue time',
]


Expand Down Expand Up @@ -807,7 +809,20 @@ async def check(self):
logger.info("job %s %s.%s removed from cross-check: %r", job_id, job.dataset_id, job.task_id, reason)
self.submitter.remove(job_id, reason=reason)

# check for history
await self.check_history()

await self.check_iceprod()

# check for old jobs and dirs
async for path in self.check_submit_dir():
for job_id, job in self.jobs.items():
if job.submit_dir == path:
self.submitter.remove(job_id, reason='exceeded max iceprod queue time')

logger.info('finished cross-check')

async def check_history(self):
"""Check condor_history"""
now = time.time()
hist_jobs = self.submitter.get_history(since=self.last_event_timestamp)
self.last_event_timestamp = now
Expand Down Expand Up @@ -857,13 +872,26 @@ async def check(self):
# finish job
await self.finish(job_id, success=success, resources=resources, stats=stats, reason=reason)

# check for old jobs and dirs
async for path in self.check_submit_dir():
for job_id, job in self.jobs.items():
if job.submit_dir == path:
self.submitter.remove(job_id, reason='exceeded max queue time')

logger.info('finished cross-check')
async def check_iceprod(self):
"""
Sync with iceprod server status.
"""
fut = self.get_tasks_on_queue()
queue_tasks = {j.task_id: j for j in self.jobs.values()}
server_tasks = await fut
now = datetime.now(UTC)
for task in server_tasks:
if task['task_id'] not in queue_tasks:
# ignore anything too recent
if str2datetime(task['status_changed']) >= now - timedelta(minutes=1):
continue
logger.info(f'task {task["dataset_id"]}.{task["task_id"]} in iceprod but not in queue')
job = CondorJob(
dataset_id=task['dataset_id'],
task_id=task['task_id'],
instance_id=task['instance_id'],
)
await self.task_reset(job, reason='task missing from HTCondor queue')

async def check_submit_dir(self):
"""
Expand Down
6 changes: 3 additions & 3 deletions iceprod/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def __init__(self, config_params=None, outfile=None, errfile=None):
logger.error('IceProd Server - version %s', version_string)

async def rotate_logs(self):
current_date = datetime.utcnow()
current_date = datetime.now(UTC)
while self.outfile and self.errfile:
if current_date.day != datetime.utcnow().day:
if current_date.day != datetime.now(UTC).day:
# rotate files
current_date = datetime.utcnow()
current_date = datetime.now(UTC)
if self.outfile:
roll_files(sys.stdout, self.outfile)
if self.errfile:
Expand Down
Loading

0 comments on commit c3c95c8

Please sign in to comment.