Skip to content

Commit

Permalink
view: only commit to database on nonempty blocks
Browse files Browse the repository at this point in the history
The initial version of the view service's sync logic suffered from poor sync
performance as the size of the NCT grows, because it commits the full NCT to
the database on every block.  This causes a massive amount of disk traffic.

In the future, we'll avoid this by using incremental serialization of the NCT,
so that the data to serialize drops from being on the order of megabytes to on
the order of kilobytes.

In the meantime, this is an easy optimization for empty blocks: only commit
changes to the database when the block is nonempty.  Otherwise, if the block
was empty, there was no data to update, except the NCT, which is already
in-memory.

To actually implement this, we have to be a bit more careful, and also add an
in-memory cache for the `last_sync_height` field, update it when we "uncommit"
an empty block, and reset it when we commit a nonempty block.

Testing locally (without carefully measuring) seems to give a 4-5x speedup on
sync times.
  • Loading branch information
hdevalence committed May 31, 2022
1 parent 11cc77b commit ab98ccf
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions chain/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ pub struct CompactBlock {
pub nullifiers: Vec<Nullifier>,
}

impl CompactBlock {
/// Returns true if the compact block is empty.
pub fn is_empty(&self) -> bool {
self.note_payloads.is_empty() && self.nullifiers.is_empty()
}
}

impl Protobuf<pb::CompactBlock> for CompactBlock {}

impl From<CompactBlock> for pb::CompactBlock {
Expand Down
1 change: 1 addition & 0 deletions view/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ hex = "0.4"
metrics = "0.18"
async-stream = "0.2"
reqwest = { version = "0.11", features = ["json"] }
parking_lot = "0.12"

[build-dependencies]
vergen = "5"
46 changes: 43 additions & 3 deletions view/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::anyhow;
use parking_lot::Mutex;
use penumbra_chain::params::ChainParams;
use penumbra_crypto::{
asset::{self, Id},
Expand All @@ -11,14 +12,23 @@ use penumbra_proto::{
};
use penumbra_tct as tct;
use sqlx::{migrate::MigrateDatabase, query, Pool, Sqlite};
use std::path::PathBuf;
use std::{num::NonZeroU64, path::PathBuf, sync::Arc};
use tonic::transport::Channel;

use crate::{sync::ScanResult, NoteRecord};

#[derive(Clone)]
pub struct Storage {
pool: Pool<Sqlite>,

/// This allows an optimization where we only commit to the database after
/// scanning a nonempty block.
///
/// If this is `Some`, we have uncommitted empty blocks up to the inner height.
/// If this is `None`, we don't.
///
/// Using a `NonZeroU64` ensures that `Option<NonZeroU64>` fits in 8 bytes.
uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,
}

impl Storage {
Expand All @@ -45,6 +55,7 @@ impl Storage {
pub async fn load(storage_path: String) -> anyhow::Result<Self> {
Ok(Self {
pool: Pool::<Sqlite>::connect(&storage_path).await?,
uncommitted_height: Arc::new(Mutex::new(None)),
})
}

Expand Down Expand Up @@ -103,11 +114,19 @@ impl Storage {

tx.commit().await?;

Ok(Storage { pool })
Ok(Storage {
pool,
uncommitted_height: Arc::new(Mutex::new(None)),
})
}

/// The last block height we've scanned to, if any.
pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
// Check if we have uncommitted blocks beyond the database height.
if let Some(height) = *self.uncommitted_height.lock() {
return Ok(Some(height.get()));
}

let result = sqlx::query!(
r#"
SELECT height
Expand Down Expand Up @@ -291,6 +310,24 @@ impl Storage {
Ok(())
}

pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
//Check that the incoming block height follows the latest recorded height
let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
anyhow::anyhow!("invalid: tried to record empty block as genesis block")
})?;

if height != last_sync_height + 1 {
return Err(anyhow::anyhow!(
"Wrong block height {} for latest sync height {}",
height,
last_sync_height
));
}

*self.uncommitted_height.lock() = Some(height.try_into().unwrap());
Ok(())
}

pub async fn record_block(
&self,
scan_result: ScanResult,
Expand All @@ -308,7 +345,7 @@ impl Storage {

if !correct_height {
return Err(anyhow::anyhow!(
"Wrong block height {:?} for latest sync height {:?}",
"Wrong block height {} for latest sync height {:?}",
scan_result.height,
last_sync_height
));
Expand Down Expand Up @@ -416,6 +453,9 @@ impl Storage {
.await?;

tx.commit().await?;
// It's critical to reset the uncommitted height here, since we've just
// invalidated it by committing.
self.uncommitted_height.lock().take();

Ok(())
}
Expand Down
32 changes: 23 additions & 9 deletions view/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, Mutex};

use crate::{sync::scan_block, Storage};
use penumbra_chain::sync::CompactBlock;
use penumbra_crypto::{Asset, FullViewingKey};
use penumbra_proto::client::oblivious::{
oblivious_query_client::ObliviousQueryClient, AssetListRequest, CompactBlockRangeRequest,
Expand Down Expand Up @@ -101,17 +102,30 @@ impl Worker {
.into_inner();

while let Some(block) = stream.message().await? {
// Lock the NCT only while processing this block.
let mut nct = self.nct.write().await;

let scan_result = scan_block(&self.fvk, &mut nct, block.try_into()?, epoch_duration);
let height = scan_result.height;
let block = CompactBlock::try_from(block)?;

self.storage.record_block(scan_result, &mut nct).await?;
// Notify all watchers of the new height we just recorded.
self.sync_height_tx.send(height)?;
// Lock the NCT only while processing this block.
let mut nct_guard = self.nct.write().await;

if block.is_empty() {
// Optimization: if the block is empty, seal the in-memory NCT,
// and skip touching the database:
nct_guard.end_block().unwrap();
self.storage.record_empty_block(block.height).await?;
} else {
// Otherwise, scan the block and commit its changes:
let scan_result =
scan_block(&self.fvk, &mut nct_guard, block.try_into()?, epoch_duration);
let height = scan_result.height;

self.storage
.record_block(scan_result, &mut nct_guard)
.await?;
// Notify all watchers of the new height we just recorded.
self.sync_height_tx.send(height)?;
}
// Release the NCT RwLock
drop(nct);
drop(nct_guard);

// Check if we should stop waiting for blocks to arrive, because the view
// services are dropped and we're supposed to shut down.
Expand Down

0 comments on commit ab98ccf

Please sign in to comment.