Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added timeout support to enqueue_at and enqueue_in #75

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def stop(signum, frame):
signal.signal(signal.SIGTERM, stop)

def _create_job(self, func, args=None, kwargs=None, commit=True,
result_ttl=None, queue_name=None):
result_ttl=None, timeout=None, queue_name=None):
"""
Creates an RQ job and saves it to Redis.
"""
Expand All @@ -77,12 +77,23 @@ def _create_job(self, func, args=None, kwargs=None, commit=True,
if kwargs is None:
kwargs = {}
job = Job.create(func, args=args, connection=self.connection,
kwargs=kwargs, result_ttl=result_ttl)
kwargs=kwargs, result_ttl=result_ttl, timeout=timeout)
job.origin = queue_name or self.queue_name
if commit:
job.save()
return job

def enqueue_call_in(self, time_delta, func, args=None, kwargs=None, timeout=None):
job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout)
self.connection._zadd(self.scheduled_jobs_key, to_unix(datetime.utcnow() + time_delta), job.id)
return job

def enqueue_call_at(self, scheduled_time, func, args=None, kwargs=None, timeout=None):
job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout)
self.connection._zadd(self.scheduled_jobs_key, to_unix(scheduled_time), job.id)
return job


def enqueue_at(self, scheduled_time, func, *args, **kwargs):
"""
Pushes a job to the scheduler queue. The scheduled queue is a Redis sorted
Expand All @@ -100,23 +111,22 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs):
scheduler = Scheduler(queue_name='default', connection=redis)
scheduler.enqueue_at(datetime(2020, 1, 1), func, 'argument', keyword='argument')
"""
job = self._create_job(func, args=args, kwargs=kwargs)
self.connection._zadd(self.scheduled_jobs_key,
to_unix(scheduled_time),
job.id)
return job
return self.enqueue_call_at(scheduled_time,
func=func,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why the job creation logic is moved to enqueue_call_in?

args=args,
kwargs=kwargs)

def enqueue_in(self, time_delta, func, *args, **kwargs):
"""
Similar to ``enqueue_at``, but accepts a timedelta instead of datetime object.
The job's scheduled execution time will be calculated by adding the timedelta
to datetime.utcnow().
"""
job = self._create_job(func, args=args, kwargs=kwargs)
self.connection._zadd(self.scheduled_jobs_key,
to_unix(datetime.utcnow() + time_delta),
job.id)
return job
return self.enqueue_call_in(time_delta,
func=func,
args=args,
kwargs=kwargs)


def enqueue_periodic(self, scheduled_time, interval, repeat, func,
*args, **kwargs):
Expand Down