Skip to content

Commit

Permalink
Data forwarding
Browse files Browse the repository at this point in the history
Users can opt in to have their messages forwarded to an MQTT topic that
they subscribe to, identified by a secret forwarding_key
  • Loading branch information
timcowlishaw committed May 7, 2024
1 parent 5364a6e commit cbb7899
Show file tree
Hide file tree
Showing 21 changed files with 440 additions and 87 deletions.
15 changes: 8 additions & 7 deletions app/controllers/v0/readings_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ def create
def legacy_create

if request.headers['X-SmartCitizenData']
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|

mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)

RawStorer.new(raw_reading,mac,version,ip)
MQTTClientFactory.create_client({clean_session: true, client_id: nil}) do |mqtt_client|
storer = RawStorer.new(mqtt_client)
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|
mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)
storer.store(raw_reading,mac,version,ip)
end
end
end

Expand Down
23 changes: 20 additions & 3 deletions app/jobs/retry_mqtt_message_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,26 @@ class RetryMQTTMessageJob < ApplicationJob
end

def perform(topic, message)
result = MqttMessagesHandler.handle_topic(topic, message, false)
raise "Message handler returned nil, retrying" if result.nil?
begin
result = handler.handle_topic(topic, message, false)
raise "Message handler returned nil, retrying" if result.nil?
ensure
disconnect_mqtt
end
end

private

def handler
@handler ||= MqttMessagesHandler.new(mqtt_client)
end
end

def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
end
end

27 changes: 20 additions & 7 deletions app/jobs/send_to_datastore_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ class SendToDatastoreJob < ApplicationJob
queue_as :default

def perform(data_param, device_id)
begin
@device = Device.includes(:components).find(device_id)
the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
storer.store(@device, reading, do_update)
end
ensure
disconnect_mqtt
end
end

@device = Device.includes(:components).find(device_id)
def storer
@storer ||= Storer.new(mqtt_client)
end

the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
Storer.new(@device, reading, do_update)
end
def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
end
end
33 changes: 33 additions & 0 deletions app/lib/mqtt_client_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module MQTTClientFactory
def self.create_client(args={}, &block)
host = args.fetch(:host, default_host)
port = args.fetch(:port, default_port)
clean_sesion = args.fetch(:clean_session, default_clean_session)
client_id = args.fetch(:client_id, default_client_id)
ssl = args.fetch(:ssl, default_ssl)
MQTT::Client.connect(
{ host: host, port: port, clean_session: clean_sesion, client_id: client_id, ssl: ssl},
&block
)
end

def self.default_host
ENV.fetch('MQTT_HOST', 'mqtt')
end

def self.default_port
ENV.fetch('MQTT_PORT', "1883").to_i
end

def self.default_clean_session
ENV.fetch('MQTT_CLEAN_SESSION', "true") == "true"
end

def self.default_client_id
ENV.fetch('MQTT_CLIENT_ID', nil)
end

def self.default_ssl
ENV.fetch('MQTT_SSL', "false") == "true"
end
end
20 changes: 20 additions & 0 deletions app/lib/mqtt_forwarder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class MQTTForwarder
def initialize(client, prefix="forward/devices", suffix="readings")
@client = client
@prefix = prefix
@suffix = suffix
end

def forward_reading(token, device_id, reading)
topic = topic_path(token, device_id)
client.publish(topic, reading)
end

private

def topic_path(token, device_id)
[prefix, token, device_id, suffix].join("/")
end

attr_reader :client, :prefix, :suffix
end
33 changes: 24 additions & 9 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
class MqttMessagesHandler
def self.handle_topic(topic, message, retry_on_nil_device=true)

def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

crumb = Sentry::Breadcrumb.new(
Expand Down Expand Up @@ -47,23 +52,23 @@ def self.handle_topic(topic, message, retry_on_nil_device=true)
return true
end

def self.handle_nil_device(topic, message, retry_on_nil_device)
def handle_nil_device(topic, message, retry_on_nil_device)
if !topic.to_s.include?("inventory")
retry_later(topic, message) if retry_on_nil_device
end
end

def self.retry_later(topic, message)
def retry_later(topic, message)
RetryMQTTMessageJob.perform_later(topic, message)
end

# takes a packet and stores data
def self.handle_readings(device, message)
def handle_readings(device, message)
data = self.data(message)
return if data.nil? or data&.empty?

data.each do |reading|
Storer.new(device, reading)
storer.store(device, reading)
end
rescue Exception => e
Sentry.capture_exception(e)
Expand All @@ -73,7 +78,7 @@ def self.handle_readings(device, message)
end

# takes a raw packet and converts into JSON
def self.parse_raw_readings(message, device_id=nil)
def parse_raw_readings(message, device_id=nil)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Parsing raw readings",
Expand Down Expand Up @@ -101,7 +106,7 @@ def self.parse_raw_readings(message, device_id=nil)
JSON[reading]
end

def self.handshake_device(topic)
def handshake_device(topic)
orphan_device = OrphanDevice.find_by(device_token: device_token(topic))
return if orphan_device.nil?
orphan_device.update!(device_handshake: true)
Expand All @@ -111,12 +116,12 @@ def self.handshake_device(topic)
end

# takes a packet and returns 'device token' from topic
def self.device_token(topic)
def device_token(topic)
topic[/device\/sck\/(.*?)\//m, 1].to_s
end

# takes a packet and returns 'data' from payload
def self.data(message)
def data(message)
# TODO: what if message is empty?
if message
begin
Expand All @@ -128,4 +133,14 @@ def self.data(message)
raise "No data(message)"
end
end


private

attr_reader :mqtt_client


def storer
@storer ||= Storer.new(mqtt_client)
end
end
15 changes: 15 additions & 0 deletions app/models/concerns/message_forwarding.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module MessageForwarding

extend ActiveSupport::Concern

def forward_reading(device, reading)
forwarder = MQTTForwarder.new(mqtt_client)
forwarder.forward_reading(device.forwarding_token, device.id, reading) if device.forward_readings?
end

private

def mqtt_client
raise NotImplementedError
end
end
8 changes: 8 additions & 0 deletions app/models/device.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ def hardware_slug
hardware_slug_override || [hardware_type.downcase, hardware_version&.gsub(".", ",")].compact.join(":")
end

def forward_readings?
owner.forward_device_readings?
end

def forwarding_token
owner.forwarding_token
end

private

def set_state
Expand Down
12 changes: 10 additions & 2 deletions app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
# ingest raw data posted by Devices into Kairos and Postgres (backup purposes).

class RawStorer
include MessageForwarding

def initialize data, mac, version, ip, raise_errors=false
def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def store data, mac, version, ip, raise_errors=false
success = true

begin
Expand Down Expand Up @@ -62,6 +66,7 @@ def initialize data, mac, version, ip, raise_errors=false
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
end

forward_reading(device, data)
rescue Exception => e

success = false
Expand All @@ -74,7 +79,10 @@ def initialize data, mac, version, ip, raise_errors=false
rescue
end
end

end

private

attr_reader :mqtt_client

end
44 changes: 28 additions & 16 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
class Storer
include DataParser::Storer
include MessageForwarding

def initialize device, reading, do_update = true
@device = device
begin
parsed_reading = Storer.parse_reading(@device, reading)
def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def store device, reading, do_update = true
begin
parsed_reading = Storer.parse_reading(device, reading)
kairos_publish(parsed_reading[:_data])

update_device(parsed_reading[:parsed_ts], parsed_reading[:sql_data]) if do_update
if do_update
update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data])
ws_publish(device)
end

forward_reading(device, reading)

rescue Exception => e
Sentry.capture_exception(e)
Expand All @@ -18,21 +26,20 @@ def initialize device, reading, do_update = true
raise e unless e.nil?
end

def update_device(parsed_ts, sql_data)
def update_device(device, parsed_ts, sql_data)
return if parsed_ts <= Time.at(0)

if @device.last_reading_at.present?
# Comparison errors if @device.last_reading_at is nil (new devices).
if device.last_reading_at.present?
# Comparison errors if device.last_reading_at is nil (new devices).
# Devices can post multiple readings, in a non-sorted order.
# Do not update data with an older timestamp.
return if parsed_ts < @device.last_reading_at
return if parsed_ts < device.last_reading_at
end

sql_data = @device.data.present? ? @device.data.merge(sql_data) : sql_data
@device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq
@device.update_component_timestamps(parsed_ts, sensor_ids)
ws_publish()
device.update_component_timestamps(parsed_ts, sensor_ids)
end

def kairos_publish(reading_data)
Expand All @@ -41,11 +48,16 @@ def kairos_publish(reading_data)
Redis.current.publish('telnet_queue', reading_data.to_json)
end

def ws_publish()
return if Rails.env.test? or @device.blank?
def ws_publish(device)
return if Rails.env.test? or device.blank?
begin
Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: @device, current_user: nil}))
Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: device, current_user: nil}))
rescue
end
end

private

attr_reader :mqtt_client

end
8 changes: 8 additions & 0 deletions app/models/user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ def self.check_bad_avatar_urls
end
end

def forward_device_readings?
!!forwarding_token
end

def regenerate_forwarding_token!
self.forwarding_token = SecureRandom.urlsafe_base64(12) if self.is_admin_or_researcher?
end

private

def check_if_users_have_valid_email
Expand Down
5 changes: 5 additions & 0 deletions db/migrate/20240423162838_add_forwarding_token_to_users.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddForwardingTokenToUsers < ActiveRecord::Migration[6.1]
def change
add_column :users, :forwarding_token, :string
end
end
Loading

0 comments on commit cbb7899

Please sign in to comment.