diff --git a/pyworker/job.py b/pyworker/job.py index 07b4491..879b311 100644 --- a/pyworker/job.py +++ b/pyworker/job.py @@ -40,7 +40,6 @@ def __str__(self): def from_row(cls, job_row, max_attempts, database, logger): '''job_row is a tuple of (id, attempts, run_at, queue, handler)''' def extract_class_name(line): - # TODO cache regex regex = re.compile('object: !ruby/object:(.+)') match = regex.match(line) if match: @@ -111,12 +110,15 @@ def set_error_unlock(self, error): self.attempts += 1 now = get_current_time() setters = [ - 'locked_at = null', - 'locked_by = null', - 'attempts = %d' % self.attempts, + 'locked_at = %s', + 'locked_by = %s', + 'attempts = %s', 'last_error = %s' ] values = [ + None, + None, + self.attempts, error ] if self.attempts >= self.max_attempts: @@ -130,12 +132,8 @@ def set_error_unlock(self, error): setters.append('run_at = %s') delta = (self.attempts**4) + 5 values.append(str(now + get_time_delta(seconds=delta))) - query = 'UPDATE delayed_jobs SET %s WHERE id = %d' % \ - (', '.join(setters), self.job_id) - self.logger.debug('set error query: %s' % query) - self.logger.debug('set error values: %s' % str(values)) - self.database.cursor().execute(query, tuple(values)) - self.database.commit() + + self._update_job(setters, values) return failed def remove(self): @@ -143,3 +141,11 @@ def remove(self): query = 'DELETE FROM delayed_jobs WHERE id = %d' % self.job_id self.database.cursor().execute(query) self.database.commit() + + def _update_job(self, setters, values): + query = 'UPDATE delayed_jobs SET %s WHERE id = %d' % \ + (', '.join(setters), self.job_id) + self.logger.debug('update query: %s' % query) + self.logger.debug('update values: %s' % str(values)) + self.database.cursor().execute(query, tuple(values)) + self.database.commit() diff --git a/test.py b/test.py deleted file mode 100755 index ddf9691..0000000 --- a/test.py +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env python - -from os import environ as env -import logging -from pyworker.worker import Worker - -dbstring = env.get('DATABASE_URL') -if not dbstring: - raise EnvironmentError('DATABASE_URL missing from environment') - -logging.basicConfig() -logger = logging.getLogger('pyworker') -logger.setLevel(logging.DEBUG) - -w = Worker(dbstring, logger) # logger is optional -w.sleep_delay = 3 -w.max_run_time = 10 -w.run() diff --git a/tests/fixtures/handler_registered.yaml b/tests/fixtures/handler_registered.yaml new file mode 100644 index 0000000..6ae53f6 --- /dev/null +++ b/tests/fixtures/handler_registered.yaml @@ -0,0 +1,20 @@ +--- !ruby/object:Delayed::PerformableMethod +object: !ruby/object:RegisteredJob + raw_attributes: + id: 100 + title: review title + description: | + review description + multiline + total_articles: 1000 + is_blind: true + attributes: !ruby/object:ActiveRecord::AttributeSet + attributes: !ruby/object:ActiveRecord::LazyAttributeHash + types: {} + values: {} + additional_types: {} + materialized: true + new_record: false + active_record_yaml_version: 1 +method_name: :run +args: [] diff --git a/tests/fixtures/handler_unregistered.yaml b/tests/fixtures/handler_unregistered.yaml new file mode 100644 index 0000000..66f9c62 --- /dev/null +++ b/tests/fixtures/handler_unregistered.yaml @@ -0,0 +1,20 @@ +--- !ruby/object:Delayed::PerformableMethod +object: !ruby/object:UnregisteredJob + raw_attributes: + id: 100 + title: review title + description: | + review description + multiline + total_articles: 1000 + is_blind: true + attributes: !ruby/object:ActiveRecord::AttributeSet + attributes: !ruby/object:ActiveRecord::LazyAttributeHash + types: {} + values: {} + additional_types: {} + materialized: true + new_record: false + active_record_yaml_version: 1 +method_name: :run +args: [] diff --git a/tests/test_job.py b/tests/test_job.py new file mode 100644 index 0000000..cbd1fb9 --- /dev/null +++ b/tests/test_job.py @@ -0,0 +1,272 @@ +import datetime +from unittest import TestCase +from unittest.mock import patch, MagicMock +from pyworker.job import Job, get_current_time, get_time_delta + + +class RegisteredJob(Job): # matching the registered class fixture + def run(self): + pass + + +class TestJob(TestCase): + def setUp(self): + self.mock_job_id = 1 + self.mock_attempts = 0 + self.mock_run_at = datetime.datetime(2023, 10, 7, 0, 0, 1) + self.mock_queue = 'default' + self.mock_max_attempts = 5 + self.mock_now = datetime.datetime(2023, 10, 7, 0, 0, 0) + + def tearDown(self): + pass + + def load_fixture(self, filename): + with open('tests/fixtures/%s' % filename) as f: + return f.read() + + def load_job(self, filename): + mock_handler = self.load_fixture(filename) + mock_row = ( + self.mock_job_id, + self.mock_attempts, + self.mock_run_at, + self.mock_queue, + mock_handler + ) + return Job.from_row(mock_row, + self.mock_max_attempts, + MagicMock(), MagicMock()) + + def load_unregistered_job(self): + return self.load_job('handler_unregistered.yaml') + + def load_registered_job(self): + job = self.load_job('handler_registered.yaml') + job.error = MagicMock() + job.failure = MagicMock() + job._update_job = MagicMock() + return job + + def load_registered_job_with_attempts_exceeded(self): + job = self.load_registered_job() + job.attempts = self.mock_max_attempts - 1 + return job + + #********** .from_row tests **********# + + def test_from_row_when_unregistered_class_returns_abstract_job_instance(self): + job = self.load_unregistered_job() + + self.assertEqual(job.class_name, 'UnregisteredJob') + self.assertEqual(job.abstract, True) + + def test_from_row_when_registered_class_returns_concrete_job_instance(self): + job = self.load_registered_job() + + self.assertEqual(job.class_name, 'RegisteredJob') + self.assertEqual(job.abstract, False) + + def test_from_row_when_registered_class_returns_concrete_job_instance_with_attributes(self): + job = self.load_registered_job() + + self.assertEqual(job.job_id, self.mock_job_id) + self.assertEqual(job.attempts, self.mock_attempts) + self.assertEqual(job.run_at, self.mock_run_at) + self.assertEqual(job.queue, self.mock_queue) + self.assertEqual(job.max_attempts, self.mock_max_attempts) + # below attributes match the registered class fixture + self.assertDictEqual(job.attributes, { + 'id': 100, + 'title': 'review title', + 'description': 'review description\nmultiline\n', + 'total_articles': 1000, + 'is_blind': True + }) + + #********** .set_error_unlock tests **********# + + def assert_job_updated_field(self, job, field, value): + job._update_job.assert_called_once() + setters = job._update_job.call_args[0][0] + values = job._update_job.call_args[0][1] + index = setters.index("%s = %%s" % field) + assert index >= 0 + assert values[index] == value + + def assert_job_non_updated_field(self, job, field): + job._update_job.assert_called_once() + setters = job._update_job.call_args[0][0] + assert "%s = %%s" % field not in setters + + def assert_job_updated_run_at(self, job, attempts, expected_value): + job.attempts = attempts + expected_value = expected_value.strftime('%Y-%m-%d %H:%M:%S') + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'run_at', expected_value) + + # max attempts not exceeded + + ## error and failure hooks + def test_set_error_unlock_if_max_attempts_not_exceeded_calls_error_hook_only(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + job.error.assert_called_once_with('some error') + job.failure.assert_not_called() + + ## attempts + def test_set_error_unlock_if_max_attempts_not_exceeded_increments_attempts(self): + job = self.load_registered_job() + expected_value = job.attempts + 1 + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'attempts', expected_value) + + ## run_at + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_0( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=0, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 6)) + + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_1( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=1, expected_value=datetime.datetime(2023, 10, 7, 0, 0, 21)) + + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_2( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=2, expected_value=datetime.datetime(2023, 10, 7, 0, 1, 26)) + + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_run_at_exponentially_when_attempts_3( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + self.assert_job_updated_run_at(job, attempts=3, expected_value=datetime.datetime(2023, 10, 7, 0, 4, 21)) + + ## failed_at + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_not_exceeded_does_not_update_failed_at( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_non_updated_field(job, 'failed_at') + + ## locked_at + def test_set_error_unlock_if_max_attempts_not_exceeded_nullifies_job_locked_at(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_at', None) + + ## locked_by + def test_set_error_unlock_if_max_attempts_not_exceeded_nullifies_job_locked_by(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_by', None) + + ## last_error + def test_set_error_unlock_if_max_attempts_not_exceeded_updates_last_error(self): + job = self.load_registered_job() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'last_error', 'some error') + + ## returns + def test_set_error_unlock_if_max_attempts_not_exceeded_returns_false(self): + job = self.load_registered_job() + + self.assertFalse(job.set_error_unlock('some error')) + + ## max attempts exceeded + + ## error and failure hooks + def test_set_error_unlock_if_max_attempts_exceeded_calls_error_and_failure_hooks(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + job.error.assert_called_once_with('some error') + job.failure.assert_called_once_with('some error') + + ## attempts + def test_set_error_unlock_if_max_attempts_exceeded_increments_attempts(self): + job = self.load_registered_job_with_attempts_exceeded() + expected_value = job.attempts + 1 + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'attempts', expected_value) + + ## run_at + def test_set_error_unlock_if_max_attempts_exceeded_does_not_update_run_at(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_non_updated_field(job, 'run_at') + + ## failed_at + @patch('pyworker.job.get_current_time') + def test_set_error_unlock_if_max_attempts_exceeded_updates_failed_at( + self, mock_get_current_time): + mock_get_current_time.return_value = self.mock_now + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'failed_at', mock_get_current_time.return_value) + + ## locked_at + def test_set_error_unlock_if_max_attempts_exceeded_nullifies_job_locked_at(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_at', None) + + ## locked_by + def test_set_error_unlock_if_max_attempts_exceeded_nullifies_job_locked_by(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'locked_by', None) + + ## last_error + def test_set_error_unlock_if_max_attempts_exceeded_updates_last_error(self): + job = self.load_registered_job_with_attempts_exceeded() + + job.set_error_unlock('some error') + + self.assert_job_updated_field(job, 'last_error', 'some error') + + ## returns + def test_set_error_unlock_if_max_attempts_exceeded_returns_true(self): + job = self.load_registered_job_with_attempts_exceeded() + + self.assertTrue(job.set_error_unlock('some error')) + diff --git a/tests/test_worker.py b/tests/test_worker.py index eb11faf..b3e0b04 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,6 +1,6 @@ import datetime from unittest import TestCase -from unittest.mock import patch, MagicMock, Mock +from unittest.mock import patch, MagicMock from pyworker.worker import Worker, TerminatedException, get_current_time class TestWorker(TestCase):