From 522459051e986fe52f58c104d0b96d2b17c200cf Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Wed, 24 Apr 2024 09:39:44 +0200 Subject: [PATCH] WIP data forwarding --- app/controllers/v0/readings_controller.rb | 15 +++-- app/jobs/retry_mqtt_message_job.rb | 23 ++++++- app/jobs/send_to_datastore_job.rb | 27 ++++++-- app/lib/mqtt_client_factory.rb | 33 ++++++++++ app/lib/mqtt_forwarder.rb | 20 ++++++ app/lib/mqtt_messages_handler.rb | 33 +++++++--- app/models/concerns/message_forwarding.rb | 15 +++++ app/models/device.rb | 8 +++ app/models/raw_storer.rb | 12 +++- app/models/storer.rb | 44 ++++++++----- app/models/user.rb | 8 +++ ...423162838_add_forwarding_token_to_users.rb | 5 ++ db/schema.rb | 3 +- lib/tasks/mqtt_subscriber.rake | 6 +- spec/jobs/retry_mqtt_message_job_spec.rb | 65 ++++++++++++++++--- spec/lib/mqtt_forwarder_spec.rb | 0 spec/lib/mqtt_messages_handler_spec.rb | 46 ++++++++----- spec/models/device_spec.rb | 18 +++++ spec/models/raw_storer_spec.rb | 55 ++++++++++++++-- spec/models/storer_spec.rb | 44 +++++++++++-- spec/models/user_spec.rb | 43 ++++++++++++ 21 files changed, 438 insertions(+), 85 deletions(-) create mode 100644 app/lib/mqtt_client_factory.rb create mode 100644 app/lib/mqtt_forwarder.rb create mode 100644 app/models/concerns/message_forwarding.rb create mode 100644 db/migrate/20240423162838_add_forwarding_token_to_users.rb create mode 100644 spec/lib/mqtt_forwarder_spec.rb diff --git a/app/controllers/v0/readings_controller.rb b/app/controllers/v0/readings_controller.rb index 71ae774f..1d136b94 100644 --- a/app/controllers/v0/readings_controller.rb +++ b/app/controllers/v0/readings_controller.rb @@ -36,13 +36,14 @@ def create def legacy_create if request.headers['X-SmartCitizenData'] - JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading| - - mac = request.headers['X-SmartCitizenMacADDR'] - version = request.headers['X-SmartCitizenVersion'] - ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip) - - RawStorer.new(raw_reading,mac,version,ip) + MQTTClientFactory.create_client({clean_session: true, client_id: nil}) do |mqtt_client| + storer = RawStorer.new(mqtt_client) + JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading| + mac = request.headers['X-SmartCitizenMacADDR'] + version = request.headers['X-SmartCitizenVersion'] + ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip) + storer.store(raw_reading,mac,version,ip) + end end end diff --git a/app/jobs/retry_mqtt_message_job.rb b/app/jobs/retry_mqtt_message_job.rb index de868f9d..f297f158 100644 --- a/app/jobs/retry_mqtt_message_job.rb +++ b/app/jobs/retry_mqtt_message_job.rb @@ -2,9 +2,26 @@ class RetryMQTTMessageJob < ApplicationJob queue_as :default def perform(topic, message) - result = MqttMessagesHandler.handle_topic(topic, message, false) - raise "Message handler returned nil, retrying" if result.nil? + begin + result = handler.handle_topic(topic, message, false) + raise "Message handler returned nil, retrying" if result.nil? + ensure + disconnect_mqtt + end + end + + private + + def handler + @handler ||= MqttMessagesHandler.new(mqtt_client) end -end + def mqtt_client + @mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil }) + end + + def disconnect_mqtt + @mqtt_client&.disconnect + end +end diff --git a/app/jobs/send_to_datastore_job.rb b/app/jobs/send_to_datastore_job.rb index d3cff487..1513d41d 100644 --- a/app/jobs/send_to_datastore_job.rb +++ b/app/jobs/send_to_datastore_job.rb @@ -2,15 +2,28 @@ class SendToDatastoreJob < ApplicationJob queue_as :default def perform(data_param, device_id) + begin + @device = Device.includes(:components).find(device_id) + the_data = JSON.parse(data_param) + the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index| + # move to async method call + do_update = index == 0 + storer.store(@device, reading, do_update) + end + ensure + disconnect_mqtt + end + end - @device = Device.includes(:components).find(device_id) + def storer + @storer ||= Storer.new(mqtt_client) + end - the_data = JSON.parse(data_param) - the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index| - # move to async method call - do_update = index == 0 - Storer.new(@device, reading, do_update) - end + def mqtt_client + @mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil }) + end + def disconnect_mqtt + mqtt_client.disconnect if @mqtt_client end end diff --git a/app/lib/mqtt_client_factory.rb b/app/lib/mqtt_client_factory.rb new file mode 100644 index 00000000..43674811 --- /dev/null +++ b/app/lib/mqtt_client_factory.rb @@ -0,0 +1,33 @@ +module MQTTClientFactory + def self.create_client(args={}, &block) + host = args.fetch(:host, default_host) + port = args.fetch(:port, default_port) + clean_sesion = args.fetch(:clean_session, default_clean_session) + client_id = args.fetch(:client_id, default_client_id) + ssl = args.fetch(:ssl, default_ssl) + MQTT::Client.connect( + { host: host, port: port, clean_session: clean_sesion, client_id: client_id, ssl: ssl}, + &block + ) + end + + def self.default_host + ENV.fetch('MQTT_HOST', 'mqtt') + end + + def self.default_port + ENV.fetch('MQTT_PORT', "1883").to_i + end + + def self.default_clean_session + ENV.fetch('MQTT_CLEAN_SESSION', "true") == "true" + end + + def self.default_client_id + ENV.fetch('MQTT_CLIENT_ID', nil) + end + + def self.default_ssl + ENV.fetch('MQTT_SSL', "false") == "true" + end +end diff --git a/app/lib/mqtt_forwarder.rb b/app/lib/mqtt_forwarder.rb new file mode 100644 index 00000000..6b014139 --- /dev/null +++ b/app/lib/mqtt_forwarder.rb @@ -0,0 +1,20 @@ +class MQTTForwarder + def initialize(client, prefix="forward/devices", suffix="readings") + @client = client + @prefix = prefix + @suffix = suffix + end + + def forward_reading(token, device_id, reading) + topic = topic_path(token, device_id) + client.publish(topic, reading) + end + + private + + def topic_path(token, device_id) + [prefix, token, device_id, suffix].join("/") + end + + attr_reader :client, :prefix, :suffix +end diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index 607d32be..b34d8f1d 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -1,5 +1,10 @@ class MqttMessagesHandler - def self.handle_topic(topic, message, retry_on_nil_device=true) + + def initialize(mqtt_client) + @mqtt_client = mqtt_client + end + + def handle_topic(topic, message, retry_on_nil_device=true) Sentry.set_tags('mqtt-topic': topic) crumb = Sentry::Breadcrumb.new( @@ -45,23 +50,23 @@ def self.handle_topic(topic, message, retry_on_nil_device=true) end end - def self.handle_nil_device(topic, message, retry_on_nil_device) + def handle_nil_device(topic, message, retry_on_nil_device) if topic.to_s.include?("info") retry_later(topic, message) if retry_on_nil_device end end - def self.retry_later(topic, message) + def retry_later(topic, message) RetryMQTTMessageJob.perform_later(topic, message) end # takes a packet and stores data - def self.handle_readings(device, message) + def handle_readings(device, message) data = self.data(message) return if data.nil? or data&.empty? data.each do |reading| - Storer.new(device, reading) + storer.store(device, reading) end rescue Exception => e Sentry.capture_exception(e) @@ -71,7 +76,7 @@ def self.handle_readings(device, message) end # takes a raw packet and converts into JSON - def self.parse_raw_readings(message, device_id=nil) + def parse_raw_readings(message, device_id=nil) crumb = Sentry::Breadcrumb.new( category: "MqttMessagesHandler.parse_raw_readings", message: "Parsing raw readings", @@ -99,7 +104,7 @@ def self.parse_raw_readings(message, device_id=nil) JSON[reading] end - def self.handshake_device(topic) + def handshake_device(topic) orphan_device = OrphanDevice.find_by(device_token: device_token(topic)) return if orphan_device.nil? orphan_device.update!(device_handshake: true) @@ -109,12 +114,12 @@ def self.handshake_device(topic) end # takes a packet and returns 'device token' from topic - def self.device_token(topic) + def device_token(topic) topic[/device\/sck\/(.*?)\//m, 1].to_s end # takes a packet and returns 'data' from payload - def self.data(message) + def data(message) # TODO: what if message is empty? if message begin @@ -126,4 +131,14 @@ def self.data(message) raise "No data(message)" end end + + + private + + attr_reader :mqtt_client + + + def storer + @storer ||= Storer.new(mqtt_client) + end end diff --git a/app/models/concerns/message_forwarding.rb b/app/models/concerns/message_forwarding.rb new file mode 100644 index 00000000..420f3932 --- /dev/null +++ b/app/models/concerns/message_forwarding.rb @@ -0,0 +1,15 @@ +module MessageForwarding + + extend ActiveSupport::Concern + + def forward_reading(device, reading) + forwarder = MQTTForwarder.new(mqtt_client) + forwarder.forward_reading(device.forwarding_token, device.id, reading) if device.forward_readings? + end + + private + + def mqtt_client + raise NotImplementedError + end +end diff --git a/app/models/device.rb b/app/models/device.rb index 92fc8cd9..3d06d268 100644 --- a/app/models/device.rb +++ b/app/models/device.rb @@ -271,6 +271,14 @@ def hardware_slug hardware_slug_override || [hardware_type.downcase, hardware_version&.gsub(".", ",")].compact.join(":") end + def forward_readings? + owner.forward_device_readings? + end + + def forwarding_token + owner.forwarding_token + end + private def set_state diff --git a/app/models/raw_storer.rb b/app/models/raw_storer.rb index d1037329..43116504 100644 --- a/app/models/raw_storer.rb +++ b/app/models/raw_storer.rb @@ -3,9 +3,13 @@ # ingest raw data posted by Devices into Kairos and Postgres (backup purposes). class RawStorer + include MessageForwarding - def initialize data, mac, version, ip, raise_errors=false + def initialize(mqtt_client) + @mqtt_client = mqtt_client + end + def store data, mac, version, ip, raise_errors=false success = true begin @@ -62,6 +66,7 @@ def initialize data, mac, version, ip, raise_errors=false device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') end + forward_reading(device, data) rescue Exception => e success = false @@ -74,7 +79,10 @@ def initialize data, mac, version, ip, raise_errors=false rescue end end - end + private + + attr_reader :mqtt_client + end diff --git a/app/models/storer.rb b/app/models/storer.rb index efa5762a..c62a4359 100644 --- a/app/models/storer.rb +++ b/app/models/storer.rb @@ -1,14 +1,22 @@ class Storer include DataParser::Storer + include MessageForwarding - def initialize device, reading, do_update = true - @device = device - begin - parsed_reading = Storer.parse_reading(@device, reading) + def initialize(mqtt_client) + @mqtt_client = mqtt_client + end + def store device, reading, do_update = true + begin + parsed_reading = Storer.parse_reading(device, reading) kairos_publish(parsed_reading[:_data]) - update_device(parsed_reading[:parsed_ts], parsed_reading[:sql_data]) if do_update + if do_update + update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data]) + ws_publish(device) + end + + forward_reading(device, reading) rescue Exception => e Sentry.capture_exception(e) @@ -18,21 +26,20 @@ def initialize device, reading, do_update = true raise e unless e.nil? end - def update_device(parsed_ts, sql_data) + def update_device(device, parsed_ts, sql_data) return if parsed_ts <= Time.at(0) - if @device.last_reading_at.present? - # Comparison errors if @device.last_reading_at is nil (new devices). + if device.last_reading_at.present? + # Comparison errors if device.last_reading_at is nil (new devices). # Devices can post multiple readings, in a non-sorted order. # Do not update data with an older timestamp. - return if parsed_ts < @device.last_reading_at + return if parsed_ts < device.last_reading_at end - sql_data = @device.data.present? ? @device.data.merge(sql_data) : sql_data - @device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') + sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data + device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq - @device.update_component_timestamps(parsed_ts, sensor_ids) - ws_publish() + device.update_component_timestamps(parsed_ts, sensor_ids) end def kairos_publish(reading_data) @@ -41,11 +48,16 @@ def kairos_publish(reading_data) Redis.current.publish('telnet_queue', reading_data.to_json) end - def ws_publish() - return if Rails.env.test? or @device.blank? + def ws_publish(device) + return if Rails.env.test? or device.blank? begin - Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: @device, current_user: nil})) + Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: device, current_user: nil})) rescue end end + + private + + attr_reader :mqtt_client + end diff --git a/app/models/user.rb b/app/models/user.rb index fd99874b..9862d8b0 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -144,6 +144,14 @@ def self.check_bad_avatar_urls end end + def forward_device_readings? + !!forwarding_token + end + + def regenerate_forwarding_token! + self.forwarding_token = SecureRandom.urlsafe_base64(12) if self.is_admin_or_researcher? + end + private def check_if_users_have_valid_email diff --git a/db/migrate/20240423162838_add_forwarding_token_to_users.rb b/db/migrate/20240423162838_add_forwarding_token_to_users.rb new file mode 100644 index 00000000..8492f988 --- /dev/null +++ b/db/migrate/20240423162838_add_forwarding_token_to_users.rb @@ -0,0 +1,5 @@ +class AddForwardingTokenToUsers < ActiveRecord::Migration[6.1] + def change + add_column :users, :forwarding_token, :string + end +end diff --git a/db/schema.rb b/db/schema.rb index 2c934353..544e912e 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_03_18_171656) do +ActiveRecord::Schema.define(version: 2024_04_23_162838) do # These are extensions that must be enabled in order to support this database enable_extension "adminpack" @@ -296,6 +296,7 @@ t.jsonb "old_data" t.integer "cached_device_ids", array: true t.string "workflow_state" + t.string "forwarding_token" t.index ["legacy_api_key"], name: "index_users_on_legacy_api_key", unique: true t.index ["workflow_state"], name: "index_users_on_workflow_state" end diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 0099587c..526583d0 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -20,7 +20,7 @@ namespace :mqtt do mqtt_log.info("ssl: #{mqtt_ssl}") begin - MQTT::Client.connect( + MQTTClientFactory.create_client( :host => mqtt_host, :port => mqtt_port, :clean_session => mqtt_clean_session, @@ -31,6 +31,8 @@ namespace :mqtt do mqtt_log.info "Connected to #{client.host}" mqtt_log.info "Using clean_session setting: #{client.clean_session}" + message_handler = MqttMessagesHandler.new(client) + client.subscribe(*mqtt_topics.flat_map { |topic| topic = topic == "" ? topic : topic + "/" [ @@ -46,7 +48,7 @@ namespace :mqtt do Sentry.with_scope do begin time = Benchmark.measure do - MqttMessagesHandler.handle_topic(topic, message) + message_handler.handle_topic(topic, message) end mqtt_log.info "Processed MQTT message in #{time}" mqtt_log.info "MQTT queue length: #{client.queue_length}" diff --git a/spec/jobs/retry_mqtt_message_job_spec.rb b/spec/jobs/retry_mqtt_message_job_spec.rb index 2746c164..bf269f12 100644 --- a/spec/jobs/retry_mqtt_message_job_spec.rb +++ b/spec/jobs/retry_mqtt_message_job_spec.rb @@ -1,19 +1,64 @@ require 'rails_helper' RSpec.describe RetryMQTTMessageJob, type: :job do + + let(:mqtt_client) { + double(:mqtt_client).tap do |mqtt_client| + allow(mqtt_client).to receive(:disconnect) + end + } + + let(:handle_result) { true } + + let(:mqtt_message_handler) { + double(:mqtt_message_handler).tap do |mqtt_message_handler| + allow(mqtt_message_handler).to receive(:handle_topic).and_return(handle_result) + end + } + + let(:topic) { "topic/1/2/3" } + + let(:message) { '{"foo": "bar", "test": "message"}' } + + before do + allow(MQTTClientFactory).to receive(:create_client).and_return(mqtt_client) + allow(MqttMessagesHandler).to receive(:new).and_return(mqtt_message_handler) + end + + it "creates an MQTT client, overriding clean_session to true and the client_id to nil" do + RetryMQTTMessageJob.perform_now(topic, message) + expect(MQTTClientFactory).to have_received(:create_client).with({clean_session: true, client_id: nil }) + end + + it "creates an MQTTMessagesHandler, passing the client" do + RetryMQTTMessageJob.perform_now(topic, message) + expect(MqttMessagesHandler).to have_received(:new).with(mqtt_client) + end + it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do - topic = "topic/1/2/3" - message = '{"foo": "bar", "test": "message"}' - expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(true) RetryMQTTMessageJob.perform_now(topic, message) + expect(mqtt_message_handler).to have_received(:handle_topic).with(topic, message, false) end - it "raises an error if the handler returns nil" do - topic = "topic/1/2/3" - message = '{"foo": "bar", "test": "message"}' - expect(MqttMessagesHandler).to receive(:handle_topic).with(topic, message, false).and_return(nil) - expect { - RetryMQTTMessageJob.perform_now(topic, message) - }.to raise_error + it "disconnects the cliient when done" do + RetryMQTTMessageJob.perform_now(topic, message) + expect(mqtt_client).to have_received(:disconnect) + end + + context "when the handler returns nil" do + let(:handle_result) { nil } + + it "raises an error" do + expect { + RetryMQTTMessageJob.perform_now(topic, message) + }.to raise_error + end + + it "closes the mqtt connection" do + expect { + RetryMQTTMessageJob.perform_now(topic, message) + }.to raise_error + expect(mqtt_client).to have_received(:disconnect) + end end end diff --git a/spec/lib/mqtt_forwarder_spec.rb b/spec/lib/mqtt_forwarder_spec.rb new file mode 100644 index 00000000..e69de29b diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index 5e2fb7ea..0776835b 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -7,6 +7,18 @@ let(:device_inventory) { create(:device_inventory, report: '{"random_property": "random_result"}') } + + let(:mqtt_client) { + double(:mqtt_client).tap do |mqtt_client| + allow(mqtt_client).to receive(:publish) + end + } + + subject(:message_handler) { + MqttMessagesHandler.new(mqtt_client) + } + + before do device.components << component create(:sensor, id: 13, default_key: "key13") @@ -48,13 +60,13 @@ describe '#device_token' do it 'returns device_token from topic' do - expect(MqttMessagesHandler.device_token(@packet.topic)).to eq(device.device_token) + expect(message_handler.device_token(@packet.topic)).to eq(device.device_token) end end describe '#data' do it 'returns parsed data from payload' do - expect(MqttMessagesHandler.data(@packet.payload)).to match_array(@data) + expect(message_handler.data(@packet.payload)).to match_array(@data) end end @@ -90,7 +102,7 @@ } }].to_json ) - MqttMessagesHandler.handle_topic(@packet.topic, @packet.payload) + message_handler.handle_topic(@packet.topic, @packet.payload) end it 'handshakes the device if an orphan device exists' do @@ -102,7 +114,7 @@ onboarding_session: orphan_device.onboarding_session }.to_json ) - MqttMessagesHandler.handle_topic(@packet.topic, @packet.payload) + message_handler.handle_topic(@packet.topic, @packet.payload) expect(orphan_device.reload.device_handshake).to be true end @@ -118,7 +130,7 @@ } }].to_json ) - MqttMessagesHandler.handle_topic(@packet.topic, @hardware_info_packet.payload) + message_handler.handle_topic(@packet.topic, @hardware_info_packet.payload) end end @@ -126,7 +138,7 @@ it 'it notifies Sentry' do allow(Sentry).to receive(:capture_exception) expect(Kairos).not_to receive(:http_post_to) - MqttMessagesHandler.handle_topic(@invalid_packet.topic, @invalid_packet.payload) + message_handler.handle_topic(@invalid_packet.topic, @invalid_packet.payload) #expect(Sentry).to have_received(:capture_exception).with(RuntimeError) end end @@ -159,7 +171,7 @@ }].to_json ) - MqttMessagesHandler.handle_topic("device/sck/#{device.device_token}/readings/raw", the_data) + message_handler.handle_topic("device/sck/#{device.device_token}/readings/raw", the_data) # TODO: we should expect that a new Storer object should contain the correct, processed readings #expect(Storer).to receive(:new) @@ -174,7 +186,7 @@ onboarding_session: orphan_device.onboarding_session }.to_json ) - MqttMessagesHandler.handle_topic("device/sck/#{device.device_token}/readings/raw", the_data) + message_handler.handle_topic("device/sck/#{device.device_token}/readings/raw", the_data) expect(orphan_device.reload.device_handshake).to be true end end @@ -187,7 +199,7 @@ onboarding_session: orphan_device.onboarding_session }.to_json ) - MqttMessagesHandler.handle_topic( + message_handler.handle_topic( "device/sck/#{orphan_device.device_token}/hello", 'content ignored by MqttMessagesHandler\#hello' ) @@ -201,21 +213,21 @@ # This creates a new device_inventory item expect(@inventory_packet.payload).to eq((device_inventory.report.to_json)) expect(DeviceInventory.count).to eq(1) - MqttMessagesHandler.handle_topic(@inventory_packet.topic, @inventory_packet.payload) + message_handler.handle_topic(@inventory_packet.topic, @inventory_packet.payload) expect(DeviceInventory.last.report["random_property"]).to eq('random_result') expect(DeviceInventory.count).to eq(2) end it 'does not log inventory with an incorrect / nil topic' do expect(DeviceInventory.count).to eq(0) - MqttMessagesHandler.handle_topic('invenxxx','{"random_property":"random_result2"}') - MqttMessagesHandler.handle_topic(nil,'{"random_property":"random_result2"}') + message_handler.handle_topic('invenxxx','{"random_property":"random_result2"}') + message_handler.handle_topic(nil,'{"random_property":"random_result2"}') expect(DeviceInventory.count).to eq(0) end it 'does not handshake any device' do expect(Redis.current).not_to receive(:publish) - MqttMessagesHandler.handle_topic( + message_handler.handle_topic( @inventory_packet.topic, @inventory_packet.payload ) end @@ -224,7 +236,7 @@ describe '#hardware_info' do it 'hardware info has been received and id changed from 47 -> 48' do expect(device.hardware_info["id"]).to eq(47) - MqttMessagesHandler.handle_topic(@hardware_info_packet.topic, @hardware_info_packet.payload) + message_handler.handle_topic(@hardware_info_packet.topic, @hardware_info_packet.payload) device.reload expect(device.hardware_info["id"]).to eq(48) expect(@hardware_info_packet.payload).to eq((device.hardware_info.to_json)) @@ -239,14 +251,14 @@ onboarding_session: orphan_device.onboarding_session }.to_json ) - MqttMessagesHandler.handle_topic(@hardware_info_packet.topic, @hardware_info_packet.payload) + message_handler.handle_topic(@hardware_info_packet.topic, @hardware_info_packet.payload) expect(orphan_device.reload.device_handshake).to be true end it 'defers messages with unknown device tokens if retry flag is true' do expect(device.hardware_info["id"]).to eq(47) expect(RetryMQTTMessageJob).to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) - MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) + message_handler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) device.reload expect(device.hardware_info["id"]).to eq(47) expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) @@ -255,7 +267,7 @@ it 'does not defer messages with unknown device tokens if retry flag is false' do expect(device.hardware_info["id"]).to eq(47) expect(RetryMQTTMessageJob).not_to receive(:perform_later).with(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload) - MqttMessagesHandler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload, false) + message_handler.handle_topic(@hardware_info_packet_bad.topic, @hardware_info_packet_bad.payload, false) device.reload expect(device.hardware_info["id"]).to eq(47) expect(@hardware_info_packet_bad.payload).to_not eq((device.hardware_info.to_json)) diff --git a/spec/models/device_spec.rb b/spec/models/device_spec.rb index eb79e12c..7b233fc6 100644 --- a/spec/models/device_spec.rb +++ b/spec/models/device_spec.rb @@ -522,6 +522,24 @@ end end + describe "forwarding" do + describe "#forward_readings?" do + it "delegates to the forward_device_readings? method on the device owner" do + forward_readings = double(:forward_readings) + expect(device.owner).to receive(:forward_device_readings?).and_return(forward_readings) + expect(device.forward_readings?).to eq(forward_readings) + end + end + + describe "#forwarding_token?" do + it "delegates to the device owner" do + forwarding_token = double(:forwarding_token) + expect(device.owner).to receive(:forwarding_token).and_return(forwarding_token) + expect(device.forwarding_token).to eq(forwarding_token) + end + end + end + describe "#find_or_create_component_by_sensor_id" do context "when the sensor exists and a component already exists for this device" do it "returns the existing component" do diff --git a/spec/models/raw_storer_spec.rb b/spec/models/raw_storer_spec.rb index 5a3debb3..a11cbcf5 100644 --- a/spec/models/raw_storer_spec.rb +++ b/spec/models/raw_storer_spec.rb @@ -27,6 +27,16 @@ def to_ts(time) Component.create!(id: 21, device: device, sensor: Sensor.find(21)) end + let(:mqtt_client) { + double(:mqtt_client).tap do |mqtt_client| + allow(mqtt_client).to receive(:publish) + end + } + + subject(:storer) { + RawStorer.new(mqtt_client) + } + let(:json) { { "co": "118439", "bat": "1000", "hum": "21592", "no2": "260941", "nets": "17", "temp": "25768", "light": "509", "noise": "0", "panel": "0", "timestamp": to_ts(1.day.ago) } } @@ -37,7 +47,13 @@ def to_ts(time) it "will not be created with invalid past timestamp" do ts = { timestamp: to_ts(5.years.ago) } - raw_storer = RawStorer.new(json.merge(ts), device.mac_address, "1.1-0.9.0-A", "127.0.0.1") + includes_proxy = double({ where: double({last: device.reload})}) + allow(Device).to receive(:includes).and_return(includes_proxy) + expect(device).not_to receive(:update_component_timestamps) + expect(Redis.current).not_to receive(:publish) + expect { + storer.store(json.merge(ts), device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) + }.to raise_error end it "updates component last_reading_at" do @@ -49,17 +65,24 @@ def to_ts(time) [16, 17, 13, 15, 21, 12, 14, 7, 18] ) - raw_storer = RawStorer.new(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) + storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) end it "will not be created with invalid future timestamp" do ts = { timestamp: to_ts(2.days.from_now) } - raw_storer = RawStorer.new(json.merge(ts), device.mac_address, "1.1-0.9.0-A", "127.0.0.1") + includes_proxy = double({ where: double({last: device.reload})}) + allow(Device).to receive(:includes).and_return(includes_proxy) + expect(device).not_to receive(:update_component_timestamps) + expect(Redis.current).not_to receive(:publish) + storer.store(json.merge(ts), device.mac_address, "1.1-0.9.0-A", "127.0.0.1") end it "will not be created with invalid data" do + includes_proxy = double({ where: double({last: device.reload})}) + allow(Device).to receive(:includes).and_return(includes_proxy) + expect(device).not_to receive(:update_component_timestamps) expect(Redis.current).not_to receive(:publish) - raw_storer = RawStorer.new({}, device.mac_address, "1.1-0.9.0-A", "127.0.0.1") + storer.store({}, device.mac_address, "1.1-0.9.0-A", "127.0.0.1") end it "should return a correct sensor id number" do @@ -69,6 +92,28 @@ def to_ts(time) it "will be created with valid data" do expect(Redis.current).to receive(:publish) - raw_storer = RawStorer.new(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) + storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) + end + + context "when the device allows forwarding" do + # TODO Tim Refactor this now you're passing in the MQTT client + it "forwards the message with the forwarding token and the device's id" do + forwarding_token = double(:forwarding_token) + allow_any_instance_of(Device).to receive(:forwarding_token).and_return(forwarding_token) + allow_any_instance_of(Device).to receive(:forward_readings?).and_return(true) + + forwarder = double(:mqtt_forwarder) + allow(MQTTForwarder).to receive(:new).and_return(forwarder) + expect(forwarder).to receive(:forward_reading).with(forwarding_token, device.id, json) + storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) + end + end + + context "when the device does not have allow forwarding" do + it "does not forward the message" do + allow_any_instance_of(Device).to receive(:forward_readings?).and_return(false) + expect_any_instance_of(MQTTForwarder).not_to receive(:forward_reading) + storer.store(json, device.mac_address, "1.1-0.9.0-A", "127.0.0.1", true) + end end end diff --git a/spec/models/storer_spec.rb b/spec/models/storer_spec.rb index db3a784d..b0acd793 100644 --- a/spec/models/storer_spec.rb +++ b/spec/models/storer_spec.rb @@ -6,6 +6,16 @@ let(:component){ create(:component, id: 12, device: device, sensor: sensor) } + let(:mqtt_client) { + double(:mqtt_client).tap do |mqtt_client| + allow(mqtt_client).to receive(:publish) + end + } + + subject(:storer) { + Storer.new(mqtt_client) + } + context 'when receiving good data' do before do allow(Rails.env).to receive(:production?).and_return(true) @@ -42,7 +52,7 @@ # expect(Kairos).to receive(:http_post_to).with("/datapoints", @karios_data) # expect_any_instance_of(Storer).to receive(:ws_publish) expect do - Storer.new(device, @data) + storer.store(device, @data) end.not_to raise_error end @@ -51,13 +61,15 @@ Time.parse(@data['recorded_at']), [sensor.id] ) - Storer.new(device, @data) + storer.store(device, @data) end skip 'updates device without touching updated_at' do updated_at = device.updated_at - Storer.new(device, @data) + expect(storer).to receive(:ws_publish) + + storer.store(device, @data) expect(device.reload.updated_at).to eq(updated_at) @@ -65,7 +77,27 @@ expect(device.reload.last_reading_at).not_to eq(nil) expect(device.reload.state).to eq('has_published') - expect(Storer).to receive(:ws_publish) + end + + context "when the device allows forwarding" do + # TODO tim refactor this now you're injecting the MQTT client + it "forwards the message with the forwarding token and the device's id" do + forwarding_token = double(:forwarding_token) + allow(device).to receive(:forwarding_token).and_return(forwarding_token) + allow(device).to receive(:forward_readings?).and_return(true) + forwarder = double(:mqtt_forwarder) + allow(MQTTForwarder).to receive(:new).and_return(forwarder) + expect(forwarder).to receive(:forward_reading).with(forwarding_token, device.id, @data) + storer.store(device, @data) + end + end + + context "when the device does not allow forwarding" do + it "does not forward the message" do + allow(device).to receive(:forward_readings?).and_return(false) + expect_any_instance_of(MQTTForwarder).not_to receive(:forward_reading) + storer.store(device, @data) + end end end @@ -81,11 +113,11 @@ it 'does raise error' do expect(Kairos).not_to receive(:http_post_to).with("/datapoints", anything) - expect{ Storer.new(device, @bad_data) }.to raise_error(ArgumentError) + expect{ storer.store(device, @bad_data) }.to raise_error(ArgumentError) end it 'does not update device' do - expect{ Storer.new(device, @bad_data) }.to raise_error(ArgumentError) + expect{ storer.store(device, @bad_data) }.to raise_error(ArgumentError) expect(device.reload.last_reading_at).to eq(nil) expect(device.reload.data).to eq(nil) diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index fafe6ae7..a415062d 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -71,6 +71,49 @@ expect(last_email.to).to eq([user.email]) end + describe "forwarding" do + describe "generating a forwarding token" do + context "when the user is a citizen" do + it "does not generate a forwarding token" do + user.role_mask = 0 + user.regenerate_forwarding_token! + expect(user.forwarding_token).to be(nil) + end + end + context "when the user is a researcher" do + it "generates a forwarding token" do + user.role_mask = 2 + user.regenerate_forwarding_token! + expect(user.forwarding_token).not_to be(nil) + end + end + context "when the user is an admin" do + it "generates a forwarding token" do + user.role_mask = 5 + user.regenerate_forwarding_token! + expect(user.forwarding_token).not_to be(nil) + end + end + end + + describe "forwarding device readings" do + context "when the user has a forwarding token" do + it "forwards device readings" do + user.forwarding_token = double(:forwarding_token) + expect(user.forward_device_readings?).to be(true) + end + end + + context "when the user has no forwarding token" do + it "does not forward device readings" do + user.forwarding_token = nil + expect(user.forward_device_readings?).to be(false) + end + end + end + end + + describe "authenticate_with_legacy_support" do let(:user) { build_stubbed(:user, password: 'password') }