-
Notifications
You must be signed in to change notification settings - Fork 16
/
_crontab.py
512 lines (439 loc) · 16.6 KB
/
_crontab.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
'''
crontab.py
Written July 15, 2011 by Josiah Carlson
Copyright 2011-2021 Josiah Carlson
Released under the GNU LGPL v2.1 and v3
available:
http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html
http://www.gnu.org/licenses/lgpl.html
Other licenses may be available upon request.
'''
from collections import namedtuple
from datetime import datetime, timedelta
import random
import sys
import warnings
# Inclusive (low, high) bounds for every cron field.  The position of each
# pair matches the *_OFFSET constants defined just below:
# second, minute, hour, day of month, month, weekday, year.
_ranges = [
(0, 59),
(0, 59),
(0, 23),
(1, 31),
(1, 12),
(0, 6),
(1970, 2099),
]
# Number of fields in a fully expanded entry (one per row of _ranges).
ENTRIES = len(_ranges)
# Index of each field inside _ranges, _attribute and the Matcher tuple.
SECOND_OFFSET, MINUTE_OFFSET, HOUR_OFFSET, DAY_OFFSET, MONTH_OFFSET, WEEK_OFFSET, YEAR_OFFSET = range(ENTRIES)
# Name of the datetime attribute that supplies the value for each field.
# 'isoweekday' is a method rather than a plain attribute;
# CronTab._test_match calls it and reduces the result modulo 7.
# These names are also interpolated into validation error messages.
_attribute = [
'second',
'minute',
'hour',
'day',
'month',
'isoweekday',
'year'
]
# Symbolic names accepted instead of numbers, keyed by field index.  Only the
# month and weekday fields have names.  Keys are lower case because _Matcher
# lower-cases its input before parsing.
_alternate = {
MONTH_OFFSET: {'jan': 1, 'feb': 2, 'mar': 3, 'apr': 4, 'may': 5, 'jun': 6,
'jul': 7, 'aug': 8, 'sep': 9, 'oct': 10, 'nov':11, 'dec':12},
WEEK_OFFSET: {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5,
'sat': 6},
}
# Shorthand entries and the five-field specification each one expands to.
# CronTab._make_matchers looks the whole crontab string up here with an exact
# dict lookup, so the alias must be the entire string, spelled as shown.
_aliases = {
'@yearly': '0 0 1 1 *',
'@annually': '0 0 1 1 *',
'@monthly': '0 0 1 * *',
'@weekly': '0 0 * * 0',
'@daily': '0 0 * * *',
'@hourly': '0 * * * *',
}
# Text of the FutureWarning raised by CronTab.next() when `default_utc` is
# left at its WARN_CHANGE default and `now` is None, a number, or a datetime
# without tzinfo.
WARNING_CHANGE_MESSAGE = '''\
Version 0.22.0+ of crontab will use datetime.utcnow() and
datetime.utcfromtimestamp() instead of datetime.now() and
datetime.fromtimestamp() as was previous. This had been a bug, which will be
remedied. If you would like to keep the *old* behavior:
`ct.next(..., default_utc=False)` . If you want to use the new behavior *now*:
`ct.next(..., default_utc=True)`. If you pass a datetime object with a tzinfo
attribute that is not None, timezones will *just work* to the best of their
ability. There are tests...'''
if sys.version_info >= (3, 0):
_number_types = (int, float)
xrange = range
else:
_number_types = (int, long, float)
# Fixed-size steps used by the increment / decrement tables below.
SECOND = timedelta(seconds=1)
MINUTE = timedelta(minutes=1)
HOUR = timedelta(hours=1)
DAY = timedelta(days=1)
WEEK = timedelta(days=7)
# MONTH and YEAR are only starting points: _month_incr keeps adding days after
# the 28-day jump, and _year_incr / _year_decr add an extra day when needed.
MONTH = timedelta(days=28)
YEAR = timedelta(days=365)
# Sentinel default for the `default_utc` argument of CronTab.next() and
# CronTab.previous(); lets them detect that the caller did not choose.
WARN_CHANGE = object()
# find the next scheduled time
def _end_of_month(dt):
    '''
    Return `dt` moved to the last day of its own month; the time of day is
    left as it was.
    '''
    probe = dt + DAY
    # step forward one day at a time until we land in the following month
    while probe.month == dt.month:
        probe = probe + DAY
    first_of_next_month = probe.replace(day=1)
    return first_of_next_month - DAY
def _month_incr(dt, m):
    '''
    Return the timedelta from `dt` to the first day of the following month
    (same time of day).  `m` is accepted for signature compatibility with
    the other step functions and is not used.
    '''
    target = dt + MONTH
    # MONTH is 28 days, so we may still be inside the starting month
    while target.month == dt.month:
        target = target + DAY
    # get to the first of next month, let the backtracking handle it
    return target.replace(day=1) - dt
def _year_incr(dt, m):
    '''
    Return the timedelta that advances `dt` by one year: YEAR, plus one DAY
    when the step has to cross a February 29th.  `m` is not used.
    '''
    # simple leapyear stuff works for 1970-2099 :)
    phase = dt.year % 4
    month_day = (dt.month, dt.day)
    crosses_leap_day = (
        (phase == 0 and month_day < (2, 29)) or
        (phase == 3 and month_day > (2, 29))
    )
    return YEAR + DAY if crosses_leap_day else YEAR
# Dispatch table used by CronTab.next() when searching forwards.
# Position in the list is significant.
_increments = [
# Entries 0 .. ENTRIES-1: step functions, one per field in *_OFFSET order.
# Called as fn(future, matchers) and return the timedelta to add; entry 0 is
# additionally called with no arguments to produce the initial one-second step.
lambda *a: SECOND,
lambda *a: MINUTE,
lambda *a: HOUR,
lambda *a: DAY,
_month_incr,
lambda *a: DAY,
_year_incr,
# Entries ENTRIES .. : reset functions, read as increments[ENTRIES + i] and
# called as fn(future, inc) after a step of `inc` was applied to a higher
# field.  Each returns the datetime with field i rewound to its lowest value;
# day and month are only rewound when the step was larger than one day.
lambda dt,x: dt.replace(second=0),
lambda dt,x: dt.replace(minute=0),
lambda dt,x: dt.replace(hour=0),
lambda dt,x: dt.replace(day=1) if x > DAY else dt,
lambda dt,x: dt.replace(month=1) if x > DAY else dt,
lambda dt,x: dt,
]
# find the previously scheduled time
def _day_decr(dt, m):
    '''
    Return the (negative) timedelta for one backwards step of the day field.

    Normally that is a single day.  When the day column of the matchers `m`
    is exactly 'l', step back one day and then keep going to the last day of
    the month before that one.
    '''
    if m.day.input != 'l':
        return -DAY
    cursor = anchor = dt - DAY
    # leave the month we landed in; we stop on the last day of the one before
    while cursor.month == anchor.month:
        cursor = cursor - DAY
    return cursor - dt
def _month_decr(dt, m):
    '''
    Return the (negative) timedelta from `dt` to the last day of the previous
    month, same time of day.  `m` is not used.
    '''
    # get to the last day of last month, let the backtracking handle it
    last_of_previous_month = dt.replace(day=1) - DAY
    return last_of_previous_month - dt
def _year_decr(dt, m):
    '''
    Return the negative timedelta that moves `dt` back by one year: -YEAR,
    with one more DAY when the step has to cross a February 29th.  `m` is
    not used.
    '''
    # simple leapyear stuff works for 1970-2099 :)
    phase = dt.year % 4
    month_day = (dt.month, dt.day)
    crosses_leap_day = (
        (phase == 0 and month_day > (2, 29)) or
        (phase == 1 and month_day < (2, 29))
    )
    return -(YEAR + DAY) if crosses_leap_day else -YEAR
def _day_decr_reset(dt, x):
    '''
    Reset function for the day field when searching backwards.

    `x` is the step that was just applied.  If it moved back by more than one
    day, return `dt` on the last day of its month; otherwise return `dt`
    unchanged.
    '''
    if x >= -DAY:
        return dt
    month = dt.month
    probe = dt
    # run off the end of the month, then take one day back
    while probe.month == month:
        probe = probe + DAY
    return probe - DAY
# Dispatch table used by CronTab.previous() (via CronTab.next) when searching
# backwards.  Same layout as the forward table: position is significant.
_decrements = [
# Entries 0 .. ENTRIES-1: step functions, one per field in *_OFFSET order.
# Called as fn(future, matchers) and return a negative timedelta; entry 0 is
# additionally called with no arguments for the initial one-second step back.
lambda *a: -SECOND,
lambda *a: -MINUTE,
lambda *a: -HOUR,
_day_decr,
_month_decr,
lambda *a: -DAY,
_year_decr,
# Entries ENTRIES .. : reset functions, read as increments[ENTRIES + i] and
# called as fn(future, inc).  Each returns the datetime with field i pushed to
# its highest value; day and month only move when the step exceeded one day.
lambda dt,x: dt.replace(second=59),
lambda dt,x: dt.replace(minute=59),
lambda dt,x: dt.replace(hour=23),
_day_decr_reset,
lambda dt,x: dt.replace(month=12) if x < -DAY else dt,
lambda dt,x: dt,
# NOTE(review): CronTab.next never reads past index ENTRIES + 5, so this last
# entry looks like an unused leftover -- confirm before removing.
_year_decr,
]
# Container for the seven per-field matchers of one crontab entry, in
# *_OFFSET order; built by CronTab._make_matchers.
Matcher = namedtuple('Matcher', 'second, minute, hour, day, month, weekday, year')
def _assert(condition, message, *args):
if not condition:
raise ValueError(message%args)
class _Matcher(object):
__slots__ = 'allowed', 'end', 'any', 'input', 'which', 'split', 'loop'
def __init__(self, which, entry, loop=False):
"""
input:
`which` - index into the increment / validation lookup tables
`entry` - the value of the column
`loop` - do we loop when we validate / construct counts
(turning 55-5,1 -> 0,1,2,3,4,5,55,56,57,58,59 in a "minutes" column)
"""
_assert(0 <= which <= YEAR_OFFSET,
"improper number of cron entries specified")
self.input = entry.lower()
self.split = self.input.split(',')
self.which = which
self.allowed = set()
self.end = None
self.any = '*' in self.split or '?' in self.split
self.loop = loop
for it in self.split:
al, en = self._parse_crontab(which, it)
if al is not None:
self.allowed.update(al)
self.end = en
_assert(self.end is not None,
"improper item specification: %r", entry.lower()
)
self.allowed = frozenset(self.allowed)
def __call__(self, v, dt):
for i, x in enumerate(self.split):
if x == 'l':
if v == _end_of_month(dt).day:
return True
elif x.startswith('l'):
# We have to do this in here, otherwise we can end up, for
# example, accepting *any* Friday instead of the *last* Friday.
if dt.month == (dt + WEEK).month:
continue
x = x[1:]
if x.isdigit():
x = int(x) if x != '7' else 0
if v == x:
return True
continue
start, end = map(int, x.partition('-')[::2])
allowed = set(range(start, end+1))
if 7 in allowed:
allowed.add(0)
if v in allowed:
return True
return self.any or v in self.allowed
def __lt__(self, other):
if self.any:
return self.end < other
return all(item < other for item in self.allowed)
def __gt__(self, other):
if self.any:
return _ranges[self.which][0] > other
return all(item > other for item in self.allowed)
def __eq__(self, other):
if self.any:
return other.any
return self.allowed == other.allowed
def __hash__(self):
return hash((self.any, self.allowed))
def _parse_crontab(self, which, entry):
'''
This parses a single crontab field and returns the data necessary for
this matcher to accept the proper values.
See the README for information about what is accepted.
'''
# this handles day of week/month abbreviations
def _fix(it):
if which in _alternate and not it.isdigit():
if it in _alternate[which]:
return _alternate[which][it]
_assert(it.isdigit(),
"invalid range specifier: %r (%r)", it, entry)
it = int(it, 10)
_assert(_start <= it <= _end_limit,
"item value %r out of range [%r, %r]",
it, _start, _end_limit)
return it
# this handles individual items/ranges
def _parse_piece(it):
if '-' in it:
start, end = map(_fix, it.split('-'))
# Allow "sat-sun"
if which in (DAY_OFFSET, WEEK_OFFSET) and end == 0:
end = 7
elif it == '*':
start = _start
end = _end
else:
start = _fix(it)
end = _end
if increment is None:
return set([start])
_assert(_start <= start <= _end_limit,
"%s range start value %r out of range [%r, %r]",
_attribute[which], start, _start, _end_limit)
_assert(_start <= end <= _end_limit,
"%s range end value %r out of range [%r, %r]",
_attribute[which], end, _start, _end_limit)
if not self.loop:
_assert(start <= end,
"%s range start value %r > end value %r",
_attribute[which], start, end)
if increment and not self.loop:
next_value = start + increment
_assert(next_value <= _end_limit,
"first next value %r is out of range [%r, %r]",
next_value, start, _end_limit)
if start <= end:
return set(range(start, end+1, increment or 1))
right = set(range(end, _end_limit + 1, increment or 1))
first = max(right, default=end + (increment or 1)) % _end_limit
return set(range(first, start+1, increment or 1)) | right
_start, _end = _ranges[which]
_end_limit = _end
# wildcards
if entry in ('*', '?'):
if entry == '?':
_assert(which in (DAY_OFFSET, WEEK_OFFSET),
"cannot use '?' in the %r field", _attribute[which])
return None, _end
# last day of the month
if entry == 'l':
_assert(which == DAY_OFFSET,
"you can only specify a bare 'L' in the 'day' field")
return None, _end
# for the last 'friday' of the month, for example
elif entry.startswith('l'):
_assert(which == WEEK_OFFSET,
"you can only specify a leading 'L' in the 'weekday' field")
es, _, ee = entry[1:].partition('-')
_assert((entry[1:].isdigit() and 0 <= int(es) <= 7) or
(_ and es.isdigit() and ee.isdigit() and 0 <= int(es) <= 7 and 0 <= int(ee) <= 7),
"last <day> specifier must include a day number or range in the 'weekday' field, you entered %r", entry)
return None, _end
# allow Sunday to be specified as weekday 7
if which == WEEK_OFFSET:
_end_limit = 7
increment = None
# increments
if '/' in entry:
entry, increment = entry.split('/')
increment = int(increment, 10)
_assert(increment > 0,
"you can only use positive increment values, you provided %r",
increment)
_assert(increment <= _end_limit,
"increment value must be less than %r, you provided %r",
_end_limit, increment)
# handle singles and ranges
good = _parse_piece(entry)
# change Sunday to weekday 0
if which == WEEK_OFFSET and 7 in good:
good.discard(7)
good.add(0)
return good, _end
_gv = lambda: str(random.randrange(60))
class CronTab(object):
    '''
    A parsed crontab entry that can report when it next (or last) matches and
    test whether a given moment matches.
    '''
    __slots__ = 'matchers', 'rs'

    def __init__(self, crontab, loop=False, random_seconds=False):
        """
        inputs:
            `crontab` - crontab specification of "[S=0] Mi H D Mo DOW [Y=*]"
            `loop` - do we loop when we validate / construct counts
                     (turning 55-5,1 -> 0,1,2,3,4,5,55,56,57,58,59 in a "minutes" column)
            `random_seconds` - randomly select starting second for tasks

        Raises ValueError when the specification cannot be parsed.
        """
        self.rs = random_seconds
        self.matchers = self._make_matchers(crontab, loop, random_seconds)

    def __eq__(self, other):
        '''
        Entries are equal when all columns after the seconds match and either
        both use random seconds or neither does and the seconds match too.
        '''
        if not isinstance(other, CronTab):
            return False
        match_last = self.matchers[1:] == other.matchers[1:]
        return match_last and ((self.rs and other.rs) or (not self.rs and
            not other.rs and self.matchers[0] == other.matchers[0]))

    def _make_matchers(self, crontab, loop, random_seconds):
        '''
        This constructs the full matcher struct.

        Aliases such as '@daily' are expanded first.  A 5-column entry gets a
        seconds column in front and a '*' year behind; a 6-column entry only
        gets the seconds column.  The seconds column is '0', or a random
        second when `random_seconds` is true.
        '''
        crontab = _aliases.get(crontab, crontab)
        ct = crontab.split()

        if len(ct) == 5:
            ct.insert(0, _gv() if random_seconds else '0')
            ct.append('*')
        elif len(ct) == 6:
            ct.insert(0, _gv() if random_seconds else '0')
        _assert(len(ct) == 7,
            "improper number of cron entries specified; got %i need 5 to 7",
            len(ct))

        matchers = [_Matcher(which, entry, loop) for which, entry in enumerate(ct)]

        return Matcher(*matchers)

    def _test_match(self, index, dt):
        '''
        This tests the given field for whether it matches with the current
        datetime object passed.
        '''
        at = _attribute[index]
        attr = getattr(dt, at)
        if index == WEEK_OFFSET:
            # isoweekday() runs Monday=1 .. Sunday=7; cron wants Sunday=0
            attr = attr() % 7
        return self.matchers[index](attr, dt)

    def next(self, now=None, increments=_increments, delta=True, default_utc=WARN_CHANGE, return_datetime=False):
        '''
        How long to wait in seconds before this crontab entry can next be
        executed.

        inputs:
            `now` - reference moment: a datetime, a numeric timestamp, or
                    None for the current time
            `increments` - step/reset table; _increments searches forwards,
                    _decrements backwards
            `delta` - when true return seconds relative to `now`; when false
                    measure from datetime(1970, 1, 1) instead
            `default_utc` - whether a missing or numeric `now` is interpreted
                    in UTC (True) or local time (False); leaving the default
                    emits a FutureWarning and behaves like False
            `return_datetime` - return the matching datetime (carrying the
                    tzinfo of `now`) instead of a number of seconds

        Returns None when the search runs past the years the entry allows.
        '''
        if default_utc is WARN_CHANGE and (isinstance(now, _number_types) or (now and not now.tzinfo) or now is None):
            warnings.warn(WARNING_CHANGE_MESSAGE, FutureWarning, 2)
            default_utc = False

        # Only a missing `now` means "use the current time".  A timestamp of
        # 0 is falsy but is a legitimate reference moment.
        if now is None:
            use_utc = default_utc and default_utc is not WARN_CHANGE
            now = datetime.utcnow() if use_utc else datetime.now()
        if isinstance(now, _number_types):
            now = datetime.utcfromtimestamp(now) if default_utc else datetime.fromtimestamp(now)

        # handle timezones if the datetime object has a timezone and get a
        # reasonable future/past start time
        onow, now = now, now.replace(tzinfo=None)
        tz = onow.tzinfo
        future = now.replace(microsecond=0) + increments[0]()
        if future < now:
            # we are going backwards...
            _test = lambda: future.year < self.matchers.year
            if now.microsecond:
                # the truncated `now` is already in the past; start there
                future = now.replace(microsecond=0)
        else:
            # we are going forwards
            _test = lambda: self.matchers.year < future.year

        # Start from the year and work our way down. Any time we increment a
        # higher-magnitude value, we reset all lower-magnitude values. This
        # gets us performance without sacrificing correctness. Still more
        # complicated than a brute-force approach, but also orders of
        # magnitude faster in basically all cases.
        to_test = ENTRIES - 1
        while to_test >= 0:
            if not self._test_match(to_test, future):
                inc = increments[to_test](future, self.matchers)
                future += inc
                for i in xrange(0, to_test):
                    future = increments[ENTRIES+i](future, inc)
                # ran past every year the entry allows: no match exists
                if _test():
                    return None
                to_test = ENTRIES-1
                continue
            to_test -= 1

        # verify the match
        match = [self._test_match(i, future) for i in xrange(ENTRIES)]
        _assert(all(match),
                "\nYou have discovered a bug with crontab, please notify the\n" \
                "author with the following information:\n" \
                "crontab: %r\n" \
                "now: %r", ' '.join(m.input for m in self.matchers), now)

        if return_datetime:
            return future.replace(tzinfo=tz)

        if not delta:
            onow = now = datetime(1970, 1, 1)

        delay = future - now
        if tz:
            # account for the UTC offset changing between `now` and `future`
            delay += _fix_none(onow.utcoffset())
            if hasattr(tz, 'localize'):
                delay -= _fix_none(tz.localize(future).utcoffset())
            else:
                delay -= _fix_none(future.replace(tzinfo=tz).utcoffset())

        return delay.days * 86400 + delay.seconds + delay.microseconds / 1000000.

    def previous(self, now=None, delta=True, default_utc=WARN_CHANGE, return_datetime=False):
        '''
        Like next(), but searches backwards for the most recent match.  The
        arguments have the same meaning as in next().
        '''
        return self.next(now, _decrements, delta, default_utc, return_datetime)

    def test(self, entry):
        '''
        Return True when `entry` matches every column.  `entry` is a datetime
        or a numeric timestamp, which is interpreted as UTC.
        '''
        if isinstance(entry, _number_types):
            entry = datetime.utcfromtimestamp(entry)
        return all(self._test_match(index, entry) for index in xrange(ENTRIES))
def _fix_none(d, _=timedelta(0)):
if d is None:
return _
return d