From 1d2beb3ee4e5ce260cdcd9dc77151d70eadc26a3 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 22 May 2024 18:50:11 -0400 Subject: [PATCH] Ethereum relayer server Causes send test to pass for the processor. --- Cargo.lock | 11 ++ Cargo.toml | 3 + coins/ethereum/relayer/Cargo.toml | 30 +++++ coins/ethereum/relayer/LICENSE | 15 +++ coins/ethereum/relayer/README.md | 4 + coins/ethereum/relayer/src/main.rs | 100 +++++++++++++++ deny.toml | 1 + .../dev/coins/ethereum-relayer/.folder | 11 ++ orchestration/src/ethereum_relayer.rs | 39 ++++++ orchestration/src/main.rs | 16 +++ orchestration/src/processor.rs | 24 ++-- .../testnet/coins/ethereum-relayer/.folder | 11 ++ processor/src/main.rs | 10 +- processor/src/networks/ethereum.rs | 43 ++++++- processor/src/tests/literal/mod.rs | 2 +- tests/docker/src/lib.rs | 3 +- tests/full-stack/src/tests/mod.rs | 8 +- tests/processor/src/lib.rs | 121 ++++++++++++++---- tests/processor/src/tests/batch.rs | 2 +- tests/processor/src/tests/send.rs | 6 +- 20 files changed, 416 insertions(+), 44 deletions(-) create mode 100644 coins/ethereum/relayer/Cargo.toml create mode 100644 coins/ethereum/relayer/LICENSE create mode 100644 coins/ethereum/relayer/README.md create mode 100644 coins/ethereum/relayer/src/main.rs create mode 100644 orchestration/dev/coins/ethereum-relayer/.folder create mode 100644 orchestration/src/ethereum_relayer.rs create mode 100644 orchestration/testnet/coins/ethereum-relayer/.folder diff --git a/Cargo.lock b/Cargo.lock index 4b7997cd4..ba0ab7659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7884,6 +7884,17 @@ dependencies = [ name = "serai-env" version = "0.1.0" +[[package]] +name = "serai-ethereum-relayer" +version = "0.1.0" +dependencies = [ + "env_logger", + "log", + "serai-db", + "serai-env", + "tokio", +] + [[package]] name = "serai-full-stack-tests" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index d608cff86..ce0062f01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,8 +38,11 @@ members = [ "crypto/schnorrkel", "coins/bitcoin", + "coins/ethereum/alloy-simple-request-transport", "coins/ethereum", + "coins/ethereum/relayer", + "coins/monero/generators", "coins/monero", diff --git a/coins/ethereum/relayer/Cargo.toml b/coins/ethereum/relayer/Cargo.toml new file mode 100644 index 000000000..22c200760 --- /dev/null +++ b/coins/ethereum/relayer/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "serai-ethereum-relayer" +version = "0.1.0" +description = "A relayer for Serai's Ethereum transactions" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/coins/ethereum/relayer" +authors = ["Luke Parker "] +keywords = [] +edition = "2021" +publish = false + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true + +[dependencies] +log = { version = "0.4", default-features = false, features = ["std"] } +env_logger = { version = "0.10", default-features = false, features = ["humantime"] } + +tokio = { version = "1", default-features = false, features = ["rt", "time", "io-util", "net", "macros"] } + +serai-env = { path = "../../../common/env" } +serai-db = { path = "../../../common/db" } + +[features] +parity-db = ["serai-db/parity-db"] +rocksdb = ["serai-db/rocksdb"] diff --git a/coins/ethereum/relayer/LICENSE b/coins/ethereum/relayer/LICENSE new file mode 100644 index 000000000..26d57cbb3 --- /dev/null +++ b/coins/ethereum/relayer/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2023-2024 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/coins/ethereum/relayer/README.md b/coins/ethereum/relayer/README.md new file mode 100644 index 000000000..beed4b724 --- /dev/null +++ b/coins/ethereum/relayer/README.md @@ -0,0 +1,4 @@ +# Ethereum Transaction Relayer + +This server collects Ethereum router commands to be published, offering an RPC +to fetch them. diff --git a/coins/ethereum/relayer/src/main.rs b/coins/ethereum/relayer/src/main.rs new file mode 100644 index 000000000..545930040 --- /dev/null +++ b/coins/ethereum/relayer/src/main.rs @@ -0,0 +1,100 @@ +pub(crate) use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpListener, +}; + +use serai_db::{Get, DbTxn, Db as DbTrait}; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + // Override the panic handler with one which will panic if any tokio task panics + { + let existing = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |panic| { + existing(panic); + const MSG: &str = "exiting the process due to a task panicking"; + println!("{MSG}"); + log::error!("{MSG}"); + std::process::exit(1); + })); + } + + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string())); + } + env_logger::init(); + + log::info!("Starting Ethereum relayer server..."); + + // Open the DB + #[allow(unused_variables, unreachable_code)] + let db = { + #[cfg(all(feature = "parity-db", feature = "rocksdb"))] + panic!("built with parity-db and rocksdb"); + #[cfg(all(feature = "parity-db", not(feature = "rocksdb")))] + let db = + serai_db::new_parity_db(&serai_env::var("DB_PATH").expect("path to DB wasn't specified")); + #[cfg(feature = "rocksdb")] + let db = + serai_db::new_rocksdb(&serai_env::var("DB_PATH").expect("path to DB wasn't specified")); + db + }; + + // Start command recipience server + // This should not be publicly exposed + // TODO: Add auth + tokio::spawn({ + let db = db.clone(); + async move { + // 5132 ^ ((b'E' << 8) | b'R') + let server = TcpListener::bind("0.0.0.0:20830").await.unwrap(); + loop { + let (mut socket, _) = server.accept().await.unwrap(); + let db = db.clone(); + tokio::spawn(async move { + let mut db = db.clone(); + loop { + let Ok(msg_len) = socket.read_u32_le().await else { break }; + let mut buf = vec![0; usize::try_from(msg_len).unwrap()]; + let Ok(_) = socket.read_exact(&mut buf).await else { break }; + + if buf.len() < 5 { + break; + } + let nonce = u32::from_le_bytes(buf[.. 4].try_into().unwrap()); + let mut txn = db.txn(); + txn.put(nonce.to_le_bytes(), &buf[4 ..]); + txn.commit(); + + let Ok(()) = socket.write_all(&[1]).await else { break }; + + log::info!("received signed command #{nonce}"); + } + }); + } + } + }); + + // Start command fetch server + // 5132 ^ ((b'E' << 8) | b'R') + 1 + let server = TcpListener::bind("0.0.0.0:20831").await.unwrap(); + loop { + let (mut socket, _) = server.accept().await.unwrap(); + let db = db.clone(); + tokio::spawn(async move { + let db = db.clone(); + loop { + // Nonce to get the router comamnd for + let mut buf = vec![0; 4]; + let Ok(_) = socket.read_exact(&mut buf).await else { break }; + + let command = db.get(&buf[.. 4]).unwrap_or(vec![]); + let Ok(()) = socket.write_all(&u32::try_from(command.len()).unwrap().to_le_bytes()).await + else { + break; + }; + let Ok(()) = socket.write_all(&command).await else { break }; + } + }); + } +} diff --git a/deny.toml b/deny.toml index d6972d5e9..a3e0e3d93 100644 --- a/deny.toml +++ b/deny.toml @@ -44,6 +44,7 @@ exceptions = [ { allow = ["AGPL-3.0"], name = "serai-env" }, { allow = ["AGPL-3.0"], name = "ethereum-serai" }, + { allow = ["AGPL-3.0"], name = "serai-ethereum-relayer" }, { allow = ["AGPL-3.0"], name = "serai-message-queue" }, diff --git a/orchestration/dev/coins/ethereum-relayer/.folder b/orchestration/dev/coins/ethereum-relayer/.folder new file mode 100644 index 000000000..675d44382 --- /dev/null +++ b/orchestration/dev/coins/ethereum-relayer/.folder @@ -0,0 +1,11 @@ +#!/bin/sh + +RPC_USER="${RPC_USER:=serai}" +RPC_PASS="${RPC_PASS:=seraidex}" + +# Run Monero +monerod --non-interactive --regtest --offline --fixed-difficulty=1 \ + --no-zmq --rpc-bind-ip=0.0.0.0 --rpc-bind-port=18081 --confirm-external-bind \ + --rpc-access-control-origins "*" --disable-rpc-ban \ + --rpc-login=$RPC_USER:$RPC_PASS \ + $1 diff --git a/orchestration/src/ethereum_relayer.rs b/orchestration/src/ethereum_relayer.rs new file mode 100644 index 000000000..523d3c62c --- /dev/null +++ b/orchestration/src/ethereum_relayer.rs @@ -0,0 +1,39 @@ +use std::path::Path; + +use crate::{Network, Os, mimalloc, os, build_serai_service, write_dockerfile}; + +pub fn ethereum_relayer(orchestration_path: &Path, network: Network) { + let setup = mimalloc(Os::Debian).to_string() + + &build_serai_service("", network.release(), network.db(), "serai-ethereum-relayer"); + + let env_vars = [ + ("DB_PATH", "/volume/ethereum-relayer-db".to_string()), + ("RUST_LOG", "info,serai_ethereum_relayer=trace".to_string()), + ]; + let mut env_vars_str = String::new(); + for (env_var, value) in env_vars { + env_vars_str += &format!(r#"{env_var}=${{{env_var}:="{value}"}} "#); + } + + let run_ethereum_relayer = format!( + r#" +# Copy the relayer server binary and relevant license +COPY --from=builder --chown=ethereumrelayer /serai/bin/serai-ethereum-relayer /bin + +# Run ethereum-relayer +EXPOSE 20830 +EXPOSE 20831 +CMD {env_vars_str} serai-ethereum-relayer +"# + ); + + let run = os(Os::Debian, "", "ethereumrelayer") + &run_ethereum_relayer; + let res = setup + &run; + + let mut ethereum_relayer_path = orchestration_path.to_path_buf(); + ethereum_relayer_path.push("coins"); + ethereum_relayer_path.push("ethereum-relayer"); + ethereum_relayer_path.push("Dockerfile"); + + write_dockerfile(ethereum_relayer_path, &res); +} diff --git a/orchestration/src/main.rs b/orchestration/src/main.rs index 1925b94c5..f1f76957a 100644 --- a/orchestration/src/main.rs +++ b/orchestration/src/main.rs @@ -32,6 +32,9 @@ use mimalloc::mimalloc; mod coins; use coins::*; +mod ethereum_relayer; +use ethereum_relayer::ethereum_relayer; + mod message_queue; use message_queue::message_queue; @@ -280,6 +283,8 @@ fn dockerfiles(network: Network) { let ethereum_key = infrastructure_keys.remove("ethereum").unwrap(); let monero_key = infrastructure_keys.remove("monero").unwrap(); + ethereum_relayer(&orchestration_path, network); + message_queue( &orchestration_path, network, @@ -363,6 +368,7 @@ fn start(network: Network, services: HashSet) { let name = match service.as_ref() { "serai" => "serai", "coordinator" => "coordinator", + "ethereum-relayer" => "ethereum-relayer", "message-queue" => "message-queue", "bitcoin-daemon" => "bitcoin", "bitcoin-processor" => "bitcoin-processor", @@ -495,6 +501,10 @@ fn start(network: Network, services: HashSet) { command } } + "ethereum-relayer" => { + // Expose the router command fetch server + command.arg("-p").arg("20831:20831") + } "monero" => { // Expose the RPC for tests if network == Network::Dev { @@ -561,6 +571,9 @@ Commands: - `message-queue` - `bitcoin-daemon` - `bitcoin-processor` + - `ethereum-daemon` + - `ethereum-processor` + - `ethereum-relayer` - `monero-daemon` - `monero-processor` - `monero-wallet-rpc` (if "dev") @@ -593,6 +606,9 @@ Commands: Some("start") => { let mut services = HashSet::new(); for arg in args { + if arg == "ethereum-processor" { + services.insert("ethereum-relayer".to_string()); + } if let Some(ext_network) = arg.strip_suffix("-processor") { services.insert(ext_network.to_string() + "-daemon"); } diff --git a/orchestration/src/processor.rs b/orchestration/src/processor.rs index 85f7ec5fc..cefe6455b 100644 --- a/orchestration/src/processor.rs +++ b/orchestration/src/processor.rs @@ -41,24 +41,32 @@ RUN apt install -y ca-certificates const RPC_PASS: &str = "seraidex"; // TODO: Isolate networks let hostname = format!("serai-{}-{coin}", network.label()); - let port = match coin { - "bitcoin" => 8332, - "ethereum" => 8545, - "monero" => 18081, - _ => panic!("unrecognized external network"), - }; + let port = format!( + "{}", + match coin { + "bitcoin" => 8332, + "ethereum" => 8545, + "monero" => 18081, + _ => panic!("unrecognized external network"), + } + ); - let env_vars = [ + let mut env_vars = vec![ ("MESSAGE_QUEUE_RPC", format!("serai-{}-message-queue", network.label())), ("MESSAGE_QUEUE_KEY", hex::encode(coin_key.to_repr())), ("ENTROPY", hex::encode(entropy.as_ref())), ("NETWORK", coin.to_string()), ("NETWORK_RPC_LOGIN", format!("{RPC_USER}:{RPC_PASS}")), ("NETWORK_RPC_HOSTNAME", hostname), - ("NETWORK_RPC_PORT", format!("{port}")), + ("NETWORK_RPC_PORT", port), ("DB_PATH", "/volume/processor-db".to_string()), ("RUST_LOG", "info,serai_processor=debug".to_string()), ]; + if coin == "ethereum" { + env_vars + .push(("ETHEREUM_RELAYER_HOSTNAME", format!("serai-{}-ethereum-relayer", network.label()))); + env_vars.push(("ETHEREUM_RELAYER_PORT", "20830".to_string())); + } let mut env_vars_str = String::new(); for (env_var, value) in env_vars { env_vars_str += &format!(r#"{env_var}=${{{env_var}:="{value}"}} "#); diff --git a/orchestration/testnet/coins/ethereum-relayer/.folder b/orchestration/testnet/coins/ethereum-relayer/.folder new file mode 100644 index 000000000..675d44382 --- /dev/null +++ b/orchestration/testnet/coins/ethereum-relayer/.folder @@ -0,0 +1,11 @@ +#!/bin/sh + +RPC_USER="${RPC_USER:=serai}" +RPC_PASS="${RPC_PASS:=seraidex}" + +# Run Monero +monerod --non-interactive --regtest --offline --fixed-difficulty=1 \ + --no-zmq --rpc-bind-ip=0.0.0.0 --rpc-bind-port=18081 --confirm-external-bind \ + --rpc-access-control-origins "*" --disable-rpc-ban \ + --rpc-login=$RPC_USER:$RPC_PASS \ + $1 diff --git a/processor/src/main.rs b/processor/src/main.rs index 1a50effa6..e0d97aa68 100644 --- a/processor/src/main.rs +++ b/processor/src/main.rs @@ -748,7 +748,15 @@ async fn main() { #[cfg(feature = "bitcoin")] NetworkId::Bitcoin => run(db, Bitcoin::new(url).await, coordinator).await, #[cfg(feature = "ethereum")] - NetworkId::Ethereum => run(db.clone(), Ethereum::new(db, url).await, coordinator).await, + NetworkId::Ethereum => { + let relayer_hostname = env::var("ETHEREUM_RELAYER_HOSTNAME") + .expect("ethereum relayer hostname wasn't specified") + .to_string(); + let relayer_port = + env::var("ETHEREUM_RELAYER_PORT").expect("ethereum relayer port wasn't specified"); + let relayer_url = relayer_hostname + ":" + &relayer_port; + run(db.clone(), Ethereum::new(db, url, relayer_url).await, coordinator).await + } #[cfg(feature = "monero")] NetworkId::Monero => run(db, Monero::new(url).await, coordinator).await, _ => panic!("spawning a processor for an unsupported network"), diff --git a/processor/src/networks/ethereum.rs b/processor/src/networks/ethereum.rs index 802ea68b2..b1965bae7 100644 --- a/processor/src/networks/ethereum.rs +++ b/processor/src/networks/ethereum.rs @@ -31,6 +31,11 @@ use tokio::{ time::sleep, sync::{RwLock, RwLockReadGuard}, }; +#[cfg(not(test))] +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, +}; use serai_client::{ primitives::{Coin, Amount, Balance, NetworkId}, @@ -290,6 +295,8 @@ pub struct Ethereum { // address. Accordingly, all methods present are consistent to a Serai chain with a finalized // first key (regardless of local state), and this is safe. db: D, + #[cfg_attr(test, allow(unused))] + relayer_url: String, provider: Arc>, deployer: Deployer, router: Arc>>, @@ -309,9 +316,9 @@ impl fmt::Debug for Ethereum { } } impl Ethereum { - pub async fn new(db: D, url: String) -> Self { + pub async fn new(db: D, daemon_url: String, relayer_url: String) -> Self { let provider = Arc::new(RootProvider::new( - ClientBuilder::default().transport(SimpleRequest::new(url), true), + ClientBuilder::default().transport(SimpleRequest::new(daemon_url), true), )); let mut deployer = Deployer::new(provider.clone()).await; @@ -322,7 +329,9 @@ impl Ethereum { } let deployer = deployer.unwrap().unwrap(); - Ethereum { db, provider, deployer, router: Arc::new(RwLock::new(None)) } + dbg!(&relayer_url); + dbg!(relayer_url.len()); + Ethereum { db, relayer_url, provider, deployer, router: Arc::new(RwLock::new(None)) } } // Obtain a reference to the Router, sleeping until it's deployed if it hasn't already been. @@ -714,8 +723,32 @@ impl Network for Ethereum { // Publish this to the dedicated TX server for a solver to actually publish #[cfg(not(test))] { - let _ = completion; - todo!("TODO"); + let mut msg = vec![]; + match completion.command() { + RouterCommand::UpdateSeraiKey { nonce, .. } | RouterCommand::Execute { nonce, .. } => { + msg.extend(&u32::try_from(nonce).unwrap().to_le_bytes()); + } + } + completion.write(&mut msg).unwrap(); + + let Ok(mut socket) = TcpStream::connect(&self.relayer_url).await else { + log::warn!("couldn't connect to the relayer server"); + Err(NetworkError::ConnectionError)? + }; + let Ok(()) = socket.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await else { + log::warn!("couldn't send the message's len to the relayer server"); + Err(NetworkError::ConnectionError)? + }; + let Ok(()) = socket.write_all(&msg).await else { + log::warn!("couldn't write the message to the relayer server"); + Err(NetworkError::ConnectionError)? + }; + if socket.read_u8().await.ok() != Some(1) { + log::warn!("didn't get the ack from the relayer server"); + Err(NetworkError::ConnectionError)?; + } + + Ok(()) } // Publish this using a dummy account we fund with magic RPC commands diff --git a/processor/src/tests/literal/mod.rs b/processor/src/tests/literal/mod.rs index 15c27b8ca..d45649d59 100644 --- a/processor/src/tests/literal/mod.rs +++ b/processor/src/tests/literal/mod.rs @@ -423,7 +423,7 @@ mod ethereum { }); } - Ethereum::new(db, url.clone()).await + Ethereum::new(db, url.clone(), String::new()).await }) } } diff --git a/tests/docker/src/lib.rs b/tests/docker/src/lib.rs index 3493d5029..986a1793c 100644 --- a/tests/docker/src/lib.rs +++ b/tests/docker/src/lib.rs @@ -85,7 +85,7 @@ pub fn build(name: String) { } let mut dockerfile_path = orchestration_path.clone(); - if HashSet::from(["bitcoin", "ethereum", "monero"]).contains(name.as_str()) { + if HashSet::from(["bitcoin", "ethereum", "ethereum-relayer", "monero"]).contains(name.as_str()) { dockerfile_path = dockerfile_path.join("coins"); } if name.contains("-processor") { @@ -125,6 +125,7 @@ pub fn build(name: String) { let meta = |path: PathBuf| (path.clone(), fs::metadata(path)); let mut metadatas = match name.as_str() { "bitcoin" | "ethereum" | "monero" => vec![], + "ethereum-relayer" => vec![meta(repo_path.join("common")), meta(repo_path.join("coins"))], "message-queue" => vec![ meta(repo_path.join("common")), meta(repo_path.join("crypto")), diff --git a/tests/full-stack/src/tests/mod.rs b/tests/full-stack/src/tests/mod.rs index 1fae8c481..7aaad8321 100644 --- a/tests/full-stack/src/tests/mod.rs +++ b/tests/full-stack/src/tests/mod.rs @@ -57,12 +57,16 @@ pub(crate) async fn new_test(test_body: impl TestBody) { let (coord_key, message_queue_keys, message_queue_composition) = message_queue_instance(); let (bitcoin_composition, bitcoin_port) = network_instance(NetworkId::Bitcoin); - let bitcoin_processor_composition = + let mut bitcoin_processor_composition = processor_instance(NetworkId::Bitcoin, bitcoin_port, message_queue_keys[&NetworkId::Bitcoin]); + assert_eq!(bitcoin_processor_composition.len(), 1); + let bitcoin_processor_composition = bitcoin_processor_composition.swap_remove(0); let (monero_composition, monero_port) = network_instance(NetworkId::Monero); - let monero_processor_composition = + let mut monero_processor_composition = processor_instance(NetworkId::Monero, monero_port, message_queue_keys[&NetworkId::Monero]); + assert_eq!(monero_processor_composition.len(), 1); + let monero_processor_composition = monero_processor_composition.swap_remove(0); let coordinator_composition = coordinator_instance(name, coord_key); let serai_composition = serai_composition(name); diff --git a/tests/processor/src/lib.rs b/tests/processor/src/lib.rs index 6e78e3971..1964e641c 100644 --- a/tests/processor/src/lib.rs +++ b/tests/processor/src/lib.rs @@ -28,7 +28,7 @@ pub fn processor_instance( network: NetworkId, port: u32, message_queue_key: ::F, -) -> TestBodySpecification { +) -> Vec { let mut entropy = [0; 32]; OsRng.fill_bytes(&mut entropy); @@ -41,7 +41,7 @@ pub fn processor_instance( let image = format!("{network_str}-processor"); serai_docker_tests::build(image.clone()); - TestBodySpecification::with_image( + let mut res = vec![TestBodySpecification::with_image( Image::with_repository(format!("serai-dev-{image}")).pull_policy(PullPolicy::Never), ) .replace_env( @@ -55,10 +55,30 @@ pub fn processor_instance( ("RUST_LOG".to_string(), "serai_processor=trace,".to_string()), ] .into(), - ) + )]; + + if network == NetworkId::Ethereum { + serai_docker_tests::build("ethereum-relayer".to_string()); + res.push( + TestBodySpecification::with_image( + Image::with_repository("serai-dev-ethereum-relayer".to_string()) + .pull_policy(PullPolicy::Never), + ) + .replace_env( + [ + ("DB_PATH".to_string(), "./ethereum-relayer-db".to_string()), + ("RUST_LOG".to_string(), "serai_ethereum_relayer=trace,".to_string()), + ] + .into(), + ) + .set_publish_all_ports(true), + ); + } + + res } -pub type Handles = (String, String, String); +pub type Handles = (String, String, String, String); pub fn processor_stack( network: NetworkId, network_hostname_override: Option, @@ -68,7 +88,7 @@ pub fn processor_stack( let (coord_key, message_queue_keys, message_queue_composition) = serai_message_queue_tests::instance(); - let processor_composition = + let mut processor_compositions = processor_instance(network, network_rpc_port, message_queue_keys[&network]); // Give every item in this stack a unique ID @@ -84,7 +104,7 @@ pub fn processor_stack( let mut compositions = vec![]; let mut handles = vec![]; for (name, composition) in [ - ( + Some(( match network { NetworkId::Serai => unreachable!(), NetworkId::Bitcoin => "bitcoin", @@ -92,10 +112,14 @@ pub fn processor_stack( NetworkId::Monero => "monero", }, network_composition, - ), - ("message_queue", message_queue_composition), - ("processor", processor_composition), - ] { + )), + Some(("message_queue", message_queue_composition)), + Some(("processor", processor_compositions.remove(0))), + processor_compositions.pop().map(|composition| ("relayer", composition)), + ] + .into_iter() + .flatten() + { let handle = format!("processor-{name}-{unique_id}"); compositions.push( composition.set_start_policy(StartPolicy::Strict).set_handle(handle.clone()).set_log_options( @@ -113,14 +137,27 @@ pub fn processor_stack( handles.push(handle); } - let processor_composition = compositions.last_mut().unwrap(); + let processor_composition = compositions.get_mut(2).unwrap(); processor_composition.inject_container_name( network_hostname_override.unwrap_or_else(|| handles[0].clone()), "NETWORK_RPC_HOSTNAME", ); + if let Some(hostname) = handles.get(3) { + processor_composition.inject_container_name(hostname, "ETHEREUM_RELAYER_HOSTNAME"); + processor_composition.modify_env("ETHEREUM_RELAYER_PORT", "20830"); + } processor_composition.inject_container_name(handles[1].clone(), "MESSAGE_QUEUE_RPC"); - ((handles[0].clone(), handles[1].clone(), handles[2].clone()), coord_key, compositions) + ( + ( + handles[0].clone(), + handles[1].clone(), + handles[2].clone(), + handles.get(3).cloned().unwrap_or(String::new()), + ), + coord_key, + compositions, + ) } #[derive(serde::Deserialize, Debug)] @@ -134,6 +171,7 @@ pub struct Coordinator { message_queue_handle: String, #[allow(unused)] processor_handle: String, + relayer_handle: String, next_send_id: u64, next_recv_id: u64, @@ -144,7 +182,7 @@ impl Coordinator { pub fn new( network: NetworkId, ops: &DockerOperations, - handles: (String, String, String), + handles: Handles, coord_key: ::F, ) -> Coordinator { let rpc = ops.handle(&handles.1).host_port(2287).unwrap(); @@ -156,6 +194,7 @@ impl Coordinator { network_handle: handles.0, message_queue_handle: handles.1, processor_handle: handles.2, + relayer_handle: handles.3, next_send_id: 0, next_recv_id: 0, @@ -508,7 +547,7 @@ impl Coordinator { } } - pub async fn publish_transacton(&self, ops: &DockerOperations, tx: &[u8]) { + pub async fn publish_transaction(&self, ops: &DockerOperations, tx: &[u8]) { let rpc_url = network_rpc(self.network, ops, &self.network_handle); match self.network { NetworkId::Bitcoin => { @@ -545,6 +584,14 @@ impl Coordinator { } } + pub async fn publish_eventuality_completion(&self, ops: &DockerOperations, tx: &[u8]) { + match self.network { + NetworkId::Bitcoin | NetworkId::Monero => self.publish_transaction(ops, tx).await, + NetworkId::Ethereum => (), + NetworkId::Serai => panic!("processor tests broadcasting block to Serai"), + } + } + pub async fn get_published_transaction( &self, ops: &DockerOperations, @@ -575,14 +622,7 @@ impl Coordinator { } } NetworkId::Ethereum => { - use ethereum_serai::alloy::{ - consensus::{TxLegacy, Signed}, - simple_request_transport::SimpleRequest, - rpc_client::ClientBuilder, - provider::{Provider, RootProvider}, - network::Ethereum, - }; - + /* let provider = RootProvider::<_, Ethereum>::new( ClientBuilder::default().transport(SimpleRequest::new(rpc_url.clone()), true), ); @@ -593,6 +633,43 @@ impl Coordinator { let mut bytes = vec![]; tx.encode_with_signature_fields(&sig, &mut bytes); Some(bytes) + */ + + // This is being passed a signature. We need to check the relayer has a TX with this + // signature + + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + }; + + let (ip, port) = ops.handle(&self.relayer_handle).host_port(20831).unwrap(); + let relayer_url = format!("{ip}:{port}"); + + let mut socket = TcpStream::connect(&relayer_url).await.unwrap(); + // Iterate over every published command + for i in 1 .. u32::MAX { + socket.write_all(&i.to_le_bytes()).await.unwrap(); + + let mut recvd_len = [0; 4]; + socket.read_exact(&mut recvd_len).await.unwrap(); + if recvd_len == [0; 4] { + break; + } + + let mut msg = vec![0; usize::try_from(u32::from_le_bytes(recvd_len)).unwrap()]; + socket.read_exact(&mut msg).await.unwrap(); + for start_pos in 0 .. msg.len() { + if (start_pos + tx.len()) > msg.len() { + break; + } + if &msg[start_pos .. (start_pos + tx.len())] == tx { + return Some(msg); + } + } + } + + None } NetworkId::Monero => { use monero_serai::rpc::HttpRpc; diff --git a/tests/processor/src/tests/batch.rs b/tests/processor/src/tests/batch.rs index 5397ad2d0..6170270ac 100644 --- a/tests/processor/src/tests/batch.rs +++ b/tests/processor/src/tests/batch.rs @@ -229,7 +229,7 @@ fn batch_test() { let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, instruction.clone()).await; for coordinator in &mut coordinators { - coordinator.publish_transacton(&ops, &tx).await; + coordinator.publish_transaction(&ops, &tx).await; } // Put the TX past the confirmation depth diff --git a/tests/processor/src/tests/send.rs b/tests/processor/src/tests/send.rs index b764f3068..62e80c095 100644 --- a/tests/processor/src/tests/send.rs +++ b/tests/processor/src/tests/send.rs @@ -147,7 +147,7 @@ pub(crate) async fn sign_tx( #[test] fn send_test() { - for network in [NetworkId::Bitcoin, /* TODO NetworkId::Ethereum, */ NetworkId::Monero] { + for network in [NetworkId::Bitcoin, NetworkId::Ethereum, NetworkId::Monero] { let (coordinators, test) = new_test(network); test.run(|ops| async move { @@ -182,7 +182,7 @@ fn send_test() { let (tx, balance_sent) = wallet.send_to_address(&ops, &key_pair.1, Some(instruction.clone())).await; for coordinator in &mut coordinators { - coordinator.publish_transacton(&ops, &tx).await; + coordinator.publish_transaction(&ops, &tx).await; } // Put the TX past the confirmation depth @@ -295,7 +295,7 @@ fn send_test() { .unwrap(); for (i, coordinator) in coordinators.iter_mut().enumerate() { if !participating.contains(&i) { - coordinator.publish_transacton(&ops, &tx).await; + coordinator.publish_eventuality_completion(&ops, &tx).await; // Tell them of it as a completion of the relevant signing nodes coordinator .send_message(messages::sign::CoordinatorMessage::Completed {