Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC client for fetching tx lists #23

Merged
merged 5 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,234 changes: 1,139 additions & 95 deletions Node/Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ edition = "2021"
[dependencies]
tokio = { version = "1.38", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = "0.3"
tracing-subscriber = "0.3"
jsonrpsee = { version = "0.23", features = ["http-client", "server"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
lazy_static = "1.4"
anyhow = "1.0.86"
7 changes: 5 additions & 2 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ mod mev_boost;
mod node;
mod p2p_network;
mod taiko;
mod utils;

use anyhow::Error;
use tokio::sync::mpsc;

const MESSAGE_QUEUE_SIZE: usize = 100;

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Error> {
init_logging();

let (avs_p2p_tx, avs_p2p_rx) = mpsc::channel(MESSAGE_QUEUE_SIZE);
Expand All @@ -18,7 +20,8 @@ async fn main() {
p2p.start();

let node = node::Node::new(node_rx, avs_p2p_tx);
node.start();
node.entrypoint().await?;
Ok(())
}

fn init_logging() {
Expand Down
75 changes: 35 additions & 40 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,67 @@
use crate::taiko::Taiko;
use anyhow::{anyhow as err, Context, Error};
use tokio::sync::mpsc::{Receiver, Sender};

pub struct Node {
taiko: Taiko,
node_rx: Option<Receiver<String>>,
node_rx: Receiver<String>,
avs_p2p_tx: Sender<String>,
}

impl Node {
pub fn new(node_rx: Receiver<String>, avs_p2p_tx: Sender<String>) -> Self {
let taiko = Taiko::new();
let taiko = Taiko::new("http://127.0.0.1:1234");
Self {
taiko,
node_rx: Some(node_rx),
node_rx,
avs_p2p_tx,
}
}

/// Consumes the Node and starts two loops:
/// one for handling incoming messages and one for the block preconfirmation
pub fn start(mut self) {
pub async fn entrypoint(mut self) -> Result<(), Error> {
tracing::info!("Starting node");
self.start_new_msg_receiver_thread();
self.main_block_preconfirmation_loop()
}

fn main_block_preconfirmation_loop(&self) {
loop {
self.taiko.get_pending_l2_tx_lists();
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p();
self.taiko.submit_new_l2_blocks();
if let Err(err) = self.step().await {
tracing::error!("Node processing step failed: {}", err);
}
}
}

//TODO: remove after implementation of above methods
std::thread::sleep(std::time::Duration::from_secs(1));
async fn step(&mut self) -> Result<(), Error> {
if let Ok(msg) = self.node_rx.try_recv() {
self.process_incoming_message(msg).await?;
} else {
self.main_block_preconfirmation_step().await?;
}
Ok(())
}

fn commit_to_the_tx_lists(&self) {
//TODO: implement
async fn main_block_preconfirmation_step(&self) -> Result<(), Error> {
self.taiko
.get_pending_l2_tx_lists()
.await
.context("Failed to get pending l2 tx lists")?;
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p().await?;
self.taiko.submit_new_l2_blocks();
Ok(())
}

fn send_preconfirmations_to_the_avs_p2p(&self) {
let avs_p2p_tx = self.avs_p2p_tx.clone();
tokio::spawn(async move {
if let Err(e) = avs_p2p_tx.send("Hello from node!".to_string()).await {
tracing::error!("Failed to send message to avs_p2p_tx: {}", e);
}
});
async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> {
tracing::debug!("Node received message: {}", msg);
Ok(())
}

fn start_new_msg_receiver_thread(&mut self) {
if let Some(node_rx) = self.node_rx.take() {
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
fn commit_to_the_tx_lists(&self) {
//TODO: implement
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
loop {
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
}
}
}
async fn send_preconfirmations_to_the_avs_p2p(&self) -> Result<(), Error> {
self.avs_p2p_tx
.send("Hello from node!".to_string())
.await
.map_err(|e| err!("Failed to send message to avs_p2p_tx: {}", e))
}
}
49 changes: 45 additions & 4 deletions Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,56 @@
pub struct Taiko {}
use crate::utils::rpc_client::RpcClient;
use anyhow::Error;
use serde_json::Value;

pub struct Taiko {
rpc_client: RpcClient,
}

impl Taiko {
pub fn new() -> Self {
Self {}
pub fn new(url: &str) -> Self {
Self {
rpc_client: RpcClient::new(url),
}
}

pub fn get_pending_l2_tx_lists(&self) {
pub async fn get_pending_l2_tx_lists(&self) -> Result<Value, Error> {
tracing::debug!("Getting L2 tx lists");
self.rpc_client
.call_method("RPC.GetL2TxLists", vec![])
.await
}

pub fn submit_new_l2_blocks(&self) {
tracing::debug!("Submitting new L2 blocks");
}
}

#[cfg(test)]
mod test {
use super::*;
use crate::utils::rpc_server::test::RpcServer;
use std::net::SocketAddr;

#[tokio::test]
async fn test_get_pending_l2_tx_lists() {
tracing_subscriber::fmt::init();

// Start the RPC server
let mut rpc_server = RpcServer::new();
let addr: SocketAddr = "127.0.0.1:3030".parse().unwrap();
rpc_server.start_test_responses(addr).await.unwrap();

let taiko = Taiko::new("http://127.0.0.1:3030");
let json = taiko.get_pending_l2_tx_lists().await.unwrap();

assert_eq!(json["result"]["TxLists"].as_array().unwrap().len(), 1);
assert_eq!(json["result"]["TxLists"][0].as_array().unwrap().len(), 3);
assert_eq!(json["result"]["TxLists"][0][0]["type"], "0x0");
assert_eq!(json["result"]["TxLists"][0][0]["hash"], "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd");
assert_eq!(json["result"]["TxLists"][0][1]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][1]["hash"], "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b");
assert_eq!(json["result"]["TxLists"][0][2]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][2]["hash"], "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193");
rpc_server.stop().await;
}
}
2 changes: 2 additions & 0 deletions Node/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod rpc_client;
pub mod rpc_server;
28 changes: 28 additions & 0 deletions Node/src/utils/rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use anyhow::Error;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use serde_json::Value;
use std::time::Duration;

pub struct RpcClient {
client: HttpClient,
}

impl RpcClient {
pub fn new(url: &str) -> Self {
// let client = HttpClientBuilder::default().build(url).unwrap();

let client = HttpClientBuilder::default()
.request_timeout(Duration::from_secs(1))
.build(url)
.unwrap();
RpcClient { client }
}

pub async fn call_method(&self, method: &str, params: Vec<Value>) -> Result<Value, Error> {
self.client
.request(method, params)
.await
.map_err(Error::from)
}
}
112 changes: 112 additions & 0 deletions Node/src/utils/rpc_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#[cfg(test)]
pub mod test {
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use lazy_static::lazy_static;
use serde_json::json;
use std::net::SocketAddr;
use tracing::info;

pub struct RpcServer {
handle: Option<ServerHandle>,
}

impl RpcServer {
pub fn new() -> Self {
RpcServer {
handle: None::<ServerHandle>,
}
}

#[cfg(test)]
pub async fn start_test_responses(
&mut self,
addr: SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
let server = ServerBuilder::default().build(addr).await?;
let mut module = RpcModule::new(());

module.register_async_method("RPC.GetL2TxLists", |_, _, _| async {
TX_LISTS_RESPONSE.clone()
})?;

let handle = server.start(module);
tokio::spawn(handle.clone().stopped());

self.handle = Some(handle);
Ok(())
}

pub async fn stop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.stop().unwrap();
}
info!("Server stopped");
}
}

lazy_static! {
pub static ref TX_LISTS_RESPONSE: serde_json::Value = json!({
"result": {
"TxLists": [
[
{
"type": "0x0",
"chainId": "0x28c61",
"nonce": "0x1",
"to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb",
"gas": "0x267ac",
"gasPrice": "0x5e76e0800",
"maxPriorityFeePerGas": null,
"maxFeePerGas": null,
"value": "0x0",
"input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913",
"v": "0x518e6",
"r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35",
"s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055",
"hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"
},
{
"type": "0x2",
"chainId": "0x28c61",
"nonce": "0x3f",
"to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9",
"gas": "0x2c2c8",
"gasPrice": null,
"maxPriorityFeePerGas": "0x1",
"maxFeePerGas": "0x3",
"value": "0x5af3107a4000",
"input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0a400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000",
"accessList": [],
"v": "0x1",
"r": "0x36517a175a60d3026380318917976fa32c82e542850357a611af05d2212ab9a4",
"s": "0x32d89dce30d76287ddba907b0c662cd09dc30891b1c9c2ef644edfc53160b298",
"yParity": "0x1",
"hash": "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b"
},
{
"type": "0x2",
"chainId": "0x28c61",
"nonce": "0x39",
"to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9",
"gas": "0x2c2c8",
"gasPrice": null,
"maxPriorityFeePerGas": "0x1",
"maxFeePerGas": "0x3",
"value": "0x5af3107a4000",
"input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0d400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000",
"accessList": [],
"v": "0x0",
"r": "0xc779421d1ee81dbd3dfbfad5fd632b45303b4513ea1b8ac0bc647f5430cd97b9",
"s": "0x13cedef844bf5a954183182992ffbf9b8b23331de255157528be7da6614618b2",
"yParity": "0x0",
"hash": "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193"
}
]
]
},
"error": null,
"id": 1
});
}
}