Skip to content

Commit

Permalink
final set of adaptations to current nwaku
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Nov 2, 2024
1 parent e5aaa4d commit dfe0682
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/basic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ futures = "0.3.30"
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
waku = { path = "../../waku-bindings", package = "waku-bindings" }
serde_json = "1.0"
54 changes: 37 additions & 17 deletions examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::from_utf8;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use waku::{
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig,
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig, LibwakuResponse,
};

#[tokio::main]
Expand All @@ -25,25 +25,45 @@ async fn main() -> Result<(), Error> {

// ========================================================================
// Setting an event callback to be executed each time a message is received
node2.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let message = message.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
node2.set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");

match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
});

node1.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let message = message.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
node1.set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");

match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
});

Expand Down
1 change: 1 addition & 0 deletions waku-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ waku-sys = { version = "0.5.0", path = "../waku-sys" }
libc = "0.2"
serde-aux = "4.3.1"
rln = "0.3.4"
tokio = { version = "1", features = ["full"] }

[dev-dependencies]
futures = "0.3.25"
Expand Down
6 changes: 3 additions & 3 deletions waku-bindings/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ pub struct WakuMessageEvent {

/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse)>(ctx: &WakuNodeContext, closure: F) {
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse)>(ctx: &WakuNodeContext, closure: &F) {
unsafe {
let cb = get_trampoline(&closure);
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &closure as *const _ as *mut c_void)
let cb = get_trampoline(closure);
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, closure as *const _ as *mut c_void)
};
}

Expand Down
4 changes: 2 additions & 2 deletions waku-bindings/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl WakuNodeHandle<Running> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}

pub fn set_event_callback<F: FnMut(LibwakuResponse)>(&self, f: F) {
events::waku_set_event_callback(&self.ctx, f)
pub fn set_event_callback<F: Fn(LibwakuResponse)>(&self, closure: &F) {
events::waku_set_event_callback(&self.ctx, closure)
}
}
48 changes: 17 additions & 31 deletions waku-bindings/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@ use std::str::FromStr;
use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use std::cell::OnceCell;
use tokio::sync::broadcast::{self, Sender};
use waku_bindings::LibwakuResponse;
use std::sync::{Arc, OnceLock, Mutex}; // Import Arc and Mutex
use tokio::time;
use tokio::time::sleep;
use waku_bindings::utils;
use waku_bindings::{
waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage,
WakuNodeConfig, WakuNodeHandle,
};
use std::ffi::c_void;
const ECHO_TIMEOUT: u64 = 1000;
const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test";
Expand All @@ -29,24 +25,18 @@ fn try_publish_relay_messages(
]))
}

#[derive(Debug, Clone)]
struct Response {
hash: MessageHash,
payload: Vec<u8>,
}

async fn test_echo_messages(
node1: &WakuNodeHandle<Running>,
node2: &WakuNodeHandle<Running>,
content: &'static str,
content_topic: WakuContentTopic,
) {
) -> Result<(), String> {
// setting a naïve event handler to avoid appearing ERR messages in logs
node1.set_event_callback(|_LibwakuResponse| {});
node1.set_event_callback(&|_| {});

let rx_waku_message: OnceCell<WakuMessage> = OnceCell::new();

let closure = |response: LibwakuResponse| {
let closure = |response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
Expand All @@ -65,12 +55,7 @@ async fn test_echo_messages(

println!("Before setting event callback");

unsafe {
let cb = utils::get_trampoline(&closure);
waku_sys::waku_set_event_callback(node2.ctx.obj_ptr, cb, &closure as *const _ as *mut c_void)
};

// node2.set_event_callback(closure); // Set the event callback with the closure
node2.set_event_callback(&closure); // Set the event callback with the closure

let topic = TEST_PUBSUBTOPIC.to_string();
node1.relay_subscribe(&topic).unwrap();
Expand Down Expand Up @@ -100,22 +85,28 @@ async fn test_echo_messages(
Vec::new(),
false,
);
let ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
println!("After publish");
let _ids = try_publish_relay_messages(node1, &message).expect("send relay messages");

// Wait for the msg to arrive
for _ in 0..50 {
if let Some(value) = rx_waku_message.get() {
println!("The waku message value is: {:?}", value);
break;
if let Some(msg) = rx_waku_message.get() {
println!("The waku message value is: {:?}", msg);
let payload = msg.payload.to_vec();
let payload_str = from_utf8(&payload).expect("should be valid message");
println!("payload: {:?}", payload_str);
if payload_str == ECHO_MESSAGE {
return Ok(())
}
} else {
sleep(Duration::from_millis(100)).await;
}
}

if let None = rx_waku_message.get() {
println!("ERROR could not get waku message");
return Err("could not get waku message".to_string())
}

return Err("Unexpected test ending".to_string())
}

#[tokio::test]
Expand All @@ -134,7 +125,6 @@ async fn default_echo() -> Result<(), String> {
let node1 = node1.start()?;
let node2 = node2.start()?;

let waku_version = node2.version()?;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);

let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
Expand All @@ -148,12 +138,8 @@ async fn default_echo() -> Result<(), String> {

assert!(got_all);

let node2 = node2.stop()?;
let node1 = node1.stop()?;

let sleep = time::sleep(Duration::from_secs(5));
tokio::pin!(sleep);

let node2 = node2.stop()?;
waku_destroy(node1)?;
waku_destroy(node2)?;

Expand Down

0 comments on commit dfe0682

Please sign in to comment.