From 3a629d1d91f8af07527bf8246329cc4038b83271 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 10 Sep 2023 15:58:17 +0200 Subject: [PATCH] refactor(sync): turn apply stage into end of pipeline (#86) --- src/bin/dolos/daemon.rs | 11 ++------- src/bin/dolos/serve.rs | 1 - src/bin/dolos/sync.rs | 5 +--- src/serve/grpc/mod.rs | 9 ++----- src/serve/grpc/sync.rs | 52 +++------------------------------------ src/storage/rolldb/wal.rs | 2 +- src/sync/apply.rs | 10 -------- src/sync/mod.rs | 1 - 8 files changed, 10 insertions(+), 81 deletions(-) diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index 82d95087..48a6de63 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -39,19 +39,12 @@ pub async fn run( let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?; - // channel that connects output from sync pipeline to gRPC server - let (to_serve, from_sync) = gasket::messaging::tokio::broadcast_channel(100); - let rolldb_copy = rolldb.clone(); if let Some(grpc_config) = config.serve.grpc { - let server = tokio::spawn(dolos::serve::grpc::serve( - grpc_config, - rolldb_copy, - from_sync.try_into().unwrap(), - )); + let server = tokio::spawn(dolos::serve::grpc::serve(grpc_config, rolldb_copy)); - dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_serve, policy) + dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy) .unwrap() .block(); diff --git a/src/bin/dolos/serve.rs b/src/bin/dolos/serve.rs index f0c31854..8e11eac2 100644 --- a/src/bin/dolos/serve.rs +++ b/src/bin/dolos/serve.rs @@ -34,7 +34,6 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> { tasks.push(tokio::spawn(dolos::serve::grpc::serve( grpc_config, db.clone(), - from_sync.clone().try_into().unwrap(), ))); } else { info!("no gRPC config found, not serving over gRPC") diff --git a/src/bin/dolos/sync.rs b/src/bin/dolos/sync.rs index 4586c819..f55ccef7 100644 --- a/src/bin/dolos/sync.rs +++ b/src/bin/dolos/sync.rs @@ -37,10 +37,7 @@ pub fn run( let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?; - // placeholder while we implement monitoring sink - let (to_monitor, _) = gasket::messaging::tokio::broadcast_channel(100); - - dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_monitor, policy) + dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy) .unwrap() .block(); diff --git a/src/serve/grpc/mod.rs b/src/serve/grpc/mod.rs index d6c8a9c8..1328c90e 100644 --- a/src/serve/grpc/mod.rs +++ b/src/serve/grpc/mod.rs @@ -1,7 +1,6 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; -use tokio::sync::broadcast::Receiver; use tonic::transport::{Certificate, Server, ServerTlsConfig}; use tracing::info; @@ -18,13 +17,9 @@ pub struct Config { tls_client_ca_root: Option, } -pub async fn serve( - config: Config, - db: RollDB, - sync_events: Receiver>, -) -> Result<(), Error> { +pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> { let addr = config.listen_address.parse().unwrap(); - let service = sync::ChainSyncServiceImpl::new(db, sync_events); + let service = sync::ChainSyncServiceImpl::new(db); let service = ChainSyncServiceServer::new(service); let mut server = Server::builder().accept_http1(true); diff --git a/src/serve/grpc/sync.rs b/src/serve/grpc/sync.rs index 9c97a324..6a6ef276 100644 --- a/src/serve/grpc/sync.rs +++ b/src/serve/grpc/sync.rs @@ -1,16 +1,11 @@ use futures_core::Stream; -use gasket::messaging::Message; use pallas::crypto::hash::Hash; use std::pin::Pin; -use tokio::sync::broadcast::Receiver; use tokio_stream::StreamExt; use tonic::{Request, Response, Status}; use utxorpc::proto::sync::v1::*; -use crate::{ - prelude::RollEvent, - storage::rolldb::{wal::WalAction, RollDB}, -}; +use crate::storage::rolldb::{wal::WalAction, RollDB}; fn bytes_to_hash(raw: &[u8]) -> Hash<32> { let array: [u8; 32] = raw.try_into().unwrap(); @@ -43,11 +38,11 @@ fn roll_to_tip_response( } } -pub struct ChainSyncServiceImpl(RollDB, Receiver>); +pub struct ChainSyncServiceImpl(RollDB); impl ChainSyncServiceImpl { - pub fn new(db: RollDB, sync_events: Receiver>) -> Self { - Self(db, sync_events) + pub fn new(db: RollDB) -> Self { + Self(db) } } @@ -125,45 +120,6 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl { Ok(Response::new(response)) } - // async fn follow_tip( - // &self, - // _request: Request, - // ) -> Result, tonic::Status> { - // let (tx, rx) = tokio::sync::mpsc::channel(1); - - // let db2 = self.0.clone(); - - // tokio::spawn(async move { - // let iter = db2.crawl_from_origin(); - // let mut last_seq = None; - - // for x in iter { - // if let Ok((val, seq)) = x { - // let val = roll_to_tip_response(val, &db2); - // tx.send(val).await.unwrap(); - // last_seq = seq; - // } - // } - - // loop { - // db2.tip_change.notified().await; - // let iter = db2.crawl_wal(last_seq).skip(1); - - // for x in iter { - // if let Ok((seq, val)) = x { - // let val = roll_to_tip_response(val, &db2); - // tx.send(val).await.unwrap(); - // last_seq = Some(seq); - // } - // } - // } - // }); - - // let rx = tokio_stream::wrappers::ReceiverStream::new(rx); - - // Ok(Response::new(Box::pin(rx))) - // } - async fn follow_tip( &self, _request: Request, diff --git a/src/storage/rolldb/wal.rs b/src/storage/rolldb/wal.rs index ea860c08..2a50d21c 100644 --- a/src/storage/rolldb/wal.rs +++ b/src/storage/rolldb/wal.rs @@ -1,7 +1,7 @@ use rocksdb::{IteratorMode, WriteBatch, DB}; use serde::{Deserialize, Serialize}; -use crate::{prelude::BlockSlot, storage::kvtable::*}; +use crate::storage::kvtable::*; pub type Seq = u64; diff --git a/src/sync/apply.rs b/src/sync/apply.rs index f2060cfe..2e8f9b48 100644 --- a/src/sync/apply.rs +++ b/src/sync/apply.rs @@ -5,7 +5,6 @@ use crate::prelude::*; use crate::storage::applydb::{ApplyDB, UtxoRef}; pub type UpstreamPort = gasket::messaging::tokio::InputPort; -// pub type DownstreamPort = gasket::messaging::tokio::OutputPort; // gasket::messaging::tokio::OutputPort; #[derive(Stage)] #[stage(name = "apply", unit = "RollEvent", worker = "Worker")] @@ -13,10 +12,7 @@ pub struct Stage { applydb: ApplyDB, pub upstream: UpstreamPort, - // pub downstream: DownstreamPort, - // placeholder - //pub downstream: DownstreamPort, #[metric] block_count: gasket::metrics::Counter, @@ -133,12 +129,6 @@ impl gasket::framework::Worker for Worker { RollEvent::Reset(_) => todo!(), }; - // stage - // .downstream - // .send(unit.clone().into()) - // .await - // .or_panic()?; - Ok(()) } } diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 872af568..44f5b288 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -18,7 +18,6 @@ pub fn pipeline( config: &Config, rolldb: RollDB, applydb: ApplyDB, - _output: gasket::messaging::tokio::ChannelSendAdapter, policy: &gasket::runtime::Policy, ) -> Result { let pull_cursor = rolldb