Skip to content

Commit

Permalink
cln-plugin: Add dynamic configs and a callback for changes
Browse files Browse the repository at this point in the history
Changelog-Added: cln-plugin: Add dynamic configs and a callback for changes
  • Loading branch information
daywalker90 authored and cdecker committed May 15, 2024
1 parent dab9605 commit b69609b
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 47 deletions.
33 changes: 30 additions & 3 deletions plugins/examples/cln-plugin-startup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
//! plugins using the Rust API against Core Lightning.
#[macro_use]
extern crate serde_json;
use cln_plugin::options::{DefaultIntegerConfigOption, IntegerConfigOption};
use cln_plugin::options::{
self, BooleanConfigOption, DefaultIntegerConfigOption, IntegerConfigOption,
};
use cln_plugin::{messages, Builder, Error, Plugin};
use tokio;

Expand All @@ -21,9 +23,17 @@ const TEST_OPTION_NO_DEFAULT: IntegerConfigOption =
async fn main() -> Result<(), anyhow::Error> {
let state = ();

let test_dynamic_option: BooleanConfigOption = BooleanConfigOption::new_bool_no_default(
"test-dynamic-option",
"A option that can be changed dynamically",
)
.dynamic();

if let Some(plugin) = Builder::new(tokio::io::stdin(), tokio::io::stdout())
.option(TEST_OPTION)
.option(TEST_OPTION_NO_DEFAULT)
.option(test_dynamic_option)
.setconfig_callback(setconfig_callback)
.rpcmethod("testmethod", "This is a test", testmethod)
.rpcmethod(
"testoptions",
Expand All @@ -48,10 +58,27 @@ async fn main() -> Result<(), anyhow::Error> {
}
}

async fn setconfig_callback(
plugin: Plugin<()>,
args: serde_json::Value,
) -> Result<serde_json::Value, Error> {
let name = args.get("config").unwrap().as_str().unwrap();
let value = args.get("val").unwrap();

let opt_value = options::Value::String(value.to_string());

plugin.set_option_str(name, opt_value)?;
log::info!(
"cln-plugin-startup: Got dynamic option change: {} {}",
name,
plugin.option_str(name).unwrap().unwrap().as_str().unwrap()
);
Ok(json!({}))
}

async fn testoptions(p: Plugin<()>, _v: serde_json::Value) -> Result<serde_json::Value, Error> {
let test_option = p.option(&TEST_OPTION)?;
let test_option_no_default = p
.option(&TEST_OPTION_NO_DEFAULT)?;
let test_option_no_default = p.option(&TEST_OPTION_NO_DEFAULT)?;

Ok(json!({
"test-option": test_option,
Expand Down
135 changes: 93 additions & 42 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ where
options: HashMap<String, UntypedConfigOption>,
option_values: HashMap<String, Option<options::Value>>,
rpcmethods: HashMap<String, RpcMethod<S>>,
setconfig_callback: Option<AsyncCallback<S>>,
subscriptions: HashMap<String, Subscription<S>>,
// Contains a Subscription if the user subscribed to "*"
wildcard_subscription : Option<Subscription<S>>,
wildcard_subscription: Option<Subscription<S>>,
notifications: Vec<NotificationTopic>,
custommessages: Vec<u16>,
featurebits: FeatureBits,
Expand All @@ -72,9 +73,10 @@ where
option_values: HashMap<String, Option<options::Value>>,
configuration: Configuration,
rpcmethods: HashMap<String, AsyncCallback<S>>,
setconfig_callback: Option<AsyncCallback<S>>,
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
wildcard_subscription : Option<AsyncNotificationCallback<S>>,
wildcard_subscription: Option<AsyncNotificationCallback<S>>,
#[allow(dead_code)] // unsure why rust thinks this field isn't used
notifications: Vec<NotificationTopic>,
}
Expand All @@ -90,11 +92,12 @@ where
{
plugin: Plugin<S>,
rpcmethods: HashMap<String, AsyncCallback<S>>,
setconfig_callback: Option<AsyncCallback<S>>,

#[allow(dead_code)] // Unused until we fill in the Hook structs.
hooks: HashMap<String, AsyncCallback<S>>,
subscriptions: HashMap<String, AsyncNotificationCallback<S>>,
wildcard_subscription : Option<AsyncNotificationCallback<S>>
wildcard_subscription: Option<AsyncNotificationCallback<S>>,
}

#[derive(Clone)]
Expand All @@ -106,7 +109,7 @@ where
state: S,
/// "options" field of "init" message sent by cln
options: HashMap<String, UntypedConfigOption>,
option_values: HashMap<String, Option<options::Value>>,
option_values: Arc<std::sync::Mutex<HashMap<String, Option<options::Value>>>>,
/// "configuration" field of "init" message sent by cln
configuration: Configuration,
/// A signal that allows us to wait on the plugin's shutdown.
Expand All @@ -133,6 +136,7 @@ where
// This values are set when parsing the init-call
option_values: HashMap::new(),
rpcmethods: HashMap::new(),
setconfig_callback: None,
notifications: vec![],
featurebits: FeatureBits::default(),
dynamic: false,
Expand Down Expand Up @@ -179,13 +183,12 @@ where
F: Future<Output = Result<(), Error>> + Send + 'static,
{
let subscription = Subscription {
callback : Box::new(move |p, r| Box::pin(callback(p, r)))
callback: Box::new(move |p, r| Box::pin(callback(p, r))),
};

if topic == "*" {
self.wildcard_subscription = Some(subscription);
}
else {
} else {
self.subscriptions.insert(topic.to_string(), subscription);
};
self
Expand Down Expand Up @@ -233,6 +236,17 @@ where
self
}

/// Register a callback for setconfig to accept changes for dynamic options
pub fn setconfig_callback<C, F>(mut self, setconfig_callback: C) -> Builder<S, I, O>
where
C: Send + Sync + 'static,
C: Fn(Plugin<S>, Request) -> F + 'static,
F: Future<Output = Response> + Send + 'static,
{
self.setconfig_callback = Some(Box::new(move |p, r| Box::pin(setconfig_callback(p, r))));
self
}

/// Send true value for "dynamic" field in "getmanifest" response
pub fn dynamic(mut self) -> Builder<S, I, O> {
self.dynamic = true;
Expand Down Expand Up @@ -337,7 +351,7 @@ where

let subscriptions =
HashMap::from_iter(self.subscriptions.drain().map(|(k, v)| (k, v.callback)));
let all_subscription = self.wildcard_subscription.map(|s| s.callback);
let all_subscription = self.wildcard_subscription.map(|s| s.callback);

// Leave the `init` reply pending, so we can disable based on
// the options if required.
Expand All @@ -347,9 +361,10 @@ where
input,
output,
rpcmethods,
setconfig_callback: self.setconfig_callback,
notifications: self.notifications,
subscriptions,
wildcard_subscription: all_subscription,
wildcard_subscription: all_subscription,
options: self.options,
option_values: self.option_values,
configuration,
Expand Down Expand Up @@ -389,9 +404,12 @@ where
})
.collect();

let subscriptions = self.subscriptions.keys()
let subscriptions = self
.subscriptions
.keys()
.map(|s| s.clone())
.chain(self.wildcard_subscription.iter().map(|_| String::from("*"))).collect();
.chain(self.wildcard_subscription.iter().map(|_| String::from("*")))
.collect();

messages::GetManifestResponse {
options: self.options.values().cloned().collect(),
Expand Down Expand Up @@ -524,9 +542,11 @@ where
{
pub fn option_str(&self, name: &str) -> Result<Option<options::Value>> {
self.option_values
.lock()
.unwrap()
.get(name)
.ok_or(anyhow!("No option named {}", name))
.map(|c| c.clone())
.cloned()
}

pub fn option<'a, OV: OptionType<'a>>(
Expand All @@ -536,6 +556,25 @@ where
let value = self.option_str(config_option.name())?;
Ok(OV::from_value(&value))
}

pub fn set_option_str(&self, name: &str, value: options::Value) -> Result<()> {
*self
.option_values
.lock()
.unwrap()
.get_mut(name)
.ok_or(anyhow!("No option named {}", name))? = Some(value);
Ok(())
}

pub fn set_option<'a, OV: OptionType<'a>>(
&self,
config_option: &options::ConfigOption<'a, OV>,
value: options::Value,
) -> Result<()> {
self.set_option_str(config_option.name(), value)?;
Ok(())
}
}

impl<S, I, O> ConfiguredPlugin<S, I, O>
Expand All @@ -557,7 +596,7 @@ where
let plugin = Plugin {
state,
options: self.options,
option_values: self.option_values,
option_values: Arc::new(std::sync::Mutex::new(self.option_values)),
configuration: self.configuration,
wait_handle,
sender,
Expand All @@ -566,9 +605,10 @@ where
let driver = PluginDriver {
plugin: plugin.clone(),
rpcmethods: self.rpcmethods,
setconfig_callback: self.setconfig_callback,
hooks: self.hooks,
subscriptions: self.subscriptions,
wildcard_subscription : self.wildcard_subscription
wildcard_subscription: self.wildcard_subscription,
};

output
Expand Down Expand Up @@ -704,9 +744,16 @@ where
.context("Missing 'method' in request")?
.as_str()
.context("'method' is not a string")?;
let callback = self.rpcmethods.get(method).with_context(|| {
anyhow!("No handler for method '{}' registered", method)
})?;
let callback = match method {
name if name.eq("setconfig") => {
self.setconfig_callback.as_ref().ok_or_else(|| {
anyhow!("No handler for method '{}' registered", method)
})?
}
_ => self.rpcmethods.get(method).with_context(|| {
anyhow!("No handler for method '{}' registered", method)
})?,
};
let params = request
.get("params")
.context("Missing 'params' field in request")?
Expand Down Expand Up @@ -740,7 +787,7 @@ where
Ok(())
}
messages::JsonRpc::CustomNotification(request) => {
// This code handles notifications
// This code handles notifications
trace!("Dispatching custom notification {:?}", request);
let method = request
.get("method")
Expand All @@ -751,30 +798,34 @@ where
let params = request
.get("params")
.context("Missing 'params' field in request")?;

// Send to notification to the wildcard
// subscription "*" it it exists
match &self.wildcard_subscription {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move {call.await.unwrap()});}
None => {}
};

// Find the appropriate callback and process it
// We'll log a warning if no handler is defined
match self.subscriptions.get(method) {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move {call.await.unwrap()});
},
None => {
if self.wildcard_subscription.is_none() {
log::warn!("No handler for notification '{}' registered", method);
}
}
};
Ok(())

// Send to notification to the wildcard
// subscription "*" it it exists
match &self.wildcard_subscription {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move { call.await.unwrap() });
}
None => {}
};

// Find the appropriate callback and process it
// We'll log a warning if no handler is defined
match self.subscriptions.get(method) {
Some(cb) => {
let call = cb(plugin.clone(), params.clone());
tokio::spawn(async move { call.await.unwrap() });
}
None => {
if self.wildcard_subscription.is_none() {
log::warn!(
"No handler for notification '{}' registered",
method
);
}
}
};
Ok(())
}
}
}
Expand Down
Loading

0 comments on commit b69609b

Please sign in to comment.