Skip to content

Commit

Permalink
refactor: Use precise error type in trait BlockStore
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Feb 15, 2024
1 parent adc7823 commit df5b83a
Show file tree
Hide file tree
Showing 12 changed files with 98 additions and 48 deletions.
1 change: 1 addition & 0 deletions wnfs-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ base64 = { version = "0.21", optional = true }
base64-serde = { version = "0.7", optional = true }
bytes = { version = "1.4", features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "std"] }
cid = "0.10"
dashmap = "5.5.3"
futures = "0.3"
libipld = { version = "0.16", features = ["dag-cbor", "derive", "serde-codec"] }
Expand Down
71 changes: 48 additions & 23 deletions wnfs-common/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::{
utils::{Arc, CondSend, CondSync},
BlockStoreError, MAX_BLOCK_SIZE,
};
use anyhow::{bail, Result};
use bytes::Bytes;
use futures::Future;
use libipld::{
Expand Down Expand Up @@ -53,7 +52,10 @@ pub trait BlockStore: CondSync {
/// Retrieve a block from this store via its hash (`Cid`).
///
/// If this store can't find the block, it may raise an error like `BlockNotFound`.
fn get_block(&self, cid: &Cid) -> impl Future<Output = Result<Bytes>> + CondSend;
fn get_block(
&self,
cid: &Cid,
) -> impl Future<Output = Result<Bytes, BlockStoreError>> + CondSend;

/// Put some bytes into the blockstore. These bytes should be encoded with the given codec.
///
Expand All @@ -72,7 +74,7 @@ pub trait BlockStore: CondSync {
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> impl Future<Output = Result<Cid>> + CondSend {
) -> impl Future<Output = Result<Cid, BlockStoreError>> + CondSend {
let bytes = bytes.into();
async move {
let cid = self.create_cid(&bytes, codec)?;
Expand All @@ -94,19 +96,22 @@ pub trait BlockStore: CondSync {
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> impl Future<Output = Result<()>> + CondSend;
) -> impl Future<Output = Result<(), BlockStoreError>> + CondSend;

/// Find out whether a call to `get_block` would return with a result or not.
///
/// This is useful for data exchange protocols to find out what needs to be fetched
/// externally and what doesn't.
fn has_block(&self, cid: &Cid) -> impl Future<Output = Result<bool>> + CondSend;
fn has_block(
&self,
cid: &Cid,
) -> impl Future<Output = Result<bool, BlockStoreError>> + CondSend;

// This should be the same in all implementations of BlockStore
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid> {
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
// If there are too many bytes, abandon this task
if bytes.len() > MAX_BLOCK_SIZE {
bail!(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()))
return Err(BlockStoreError::MaximumBlockSizeExceeded(bytes.len()));
}

// Compute the Blake3 hash of the bytes
Expand All @@ -124,45 +129,61 @@ pub trait BlockStore: CondSync {
//--------------------------------------------------------------------------------------------------

impl<B: BlockStore> BlockStore for &B {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
(**self).get_block(cid).await
}

async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
(**self).put_block(bytes, codec).await
}

async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
(**self).put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
(**self).has_block(cid).await
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid> {
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
(**self).create_cid(bytes, codec)
}
}

impl<B: BlockStore> BlockStore for Box<B> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
(**self).get_block(cid).await
}

async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
(**self).put_block(bytes, codec).await
}

async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
(**self).put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
(**self).has_block(cid).await
}

fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid> {
fn create_cid(&self, bytes: &[u8], codec: u64) -> Result<Cid, BlockStoreError> {
(**self).create_cid(bytes, codec)
}
}
Expand All @@ -186,7 +207,7 @@ impl MemoryBlockStore {
}

impl BlockStore for MemoryBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
let bytes = self
.0
.lock()
Expand All @@ -197,13 +218,17 @@ impl BlockStore for MemoryBlockStore {
Ok(bytes)
}

async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.0.lock().insert(cid, bytes.into());

Ok(())
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
Ok(self.0.lock().contains_key(cid))
}
}
Expand All @@ -213,7 +238,7 @@ impl BlockStore for MemoryBlockStore {
//--------------------------------------------------------------------------------------------------

/// Tests the retrieval property of a BlockStore-conforming type.
pub async fn bs_retrieval_test<T>(store: impl BlockStore) -> Result<()> {
pub async fn bs_retrieval_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
let second_bytes = b"hello world".to_vec();
Expand All @@ -234,7 +259,7 @@ pub async fn bs_retrieval_test<T>(store: impl BlockStore) -> Result<()> {
}

/// Tests the duplication of a BlockStore-conforming type.
pub async fn bs_duplication_test<T>(store: impl BlockStore) -> Result<()> {
pub async fn bs_duplication_test<T>(store: impl BlockStore) -> Result<(), BlockStoreError> {
// Example objects to insert and remove from the blockstore
let first_bytes = vec![1, 2, 3, 4, 5];
let second_bytes = first_bytes.clone();
Expand All @@ -261,7 +286,7 @@ pub async fn bs_duplication_test<T>(store: impl BlockStore) -> Result<()> {
}

/// Tests the serialization of a BlockStore-conforming type.
pub async fn bs_serialization_test<T>(store: &T) -> Result<()>
pub async fn bs_serialization_test<T>(store: &T) -> Result<(), BlockStoreError>
where
T: BlockStore + Serialize + for<'de> Deserialize<'de>,
{
Expand Down
8 changes: 4 additions & 4 deletions wnfs-common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ pub enum BlockStoreError {
#[error("Cannot find specified CID in block store: {0}")]
CIDNotFound(Cid),

#[error("Cannot find handler for block with CID: {0}")]
BlockHandlerNotFound(Cid),
#[error("CID error during blockstore operation: {0}")]
CIDError(#[from] cid::Error),

#[error("Lock poisoned")]
LockPoisoned,
#[error(transparent)]
Custom(#[from] anyhow::Error),
}
2 changes: 1 addition & 1 deletion wnfs-common/src/storable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub trait Storable: Sized {
{
let store_future = async {
let (bytes, codec) = self.to_serializable(store).await?.encode_ipld()?;
store.put_block(bytes, codec).await
Ok(store.put_block(bytes, codec).await?)
};

async {
Expand Down
18 changes: 13 additions & 5 deletions wnfs-common/src/utils/test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{Arc, CondSend, CondSync};
use crate::{BlockStore, MemoryBlockStore, CODEC_DAG_CBOR, CODEC_RAW};
use crate::{BlockStore, BlockStoreError, MemoryBlockStore, CODEC_DAG_CBOR, CODEC_RAW};
use anyhow::Result;
use base64_serde::base64_serde_type;
use bytes::Bytes;
Expand Down Expand Up @@ -113,22 +113,30 @@ impl SnapshotBlockStore {

impl BlockStore for SnapshotBlockStore {
#[inline]
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
self.inner.get_block(cid).await
}

#[inline]
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
self.inner.put_block(bytes, codec).await
}

#[inline]
async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.inner.put_block_keyed(cid, bytes).await
}

#[inline]
async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
self.inner.has_block(cid).await
}
}
Expand Down
4 changes: 3 additions & 1 deletion wnfs-unixfs-file/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ impl Block {
}

pub async fn store(&self, store: &impl BlockStore) -> Result<Cid> {
store.put_block(self.data.clone(), self.codec.into()).await
Ok(store
.put_block(self.data.clone(), self.codec.into())
.await?)
}

/// Validate the block. Will return an error if the links are wrong.
Expand Down
12 changes: 8 additions & 4 deletions wnfs-wasm/src/fs/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use js_sys::{Promise, Uint8Array};
use libipld_core::cid::Cid;
use wasm_bindgen::prelude::wasm_bindgen;
use wasm_bindgen_futures::JsFuture;
use wnfs::common::BlockStore as WnfsBlockStore;
use wnfs::common::{BlockStore as WnfsBlockStore, BlockStoreError};

//--------------------------------------------------------------------------------------------------
// Externs
Expand Down Expand Up @@ -50,7 +50,11 @@ pub struct ForeignBlockStore(pub(crate) BlockStore);
//--------------------------------------------------------------------------------------------------

impl WnfsBlockStore for ForeignBlockStore {
async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes>) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes>,
) -> Result<(), BlockStoreError> {
let bytes: Bytes = bytes.into();

JsFuture::from(self.0.put_block_keyed(cid.to_bytes(), bytes.into()))
Expand All @@ -60,7 +64,7 @@ impl WnfsBlockStore for ForeignBlockStore {
Ok(())
}

async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
let value = JsFuture::from(self.0.get_block(cid.to_bytes()))
.await
.map_err(anyhow_error("Cannot get block: {:?}"))?;
Expand All @@ -70,7 +74,7 @@ impl WnfsBlockStore for ForeignBlockStore {
Ok(Bytes::from(bytes))
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
let value = JsFuture::from(self.0.has_block(cid.to_bytes()))
.await
.map_err(anyhow_error("Cannot run has_block: {:?}"))?;
Expand Down
18 changes: 13 additions & 5 deletions wnfs/examples/tiered_blockstores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use wnfs::{
PrivateDirectory, PrivateNode,
},
};
use wnfs_common::{utils::CondSend, Storable};
use wnfs_common::{utils::CondSend, BlockStoreError, Storable};

#[async_std::main]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -96,23 +96,31 @@ struct TieredBlockStore<H: BlockStore, C: BlockStore> {
}

impl<H: BlockStore, C: BlockStore> BlockStore for TieredBlockStore<H, C> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
if self.hot.has_block(cid).await? {
self.hot.get_block(cid).await
} else {
self.cold.get_block(cid).await
}
}

async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
self.hot.put_block(bytes, codec).await
}

async fn put_block_keyed(&self, cid: Cid, bytes: impl Into<Bytes> + CondSend) -> Result<()> {
async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.hot.put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool> {
async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
if self.hot.has_block(cid).await? {
return Ok(true);
}
Expand Down
2 changes: 1 addition & 1 deletion wnfs/src/private/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ impl PrivateDirectoryContent {
let block = snapshot_key.encrypt(&bytes, rng)?;

// Store content section in blockstore and get Cid.
store.put_block(block, CODEC_RAW).await
Ok(store.put_block(block, CODEC_RAW).await?)
})
.await?)
}
Expand Down
2 changes: 1 addition & 1 deletion wnfs/src/private/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ impl PrivateFileContent {
let block = snapshot_key.encrypt(&bytes, rng)?;

// Store content section in blockstore and get Cid.
store.put_block(block, CODEC_RAW).await
Ok(store.put_block(block, CODEC_RAW).await?)
})
.await?)
}
Expand Down
2 changes: 1 addition & 1 deletion wnfs/src/private/node/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl PrivateNodeHeader {
let temporal_key = self.derive_temporal_key();
let cbor_bytes = serde_ipld_dagcbor::to_vec(&self.to_serializable(forest))?;
let ciphertext = temporal_key.key_wrap_encrypt(&cbor_bytes)?;
store.put_block(ciphertext, CODEC_RAW).await
Ok(store.put_block(ciphertext, CODEC_RAW).await?)
}

pub(crate) fn to_serializable(
Expand Down
6 changes: 4 additions & 2 deletions wnfs/src/root_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,11 @@ where
version: WNFS_VERSION,
};

store
let cid = store
.put_block(encode(&serializable, DagCborCodec)?, DagCborCodec.into())
.await
.await?;

Ok(cid)
}

pub async fn load(
Expand Down

0 comments on commit df5b83a

Please sign in to comment.