diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index d1b83ea..f75bc5f 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -108,8 +108,21 @@ def remove_lock(self): self.connection.delete(key) self._lock_acquired = False self.log.debug('{}: Lock Removed'.format(self.key)) + + def _clear_job_on_exit(self): + """ + Clears the job on exit. + """ + try: + self.log.info('Clear RQ scheduler jobs...') + jobs = self.get_jobs() + for job in jobs: + self.cancel(job) + self.log.info('Clear RQ scheduler jobs finished') + except NoSuchJobError: + pass - def _install_signal_handlers(self): + def _install_signal_handlers(self, clear): """ Installs signal handlers for handling SIGINT and SIGTERM gracefully. @@ -121,6 +134,8 @@ def stop(signum, frame): and remove previously acquired lock and exit. """ self.log.info('Shutting down RQ scheduler...') + if clear: + self._clear_job_on_exit() self.register_death() self.remove_lock() raise SystemExit() @@ -449,14 +464,14 @@ def heartbeat(self): self.log.debug('{}: Sending a HeartBeat'.format(self.key)) self.connection.expire(self.key, int(self._interval) + 10) - def run(self, burst=False): + def run(self, burst=False, clear=False): """ Periodically check whether there's any job that should be put in the queue (score lower than current time). """ self.register_birth() - self._install_signal_handlers() + self._install_signal_handlers(clear) try: while True: diff --git a/rq_scheduler/scripts/rqscheduler.py b/rq_scheduler/scripts/rqscheduler.py index 1f00726..06380e4 100755 --- a/rq_scheduler/scripts/rqscheduler.py +++ b/rq_scheduler/scripts/rqscheduler.py @@ -29,6 +29,7 @@ def main(): parser.add_argument('--pid', help='A filename to use for the PID file.', metavar='FILE') parser.add_argument('-j', '--job-class', help='Custom RQ Job class') parser.add_argument('-q', '--queue-class', help='Custom RQ Queue class') + parser.add_argument('--clear', action='store_true', default=False, help='clear job before exit') args = parser.parse_args() @@ -58,7 +59,7 @@ def main(): interval=args.interval, job_class=args.job_class, queue_class=args.queue_class) - scheduler.run(burst=args.burst) + scheduler.run(burst=args.burst, clear=args.clear) if __name__ == '__main__': main()