Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cln_plugin : Support wildcard subscriptions #7106

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion plugins/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,14 @@ PLUGIN_BASES := $(PLUGINS:plugins/%=%) $(PY_PLUGINS:plugins/%=%)
plugins/list_of_builtin_plugins_gen.h: plugins/Makefile Makefile config.vars
@$(call VERBOSE,GEN $@,echo "static const char *list_of_builtin_plugins[] = { $(PLUGIN_BASES:%=\"%\",) NULL };" > $@)

target/${RUST_PROFILE}/examples/cln-subscribe-wildcard: ${CLN_PLUGIN_SRC} plugins/examples/cln-subscribe-wildcard.rs
cargo build ${CARGO_OPTS} --example cln-subscribe-wildcard

CLN_PLUGIN_EXAMPLES := \
target/${RUST_PROFILE}/examples/cln-plugin-startup \
target/${RUST_PROFILE}/examples/cln-plugin-reentrant \
target/${RUST_PROFILE}/examples/cln-rpc-getinfo
target/${RUST_PROFILE}/examples/cln-rpc-getinfo \
target/${RUST_PROFILE}/examples/cln-subscribe-wildcard

CLN_PLUGIN_SRC = $(shell find plugins/src -name "*.rs")

Expand Down
35 changes: 35 additions & 0 deletions plugins/examples/cln-subscribe-wildcard.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/// This plug-in subscribes to the wildcard-notifications
/// and creates a corresponding log-entry

use anyhow::Result;
use cln_plugin::{Builder, Plugin};

#[tokio::main]
async fn main() -> Result<()> {
let state = ();

let configured = Builder::new(tokio::io::stdin(), tokio::io::stdout())
.subscribe("*", handle_wildcard_notification)
.start(state)
.await?;

match configured {
Some(p) => p.join().await?,
None => return Ok(()) // cln was started with --help
};

Ok(())
}

async fn handle_wildcard_notification(_plugin: Plugin<()>, value : serde_json::Value) -> Result<()> {
let notification_type : String = value
.as_object()
.unwrap()
.keys()
.next()
.unwrap()
.into();

log::info!("Received notification {}", notification_type);
Ok(())
}
68 changes: 50 additions & 18 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ where
option_values: HashMap<String, Option<options::Value>>,
rpcmethods: HashMap<String, RpcMethod<S>>,
subscriptions: HashMap<String, Subscription<S>>,
// Contains a Subscription if the user subscribed to "*"
wildcard_subscription : Option<Subscription<S>>,
notifications: Vec<NotificationTopic>,
custommessages: Vec<u16>,
featurebits: FeatureBits,
Expand All @@ -72,6 +74,7 @@ where
rpcmethods: HashMap<String, AsyncCallback<S>>,
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
wildcard_subscription : Option<AsyncNotificationCallback<S>>,
#[allow(dead_code)] // unsure why rust thinks this field isn't used
notifications: Vec<NotificationTopic>,
}
Expand All @@ -91,6 +94,7 @@ where
#[allow(dead_code)] // Unused until we fill in the Hook structs.
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
wildcard_subscription : Option<AsyncNotificationCallback<S>>
}

#[derive(Clone)]
Expand Down Expand Up @@ -123,6 +127,7 @@ where
output: Some(output),
hooks: HashMap::new(),
subscriptions: HashMap::new(),
wildcard_subscription: None,
options: HashMap::new(),
// Should not be configured by user.
// This values are set when parsing the init-call
Expand Down Expand Up @@ -173,12 +178,16 @@ where
C: Fn(Plugin<S>, Request) -> F + 'static,
F: Future<Output = Result<(), Error>> + Send + 'static,
{
self.subscriptions.insert(
topic.to_string(),
Subscription {
callback: Box::new(move |p, r| Box::pin(callback(p, r))),
},
);
let subscription = Subscription {
callback : Box::new(move |p, r| Box::pin(callback(p, r)))
};

if topic == "*" {
self.wildcard_subscription = Some(subscription);
}
else {
self.subscriptions.insert(topic.to_string(), subscription);
};
self
}

Expand Down Expand Up @@ -328,6 +337,7 @@ where

let subscriptions =
HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
let all_subscription = self.wildcard_subscription.map(|s| s.callback);

// Leave the `init` reply pending, so we can disable based on
// the options if required.
Expand All @@ -339,6 +349,7 @@ where
rpcmethods,
notifications: self.notifications,
subscriptions,
wildcard_subscription: all_subscription,
options: self.options,
option_values: self.option_values,
configuration,
Expand Down Expand Up @@ -378,9 +389,13 @@ where
})
.collect();

let subscriptions = self.subscriptions.keys()
.map(|s| s.clone())
.chain(self.wildcard_subscription.iter().map(|_| String::from("*"))).collect();

messages::GetManifestResponse {
options: self.options.values().cloned().collect(),
subscriptions: self.subscriptions.keys().map(|s| s.clone()).collect(),
subscriptions,
hooks: self.hooks.keys().map(|s| s.clone()).collect(),
rpcmethods,
notifications: self.notifications.clone(),
Expand Down Expand Up @@ -553,6 +568,7 @@ where
rpcmethods: self.rpcmethods,
hooks: self.hooks,
subscriptions: self.subscriptions,
wildcard_subscription : self.wildcard_subscription
};

output
Expand Down Expand Up @@ -724,25 +740,41 @@ where
Ok(())
}
messages::JsonRpc::CustomNotification(request) => {
// This code handles notifications
trace!("Dispatching custom notification {:?}", request);
let method = request
.get("method")
.context("Missing 'method' in request")?
.as_str()
.context("'method' is not a string")?;
let callback = self.subscriptions.get(method).with_context(|| {
anyhow!("No handler for notification '{}' registered", method)
})?;

let params = request
.get("params")
.context("Missing 'params' field in request")?
.clone();

let plugin = plugin.clone();
let call = callback(plugin.clone(), params);

tokio::spawn(async move { call.await.unwrap() });
Ok(())
.context("Missing 'params' field in request")?;

// Send to notification to the wildcard
// subscription "*" it it exists
match &self.wildcard_subscription {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move {call.await.unwrap()});}
None => {}
};

// Find the appropriate callback and process it
// We'll log a warning if no handler is defined
match self.subscriptions.get(method) {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move {call.await.unwrap()});
},
None => {
if self.wildcard_subscription.is_none() {
log::warn!("No handler for notification '{}' registered", method);
}
}
};
Ok(())
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions tests/test_cln_rs.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,15 @@ def test_grpc_decode(node_factory):
string=inv.bolt11
))
print(res)


def test_rust_plugin_subscribe_wildcard(node_factory):
ErikDeSmedt marked this conversation as resolved.
Show resolved Hide resolved
""" Creates a plugin that loads the subscribe_wildcard plugin
"""
bin_path = Path.cwd() / "target" / RUST_PROFILE / "examples" / "cln-subscribe-wildcard"
l1 = node_factory.get_node(options={"plugin": bin_path})
l2 = node_factory.get_node()

l2.connect(l1)

l1.daemon.wait_for_log("Received notification connect")
Loading