diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 0fa26d0..3971d06 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -17,7 +17,7 @@ jobs: matrix: python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] redis-version: [4, 5, 6, 7] - redis-py-version: [3.5.0] + redis-py-version: [4] steps: - uses: actions/checkout@v3 @@ -28,7 +28,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Start Redis - uses: supercharge/redis-github-action@1.5.0 + uses: supercharge/redis-github-action@1.8.0 with: redis-version: ${{ matrix.redis-version }} diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 8566303..c6b3861 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -9,7 +9,7 @@ from itertools import repeat from rq.exceptions import NoSuchJobError -from rq.job import Job +from rq.job import Job, JobStatus from rq.queue import Queue from rq.utils import backend_class, import_attribute @@ -30,8 +30,9 @@ class Scheduler(object): def __init__(self, queue_name='default', queue=None, interval=60, connection=None, job_class=None, queue_class=None, name=None): - from rq.connections import resolve_connection - self.connection = resolve_connection(connection) + if connection is None: + raise ValueError('`connection` argument is required') + self.connection = connection self._queue = queue if self._queue is None: self.queue_name = queue_name @@ -143,7 +144,8 @@ def _create_job(self, func, args=None, kwargs=None, commit=True, func, args=args, connection=self.connection, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id, description=description, timeout=timeout, meta=meta, - depends_on=depends_on,on_success=on_success,on_failure=on_failure, + depends_on=depends_on, on_success=on_success, on_failure=on_failure, + status=JobStatus.SCHEDULED ) if queue_name: job.origin = queue_name diff --git a/setup.py b/setup.py index e6d200e..34352c4 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ rqscheduler = rq_scheduler.scripts.rqscheduler:main ''', package_data={'': ['README.rst']}, - install_requires=['crontab>=0.23.0', 'rq>=0.13', 'python-dateutil', 'freezegun'], + install_requires=['crontab>=0.23.0', 'rq>=2', 'python-dateutil', 'freezegun'], classifiers=[ 'Development Status :: 4 - Beta', 'Environment :: Console', @@ -27,12 +27,11 @@ 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Topic :: Software Development :: Libraries :: Python Modules', ], ) diff --git a/tests/__init__.py b/tests/__init__.py index 8874f30..f130bd0 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,6 +1,5 @@ import unittest from redis import StrictRedis -from rq import push_connection, pop_connection def find_empty_redis_database(): @@ -27,12 +26,7 @@ class RQTestCase(unittest.TestCase): @classmethod def setUpClass(cls): - # Set up connection to Redis - testconn = find_empty_redis_database() - push_connection(testconn) - - # Store the connection (for sanity checking) - cls.testconn = testconn + cls.testconn = find_empty_redis_database() def setUp(self): # Flush beforewards (we like our hygiene) @@ -64,12 +58,4 @@ def assertIsInstance(self, obj, cls, msg=None): @classmethod def tearDownClass(cls): - - # Pop the connection to Redis - testconn = pop_connection() - assert testconn == cls.testconn, 'Wow, something really nasty ' \ - 'happened to the Redis connection stack. Check your setup.' - -# for python < 2.7, which doesn't have setUpClass -if not hasattr(unittest.TestCase, 'setUpClass'): - RQTestCase.setUpClass() + pass diff --git a/tests/fixtures.py b/tests/fixtures.py index 870b7c2..3d3d36a 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -15,7 +15,7 @@ from multiprocessing import Process from redis import Redis -from rq import Connection, get_current_job, get_current_connection, Queue +from rq import get_current_job, Queue from rq.decorators import job from rq.worker import HerokuWorker, Worker @@ -66,17 +66,6 @@ def some_calculation(x, y, z=1): return x * y / z -def rpush(key, value, append_worker_name=False, sleep=0): - """Push a value into a list in Redis. Useful for detecting the order in - which jobs were executed.""" - if sleep: - time.sleep(sleep) - if append_worker_name: - value += ':' + get_current_job().worker_name - redis = get_current_connection() - redis.rpush(key, value) - - def check_dependencies_are_met(): return get_current_job().dependencies_are_met() @@ -105,11 +94,6 @@ def launch_process_within_worker_and_store_pid(path, timeout): p.wait() -def access_self(): - assert get_current_connection() is not None - assert get_current_job() is not None - - def modify_self(meta): j = get_current_job() j.meta.update(meta) @@ -155,12 +139,6 @@ def static_method(): return u"I'm a static method" -with Connection(): - @job(queue='default') - def decorated_job(x, y): - return x + y - - def black_hole(job, *exc_info): # Don't fall through to default behaviour (moving to failed queue) return False @@ -240,15 +218,6 @@ def start_worker(queue_name, conn_kwargs, worker_name, burst): w = Worker([queue_name], name=worker_name, connection=Redis(**conn_kwargs)) w.work(burst=burst) -def start_worker_process(queue_name, connection=None, worker_name=None, burst=False): - """ - Use multiprocessing to start a new worker in a separate process. - """ - connection = connection or get_current_connection() - conn_kwargs = connection.connection_pool.connection_kwargs - p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst)) - p.start() - return p def burst_two_workers(queue, timeout=2, tries=5, pause=0.1): """ diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 535a625..25a18fa 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -5,7 +5,7 @@ from rq import Queue, Worker from rq.job import Job, JobStatus, UNEVALUATED -from rq.worker import SimpleWorker +from rq.worker import SimpleWorker, Worker class QueueCallbackTestCase(RQTestCase): @@ -51,7 +51,7 @@ class WorkerCallbackTestCase(RQTestCase): def test_success_callback(self): """Test success callback is executed only when job is successful""" queue = Queue(connection=self.testconn) - worker = SimpleWorker([queue]) + worker = SimpleWorker(['default', 'high'], connection=self.testconn) job = queue.enqueue(say_hello, on_success=save_result) @@ -71,7 +71,7 @@ def test_success_callback(self): def test_erroneous_success_callback(self): """Test exception handling when executing success callback""" queue = Queue(connection=self.testconn) - worker = Worker([queue]) + worker = Worker(['default', 'high'], connection=self.testconn) # If success_callback raises an error, job will is considered as failed job = queue.enqueue(say_hello, on_success=erroneous_callback) @@ -81,7 +81,7 @@ def test_erroneous_success_callback(self): def test_failure_callback(self): """Test failure callback is executed only when job a fails""" queue = Queue(connection=self.testconn) - worker = SimpleWorker([queue]) + worker = Worker(['default', 'high'], connection=self.testconn) job = queue.enqueue(div_by_zero, on_failure=save_exception) @@ -89,7 +89,6 @@ def test_failure_callback(self): worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FAILED) job.refresh() - print(job.exc_info) self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) @@ -103,7 +102,7 @@ class JobCallbackTestCase(RQTestCase): def test_job_creation_with_success_callback(self): """Ensure callbacks are created and persisted properly""" - job = Job.create(say_hello) + job = Job.create(say_hello, connection=self.testconn) self.assertIsNone(job._success_callback_name) # _success_callback starts with UNEVALUATED self.assertEqual(job._success_callback, UNEVALUATED) @@ -112,7 +111,7 @@ def test_job_creation_with_success_callback(self): self.assertEqual(job._success_callback, None) # job.success_callback is assigned properly - job = Job.create(say_hello, on_success=print) + job = Job.create(say_hello, on_success=print, connection=self.testconn) self.assertIsNotNone(job._success_callback_name) self.assertEqual(job.success_callback, print) job.save() @@ -122,7 +121,7 @@ def test_job_creation_with_success_callback(self): def test_job_creation_with_failure_callback(self): """Ensure failure callbacks are persisted properly""" - job = Job.create(say_hello) + job = Job.create(say_hello, connection=self.testconn) self.assertIsNone(job._failure_callback_name) # _failure_callback starts with UNEVALUATED self.assertEqual(job._failure_callback, UNEVALUATED) @@ -131,7 +130,7 @@ def test_job_creation_with_failure_callback(self): self.assertEqual(job._failure_callback, None) # job.failure_callback is assigned properly - job = Job.create(say_hello, on_failure=print) + job = Job.create(say_hello, on_failure=print, connection=self.testconn) self.assertIsNotNone(job._failure_callback_name) self.assertEqual(job.failure_callback, print) job.save() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 7fde8af..e0ee785 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -381,9 +381,9 @@ def test_enqueue_job(self): self.assertTrue(job.enqueued_at is not None) queue = scheduler.get_queue_for_job(job) self.assertIn(job, queue.jobs) - queue = Queue.from_queue_key('rq:queue:{0}'.format(queue_name)) + queue = Queue.from_queue_key('rq:queue:{0}'.format(queue_name), connection=self.testconn) self.assertIn(job, queue.jobs) - self.assertIn(queue, Queue.all()) + self.assertIn(queue, Queue.all(self.testconn)) def test_enqueue_job_with_scheduler_queue(self): """ @@ -398,7 +398,7 @@ def test_enqueue_job_with_scheduler_queue(self): scheduler.enqueue_job(job) self.assertTrue(job.enqueued_at is not None) self.assertIn(job, queue.jobs) - self.assertIn(queue, Queue.all()) + self.assertIn(queue, Queue.all(self.testconn)) def test_enqueue_job_with_job_queue_name(self): """ @@ -413,7 +413,7 @@ def test_enqueue_job_with_job_queue_name(self): scheduler.enqueue_job(job) self.assertTrue(job.enqueued_at is not None) self.assertIn(job, job_queue.jobs) - self.assertIn(job_queue, Queue.all()) + self.assertIn(job_queue, Queue.all(self.testconn)) def test_enqueue_at_with_job_queue_name(self): """ @@ -428,7 +428,7 @@ def test_enqueue_at_with_job_queue_name(self): self.scheduler.enqueue_job(job) self.assertTrue(job.enqueued_at is not None) self.assertIn(job, job_queue.jobs) - self.assertIn(job_queue, Queue.all()) + self.assertIn(job_queue, Queue.all(self.testconn)) def test_job_membership(self): now = datetime.utcnow() @@ -797,7 +797,7 @@ def test_scheduler_w_o_explicit_connection(self): """ Ensure instantiating Scheduler w/o explicit connection works. """ - s = Scheduler() + s = Scheduler(connection=self.testconn) self.assertEqual(s.connection, self.testconn) def test_small_float_interval(self):