From 353ab0ce7484fcc17d325a8c7a0cc247983f4d18 Mon Sep 17 00:00:00 2001 From: Fernando Bellincanta Date: Fri, 5 Jan 2024 16:24:53 -0300 Subject: [PATCH 1/4] Adds consumer names to Racecar transactions --- lib/elastic_apm/spies/racecar.rb | 3 ++- spec/elastic_apm/spies/racecar_spec.rb | 24 +++++++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/lib/elastic_apm/spies/racecar.rb b/lib/elastic_apm/spies/racecar.rb index a92ec169b..276ffc503 100644 --- a/lib/elastic_apm/spies/racecar.rb +++ b/lib/elastic_apm/spies/racecar.rb @@ -47,7 +47,7 @@ def process_batch(_event) private # only public methods will be subscribed def start_process_transaction(event:, kind:) - ElasticAPM.start_transaction(kind, TYPE) + ElasticAPM.start_transaction("#{event.payload[:consumer_class]}##{kind}", TYPE) ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) end end @@ -74,4 +74,5 @@ def install rescue LoadError # no active support available + STDERR.puts "ActiveSupport not found." end diff --git a/spec/elastic_apm/spies/racecar_spec.rb b/spec/elastic_apm/spies/racecar_spec.rb index 2d33f55ab..d6963da81 100644 --- a/spec/elastic_apm/spies/racecar_spec.rb +++ b/spec/elastic_apm/spies/racecar_spec.rb @@ -18,28 +18,38 @@ # frozen_string_literal: true require 'spec_helper' - begin require 'active_support/notifications' - require "active_support/subscriber" + require 'active_support/subscriber' require 'racecar' - module ElasticAPM RSpec.describe 'Spy: Racecar', :intercept do it 'captures the instrumentation' do with_agent do - ActiveSupport::Notifications.instrument('start_process_message.racecar') - ActiveSupport::Notifications.instrument('process_message.racecar') do + instrumentation_payload = { + consumer_class: 'SpecConsumer', + topic: 'spec_topic', + partition: '0', + offset: '1', + create_time: Time.now, + key: '1', + value: {key: 'value'}, + headers: {key: 'value'} + } + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do # this is the body of the racecar consumer #process method end first_transaction = @intercepted.transactions.first expect(first_transaction).not_to be_nil - expect(first_transaction.name).to eq('process_message') + expect(first_transaction.name).to eq("#{instrumentation_payload[:consumer_class]}#process_message") expect(first_transaction.type).to eq('kafka') + expect(first_transaction.context.service.framework.name).to eq('racecar') end end end end rescue LoadError # in case we don't have ActiveSupport -end \ No newline at end of file + STDERR.puts "ActiveSupport not found, skipping." +end From 3d74b193f9c33034c888c5e02b3899bccb8aa244 Mon Sep 17 00:00:00 2001 From: Fernando Bellincanta Date: Mon, 8 Jan 2024 12:25:49 -0300 Subject: [PATCH 2/4] Adds transaction outcomes to Racecar transactions --- lib/elastic_apm/spies/racecar.rb | 14 ++++++- spec/elastic_apm/spies/racecar_spec.rb | 55 +++++++++++++++++++++----- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/lib/elastic_apm/spies/racecar.rb b/lib/elastic_apm/spies/racecar.rb index 276ffc503..e342f0eed 100644 --- a/lib/elastic_apm/spies/racecar.rb +++ b/lib/elastic_apm/spies/racecar.rb @@ -33,14 +33,24 @@ class ConsumerSubscriber < ActiveSupport::Subscriber def start_process_message(event) start_process_transaction(event: event, kind: 'process_message') end - def process_message(_event) + def process_message(event) + transaction = ElasticAPM.current_transaction + error_present = !event.payload[:unrecoverable_delivery_error].nil? + transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS + transaction.done(error_present ? :error : :success) + ElasticAPM.end_transaction end def start_process_batch(event) start_process_transaction(event: event, kind: 'process_batch') end - def process_batch(_event) + def process_batch(event) + transaction = ElasticAPM.current_transaction + error_present = !event.payload[:unrecoverable_delivery_error].nil? + transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS + transaction.done(error_present ? :error : :success) + ElasticAPM.end_transaction end diff --git a/spec/elastic_apm/spies/racecar_spec.rb b/spec/elastic_apm/spies/racecar_spec.rb index d6963da81..970b22b54 100644 --- a/spec/elastic_apm/spies/racecar_spec.rb +++ b/spec/elastic_apm/spies/racecar_spec.rb @@ -24,18 +24,18 @@ require 'racecar' module ElasticAPM RSpec.describe 'Spy: Racecar', :intercept do + let(:instrumentation_payload) { + { consumer_class: 'SpecConsumer', + topic: 'spec_topic', + partition: '0', + offset: '1', + create_time: Time.now, + key: '1', + value: {key: 'value'}, + headers: {key: 'value'} } + } it 'captures the instrumentation' do with_agent do - instrumentation_payload = { - consumer_class: 'SpecConsumer', - topic: 'spec_topic', - partition: '0', - offset: '1', - create_time: Time.now, - key: '1', - value: {key: 'value'}, - headers: {key: 'value'} - } ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do # this is the body of the racecar consumer #process method @@ -47,6 +47,41 @@ module ElasticAPM expect(first_transaction.context.service.framework.name).to eq('racecar') end end + + describe 'transaction outcome' do + it 'records success when no delivery error happen' do + with_agent do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do + # this is the body of the racecar consumer #process method + end + first_transaction = @intercepted.transactions.first + expect(first_transaction.outcome).to eq(Transaction::Outcome::SUCCESS) + end + end + it 'records failure when with a unrecoverable delivery error' do + instrumentation_payload[:unrecoverable_delivery_error] = true + with_agent do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do + # this is the body of the racecar consumer #process method + end + first_transaction = @intercepted.transactions.first + expect(first_transaction.outcome).to eq(Transaction::Outcome::FAILURE) + end + end + it 'records failure when with a recoverable delivery error' do + instrumentation_payload[:unrecoverable_delivery_error] = false + with_agent do + ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) + ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do + # this is the body of the racecar consumer #process method + end + first_transaction = @intercepted.transactions.first + expect(first_transaction.outcome).to eq(Transaction::Outcome::FAILURE) + end + end + end end end From f9f7e863646fdf62df1e88ca014aad3ef35e00f2 Mon Sep 17 00:00:00 2001 From: Fernando Bellincanta Date: Mon, 8 Jan 2024 13:13:50 -0300 Subject: [PATCH 3/4] Refactor consumer transaction outcome --- lib/elastic_apm/spies/racecar.rb | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lib/elastic_apm/spies/racecar.rb b/lib/elastic_apm/spies/racecar.rb index e342f0eed..dd3d326ad 100644 --- a/lib/elastic_apm/spies/racecar.rb +++ b/lib/elastic_apm/spies/racecar.rb @@ -34,11 +34,7 @@ def start_process_message(event) start_process_transaction(event: event, kind: 'process_message') end def process_message(event) - transaction = ElasticAPM.current_transaction - error_present = !event.payload[:unrecoverable_delivery_error].nil? - transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS - transaction.done(error_present ? :error : :success) - + record_outcome(event: event) ElasticAPM.end_transaction end @@ -46,11 +42,7 @@ def start_process_batch(event) start_process_transaction(event: event, kind: 'process_batch') end def process_batch(event) - transaction = ElasticAPM.current_transaction - error_present = !event.payload[:unrecoverable_delivery_error].nil? - transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS - transaction.done(error_present ? :error : :success) - + record_outcome(event: event) ElasticAPM.end_transaction end @@ -60,6 +52,13 @@ def start_process_transaction(event:, kind:) ElasticAPM.start_transaction("#{event.payload[:consumer_class]}##{kind}", TYPE) ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) end + + def record_outcome(event:) + transaction = ElasticAPM.current_transaction + error_present = !event.payload[:unrecoverable_delivery_error].nil? + transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS + transaction.done(error_present ? :error : :success) + end end class ProducerSubscriber < ActiveSupport::Subscriber From 6f2a0b1c1d50e59a07278b1696eda7924e1dd707 Mon Sep 17 00:00:00 2001 From: Fernando Bellincanta Date: Mon, 8 Jan 2024 13:37:40 -0300 Subject: [PATCH 4/4] Style guidelines applied --- CHANGELOG.asciidoc | 1 + lib/elastic_apm/spies/racecar.rb | 24 ++++++++++++++---------- spec/elastic_apm/spies/racecar_spec.rb | 21 ++++++++++----------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index cda2444b0..a847dc1bd 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -22,6 +22,7 @@ endif::[] [float] ===== Changed - Change {pull}2526[#2526] +- Racecar consumer instrumentation improvements {pull} [float] ===== Fixed diff --git a/lib/elastic_apm/spies/racecar.rb b/lib/elastic_apm/spies/racecar.rb index dd3d326ad..f757969d6 100644 --- a/lib/elastic_apm/spies/racecar.rb +++ b/lib/elastic_apm/spies/racecar.rb @@ -16,8 +16,8 @@ # under the License. begin -require 'active_support/notifications' -require 'active_support/subscriber' + require 'active_support/notifications' + require 'active_support/subscriber' # frozen_string_literal: true module ElasticAPM @@ -25,14 +25,15 @@ module ElasticAPM module Spies # @api private class RacecarSpy - TYPE = 'kafka' - SUBTYPE = 'racecar' + TYPE = 'kafka'.freeze + SUBTYPE = 'racecar'.freeze # @api private class ConsumerSubscriber < ActiveSupport::Subscriber def start_process_message(event) start_process_transaction(event: event, kind: 'process_message') end + def process_message(event) record_outcome(event: event) ElasticAPM.end_transaction @@ -41,6 +42,7 @@ def process_message(event) def start_process_batch(event) start_process_transaction(event: event, kind: 'process_batch') end + def process_batch(event) record_outcome(event: event) ElasticAPM.end_transaction @@ -50,7 +52,8 @@ def process_batch(event) def start_process_transaction(event:, kind:) ElasticAPM.start_transaction("#{event.payload[:consumer_class]}##{kind}", TYPE) - ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) + ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', + framework_version: Racecar::VERSION) end def record_outcome(event:) @@ -61,10 +64,12 @@ def record_outcome(event:) end end + # @api private class ProducerSubscriber < ActiveSupport::Subscriber - def start_deliver_message(event) - ElasticAPM.start_transaction('deliver_message',TYPE) - ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION) + def start_deliver_message(_event) + ElasticAPM.start_transaction('deliver_message', TYPE) + ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', + framework_version: Racecar::VERSION) end def deliver_message(_event) @@ -80,8 +85,7 @@ def install register 'Racecar', 'racecar', RacecarSpy.new end end - rescue LoadError # no active support available - STDERR.puts "ActiveSupport not found." + warn "ActiveSupport not found." end diff --git a/spec/elastic_apm/spies/racecar_spec.rb b/spec/elastic_apm/spies/racecar_spec.rb index 970b22b54..2bc905e38 100644 --- a/spec/elastic_apm/spies/racecar_spec.rb +++ b/spec/elastic_apm/spies/racecar_spec.rb @@ -24,16 +24,16 @@ require 'racecar' module ElasticAPM RSpec.describe 'Spy: Racecar', :intercept do - let(:instrumentation_payload) { + let(:instrumentation_payload) do { consumer_class: 'SpecConsumer', - topic: 'spec_topic', - partition: '0', - offset: '1', - create_time: Time.now, - key: '1', - value: {key: 'value'}, - headers: {key: 'value'} } - } + topic: 'spec_topic', + partition: '0', + offset: '1', + create_time: Time.now, + key: '1', + value: { key: 'value' }, + headers: { key: 'value' } } + end it 'captures the instrumentation' do with_agent do ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload) @@ -84,7 +84,6 @@ module ElasticAPM end end end - rescue LoadError # in case we don't have ActiveSupport - STDERR.puts "ActiveSupport not found, skipping." + warn "ActiveSupport not found, skipping." end