From f441e1e4b5a702a7a8b0f9fd2702d41b95a045af Mon Sep 17 00:00:00 2001 From: Hossam Hammady Date: Mon, 18 Dec 2023 10:46:58 -0500 Subject: [PATCH 1/2] Add tests for Job.from_row --- pyworker/job.py | 1 - test.py | 18 -------- tests/fixtures/handler_registered.yaml | 20 ++++++++ tests/fixtures/handler_unregistered.yaml | 20 ++++++++ tests/test_job.py | 59 ++++++++++++++++++++++++ tests/test_worker.py | 2 +- 6 files changed, 100 insertions(+), 20 deletions(-) delete mode 100755 test.py create mode 100644 tests/fixtures/handler_registered.yaml create mode 100644 tests/fixtures/handler_unregistered.yaml create mode 100644 tests/test_job.py diff --git a/pyworker/job.py b/pyworker/job.py index 07b4491..289144a 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -40,7 +40,6 @@ def __str__(self): def from_row(cls, job_row, max_attempts, database, logger): '''job_row is a tuple of (id, attempts, run_at, queue, handler)''' def extract_class_name(line): - # TODO cache regex regex = re.compile('object: !ruby/object:(.+)') match = regex.match(line) if match: diff --git a/test.py b/test.py deleted file mode 100755 index ddf9691..0000000 --- a/test.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python - -from os import environ as env -import logging -from pyworker.worker import Worker - -dbstring = env.get('DATABASE_URL') -if not dbstring: - raise EnvironmentError('DATABASE_URL missing from environment') - -logging.basicConfig() -logger = logging.getLogger('pyworker') -logger.setLevel(logging.DEBUG) - -w = Worker(dbstring, logger) # logger is optional -w.sleep_delay = 3 -w.max_run_time = 10 -w.run() diff --git a/tests/fixtures/handler_registered.yaml b/tests/fixtures/handler_registered.yaml new file mode 100644 index 0000000..6ae53f6 --- /dev/null +++ b/tests/fixtures/handler_registered.yaml @@ -0,0 +1,20 @@ +--- !ruby/object:Delayed::PerformableMethod +object: !ruby/object:RegisteredJob + raw_attributes: + id: 100 + title: review title + description: | + review description + multiline + total_articles: 1000 + is_blind: true + attributes: !ruby/object:ActiveRecord::AttributeSet + attributes: !ruby/object:ActiveRecord::LazyAttributeHash + types: {} + values: {} + additional_types: {} + materialized: true + new_record: false + active_record_yaml_version: 1 +method_name: :run +args: [] diff --git a/tests/fixtures/handler_unregistered.yaml b/tests/fixtures/handler_unregistered.yaml new file mode 100644 index 0000000..66f9c62 --- /dev/null +++ b/tests/fixtures/handler_unregistered.yaml @@ -0,0 +1,20 @@ +--- !ruby/object:Delayed::PerformableMethod +object: !ruby/object:UnregisteredJob + raw_attributes: + id: 100 + title: review title + description: | + review description + multiline + total_articles: 1000 + is_blind: true + attributes: !ruby/object:ActiveRecord::AttributeSet + attributes: !ruby/object:ActiveRecord::LazyAttributeHash + types: {} + values: {} + additional_types: {} + materialized: true + new_record: false + active_record_yaml_version: 1 +method_name: :run +args: [] diff --git a/tests/test_job.py b/tests/test_job.py new file mode 100644 index 0000000..3708825 --- /dev/null +++ b/tests/test_job.py @@ -0,0 +1,59 @@ +import datetime +from unittest import TestCase +from unittest.mock import patch, MagicMock +from pyworker.job import Job, get_current_time, get_time_delta + + +class RegisteredJob(Job): # matching the registered class fixture + def run(self): + pass + + +class TestJob(TestCase): + def setUp(self): + self.mock_run_at = datetime.datetime(2023, 10, 7, 0, 0, 1) + + def tearDown(self): + pass + + def load_fixture(self, filename): + with open('tests/fixtures/%s' % filename) as f: + return f.read() + + #********** .from_row tests **********# + + def test_from_row_when_unregistered_class_returns_abstract_job_instance(self): + mock_handler = self.load_fixture('handler_unregistered.yaml') + mock_row = (1, 0, self.mock_run_at, 'default', mock_handler) + job = Job.from_row(mock_row, 1, MagicMock(), MagicMock()) + + self.assertEqual(job.class_name, 'UnregisteredJob') + self.assertEqual(job.abstract, True) + + def test_from_row_when_registered_class_returns_concrete_job_instance(self): + mock_handler = self.load_fixture('handler_registered.yaml') + mock_row = (1, 0, self.mock_run_at, 'default', mock_handler) + job = Job.from_row(mock_row, 1, MagicMock(), MagicMock()) + + self.assertEqual(job.class_name, 'RegisteredJob') + self.assertEqual(job.abstract, False) + + def test_from_row_when_registered_class_returns_concrete_job_instance_with_attributes(self): + mock_handler = self.load_fixture('handler_registered.yaml') + mock_row = (1, 0, self.mock_run_at, 'default', mock_handler) + job = Job.from_row(mock_row, 1, MagicMock(), MagicMock()) + + self.assertEqual(job.job_id, 1) + self.assertEqual(job.attempts, 0) + self.assertEqual(job.run_at, self.mock_run_at) + self.assertEqual(job.queue, 'default') + self.assertEqual(job.max_attempts, 1) + self.assertDictEqual(job.attributes, { + 'id': 100, + 'title': 'review title', + 'description': 'review description\nmultiline\n', + 'total_articles': 1000, + 'is_blind': True + }) + + #********** .set_error_unlock tests **********# diff --git a/tests/test_worker.py b/tests/test_worker.py index eb11faf..b3e0b04 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,6 +1,6 @@ import datetime from unittest import TestCase -from unittest.mock import patch, MagicMock, Mock +from unittest.mock import patch, MagicMock from pyworker.worker import Worker, TerminatedException, get_current_time class TestWorker(TestCase): From cf6e7f6c7f676c6f7455d5fbd0df743215ff1a64 Mon Sep 17 00:00:00 2001 From: Hossam Hammady Date: Mon, 18 Dec 2023 17:59:36 -0500 Subject: [PATCH 2/2] Add tests for Job.set_error_unlock --- pyworker/job.py | 25 +++-- tests/test_job.py | 239 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 242 insertions(+), 22 deletions(-) diff --git a/pyworker/job.py b/pyworker/job.py index 289144a..879b311 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -110,12 +110,15 @@ def set_error_unlock(self, error): self.attempts += 1 now = get_current_time() setters = [ - 'locked_at = null', - 'locked_by = null', - 'attempts = %d' % self.attempts, + 'locked_at = %s', + 'locked_by = %s', + 'attempts = %s', 'last_error = %s' ] values = [ + None, + None, + self.attempts, error ] if self.attempts >= self.max_attempts: @@ -129,12 +132,8 @@ def set_error_unlock(self, error): setters.append('run_at = %s') delta = (self.attempts**4) + 5 values.append(str(now + get_time_delta(seconds=delta))) - query = 'UPDATE delayed_jobs SET %s WHERE id = %d' % \ - (', '.join(setters), self.job_id) - self.logger.debug('set error query: %s' % query) - self.logger.debug('set error values: %s' % str(values)) - self.database.cursor().execute(query, tuple(values)) - self.database.commit() + + self._update_job(setters, values) return failed def remove(self): @@ -142,3 +141,11 @@ def remove(self): query = 'DELETE FROM delayed_jobs WHERE id = %d' % self.job_id self.database.cursor().execute(query) self.database.commit() + + def _update_job(self, setters, values): + query = 'UPDATE delayed_jobs SET %s WHERE id = %d' % \ + (', '.join(setters), self.job_id) + self.logger.debug('update query: %s' % query) + self.logger.debug('update values: %s' % str(values)) + self.database.cursor().execute(query, tuple(values)) + self.database.commit() diff --git a/tests/test_job.py b/tests/test_job.py index 3708825..cbd1fb9 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -11,7 +11,12 @@ def run(self): class TestJob(TestCase): def setUp(self): + self.mock_job_id = 1 + self.mock_attempts = 0 self.mock_run_at = datetime.datetime(2023, 10, 7, 0, 0, 1) + self.mock_queue = 'default' + self.mock_max_attempts = 5 + self.mock_now = datetime.datetime(2023, 10, 7, 0, 0, 0) def tearDown(self): pass @@ -20,34 +25,57 @@ def load_fixture(self, filename): with open('tests/fixtures/%s' % filename) as f: return f.read() + def load_job(self, filename): + mock_handler = self.load_fixture(filename) + mock_row = ( + self.mock_job_id, + self.mock_attempts, + self.mock_run_at, + self.mock_queue, + mock_handler + ) + return Job.from_row(mock_row, + self.mock_max_attempts, + MagicMock(), MagicMock()) + + def load_unregistered_job(self): + return self.load_job('handler_unregistered.yaml') + + def load_registered_job(self): + job = self.load_job('handler_registered.yaml') + job.error = MagicMock() + job.failure = MagicMock() + job._update_job = MagicMock() + return job + + def load_registered_job_with_attempts_exceeded(self): + job = self.load_registered_job() + job.attempts = self.mock_max_attempts - 1 + return job + #********** .from_row tests **********# def test_from_row_when_unregistered_class_returns_abstract_job_instance(self): - mock_handler = self.load_fixture('handler_unregistered.yaml') - mock_row = (1, 0, self.mock_run_at, 'default', mock_handler) - job = Job.from_row(mock_row, 1, MagicMock(), MagicMock()) + job = self.load_unregistered_job() self.assertEqual(job.class_name, 'UnregisteredJob') self.assertEqual(job.abstract, True) def test_from_row_when_registered_class_returns_concrete_job_instance(self): - mock_handler = self.load_fixture('handler_registered.yaml') - mock_row = (1, 0, self.mock_run_at, 'default', mock_handler) - job = Job.from_row(mock_row, 1, MagicMock(), MagicMock()) + job = self.load_registered_job() self.assertEqual(job.class_name, 'RegisteredJob') self.assertEqual(job.abstract, False) def test_from_row_when_registered_class_returns_concrete_job_instance_with_attributes(self): - mock_handler = self.load_fixture('handler_registered.yaml') - mock_row = (1, 0, self.mock_run_at, 'default', mock_handler) - job = Job.from_row(mock_row, 1, MagicMock(), MagicMock()) + job = self.load_registered_job() - self.assertEqual(job.job_id, 1) - self.assertEqual(job.attempts, 0) + self.assertEqual(job.job_id, self.mock_job_id) + self.assertEqual(job.attempts, self.mock_attempts) self.assertEqual(job.run_at, self.mock_run_at) - self.assertEqual(job.queue, 'default') - self.assertEqual(job.max_attempts, 1) + self.assertEqual(job.queue, self.mock_queue) + self.assertEqual(job.max_attempts, self.mock_max_attempts) + # below attributes match the registered class fixture self.assertDictEqual(job.attributes, { 'id': 100, 'title': 'review title', @@ -57,3 +85,188 @@ def test_from_row_when_registered_class_returns_concrete_job_instance_with_attri }) #********** .set_error_unlock tests **********# + + def assert_job_updated_field(self, job, field, value): + job._update_job.assert_called_once() + setters = job._update_job.call_args[0][0] + values = job._update_job.call_args[0][1] + index = setters.index("%s = %%s" % field) + assert index >= 0 + assert values[index] == value + + def assert_job_non_updated_field(self, job, field): + job._update_job.assert_called_once() + setters = job._update_job.call_args[0][0] + assert "%s = %%s" % field not in setters + + def assert_job_updated_run_at(self, job, attempts, expected_value): + job.attempts = attempts + expected_value = expected_value.strftime('%Y-%m-%d %H:%M:%S') + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'run_at', expected_value) + + # max attempts not exceeded + + ## error and failure hooks + def test_set_error_unlock_if_max_attempts_not_exceeded_calls_error_hook_only(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + job.error.assert_called_once_with('some error') + job.failure.assert_not_called() + + ## attempts + def test_set_error_unlock_if_max_attempts_not_exceeded_increments_attempts(self): + job = self.load_registered_job() + expected_value = job.attempts + 1 + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'attempts', expected_value) + + ## run_at + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_0( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=0, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 6)) + + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_1( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=1, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 21)) + + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_2( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=2, expected_value=datetime.datetime(2023, 10, 7, 0, 1, 26)) + + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_3( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=3, expected_value=datetime.datetime(2023, 10, 7, 0, 4, 21)) + + ## failed_at + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_does_not_update_failed_at( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_non_updated_field(job, 'failed_at') + + ## locked_at + def test_set_error_unlock_if_max_attempts_not_exceeded_nullifies_job_locked_at(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_at', None) + + ## locked_by + def test_set_error_unlock_if_max_attempts_not_exceeded_nullifies_job_locked_by(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_by', None) + + ## last_error + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_last_error(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'last_error', 'some error') + + ## returns + def test_set_error_unlock_if_max_attempts_not_exceeded_returns_false(self): + job = self.load_registered_job() + + self.assertFalse(job.set_error_unlock('some error')) + + ## max attempts exceeded + + ## error and failure hooks + def test_set_error_unlock_if_max_attempts_exceeded_calls_error_and_failure_hooks(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + job.error.assert_called_once_with('some error') + job.failure.assert_called_once_with('some error') + + ## attempts + def test_set_error_unlock_if_max_attempts_exceeded_increments_attempts(self): + job = self.load_registered_job_with_attempts_exceeded() + expected_value = job.attempts + 1 + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'attempts', expected_value) + + ## run_at + def test_set_error_unlock_if_max_attempts_exceeded_does_not_update_run_at(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_non_updated_field(job, 'run_at') + + ## failed_at + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_exceeded_updates_failed_at( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'failed_at', mock_get_current_time.return_value) + + ## locked_at + def test_set_error_unlock_if_max_attempts_exceeded_nullifies_job_locked_at(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_at', None) + + ## locked_by + def test_set_error_unlock_if_max_attempts_exceeded_nullifies_job_locked_by(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_by', None) + + ## last_error + def test_set_error_unlock_if_max_attempts_exceeded_updates_last_error(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'last_error', 'some error') + + ## returns + def test_set_error_unlock_if_max_attempts_exceeded_returns_true(self): + job = self.load_registered_job_with_attempts_exceeded() + + self.assertTrue(job.set_error_unlock('some error')) +