Skip to content

Commit

Permalink
Merge pull request #362 from fablabbcn/treetop-grammar-for-message-fo…
Browse files Browse the repository at this point in the history
…rmat

New Raw MQTT Message parser
  • Loading branch information
timcowlishaw authored Oct 22, 2024
2 parents a12bcc6 + 23373c3 commit 8ca2cb4
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 107 deletions.
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ gem 'sentry-sidekiq'
gem 'sinatra'
#gem 'skylight'
gem 'stamp'
gem "treetop"
gem 'versionist', github: 'bploetz/versionist'
gem 'webrick'
gem 'workflow'
Expand Down
6 changes: 5 additions & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ GEM
pg_search (2.3.6)
activerecord (>= 5.2)
activesupport (>= 5.2)
polyglot (0.3.5)
premailer (1.21.0)
addressable
css_parser (>= 1.12.0)
Expand Down Expand Up @@ -502,6 +503,8 @@ GEM
tilt (2.0.11)
timecop (0.9.6)
timeout (0.3.2)
treetop (1.6.12)
polyglot (~> 0.3)
tzinfo (2.0.6)
concurrent-ruby (~> 1.0)
unaccent (0.4.0)
Expand Down Expand Up @@ -604,6 +607,7 @@ DEPENDENCIES
sshkit-sudo
stamp
timecop
treetop
vcr
versionist!
webmock
Expand All @@ -616,4 +620,4 @@ RUBY VERSION
ruby 3.0.6p216

BUNDLED WITH
2.5.6
2.5.20
100 changes: 100 additions & 0 deletions app/grammars/raw_message.tt
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
grammar RawMessage
rule message
'{' whitespace message_body whitespace '}' {
def to_hash
{ data: [ message_body.to_hash ] }
end
}
end

rule message_body
(pairs:pair*) {
def to_hash
pairs.elements.inject({ recorded_at: nil, sensors: [] }) { |accum, pair_node|
pair = pair_node.to_value
if pair[:key] == :t
accum.merge(recorded_at: pair[:value])
elsif pair && pair[:value]
new_sensors_list = accum[:sensors] + [{id: pair[:key], value: pair[:value]}]
accum.merge(sensors: new_sensors_list)
else
accum
end
}
end
}
end

rule pair
pair:(timestamp_pair / value_pair) whitespace ","? whitespace {
def to_value
pair.to_value
end
}
end

rule timestamp_pair
't' whitespace ':' timestamp {
def to_value
{ key: :t, value: timestamp.to_value }
end
}
end

rule value_pair
number whitespace ':' whitespace optional_value {
def to_value
{ key: number.to_value, value: optional_value.to_value }
end
}
end

rule optional_value
value / null
end

rule value
float / number
end

rule null
"null" {
def to_value
nil
end
}
end

rule float
float:(number decimal_part) {
def to_value
float.text_value
end
}
end

rule number
number:[\-0-9]+ {
def to_value
number.text_value
end
}
end

rule decimal_part
'.' number
end

rule timestamp
timestamp:([0-9] 4..4 '-' [0-9] 2..2 '-' [0-9] 2..2 'T' [0-9] 2..2 ':' [0-9] 2..2 ':' [0-9] 2..2 'Z') {
def to_value
timestamp.text_value
end
}
end

rule whitespace
' '*
end

end
150 changes: 56 additions & 94 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,106 +1,59 @@
class MqttMessagesHandler

def handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.handle_topic",
message: "Handling topic #{topic}",
data: { topic: topic, message: message.encode("UTF-8", invalid: :replace, undef: :replace) }
)
Sentry.add_breadcrumb(crumb)

return if topic.nil?

message = message.encode("US-ASCII", invalid: :replace, undef: :replace, replace: "")
log_message_to_sentry(topic, message)
handshake_device(topic)

# The following do NOT need a device
if topic.to_s.include?('inventory')
DeviceInventory.create({ report: (message rescue nil) })
return true
end

device = Device.find_by(device_token: device_token(topic))
if device.nil?
handle_nil_device(topic, message, retry_on_nil_device)
return nil
end

if topic.to_s.include?('raw')
handle_readings(device, parse_raw_readings(message, device.id))
handle_inventory(topic, message)
elsif topic.to_s.include?('raw')
handle_readings(topic, parse_raw_readings(message), retry_on_nil_device)
elsif topic.to_s.include?('readings')
handle_readings(device, message)
handle_readings(topic, message, retry_on_nil_device)
elsif topic.to_s.include?('info')
json_message = JSON.parse(message)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.handle_topic",
message: "Parsing info message",
data: {
topic: topic,
message: message.encode("UTF-8", invalid: :replace, undef: :replace),
json: json_message,
device_id: device.id
}
)
Sentry.add_breadcrumb(crumb)
device.update_column(:hardware_info, json_message)
handle_info(topic, message, retry_on_nil_device)
else
true
end
return true
end

def handle_nil_device(topic, message, retry_on_nil_device)
orphan_device = OrphanDevice.find_by_device_token(device_token(topic))
if topic.to_s.include?("info") && !topic.to_s.include?("bridge") && orphan_device
retry_later(topic, message) if retry_on_nil_device
end
end
private

def retry_later(topic, message)
RetryMQTTMessageJob.perform_later(topic, message)
def handle_inventory(topic, message)
DeviceInventory.create({ report: (message rescue nil) })
return true
end

# takes a packet and stores data
def handle_readings(device, message)
data = self.data(message)
return if data.nil? or data&.empty?
def handle_readings(topic, message, retry_on_nil_device)
device = find_device_for_topic(topic, message, retry_on_nil_device)
return nil if device.nil?

parsed = JSON.parse(message) if message
data = parsed["data"] if parsed
return nil if data.nil? or data&.empty?

data.each do |reading|
storer.store(device, reading)
end

return true
rescue Exception => e
Sentry.capture_exception(e)
raise e if Rails.env.test?
#puts e.inspect
#puts message
end

# takes a raw packet and converts into JSON
def parse_raw_readings(message, device_id=nil)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Parsing raw readings",
data: { message: message.encode("UTF-8", invalid: :replace, undef: :replace), device_id: device_id }
)
Sentry.add_breadcrumb(crumb)
clean_tm = message[1..-2].split(",")[0].gsub("t:", "").strip
raw_readings = message[1..-2].split(",")[1..]

reading = { 'data' => ['recorded_at' => clean_tm, 'sensors' => []] }

raw_readings.each do |raw_read|
raw_id = raw_read.split(":")[0].strip
raw_value = raw_read.split(":")[1]&.strip
reading['data'].first['sensors'] << { 'id' => raw_id, 'value' => raw_value }
end

crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Readings data constructed",
data: { message: message.encode("UTF-8", invalid: :replace, undef: :replace), reading: reading, device_id: device_id }
)
Sentry.add_breadcrumb(crumb)
def handle_info(topic, message, retry_on_nil_device)
device = find_device_for_topic(topic, message, retry_on_nil_device)
return nil if device.nil?
json_message = JSON.parse(message)
device.update_column(:hardware_info, json_message)
return true
end

JSON[reading]
def parse_raw_readings(message)
JSON[raw_readings_parser.parse(message)]
end

def handshake_device(topic)
Expand All @@ -112,29 +65,38 @@ def handshake_device(topic)
}.to_json)
end

# takes a packet and returns 'device token' from topic
def device_token(topic)
topic[/device\/sck\/(.*?)\//m, 1].to_s
def log_message_to_sentry(topic, message)
Sentry.set_tags('mqtt-topic': topic)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.handle_topic",
message: "Handling topic #{topic}",
data: { topic: topic, message: message }
)
Sentry.add_breadcrumb(crumb)
end

# takes a packet and returns 'data' from payload
def data(message)
# TODO: what if message is empty?
if message
begin
JSON.parse(message)['data']
rescue JSON::ParserError
# Handle error
end
else
raise "No data(message)"
end
def find_device_for_topic(topic, message, retry_on_nil_device)
device = Device.find_by(device_token: device_token(topic))
handle_nil_device(topic, message, retry_on_nil_device) if device.nil?
return device
end

def handle_nil_device(topic, message, retry_on_nil_device)
orphan_device = OrphanDevice.find_by_device_token(device_token(topic))
if topic.to_s.include?("info") && !topic.to_s.include?("bridge") && orphan_device
RetryMQTTMessageJob.perform_later(topic, message) if retry_on_nil_device
end
end

private
def device_token(topic)
device_token = topic[/device\/sck\/(.*?)\//m, 1].to_s
end

def storer
@storer ||= Storer.new
end

def raw_readings_parser
@raw_readings_parser ||= RawMqttMessageParser.new
end
end
19 changes: 19 additions & 0 deletions app/lib/raw_mqtt_message_parser.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
require_relative "../grammars/raw_message"

class RawMqttMessageParser
def initialize
@parser = RawMessageParser.new
end

def parse(message)
parser.parse(self.convert_to_ascii(message.strip))&.to_hash
end

private

def convert_to_ascii(string)
string.encode("US-ASCII", invalid: :replace, undef: :replace, replace: "")
end

attr_reader :parser
end
12 changes: 0 additions & 12 deletions spec/lib/mqtt_messages_handler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,6 @@
)
end

describe '#device_token' do
it 'returns device_token from topic' do
expect(message_handler.device_token(@packet.topic)).to eq(device.device_token)
end
end

describe '#data' do
it 'returns parsed data from payload' do
expect(message_handler.data(@packet.payload)).to match_array(@data)
end
end

describe '#readings' do
before do
# storer data processing
Expand Down
Loading

0 comments on commit 8ca2cb4

Please sign in to comment.