Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-blocking receive never notifies #820

Open
IsaacDynamo opened this issue Mar 15, 2024 · 3 comments
Open

Non-blocking receive never notifies #820

IsaacDynamo opened this issue Mar 15, 2024 · 3 comments

Comments

@IsaacDynamo
Copy link

Expected Behavior

Expected case 1, 2 and 3 to print the same notifications.

Current Behavior

Only case 1 prints notifications, case 2 and 3 show no notifications.

Context

Trying to integrate non-blocking receive into an existing event loop. But try_recv() and recv_timeout() never generate meaning full notifications. recv() does generate notifications, but is blocking.

use rumqttc::{Client, MqttOptions, QoS, RecvTimeoutError, TryRecvError};
use std::time::Duration;

const CASE: u8 = 1;

fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-sync", "test.mosquitto.org", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    let (client, mut connection) = Client::new(mqttoptions, 10);
    client.subscribe("hello/rumqtt", QoS::AtMostOnce).unwrap();

    loop {
        // My own event loop

        match CASE {
            1 => {
                // Output:
                // Notification = Ok(Ok(Incoming(ConnAck(ConnAck { session_present: false, code: Success }))))
                // Notification = Ok(Ok(Outgoing(Subscribe(1))))
                // Notification = Ok(Ok(Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtMostOnce)] }))))
                // Notification = Ok(Ok(Incoming(Publish(Topic = hello/rumqtt, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 13))))
                let notification = connection.recv();
                println!("Notification = {:?}", notification);
            }
            2 => {
                // No output
                let notification = connection.try_recv();
                if !matches!(notification, Err(TryRecvError::Empty)) {
                    println!("Notification = {:?}", notification);
                }
                std::thread::sleep(Duration::from_millis(1));
            }
            3 => {
                // No output
                let notification = connection.recv_timeout(Duration::from_millis(1));
                if !matches!(notification, Err(RecvTimeoutError::Timeout)) {
                    println!("Notification = {:?}", notification);
                }
            }
            _ => (),
        }
    }
}
[dependencies]
rumqttc = "0.24.0"
uname -a
Linux 5.10.16.3-microsoft-standard-WSL2 #1 SMP Fri Apr 2 22:23:49 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
@swanandx
Copy link
Member

hey, thanks for reporting.

in case 3, you can increase the timeout duration, e.g. connection.recv_timeout(Duration::from_secs(1)), it will work.

for case 2, i will get back to you!

@de-sh
Copy link
Contributor

de-sh commented Mar 16, 2024

Requests take a while to be processed, so both try_recv and recv_timeout(1 ms) will fail to respond in time and hence the connection is never established, this is unfortunate, but it seems like atleast in the case of sync code, we should deprecate try_recv and put a note in place to deter use of extremely small timeouts for the other.

We require a sync specific EventLoop, but that is clearly not on priority, so that's upto anyone interested to contribute.

@IsaacDynamo
Copy link
Author

From a users perspective the timeout in recv_timeout() should apply to reception of notification, and be decoupled from timeouts of connections that are managed inside the stack.

If try_recv() gets removed, I would reach for recv_timeout(0ms) which would still be wrong. Adding a note would be helpful, but what would be an acceptable, but small timeout? It all seems a bit finicky, and leads to surprising behavior.

For now I moved the connection to a separate thread, and post notification into a queue that can be read non-blocking from my own event loop. This works without issues so far.

let (mut client, mut connection) = Client::new(mqttoptions, 100);
let (send_event, recv_event)= std::sync::mpsc::channel();
std::thread::spawn(move || {
    for notification in connection.iter() {
        send_event.send(notification).unwrap();
    }
});

... 

loop {
    if let Ok(notification) = recv_event.try_recv() {
        println!("Notification = {:?}", notification);
    }
}     

Maybe it is possible to do something similar in the Connection implementation. This could fix to the issue, without deprecation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants