-
Notifications
You must be signed in to change notification settings - Fork 10
/
scheduler.py
43 lines (36 loc) · 1.39 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import uasyncio
class Scheduler:
class Schedule:
def __init__(self, name: str, duration: int, callback: uasyncio.Task, initial_delay: int):
self.name = name
self.duration = duration
self.callback = callback
self.initial_delay = initial_delay
self.cancelled = False
def __init__(self):
self.started = False
self.schedules = []
self.display_task = None
self.event_loop = uasyncio.get_event_loop()
def start(self):
self.started = True
for schedule in self.schedules:
self.event_loop.create_task(self._start_task(schedule))
async def _start_task(self, task: Schedule):
if task.initial_delay != 0:
await uasyncio.sleep_ms(task.initial_delay)
while True:
if task.cancelled:
break
await task.callback()
await uasyncio.sleep_ms(task.duration)
def schedule(self, name, duration, callback, initial_delay=0):
task = self.Schedule(name, duration, callback, initial_delay)
self.schedules.append(task)
if self.started:
self.event_loop.create_task(self._start_task(task))
def remove(self, name):
for schedule in self.schedules:
if schedule.name == name:
schedule.cancelled = True
self.schedules.remove(schedule)