Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forwarding of device data #318

Merged
merged 7 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions app/controllers/v0/forwarding_controller.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module V0
class ForwardingController < ApplicationController
after_action :verify_authorized, except: :authorize
def authorize
topic = params[:topic]
username = params[:username]
token = topic && get_forwarding_token(topic)
authorized = token && username && User.forwarding_subscription_authorized?(token, username)
render json: { result: authorized ? "allow" : "deny" }
end

private

def get_forwarding_token(topic)
match = topic.match(/forward\/([^\/]+)\//)
match && match[1]
end
end
end

15 changes: 8 additions & 7 deletions app/controllers/v0/readings_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ def create
def legacy_create

if request.headers['X-SmartCitizenData']
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|

mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)

RawStorer.new(raw_reading,mac,version,ip)
MQTTClientFactory.create_client({clean_session: true, client_id: nil}) do |mqtt_client|
storer = RawStorer.new(mqtt_client, self)
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|
mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)
storer.store(raw_reading,mac,version,ip)
end
end
end

Expand Down
23 changes: 20 additions & 3 deletions app/jobs/retry_mqtt_message_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,26 @@ class RetryMessageHandlerError < RuntimeError
end

def perform(topic, message)
result = MqttMessagesHandler.handle_topic(topic, message, false)
raise RetryMessageHandlerError if result.nil?
begin
result = handler.handle_topic(topic, message, false)
raise RetryMessageHandlerError if result.nil?
ensure
disconnect_mqtt
end
end

private

def handler
@handler ||= MqttMessagesHandler.new(mqtt_client)
end
end

def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
end
end

27 changes: 20 additions & 7 deletions app/jobs/send_to_datastore_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ class SendToDatastoreJob < ApplicationJob
queue_as :default

def perform(data_param, device_id)
begin
@device = Device.includes(:components).find(device_id)
the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
storer.store(@device, reading, do_update)
end
ensure
disconnect_mqtt
end
end

@device = Device.includes(:components).find(device_id)
def storer
@storer ||= Storer.new(mqtt_client, ActionController::Base.new.view_context)
end

the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
Storer.new(@device, reading, do_update)
end
def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
end
end
33 changes: 33 additions & 0 deletions app/lib/mqtt_client_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module MQTTClientFactory
def self.create_client(args={}, &block)
host = args.fetch(:host, default_host)
port = args.fetch(:port, default_port)
clean_sesion = args.fetch(:clean_session, default_clean_session)
client_id = args.fetch(:client_id, default_client_id)
ssl = args.fetch(:ssl, default_ssl)
MQTT::Client.connect(
{ host: host, port: port, clean_session: clean_sesion, client_id: client_id, ssl: ssl},
&block
)
end

def self.default_host
ENV.fetch('MQTT_HOST', 'mqtt')
end

def self.default_port
ENV.fetch('MQTT_PORT', "1883").to_i
end

def self.default_clean_session
ENV.fetch('MQTT_CLEAN_SESSION', "true") == "true"
end

def self.default_client_id
ENV.fetch('MQTT_CLIENT_ID', nil)
end

def self.default_ssl
ENV.fetch('MQTT_SSL', "false") == "true"
end
end
20 changes: 20 additions & 0 deletions app/lib/mqtt_forwarder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class MQTTForwarder
def initialize(client)
@client = client
@prefix = prefix
@suffix = suffix
end

def forward_reading(token, device_id, reading)
topic = topic_path(token, device_id)
client.publish(topic, reading)
end

private

def topic_path(token, device_id)
["/forward", token, "device", device_id, "readings"].join("/")
end

attr_reader :client, :prefix, :suffix
end
35 changes: 25 additions & 10 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
class MqttMessagesHandler
def self.handle_topic(topic, message, retry_on_nil_device=true)

def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

crumb = Sentry::Breadcrumb.new(
Expand Down Expand Up @@ -47,23 +52,23 @@ def self.handle_topic(topic, message, retry_on_nil_device=true)
return true
end

def self.handle_nil_device(topic, message, retry_on_nil_device)
if !topic.to_s.include?("inventory")
def handle_nil_device(topic, message, retry_on_nil_device)
if !topic.to_s.include?("inventory") && !topic.to_s.include?("bridge")
retry_later(topic, message) if retry_on_nil_device
end
end

def self.retry_later(topic, message)
def retry_later(topic, message)
RetryMQTTMessageJob.perform_later(topic, message)
end

# takes a packet and stores data
def self.handle_readings(device, message)
def handle_readings(device, message)
data = self.data(message)
return if data.nil? or data&.empty?

data.each do |reading|
Storer.new(device, reading)
storer.store(device, reading)
end
rescue Exception => e
Sentry.capture_exception(e)
Expand All @@ -73,7 +78,7 @@ def self.handle_readings(device, message)
end

# takes a raw packet and converts into JSON
def self.parse_raw_readings(message, device_id=nil)
def parse_raw_readings(message, device_id=nil)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Parsing raw readings",
Expand Down Expand Up @@ -101,7 +106,7 @@ def self.parse_raw_readings(message, device_id=nil)
JSON[reading]
end

def self.handshake_device(topic)
def handshake_device(topic)
orphan_device = OrphanDevice.find_by(device_token: device_token(topic))
return if orphan_device.nil?
orphan_device.update!(device_handshake: true)
Expand All @@ -111,12 +116,12 @@ def self.handshake_device(topic)
end

# takes a packet and returns 'device token' from topic
def self.device_token(topic)
def device_token(topic)
topic[/device\/sck\/(.*?)\//m, 1].to_s
end

# takes a packet and returns 'data' from payload
def self.data(message)
def data(message)
# TODO: what if message is empty?
if message
begin
Expand All @@ -128,4 +133,14 @@ def self.data(message)
raise "No data(message)"
end
end


private

attr_reader :mqtt_client


def storer
@storer ||= Storer.new(mqtt_client, ActionController::Base.new.view_context)
end
end
31 changes: 31 additions & 0 deletions app/models/concerns/message_forwarding.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module MessageForwarding

extend ActiveSupport::Concern

def forward_reading(device, reading)
forwarder = MQTTForwarder.new(mqtt_client)
payload = payload_for(device, reading)
forwarder.forward_reading(device.forwarding_token, device.id, payload) if device.forward_readings?
end

def payload_for(device, reading)
renderer.render(
partial: "v0/devices/device",
locals: {
device: device.reload,
current_user: nil,
slim_owner: true
}
)
end

private

def mqtt_client
raise NotImplementedError
end

def renderer
raise NotImplementedError
end
end
8 changes: 8 additions & 0 deletions app/models/device.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ def hardware_slug
hardware_slug_override || [hardware_type.downcase, hardware_version&.gsub(".", ",")].compact.join(":")
end

def forward_readings?
owner.forward_device_readings?
end

def forwarding_token
owner.forwarding_token
end

private

def set_state
Expand Down
15 changes: 12 additions & 3 deletions app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
# ingest raw data posted by Devices into Kairos and Postgres (backup purposes).

class RawStorer
include MessageForwarding

def initialize data, mac, version, ip, raise_errors=false
def initialize(mqtt_client, renderer)
@mqtt_client = mqtt_client
@renderer = renderer
end

def store data, mac, version, ip, raise_errors=false
success = true

begin
Expand Down Expand Up @@ -62,6 +67,7 @@ def initialize data, mac, version, ip, raise_errors=false
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
end

forward_reading(device, data)
rescue Exception => e

success = false
Expand All @@ -70,11 +76,14 @@ def initialize data, mac, version, ip, raise_errors=false

if !Rails.env.test? and device
begin
Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: @device, current_user: nil}))
Redis.current.publish("data-received", renderer.render( partial: "v0/devices/device", locals: {device: @device, current_user: nil}))
rescue
end
end

end

private

attr_reader :mqtt_client, :renderer

end
Loading
Loading