Skip to content

Commit

Permalink
Merge pull request #338 from fablabbcn/forwarding-in-job
Browse files Browse the repository at this point in the history
Forwarding in job
  • Loading branch information
timcowlishaw authored Jul 16, 2024
2 parents dfc6e2b + 1d20137 commit d68b0a0
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 176 deletions.
14 changes: 6 additions & 8 deletions app/controllers/v0/readings_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,12 @@ def create
def legacy_create

if request.headers['X-SmartCitizenData']
MQTTClientFactory.create_client({clean_session: true, client_id: nil}) do |mqtt_client|
storer = RawStorer.new(mqtt_client)
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|
mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)
storer.store(raw_reading,mac,version,ip)
end
storer = RawStorer.new
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|
mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)
storer.store(raw_reading,mac,version,ip)
end
end

Expand Down
43 changes: 43 additions & 0 deletions app/jobs/mqtt_forwarding_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
class MQTTForwardingJob < ApplicationJob

queue_as :mqtt_forward

def perform(device_id, reading)
begin
device = Device.find(device_id)
forwarder = MQTTForwarder.new(mqtt_client)
payload = payload_for(device, reading)
forwarder.forward_reading(device.forwarding_token, device.id, payload)
ensure
disconnect_mqtt!
end
end

private

def payload_for(device, reading)
renderer.render(
partial: "v0/devices/device",
locals: {
device: device.reload,
current_user: nil,
slim_owner: true
}
)
end

def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({
clean_session: true, client_id: nil
})
end

def renderer
@renderer ||= ActionController::Base.new.view_context
end

def disconnect_mqtt!
@mqtt_client&.disconnect
end
end

18 changes: 3 additions & 15 deletions app/jobs/retry_mqtt_message_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,14 @@ class RetryMessageHandlerError < RuntimeError
end

def perform(topic, message)
begin
result = handler.handle_topic(topic, message, false)
raise RetryMessageHandlerError if result.nil?
ensure
disconnect_mqtt
end
result = handler.handle_topic(topic, message, false)
raise RetryMessageHandlerError if result.nil?
end

private

def handler
@handler ||= MqttMessagesHandler.new(mqtt_client)
end

def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
@handler ||= MqttMessagesHandler.new
end
end

26 changes: 7 additions & 19 deletions app/jobs/send_to_datastore_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,16 @@ class SendToDatastoreJob < ApplicationJob
queue_as :default

def perform(data_param, device_id)
begin
@device = Device.includes(:components).find(device_id)
the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
storer.store(@device, reading, do_update)
end
ensure
disconnect_mqtt
@device = Device.includes(:components).find(device_id)
the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
storer.store(@device, reading, do_update)
end
end

def storer
@storer ||= Storer.new(mqtt_client)
end

def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
@storer ||= Storer.new
end
end
9 changes: 1 addition & 8 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
class MqttMessagesHandler

def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

Expand Down Expand Up @@ -138,10 +134,7 @@ def data(message)

private

attr_reader :mqtt_client


def storer
@storer ||= Storer.new(mqtt_client)
@storer ||= Storer.new
end
end
24 changes: 1 addition & 23 deletions app/models/concerns/message_forwarding.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,8 @@ module MessageForwarding

def forward_reading(device, reading)
if device.forward_readings?
forwarder = MQTTForwarder.new(mqtt_client)
payload = payload_for(device, reading)
forwarder.forward_reading(device.forwarding_token, device.id, payload)
MQTTForwardingJob.perform_later(device.id, reading)
end
end

def payload_for(device, reading)
renderer.render(
partial: "v0/devices/device",
locals: {
device: device.reload,
current_user: nil,
slim_owner: true
}
)
end

private

def mqtt_client
raise NotImplementedError
end

def renderer
raise NotImplementedError
end
end
10 changes: 0 additions & 10 deletions app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,6 @@
class RawStorer
include MessageForwarding

def initialize(mqtt_client, renderer=nil)
@mqtt_client = mqtt_client
@renderer = renderer || ActionController::Base.new.view_context
end

def store data, mac, version, ip, raise_errors=false
success = true

Expand Down Expand Up @@ -81,9 +76,4 @@ def store data, mac, version, ip, raise_errors=false
end
end
end

private

attr_reader :mqtt_client, :renderer

end
10 changes: 0 additions & 10 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@ class Storer
include DataParser::Storer
include MessageForwarding

def initialize(mqtt_client, renderer=nil)
@mqtt_client = mqtt_client
@renderer = renderer || ActionController::Base.new.view_context
end

def store device, reading, do_update = true
begin
parsed_reading = Storer.parse_reading(device, reading)
Expand Down Expand Up @@ -47,9 +42,4 @@ def kairos_publish(reading_data)
#NOTE: If you want to use the Telnet port below, make sure it is open!
Redis.current.publish('telnet_queue', reading_data.to_json)
end

private

attr_reader :mqtt_client, :renderer

end
1 change: 1 addition & 0 deletions config/sidekiq.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- default
- mailers
- mqtt_retry
- mqtt_forward

production:
:concurrency: 25
Expand Down
2 changes: 1 addition & 1 deletion lib/tasks/mqtt_subscriber.rake
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace :mqtt do
mqtt_log.info "Connected to #{client.host}"
mqtt_log.info "Using clean_session setting: #{client.clean_session}"

message_handler = MqttMessagesHandler.new(client)
message_handler = MqttMessagesHandler.new

client.subscribe(*mqtt_topics.flat_map { |topic|
topic = topic == "" ? topic : topic + "/"
Expand Down
75 changes: 75 additions & 0 deletions spec/jobs/mqtt_forwarding_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
require 'rails_helper'

RSpec.describe MQTTForwardingJob, type: :job do

let(:forwarding_token) { "abc123_forwarding_token" }

let(:device) { create(:device) }

let(:reading) { double(:reading) }

let(:mqtt_client) {
double(:mqtt_client).tap do |mqtt_client|
allow(mqtt_client).to receive(:disconnect)
end
}

let(:device_json) {
double(:device_json)
}

let(:renderer) {
double(:renderer).tap do |renderer|
allow(renderer).to receive(:render).and_return(device_json)
end
}

let(:forwarder) {
double(:forwarder).tap do |forwarder|
allow(forwarder).to receive(:forward_reading)
end
}

before do
allow(MQTTClientFactory).to receive(:create_client).and_return(mqtt_client)
allow_any_instance_of(ActionController::Base).to receive(:view_context).and_return(renderer)
allow(MQTTForwarder).to receive(:new).and_return(forwarder)
allow_any_instance_of(Device).to receive(:forwarding_token).and_return(forwarding_token)
end

it "creates an mqtt client with a clean session and no client id" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(MQTTClientFactory).to have_received(:create_client).with({
clean_session: true,
client_id: nil
})
end

it "creates a forwarder with the mqtt client" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(MQTTForwarder).to have_received(:new).with(mqtt_client)
end

it "renders the device json for the given device, as an unauthorized user" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(renderer).to have_received(:render).with({
partial: "v0/devices/device",
locals: {
device: device.reload,
current_user: nil,
slim_owner: true
}
})
end

it "forwards using the device's id and forwarding token, with the rendered json payload" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(forwarder).to have_received(:forward_reading).with(forwarding_token, device.id, device_json)
end

it "disconnects the MQTT client" do
MQTTForwardingJob.perform_now(device.id, reading)
expect(mqtt_client).to have_received(:disconnect)
end

end
26 changes: 2 additions & 24 deletions spec/jobs/retry_mqtt_message_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
RSpec.describe RetryMQTTMessageJob, type: :job do
include ActiveJob::TestHelper

let(:mqtt_client) {
double(:mqtt_client).tap do |mqtt_client|
allow(mqtt_client).to receive(:disconnect)
end
}

let(:handler_results) { [true] }

let(:mqtt_message_handler) {
Expand All @@ -22,30 +16,19 @@
let(:message) { '{"foo": "bar", "test": "message"}' }

before do
allow(MQTTClientFactory).to receive(:create_client).and_return(mqtt_client)
allow(MqttMessagesHandler).to receive(:new).and_return(mqtt_message_handler)
end

it "creates an MQTT client, overriding clean_session to true and the client_id to nil" do
RetryMQTTMessageJob.perform_now(topic, message)
expect(MQTTClientFactory).to have_received(:create_client).with({clean_session: true, client_id: nil })
end

it "creates an MQTTMessagesHandler, passing the client" do
it "creates an MQTTMessagesHandler" do
RetryMQTTMessageJob.perform_now(topic, message)
expect(MqttMessagesHandler).to have_received(:new).with(mqtt_client)
expect(MqttMessagesHandler).to have_received(:new)
end

it "retries the mqtt ingest with the given topic and message, and with automatic retries disabled" do
RetryMQTTMessageJob.perform_now(topic, message)
expect(mqtt_message_handler).to have_received(:handle_topic).with(topic, message, false)
end

it "disconnects the client when done" do
RetryMQTTMessageJob.perform_now(topic, message)
expect(mqtt_client).to have_received(:disconnect)
end

context "when the handler returns nil" do
let(:handler_results) { [nil, nil, true] }

Expand All @@ -54,10 +37,5 @@
RetryMQTTMessageJob.perform_later(topic, message)
end
end

it "closes the mqtt connection" do
RetryMQTTMessageJob.perform_now(topic, message)
expect(mqtt_client).to have_received(:disconnect)
end
end
end
8 changes: 1 addition & 7 deletions spec/lib/mqtt_messages_handler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,8 @@
let(:device_inventory) { create(:device_inventory, report: '{"random_property": "random_result"}') }


let(:mqtt_client) {
double(:mqtt_client).tap do |mqtt_client|
allow(mqtt_client).to receive(:publish)
end
}

subject(:message_handler) {
MqttMessagesHandler.new(mqtt_client)
MqttMessagesHandler.new
}


Expand Down
Loading

0 comments on commit d68b0a0

Please sign in to comment.