Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: buffer Framed<.., Codec> packets #826

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion benchmarks/parsers/v4.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::{Buf, BytesMut};
use rumqttc::mqttbytes::v4;
use rumqttc::mqttbytes::QoS;
use rumqttc::Packet;
use std::time::Instant;

mod common;
Expand Down Expand Up @@ -31,7 +32,7 @@ fn main() {
let start = Instant::now();
let mut packets = Vec::with_capacity(count);
while output.has_remaining() {
let packet = v4::read(&mut output, 10 * 1024).unwrap();
let packet = Packet::read(&mut output, 10 * 1024).unwrap();
packets.push(packet);
}

Expand Down
2 changes: 2 additions & 0 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

* Refactor `Network`, simplify with `Framed`

### Deprecated

### Removed
Expand Down
4 changes: 3 additions & 1 deletion rumqttc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ websocket = ["dep:async-tungstenite", "dep:ws_stream_tungstenite", "dep:http"]
proxy = ["dep:async-http-proxy"]

[dependencies]
futures-util = { version = "0.3", default_features = false, features = ["std"] }
futures-util = { version = "0.3", default_features = false, features = ["std", "sink"] }
tokio = { version = "1.36", features = ["rt", "macros", "io-util", "net", "time"] }
tokio-util = { version = "0.7", features = ["codec"] }
bytes = "1.5"
log = "0.4"
flume = { version = "0.11", default-features = false, features = ["async"] }
Expand All @@ -47,6 +48,7 @@ native-tls = { version = "0.2.11", optional = true }
url = { version = "2", default-features = false, optional = true }
# proxy
async-http-proxy = { version = "1.2.5", features = ["runtime-tokio", "basic-auth"], optional = true }
tokio-stream = "0.1.15"

[dev-dependencies]
bincode = "1.3.3"
Expand Down
85 changes: 54 additions & 31 deletions rumqttc/src/eventloop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{framed::Network, Transport};
use crate::{Incoming, MqttState, NetworkOptions, Packet, Request, StateError};
use crate::{MqttOptions, Outgoing};

use crate::framed::N;
use crate::framed::AsyncReadWrite;
use crate::mqttbytes::v4::*;
use flume::{bounded, Receiver, Sender};
use tokio::net::{lookup_host, TcpSocket, TcpStream};
Expand Down Expand Up @@ -38,8 +38,6 @@ pub enum ConnectionError {
MqttState(#[from] StateError),
#[error("Network timeout")]
NetworkTimeout,
#[error("Flush timeout")]
FlushTimeout,
#[cfg(feature = "websocket")]
#[error("Websocket: {0}")]
Websocket(#[from] async_tungstenite::tungstenite::error::Error),
Expand Down Expand Up @@ -81,7 +79,7 @@ pub struct EventLoop {
/// Pending packets from last session
pub pending: VecDeque<Request>,
/// Network connection to the broker
network: Option<Network>,
pub network: Option<Network>,
/// Keep alive time
keepalive_timeout: Option<Pin<Box<Sleep>>>,
pub network_options: NetworkOptions,
Expand All @@ -104,11 +102,10 @@ impl EventLoop {
let pending = VecDeque::new();
let max_inflight = mqtt_options.inflight;
let manual_acks = mqtt_options.manual_acks;
let max_outgoing_packet_size = mqtt_options.max_outgoing_packet_size;

EventLoop {
mqtt_options,
state: MqttState::new(max_inflight, manual_acks, max_outgoing_packet_size),
state: MqttState::new(max_inflight, manual_acks),
requests_tx,
requests_rx,
pending,
Expand Down Expand Up @@ -174,7 +171,6 @@ impl EventLoop {
// let await_acks = self.state.await_acks;
let inflight_full = self.state.inflight >= self.mqtt_options.inflight;
let collision = self.state.collision.is_some();
let network_timeout = Duration::from_secs(self.network_options.connection_timeout());

// Read buffered events from previous polls before calling a new poll
if let Some(event) = self.state.events.pop_front() {
Expand All @@ -186,13 +182,11 @@ impl EventLoop {
// instead of returning a None event, we try again.
select! {
// Pull a bunch of packets from network, reply in bunch and yield the first item
o = network.readb(&mut self.state) => {
o?;
// flush all the acks and return first incoming packet
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
Ok(inner) => inner?,
Err(_)=> return Err(ConnectionError::FlushTimeout),
};
o = network.read() => {
let incoming = o?;
if let Some(packet) = self.state.handle_incoming_packet(incoming)? {
network.write(packet).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This packet is never flushed unless there's a ping or options.max_request_batch - 1 more packets "sent".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had the same doubt, was using tests as a crutch, but I see how even the tests aren't complete.

What type of acks should we instantly respond to, I guess only PingResp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so deep in the MQTT standard but think it's just PingResp.

}
Ok(self.state.events.pop_front().unwrap())
},
// Handles pending and new requests.
Expand Down Expand Up @@ -229,11 +223,10 @@ impl EventLoop {
self.mqtt_options.pending_throttle
), if !self.pending.is_empty() || (!inflight_full && !collision) => match o {
Ok(request) => {
self.state.handle_outgoing_packet(request)?;
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
Ok(inner) => inner?,
Err(_)=> return Err(ConnectionError::FlushTimeout),
};
if let Some(outgoing) = self.state.handle_outgoing_packet(request)? {
network.write(outgoing).await?;
}

Ok(self.state.events.pop_front().unwrap())
}
Err(_) => Err(ConnectionError::RequestsDone),
Expand All @@ -245,11 +238,10 @@ impl EventLoop {
let timeout = self.keepalive_timeout.as_mut().unwrap();
timeout.as_mut().reset(Instant::now() + self.mqtt_options.keep_alive);

self.state.handle_outgoing_packet(Request::PingReq(PingReq))?;
match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
Ok(inner) => inner?,
Err(_)=> return Err(ConnectionError::FlushTimeout),
};
if let Some(outgoing) = self.state.handle_outgoing_packet(Request::PingReq(PingReq))? {
network.write(outgoing).await?;
}

Ok(self.state.events.pop_front().unwrap())
}
}
Expand Down Expand Up @@ -351,12 +343,19 @@ async fn network_connect(
options: &MqttOptions,
network_options: NetworkOptions,
) -> Result<Network, ConnectionError> {
let network_timeout = Duration::from_secs(network_options.connection_timeout());
// Process Unix files early, as proxy is not supported for them.
#[cfg(unix)]
if matches!(options.transport(), Transport::Unix) {
let file = options.broker_addr.as_str();
let socket = UnixStream::connect(Path::new(file)).await?;
let network = Network::new(socket, options.max_incoming_packet_size);
let network = Network::new(
socket,
options.max_incoming_packet_size,
options.max_outgoing_packet_size,
network_timeout,
options.network_buffer_capacity,
);
return Ok(network);
}

Expand All @@ -369,7 +368,7 @@ async fn network_connect(
_ => options.broker_address(),
};

let tcp_stream: Box<dyn N> = {
let tcp_stream: Box<dyn AsyncReadWrite> = {
#[cfg(feature = "proxy")]
match options.proxy() {
Some(proxy) => proxy.connect(&domain, port, network_options).await?,
Expand All @@ -388,13 +387,25 @@ async fn network_connect(
};

let network = match options.transport() {
Transport::Tcp => Network::new(tcp_stream, options.max_incoming_packet_size),
Transport::Tcp => Network::new(
tcp_stream,
options.max_incoming_packet_size,
options.max_outgoing_packet_size,
network_timeout,
options.network_buffer_capacity,
),
#[cfg(any(feature = "use-rustls", feature = "use-native-tls"))]
Transport::Tls(tls_config) => {
let socket =
tls::tls_connect(&options.broker_addr, options.port, &tls_config, tcp_stream)
.await?;
Network::new(socket, options.max_incoming_packet_size)
Network::new(
socket,
options.max_incoming_packet_size,
options.max_outgoing_packet_size,
network_timeout,
options.network_buffer_capacity,
)
}
#[cfg(unix)]
Transport::Unix => unreachable!(),
Expand All @@ -413,7 +424,13 @@ async fn network_connect(
async_tungstenite::tokio::client_async(request, tcp_stream).await?;
validate_response_headers(response)?;

Network::new(WsStream::new(socket), options.max_incoming_packet_size)
Network::new(
WsStream::new(socket),
options.max_incoming_packet_size,
options.max_outgoing_packet_size,
network_timeout,
options.network_buffer_capacity,
)
}
#[cfg(all(feature = "use-rustls", feature = "websocket"))]
Transport::Wss(tls_config) => {
Expand All @@ -436,7 +453,13 @@ async fn network_connect(
.await?;
validate_response_headers(response)?;

Network::new(WsStream::new(socket), options.max_incoming_packet_size)
Network::new(
WsStream::new(socket),
options.max_incoming_packet_size,
options.max_outgoing_packet_size,
network_timeout,
options.network_buffer_capacity,
)
}
};

Expand All @@ -462,7 +485,7 @@ async fn mqtt_connect(
}

// send mqtt connect packet
network.connect(connect).await?;
network.write(Packet::Connect(connect)).await?;

// validate connack
match network.read().await? {
Expand Down
Loading