diff --git a/Cargo.lock b/Cargo.lock index 95df1016c..2a77bc9be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7719,7 +7719,6 @@ dependencies = [ "env_logger", "flexible-transcript", "hex", - "jsonrpsee", "log", "once_cell", "rand_core", @@ -7727,9 +7726,6 @@ dependencies = [ "serai-db", "serai-env", "serai-primitives", - "serde", - "serde_json", - "simple-request", "tokio", "zeroize", ] diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 06319f70a..194a9cee5 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -769,7 +769,7 @@ async fn handle_processor_messages( mut db: D, key: Zeroizing<::F>, serai: Arc, - mut processors: Pro, + processors: Pro, p2p: P, cosign_channel: mpsc::UnboundedSender, network: NetworkId, diff --git a/coordinator/src/processors.rs b/coordinator/src/processors.rs index 7dc34ec99..9157e2a6e 100644 --- a/coordinator/src/processors.rs +++ b/coordinator/src/processors.rs @@ -15,8 +15,8 @@ pub struct Message { #[async_trait::async_trait] pub trait Processors: 'static + Send + Sync + Clone { async fn send(&self, network: NetworkId, msg: impl Send + Into); - async fn recv(&mut self, network: NetworkId) -> Message; - async fn ack(&mut self, msg: Message); + async fn recv(&self, network: NetworkId) -> Message; + async fn ack(&self, msg: Message); } #[async_trait::async_trait] @@ -28,7 +28,7 @@ impl Processors for Arc { let msg = borsh::to_vec(&msg).unwrap(); self.queue(metadata, msg).await; } - async fn recv(&mut self, network: NetworkId) -> Message { + async fn recv(&self, network: NetworkId) -> Message { let msg = self.next(Service::Processor(network)).await; assert_eq!(msg.from, Service::Processor(network)); @@ -40,7 +40,7 @@ impl Processors for Arc { return Message { id, network, msg }; } - async fn ack(&mut self, msg: Message) { + async fn ack(&self, msg: Message) { MessageQueue::ack(self, Service::Processor(msg.network), msg.id).await } } diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 29aec9043..3763cf0da 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -35,10 +35,10 @@ impl Processors for MemProcessors { let processor = processors.entry(network).or_insert_with(VecDeque::new); processor.push_back(msg.into()); } - async fn recv(&mut self, _: NetworkId) -> Message { + async fn recv(&self, _: NetworkId) -> Message { todo!() } - async fn ack(&mut self, _: Message) { + async fn ack(&self, _: Message) { todo!() } } diff --git a/message-queue/Cargo.toml b/message-queue/Cargo.toml index 5d2285260..77fb5264e 100644 --- a/message-queue/Cargo.toml +++ b/message-queue/Cargo.toml @@ -16,12 +16,10 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] # Macros once_cell = { version = "1", default-features = false } -serde = { version = "1", default-features = false, features = ["std", "derive"] } # Encoders hex = { version = "0.4", default-features = false, features = ["std"] } borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } -serde_json = { version = "1", default-features = false, features = ["std"] } # Libs zeroize = { version = "1", default-features = false, features = ["std"] } @@ -37,16 +35,13 @@ log = { version = "0.4", default-features = false, features = ["std"] } env_logger = { version = "0.10", default-features = false, features = ["humantime"] } # Uses a single threaded runtime since this shouldn't ever be CPU-bound -tokio = { version = "1", default-features = false, features = ["rt", "time", "macros"] } +tokio = { version = "1", default-features = false, features = ["rt", "time", "io-util", "net", "macros"] } serai-db = { path = "../common/db", features = ["rocksdb"], optional = true } serai-env = { path = "../common/env" } -serai-primitives = { path = "../substrate/primitives", features = ["borsh", "serde"] } - -jsonrpsee = { version = "0.16", default-features = false, features = ["server"], optional = true } -simple-request = { path = "../common/request", default-features = false } +serai-primitives = { path = "../substrate/primitives", features = ["borsh"] } [features] -binaries = ["serai-db", "jsonrpsee"] +binaries = ["serai-db"] diff --git a/message-queue/src/client.rs b/message-queue/src/client.rs index ed1031022..e46987b6a 100644 --- a/message-queue/src/client.rs +++ b/message-queue/src/client.rs @@ -9,19 +9,20 @@ use ciphersuite::{ }; use schnorr_signatures::SchnorrSignature; -use serde::{Serialize, Deserialize}; - -use simple_request::{hyper::Request, Client}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, +}; use serai_env as env; -use crate::{Service, Metadata, QueuedMessage, message_challenge, ack_challenge}; +#[rustfmt::skip] +use crate::{Service, Metadata, QueuedMessage, MessageQueueRequest, message_challenge, ack_challenge}; pub struct MessageQueue { pub service: Service, priv_key: Zeroizing<::F>, pub_key: ::G, - client: Client, url: String, } @@ -37,17 +38,8 @@ impl MessageQueue { if !url.contains(':') { url += ":2287"; } - if !url.starts_with("http://") { - url = "http://".to_string() + &url; - } - MessageQueue { - service, - pub_key: Ristretto::generator() * priv_key.deref(), - priv_key, - client: Client::with_connection_pool(), - url, - } + MessageQueue { service, pub_key: Ristretto::generator() * priv_key.deref(), priv_key, url } } pub fn from_env(service: Service) -> MessageQueue { @@ -72,60 +64,14 @@ impl MessageQueue { Self::new(service, url, priv_key) } - async fn json_call(&self, method: &'static str, params: serde_json::Value) -> serde_json::Value { - #[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] - struct JsonRpcRequest { - jsonrpc: &'static str, - method: &'static str, - params: serde_json::Value, - id: u64, - } - - let mut res = loop { - // Make the request - match self - .client - .request( - Request::post(&self.url) - .header("Content-Type", "application/json") - .body( - serde_json::to_vec(&JsonRpcRequest { - jsonrpc: "2.0", - method, - params: params.clone(), - id: 0, - }) - .unwrap() - .into(), - ) - .unwrap(), - ) - .await - { - Ok(req) => { - // Get the response - match req.body().await { - Ok(res) => break res, - Err(e) => { - dbg!(e); - } - } - } - Err(e) => { - dbg!(e); - } - } - - // Sleep for a second before trying again - tokio::time::sleep(core::time::Duration::from_secs(1)).await; + #[must_use] + async fn send(socket: &mut TcpStream, msg: MessageQueueRequest) -> bool { + let msg = borsh::to_vec(&msg).unwrap(); + let Ok(_) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { + return false; }; - - let json: serde_json::Value = - serde_json::from_reader(&mut res).expect("message-queue returned invalid JSON"); - if json.get("result").is_none() { - panic!("call failed: {json}"); - } - json + let Ok(_) = socket.write_all(&msg).await else { return false }; + true } pub async fn queue(&self, metadata: Metadata, msg: Vec) { @@ -146,30 +92,76 @@ impl MessageQueue { ) .serialize(); - let json = self.json_call("queue", serde_json::json!([metadata, msg, sig])).await; - if json.get("result") != Some(&serde_json::Value::Bool(true)) { - panic!("failed to queue message: {json}"); + let msg = MessageQueueRequest::Queue { meta: metadata, msg, sig }; + let mut first = true; + loop { + // Sleep, so we don't hammer re-attempts + if !first { + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + } + first = false; + + let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; + if !Self::send(&mut socket, msg.clone()).await { + continue; + } + if socket.read_u8().await.ok() != Some(1) { + continue; + } + break; } } pub async fn next(&self, from: Service) -> QueuedMessage { - loop { - let json = self.json_call("next", serde_json::json!([from, self.service])).await; - - // Convert from a Value to a type via reserialization - let msg: Option = serde_json::from_str( - &serde_json::to_string( - &json.get("result").expect("successful JSON RPC call didn't have result"), - ) - .unwrap(), - ) - .expect("next didn't return an Option"); - - // If there wasn't a message, check again in 1s - let Some(msg) = msg else { - tokio::time::sleep(core::time::Duration::from_secs(1)).await; + let msg = MessageQueueRequest::Next { from, to: self.service }; + let mut first = true; + 'outer: loop { + if !first { + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + continue; + } + first = false; + + let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; + + loop { + if !Self::send(&mut socket, msg.clone()).await { + continue 'outer; + } + let Ok(status) = socket.read_u8().await else { + continue 'outer; + }; + // If there wasn't a message, check again in 1s + if status == 0 { + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + continue; + } + assert_eq!(status, 1); + break; + } + + // Timeout after 5 seconds in case there's an issue with the length handling + let Ok(msg) = tokio::time::timeout(core::time::Duration::from_secs(5), async { + // Read the message length + let Ok(len) = socket.read_u32_le().await else { + return vec![]; + }; + let mut buf = vec![0; usize::try_from(len).unwrap()]; + // Read the message + let Ok(_) = socket.read_exact(&mut buf).await else { + return vec![]; + }; + buf + }) + .await + else { continue; }; + if msg.is_empty() { + continue; + } + + let msg: QueuedMessage = borsh::from_slice(msg.as_slice()).unwrap(); // Verify the message // Verify the sender is sane @@ -202,9 +194,22 @@ impl MessageQueue { ) .serialize(); - let json = self.json_call("ack", serde_json::json!([from, self.service, id, sig])).await; - if json.get("result") != Some(&serde_json::Value::Bool(true)) { - panic!("failed to ack message {id}: {json}"); + let msg = MessageQueueRequest::Ack { from, to: self.service, id, sig }; + let mut first = true; + loop { + if !first { + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + } + first = false; + + let Ok(mut socket) = TcpStream::connect(&self.url).await else { continue }; + if !Self::send(&mut socket, msg.clone()).await { + continue; + } + if socket.read_u8().await.ok() != Some(1) { + continue; + } + break; } } } diff --git a/message-queue/src/main.rs b/message-queue/src/main.rs index 36c2dcd9a..cbb8772eb 100644 --- a/message-queue/src/main.rs +++ b/message-queue/src/main.rs @@ -15,9 +15,12 @@ mod binaries { pub(crate) use serai_primitives::NetworkId; - use serai_db::{Get, DbTxn, Db as DbTrait}; + pub(crate) use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, + }; - pub(crate) use jsonrpsee::{RpcModule, server::ServerBuilder}; + use serai_db::{Get, DbTxn, Db as DbTrait}; pub(crate) use crate::messages::*; @@ -51,7 +54,7 @@ mod binaries { successful ordering by the time this call returns. */ pub(crate) fn queue_message( - db: &RwLock, + db: &mut Db, meta: Metadata, msg: Vec, sig: SchnorrSignature, @@ -78,7 +81,6 @@ mod binaries { fn intent_key(from: Service, to: Service, intent: &[u8]) -> Vec { key(b"intent_seen", borsh::to_vec(&(from, to, intent)).unwrap()) } - let mut db = db.write().unwrap(); let mut txn = db.txn(); let intent_key = intent_key(meta.from, meta.to, &meta.intent); if Get::get(&txn, &intent_key).is_some() { @@ -148,7 +150,7 @@ mod binaries { } #[cfg(feature = "binaries")] -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() { use binaries::*; @@ -225,48 +227,55 @@ async fn main() { register_service(Service::Coordinator, read_key("COORDINATOR_KEY").unwrap()); // Start server - let builder = ServerBuilder::new(); - // TODO: Add middleware to check some key is present in the header, making this an authed - // connection - // TODO: Set max request/response size // 5132 ^ ((b'M' << 8) | b'Q') - let listen_on: &[std::net::SocketAddr] = &["0.0.0.0:2287".parse().unwrap()]; - let server = builder.build(listen_on).await.unwrap(); - - let mut module = RpcModule::new(RwLock::new(db)); - module - .register_method("queue", |args, db| { - let args = args.parse::<(Metadata, Vec, Vec)>().unwrap(); - queue_message( - db, - args.0, - args.1, - SchnorrSignature::::read(&mut args.2.as_slice()).unwrap(), - ); - Ok(true) - }) - .unwrap(); - module - .register_method("next", |args, _| { - let (from, to) = args.parse::<(Service, Service)>().unwrap(); - Ok(get_next_message(from, to)) - }) - .unwrap(); - module - .register_method("ack", |args, _| { - let args = args.parse::<(Service, Service, u64, Vec)>().unwrap(); - ack_message( - args.0, - args.1, - args.2, - SchnorrSignature::::read(&mut args.3.as_slice()).unwrap(), - ); - Ok(true) - }) - .unwrap(); - - // Run until stopped, which it never will - server.start(module).unwrap().stopped().await; + let server = TcpListener::bind("0.0.0.0:2287").await.unwrap(); + + loop { + let (mut socket, _) = server.accept().await.unwrap(); + // TODO: Add a magic value with a key at the start of the connection to make this authed + let mut db = db.clone(); + tokio::spawn(async move { + loop { + let Ok(msg_len) = socket.read_u32_le().await else { break }; + let mut buf = vec![0; usize::try_from(msg_len).unwrap()]; + let Ok(_) = socket.read_exact(&mut buf).await else { break }; + let msg = borsh::from_slice(&buf).unwrap(); + + match msg { + MessageQueueRequest::Queue { meta, msg, sig } => { + queue_message( + &mut db, + meta, + msg, + SchnorrSignature::::read(&mut sig.as_slice()).unwrap(), + ); + let Ok(_) = socket.write_all(&[1]).await else { break }; + } + MessageQueueRequest::Next { from, to } => match get_next_message(from, to) { + Some(msg) => { + let Ok(_) = socket.write_all(&[1]).await else { break }; + let msg = borsh::to_vec(&msg).unwrap(); + let len = u32::try_from(msg.len()).unwrap(); + let Ok(_) = socket.write_all(&len.to_le_bytes()).await else { break }; + let Ok(_) = socket.write_all(&msg).await else { break }; + } + None => { + let Ok(_) = socket.write_all(&[0]).await else { break }; + } + }, + MessageQueueRequest::Ack { from, to, id, sig } => { + ack_message( + from, + to, + id, + SchnorrSignature::::read(&mut sig.as_slice()).unwrap(), + ); + let Ok(_) = socket.write_all(&[1]).await else { break }; + } + } + } + }); + } } #[cfg(not(feature = "binaries"))] diff --git a/message-queue/src/messages.rs b/message-queue/src/messages.rs index edaea72b5..65c18dd20 100644 --- a/message-queue/src/messages.rs +++ b/message-queue/src/messages.rs @@ -2,19 +2,16 @@ use transcript::{Transcript, RecommendedTranscript}; use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; use borsh::{BorshSerialize, BorshDeserialize}; -use serde::{Serialize, Deserialize}; use serai_primitives::NetworkId; -#[derive( - Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize, Serialize, Deserialize, -)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] pub enum Service { Processor(NetworkId), Coordinator, } -#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct QueuedMessage { pub from: Service, pub id: u64, @@ -22,13 +19,20 @@ pub struct QueuedMessage { pub sig: Vec, } -#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] pub struct Metadata { pub from: Service, pub to: Service, pub intent: Vec, } +#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] +pub enum MessageQueueRequest { + Queue { meta: Metadata, msg: Vec, sig: Vec }, + Next { from: Service, to: Service }, + Ack { from: Service, to: Service, id: u64, sig: Vec }, +} + pub fn message_challenge( from: Service, from_key: ::G, diff --git a/message-queue/src/queue.rs b/message-queue/src/queue.rs index 257c83c27..46148d414 100644 --- a/message-queue/src/queue.rs +++ b/message-queue/src/queue.rs @@ -45,7 +45,7 @@ impl Queue { let msg_key = self.message_key(id); let msg_count_key = self.message_count_key(); - txn.put(msg_key, serde_json::to_vec(&msg).unwrap()); + txn.put(msg_key, borsh::to_vec(&msg).unwrap()); txn.put(msg_count_key, (id + 1).to_le_bytes()); id @@ -53,7 +53,7 @@ impl Queue { pub(crate) fn get_message(&self, id: u64) -> Option { let msg: Option = - self.0.get(self.message_key(id)).map(|bytes| serde_json::from_slice(&bytes).unwrap()); + self.0.get(self.message_key(id)).map(|bytes| borsh::from_slice(&bytes).unwrap()); if let Some(msg) = msg.as_ref() { assert_eq!(msg.id, id, "message stored at {id} has ID {}", msg.id); } diff --git a/tests/message-queue/src/lib.rs b/tests/message-queue/src/lib.rs index 1d011fc39..e2bfd3a75 100644 --- a/tests/message-queue/src/lib.rs +++ b/tests/message-queue/src/lib.rs @@ -70,92 +70,96 @@ fn basic_functionality() { let (coord_key, priv_keys, composition) = instance(); test.provide_container(composition); test.run(|ops| async move { - // Sleep for a second for the message-queue to boot - // It isn't an error to start immediately, it just silences an error - tokio::time::sleep(core::time::Duration::from_secs(1)).await; - - let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap(); - let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string(); - - // Queue some messages - let coordinator = - MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key)); - coordinator - .queue( - Metadata { - from: Service::Coordinator, - to: Service::Processor(NetworkId::Bitcoin), - intent: b"intent".to_vec(), - }, - b"Hello, World!".to_vec(), - ) - .await; - - // Queue this twice, which message-queue should de-duplicate - for _ in 0 .. 2 { + tokio::time::timeout(core::time::Duration::from_secs(60), async move { + // Sleep for a second for the message-queue to boot + // It isn't an error to start immediately, it just silences an error + tokio::time::sleep(core::time::Duration::from_secs(1)).await; + + let rpc = ops.handle("serai-dev-message-queue").host_port(2287).unwrap(); + let rpc = rpc.0.to_string() + ":" + &rpc.1.to_string(); + + // Queue some messages + let coordinator = + MessageQueue::new(Service::Coordinator, rpc.clone(), Zeroizing::new(coord_key)); coordinator .queue( Metadata { from: Service::Coordinator, to: Service::Processor(NetworkId::Bitcoin), - intent: b"intent 2".to_vec(), + intent: b"intent".to_vec(), }, - b"Hello, World, again!".to_vec(), + b"Hello, World!".to_vec(), ) .await; - } - - // Successfully get it - let bitcoin = MessageQueue::new( - Service::Processor(NetworkId::Bitcoin), - rpc.clone(), - Zeroizing::new(priv_keys[&NetworkId::Bitcoin]), - ); - let msg = bitcoin.next(Service::Coordinator).await; - assert_eq!(msg.from, Service::Coordinator); - assert_eq!(msg.id, 0); - assert_eq!(&msg.msg, b"Hello, World!"); - - // If we don't ack it, it should continue to be returned - assert_eq!(msg, bitcoin.next(Service::Coordinator).await); - - // Acknowledging it should yield the next message - bitcoin.ack(Service::Coordinator, 0).await; - - let next_msg = bitcoin.next(Service::Coordinator).await; - assert!(msg != next_msg); - assert_eq!(next_msg.from, Service::Coordinator); - assert_eq!(next_msg.id, 1); - assert_eq!(&next_msg.msg, b"Hello, World, again!"); - bitcoin.ack(Service::Coordinator, 1).await; - - // No further messages should be available - tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator)) - .await - .unwrap_err(); - - // Queueing to a distinct processor should work, with a unique ID - coordinator - .queue( - Metadata { - from: Service::Coordinator, - to: Service::Processor(NetworkId::Monero), - // Intents should be per-from-to, making this valid - intent: b"intent".to_vec(), - }, - b"Hello, World!".to_vec(), - ) - .await; - - let monero = MessageQueue::new( - Service::Processor(NetworkId::Monero), - rpc, - Zeroizing::new(priv_keys[&NetworkId::Monero]), - ); - assert_eq!(monero.next(Service::Coordinator).await.id, 0); - monero.ack(Service::Coordinator, 0).await; - tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator)) - .await - .unwrap_err(); + + // Queue this twice, which message-queue should de-duplicate + for _ in 0 .. 2 { + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Bitcoin), + intent: b"intent 2".to_vec(), + }, + b"Hello, World, again!".to_vec(), + ) + .await; + } + + // Successfully get it + let bitcoin = MessageQueue::new( + Service::Processor(NetworkId::Bitcoin), + rpc.clone(), + Zeroizing::new(priv_keys[&NetworkId::Bitcoin]), + ); + let msg = bitcoin.next(Service::Coordinator).await; + assert_eq!(msg.from, Service::Coordinator); + assert_eq!(msg.id, 0); + assert_eq!(&msg.msg, b"Hello, World!"); + + // If we don't ack it, it should continue to be returned + assert_eq!(msg, bitcoin.next(Service::Coordinator).await); + + // Acknowledging it should yield the next message + bitcoin.ack(Service::Coordinator, 0).await; + + let next_msg = bitcoin.next(Service::Coordinator).await; + assert!(msg != next_msg); + assert_eq!(next_msg.from, Service::Coordinator); + assert_eq!(next_msg.id, 1); + assert_eq!(&next_msg.msg, b"Hello, World, again!"); + bitcoin.ack(Service::Coordinator, 1).await; + + // No further messages should be available + tokio::time::timeout(core::time::Duration::from_secs(10), bitcoin.next(Service::Coordinator)) + .await + .unwrap_err(); + + // Queueing to a distinct processor should work, with a unique ID + coordinator + .queue( + Metadata { + from: Service::Coordinator, + to: Service::Processor(NetworkId::Monero), + // Intents should be per-from-to, making this valid + intent: b"intent".to_vec(), + }, + b"Hello, World!".to_vec(), + ) + .await; + + let monero = MessageQueue::new( + Service::Processor(NetworkId::Monero), + rpc, + Zeroizing::new(priv_keys[&NetworkId::Monero]), + ); + assert_eq!(monero.next(Service::Coordinator).await.id, 0); + monero.ack(Service::Coordinator, 0).await; + tokio::time::timeout(core::time::Duration::from_secs(10), monero.next(Service::Coordinator)) + .await + .unwrap_err(); + }) + .await + .unwrap(); }); }