From 2a108adefd31aa8420f8ca7ac71a7fc9447b1359 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Wed, 6 Dec 2023 12:14:21 -0800 Subject: [PATCH] Add packet ack configuration --- Cargo.lock | 23 +++++----- src/message_cache.rs | 83 ++++++++++++++++++++++++------------ src/packet.rs | 42 ++++++++++-------- src/packet_router/mod.rs | 43 ++++++++++++++----- src/service/mod.rs | 32 ++++++++++++++ src/service/packet_router.rs | 13 +++--- src/settings.rs | 16 ++++++- 7 files changed, 179 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7befaf6..29bde508 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -221,7 +221,7 @@ dependencies = [ "rand_chacha", "rust_decimal", "serde", - "sha2 0.9.9", + "sha2 0.10.8", "thiserror", ] @@ -1262,9 +1262,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" +checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", "wasi", @@ -1572,10 +1572,11 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e8366a6159044a37876a2b9817124296703c586a5c92e2c53751fa06d8d43e8" +checksum = "97dc5fea232fc28d2f597b37c4876b348a40e33f3b02cc975c8d006d78d94b1a" dependencies = [ + "toml_datetime", "toml_edit", ] @@ -2380,15 +2381,15 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.5" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" +checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" [[package]] name = "toml_edit" -version = "0.20.7" +version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" +checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" dependencies = [ "indexmap 2.1.0", "toml_datetime", @@ -2784,9 +2785,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.24" +version = "0.5.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0383266b19108dfc6314a56047aa545a1b4d1be60e799b4dbdd407b56402704b" +checksum = "b7e87b8dfbe3baffbe687eef2e164e32286eff31a5ee16463ce03d991643ec94" dependencies = [ "memchr", ] diff --git a/src/message_cache.rs b/src/message_cache.rs index 330fa321..8206841a 100644 --- a/src/message_cache.rs +++ b/src/message_cache.rs @@ -38,11 +38,6 @@ impl Deref for CacheMessage { } } -pub enum PopFront<'a> { - Duration(Duration), - Ack(&'a [u8]), -} - impl MessageCache { pub fn new(max_messages: u16) -> Self { let waiting = VecDeque::new(); @@ -69,15 +64,18 @@ impl MessageCache { /// Returns the index of the first matching message in the cache or None if /// not present - pub fn index_of(&self, message: &T) -> Option { - self.cache.iter().position(|m| m.message == *message) + pub fn index_of

(&self, pred: P) -> Option + where + P: Fn(&T) -> bool, + { + self.cache.iter().position(|entry| pred(&entry.message)) } /// Promotes the given message to the back of the queue, effectively /// recreating an LRU cache. Returns true if a cache hit was found pub fn tag(&mut self, message: T, received: Instant) -> bool { let result = self - .index_of(&message) + .index_of(|msg| *msg == message) .and_then(|index| self.cache.remove(index)) .is_some(); self.push_back(message, received); @@ -100,30 +98,30 @@ impl MessageCache { self.cache.push_front(cache_message); } - pub fn pop_front(&mut self, args: PopFront) -> (usize, Option>) { + pub fn pop_front(&mut self, duration: Duration) -> (usize, Option>) { let mut dropped = 0; let mut front = None; while let Some(msg) = self.cache.pop_front() { - match args { - PopFront::Duration(duration) => { - if msg.hold_time() <= duration { - front = Some(msg); - break; - } - } - PopFront::Ack(ack) => { - if msg.hash() == ack { - front = self.cache.pop_front(); - break; - } - } - }; - // held for too long or acked, count as dropped and move on + if msg.hold_time() <= duration { + front = Some(msg); + break; + } dropped += 1; } (dropped, front) } + /// Removes all items from the cache up to and including the given index. + /// + /// The index is bounds checked and an index beyond the length of the cache + /// is ignored + pub fn remove_to(&mut self, index: usize) { + if index >= self.len() { + return; + } + self.cache = self.cache.split_off(index + 1); + } + /// Returns a reference to the first (and oldest/first to be removed) /// message in the cache pub fn peek_front(&self) -> Option<&CacheMessage> { @@ -141,7 +139,8 @@ impl MessageCache { #[cfg(test)] mod test { - use super::MessageCache; + use super::{Instant, MessageCache}; + use sha2::{Digest, Sha256}; #[test] fn test_cache_tagging() { @@ -161,8 +160,36 @@ mod test { // Third tag should evict the least recently used entry (2) assert!(!cache.tag_now(vec![3])); - assert_eq!(Some(0), cache.index_of(&vec![1u8])); - assert_eq!(Some(1), cache.index_of(&vec![3u8])); - assert!(cache.index_of(&vec![2u8]).is_none()); + assert_eq!(Some(0), cache.index_of(|msg| msg.as_slice() == &[1u8])); + assert_eq!(Some(1), cache.index_of(|msg| msg.as_slice() == &[3u8])); + assert!(cache.index_of(|msg| msg.as_slice() == &[2u8]).is_none()); + } + + #[test] + fn test_remove_to() { + let mut cache = MessageCache::>::new(5); + cache.push_back(vec![1], Instant::now()); + cache.push_back(vec![2], Instant::now()); + cache.push_back(vec![3], Instant::now()); + + let ack = Sha256::digest(vec![2]).to_vec(); + + // Find entry by hash as an example + let ack_index = cache.index_of(|msg| Sha256::digest(msg).to_vec() == ack); + assert_eq!(Some(1), ack_index); + // Can't find non existing + assert_eq!(None, cache.index_of(|_| false)); + + // remove and check inclusion of remove_to + cache.remove_to(1); + assert_eq!(1, cache.len()); + + // remove past last index + cache.remove_to(5); + assert_eq!(1, cache.len()); + + // remove last element + cache.remove_to(0); + assert!(cache.is_empty()); } } diff --git a/src/packet.rs b/src/packet.rs index 6c71a60e..0cc77c35 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -9,6 +9,7 @@ use semtech_udp::{ push_data::{self, CRC}, CodingRate, DataRate, Modulation, }; +use sha2::{Digest, Sha256}; use std::{ convert::TryFrom, fmt, @@ -17,7 +18,10 @@ use std::{ }; #[derive(Debug, Clone, PartialEq)] -pub struct PacketUp(PacketRouterPacketUpV1); +pub struct PacketUp { + packet: PacketRouterPacketUpV1, + pub(crate) hash: Vec, +} #[derive(Debug, Clone)] pub struct PacketDown(PacketRouterPacketDownV1); @@ -26,18 +30,19 @@ impl Deref for PacketUp { type Target = PacketRouterPacketUpV1; fn deref(&self) -> &Self::Target { - &self.0 + &self.packet } } impl From for PacketRouterPacketUpV1 { fn from(value: PacketUp) -> Self { - value.0 + value.packet } } + impl From<&PacketUp> for PacketRouterPacketUpV1 { fn from(value: &PacketUp) -> Self { - value.0.clone() + value.packet.clone() } } @@ -51,12 +56,12 @@ impl fmt::Display for PacketUp { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_fmt(format_args!( "@{} us, {:.2} MHz, {:?}, snr: {}, rssi: {}, len: {}", - self.0.timestamp, - self.0.frequency, - self.0.datarate(), - self.0.snr, - self.0.rssi, - self.0.payload.len() + self.packet.timestamp, + self.packet.frequency, + self.packet.datarate(), + self.packet.snr, + self.packet.rssi, + self.packet.payload.len() )) } } @@ -66,15 +71,15 @@ impl TryFrom for poc_lora::LoraWitnessReportReqV1 { fn try_from(value: PacketUp) -> Result { let report = poc_lora::LoraWitnessReportReqV1 { data: vec![], - tmst: value.0.timestamp as u32, + tmst: value.packet.timestamp as u32, timestamp: SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(Error::from)? .as_nanos() as u64, - signal: value.0.rssi * 10, - snr: (value.0.snr * 10.0) as i32, - frequency: value.0.frequency as u64, - datarate: value.0.datarate, + signal: value.packet.rssi * 10, + snr: (value.packet.snr * 10.0) as i32, + frequency: value.packet.frequency as u64, + datarate: value.packet.datarate, pub_key: vec![], signature: vec![], }; @@ -106,7 +111,10 @@ impl PacketUp { gateway: gateway.into(), signature: vec![], }; - Ok(Self(packet)) + Ok(Self { + hash: Sha256::digest(&packet.payload).to_vec(), + packet, + }) } pub fn is_potential_beacon(&self) -> bool { @@ -132,7 +140,7 @@ impl PacketUp { } pub fn payload(&self) -> &[u8] { - &self.0.payload + &self.packet.payload } pub fn parse_header(payload: &[u8]) -> Result { diff --git a/src/packet_router/mod.rs b/src/packet_router/mod.rs index bddefe6f..3e3cfe65 100644 --- a/src/packet_router/mod.rs +++ b/src/packet_router/mod.rs @@ -1,7 +1,7 @@ use crate::{ gateway, - message_cache::{CacheMessage, MessageCache, MessageHash, PopFront}, - service::{packet_router::PacketRouterService, Reconnect}, + message_cache::{CacheMessage, MessageCache, MessageHash}, + service::{packet_router::PacketRouterService, AckTimer, Reconnect}, sync, Base64, PacketUp, PublicKey, Result, Settings, }; use futures::TryFutureExt; @@ -13,7 +13,6 @@ use serde::Serialize; use sha2::{Digest, Sha256}; use std::{ops::Deref, time::Instant as StdInstant}; use tokio::time::Duration; - use tracing::{debug, info, warn}; const STORE_GC_INTERVAL: Duration = Duration::from_secs(60); @@ -57,6 +56,7 @@ pub struct PacketRouter { transmit: gateway::MessageSender, service: PacketRouterService, reconnect: Reconnect, + ack_timer: AckTimer, store: MessageCache, } @@ -73,16 +73,21 @@ impl PacketRouter { transmit: gateway::MessageSender, ) -> Self { let router_settings = &settings.router; - let service = - PacketRouterService::new(router_settings.uri.clone(), settings.keypair.clone()); + let service = PacketRouterService::new( + router_settings.uri.clone(), + router_settings.ack_timeout(), + settings.keypair.clone(), + ); let store = MessageCache::new(router_settings.queue); let reconnect = Reconnect::default(); + let ack_timer = AckTimer::new(router_settings.ack_timeout()); Self { service, transmit, messages, store, reconnect, + ack_timer, } } @@ -102,6 +107,7 @@ impl PacketRouter { self.service.disconnect(); warn!("router disconnected"); self.reconnect.update_next_time(true); + self.ack_timer.update_next_time(false); }, Some(Message::Status(tx_resp)) => { let status = RouterStatus { @@ -116,6 +122,13 @@ impl PacketRouter { _ = self.reconnect.wait() => { let reconnect_result = self.handle_reconnect().await; self.reconnect.update_next_time(reconnect_result.is_err()); + self.ack_timer.update_next_time(reconnect_result.is_ok()); + }, + _ = self.ack_timer.wait() => { + warn!("no packet acks received"); + let reconnect_result = self.handle_reconnect().await; + self.reconnect.update_next_time(reconnect_result.is_err()); + self.ack_timer.update_next_time(reconnect_result.is_ok()); }, router_message = self.service.recv() => match router_message { Ok(envelope_down_v1::Data::Packet(message)) => self.handle_downlink(message).await, @@ -130,11 +143,16 @@ impl PacketRouter { self.service.disconnect(); } self.reconnect.update_next_time(session_result.is_err()); + self.ack_timer.update_next_time(session_result.is_ok()); + }, + Ok(envelope_down_v1::Data::PacketAck(message)) => { + self.handle_packet_ack(message).await; + self.ack_timer.update_next_time(true); }, - Ok(envelope_down_v1::Data::PacketAck(message)) => self.handle_packet_ack(message).await, Err(err) => { warn!(?err, "router error"); self.reconnect.update_next_time(true); + self.ack_timer.update_next_time(false); }, } } @@ -164,7 +182,14 @@ impl PacketRouter { } async fn handle_packet_ack(&mut self, message: PacketRouterPacketAckV1) { - self.store.pop_front(PopFront::Ack(&message.payload_hash)); + if message.payload_hash.is_empty() { + // Empty ack is just a heartbeat and is ignored + return; + } + if let Some(index) = self.store.index_of(|msg| msg.hash == message.payload_hash) { + self.store.remove_to(index); + debug!(removed = index, "removed acked packets"); + } } async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) -> Result { @@ -175,9 +200,7 @@ impl PacketRouter { } async fn send_waiting_packets(&mut self) -> Result { - while let (removed, Some(packet)) = - self.store.pop_front(PopFront::Duration(STORE_GC_INTERVAL)) - { + while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) { if removed > 0 { info!(removed, "discarded queued packets"); } diff --git a/src/service/mod.rs b/src/service/mod.rs index 87920d67..b13d05bc 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -59,3 +59,35 @@ impl Reconnect { self.next_time = Instant::now() + backoff; } } + +pub struct AckTimer { + next_time: Instant, + timeout: Duration, +} + +impl AckTimer { + pub fn new(timeout: Duration) -> Self { + Self { + next_time: Instant::now() + timeout, + timeout, + } + } + + pub async fn wait(&self) { + if self.next_time >= Instant::now() { + time::sleep_until(self.next_time).await + } else { + std::future::pending().await + } + } + + pub fn update_next_time(&mut self, active: bool) { + // timeout is 0 if the ack timer is not requested. Active means the + // connection is open and acks are to be expected + self.next_time = if self.timeout.as_secs() > 0 && active { + Instant::now() + self.timeout + } else { + Instant::now() - self.timeout + }; + } +} diff --git a/src/service/packet_router.rs b/src/service/packet_router.rs index 6d9b1e86..dee7f831 100644 --- a/src/service/packet_router.rs +++ b/src/service/packet_router.rs @@ -16,7 +16,7 @@ use helium_proto::{ use http::Uri; use std::{ sync::Arc, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -30,7 +30,9 @@ pub struct PacketRouterService( ConduitService, ); -pub struct PacketRouterConduitClient {} +pub struct PacketRouterConduitClient { + ack_timeout: Duration, +} #[async_trait] impl ConduitClient for PacketRouterConduitClient { @@ -51,8 +53,7 @@ impl ConduitClient for PacketRouterConduitClient { gateway: keypair.public_key().into(), signature: vec![], session_capable: true, - // TODO: get from settings/args - packet_ack_interval: 0, + packet_ack_interval: self.ack_timeout.as_secs() as u32, }; msg.sign(keypair.clone()).await?; let msg = EnvelopeUpV1 { @@ -100,8 +101,8 @@ impl std::ops::DerefMut for PacketRouterService { } impl PacketRouterService { - pub fn new(uri: Uri, keypair: Arc) -> Self { - let client = PacketRouterConduitClient {}; + pub fn new(uri: Uri, ack_timeout: Duration, keypair: Arc) -> Self { + let client = PacketRouterConduitClient { ack_timeout }; Self(ConduitService::new("packet_router", uri, client, keypair)) } diff --git a/src/settings.rs b/src/settings.rs index 0b45f3e9..873390fd 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -2,7 +2,7 @@ use crate::{api::GatewayStakingMode, KeyedUri, Keypair, PublicKey, Region, Resul use config::{Config, Environment, File}; use http::uri::Uri; use serde::Deserialize; -use std::{fmt, path::Path, str::FromStr, sync::Arc}; +use std::{fmt, path::Path, str::FromStr, sync::Arc, time::Duration}; pub fn version() -> semver::Version { semver::Version::parse(env!("CARGO_PKG_VERSION")).expect("unable to parse version") @@ -102,6 +102,15 @@ pub struct RouterSettings { pub uri: Uri, // Maximum number of packets to queue up for the packet router pub queue: u16, + /// Timeout for packet acks in seconds + #[serde(default = "default_ack_timeout")] + pub ack: u64, +} + +impl RouterSettings { + pub fn ack_timeout(&self) -> Duration { + Duration::from_secs(self.ack) + } } impl Settings { @@ -151,6 +160,11 @@ fn default_poc_interval() -> u64 { 6 * 3600 } +fn default_ack_timeout() -> u64 { + // disabled = 0 + 0 +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Copy, clap::ValueEnum)] #[clap(rename_all = "lower")] #[repr(u8)]