Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Racecar consumer instrumentation improvements #1429

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ endif::[]
[float]
===== Changed
- Change {pull}2526[#2526]
- Racecar consumer instrumentation improvements {pull}

[float]
===== Fixed
Expand Down
38 changes: 26 additions & 12 deletions lib/elastic_apm/spies/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,60 @@
# under the License.

begin
require 'active_support/notifications'
require 'active_support/subscriber'
require 'active_support/notifications'
require 'active_support/subscriber'

# frozen_string_literal: true
module ElasticAPM
# @api private
module Spies
# @api private
class RacecarSpy
TYPE = 'kafka'
SUBTYPE = 'racecar'
TYPE = 'kafka'.freeze
SUBTYPE = 'racecar'.freeze

# @api private
class ConsumerSubscriber < ActiveSupport::Subscriber
def start_process_message(event)
start_process_transaction(event: event, kind: 'process_message')
end
def process_message(_event)

def process_message(event)
record_outcome(event: event)
ElasticAPM.end_transaction
end

def start_process_batch(event)
start_process_transaction(event: event, kind: 'process_batch')
end
def process_batch(_event)

def process_batch(event)
record_outcome(event: event)
ElasticAPM.end_transaction
end

private # only public methods will be subscribed

def start_process_transaction(event:, kind:)
ElasticAPM.start_transaction(kind, TYPE)
ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION)
ElasticAPM.start_transaction("#{event.payload[:consumer_class]}##{kind}", TYPE)
ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar',
framework_version: Racecar::VERSION)
end

def record_outcome(event:)
transaction = ElasticAPM.current_transaction
error_present = !event.payload[:unrecoverable_delivery_error].nil?
transaction.outcome = error_present ? Transaction::Outcome::FAILURE : Transaction::Outcome::SUCCESS
transaction.done(error_present ? :error : :success)
end
end

# @api private
class ProducerSubscriber < ActiveSupport::Subscriber
def start_deliver_message(event)
ElasticAPM.start_transaction('deliver_message',TYPE)
ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar', framework_version: Racecar::VERSION)
def start_deliver_message(_event)
ElasticAPM.start_transaction('deliver_message', TYPE)
ElasticAPM.current_transaction.context.set_service(framework_name: 'racecar',
framework_version: Racecar::VERSION)
end

def deliver_message(_event)
Expand All @@ -71,7 +85,7 @@ def install
register 'Racecar', 'racecar', RacecarSpy.new
end
end

rescue LoadError
# no active support available
warn "ActiveSupport not found."
end
60 changes: 52 additions & 8 deletions spec/elastic_apm/spies/racecar_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,72 @@
# frozen_string_literal: true

require 'spec_helper'

begin
require 'active_support/notifications'
require "active_support/subscriber"
require 'active_support/subscriber'
require 'racecar'

module ElasticAPM
RSpec.describe 'Spy: Racecar', :intercept do
let(:instrumentation_payload) do
{ consumer_class: 'SpecConsumer',
topic: 'spec_topic',
partition: '0',
offset: '1',
create_time: Time.now,
key: '1',
value: { key: 'value' },
headers: { key: 'value' } }
end
it 'captures the instrumentation' do
with_agent do
ActiveSupport::Notifications.instrument('start_process_message.racecar')
ActiveSupport::Notifications.instrument('process_message.racecar') do
ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload)
ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do
# this is the body of the racecar consumer #process method
end
first_transaction = @intercepted.transactions.first
expect(first_transaction).not_to be_nil
expect(first_transaction.name).to eq('process_message')
expect(first_transaction.name).to eq("#{instrumentation_payload[:consumer_class]}#process_message")
expect(first_transaction.type).to eq('kafka')
expect(first_transaction.context.service.framework.name).to eq('racecar')
end
end

describe 'transaction outcome' do
it 'records success when no delivery error happen' do
with_agent do
ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload)
ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do
# this is the body of the racecar consumer #process method
end
first_transaction = @intercepted.transactions.first
expect(first_transaction.outcome).to eq(Transaction::Outcome::SUCCESS)
end
end
it 'records failure when with a unrecoverable delivery error' do
instrumentation_payload[:unrecoverable_delivery_error] = true
with_agent do
ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload)
ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do
# this is the body of the racecar consumer #process method
end
first_transaction = @intercepted.transactions.first
expect(first_transaction.outcome).to eq(Transaction::Outcome::FAILURE)
end
end
it 'records failure when with a recoverable delivery error' do
instrumentation_payload[:unrecoverable_delivery_error] = false
with_agent do
ActiveSupport::Notifications.instrument('start_process_message.racecar', instrumentation_payload)
ActiveSupport::Notifications.instrument('process_message.racecar', instrumentation_payload) do
# this is the body of the racecar consumer #process method
end
first_transaction = @intercepted.transactions.first
expect(first_transaction.outcome).to eq(Transaction::Outcome::FAILURE)
end
end
end
end
end

rescue LoadError # in case we don't have ActiveSupport
end
warn "ActiveSupport not found, skipping."
end
Loading