Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

temporary pr Nwaku4 #103

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/basic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ futures = "0.3.30"
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
waku = { path = "../../waku-bindings", package = "waku-bindings" }
serde_json = "1.0"
54 changes: 37 additions & 17 deletions examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::from_utf8;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use waku::{
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig,
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig, LibwakuResponse,
};

#[tokio::main]
Expand All @@ -25,25 +25,45 @@ async fn main() -> Result<(), Error> {

// ========================================================================
// Setting an event callback to be executed each time a message is received
node2.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let message = message.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
node2.set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");

match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
});

node1.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let message = message.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
node1.set_event_callback(&|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");

match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
});

Expand Down
1 change: 1 addition & 0 deletions waku-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ waku-sys = { version = "0.5.0", path = "../waku-sys" }
libc = "0.2"
serde-aux = "4.3.1"
rln = "0.3.4"
tokio = { version = "1", features = ["full"] }

[dev-dependencies]
futures = "0.3.25"
Expand Down
7 changes: 4 additions & 3 deletions waku-bindings/src/general/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub type Result<T> = std::result::Result<T, String>;
// TODO: Properly type and deserialize payload form base64 encoded string
/// Waku message in JSON format.
/// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct WakuMessage {
#[serde(with = "base64_serde", default = "Vec::new")]
Expand Down Expand Up @@ -67,8 +67,9 @@ impl WakuMessage {
}

/// WakuMessage encoding scheme
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub enum Encoding {
#[default]
Proto,
Rlp,
Rfc26,
Expand Down Expand Up @@ -105,7 +106,7 @@ impl RegexRepresentation for Encoding {
}

/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}`
#[derive(Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct WakuContentTopic {
pub application_name: Cow<'static, str>,
pub version: Cow<'static, str>,
Expand Down
5 changes: 4 additions & 1 deletion waku-bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
mod general;
mod node;
mod utils;
pub mod utils;

// Re-export the LibwakuResponse type to make it accessible outside this module
pub use utils::LibwakuResponse;

// Required so functions inside libwaku can call RLN functions even if we
// use it within the bindings functions
Expand Down
20 changes: 5 additions & 15 deletions waku-bindings/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::MessageHash;
/// Waku event
/// For now just WakuMessage is supported
#[non_exhaustive]
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "eventType", rename_all = "camelCase")]
pub enum Event {
#[serde(rename = "message")]
Expand All @@ -26,7 +26,7 @@ pub enum Event {
}

/// Type of `event` field for a `message` event
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
Expand All @@ -39,20 +39,10 @@ pub struct WakuMessageEvent {

/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: &WakuNodeContext, mut f: F) {
let cb = |response: LibwakuResponse| {
if let LibwakuResponse::Success(v) = response {
let data: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
f(data);
};
};

pub fn waku_set_event_callback<F: FnMut(LibwakuResponse)>(ctx: &WakuNodeContext, closure: &F) {
unsafe {
let mut closure = cb;
let cb = get_trampoline(&closure);

waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
let cb = get_trampoline(closure);
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, closure as *const _ as *mut c_void)
};
}

Expand Down
13 changes: 12 additions & 1 deletion waku-bindings/src/node/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::general::Result;
use crate::node::context::WakuNodeContext;
use crate::utils::LibwakuResponse;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
use crate::utils::WakuDecode;

/// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
Expand All @@ -19,7 +20,6 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
waku_sys::waku_setup();
}


let config = config.unwrap_or_default();
let config_ptr = CString::new(
serde_json::to_string(&config)
Expand Down Expand Up @@ -101,6 +101,17 @@ pub fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
handle_response(code, result)
}

// Implement WakuDecode for Vec<Multiaddr>
impl WakuDecode for Vec<Multiaddr> {
fn decode(input: &str) -> Result<Self> {
input
.split(',')
.map(|s| s.trim().parse::<Multiaddr>().map_err(|err| err.to_string()))
.collect::<Result<Vec<Multiaddr>>>() // Collect results into a Vec
.map_err(|err| format!("could not parse Multiaddr: {}", err))
}
}

/// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
Expand Down
7 changes: 4 additions & 3 deletions waku-bindings/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::marker::PhantomData;
use std::time::Duration;
// internal
use crate::general::{MessageHash, Result, WakuMessage};
use crate::LibwakuResponse;
use context::WakuNodeContext;

pub use config::RLNConfig;
Expand All @@ -36,7 +37,7 @@ impl WakuNodeState for Running {}

/// Handle to the underliying waku node
pub struct WakuNodeHandle<State: WakuNodeState> {
ctx: WakuNodeContext,
pub ctx: WakuNodeContext,
phantom: PhantomData<State>,
}

Expand Down Expand Up @@ -116,7 +117,7 @@ impl WakuNodeHandle<Running> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}

pub fn set_event_callback<F: FnMut(Event) + Send + Sync + 'static>(&self, f: F) {
events::waku_set_event_callback(&self.ctx, f)
pub fn set_event_callback<F: Fn(LibwakuResponse)>(&self, closure: &F) {
events::waku_set_event_callback(&self.ctx, closure)
}
}
17 changes: 10 additions & 7 deletions waku-bindings/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::general::Result;
use core::str::FromStr;
use serde::de::DeserializeOwned;
use std::convert::TryFrom;
use std::{slice, str};
use waku_sys::WakuCallBack;
Expand Down Expand Up @@ -32,9 +31,13 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
}
}

pub fn decode<T: DeserializeOwned>(input: String) -> Result<T> {
serde_json::from_str(input.as_str())
.map_err(|err| format!("could not deserialize waku response: {}", err))
// Define the WakuDecode trait
pub trait WakuDecode: Sized {
fn decode(input: &str) -> Result<Self>;
}

pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
T::decode(input.as_str())
}

unsafe extern "C" fn trampoline<F>(
Expand All @@ -45,7 +48,7 @@ unsafe extern "C" fn trampoline<F>(
) where
F: FnMut(LibwakuResponse),
{
let user_data = &mut *(user_data as *mut F);
let closure = &mut *(user_data as *mut F);

let response = if data.is_null() {
""
Expand All @@ -57,7 +60,7 @@ unsafe extern "C" fn trampoline<F>(
let result = LibwakuResponse::try_from((ret_code as u32, response))
.expect("invalid response obtained from libwaku");

user_data(result);
closure(result);
}

pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
Expand All @@ -84,7 +87,7 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
}
}

pub fn handle_json_response<F: DeserializeOwned>(code: i32, result: LibwakuResponse) -> Result<F> {
pub fn handle_json_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
match result {
LibwakuResponse::Success(v) => decode(v.unwrap_or_default()),
LibwakuResponse::Failure(v) => Err(v),
Expand Down
Loading
Loading