-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
63 lines (49 loc) · 2.11 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from db import Task, db
from datetime import datetime, timedelta
from crontab import CronTab
def create_task(id, name, execution_time, cron_schedule):
with db.atomic():
recurring = 'yes' if cron_schedule else 'no'
if cron_schedule:
execution_time = calculate_next_execution_time(cron_schedule)
return Task.create(id=id, name=name, execution_time=execution_time, cron_schedule=cron_schedule, recurring=recurring)
def read_all_tasks():
with db.atomic():
return Task.select()
def get_task_by_id(task_id):
with db.atomic():
task = Task.get_by_id(task_id)
return {
'id': task.id,
'name': task.name,
'status': task.status,
'recurring': 'yes' if task.recurring else 'no',
'cron_schedule': task.cron_schedule,
'execution_time': task.execution_time.strftime('%Y-%m-%d %H:%M:%S')
}
def update_task(task_id, name, execution_time, cron_schedule):
with db.atomic():
task = Task.get_by_id(task_id)
if task.recurring == 'yes' and execution_time is not None:
raise ValueError("Cannot change a recurring task to non-recurring")
if task.recurring == 'no' and cron_schedule is not None:
raise ValueError("Cannot change a non-recurring task to recurring or vice versa.")
if cron_schedule is not None:
task.cron_schedule = cron_schedule
task.execution_time = calculate_next_execution_time(cron_schedule)
task.recurring = 'yes'
if name is not None:
task.name = name
if execution_time is not None:
task.execution_time = datetime.strptime(execution_time, '%Y-%m-%d %H:%M:%S')
task.cron_schedule = None
task.recurring = 'no'
task.save()
def delete_task(task_id):
with db.atomic():
Task.get_by_id(task_id).delete_instance()
def calculate_next_execution_time(cron_schedule):
now = datetime.now()
cron = CronTab(cron_schedule)
next_execution = cron.next(default_utc=False)
return now + timedelta(seconds=next_execution)