diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index be6abde..76f215b 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -68,7 +68,7 @@ def stop(signum, frame): signal.signal(signal.SIGTERM, stop) def _create_job(self, func, args=None, kwargs=None, commit=True, - result_ttl=None, queue_name=None): + result_ttl=None, timeout=None, queue_name=None): """ Creates an RQ job and saves it to Redis. """ @@ -77,12 +77,23 @@ def _create_job(self, func, args=None, kwargs=None, commit=True, if kwargs is None: kwargs = {} job = Job.create(func, args=args, connection=self.connection, - kwargs=kwargs, result_ttl=result_ttl) + kwargs=kwargs, result_ttl=result_ttl, timeout=timeout) job.origin = queue_name or self.queue_name if commit: job.save() return job + def enqueue_call_in(self, time_delta, func, args=None, kwargs=None, timeout=None): + job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout) + self.connection._zadd(self.scheduled_jobs_key, to_unix(datetime.utcnow() + time_delta), job.id) + return job + + def enqueue_call_at(self, scheduled_time, func, args=None, kwargs=None, timeout=None): + job = self._create_job(func, args=args, kwargs=kwargs, timeout=timeout) + self.connection._zadd(self.scheduled_jobs_key, to_unix(scheduled_time), job.id) + return job + + def enqueue_at(self, scheduled_time, func, *args, **kwargs): """ Pushes a job to the scheduler queue. The scheduled queue is a Redis sorted @@ -100,11 +111,10 @@ def enqueue_at(self, scheduled_time, func, *args, **kwargs): scheduler = Scheduler(queue_name='default', connection=redis) scheduler.enqueue_at(datetime(2020, 1, 1), func, 'argument', keyword='argument') """ - job = self._create_job(func, args=args, kwargs=kwargs) - self.connection._zadd(self.scheduled_jobs_key, - to_unix(scheduled_time), - job.id) - return job + return self.enqueue_call_at(scheduled_time, + func=func, + args=args, + kwargs=kwargs) def enqueue_in(self, time_delta, func, *args, **kwargs): """ @@ -112,11 +122,11 @@ def enqueue_in(self, time_delta, func, *args, **kwargs): The job's scheduled execution time will be calculated by adding the timedelta to datetime.utcnow(). """ - job = self._create_job(func, args=args, kwargs=kwargs) - self.connection._zadd(self.scheduled_jobs_key, - to_unix(datetime.utcnow() + time_delta), - job.id) - return job + return self.enqueue_call_in(time_delta, + func=func, + args=args, + kwargs=kwargs) + def enqueue_periodic(self, scheduled_time, interval, repeat, func, *args, **kwargs):