From 09ee67d485d47d25b3963a739af7d1ec15f20631 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Wed, 10 Jul 2024 12:57:55 +0200 Subject: [PATCH] Message forwarding logic moved to sidekiq job --- app/controllers/v0/readings_controller.rb | 14 ++--- app/jobs/mqtt_forwarding_job.rb | 41 +++++++++++++ app/jobs/retry_mqtt_message_job.rb | 18 +----- app/jobs/send_to_datastore_job.rb | 26 +++----- app/lib/mqtt_messages_handler.rb | 9 +-- app/models/concerns/message_forwarding.rb | 24 +------- app/models/raw_storer.rb | 10 --- app/models/storer.rb | 10 --- lib/tasks/mqtt_subscriber.rake | 2 +- spec/jobs/mqtt_forwarding_job_spec.rb | 75 +++++++++++++++++++++++ spec/jobs/retry_mqtt_message_job_spec.rb | 26 +------- spec/lib/mqtt_messages_handler_spec.rb | 8 +-- spec/models/raw_storer_spec.rb | 34 ++-------- spec/models/storer_spec.rb | 24 +------- 14 files changed, 145 insertions(+), 176 deletions(-) create mode 100644 app/jobs/mqtt_forwarding_job.rb create mode 100644 spec/jobs/mqtt_forwarding_job_spec.rb diff --git a/app/controllers/v0/readings_controller.rb b/app/controllers/v0/readings_controller.rb index 17311a1c..5e66a3f0 100644 --- a/app/controllers/v0/readings_controller.rb +++ b/app/controllers/v0/readings_controller.rb @@ -38,14 +38,12 @@ def create def legacy_create if request.headers['X-SmartCitizenData'] - MQTTClientFactory.create_client({clean_session: true, client_id: nil}) do |mqtt_client| - storer = RawStorer.new(mqtt_client) - JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading| - mac = request.headers['X-SmartCitizenMacADDR'] - version = request.headers['X-SmartCitizenVersion'] - ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip) - storer.store(raw_reading,mac,version,ip) - end + storer = RawStorer.new + JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading| + mac = request.headers['X-SmartCitizenMacADDR'] + version = request.headers['X-SmartCitizenVersion'] + ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip) + storer.store(raw_reading,mac,version,ip) end end diff --git a/app/jobs/mqtt_forwarding_job.rb b/app/jobs/mqtt_forwarding_job.rb new file mode 100644 index 00000000..8cdffe26 --- /dev/null +++ b/app/jobs/mqtt_forwarding_job.rb @@ -0,0 +1,41 @@ +class MQTTForwardingJob < ApplicationJob + + def perform(device_id, reading) + begin + device = Device.find(device_id) + forwarder = MQTTForwarder.new(mqtt_client) + payload = payload_for(device, reading) + forwarder.forward_reading(device.forwarding_token, device.id, payload) + ensure + disconnect_mqtt! + end + end + + private + + def payload_for(device, reading) + renderer.render( + partial: "v0/devices/device", + locals: { + device: device.reload, + current_user: nil, + slim_owner: true + } + ) + end + + def mqtt_client + @mqtt_client ||= MQTTClientFactory.create_client({ + clean_session: true, client_id: nil + }) + end + + def renderer + @renderer ||= ActionController::Base.new.view_context + end + + def disconnect_mqtt! + @mqtt_client&.disconnect + end +end + diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb index caf0e737..1fdeb5cd 100644 --- a/app/jobs/retry_mqtt_message_job.rb +++ b/app/jobs/retry_mqtt_message_job.rb @@ -19,26 +19,14 @@ class RetryMessageHandlerError < RuntimeError end def perform(topic, message) - begin - result = handler.handle_topic(topic, message, false) - raise RetryMessageHandlerError if result.nil? - ensure - disconnect_mqtt - end + result = handler.handle_topic(topic, message, false) + raise RetryMessageHandlerError if result.nil? end private def handler - @handler ||= MqttMessagesHandler.new(mqtt_client) - end - - def mqtt_client - @mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil }) - end - - def disconnect_mqtt - @mqtt_client&.disconnect + @handler ||= MqttMessagesHandler.new end end diff --git a/app/jobs/send_to_datastore_job.rb b/app/jobs/send_to_datastore_job.rb index 184289ec..29cb3d7c 100644 --- a/app/jobs/send_to_datastore_job.rb +++ b/app/jobs/send_to_datastore_job.rb @@ -2,28 +2,16 @@ class SendToDatastoreJob < ApplicationJob queue_as :default def perform(data_param, device_id) - begin - @device = Device.includes(:components).find(device_id) - the_data = JSON.parse(data_param) - the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index| - # move to async method call - do_update = index == 0 - storer.store(@device, reading, do_update) - end - ensure - disconnect_mqtt + @device = Device.includes(:components).find(device_id) + the_data = JSON.parse(data_param) + the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index| + # move to async method call + do_update = index == 0 + storer.store(@device, reading, do_update) end end def storer - @storer ||= Storer.new(mqtt_client) - end - - def mqtt_client - @mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil }) - end - - def disconnect_mqtt - @mqtt_client&.disconnect + @storer ||= Storer.new end end diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index c932df6b..e2989c89 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -1,9 +1,5 @@ class MqttMessagesHandler - def initialize(mqtt_client) - @mqtt_client = mqtt_client - end - def handle_topic(topic, message, retry_on_nil_device=true) Sentry.set_tags('mqtt-topic': topic) @@ -138,10 +134,7 @@ def data(message) private - attr_reader :mqtt_client - - def storer - @storer ||= Storer.new(mqtt_client) + @storer ||= Storer.new end end diff --git a/app/models/concerns/message_forwarding.rb b/app/models/concerns/message_forwarding.rb index 544f7d67..5136d71e 100644 --- a/app/models/concerns/message_forwarding.rb +++ b/app/models/concerns/message_forwarding.rb @@ -4,30 +4,8 @@ module MessageForwarding def forward_reading(device, reading) if device.forward_readings? - forwarder = MQTTForwarder.new(mqtt_client) - payload = payload_for(device, reading) - forwarder.forward_reading(device.forwarding_token, device.id, payload) + MQTTForwardingJob.perform_later(device.id, reading) end end - def payload_for(device, reading) - renderer.render( - partial: "v0/devices/device", - locals: { - device: device.reload, - current_user: nil, - slim_owner: true - } - ) - end - - private - - def mqtt_client - raise NotImplementedError - end - - def renderer - raise NotImplementedError - end end diff --git a/app/models/raw_storer.rb b/app/models/raw_storer.rb index e712fb54..0e46402b 100644 --- a/app/models/raw_storer.rb +++ b/app/models/raw_storer.rb @@ -5,11 +5,6 @@ class RawStorer include MessageForwarding - def initialize(mqtt_client, renderer=nil) - @mqtt_client = mqtt_client - @renderer = renderer || ActionController::Base.new.view_context - end - def store data, mac, version, ip, raise_errors=false success = true @@ -81,9 +76,4 @@ def store data, mac, version, ip, raise_errors=false end end end - - private - - attr_reader :mqtt_client, :renderer - end diff --git a/app/models/storer.rb b/app/models/storer.rb index e67cad6e..bd40d9cd 100644 --- a/app/models/storer.rb +++ b/app/models/storer.rb @@ -2,11 +2,6 @@ class Storer include DataParser::Storer include MessageForwarding - def initialize(mqtt_client, renderer=nil) - @mqtt_client = mqtt_client - @renderer = renderer || ActionController::Base.new.view_context - end - def store device, reading, do_update = true begin parsed_reading = Storer.parse_reading(device, reading) @@ -47,9 +42,4 @@ def kairos_publish(reading_data) #NOTE: If you want to use the Telnet port below, make sure it is open! Redis.current.publish('telnet_queue', reading_data.to_json) end - - private - - attr_reader :mqtt_client, :renderer - end diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 526583d0..41c45a15 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -31,7 +31,7 @@ namespace :mqtt do mqtt_log.info "Connected to #{client.host}" mqtt_log.info "Using clean_session setting: #{client.clean_session}" - message_handler = MqttMessagesHandler.new(client) + message_handler = MqttMessagesHandler.new client.subscribe(*mqtt_topics.flat_map { |topic| topic = topic == "" ? topic : topic + "/" diff --git a/spec/jobs/mqtt_forwarding_job_spec.rb b/spec/jobs/mqtt_forwarding_job_spec.rb new file mode 100644 index 00000000..24b273d1 --- /dev/null +++ b/spec/jobs/mqtt_forwarding_job_spec.rb @@ -0,0 +1,75 @@ +require 'rails_helper' + +RSpec.describe MQTTForwardingJob, type: :job do + + let(:forwarding_token) { "abc123_forwarding_token" } + + let(:device) { create(:device) } + + let(:reading) { double(:reading) } + + let(:mqtt_client) { + double(:mqtt_client).tap do |mqtt_client| + allow(mqtt_client).to receive(:disconnect) + end + } + + let(:device_json) { + double(:device_json) + } + + let(:renderer) { + double(:renderer).tap do |renderer| + allow(renderer).to receive(:render).and_return(device_json) + end + } + + let(:forwarder) { + double(:forwarder).tap do |forwarder| + allow(forwarder).to receive(:forward_reading) + end + } + + before do + allow(MQTTClientFactory).to receive(:create_client).and_return(mqtt_client) + allow_any_instance_of(ActionController::Base).to receive(:view_context).and_return(renderer) + allow(MQTTForwarder).to receive(:new).and_return(forwarder) + allow_any_instance_of(Device).to receive(:forwarding_token).and_return(forwarding_token) + end + + it "creates an mqtt client with a clean session and no client id" do + MQTTForwardingJob.perform_now(device.id, reading) + expect(MQTTClientFactory).to have_received(:create_client).with({ + clean_session: true, + client_id: nil + }) + end + + it "creates a forwarder with the mqtt client" do + MQTTForwardingJob.perform_now(device.id, reading) + expect(MQTTForwarder).to have_received(:new).with(mqtt_client) + end + + it "renders the device json for the given device, as an unauthorized user" do + MQTTForwardingJob.perform_now(device.id, reading) + expect(renderer).to have_received(:render).with({ + partial: "v0/devices/device", + locals: { + device: device.reload, + current_user: nil, + slim_owner: true + } + }) + end + + it "forwards using the device's id and forwarding token, with the rendered json payload" do + MQTTForwardingJob.perform_now(device.id, reading) + expect(forwarder).to have_received(:forward_reading).with(forwarding_token, device.id, device_json) + end + + it "disconnects the MQTT client" do + MQTTForwardingJob.perform_now(device.id, reading) + expect(mqtt_client).to have_received(:disconnect) + end + +end diff --git a/spec/jobs/retry_mqtt_message_job_spec.rb b/spec/jobs/retry_mqtt_message_job_spec.rb index 81cc1d67..841f2687 100644 --- a/spec/jobs/retry_mqtt_message_job_spec.rb +++ b/spec/jobs/retry_mqtt_message_job_spec.rb @@ -3,12 +3,6 @@ RSpec.describe RetryMQTTMessageJob, type: :job do include ActiveJob::TestHelper - let(:mqtt_client) { - double(:mqtt_client).tap do |mqtt_client| - allow(mqtt_client).to receive(:disconnect) - end - } - let(:handler_results) { [true] } let(:mqtt_message_handler) { @@ -22,18 +16,12 @@ let(:message) { '{"foo": "bar", "test": "message"}' } before do - allow(MQTTClientFactory).to receive(:create_client).and_return(mqtt_client) allow(MqttMessagesHandler).to receive(:new).and_return(mqtt_message_handler) end - it "creates an MQTT client, overriding clean_session to true and the client_id to nil" do - RetryMQTTMessageJob.perform_now(topic, message) - expect(MQTTClientFactory).to have_received(:create_client).with({clean_session: true, client_id: nil }) - end - - it "creates an MQTTMessagesHandler, passing the client" do + it "creates an MQTTMessagesHandler" do RetryMQTTMessageJob.perform_now(topic, message) - expect(MqttMessagesHandler).to have_received(:new).with(mqtt_client) + expect(MqttMessagesHandler).to have_received(:new) end it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do @@ -41,11 +29,6 @@ expect(mqtt_message_handler).to have_received(:handle_topic).with(topic, message, false) end - it "disconnects the client when done" do - RetryMQTTMessageJob.perform_now(topic, message) - expect(mqtt_client).to have_received(:disconnect) - end - context "when the handler returns nil" do let(:handler_results) { [nil, nil, true] } @@ -54,10 +37,5 @@ RetryMQTTMessageJob.perform_later(topic, message) end end - - it "closes the mqtt connection" do - RetryMQTTMessageJob.perform_now(topic, message) - expect(mqtt_client).to have_received(:disconnect) - end end end diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index cdc39749..c015d2a6 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -8,14 +8,8 @@ let(:device_inventory) { create(:device_inventory, report: '{"random_property": "random_result"}') } - let(:mqtt_client) { - double(:mqtt_client).tap do |mqtt_client| - allow(mqtt_client).to receive(:publish) - end - } - subject(:message_handler) { - MqttMessagesHandler.new(mqtt_client) + MqttMessagesHandler.new } diff --git a/spec/models/raw_storer_spec.rb b/spec/models/raw_storer_spec.rb index d46c89ad..989e1ab2 100644 --- a/spec/models/raw_storer_spec.rb +++ b/spec/models/raw_storer_spec.rb @@ -27,20 +27,8 @@ def to_ts(time) Component.create!(id: 21, device: device, sensor: Sensor.find(21)) end - let(:mqtt_client) { - double(:mqtt_client).tap do |mqtt_client| - allow(mqtt_client).to receive(:publish) - end - } - - let(:renderer) { - # TODO: refactor these tests so they don't depend on the actual rendering, - # then replace this with a mock, to reduce brittleness. - ActionController::Base.new.view_context - } - subject(:storer) { - RawStorer.new(mqtt_client, renderer) + RawStorer.new } let(:json) { @@ -102,23 +90,9 @@ def to_ts(time) end context "when the device allows forwarding" do - let(:device_json) { - double(:device_json) - } - - let(:renderer) { - double(:renderer).tap do |renderer| - allow(renderer).to receive(:render).and_return(device_json) - end - } - - it "forwards the message with the forwarding token and the device's id" do - forwarding_token = double(:forwarding_token) - allow_any_instance_of(Device).to receive(:forwarding_token).and_return(forwarding_token) + it "forwards the message" do allow_any_instance_of(Device).to receive(:forward_readings?).and_return(true) - forwarder = double(:mqtt_forwarder) - allow(MQTTForwarder).to receive(:new).and_return(forwarder) - expect(forwarder).to receive(:forward_reading).with(forwarding_token, device.id, device_json) + expect(MQTTForwardingJob).to receive(:perform_later).with(device.id, json) storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) end end @@ -126,7 +100,7 @@ def to_ts(time) context "when the device does not have allow forwarding" do it "does not forward the message" do allow_any_instance_of(Device).to receive(:forward_readings?).and_return(false) - expect_any_instance_of(MQTTForwarder).not_to receive(:forward_reading) + expect(MQTTForwardingJob).not_to receive(:perform_later) storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) end end diff --git a/spec/models/storer_spec.rb b/spec/models/storer_spec.rb index fc205344..43923a15 100644 --- a/spec/models/storer_spec.rb +++ b/spec/models/storer_spec.rb @@ -5,21 +5,8 @@ let(:device) { create(:device, device_token: 'aA1234') } let(:component){ create(:component, id: 12, device: device, sensor: sensor) } - - let(:mqtt_client) { - double(:mqtt_client).tap do |mqtt_client| - allow(mqtt_client).to receive(:publish) - end - } - - let(:renderer) { - double(:renderer).tap do |renderer| - allow(renderer).to receive(:render) - end - } - subject(:storer) { - Storer.new(mqtt_client, renderer) + Storer.new } context 'when receiving good data' do @@ -89,13 +76,8 @@ } it "forwards the message with the forwarding token and the device's id" do - forwarding_token = double(:forwarding_token) - forwarder = double(:mqtt_forwarder) - allow(device).to receive(:forwarding_token).and_return(forwarding_token) allow(device).to receive(:forward_readings?).and_return(true) - allow(renderer).to receive(:render).and_return(device_json) - allow(MQTTForwarder).to receive(:new).and_return(forwarder) - expect(forwarder).to receive(:forward_reading).with(forwarding_token, device.id, device_json) + expect(MQTTForwardingJob).to receive(:perform_later).with(device.id, @data) storer.store(device, @data) end end @@ -103,7 +85,7 @@ context "when the device does not allow forwarding" do it "does not forward the message" do allow(device).to receive(:forward_readings?).and_return(false) - expect_any_instance_of(MQTTForwarder).not_to receive(:forward_reading) + expect(MQTTForwardingJob).not_to receive(:perform_later) storer.store(device, @data) end end