Skip to content

Commit

Permalink
WIP data forwarding
Browse files Browse the repository at this point in the history
  • Loading branch information
timcowlishaw committed Apr 24, 2024
1 parent 736ad61 commit 5224590
Show file tree
Hide file tree
Showing 21 changed files with 438 additions and 85 deletions.
15 changes: 8 additions & 7 deletions app/controllers/v0/readings_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ def create
def legacy_create

if request.headers['X-SmartCitizenData']
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|

mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)

RawStorer.new(raw_reading,mac,version,ip)
MQTTClientFactory.create_client({clean_session: true, client_id: nil}) do |mqtt_client|
storer = RawStorer.new(mqtt_client)
JSON.parse(request.headers['X-SmartCitizenData']).each do |raw_reading|
mac = request.headers['X-SmartCitizenMacADDR']
version = request.headers['X-SmartCitizenVersion']
ip = (request.headers['X-SmartCitizenIP'] || request.remote_ip)
storer.store(raw_reading,mac,version,ip)
end
end
end

Expand Down
23 changes: 20 additions & 3 deletions app/jobs/retry_mqtt_message_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,26 @@ class RetryMQTTMessageJob < ApplicationJob
queue_as :default

def perform(topic, message)
result = MqttMessagesHandler.handle_topic(topic, message, false)
raise "Message handler returned nil, retrying" if result.nil?
begin
result = handler.handle_topic(topic, message, false)
raise "Message handler returned nil, retrying" if result.nil?
ensure
disconnect_mqtt
end
end

private

def handler
@handler ||= MqttMessagesHandler.new(mqtt_client)
end
end

def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
@mqtt_client&.disconnect
end
end

27 changes: 20 additions & 7 deletions app/jobs/send_to_datastore_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,28 @@ class SendToDatastoreJob < ApplicationJob
queue_as :default

def perform(data_param, device_id)
begin
@device = Device.includes(:components).find(device_id)
the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
storer.store(@device, reading, do_update)
end
ensure
disconnect_mqtt
end
end

@device = Device.includes(:components).find(device_id)
def storer
@storer ||= Storer.new(mqtt_client)
end

the_data = JSON.parse(data_param)
the_data.sort_by {|a| a['recorded_at']}.reverse.each_with_index do |reading, index|
# move to async method call
do_update = index == 0
Storer.new(@device, reading, do_update)
end
def mqtt_client
@mqtt_client ||= MQTTClientFactory.create_client({clean_session: true, client_id: nil })
end

def disconnect_mqtt
mqtt_client.disconnect if @mqtt_client
end
end
33 changes: 33 additions & 0 deletions app/lib/mqtt_client_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module MQTTClientFactory
def self.create_client(args={}, &block)
host = args.fetch(:host, default_host)
port = args.fetch(:port, default_port)
clean_sesion = args.fetch(:clean_session, default_clean_session)
client_id = args.fetch(:client_id, default_client_id)
ssl = args.fetch(:ssl, default_ssl)
MQTT::Client.connect(
{ host: host, port: port, clean_session: clean_sesion, client_id: client_id, ssl: ssl},
&block
)
end

def self.default_host
ENV.fetch('MQTT_HOST', 'mqtt')
end

def self.default_port
ENV.fetch('MQTT_PORT', "1883").to_i
end

def self.default_clean_session
ENV.fetch('MQTT_CLEAN_SESSION', "true") == "true"
end

def self.default_client_id
ENV.fetch('MQTT_CLIENT_ID', nil)
end

def self.default_ssl
ENV.fetch('MQTT_SSL', "false") == "true"
end
end
20 changes: 20 additions & 0 deletions app/lib/mqtt_forwarder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
class MQTTForwarder
def initialize(client, prefix="forward/devices", suffix="readings")
@client = client
@prefix = prefix
@suffix = suffix
end

def forward_reading(token, device_id, reading)
topic = topic_path(token, device_id)
client.publish(topic, reading)
end

private

def topic_path(token, device_id)
[prefix, token, device_id, suffix].join("/")
end

attr_reader :client, :prefix, :suffix
end
33 changes: 24 additions & 9 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
class MqttMessagesHandler
def self.handle_topic(topic, message, retry_on_nil_device=true)

def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

crumb = Sentry::Breadcrumb.new(
Expand Down Expand Up @@ -45,23 +50,23 @@ def self.handle_topic(topic, message, retry_on_nil_device=true)
end
end

def self.handle_nil_device(topic, message, retry_on_nil_device)
def handle_nil_device(topic, message, retry_on_nil_device)
if topic.to_s.include?("info")
retry_later(topic, message) if retry_on_nil_device
end
end

def self.retry_later(topic, message)
def retry_later(topic, message)
RetryMQTTMessageJob.perform_later(topic, message)
end

# takes a packet and stores data
def self.handle_readings(device, message)
def handle_readings(device, message)
data = self.data(message)
return if data.nil? or data&.empty?

data.each do |reading|
Storer.new(device, reading)
storer.store(device, reading)
end
rescue Exception => e
Sentry.capture_exception(e)
Expand All @@ -71,7 +76,7 @@ def self.handle_readings(device, message)
end

# takes a raw packet and converts into JSON
def self.parse_raw_readings(message, device_id=nil)
def parse_raw_readings(message, device_id=nil)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Parsing raw readings",
Expand Down Expand Up @@ -99,7 +104,7 @@ def self.parse_raw_readings(message, device_id=nil)
JSON[reading]
end

def self.handshake_device(topic)
def handshake_device(topic)
orphan_device = OrphanDevice.find_by(device_token: device_token(topic))
return if orphan_device.nil?
orphan_device.update!(device_handshake: true)
Expand All @@ -109,12 +114,12 @@ def self.handshake_device(topic)
end

# takes a packet and returns 'device token' from topic
def self.device_token(topic)
def device_token(topic)
topic[/device\/sck\/(.*?)\//m, 1].to_s
end

# takes a packet and returns 'data' from payload
def self.data(message)
def data(message)
# TODO: what if message is empty?
if message
begin
Expand All @@ -126,4 +131,14 @@ def self.data(message)
raise "No data(message)"
end
end


private

attr_reader :mqtt_client


def storer
@storer ||= Storer.new(mqtt_client)
end
end
15 changes: 15 additions & 0 deletions app/models/concerns/message_forwarding.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module MessageForwarding

extend ActiveSupport::Concern

def forward_reading(device, reading)
forwarder = MQTTForwarder.new(mqtt_client)
forwarder.forward_reading(device.forwarding_token, device.id, reading) if device.forward_readings?
end

private

def mqtt_client
raise NotImplementedError
end
end
8 changes: 8 additions & 0 deletions app/models/device.rb
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ def hardware_slug
hardware_slug_override || [hardware_type.downcase, hardware_version&.gsub(".", ",")].compact.join(":")
end

def forward_readings?
owner.forward_device_readings?
end

def forwarding_token
owner.forwarding_token
end

private

def set_state
Expand Down
12 changes: 10 additions & 2 deletions app/models/raw_storer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
# ingest raw data posted by Devices into Kairos and Postgres (backup purposes).

class RawStorer
include MessageForwarding

def initialize data, mac, version, ip, raise_errors=false
def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def store data, mac, version, ip, raise_errors=false
success = true

begin
Expand Down Expand Up @@ -62,6 +66,7 @@ def initialize data, mac, version, ip, raise_errors=false
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
end

forward_reading(device, data)
rescue Exception => e

success = false
Expand All @@ -74,7 +79,10 @@ def initialize data, mac, version, ip, raise_errors=false
rescue
end
end

end

private

attr_reader :mqtt_client

end
44 changes: 28 additions & 16 deletions app/models/storer.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
class Storer
include DataParser::Storer
include MessageForwarding

def initialize device, reading, do_update = true
@device = device
begin
parsed_reading = Storer.parse_reading(@device, reading)
def initialize(mqtt_client)
@mqtt_client = mqtt_client
end

def store device, reading, do_update = true
begin
parsed_reading = Storer.parse_reading(device, reading)
kairos_publish(parsed_reading[:_data])

update_device(parsed_reading[:parsed_ts], parsed_reading[:sql_data]) if do_update
if do_update
update_device(device, parsed_reading[:parsed_ts], parsed_reading[:sql_data])
ws_publish(device)
end

forward_reading(device, reading)

rescue Exception => e
Sentry.capture_exception(e)
Expand All @@ -18,21 +26,20 @@ def initialize device, reading, do_update = true
raise e unless e.nil?
end

def update_device(parsed_ts, sql_data)
def update_device(device, parsed_ts, sql_data)
return if parsed_ts <= Time.at(0)

if @device.last_reading_at.present?
# Comparison errors if @device.last_reading_at is nil (new devices).
if device.last_reading_at.present?
# Comparison errors if device.last_reading_at is nil (new devices).
# Devices can post multiple readings, in a non-sorted order.
# Do not update data with an older timestamp.
return if parsed_ts < @device.last_reading_at
return if parsed_ts < device.last_reading_at
end

sql_data = @device.data.present? ? @device.data.merge(sql_data) : sql_data
@device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data
device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published')
sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq
@device.update_component_timestamps(parsed_ts, sensor_ids)
ws_publish()
device.update_component_timestamps(parsed_ts, sensor_ids)
end

def kairos_publish(reading_data)
Expand All @@ -41,11 +48,16 @@ def kairos_publish(reading_data)
Redis.current.publish('telnet_queue', reading_data.to_json)
end

def ws_publish()
return if Rails.env.test? or @device.blank?
def ws_publish(device)
return if Rails.env.test? or device.blank?
begin
Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: @device, current_user: nil}))
Redis.current.publish("data-received", ActionController::Base.new.view_context.render( partial: "v0/devices/device", locals: {device: device, current_user: nil}))
rescue
end
end

private

attr_reader :mqtt_client

end
8 changes: 8 additions & 0 deletions app/models/user.rb
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ def self.check_bad_avatar_urls
end
end

def forward_device_readings?
!!forwarding_token
end

def regenerate_forwarding_token!
self.forwarding_token = SecureRandom.urlsafe_base64(12) if self.is_admin_or_researcher?
end

private

def check_if_users_have_valid_email
Expand Down
5 changes: 5 additions & 0 deletions db/migrate/20240423162838_add_forwarding_token_to_users.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class AddForwardingTokenToUsers < ActiveRecord::Migration[6.1]
def change
add_column :users, :forwarding_token, :string
end
end
Loading

0 comments on commit 5224590

Please sign in to comment.