Gh matrix #22

Closed · wants to merge 4 commits
12 changes: 10 additions & 2 deletions .github/workflows/run-tests.yaml
@@ -8,14 +8,22 @@ on:

jobs:
test:
strategy:
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.11
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: '3.11'
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: pip install -r requirements-test.txt
- name: Run tests
13 changes: 9 additions & 4 deletions pyworker/job.py
@@ -19,7 +19,8 @@ class Job(object, metaclass=Meta):
"""docstring for Job"""
def __init__(self, class_name, database, logger,
job_id, queue, run_at, attempts=0, max_attempts=1,
attributes=None, abstract=False, extra_fields=None):
attributes=None, abstract=False, extra_fields=None,
reporter=None):
super(Job, self).__init__()
self.class_name = class_name
self.database = database
@@ -33,12 +34,14 @@ def __init__(self, class_name, database, logger,
self.attributes = attributes
self.abstract = abstract
self.extra_fields = extra_fields
self.reporter = reporter

def __str__(self):
return "%s: %s" % (self.__class__.__name__, str(self.__dict__))

@classmethod
def from_row(cls, job_row, max_attempts, database, logger, extra_fields=None):
def from_row(cls, job_row, max_attempts, database, logger,
extra_fields=None, reporter=None):
'''job_row is a tuple of (id, attempts, run_at, queue, handler, *extra_fields)'''
def extract_class_name(line):
regex = re.compile('object: !ruby/object:(.+)')
@@ -80,7 +83,8 @@ def extract_extra_fields(extra_fields, extra_field_values):
max_attempts=max_attempts,
job_id=job_id, attempts=attempts,
run_at=run_at, queue=queue, database=database,
abstract=True, extra_fields=extra_fields_dict)
abstract=True, extra_fields=extra_fields_dict,
reporter=reporter)

attributes = extract_attributes(handler[2:])
logger.debug("Found attributes: %s" % str(attributes))
@@ -94,7 +98,8 @@ def extract_extra_fields(extra_fields, extra_field_values):
run_at=run_at, queue=queue, database=database,
max_attempts=max_attempts,
attributes=payload['object']['attributes'],
abstract=False, extra_fields=extra_fields_dict)
abstract=False, extra_fields=extra_fields_dict,
reporter=reporter)

def before(self):
self.logger.debug("Running Job.before hook")
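A hypothetical concrete job, sketching how the reporter passed through `Job.from_row` could be used from a handler. The class name, the attribute keys, and the `run()` hook are illustrative assumptions, not part of this diff:

```python
from pyworker.job import Job


class MyReviewJob(Job):
    def run(self):
        # self.attributes comes from the deserialized delayed_job handler;
        # these keys are assumed, loosely mirroring the fixtures in tests/test_job.py.
        review_id = self.attributes['review_id']
        articles_count = self.attributes['total_articles']
        # The reporter is optional, so guard against it being None.
        if self.reporter is not None:
            self.reporter.report(review_id, articles_count,
                                 is_blind=self.attributes['is_blind'])
```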
67 changes: 67 additions & 0 deletions pyworker/reporter.py
@@ -0,0 +1,67 @@
import json
from contextlib import contextmanager
import newrelic.agent


class Reporter(object):
    """Thin wrapper around the NewRelic agent for reporting job attributes,
    background transactions and exceptions."""

def __init__(self, attribute_prefix='', logger=None):
self._prefix = attribute_prefix
self._logger = logger
if self._logger:
self._logger.info('Reporter: initializing NewRelic')
newrelic.agent.initialize()
self._newrelic_app = newrelic.agent.register_application()

def report(self, review_id, articles_count, **attributes):
# extend attributes with review_id and articles_count
attributes.update({
'review_id': review_id,
'articles_count': articles_count
})
self.report_generic(**attributes)

def report_generic(self, **attributes):
# format attributes
attributes = self._format_attributes(attributes)
# report to NewRelic
self._report_newrelic(attributes)

    @contextmanager
    def recorder(self, name):
        # BackgroundTask is itself a context manager, so enter it here and
        # yield the task: @contextmanager requires a generator, and a bare
        # `return` would raise a TypeError as soon as the caller enters the
        # `with` block.
        with newrelic.agent.BackgroundTask(
                application=self._newrelic_app,
                name=name,
                group='DelayedJob') as task:
            yield task

def shutdown(self):
newrelic.agent.shutdown_agent()

def record_exception(self, exception):
newrelic.agent.record_exception(exception)

def _format_attributes(self, attributes):
# prefix then convert all attribute keys to camelCase
        # ensure value types are supported, otherwise JSON-dump them
return {
self._prefix + self._to_camel_case(key): self._convert_value(value)
for key, value in attributes.items()
if key is not None and value is not None
}

@staticmethod
def _to_camel_case(string):
return string[0]+string.title()[1:].replace("-","").replace("_","").replace(" ","")

@staticmethod
def _convert_value(value):
if type(value) not in [str, int, float, bool]:
return json.dumps(value)
return value

def _report_newrelic(self, attributes):
if self._logger:
self._logger.debug('Reporter: reporting to NewRelic: %s' % attributes)
# convert attributes dict to list of tuples
attributes = list(attributes.items())
newrelic.agent.add_custom_attributes(attributes)
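A minimal usage sketch of the class above, outside the worker. The prefix, task name, and attribute values are illustrative assumptions, and `newrelic.agent.initialize()` still expects the usual NewRelic configuration (e.g. `NEW_RELIC_LICENSE_KEY` and `NEW_RELIC_APP_NAME`) in the environment:

```python
from pyworker.reporter import Reporter

reporter = Reporter(attribute_prefix='djob.')  # hypothetical prefix

# Custom attributes attach to the current transaction, so report inside one.
with reporter.recorder('MyReviewJob'):
    # Keys are prefixed and camelCased before being sent, e.g.
    # review_id -> 'djob.reviewId', articles_count -> 'djob.articlesCount'.
    reporter.report(review_id=42, articles_count=1000, is_blind=True)

# Flush buffered data before the process exits.
reporter.shutdown()
```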
61 changes: 27 additions & 34 deletions pyworker/worker.py
@@ -1,19 +1,19 @@
import newrelic.agent

import os, signal, traceback
import time
import json
from contextlib import contextmanager
from pyworker.db import DBConnector
from pyworker.job import Job
from pyworker.logger import Logger
from pyworker.util import get_current_time, get_time_delta
from pyworker.reporter import Reporter

class TimeoutException(Exception): pass
class TerminatedException(Exception): pass

class Worker(object):
def __init__(self, dbstring, logger=None, extra_delayed_job_fields=None):
def __init__(self, dbstring, logger=None,
extra_delayed_job_fields=None,
reported_attributes_prefix=''):
super(Worker, self).__init__()
self.logger = Logger(logger)
self.logger.info('Starting pyworker...')
@@ -27,16 +27,15 @@ def __init__(self, dbstring, logger=None, extra_delayed_job_fields=None):
self.name = 'host:%s pid:%d' % (hostname, pid)
self.extra_delayed_job_fields = extra_delayed_job_fields

# Configure NewRelic if ENV variables set
self.newrelic_app = None
# Configure application reporter if ENV variables set
self.reporter = None
NEW_RELIC_LICENSE_KEY = os.environ.get("NEW_RELIC_LICENSE_KEY")
NEW_RELIC_APP_NAME = os.environ.get("NEW_RELIC_APP_NAME")

# Register Application in NewRelic if configured
# Register application reporter if configured
if NEW_RELIC_LICENSE_KEY and NEW_RELIC_APP_NAME:
self.logger.info('Initializing NewRelic Agent for: %s' % NEW_RELIC_APP_NAME)
newrelic.agent.initialize()
self.newrelic_app = newrelic.agent.register_application()
self.reporter = Reporter(
attribute_prefix=reported_attributes_prefix, logger=self.logger)

@contextmanager
def _time_limit(self, seconds):
@@ -72,30 +71,24 @@ def _latency(job_run_at):
# and when the job actually started running `now`
return (now - job_run_at).total_seconds()

if self.newrelic_app:
if self.reporter:
latency = _latency(job.run_at)

with newrelic.agent.BackgroundTask(
application=self.newrelic_app,
name=job.job_name,
group='DelayedJob') as task:
with self.reporter.recorder(job.job_name) as task:

# Record custom attributes for the job transaction
newrelic.agent.add_custom_attribute('job_id', job.job_id)
newrelic.agent.add_custom_attribute('job_name', job.job_name)
newrelic.agent.add_custom_attribute('job_queue', job.queue)
newrelic.agent.add_custom_attribute('job_latency', latency)
newrelic.agent.add_custom_attribute('job_attempts', job.attempts)
self.reporter.report_generic(
job_id=job.job_id,
job_name=job.job_name,
job_queue=job.queue,
job_latency=latency,
job_attempts=job.attempts
)

# Record extra fields if configured
self.logger.debug('job extra fields: %s' % job.extra_fields)
if job.extra_fields is not None:
for key, value in job.extra_fields.items():
# NewRelic only supports string, int, float, bool
if value is not None:
if type(value) not in [str, int, float, bool]:
value = json.dumps(value)
newrelic.agent.add_custom_attribute(key, value)
self.reporter.report_generic(**job.extra_fields)

yield task
else:
@@ -119,9 +112,9 @@ def run(self):

self.database.disconnect()

# If configured shutdown NewRelic Agent to upload data on shutdown
if self.newrelic_app:
newrelic.agent.shutdown_agent()
        # If configured, shut down the reporter so it uploads buffered data on exit
if self.reporter:
self.reporter.shutdown()

def get_job(self):
def get_job_row():
@@ -152,7 +145,8 @@ def get_job_row():
if job_row:
return Job.from_row(job_row, max_attempts=self.max_attempts,
database=self.database, logger=self.logger,
extra_fields=self.extra_delayed_job_fields)
extra_fields=self.extra_delayed_job_fields,
reporter=self.reporter)
else:
return None

@@ -186,11 +180,10 @@ def handle_job(self, job):
raise exception
finally:
# report error status
if self.newrelic_app:
newrelic.agent.add_custom_attribute('error', error)
newrelic.agent.add_custom_attribute('job_failure', failed)
if self.reporter:
self.reporter.report_generic(error=error, job_failure=failed)
if caught_exception:
newrelic.agent.record_exception(caught_exception)
self.reporter.record_exception(caught_exception)
time_diff = time.time() - start_time
self.logger.info('Job %d finished in %d seconds' % \
(job.job_id, time_diff))
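A minimal sketch of wiring the reporter in from application code. The connection string, environment values, and prefix are placeholders, and any other worker settings (queues, poll interval, max attempts) are assumed to be configured as before:

```python
import os

from pyworker.worker import Worker

# The reporter is only created when both NewRelic variables are present.
os.environ.setdefault('NEW_RELIC_LICENSE_KEY', '<license-key>')
os.environ.setdefault('NEW_RELIC_APP_NAME', 'pyworker-jobs')

worker = Worker('postgres://user:password@localhost/jobs_db',
                reported_attributes_prefix='djob.')
worker.run()  # each picked-up job is now reported through the Reporter
```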
24 changes: 24 additions & 0 deletions tests/test_job.py
@@ -66,6 +66,11 @@ def load_unregistered_job_with_extra_fields(self):
def load_unregistered_job_with_extra_fields(self):
return self.load_job_with_extra_fields('handler_unregistered.yaml')

def load_unregistered_job_with_reporter(self, reporter):
job = self.load_unregistered_job()
job.reporter = reporter
return job

def load_registered_job(self):
job = self.load_job('handler_registered.yaml')
job.error = MagicMock()
@@ -76,6 +81,11 @@ def load_registered_job(self):
def load_registered_job_with_extra_fields(self):
return self.load_job_with_extra_fields('handler_registered.yaml')

def load_registered_job_with_reporter(self, reporter):
job = self.load_registered_job()
job.reporter = reporter
return job

def load_registered_job_with_attempts_exceeded(self):
job = self.load_registered_job()
job.attempts = self.mock_max_attempts - 1
@@ -99,12 +109,19 @@ def test_from_row_when_unregistered_class_returns_job_instance_without_attribute
self.assertEqual(job.max_attempts, self.mock_max_attempts)
self.assertIsNone(job.extra_fields)
self.assertIsNone(job.attributes)
self.assertIsNone(job.reporter)

def test_from_row_when_unregistered_class_returns_job_instance_with_extra_fields(self):
job = self.load_unregistered_job_with_extra_fields()

self.assertDictEqual(job.extra_fields, self.mock_extra_fields)

def test_from_row_when_unregistered_class_returns_abstract_job_instance_with_reporter(self):
mock_reporter = MagicMock()
job = self.load_unregistered_job_with_reporter(mock_reporter)

self.assertEqual(job.reporter, mock_reporter)

def test_from_row_when_registered_class_returns_concrete_job_instance(self):
job = self.load_registered_job()

@@ -128,12 +145,19 @@ def test_from_row_when_registered_class_returns_job_instance_with_attributes(self):
'total_articles': 1000,
'is_blind': True
})
self.assertIsNone(job.reporter)

def test_from_row_when_registered_class_returns_job_instance_with_extra_fields(self):
job = self.load_registered_job_with_extra_fields()

self.assertDictEqual(job.extra_fields, self.mock_extra_fields)

def test_from_row_when_registered_class_returns_concrete_job_instance_with_reporter(self):
mock_reporter = MagicMock()
job = self.load_registered_job_with_reporter(mock_reporter)

self.assertEqual(job.reporter, mock_reporter)

#********** .set_error_unlock tests **********#

def assert_job_updated_field(self, job, field, value):