Skip to content

Commit

Permalink
refactor(serve): unify serve bootstrapping procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Sep 10, 2023
1 parent 3a629d1 commit da4c796
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 45 deletions.
17 changes: 5 additions & 12 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use dolos::{
prelude::*,
storage::{applydb::ApplyDB, rolldb::RollDB},
};
use tracing::warn;

#[derive(Debug, clap::Args)]
pub struct Args {}
Expand Down Expand Up @@ -39,19 +38,13 @@ pub async fn run(

let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?;

let rolldb_copy = rolldb.clone();
let server = tokio::spawn(dolos::serve::serve(config.serve, rolldb.clone()));

if let Some(grpc_config) = config.serve.grpc {
let server = tokio::spawn(dolos::serve::grpc::serve(grpc_config, rolldb_copy));
dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy)
.unwrap()
.block();

dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy)
.unwrap()
.block();

server.abort();
} else {
warn!("no gRPC config found")
}
server.abort();

Ok(())
}
28 changes: 1 addition & 27 deletions src/bin/dolos/serve.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use dolos::prelude::*;
use dolos::storage::rolldb::RollDB;
use futures_util::future::join_all;
use std::path::Path;
use tracing::info;

#[derive(Debug, clap::Args)]
pub struct Args {}
Expand All @@ -25,31 +23,7 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> {
let db =
RollDB::open(rolldb_path, config.rolldb.k_param.unwrap_or(1000)).map_err(Error::config)?;

// placeholder while we make follow-tip optional
let (_, from_sync) = gasket::messaging::tokio::broadcast_channel(100);

let mut tasks = vec![];

if let Some(grpc_config) = config.serve.grpc {
tasks.push(tokio::spawn(dolos::serve::grpc::serve(
grpc_config,
db.clone(),
)));
} else {
info!("no gRPC config found, not serving over gRPC")
}

if let Some(ouroboros_config) = config.serve.ouroboros {
tasks.push(tokio::spawn(dolos::serve::ouroboros::serve(
ouroboros_config,
db,
from_sync.clone().try_into().unwrap(),
)));
} else {
info!("no ouroboros config found, not serving over ouroboros")
}

join_all(tasks).await;
dolos::serve::serve(config.serve, db).await?;

Ok(())
}
26 changes: 26 additions & 0 deletions src/serve/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use tracing::info;

use crate::{prelude::*, storage::rolldb::RollDB};

pub mod grpc;
pub mod ouroboros;
Expand All @@ -8,3 +12,25 @@ pub struct Config {
pub grpc: Option<grpc::Config>,
pub ouroboros: Option<ouroboros::Config>,
}

/// Serve remote requests
///
/// Uses specified config to start listening for network connections on either gRPC, Ouroboros or both protocols.
pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> {
let mut tasks = vec![];

if let Some(cfg) = config.grpc {
info!("found gRPC config");
tasks.push(tokio::spawn(grpc::serve(cfg, db.clone())));
}

if let Some(cfg) = config.ouroboros {
info!("found Ouroboros config");
tasks.push(tokio::spawn(ouroboros::serve(cfg, db.clone())));
}

// TODO: we should stop if any of the tasks breaks
join_all(tasks).await;

Ok(())
}
7 changes: 1 addition & 6 deletions src/serve/ouroboros/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use pallas::network::miniprotocols::blockfetch::BlockRequest;
use pallas::network::miniprotocols::Point;
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use tokio::sync::broadcast::Receiver;

use tracing::{error, info, warn};

Expand All @@ -18,11 +17,7 @@ pub struct Config {
magic: u64,
}

pub async fn serve(
config: Config,
db: RollDB,
_sync_events: Receiver<gasket::messaging::Message<RollEvent>>,
) -> Result<(), Error> {
pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> {
if let Some(addr) = config.listen_address {
info!("serving via N2N Ouroboros on address: {addr}");

Expand Down

0 comments on commit da4c796

Please sign in to comment.