From 736ad6102df74e7b1b7f5f1ce336ac51b5726b65 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 22 Apr 2024 19:33:02 +0200 Subject: [PATCH 1/7] Deferred retry of hardware_info handling when device not found Since we deployed the devices refactor, a slightly subtle bug has emerged: if a device sends the hardware_info packet before the user has completed registration, the device is not found, the info is not saved, and therefore the device information in the UI is incomplete. This (hopefully, I'd like to test on staging with a real device) fixes this by relying on Sidekiq's built-in error handling functionality to retry the hardware_info message (with backoff) in the event that no device corresponds to the given key. Sidekiq defaults to 25 retries with backoff (corresponding to about 3 weeks of elapsed time between the first and the last try), which should be sufficient: https://docs.gitlab.com/ee/development/sidekiq/. --- app/jobs/retry_mqtt_message_job.rb | 10 ++++++++++ app/lib/mqtt_messages_handler.rb | 17 +++++++++++++++-- spec/jobs/retry_mqtt_message_job_spec.rb | 19 +++++++++++++++++++ spec/lib/mqtt_messages_handler_spec.rb | 12 +++++++++++- 4 files changed, 55 insertions(+), 3 deletions(-) create mode 100644 app/jobs/retry_mqtt_message_job.rb create mode 100644 spec/jobs/retry_mqtt_message_job_spec.rb diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb new file mode 100644 index 00000000..de868f9d --- /dev/null +++ b/app/jobs/retry_mqtt_message_job.rb @@ -0,0 +1,10 @@ +class RetryMQTTMessageJob < ApplicationJob + queue_as :default + + def perform(topic, message) + result = MqttMessagesHandler.handle_topic(topic, message, false) + raise "Message handler returned nil, retrying" if result.nil? 
+ end +end + + diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 86fd867c..607d32be 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -1,5 +1,5 @@ class MqttMessagesHandler - def self.handle_topic(topic, message) + def self.handle_topic(topic, message, retry_on_nil_device=true) Sentry.set_tags('mqtt-topic': topic) crumb = Sentry::Breadcrumb.new( @@ -19,7 +19,10 @@ def self.handle_topic(topic, message) end device = Device.find_by(device_token: device_token(topic)) - return if device.nil? + if device.nil? + handle_nil_device(topic, message, retry_on_nil_device) + return nil + end if topic.to_s.include?('raw') handle_readings(device, parse_raw_readings(message, device.id)) @@ -42,6 +45,16 @@ def self.handle_topic(topic, message) end end + def self.handle_nil_device(topic, message, retry_on_nil_device) + if topic.to_s.include?("info") + retry_later(topic, message) if retry_on_nil_device + end + end + + def self.retry_later(topic, message) + RetryMQTTMessageJob.perform_later(topic, message) + end + # takes a packet and stores data def self.handle_readings(device, message) data = self.data(message) diff --git a/spec/jobs/retry_mqtt_message_job_spec.rb b/spec/jobs/retry_mqtt_message_job_spec.rb new file mode 100644 index 00000000..2746c164 --- /dev/null +++ b/spec/jobs/retry_mqtt_message_job_spec.rb @@ -0,0 +1,19 @@ +require 'rails_helper' + +RSpec.describe RetryMQTTMessageJob, type: :job do + it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do + topic = "topic/1/2/3" + message = '{"foo": "bar", "test": "message"}' + expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(true) + RetryMQTTMessageJob.perform_now(topic, message) + end + + it "raises an error if the handler returns nil" do + topic = "topic/1/2/3" + message = '{"foo": "bar", "test": "message"}' + expect(MqttMessagesHandler).to 
receive(:handle_topic).with(topic, message, false).and_return(nil) + expect { + RetryMQTTMessageJob.perform_now(topic, message) + }.to raise_error + end +end diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index a70ae86f..5e2fb7ea 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -243,12 +243,22 @@ expect(orphan_device.reload.device_handshake).to be true end - it 'does not handle bad topic' do + it 'defers messages with unknown device tokens if retry flag is true' do expect(device.hardware_info["id"]).to eq(47) + expect(RetryMQTTMessageJob).to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) device.reload expect(device.hardware_info["id"]).to eq(47) expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) end + + it 'does not defer messages with unknown device tokens if retry flag is false' do + expect(device.hardware_info["id"]).to eq(47) + expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) + MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload, false) + device.reload + expect(device.hardware_info["id"]).to eq(47) + expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) + end end end From 1760d33fb1d7a83530dbccdd4221792f50dd71b4 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 6 May 2024 06:49:57 +0200 Subject: [PATCH 2/7] separate queue for mqtt retries, with custom backoff schedule --- app/jobs/retry_mqtt_message_job.rb | 14 +++++++++++++- config/sidekiq.yml | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb index de868f9d..88eb2f6b 100644 --- 
a/app/jobs/retry_mqtt_message_job.rb +++ b/app/jobs/retry_mqtt_message_job.rb @@ -1,5 +1,17 @@ class RetryMQTTMessageJob < ApplicationJob - queue_as :default + queue_as :mqtt_retry + + sidekiq_retry_in do |count| + case count + when 0..10 # Every 30 seconds for the first 5 minutes + 30.seconds + when 11..55 # Then every minute for an hour + 1.minute + else + false # Fallback to default backoff after an hour, + # see https://github.com/sidekiq/sidekiq/issues/2338 + end + end def perform(topic, message) result = MqttMessagesHandler.handle_topic(topic, message, false) diff --git a/config/sidekiq.yml b/config/sidekiq.yml index b4636ea5..59939973 100644 --- a/config/sidekiq.yml +++ b/config/sidekiq.yml @@ -4,6 +4,7 @@ :queues: - default - mailers + - mqtt_retry production: :concurrency: 25 From 77bb5256215c8dc377ee48885be20d8a501256ac Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 6 May 2024 06:52:09 +0200 Subject: [PATCH 3/7] queue data packets for unknown devices for retry --- app/lib/mqtt_messages_handler.rb | 2 +- spec/lib/mqtt_messages_handler_spec.rb | 22 ++++++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 607d32be..c95cef7d 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -46,7 +46,7 @@ def self.handle_topic(topic, message, retry_on_nil_device=true) end def self.handle_nil_device(topic, message, retry_on_nil_device) - if topic.to_s.include?("info") + if !topic.to_s.include?("inventory") retry_later(topic, message) if retry_on_nil_device end end diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index 5e2fb7ea..2d62d4b3 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -120,14 +120,24 @@ ) MqttMessagesHandler.handle_topic(@packet.topic, @hardware_info_packet.payload) end - end - context 'invalid packet' do - it 'it 
notifies Sentry' do - allow(Sentry).to receive(:capture_exception) - expect(Kairos).not_to receive(:http_post_to) + it 'defers messages with unknown device tokens if retry flag is true' do + expect(RetryMQTTMessageJob).to receive(:perform_later).with(@invalid_packet.topic, @invalid_packet.payload) MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload) - #expect(Sentry).to have_received(:capture_exception).with(RuntimeError) + end + + it 'does not defer messages with unknown device tokens if retry flag is false' do + expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@invalid_packet.topic, @invalid_packet.payload) + MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload, false) + end + + context 'invalid packet' do + it 'it notifies Sentry' do + allow(Sentry).to receive(:capture_exception) + expect(Kairos).not_to receive(:http_post_to) + MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload) + #expect(Sentry).to have_received(:capture_exception).with(RuntimeError) + end end end end From bdff632749345fd1b2a9ee90f2e95850efd2c73c Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Tue, 7 May 2024 16:17:01 +0200 Subject: [PATCH 4/7] actually return true when the handler succeeds --- app/lib/mqtt_messages_handler.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index c95cef7d..86e7f108 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -43,6 +43,7 @@ def self.handle_topic(topic, message, retry_on_nil_device=true) Sentry.add_breadcrumb(crumb) device.update_column(:hardware_info, json_message) end + return true end def self.handle_nil_device(topic, message, retry_on_nil_device) From 5364a6e24be55cb10389b20f5929daba3d65a1b1 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Tue, 7 May 2024 17:40:34 +0200 Subject: [PATCH 5/7] tweak schedule for retries --- 
app/jobs/retry_mqtt_message_job.rb | 9 +++++---- app/lib/mqtt_messages_handler.rb | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb index 88eb2f6b..c5b53bc2 100644 --- a/app/jobs/retry_mqtt_message_job.rb +++ b/app/jobs/retry_mqtt_message_job.rb @@ -3,13 +3,14 @@ class RetryMQTTMessageJob < ApplicationJob sidekiq_retry_in do |count| case count - when 0..10 # Every 30 seconds for the first 5 minutes + when 0..12 + 5.seconds + when 12..20 # Every 30 seconds for the first 5 minutes 30.seconds - when 11..55 # Then every minute for an hour + when 20..75 # Then every minute for an hour 1.minute else - false # Fallback to default backoff after an hour, - # see https://github.com/sidekiq/sidekiq/issues/2338 + :discard end end diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 86e7f108..8b89bcf1 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -16,6 +16,7 @@ def self.handle_topic(topic, message, retry_on_nil_device=true) # The following do NOT need a device if topic.to_s.include?('inventory') DeviceInventory.create({ report: (message rescue nil) }) + return true end device = Device.find_by(device_token: device_token(topic)) From 4e9775ed42cecec21b29f30c42e239022e9da228 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 13 May 2024 08:11:11 +0200 Subject: [PATCH 6/7] use activejob apis to express retry timeout logic --- app/jobs/retry_mqtt_message_job.rb | 29 +++++++++++++----------- config/initializers/sentry.rb | 1 + spec/jobs/retry_mqtt_message_job_spec.rb | 12 ++++++---- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb index c5b53bc2..1d1f507e 100644 --- a/app/jobs/retry_mqtt_message_job.rb +++ b/app/jobs/retry_mqtt_message_job.rb @@ -1,22 +1,25 @@ class RetryMQTTMessageJob < ApplicationJob + class 
RetryMessageHandlerError < RuntimeError + end queue_as :mqtt_retry - sidekiq_retry_in do |count| - case count - when 0..12 - 5.seconds - when 12..20 # Every 30 seconds for the first 5 minutes - 30.seconds - when 20..75 # Then every minute for an hour - 1.minute - else - :discard - end - end + + retry_on RetryMessageHandlerError, attempts: 75, wait: ->(count) { + case count + when 0..12 + 5.seconds + when 12..20 # Every 30 seconds for the first 5 minutes + 30.seconds + when 20..75 # Then every minute for an hour + 1.minute + else + :polynomially_longer + end + } def perform(topic, message) result = MqttMessagesHandler.handle_topic(topic, message, false) - raise "Message handler returned nil, retrying" if result.nil? + raise RetryMessageHandlerError if result.nil? end end diff --git a/config/initializers/sentry.rb b/config/initializers/sentry.rb index 3644d792..3737f33b 100644 --- a/config/initializers/sentry.rb +++ b/config/initializers/sentry.rb @@ -1,4 +1,5 @@ Sentry.init do |config| config.dsn = ENV['RAVEN_DSN_URL'] config.breadcrumbs_logger = [:sentry_logger, :active_support_logger, :http_logger] + config.excluded_exceptions = ["RetryMQTTMessageJob::RetryMessageHandlerError"] end diff --git a/spec/jobs/retry_mqtt_message_job_spec.rb b/spec/jobs/retry_mqtt_message_job_spec.rb index 2746c164..9cfabd79 100644 --- a/spec/jobs/retry_mqtt_message_job_spec.rb +++ b/spec/jobs/retry_mqtt_message_job_spec.rb @@ -1,6 +1,8 @@ require 'rails_helper' RSpec.describe RetryMQTTMessageJob, type: :job do + include ActiveJob::TestHelper + it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do topic = "topic/1/2/3" message = '{"foo": "bar", "test": "message"}' @@ -8,12 +10,12 @@ RetryMQTTMessageJob.perform_now(topic, message) end - it "raises an error if the handler returns nil" do + it "retries if the handler returns nil" do topic = "topic/1/2/3" message = '{"foo": "bar", "test": "message"}' - expect(MqttMessagesHandler).to 
receive(:handle_topic).with(topic, message, false).and_return(nil) - expect { - RetryMQTTMessageJob.perform_now(topic, message) - }.to raise_error + expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(nil, nil, true) + assert_performed_jobs 3 do + RetryMQTTMessageJob.perform_later(topic, message) + end end end From e4e3aacb7161a8e53c4088a8df722eca9dae9dfb Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Wed, 15 May 2024 14:21:51 +0200 Subject: [PATCH 7/7] ensure retry jobs abandoned after an hour --- app/jobs/retry_mqtt_message_job.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb index 1d1f507e..e359d6a7 100644 --- a/app/jobs/retry_mqtt_message_job.rb +++ b/app/jobs/retry_mqtt_message_job.rb @@ -1,21 +1,22 @@ class RetryMQTTMessageJob < ApplicationJob class RetryMessageHandlerError < RuntimeError end + queue_as :mqtt_retry - retry_on RetryMessageHandlerError, attempts: 75, wait: ->(count) { + retry_on(RetryMessageHandlerError, attempts: 75, wait: ->(count) { case count when 0..12 5.seconds when 12..20 # Every 30 seconds for the first 5 minutes 30.seconds - when 20..75 # Then every minute for an hour + else # Then every minute for an hour 1.minute - else - :polynomially_longer end - } + }) do |_job, _exeception| + # No-op, this block ensures the exception isn't reraised and retried by Sidekiq + end def perform(topic, message) result = MqttMessagesHandler.handle_topic(topic, message, false)