Skip to content

Commit

Permalink
refactor(sync): turn apply stage into end of pipeline (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Sep 10, 2023
1 parent 119852f commit 3a629d1
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 81 deletions.
11 changes: 2 additions & 9 deletions src/bin/dolos/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,12 @@ pub async fn run(

let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?;

// channel that connects output from sync pipeline to gRPC server
let (to_serve, from_sync) = gasket::messaging::tokio::broadcast_channel(100);

let rolldb_copy = rolldb.clone();

if let Some(grpc_config) = config.serve.grpc {
let server = tokio::spawn(dolos::serve::grpc::serve(
grpc_config,
rolldb_copy,
from_sync.try_into().unwrap(),
));
let server = tokio::spawn(dolos::serve::grpc::serve(grpc_config, rolldb_copy));

dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_serve, policy)
dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy)
.unwrap()
.block();

Expand Down
1 change: 0 additions & 1 deletion src/bin/dolos/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ pub async fn run(config: super::Config, _args: &Args) -> Result<(), Error> {
tasks.push(tokio::spawn(dolos::serve::grpc::serve(
grpc_config,
db.clone(),
from_sync.clone().try_into().unwrap(),
)));
} else {
info!("no gRPC config found, not serving over gRPC")
Expand Down
5 changes: 1 addition & 4 deletions src/bin/dolos/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ pub fn run(

let applydb = ApplyDB::open(applydb_path).map_err(Error::storage)?;

// placeholder while we implement monitoring sink
let (to_monitor, _) = gasket::messaging::tokio::broadcast_channel(100);

dolos::sync::pipeline(&config.upstream, rolldb, applydb, to_monitor, policy)
dolos::sync::pipeline(&config.upstream, rolldb, applydb, policy)
.unwrap()
.block();

Expand Down
9 changes: 2 additions & 7 deletions src/serve/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::path::PathBuf;

use serde::{Deserialize, Serialize};
use tokio::sync::broadcast::Receiver;
use tonic::transport::{Certificate, Server, ServerTlsConfig};

use tracing::info;
Expand All @@ -18,13 +17,9 @@ pub struct Config {
tls_client_ca_root: Option<PathBuf>,
}

pub async fn serve(
config: Config,
db: RollDB,
sync_events: Receiver<gasket::messaging::Message<RollEvent>>,
) -> Result<(), Error> {
pub async fn serve(config: Config, db: RollDB) -> Result<(), Error> {
let addr = config.listen_address.parse().unwrap();
let service = sync::ChainSyncServiceImpl::new(db, sync_events);
let service = sync::ChainSyncServiceImpl::new(db);
let service = ChainSyncServiceServer::new(service);

let mut server = Server::builder().accept_http1(true);
Expand Down
52 changes: 4 additions & 48 deletions src/serve/grpc/sync.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
use futures_core::Stream;
use gasket::messaging::Message;
use pallas::crypto::hash::Hash;
use std::pin::Pin;
use tokio::sync::broadcast::Receiver;
use tokio_stream::StreamExt;
use tonic::{Request, Response, Status};
use utxorpc::proto::sync::v1::*;

use crate::{
prelude::RollEvent,
storage::rolldb::{wal::WalAction, RollDB},
};
use crate::storage::rolldb::{wal::WalAction, RollDB};

fn bytes_to_hash(raw: &[u8]) -> Hash<32> {
let array: [u8; 32] = raw.try_into().unwrap();
Expand Down Expand Up @@ -43,11 +38,11 @@ fn roll_to_tip_response(
}
}

pub struct ChainSyncServiceImpl(RollDB, Receiver<Message<RollEvent>>);
pub struct ChainSyncServiceImpl(RollDB);

impl ChainSyncServiceImpl {
pub fn new(db: RollDB, sync_events: Receiver<Message<RollEvent>>) -> Self {
Self(db, sync_events)
pub fn new(db: RollDB) -> Self {
Self(db)
}
}

Expand Down Expand Up @@ -125,45 +120,6 @@ impl chain_sync_service_server::ChainSyncService for ChainSyncServiceImpl {
Ok(Response::new(response))
}

// async fn follow_tip(
// &self,
// _request: Request<FollowTipRequest>,
// ) -> Result<Response<Self::FollowTipStream>, tonic::Status> {
// let (tx, rx) = tokio::sync::mpsc::channel(1);

// let db2 = self.0.clone();

// tokio::spawn(async move {
// let iter = db2.crawl_from_origin();
// let mut last_seq = None;

// for x in iter {
// if let Ok((val, seq)) = x {
// let val = roll_to_tip_response(val, &db2);
// tx.send(val).await.unwrap();
// last_seq = seq;
// }
// }

// loop {
// db2.tip_change.notified().await;
// let iter = db2.crawl_wal(last_seq).skip(1);

// for x in iter {
// if let Ok((seq, val)) = x {
// let val = roll_to_tip_response(val, &db2);
// tx.send(val).await.unwrap();
// last_seq = Some(seq);
// }
// }
// }
// });

// let rx = tokio_stream::wrappers::ReceiverStream::new(rx);

// Ok(Response::new(Box::pin(rx)))
// }

async fn follow_tip(
&self,
_request: Request<FollowTipRequest>,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rolldb/wal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rocksdb::{IteratorMode, WriteBatch, DB};
use serde::{Deserialize, Serialize};

use crate::{prelude::BlockSlot, storage::kvtable::*};
use crate::storage::kvtable::*;

pub type Seq = u64;

Expand Down
10 changes: 0 additions & 10 deletions src/sync/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@ use crate::prelude::*;
use crate::storage::applydb::{ApplyDB, UtxoRef};

pub type UpstreamPort = gasket::messaging::tokio::InputPort<RollEvent>;
// pub type DownstreamPort = gasket::messaging::tokio::OutputPort<RollEvent>; // gasket::messaging::tokio::OutputPort<RollEvent>;

#[derive(Stage)]
#[stage(name = "apply", unit = "RollEvent", worker = "Worker")]
pub struct Stage {
applydb: ApplyDB,

pub upstream: UpstreamPort,
// pub downstream: DownstreamPort,

// placeholder
//pub downstream: DownstreamPort,
#[metric]
block_count: gasket::metrics::Counter,

Expand Down Expand Up @@ -133,12 +129,6 @@ impl gasket::framework::Worker<Stage> for Worker {
RollEvent::Reset(_) => todo!(),
};

// stage
// .downstream
// .send(unit.clone().into())
// .await
// .or_panic()?;

Ok(())
}
}
1 change: 0 additions & 1 deletion src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ pub fn pipeline(
config: &Config,
rolldb: RollDB,
applydb: ApplyDB,
_output: gasket::messaging::tokio::ChannelSendAdapter<RollEvent>,
policy: &gasket::runtime::Policy,
) -> Result<gasket::daemon::Daemon, Error> {
let pull_cursor = rolldb
Expand Down

0 comments on commit 3a629d1

Please sign in to comment.