Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deferred retry of hardware_info handling when device not found #317

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions app/jobs/retry_mqtt_message_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
class RetryMQTTMessageJob < ApplicationJob
class RetryMessageHandlerError < RuntimeError
end

queue_as :mqtt_retry


retry_on(RetryMessageHandlerError, attempts: 75, wait: ->(count) {
case count
when 0..12
5.seconds
when 12..20 # Every 30 seconds for the first 5 minutes
30.seconds
else # Then every minute for an hour
1.minute
end
}) do |_job, _exeception|
# No-op, this block ensures the exception isn't reraised and retried by Sidekiq
end

def perform(topic, message)
result = MqttMessagesHandler.handle_topic(topic, message, false)
raise RetryMessageHandlerError if result.nil?
end
end


19 changes: 17 additions & 2 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
class MqttMessagesHandler
def self.handle_topic(topic, message)
def self.handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

crumb = Sentry::Breadcrumb.new(
Expand All @@ -16,10 +16,14 @@ def self.handle_topic(topic, message)
# The following do NOT need a device
if topic.to_s.include?('inventory')
DeviceInventory.create({ report: (message rescue nil) })
return true
end

device = Device.find_by(device_token: device_token(topic))
return if device.nil?
if device.nil?
handle_nil_device(topic, message, retry_on_nil_device)
return nil
end

if topic.to_s.include?('raw')
handle_readings(device, parse_raw_readings(message, device.id))
Expand All @@ -40,6 +44,17 @@ def self.handle_topic(topic, message)
Sentry.add_breadcrumb(crumb)
device.update_column(:hardware_info, json_message)
end
return true
end

def self.handle_nil_device(topic, message, retry_on_nil_device)
if !topic.to_s.include?("inventory")
retry_later(topic, message) if retry_on_nil_device
end
end

def self.retry_later(topic, message)
RetryMQTTMessageJob.perform_later(topic, message)
end

# takes a packet and stores data
Expand Down
1 change: 1 addition & 0 deletions config/initializers/sentry.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Sentry.init do |config|
config.dsn = ENV['RAVEN_DSN_URL']
config.breadcrumbs_logger = [:sentry_logger, :active_support_logger, :http_logger]
config.excluded_exceptions = ["RetryMQTTMessageJob::RetryMessageHandlerError"]
end
1 change: 1 addition & 0 deletions config/sidekiq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
:queues:
- default
- mailers
- mqtt_retry

production:
:concurrency: 25
Expand Down
21 changes: 21 additions & 0 deletions spec/jobs/retry_mqtt_message_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require 'rails_helper'

RSpec.describe RetryMQTTMessageJob, type: :job do
include ActiveJob::TestHelper

it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do
topic = "topic/1/2/3"
message = '{"foo": "bar", "test": "message"}'
expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(true)
RetryMQTTMessageJob.perform_now(topic, message)
end

it "retries if the handler returns nil" do
topic = "topic/1/2/3"
message = '{"foo": "bar", "test": "message"}'
expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(nil, nil, true)
assert_performed_jobs 3 do
RetryMQTTMessageJob.perform_later(topic, message)
end
end
end
34 changes: 27 additions & 7 deletions spec/lib/mqtt_messages_handler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,24 @@
)
MqttMessagesHandler.handle_topic(@packet.topic, @hardware_info_packet.payload)
end
end

context 'invalid packet' do
it 'it notifies Sentry' do
allow(Sentry).to receive(:capture_exception)
expect(Kairos).not_to receive(:http_post_to)
it 'defers messages with unknown device tokens if retry flag is true' do
expect(RetryMQTTMessageJob).to receive(:perform_later).with(@invalid_packet.topic, @invalid_packet.payload)
MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload)
#expect(Sentry).to have_received(:capture_exception).with(RuntimeError)
end

it 'does not defer messages with unknown device tokens if retry flag is false' do
expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@invalid_packet.topic, @invalid_packet.payload)
MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload, false)
end

context 'invalid packet' do
it 'it notifies Sentry' do
allow(Sentry).to receive(:capture_exception)
expect(Kairos).not_to receive(:http_post_to)
MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload)
#expect(Sentry).to have_received(:capture_exception).with(RuntimeError)
end
end
end
end
Expand Down Expand Up @@ -243,12 +253,22 @@
expect(orphan_device.reload.device_handshake).to be true
end

it 'does not handle bad topic' do
it 'defers messages with unknown device tokens if retry flag is true' do
expect(device.hardware_info["id"]).to eq(47)
expect(RetryMQTTMessageJob).to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload)
MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload)
device.reload
expect(device.hardware_info["id"]).to eq(47)
expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json))
end

it 'does not defer messages with unknown device tokens if retry flag is false' do
expect(device.hardware_info["id"]).to eq(47)
expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload)
MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload, false)
device.reload
expect(device.hardware_info["id"]).to eq(47)
expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json))
end
end
end
Loading