diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cda2444b0..a847dc1bd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,6 +22,7 @@ endif::[] [float] ===== Changed - Change {pull}2526[#2526] +- Racecar consumer instrumentation improvements {pull} [float] ===== Fixed diff --git a/lib/elastic_apm/spies/racecar.rb b/lib/elastic_apm/spies/racecar.rb index a92ec169b..f757969d6 100644 --- a/lib/elastic_apm/spies/racecar.rb +++ b/lib/elastic_apm/spies/racecar.rb @@ -16,8 +16,8 @@ # under the License. begin -require 'active_support/notifications' -require 'active_support/subscriber' + require 'active_support/notifications' + require 'active_support/subscriber' # frozen_string_literal: true module ElasticAPM @@ -25,37 +25,51 @@ module ElasticAPM module Spies # @api private class RacecarSpy - TYPE = 'kafka' - SUBTYPE = 'racecar' + TYPE = 'kafka'.freeze + SUBTYPE = 'racecar'.freeze # @api private class ConsumerSubscriber < ActiveSupport::Subscriber def start_process_message(event) start_process_transaction(event: event, kind: 'process_message') end - def process_message(_event) + + def process_message(event) + record_outcome(event: event) ElasticAPM.end_transaction end def start_process_batch(event) start_process_transaction(event: event, kind: 'process_batch') end - def process_batch(_event) + + def process_batch(event) + record_outcome(event: event) ElasticAPM.end_transaction end private # only public methods will be subscribed def start_process_transaction(event:, kind:) - ElasticAPM.start_transaction(kind, TYPE) - ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) + ElasticAPM.start_transaction("#{event.payload[:consumer_class]}##{kind}", TYPE) + ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', + framework_version: Racecar::VERSION) + end + + def record_outcome(event:) + transaction = ElasticAPM.current_transaction + error_present = !event.payload[:unrecoverable_delivery_error].nil? + transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS + transaction.done(error_present ? :error : :success) end end + # @api private class ProducerSubscriber < ActiveSupport::Subscriber - def start_deliver_message(event) - ElasticAPM.start_transaction('deliver_message',TYPE) - ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) + def start_deliver_message(_event) + ElasticAPM.start_transaction('deliver_message', TYPE) + ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', + framework_version: Racecar::VERSION) end def deliver_message(_event) @@ -71,7 +85,7 @@ def install register 'Racecar', 'racecar', RacecarSpy.new end end - rescue LoadError # no active support available + warn "ActiveSupport not found." end diff --git a/spec/elastic_apm/spies/racecar_spec.rb b/spec/elastic_apm/spies/racecar_spec.rb index 2d33f55ab..2bc905e38 100644 --- a/spec/elastic_apm/spies/racecar_spec.rb +++ b/spec/elastic_apm/spies/racecar_spec.rb @@ -18,28 +18,72 @@ # frozen_string_literal: true require 'spec_helper' - begin require 'active_support/notifications' - require "active_support/subscriber" + require 'active_support/subscriber' require 'racecar' - module ElasticAPM RSpec.describe 'Spy: Racecar', :intercept do + let(:instrumentation_payload) do + { consumer_class: 'SpecConsumer', + topic: 'spec_topic', + partition: '0', + offset: '1', + create_time: Time.now, + key: '1', + value: { key: 'value' }, + headers: { key: 'value' } } + end it 'captures the instrumentation' do with_agent do - ActiveSupport::Notifications.instrument('start_process_message.racecar') - ActiveSupport::Notifications.instrument('process_message.racecar') do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do # this is the body of the racecar consumer #process method end first_transaction = @intercepted.transactions.first expect(first_transaction).not_to be_nil - expect(first_transaction.name).to eq('process_message') + expect(first_transaction.name).to eq("#{instrumentation_payload[:consumer_class]}#process_message") expect(first_transaction.type).to eq('kafka') + expect(first_transaction.context.service.framework.name).to eq('racecar') + end + end + + describe 'transaction outcome' do + it 'records success when no delivery error happen' do + with_agent do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do + # this is the body of the racecar consumer #process method + end + first_transaction = @intercepted.transactions.first + expect(first_transaction.outcome).to eq(Transaction::Outcome::SUCCESS) + end + end + it 'records failure when with a unrecoverable delivery error' do + instrumentation_payload[:unrecoverable_delivery_error] = true + with_agent do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do + # this is the body of the racecar consumer #process method + end + first_transaction = @intercepted.transactions.first + expect(first_transaction.outcome).to eq(Transaction::Outcome::FAILURE) + end + end + it 'records failure when with a recoverable delivery error' do + instrumentation_payload[:unrecoverable_delivery_error] = false + with_agent do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do + # this is the body of the racecar consumer #process method + end + first_transaction = @intercepted.transactions.first + expect(first_transaction.outcome).to eq(Transaction::Outcome::FAILURE) + end end end end end - rescue LoadError # in case we don't have ActiveSupport -end \ No newline at end of file + warn "ActiveSupport not found, skipping." +end