diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 775a45c..d8cea2b 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -365,23 +365,45 @@ def enqueue_job(self, job): job.meta['repeat'] = int(repeat) - 1 queue = self.get_queue_for_job(job) - queue.enqueue_job(job) - self.connection.zrem(self.scheduled_jobs_key, job.id) - if interval: - # If this is a repeat job and counter has reached 0, don't repeat - if repeat is not None: - if job.meta['repeat'] == 0: - return - self.connection.zadd(self.scheduled_jobs_key, - {job.id: to_unix(datetime.utcnow()) + int(interval)}) - elif cron_string: - # If this is a repeat job and counter has reached 0, don't repeat - if repeat is not None: - if job.meta['repeat'] == 0: + with self.connection.pipeline() as pipe: + while True: + try: + pipe.watch(self.scheduled_jobs_key) + queue.enqueue_job(job) + pipe.multi() + pipe.zrem(self.scheduled_jobs_key, job.id) + + if not (interval or cron_string): + pipe.execute() + return + + # If this is a repeat job and counter has reached 0, don't repeat + if repeat is not None: + if job.meta['repeat'] == 0: + pipe.execute() + return + + next_scheduled_time = ( + to_unix(datetime.utcnow()) + int(interval) + if interval else + to_unix( + get_next_scheduled_time( + cron_string, + use_local_timezone=use_local_timezone + ) + ) + ) + + pipe.zadd(self.scheduled_jobs_key, {job.id: next_scheduled_time}) + pipe.execute() return - self.connection.zadd(self.scheduled_jobs_key, - {job.id: to_unix(get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone))}) + except WatchError: + self.log.info( + "{} changed between the time of watching " + "and the pipeline's execution.".format(self.scheduled_jobs_key) + ) + continue def enqueue_jobs(self): """