From ab98ccfdc20e0eabaf2aff2a9aac06e4fb2ff8a2 Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Mon, 30 May 2022 20:30:04 -0700 Subject: [PATCH] view: only commit to database on nonempty blocks The initial version of the view service's sync logic suffers from poor sync performance as the size of the note commitment tree (NCT) grows, because it commits the full NCT to the database on every block. This causes a massive amount of disk traffic. In the future, we'll avoid this by using incremental serialization of the NCT, so that the data to serialize drops from being on the order of megabytes to on the order of kilobytes. In the meantime, this is an easy optimization for empty blocks: only commit changes to the database when the block is nonempty. Otherwise, if the block is empty, there is no data to update, except the NCT, which is already in memory. To actually implement this, we have to be a bit more careful, and also add an in-memory cache for the `last_sync_height` field, update it when we "uncommit" an empty block, and reset it when we commit a nonempty block. Testing locally (without carefully measuring) seems to give a 4-5x speedup on sync times. --- Cargo.lock | 1 + chain/src/sync.rs | 7 +++++++ view/Cargo.toml | 1 + view/src/storage.rs | 46 ++++++++++++++++++++++++++++++++++++++++++--- view/src/worker.rs | 32 ++++++++++++++++++++++--------- 5 files changed, 75 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 56f2efb65a..fd08adee9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2989,6 +2989,7 @@ dependencies = [ "futures", "hex", "metrics", + "parking_lot 0.12.0", "penumbra-chain", "penumbra-crypto", "penumbra-proto", diff --git a/chain/src/sync.rs b/chain/src/sync.rs index d4cfae749c..e52f36a4c6 100644 --- a/chain/src/sync.rs +++ b/chain/src/sync.rs @@ -18,6 +18,13 @@ pub struct CompactBlock { pub nullifiers: Vec, } +impl CompactBlock { + /// Returns true if the compact block is empty. 
+ pub fn is_empty(&self) -> bool { + self.note_payloads.is_empty() && self.nullifiers.is_empty() + } +} + impl Protobuf for CompactBlock {} impl From for pb::CompactBlock { diff --git a/view/Cargo.toml b/view/Cargo.toml index c138365ccb..8e04817093 100644 --- a/view/Cargo.toml +++ b/view/Cargo.toml @@ -42,6 +42,7 @@ hex = "0.4" metrics = "0.18" async-stream = "0.2" reqwest = { version = "0.11", features = ["json"] } +parking_lot = "0.12" [build-dependencies] vergen = "5" diff --git a/view/src/storage.rs b/view/src/storage.rs index dd3cb0a3a7..c6cc0e0a44 100644 --- a/view/src/storage.rs +++ b/view/src/storage.rs @@ -1,4 +1,5 @@ use anyhow::anyhow; +use parking_lot::Mutex; use penumbra_chain::params::ChainParams; use penumbra_crypto::{ asset::{self, Id}, @@ -11,7 +12,7 @@ use penumbra_proto::{ }; use penumbra_tct as tct; use sqlx::{migrate::MigrateDatabase, query, Pool, Sqlite}; -use std::path::PathBuf; +use std::{num::NonZeroU64, path::PathBuf, sync::Arc}; use tonic::transport::Channel; use crate::{sync::ScanResult, NoteRecord}; @@ -19,6 +20,15 @@ use crate::{sync::ScanResult, NoteRecord}; #[derive(Clone)] pub struct Storage { pool: Pool, + + /// This allows an optimization where we only commit to the database after + /// scanning a nonempty block. + /// + /// If this is `Some`, we have uncommitted empty blocks up to the inner height. + /// If this is `None`, we don't. + /// + /// Using a `NonZeroU64` ensures that `Option` fits in 8 bytes. + uncommitted_height: Arc>>, } impl Storage { @@ -45,6 +55,7 @@ impl Storage { pub async fn load(storage_path: String) -> anyhow::Result { Ok(Self { pool: Pool::::connect(&storage_path).await?, + uncommitted_height: Arc::new(Mutex::new(None)), }) } @@ -103,11 +114,19 @@ impl Storage { tx.commit().await?; - Ok(Storage { pool }) + Ok(Storage { + pool, + uncommitted_height: Arc::new(Mutex::new(None)), + }) } /// The last block height we've scanned to, if any. 
pub async fn last_sync_height(&self) -> anyhow::Result> { + // Check if we have uncommitted blocks beyond the database height. + if let Some(height) = *self.uncommitted_height.lock() { + return Ok(Some(height.get())); + } + let result = sqlx::query!( r#" SELECT height @@ -291,6 +310,24 @@ impl Storage { Ok(()) } + pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> { + //Check that the incoming block height follows the latest recorded height + let last_sync_height = self.last_sync_height().await?.ok_or_else(|| { + anyhow::anyhow!("invalid: tried to record empty block as genesis block") + })?; + + if height != last_sync_height + 1 { + return Err(anyhow::anyhow!( + "Wrong block height {} for latest sync height {}", + height, + last_sync_height + )); + } + + *self.uncommitted_height.lock() = Some(height.try_into().unwrap()); + Ok(()) + } + pub async fn record_block( &self, scan_result: ScanResult, @@ -308,7 +345,7 @@ impl Storage { if !correct_height { return Err(anyhow::anyhow!( - "Wrong block height {:?} for latest sync height {:?}", + "Wrong block height {} for latest sync height {:?}", scan_result.height, last_sync_height )); @@ -416,6 +453,9 @@ impl Storage { .await?; tx.commit().await?; + // It's critical to reset the uncommitted height here, since we've just + // invalidated it by committing. + self.uncommitted_height.lock().take(); Ok(()) } diff --git a/view/src/worker.rs b/view/src/worker.rs index 9aaa9de46c..ce7583fffa 100644 --- a/view/src/worker.rs +++ b/view/src/worker.rs @@ -1,6 +1,7 @@ use std::sync::{Arc, Mutex}; use crate::{sync::scan_block, Storage}; +use penumbra_chain::sync::CompactBlock; use penumbra_crypto::{Asset, FullViewingKey}; use penumbra_proto::client::oblivious::{ oblivious_query_client::ObliviousQueryClient, AssetListRequest, CompactBlockRangeRequest, @@ -101,17 +102,30 @@ impl Worker { .into_inner(); while let Some(block) = stream.message().await? { - // Lock the NCT only while processing this block. 
- let mut nct = self.nct.write().await; - - let scan_result = scan_block(&self.fvk, &mut nct, block.try_into()?, epoch_duration); - let height = scan_result.height; + let block = CompactBlock::try_from(block)?; - self.storage.record_block(scan_result, &mut nct).await?; - // Notify all watchers of the new height we just recorded. - self.sync_height_tx.send(height)?; + // Lock the NCT only while processing this block. + let mut nct_guard = self.nct.write().await; + + if block.is_empty() { + // Optimization: if the block is empty, seal the in-memory NCT, + // and skip touching the database: + nct_guard.end_block().unwrap(); + self.storage.record_empty_block(block.height).await?; + } else { + // Otherwise, scan the block and commit its changes: + let scan_result = + scan_block(&self.fvk, &mut nct_guard, block.try_into()?, epoch_duration); + let height = scan_result.height; + + self.storage + .record_block(scan_result, &mut nct_guard) + .await?; + // Notify all watchers of the new height we just recorded. + self.sync_height_tx.send(height)?; + } // Release the NCT RwLock - drop(nct); + drop(nct_guard); // Check if we should stop waiting for blocks to arrive, because the view // services are dropped and we're supposed to shut down.