diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cda2444b0..a847dc1bd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,6 +22,7 @@ endif::[] [float] ===== Changed - Change {pull}2526[#2526] +- Racecar consumer instrumentation improvements {pull} [float] ===== Fixed diff --git a/lib/elastic_apm/spies/racecar.rb b/lib/elastic_apm/spies/racecar.rb index dd3d326ad..f757969d6 100644 --- a/lib/elastic_apm/spies/racecar.rb +++ b/lib/elastic_apm/spies/racecar.rb @@ -16,8 +16,8 @@ # under the License. begin -require 'active_support/notifications' -require 'active_support/subscriber' + require 'active_support/notifications' + require 'active_support/subscriber' # frozen_string_literal: true module ElasticAPM @@ -25,14 +25,15 @@ module ElasticAPM module Spies # @api private class RacecarSpy - TYPE = 'kafka' - SUBTYPE = 'racecar' + TYPE = 'kafka'.freeze + SUBTYPE = 'racecar'.freeze # @api private class ConsumerSubscriber < ActiveSupport::Subscriber def start_process_message(event) start_process_transaction(event: event, kind: 'process_message') end + def process_message(event) record_outcome(event: event) ElasticAPM.end_transaction @@ -41,6 +42,7 @@ def process_message(event) def start_process_batch(event) start_process_transaction(event: event, kind: 'process_batch') end + def process_batch(event) record_outcome(event: event) ElasticAPM.end_transaction @@ -50,7 +52,8 @@ def process_batch(event) def start_process_transaction(event:, kind:) ElasticAPM.start_transaction("#{event.payload[:consumer_class]}##{kind}", TYPE) - ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) + ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', + framework_version: Racecar::VERSION) end def record_outcome(event:) @@ -61,10 +64,12 @@ def record_outcome(event:) end end + # @api private class ProducerSubscriber < ActiveSupport::Subscriber - def start_deliver_message(event) - ElasticAPM.start_transaction('deliver_message',TYPE) - ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) + def start_deliver_message(_event) + ElasticAPM.start_transaction('deliver_message', TYPE) + ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', + framework_version: Racecar::VERSION) end def deliver_message(_event) @@ -80,8 +85,7 @@ def install register 'Racecar', 'racecar', RacecarSpy.new end end - rescue LoadError # no active support available - STDERR.puts "ActiveSupport not found." + warn "ActiveSupport not found." end diff --git a/spec/elastic_apm/spies/racecar_spec.rb b/spec/elastic_apm/spies/racecar_spec.rb index 970b22b54..2bc905e38 100644 --- a/spec/elastic_apm/spies/racecar_spec.rb +++ b/spec/elastic_apm/spies/racecar_spec.rb @@ -24,16 +24,16 @@ require 'racecar' module ElasticAPM RSpec.describe 'Spy: Racecar', :intercept do - let(:instrumentation_payload) { + let(:instrumentation_payload) do { consumer_class: 'SpecConsumer', - topic: 'spec_topic', - partition: '0', - offset: '1', - create_time: Time.now, - key: '1', - value: {key: 'value'}, - headers: {key: 'value'} } - } + topic: 'spec_topic', + partition: '0', + offset: '1', + create_time: Time.now, + key: '1', + value: { key: 'value' }, + headers: { key: 'value' } } + end it 'captures the instrumentation' do with_agent do ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) @@ -84,7 +84,6 @@ module ElasticAPM end end end - rescue LoadError # in case we don't have ActiveSupport - STDERR.puts "ActiveSupport not found, skipping." + warn "ActiveSupport not found, skipping." end