diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb new file mode 100644 index 00000000..e359d6a7 --- /dev/null +++ b/app/jobs/retry_mqtt_message_job.rb @@ -0,0 +1,27 @@ +class RetryMQTTMessageJob < ApplicationJob + class RetryMessageHandlerError < RuntimeError + end + + queue_as :mqtt_retry + + + retry_on(RetryMessageHandlerError, attempts: 75, wait: ->(count) { + case count + when 0..12 + 5.seconds + when 12..20 # Every 30 seconds for the first 5 minutes + 30.seconds + else # Then every minute for an hour + 1.minute + end + }) do |_job, _exeception| + # No-op, this block ensures the exception isn't reraised and retried by Sidekiq + end + + def perform(topic, message) + result = MqttMessagesHandler.handle_topic(topic, message, false) + raise RetryMessageHandlerError if result.nil? + end +end + + diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 86fd867c..8b89bcf1 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -1,5 +1,5 @@ class MqttMessagesHandler - def self.handle_topic(topic, message) + def self.handle_topic(topic, message, retry_on_nil_device=true) Sentry.set_tags('mqtt-topic': topic) crumb = Sentry::Breadcrumb.new( @@ -16,10 +16,14 @@ def self.handle_topic(topic, message) # The following do NOT need a device if topic.to_s.include?('inventory') DeviceInventory.create({ report: (message rescue nil) }) + return true end device = Device.find_by(device_token: device_token(topic)) - return if device.nil? + if device.nil? + handle_nil_device(topic, message, retry_on_nil_device) + return nil + end if topic.to_s.include?('raw') handle_readings(device, parse_raw_readings(message, device.id)) @@ -40,6 +44,17 @@ def self.handle_topic(topic, message) Sentry.add_breadcrumb(crumb) device.update_column(:hardware_info, json_message) end + return true + end + + def self.handle_nil_device(topic, message, retry_on_nil_device) + if !topic.to_s.include?("inventory") + retry_later(topic, message) if retry_on_nil_device + end + end + + def self.retry_later(topic, message) + RetryMQTTMessageJob.perform_later(topic, message) end # takes a packet and stores data diff --git a/config/initializers/sentry.rb b/config/initializers/sentry.rb index 3644d792..3737f33b 100644 --- a/config/initializers/sentry.rb +++ b/config/initializers/sentry.rb @@ -1,4 +1,5 @@ Sentry.init do |config| config.dsn = ENV['RAVEN_DSN_URL'] config.breadcrumbs_logger = [:sentry_logger, :active_support_logger, :http_logger] + config.excluded_exceptions = ["RetryMQTTMessageJob::RetryMessageHandlerError"] end diff --git a/config/sidekiq.yml b/config/sidekiq.yml index b4636ea5..59939973 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -4,6 +4,7 @@ :queues: - default - mailers + - mqtt_retry production: :concurrency: 25 diff --git a/spec/jobs/retry_mqtt_message_job_spec.rb b/spec/jobs/retry_mqtt_message_job_spec.rb new file mode 100644 index 00000000..9cfabd79 --- /dev/null +++ b/spec/jobs/retry_mqtt_message_job_spec.rb @@ -0,0 +1,21 @@ +require 'rails_helper' + +RSpec.describe RetryMQTTMessageJob, type: :job do + include ActiveJob::TestHelper + + it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do + topic = "topic/1/2/3" + message = '{"foo": "bar", "test": "message"}' + expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(true) + RetryMQTTMessageJob.perform_now(topic, message) + end + + it "retries if the handler returns nil" do + topic = "topic/1/2/3" + message = '{"foo": "bar", "test": "message"}' + expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(nil, nil, true) + assert_performed_jobs 3 do + RetryMQTTMessageJob.perform_later(topic, message) + end + end +end diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index a70ae86f..2d62d4b3 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -120,14 +120,24 @@ ) MqttMessagesHandler.handle_topic(@packet.topic, @hardware_info_packet.payload) end - end - context 'invalid packet' do - it 'it notifies Sentry' do - allow(Sentry).to receive(:capture_exception) - expect(Kairos).not_to receive(:http_post_to) + it 'defers messages with unknown device tokens if retry flag is true' do + expect(RetryMQTTMessageJob).to receive(:perform_later).with(@invalid_packet.topic, @invalid_packet.payload) MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload) - #expect(Sentry).to have_received(:capture_exception).with(RuntimeError) + end + + it 'does not defer messages with unknown device tokens if retry flag is false' do + expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@invalid_packet.topic, @invalid_packet.payload) + MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload, false) + end + + context 'invalid packet' do + it 'it notifies Sentry' do + allow(Sentry).to receive(:capture_exception) + expect(Kairos).not_to receive(:http_post_to) + MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload) + #expect(Sentry).to have_received(:capture_exception).with(RuntimeError) + end end end end @@ -243,12 +253,22 @@ expect(orphan_device.reload.device_handshake).to be true end - it 'does not handle bad topic' do + it 'defers messages with unknown device tokens if retry flag is true' do expect(device.hardware_info["id"]).to eq(47) + expect(RetryMQTTMessageJob).to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) device.reload expect(device.hardware_info["id"]).to eq(47) expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) end + + it 'does not defer messages with unknown device tokens if retry flag is false' do + expect(device.hardware_info["id"]).to eq(47) + expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) + MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload, false) + device.reload + expect(device.hardware_info["id"]).to eq(47) + expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) + end end end