Skip to content

Commit

Permalink
Fix tests (#316)
Browse files Browse the repository at this point in the history
* Removed rq.compat

* Install setuptools before running CI

* Properly decode strings

* Try using redis-py version 4

* Add compatibility with RQ 2.0

* Fix tests

* Fix test_callbacks.py

* Use newer version of Redis action

* Use `Worker` instead of `SimpleWorker` in test

* Use multiple queue names

* More fixes
  • Loading branch information
selwin authored Oct 29, 2024
1 parent 656898e commit dfa1188
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 72 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"]
redis-version: [4, 5, 6, 7]
redis-py-version: [3.5.0]
redis-py-version: [4]

steps:
- uses: actions/checkout@v3
Expand All @@ -28,7 +28,7 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: Start Redis
uses: supercharge/redis-github-action@1.5.0
uses: supercharge/redis-github-action@1.8.0
with:
redis-version: ${{ matrix.redis-version }}

Expand Down
10 changes: 6 additions & 4 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from itertools import repeat

from rq.exceptions import NoSuchJobError
from rq.job import Job
from rq.job import Job, JobStatus
from rq.queue import Queue
from rq.utils import backend_class, import_attribute

Expand All @@ -30,8 +30,9 @@ class Scheduler(object):

def __init__(self, queue_name='default', queue=None, interval=60, connection=None,
job_class=None, queue_class=None, name=None):
from rq.connections import resolve_connection
self.connection = resolve_connection(connection)
if connection is None:
raise ValueError('`connection` argument is required')
self.connection = connection
self._queue = queue
if self._queue is None:
self.queue_name = queue_name
Expand Down Expand Up @@ -143,7 +144,8 @@ def _create_job(self, func, args=None, kwargs=None, commit=True,
func, args=args, connection=self.connection,
kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, id=id,
description=description, timeout=timeout, meta=meta,
depends_on=depends_on,on_success=on_success,on_failure=on_failure,
depends_on=depends_on, on_success=on_success, on_failure=on_failure,
status=JobStatus.SCHEDULED
)
if queue_name:
job.origin = queue_name
Expand Down
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@
rqscheduler = rq_scheduler.scripts.rqscheduler:main
''',
package_data={'': ['README.rst']},
install_requires=['crontab>=0.23.0', 'rq>=0.13', 'python-dateutil', 'freezegun'],
install_requires=['crontab>=0.23.0', 'rq>=2', 'python-dateutil', 'freezegun'],
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)
18 changes: 2 additions & 16 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import unittest
from redis import StrictRedis
from rq import push_connection, pop_connection


def find_empty_redis_database():
Expand All @@ -27,12 +26,7 @@ class RQTestCase(unittest.TestCase):

@classmethod
def setUpClass(cls):
# Set up connection to Redis
testconn = find_empty_redis_database()
push_connection(testconn)

# Store the connection (for sanity checking)
cls.testconn = testconn
cls.testconn = find_empty_redis_database()

def setUp(self):
# Flush beforewards (we like our hygiene)
Expand Down Expand Up @@ -64,12 +58,4 @@ def assertIsInstance(self, obj, cls, msg=None):

@classmethod
def tearDownClass(cls):

# Pop the connection to Redis
testconn = pop_connection()
assert testconn == cls.testconn, 'Wow, something really nasty ' \
'happened to the Redis connection stack. Check your setup.'

# for python < 2.7, which doesn't have setUpClass
if not hasattr(unittest.TestCase, 'setUpClass'):
RQTestCase.setUpClass()
pass
33 changes: 1 addition & 32 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from multiprocessing import Process

from redis import Redis
from rq import Connection, get_current_job, get_current_connection, Queue
from rq import get_current_job, Queue
from rq.decorators import job
from rq.worker import HerokuWorker, Worker

Expand Down Expand Up @@ -66,17 +66,6 @@ def some_calculation(x, y, z=1):
return x * y / z


def rpush(key, value, append_worker_name=False, sleep=0):
"""Push a value into a list in Redis. Useful for detecting the order in
which jobs were executed."""
if sleep:
time.sleep(sleep)
if append_worker_name:
value += ':' + get_current_job().worker_name
redis = get_current_connection()
redis.rpush(key, value)


def check_dependencies_are_met():
return get_current_job().dependencies_are_met()

Expand Down Expand Up @@ -105,11 +94,6 @@ def launch_process_within_worker_and_store_pid(path, timeout):
p.wait()


def access_self():
assert get_current_connection() is not None
assert get_current_job() is not None


def modify_self(meta):
j = get_current_job()
j.meta.update(meta)
Expand Down Expand Up @@ -155,12 +139,6 @@ def static_method():
return u"I'm a static method"


with Connection():
@job(queue='default')
def decorated_job(x, y):
return x + y


def black_hole(job, *exc_info):
# Don't fall through to default behaviour (moving to failed queue)
return False
Expand Down Expand Up @@ -240,15 +218,6 @@ def start_worker(queue_name, conn_kwargs, worker_name, burst):
w = Worker([queue_name], name=worker_name, connection=Redis(**conn_kwargs))
w.work(burst=burst)

def start_worker_process(queue_name, connection=None, worker_name=None, burst=False):
"""
Use multiprocessing to start a new worker in a separate process.
"""
connection = connection or get_current_connection()
conn_kwargs = connection.connection_pool.connection_kwargs
p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst))
p.start()
return p

def burst_two_workers(queue, timeout=2, tries=5, pause=0.1):
"""
Expand Down
17 changes: 8 additions & 9 deletions tests/test_callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from rq import Queue, Worker
from rq.job import Job, JobStatus, UNEVALUATED
from rq.worker import SimpleWorker
from rq.worker import SimpleWorker, Worker


class QueueCallbackTestCase(RQTestCase):
Expand Down Expand Up @@ -51,7 +51,7 @@ class WorkerCallbackTestCase(RQTestCase):
def test_success_callback(self):
"""Test success callback is executed only when job is successful"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
worker = SimpleWorker(['default', 'high'], connection=self.testconn)

job = queue.enqueue(say_hello, on_success=save_result)

Expand All @@ -71,7 +71,7 @@ def test_success_callback(self):
def test_erroneous_success_callback(self):
"""Test exception handling when executing success callback"""
queue = Queue(connection=self.testconn)
worker = Worker([queue])
worker = Worker(['default', 'high'], connection=self.testconn)

# If success_callback raises an error, job will is considered as failed
job = queue.enqueue(say_hello, on_success=erroneous_callback)
Expand All @@ -81,15 +81,14 @@ def test_erroneous_success_callback(self):
def test_failure_callback(self):
"""Test failure callback is executed only when job a fails"""
queue = Queue(connection=self.testconn)
worker = SimpleWorker([queue])
worker = Worker(['default', 'high'], connection=self.testconn)

job = queue.enqueue(div_by_zero, on_failure=save_exception)

# Callback is executed when job is successfully executed
worker.work(burst=True)
self.assertEqual(job.get_status(), JobStatus.FAILED)
job.refresh()
print(job.exc_info)
self.assertIn('div_by_zero',
self.testconn.get('failure_callback:%s' % job.id).decode())

Expand All @@ -103,7 +102,7 @@ class JobCallbackTestCase(RQTestCase):

def test_job_creation_with_success_callback(self):
"""Ensure callbacks are created and persisted properly"""
job = Job.create(say_hello)
job = Job.create(say_hello, connection=self.testconn)
self.assertIsNone(job._success_callback_name)
# _success_callback starts with UNEVALUATED
self.assertEqual(job._success_callback, UNEVALUATED)
Expand All @@ -112,7 +111,7 @@ def test_job_creation_with_success_callback(self):
self.assertEqual(job._success_callback, None)

# job.success_callback is assigned properly
job = Job.create(say_hello, on_success=print)
job = Job.create(say_hello, on_success=print, connection=self.testconn)
self.assertIsNotNone(job._success_callback_name)
self.assertEqual(job.success_callback, print)
job.save()
Expand All @@ -122,7 +121,7 @@ def test_job_creation_with_success_callback(self):

def test_job_creation_with_failure_callback(self):
"""Ensure failure callbacks are persisted properly"""
job = Job.create(say_hello)
job = Job.create(say_hello, connection=self.testconn)
self.assertIsNone(job._failure_callback_name)
# _failure_callback starts with UNEVALUATED
self.assertEqual(job._failure_callback, UNEVALUATED)
Expand All @@ -131,7 +130,7 @@ def test_job_creation_with_failure_callback(self):
self.assertEqual(job._failure_callback, None)

# job.failure_callback is assigned properly
job = Job.create(say_hello, on_failure=print)
job = Job.create(say_hello, on_failure=print, connection=self.testconn)
self.assertIsNotNone(job._failure_callback_name)
self.assertEqual(job.failure_callback, print)
job.save()
Expand Down
12 changes: 6 additions & 6 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ def test_enqueue_job(self):
self.assertTrue(job.enqueued_at is not None)
queue = scheduler.get_queue_for_job(job)
self.assertIn(job, queue.jobs)
queue = Queue.from_queue_key('rq:queue:{0}'.format(queue_name))
queue = Queue.from_queue_key('rq:queue:{0}'.format(queue_name), connection=self.testconn)
self.assertIn(job, queue.jobs)
self.assertIn(queue, Queue.all())
self.assertIn(queue, Queue.all(self.testconn))

def test_enqueue_job_with_scheduler_queue(self):
"""
Expand All @@ -398,7 +398,7 @@ def test_enqueue_job_with_scheduler_queue(self):
scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, queue.jobs)
self.assertIn(queue, Queue.all())
self.assertIn(queue, Queue.all(self.testconn))

def test_enqueue_job_with_job_queue_name(self):
"""
Expand All @@ -413,7 +413,7 @@ def test_enqueue_job_with_job_queue_name(self):
scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, job_queue.jobs)
self.assertIn(job_queue, Queue.all())
self.assertIn(job_queue, Queue.all(self.testconn))

def test_enqueue_at_with_job_queue_name(self):
"""
Expand All @@ -428,7 +428,7 @@ def test_enqueue_at_with_job_queue_name(self):
self.scheduler.enqueue_job(job)
self.assertTrue(job.enqueued_at is not None)
self.assertIn(job, job_queue.jobs)
self.assertIn(job_queue, Queue.all())
self.assertIn(job_queue, Queue.all(self.testconn))

def test_job_membership(self):
now = datetime.utcnow()
Expand Down Expand Up @@ -797,7 +797,7 @@ def test_scheduler_w_o_explicit_connection(self):
"""
Ensure instantiating Scheduler w/o explicit connection works.
"""
s = Scheduler()
s = Scheduler(connection=self.testconn)
self.assertEqual(s.connection, self.testconn)

def test_small_float_interval(self):
Expand Down

0 comments on commit dfa1188

Please sign in to comment.