From c0f9898500e87ccf58b49d8f7f00eb106ae2b806 Mon Sep 17 00:00:00 2001 From: Oscar Gonzalez Date: Wed, 25 Sep 2024 04:34:04 +0000 Subject: [PATCH 01/11] First attempt at shared MQTT subscription --- compose.override.local.yml | 6 +++++- compose/app.yml | 4 +++- compose/db.yml | 1 + compose/mqtt-task-common.yml | 14 ++++++++++++ compose/mqtt-task.yml | 39 ++++++++++++++++++++++------------ compose/telnet-task.yml | 2 +- env.example | 1 + lib/tasks/mqtt_subscriber.rake | 15 +++++++------ 8 files changed, 59 insertions(+), 23 deletions(-) create mode 100644 compose/mqtt-task-common.yml diff --git a/compose.override.local.yml b/compose.override.local.yml index 7d37b254..696dd628 100644 --- a/compose.override.local.yml +++ b/compose.override.local.yml @@ -7,7 +7,11 @@ services: restart: "no" sidekiq: restart: "no" - mqtt-task: + mqtt-task-main-1: + restart: "no" + mqtt-task-main-2: + restart: "no" + mqtt-task-secondary: restart: "no" telnet-task: restart: "no" diff --git a/compose/app.yml b/compose/app.yml index e4b311b9..0d202996 100644 --- a/compose/app.yml +++ b/compose/app.yml @@ -15,7 +15,9 @@ services: - auth - redis - sidekiq - - mqtt-task + - mqtt-task-main-1 + - mqtt-task-main-2 + - mqtt-task-secondary - telnet-task #- mqtt restart: always diff --git a/compose/db.yml b/compose/db.yml index 1f67828a..f11b4a59 100644 --- a/compose/db.yml +++ b/compose/db.yml @@ -1,6 +1,7 @@ services: db: image: postgres:10 + command: -c max_connections=200 volumes: - sck-postgres:/var/lib/postgresql/data env_file: ../.env diff --git a/compose/mqtt-task-common.yml b/compose/mqtt-task-common.yml new file mode 100644 index 00000000..194254f3 --- /dev/null +++ b/compose/mqtt-task-common.yml @@ -0,0 +1,14 @@ +services: + mqtt-task: + build: ../ + env_file: ../.env + command: bundle exec rake mqtt:sub + restart: always + volumes: + - "../log:/app/log" + logging: + driver: "json-file" + options: + max-size: "100m" + environment: + db_pool_size: 2 diff --git a/compose/mqtt-task.yml b/compose/mqtt-task.yml index e7f94c43..03c982d0 100644 --- a/compose/mqtt-task.yml +++ b/compose/mqtt-task.yml @@ -1,14 +1,27 @@ services: - mqtt-task: - build: ../ - env_file: ../.env - command: bundle exec rake mqtt:sub - restart: always - volumes: - - "../log:/app/log" - logging: - driver: "json-file" - options: - max-size: "100m" - environment: - db_pool_size: 5 + mqtt-task-main-1: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: smartcitizen-api-staging-main-1 + MQTT_CLEAN_SESSION: false + mqtt-task-main-2: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: smartcitizen-api-staging-main-2 + MQTT_CLEAN_SESSION: false + mqtt-task-secondary: + extends: + file: mqtt-task-common.yml + service: mqtt-task + environment: + MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary-${HOSTNAME}" + MQTT_CLEAN_SESSION: true + deploy: + mode: replicated + replicas: 2 + + diff --git a/compose/telnet-task.yml b/compose/telnet-task.yml index caa14fa2..272dc162 100644 --- a/compose/telnet-task.yml +++ b/compose/telnet-task.yml @@ -5,4 +5,4 @@ services: command: bundle exec rake telnet:push restart: always environment: - db_pool_size: 5 + db_pool_size: 2 diff --git a/env.example b/env.example index f018c684..fcdd837d 100644 --- a/env.example +++ b/env.example @@ -18,6 +18,7 @@ REDIS_STORE=redis://redis:6379/3 # MQTT Settings MQTT_HOST=mqtt +#MQTT_SHARED_SUBSCRIPTION_GROUP="group1" #MQTT_CLEAN_SESSION=true #MQTT_CLIENT_ID=some_id #MQTT_PORT=port diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 41c45a15..56529a49 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -3,15 +3,16 @@ namespace :mqtt do task sub: :environment do pid_file = Rails.root.join('tmp/pids/mqtt_subscriber.pid') File.open(pid_file, 'w') { |f| f.puts Process.pid } - mqtt_log = Logger.new('log/mqtt.log', 5, 100.megabytes) mqtt_clean_session = ENV.has_key?('MQTT_CLEAN_SESSION') ? ENV['MQTT_CLEAN_SESSION'] == "true" : true mqtt_client_id = ENV.has_key?('MQTT_CLIENT_ID') ? ENV['MQTT_CLIENT_ID'] : nil mqtt_host = ENV.has_key?('MQTT_HOST') ? ENV['MQTT_HOST'] : 'mqtt' mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false + mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] + mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes) mqtt_log.info('MQTT TASK STARTING') mqtt_log.info("clean_session: #{mqtt_clean_session}") mqtt_log.info("client_id: #{mqtt_client_id}") @@ -32,15 +33,15 @@ namespace :mqtt do mqtt_log.info "Using clean_session setting: #{client.clean_session}" message_handler = MqttMessagesHandler.new - + prefix = mqtt_shared_subscription_group ? "$share/#{mqtt_shared_subscription_group}" : "$queue" client.subscribe(*mqtt_topics.flat_map { |topic| topic = topic == "" ? topic : topic + "/" [ - "$queue/#{topic}device/sck/+/readings" => 2, - "$queue/#{topic}device/sck/+/readings/raw" => 2, - "$queue/#{topic}device/sck/+/hello" => 2, - "$queue/#{topic}device/sck/+/info" => 2, - "$queue/#{topic}device/inventory" => 2 + "#{prefix}/#{topic}device/sck/+/readings" => 2, + "#{prefix}/#{topic}device/sck/+/readings/raw" => 2, + "#{prefix}/#{topic}device/sck/+/hello" => 2, + "#{prefix}/#{topic}device/sck/+/info" => 2, + "#{prefix}/#{topic}device/inventory" => 2 ] }) From 9d17cc795e6b300e3ecbccfd475605a47a395444 Mon Sep 17 00:00:00 2001 From: Oscar Gonzalez Date: Wed, 25 Sep 2024 11:13:36 +0000 Subject: [PATCH 02/11] fix mqtt secondary task cliend id --- compose/mqtt-task-common.yml | 2 +- compose/mqtt-task.yml | 2 +- lib/tasks/mqtt_subscriber.rake | 6 ++++++ mqtt_subscriber.sh | 2 ++ 4 files changed, 10 insertions(+), 2 deletions(-) create mode 100755 mqtt_subscriber.sh diff --git a/compose/mqtt-task-common.yml b/compose/mqtt-task-common.yml index 194254f3..9d59002b 100644 --- a/compose/mqtt-task-common.yml +++ b/compose/mqtt-task-common.yml @@ -2,7 +2,7 @@ services: mqtt-task: build: ../ env_file: ../.env - command: bundle exec rake mqtt:sub + command: ./mqtt_subscriber.sh restart: always volumes: - "../log:/app/log" diff --git a/compose/mqtt-task.yml b/compose/mqtt-task.yml index 03c982d0..ab83109b 100644 --- a/compose/mqtt-task.yml +++ b/compose/mqtt-task.yml @@ -18,7 +18,7 @@ services: file: mqtt-task-common.yml service: mqtt-task environment: - MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary-${HOSTNAME}" + MQTT_CLIENT_ID: "smartcitizen-api-staging-secondary" MQTT_CLEAN_SESSION: true deploy: mode: replicated diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 56529a49..83460187 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -10,8 +10,14 @@ namespace :mqtt do mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) + mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] + + if mqtt_shared_subscription_group && mqtt_clean_session + mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}" + end + mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes) mqtt_log.info('MQTT TASK STARTING') mqtt_log.info("clean_session: #{mqtt_clean_session}") diff --git a/mqtt_subscriber.sh b/mqtt_subscriber.sh new file mode 100755 index 00000000..481d728e --- /dev/null +++ b/mqtt_subscriber.sh @@ -0,0 +1,2 @@ +#!/bin/bash +bundle exec rake mqtt:sub From 8779d53a8c1958ede0e7cac7fdf804c0e2376eb8 Mon Sep 17 00:00:00 2001 From: Oscar Gonzalez Date: Wed, 25 Sep 2024 11:35:02 +0000 Subject: [PATCH 03/11] up number of connections per mqtt task to 5 --- compose/mqtt-task-common.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compose/mqtt-task-common.yml b/compose/mqtt-task-common.yml index 9d59002b..12146f9c 100644 --- a/compose/mqtt-task-common.yml +++ b/compose/mqtt-task-common.yml @@ -11,4 +11,4 @@ services: options: max-size: "100m" environment: - db_pool_size: 2 + db_pool_size: 5 From e50a11f9b1f770877771ff2a2c331ab93b468eee Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Wed, 9 Oct 2024 21:04:45 +0200 Subject: [PATCH 04/11] ensure atomic access to and creation of components in mqtt-task --- app/models/component.rb | 13 +++++++++-- app/models/concerns/data_parser/storer.rb | 2 +- app/models/device.rb | 19 ++++++++------- app/models/raw_storer.rb | 2 +- ...241009174732_unique_index_on_components.rb | 23 +++++++++++++++++++ db/schema.rb | 7 +++--- spec/models/component_spec.rb | 2 +- spec/models/device_spec.rb | 10 ++++---- 8 files changed, 57 insertions(+), 21 deletions(-) create mode 100644 db/migrate/20241009174732_unique_index_on_components.rb diff --git a/app/models/component.rb b/app/models/component.rb index fa4b9ae5..c3b2c5a9 100644 --- a/app/models/component.rb +++ b/app/models/component.rb @@ -5,8 +5,17 @@ class Component < ActiveRecord::Base belongs_to :sensor validates_presence_of :device, :sensor - validates :sensor_id, :uniqueness => { :scope => [:device_id] } - validates :key, :uniqueness => { :scope => [:device_id] } + + # IMPORTANT: Validation of sensor/device uniqueness is done at the database level, + # as this allows us to use the create_or_find_by! method to atomically upsert components + # in the mqtt-task, avoiding component duplication due to race conditions. + # For some reason, create_or_find_by! ONLY works when the database constraint is + # the ONLY uniqueness constraint on those two values, so adding a rails validation here + # causes an error. Leaving the validations here commented out by way of documentation. + # See https://stackoverflow.com/questions/74566974/create-or-find-by-not-working-as-it-should-in-rails-6 + # validates :sensor_id, :uniqueness => { :scope => [:device_id] } + # validates :key, :uniqueness => { :scope => [:device_id] } + before_validation :set_key, on: :create diff --git a/app/models/concerns/data_parser/storer.rb b/app/models/concerns/data_parser/storer.rb index 721ddd10..84a37720 100644 --- a/app/models/concerns/data_parser/storer.rb +++ b/app/models/concerns/data_parser/storer.rb @@ -54,7 +54,7 @@ def timestamp_parse(timestamp) end def sensor_reading(device, sensor) - component = device.find_or_create_component_for_sensor_reading(sensor) + component = device.create_or_find_component_for_sensor_reading(sensor) return nil if component.nil? value = component.normalized_value( (Float(sensor['value']) rescue sensor['value']) ) { diff --git a/app/models/device.rb b/app/models/device.rb index 503997c0..50ec7300 100644 --- a/app/models/device.rb +++ b/app/models/device.rb @@ -110,25 +110,25 @@ def sensor_map components.map { |c| [c.key, c.sensor.id]}.to_h end - def find_or_create_component_by_sensor_id(sensor_id) + def create_or_find_component_by_sensor_id(sensor_id) return nil if sensor_id.nil? || !Sensor.exists?(id: sensor_id) - components.find_or_create_by(sensor_id: sensor_id) + components.create_or_find_by!(sensor_id: sensor_id) end - def find_or_create_component_by_sensor_key(sensor_key) + def create_or_find_component_by_sensor_key(sensor_key) return nil if sensor_key.nil? sensor = Sensor.find_by(default_key: sensor_key) return nil if sensor.nil? - components.find_or_create_by(sensor_id: sensor.id) + components.create_or_find_by!(sensor_id: sensor.id) end - def find_or_create_component_for_sensor_reading(reading) + def create_or_find_component_for_sensor_reading(reading) key_or_id = reading["id"] if key_or_id.is_a?(Integer) || key_or_id =~ /\d+/ # It's an integer and therefore a sensor id - find_or_create_component_by_sensor_id(key_or_id) + create_or_find_component_by_sensor_id(key_or_id) else - find_or_create_component_by_sensor_key(key_or_id) + create_or_find_component_by_sensor_key(key_or_id) end end @@ -251,7 +251,10 @@ def remove_mac_address_for_newly_registered_device! def update_component_timestamps(timestamp, sensor_ids) components.select {|c| sensor_ids.include?(c.sensor_id) }.each do |component| - component.update_column(:last_reading_at, timestamp) + component.transaction do + component.lock! + component.update_column(:last_reading_at, timestamp) + end end end diff --git a/app/models/raw_storer.rb b/app/models/raw_storer.rb index 0e46402b..89e5f144 100644 --- a/app/models/raw_storer.rb +++ b/app/models/raw_storer.rb @@ -30,7 +30,7 @@ def store data, mac, version, ip, raise_errors=false metric_id = device.find_sensor_id_by_key(metric) - component = device.find_or_create_component_by_sensor_id(metric_id) + component = device.create_or_find_component_by_sensor_id(metric_id) next if component.nil? value = component.normalized_value( (Float(value) rescue value) ) diff --git a/db/migrate/20241009174732_unique_index_on_components.rb b/db/migrate/20241009174732_unique_index_on_components.rb new file mode 100644 index 00000000..a3fc5f01 --- /dev/null +++ b/db/migrate/20241009174732_unique_index_on_components.rb @@ -0,0 +1,23 @@ +class UniqueIndexOnComponents < ActiveRecord::Migration[6.1] + def up + remove_index :components, [:device_id, :sensor_id] + add_index :components, [:device_id, :sensor_id], unique: true + execute %{ + ALTER TABLE components ADD CONSTRAINT unique_sensor_for_device UNIQUE (device_id, sensor_id) + } + execute %{ + ALTER TABLE components ADD CONSTRAINT unique_key_for_device UNIQUE (device_id, key) + } + end + + def down + execute %{ + ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_key_for_device + } + execute %{ + ALTER TABLE components DROP CONSTRAINT IF EXISTS unique_sensor_for_device + } + remove_index :components, [:device_id, :sensor_id], unique: true + add_index :components, [:device_id, :sensor_id] + end +end diff --git a/db/schema.rb b/db/schema.rb index 7d9f5d01..cf2d78d4 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,8 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_10_01_080033) do - +ActiveRecord::Schema.define(version: 2024_10_09_174732) do # These are extensions that must be enabled in order to support this database enable_extension "adminpack" enable_extension "hstore" @@ -66,7 +65,9 @@ t.string "key" t.integer "bus", default: 1, null: false t.datetime "last_reading_at" - t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id" + t.index ["device_id", "key"], name: "unique_key_for_device", unique: true + t.index ["device_id", "sensor_id"], name: "index_components_on_device_id_and_sensor_id", unique: true + t.index ["device_id", "sensor_id"], name: "unique_sensor_for_device", unique: true end create_table "devices", id: :serial, force: :cascade do |t| diff --git a/spec/models/component_spec.rb b/spec/models/component_spec.rb index e6ea21ad..dc30c5b9 100644 --- a/spec/models/component_spec.rb +++ b/spec/models/component_spec.rb @@ -8,7 +8,7 @@ it "validates uniqueness of board to sensor" do component = create(:component, device: create(:device), sensor: create(:sensor)) - expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordInvalid) + expect{ create(:component, device: component.device, sensor: component.sensor) }.to raise_error(ActiveRecord::RecordNotUnique) end describe "creating a unique sensor key" do diff --git a/spec/models/device_spec.rb b/spec/models/device_spec.rb index c03a8c81..39aed487 100644 --- a/spec/models/device_spec.rb +++ b/spec/models/device_spec.rb @@ -645,19 +645,19 @@ end end - describe "#find_or_create_component_by_sensor_id" do + describe "#create_or_find_component_by_sensor_id" do context "when the sensor exists and a component already exists for this device" do it "returns the existing component" do sensor = create(:sensor) component = create(:component, sensor: sensor, device: device) - expect(device.find_or_create_component_by_sensor_id(sensor.id)).to eq(component) + expect(device.create_or_find_component_by_sensor_id(sensor.id)).to eq(component) end end context "when the sensor exists and a component does not already exist for this device" do it "returns a new valid component with the correct sensor and device" do sensor = create(:sensor) - component = device.find_or_create_component_by_sensor_id(sensor.id) + component = device.create_or_find_component_by_sensor_id(sensor.id) expect(component).not_to be_blank expect(component).to be_a Component expect(component.valid?).to be(true) @@ -670,13 +670,13 @@ context "when no sensor exists with this id" do it "returns nil" do create(:sensor, id: 12345) - expect(device.find_or_create_component_by_sensor_id(54321)).to be_blank + expect(device.create_or_find_component_by_sensor_id(54321)).to be_blank end end context "when the id is nil" do it "returns nil" do - expect(device.find_or_create_component_by_sensor_id(nil)).to be_blank + expect(device.create_or_find_component_by_sensor_id(nil)).to be_blank end end end From 4f06f4089c320817770ef3764c0e3e5696d2ea20 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Thu, 10 Oct 2024 07:40:22 +0200 Subject: [PATCH 05/11] ensure timestamp updates are atomic and applied in order --- Gemfile | 1 + Gemfile.lock | 5 +++++ app/models/device.rb | 4 ++-- app/models/storer.rb | 22 ++++++++++++---------- spec/rails_helper.rb | 2 +- spec/spec_helper.rb | 14 +++++++++++++- 6 files changed, 34 insertions(+), 14 deletions(-) diff --git a/Gemfile b/Gemfile index b003fde5..e305e16c 100644 --- a/Gemfile +++ b/Gemfile @@ -67,6 +67,7 @@ gem 'workflow-activerecord' gem 'em-mqtt' group :test do + gem "database_cleaner-active_record" gem 'simplecov', require: false gem 'timecop' gem 'vcr' diff --git a/Gemfile.lock b/Gemfile.lock index c32c6f2f..17566a47 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -131,6 +131,10 @@ GEM css_parser (1.14.0) addressable dalli (3.2.4) + database_cleaner-active_record (2.2.0) + activerecord (>= 5.a) + database_cleaner-core (~> 2.0.0) + database_cleaner-core (2.0.1) date (3.3.3) date_validator (0.12.0) activemodel (>= 3) @@ -548,6 +552,7 @@ DEPENDENCIES cane countries dalli + database_cleaner-active_record date_validator diffy doorkeeper (~> 5) diff --git a/app/models/device.rb b/app/models/device.rb index 50ec7300..52c37a8e 100644 --- a/app/models/device.rb +++ b/app/models/device.rb @@ -251,8 +251,8 @@ def remove_mac_address_for_newly_registered_device! def update_component_timestamps(timestamp, sensor_ids) components.select {|c| sensor_ids.include?(c.sensor_id) }.each do |component| - component.transaction do - component.lock! + component.lock! if self.class.connection.transaction_open? + if !component.reload.last_reading_at || timestamp > component.last_reading_at component.update_column(:last_reading_at, timestamp) end end diff --git a/app/models/storer.rb b/app/models/storer.rb index bd40d9cd..3d6288ec 100644 --- a/app/models/storer.rb +++ b/app/models/storer.rb @@ -23,18 +23,20 @@ def store device, reading, do_update = true def update_device(device, parsed_ts, sql_data) return if parsed_ts <= Time.at(0) + device.transaction(isolation: :serializable) do + device.lock! + if device.reload.last_reading_at.present? + # Comparison errors if device.last_reading_at is nil (new devices). + # Devices can post multiple readings, in a non-sorted order. + # Do not update data with an older timestamp. + return if parsed_ts < device.last_reading_at + end - if device.last_reading_at.present? - # Comparison errors if device.last_reading_at is nil (new devices). - # Devices can post multiple readings, in a non-sorted order. - # Do not update data with an older timestamp. - return if parsed_ts < device.last_reading_at + sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data + device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') + sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq + device.update_component_timestamps(parsed_ts, sensor_ids) end - - sql_data = device.data.present? ? device.data.merge(sql_data) : sql_data - device.update_columns(last_reading_at: parsed_ts, data: sql_data, state: 'has_published') - sensor_ids = sql_data.select { |k, v| k.is_a?(Integer) }.keys.compact.uniq - device.update_component_timestamps(parsed_ts, sensor_ids) end def kairos_publish(reading_data) diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 250da1ab..ab3831f6 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -74,7 +74,7 @@ # If you're not using ActiveRecord, or you'd prefer not to run each of your # examples within a transaction, remove the following line or assign false # instead of true. - config.use_transactional_fixtures = true + config.use_transactional_fixtures = false # RSpec Rails can automatically mix in different behaviours to your tests # based on their file location, for example enabling you to call `get` and diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 2d5c8cb4..e952fa21 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -14,7 +14,7 @@ # users commonly want. # # See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration - +require "database_cleaner/active_record" RSpec.configure do |config| # rspec-expectations config goes here. You can use an alternate # assertion/expectation library such as wrong or the stdlib/minitest @@ -40,6 +40,18 @@ mocks.verify_partial_doubles = true end + DatabaseCleaner.strategy = :truncation + + config.before(:suite) do + DatabaseCleaner.clean + end + + config.around(:each) do |example| + DatabaseCleaner.cleaning do + example.run + end + end + # The settings below are suggested to provide a good initial experience # with RSpec, but feel free to customize to your heart's content. From e590bbb5311f05a58af2be270aeb9e01ff4809f3 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Thu, 10 Oct 2024 08:13:14 +0200 Subject: [PATCH 06/11] reset transaction isolation level to default --- app/models/storer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/storer.rb b/app/models/storer.rb index 3d6288ec..07fd1d6a 100644 --- a/app/models/storer.rb +++ b/app/models/storer.rb @@ -23,7 +23,7 @@ def store device, reading, do_update = true def update_device(device, parsed_ts, sql_data) return if parsed_ts <= Time.at(0) - device.transaction(isolation: :serializable) do + device.transaction do device.lock! if device.reload.last_reading_at.present? # Comparison errors if device.last_reading_at is nil (new devices). From d4ab05672c18d522bb8e7bab55dab683289427f7 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Thu, 10 Oct 2024 08:50:59 +0200 Subject: [PATCH 07/11] use transactional fixtures in tests --- Gemfile | 1 - Gemfile.lock | 7 +------ spec/rails_helper.rb | 2 +- spec/spec_helper.rb | 13 ------------- 4 files changed, 2 insertions(+), 21 deletions(-) diff --git a/Gemfile b/Gemfile index e305e16c..b003fde5 100644 --- a/Gemfile +++ b/Gemfile @@ -67,7 +67,6 @@ gem 'workflow-activerecord' gem 'em-mqtt' group :test do - gem "database_cleaner-active_record" gem 'simplecov', require: false gem 'timecop' gem 'vcr' diff --git a/Gemfile.lock b/Gemfile.lock index 17566a47..09d294a4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -131,10 +131,6 @@ GEM css_parser (1.14.0) addressable dalli (3.2.4) - database_cleaner-active_record (2.2.0) - activerecord (>= 5.a) - database_cleaner-core (~> 2.0.0) - database_cleaner-core (2.0.1) date (3.3.3) date_validator (0.12.0) activemodel (>= 3) @@ -552,7 +548,6 @@ DEPENDENCIES cane countries dalli - database_cleaner-active_record date_validator diffy doorkeeper (~> 5) @@ -625,4 +620,4 @@ RUBY VERSION ruby 3.0.6p216 BUNDLED WITH - 2.5.20 + 2.5.21 diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index ab3831f6..250da1ab 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -74,7 +74,7 @@ # If you're not using ActiveRecord, or you'd prefer not to run each of your # examples within a transaction, remove the following line or assign false # instead of true. - config.use_transactional_fixtures = false + config.use_transactional_fixtures = true # RSpec Rails can automatically mix in different behaviours to your tests # based on their file location, for example enabling you to call `get` and diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index e952fa21..9de16f78 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -14,7 +14,6 @@ # users commonly want. # # See http://rubydoc.info/gems/rspec-core/RSpec/Core/Configuration -require "database_cleaner/active_record" RSpec.configure do |config| # rspec-expectations config goes here. You can use an alternate # assertion/expectation library such as wrong or the stdlib/minitest @@ -40,18 +39,6 @@ mocks.verify_partial_doubles = true end - DatabaseCleaner.strategy = :truncation - - config.before(:suite) do - DatabaseCleaner.clean - end - - config.around(:each) do |example| - DatabaseCleaner.cleaning do - example.run - end - end - # The settings below are suggested to provide a good initial experience # with RSpec, but feel free to customize to your heart's content. From d1f8f2e964ac2582bbfd74a622e60c593cf806f3 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 14 Oct 2024 06:08:50 +0200 Subject: [PATCH 08/11] sentry warning when internal mqtt queue passes a threshold --- lib/tasks/mqtt_subscriber.rake | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 83460187..68b4e271 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -10,14 +10,15 @@ namespace :mqtt do mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) - + mqtt_queue_length_warning_threshold = Env.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", 30) + mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] if mqtt_shared_subscription_group && mqtt_clean_session mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}" end - + mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes) mqtt_log.info('MQTT TASK STARTING') mqtt_log.info("clean_session: #{mqtt_clean_session}") @@ -50,7 +51,7 @@ namespace :mqtt do "#{prefix}/#{topic}device/inventory" => 2 ] }) - + threshold_passed = false client.get do |topic, message| Sentry.with_scope do begin @@ -59,6 +60,12 @@ namespace :mqtt do end mqtt_log.info "Processed MQTT message in #{time}" mqtt_log.info "MQTT queue length: #{client.queue_length}" + if !threshold_passed && client.queue_length >= mqtt_queue_length_warning_threshold + threshold_passed = true + Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).") + else + threshold_passed = false + end rescue Exception => e mqtt_log.info e Sentry.capture_exception(e) From cd70c5ba456d8be7c723714965817c1fde1f62f3 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 14 Oct 2024 06:15:55 +0200 Subject: [PATCH 09/11] typo --- lib/tasks/mqtt_subscriber.rake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 68b4e271..df30a747 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -10,7 +10,7 @@ namespace :mqtt do mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) - mqtt_queue_length_warning_threshold = Env.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", 30) + mqtt_queue_length_warning_threshold = ENV.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", 30) mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] From 2adfd2148a53cd8a7f0fde58fcfa4fe0ccf6b71f Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 14 Oct 2024 06:18:43 +0200 Subject: [PATCH 10/11] env vars are strings! --- lib/tasks/mqtt_subscriber.rake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index df30a747..e91b6b3f 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -10,7 +10,7 @@ namespace :mqtt do mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) - mqtt_queue_length_warning_threshold = ENV.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", 30) + mqtt_queue_length_warning_threshold = ENV.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", "30").to_i mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] From a728506d0a9ceceb34c6415f3717b6d2cc2dbb2e Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 14 Oct 2024 06:28:35 +0200 Subject: [PATCH 11/11] threshold warning logic --- lib/tasks/mqtt_subscriber.rake | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index e91b6b3f..1e4afe26 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -60,9 +60,11 @@ namespace :mqtt do end mqtt_log.info "Processed MQTT message in #{time}" mqtt_log.info "MQTT queue length: #{client.queue_length}" - if !threshold_passed && client.queue_length >= mqtt_queue_length_warning_threshold - threshold_passed = true - Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).") + if client.queue_length >= mqtt_queue_length_warning_threshold + if !threshold_passed + Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).") + threshold_passed = true + end else threshold_passed = false end