Skip to content

Commit

Permalink
Add RRule support
Browse files Browse the repository at this point in the history
  • Loading branch information
PierrickBrun committed Nov 26, 2024
1 parent f7d5787 commit 328d56c
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 10 deletions.
49 changes: 40 additions & 9 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from redis import WatchError

from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until
from .utils import from_unix, to_unix, get_next_scheduled_time, get_next_rrule_scheduled_time, rationalize_until

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -298,6 +298,36 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None,
{job.id: to_unix(scheduled_time)})
return job


def rrule(self, rrule_string, func, args=None, kwargs=None, repeat=None,
queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False,
depends_on=None, on_success=None, on_failure=None, at_front: bool = False):
"""
Schedule a recurring job via RRule
"""
scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone)

job = self._create_job(func, args=args, kwargs=kwargs, commit=False,
result_ttl=result_ttl, ttl=ttl, id=id, queue_name=queue_name,
description=description, timeout=timeout, meta=meta, depends_on=depends_on,
on_success=on_success, on_failure=on_failure)

job.meta['rrule_string'] = rrule_string
job.meta['use_local_timezone'] = use_local_timezone

if repeat is not None:
job.meta['repeat'] = int(repeat)

if at_front:
job.enqueue_at_front = True

job.save()

self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(scheduled_time)})
return job


def cancel(self, job):
"""
Pulls a job from the scheduler queue. This function accepts either a
Expand Down Expand Up @@ -415,6 +445,7 @@ def enqueue_job(self, job):
interval = job.meta.get('interval', None)
repeat = job.meta.get('repeat', None)
cron_string = job.meta.get('cron_string', None)
rrule_string = job.meta.get('rrule_string', None)
use_local_timezone = job.meta.get('use_local_timezone', None)

# If job is a repeated job, decrement counter
Expand All @@ -425,21 +456,21 @@ def enqueue_job(self, job):
queue.enqueue_job(job, at_front=bool(job.enqueue_at_front))
self.connection.zrem(self.scheduled_jobs_key, job.id)

# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
if interval:
# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(datetime.utcnow()) + int(interval)})
elif cron_string:
# If this is a repeat job and counter has reached 0, don't repeat
if repeat is not None:
if job.meta['repeat'] == 0:
return
next_scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone)
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(next_scheduled_time)})
elif rrule_string:
next_scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone)
self.connection.zadd(self.scheduled_jobs_key,
{job.id: to_unix(next_scheduled_time)})

def enqueue_jobs(self):
"""
Expand Down
12 changes: 12 additions & 0 deletions rq_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import calendar
import crontab
import dateutil.tz
import dateutil.rrule
import dateutil.tz

from datetime import datetime, timedelta
import logging
Expand Down Expand Up @@ -30,6 +32,16 @@ def get_next_scheduled_time(cron_string, use_local_timezone=False):
return next_time.astimezone(tz)


def get_next_rrule_scheduled_time(rrule_string, use_local_timezone=False):
"""Calculate the next scheduled time by creating a rrule object
with a rrule string"""
now = datetime.now()
rrule = dateutil.rrule.rrulestr(rrule_string)
next_time = rrule.after(now)
tz = dateutil.tz.tzlocal() if use_local_timezone else dateutil.tz.UTC
return next_time.astimezone(tz)


def setup_loghandlers(level='INFO'):
logger = logging.getLogger('rq_scheduler.scheduler')
if not logger.handlers:
Expand Down
170 changes: 169 additions & 1 deletion tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from rq_scheduler import Scheduler
from rq_scheduler.utils import from_unix
from rq_scheduler.utils import get_next_scheduled_time
from rq_scheduler.utils import get_next_scheduled_time, get_next_rrule_scheduled_time
from rq_scheduler.utils import to_unix
from tests import RQTestCase

Expand Down Expand Up @@ -874,3 +874,171 @@ def test_create_job_with_queue_class_name(self):
job = self.scheduler._create_job(say_hello)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertFalse(job_from_queue.meta.get("queue_class_name"))

def test_rrule_persisted_correctly(self):
"""
Ensure that rrule attribute gets correctly saved in Redis.
"""
# create a job that runs one minute past each whole hour
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job_from_queue.meta['rrule_string'], "RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0")

# get the scheduled_time and convert it to a datetime object
unix_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id)
datetime_time = from_unix(unix_time)

# check that minute=1, seconds=0, and is within an hour
assert datetime_time.minute == 1
assert datetime_time.second == 0
assert datetime_time - datetime.utcnow() <= timedelta(hours=1), f"{datetime_time - datetime.utcnow()} is greater than 1 hour"

def test_rrule_persisted_correctly_with_local_timezone(self):
"""
Ensure that rrule attribute gets correctly saved in Redis when using local TZ.
"""
# create a job that runs each day at 15:00
job = self.scheduler.rrule("RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=0;BYSECOND=0", say_hello, use_local_timezone=True)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job_from_queue.meta['rrule_string'], "RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=0;BYSECOND=0")

# get the scheduled_time and convert it to a datetime object
unix_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id)
datetime_time = from_unix(unix_time)

expected_datetime_in_local_tz = datetime.now(tzlocal()).replace(hour=15,minute=0,second=0,microsecond=0)
assert datetime_time.time() == expected_datetime_in_local_tz.astimezone(UTC).time()

def test_rrule_rescheduled_correctly_with_local_timezone(self):
# Create a job that runs each day at 15:01
job = self.scheduler.rrule("RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=1;BYSECOND=0", say_hello, use_local_timezone=True)

# Change this job to run each day at 15:02
job.meta['rrule_string'] = "RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=2;BYSECOND=0"

# reenqueue the job
self.scheduler.enqueue_job(job)

# get the scheduled_time and convert it to a datetime object
unix_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id)
datetime_time = from_unix(unix_time)

expected_datetime_in_local_tz = datetime.now(tzlocal()).replace(hour=15,minute=2,second=0,microsecond=0)
assert datetime_time.time() == expected_datetime_in_local_tz.astimezone(UTC).time()

def test_rrule_schedules_correctly(self):
# Create a job with a rrulejob_string
now = datetime.now().replace(minute=0, hour=0, second=0, microsecond=0)
with freezegun.freeze_time(now):
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=5;BYSECOND=0", say_hello)

with mock.patch.object(self.scheduler, 'enqueue_job', wraps=self.scheduler.enqueue_job) as enqueue_job, \
freezegun.freeze_time(now + timedelta(minutes=5)):
self.assertEqual(1, self.scheduler.count())
self.scheduler.enqueue_jobs()
self.assertEqual(1, enqueue_job.call_count)

(job, next_scheduled_time), = self.scheduler.get_jobs(with_times=True)
expected_scheduled_time = (now + timedelta(hours=1, minutes=5)).astimezone(UTC)
self.assertEqual(to_unix(expected_scheduled_time), to_unix(next_scheduled_time), f"{next_scheduled_time} should be {expected_scheduled_time}")

def test_rrule_sets_timeout(self):
"""
Ensure that a job scheduled via rrule can be created with
a custom timeout.
"""
timeout = 13
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, timeout=timeout)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job_from_queue.timeout, timeout)

def test_rrule_sets_id(self):
"""
Ensure that a job scheduled via rrule can be created with
a custom id
"""
job_id = "hello-job-id"
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, id=job_id)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job_id, job_from_queue.id)

def test_rrule_sets_default_result_ttl(self):
"""
Ensure that a job scheduled via rrule gets proper default
result_ttl (-1) periodic tasks.
"""
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(-1, job_from_queue.result_ttl)

def test_rrule_sets_description(self):
"""
Ensure that a job scheduled via rrule can be created with
a custom description
"""
description = 'test description'
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, description=description)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(description, job_from_queue.description)

def test_rrule_sets_default_result_ttl_to_minus_1(self):
"""
Ensure that a job scheduled via rrule sets the default result_ttl to -1
"""
result_ttl = -1
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(result_ttl, job_from_queue.result_ttl)

def test_rrule_sets_provided_result_ttl(self):
"""
Ensure that a job scheduled via rrule can be created with
a custom result_ttl
"""
result_ttl = 123
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, result_ttl=result_ttl)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(result_ttl, job_from_queue.result_ttl)

def test_rrule_sets_default_ttl_to_none(self):
"""
Ensure that a job scheduled via rrule sets the default result_ttl to -1
"""
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertIsNone(job_from_queue.ttl)

def test_rrule_sets_provided_ttl(self):
"""
Ensure that a job scheduled via rrule can be created with
a custom result_ttl
"""
ttl = 123
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, ttl=ttl)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(ttl, job_from_queue.ttl)

def test_job_with_rrule_get_rescheduled(self):
# Create a job with a rrule_string
job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello)

# current unix_time
old_next_scheduled_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id)

# change rrule_string
job.meta['rrule_string'] = "RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=2;BYSECOND=0"

# enqueue the job
self.scheduler.enqueue_job(job)

self.assertIn(job.id,
tl(self.testconn.zrange(self.scheduler.scheduled_jobs_key, 0, 1)))

# check that next scheduled time has changed
self.assertNotEqual(old_next_scheduled_time,
self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id))

# check that new next scheduled time is set correctly
expected_next_scheduled_time = to_unix(get_next_rrule_scheduled_time("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=2;BYSECOND=0"))
self.assertEqual(self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id),
expected_next_scheduled_time)

0 comments on commit 328d56c

Please sign in to comment.