From b69bd1a2717f4b9c99656c650dd7ea3a5bb0a5e4 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 10 Sep 2023 16:27:46 +0200 Subject: [PATCH] refactor(serve): unify serve bootstrapping procedure (#87) --- src/bin/dolos/daemon.rs | 17 +++++------------ src/bin/dolos/serve.rs | 28 +--------------------------- src/serve/mod.rs | 26 ++++++++++++++++++++++++++ src/serve/ouroboros/mod.rs | 7 +------ 4 files changed, 33 insertions(+), 45 deletions(-) diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 48a6de63..0acb6d6b 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -4,7 +4,6 @@ use dolos::{ prelude::*, storage::{applydb::ApplyDB, rolldb::RollDB}, }; -use tracing::warn; #[derive(Debug, clap::Args)] pub struct Args {} @@ -39,19 +38,13 @@ pub async fn run( let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?; - let rolldb_copy = rolldb.clone(); + let server = tokio::spawn(dolos::serve::serve(config.serve, rolldb.clone())); - if let Some(grpc_config) = config.serve.grpc { - let server = tokio::spawn(dolos::serve::grpc::serve(grpc_config, rolldb_copy)); + dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy) + .unwrap() + .block(); - dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy) - .unwrap() - .block(); - - server.abort(); - } else { - warn!("no gRPC config found") - } + server.abort(); Ok(()) } diff --git a/src/bin/dolos/serve.rs b/src/bin/dolos/serve.rs index 8e11eac2..a2a44315 100644 --- a/src/bin/dolos/serve.rs +++ b/src/bin/dolos/serve.rs @@ -1,8 +1,6 @@ use dolos::prelude::*; use dolos::storage::rolldb::RollDB; -use futures_util::future::join_all; use std::path::Path; -use tracing::info; #[derive(Debug, clap::Args)] pub struct Args {} @@ -25,31 +23,7 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> { let db = RollDB::open(rolldb_path, config.rolldb.k_param.unwrap_or(1000)).map_err(Error::config)?; - // placeholder while we make follow-tip optional - let (_, from_sync) = gasket::messaging::tokio::broadcast_channel(100); - - let mut tasks = vec![]; - - if let Some(grpc_config) = config.serve.grpc { - tasks.push(tokio::spawn(dolos::serve::grpc::serve( - grpc_config, - db.clone(), - ))); - } else { - info!("no gRPC config found, not serving over gRPC") - } - - if let Some(ouroboros_config) = config.serve.ouroboros { - tasks.push(tokio::spawn(dolos::serve::ouroboros::serve( - ouroboros_config, - db, - from_sync.clone().try_into().unwrap(), - ))); - } else { - info!("no ouroboros config found, not serving over ouroboros") - } - - join_all(tasks).await; + dolos::serve::serve(config.serve, db).await?; Ok(()) } diff --git a/src/serve/mod.rs b/src/serve/mod.rs index 659cea68..a7c45dc5 100644 --- a/src/serve/mod.rs +++ b/src/serve/mod.rs @@ -1,4 +1,8 @@ +use futures_util::future::join_all; use serde::{Deserialize, Serialize}; +use tracing::info; + +use crate::{prelude::*, storage::rolldb::RollDB}; pub mod grpc; pub mod ouroboros; @@ -8,3 +12,25 @@ pub struct Config { pub grpc: Option, pub ouroboros: Option, } + +/// Serve remote requests +/// +/// Uses specified config to start listening for network connections on either gRPC, Ouroboros or both protocols. +pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> { + let mut tasks = vec![]; + + if let Some(cfg) = config.grpc { + info!("found gRPC config"); + tasks.push(tokio::spawn(grpc::serve(cfg, db.clone()))); + } + + if let Some(cfg) = config.ouroboros { + info!("found Ouroboros config"); + tasks.push(tokio::spawn(ouroboros::serve(cfg, db.clone()))); + } + + // TODO: we should stop if any of the tasks breaks + join_all(tasks).await; + + Ok(()) +} diff --git a/src/serve/ouroboros/mod.rs b/src/serve/ouroboros/mod.rs index abbe9095..b3b4c6e7 100644 --- a/src/serve/ouroboros/mod.rs +++ b/src/serve/ouroboros/mod.rs @@ -3,7 +3,6 @@ use pallas::network::miniprotocols::blockfetch::BlockRequest; use pallas::network::miniprotocols::Point; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; -use tokio::sync::broadcast::Receiver; use tracing::{error, info, warn}; @@ -18,11 +17,7 @@ pub struct Config { magic: u64, } -pub async fn serve( - config: Config, - db: RollDB, - _sync_events: Receiver>, -) -> Result<(), Error> { +pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> { if let Some(addr) = config.listen_address { info!("serving via N2N Ouroboros on address: {addr}");