Skip to content

Commit

Permalink
Adding mock support for the Database trait in AKD (#333)
Browse files Browse the repository at this point in the history
* Adding mock support for the Database trait in AKD

Related to: #332

* adding a mocked database test

* Move to Arc in StorageManager so we can clone it cleanly.

* Make storage manager create and manage the Arc

* Bump version for contract changes
  • Loading branch information
slawlor authored Jan 4, 2023
1 parent 3a353e7 commit 7bb93b7
Show file tree
Hide file tree
Showing 17 changed files with 261 additions and 55 deletions.
4 changes: 3 additions & 1 deletion akd/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "akd"
version = "0.8.5"
version = "0.8.6"
authors = ["Harjasleen Malvai <[email protected]>", "Kevin Lewi <[email protected]>", "Sean Lawlor <[email protected]>"]
description = "An implementation of an auditable key directory"
license = "MIT OR Apache-2.0"
Expand Down Expand Up @@ -62,6 +62,8 @@ once_cell = { version = "1" }
ctor = "0.1"
tokio-test = "0.4"
tokio = { version = "1.21", features = ["rt", "sync", "time", "macros"] }
mockall = "0.11"
futures = "0.3"

# To enable the public-test feature in tests
akd = { path = ".", features = ["public-tests"], default-features = false }
Expand Down
24 changes: 20 additions & 4 deletions akd/src/storage/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,26 @@ mod tests;

/// Represents the manager of the storage mediums, including caching
/// and transactional operations (creating the transaction, committing it, etc)
#[derive(Clone)]
pub struct StorageManager<Db: Database> {
cache: Option<TimedCache>,
transaction: Transaction,
/// The underlying database managed by this storage manager
pub db: Db,
db: Arc<Db>,

metrics: [Arc<AtomicU64>; NUM_METRICS],
}

impl<Db: Database> Clone for StorageManager<Db> {
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
transaction: self.transaction.clone(),
db: self.db.clone(),
metrics: self.metrics.clone(),
}
}
}

unsafe impl<Db: Database> Sync for StorageManager<Db> {}
unsafe impl<Db: Database> Send for StorageManager<Db> {}

Expand All @@ -70,7 +80,7 @@ impl<Db: Database> StorageManager<Db> {
Self {
cache: None,
transaction: Transaction::new(),
db,
db: Arc::new(db),
metrics: [0; NUM_METRICS].map(|_| Arc::new(AtomicU64::new(0))),
}
}
Expand All @@ -89,11 +99,17 @@ impl<Db: Database> StorageManager<Db> {
cache_clean_frequency,
)),
transaction: Transaction::new(),
db,
db: Arc::new(db),
metrics: [0; NUM_METRICS].map(|_| Arc::new(AtomicU64::new(0))),
}
}

/// Retrieve a reference to the database implementation
#[cfg(any(test, feature = "public-tests"))]
pub fn get_db(&self) -> Arc<Db> {
self.db.clone()
}

/// Returns whether the storage manager has a cache
pub fn has_cache(&self) -> bool {
self.cache.is_some()
Expand Down
26 changes: 18 additions & 8 deletions akd/src/storage/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::*;
#[tokio::test]
async fn test_storage_manager_transaction() {
let db = AsyncInMemoryDatabase::new();
let storage_manager = StorageManager::new_no_cache(db.clone());
let storage_manager = StorageManager::new_no_cache(db);

assert!(
storage_manager.begin_transaction(),
Expand Down Expand Up @@ -67,7 +67,11 @@ async fn test_storage_manager_transaction() {
// there should be no items in the db, as they should all be in the transaction log
assert_eq!(
Ok(0),
db.batch_get_all_direct().await.map(|items| items.len())
storage_manager
.db
.batch_get_all_direct()
.await
.map(|items| items.len())
);
assert_eq!(11, storage_manager.transaction.count());

Expand Down Expand Up @@ -101,15 +105,20 @@ async fn test_storage_manager_transaction() {
// now the records should be in the database and the transaction log empty
assert_eq!(
Ok(11),
db.batch_get_all_direct().await.map(|items| items.len())
storage_manager
.db
.batch_get_all_direct()
.await
.map(|items| items.len())
);
assert_eq!(0, storage_manager.transaction.count());
}

#[tokio::test]
async fn test_storage_manager_cache_populated_by_batch_set() {
let db = AsyncInMemoryDatabase::new();
let storage_manager = StorageManager::new(db.clone(), None, None, None);

let storage_manager = StorageManager::new(db, None, None, None);

let mut records = (0..10)
.into_iter()
Expand Down Expand Up @@ -153,7 +162,7 @@ async fn test_storage_manager_cache_populated_by_batch_set() {
.expect("Failed to set batch of records");

// flush the database
db.clear().await;
storage_manager.db.clear().await;

// test a retrieval still gets data (from the cache)
let key = NodeKey(NodeLabel {
Expand Down Expand Up @@ -190,7 +199,7 @@ async fn test_storage_manager_cache_populated_by_batch_set() {
#[tokio::test]
async fn test_storage_manager_cache_populated_by_batch_get() {
let db = AsyncInMemoryDatabase::new();
let storage_manager = StorageManager::new(db.clone(), None, None, None);
let storage_manager = StorageManager::new(db, None, None, None);

let mut keys = vec![];
let mut records = (0..10)
Expand Down Expand Up @@ -235,12 +244,13 @@ async fn test_storage_manager_cache_populated_by_batch_get() {
.await
.expect("Failed to set batch of records");

let db_arc = storage_manager.get_db();
// flush the cache by destroying the storage manager
drop(storage_manager);

// re-create the storage manager, and run a batch_get of the same data keys to populate the cache
let storage_manager = StorageManager::new(
db.clone(),
Arc::try_unwrap(db_arc).expect("Failed to grab arc"),
Some(std::time::Duration::from_secs(1000)),
None,
None,
Expand All @@ -252,7 +262,7 @@ async fn test_storage_manager_cache_populated_by_batch_get() {
.expect("Failed to get a batch of records");

// flush the database
db.clear().await;
storage_manager.db.clear().await;

// test a retrieval still gets data (from the cache)
let key = NodeKey(NodeLabel {
Expand Down
6 changes: 3 additions & 3 deletions akd/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum DbSetState {

/// Storable represents an _item_ which can be stored in the storage layer
#[cfg(feature = "serde_serialization")]
pub trait Storable: Clone + Serialize + DeserializeOwned + Sync {
pub trait Storable: Clone + Serialize + DeserializeOwned + Sync + 'static {
/// This particular storage will have a key type
type StorageKey: Clone + Serialize + Eq + Hash + Send + Sync + std::fmt::Debug;

Expand All @@ -68,7 +68,7 @@ pub trait Storable: Clone + Serialize + DeserializeOwned + Sync {

/// Storable represents an _item_ which can be stored in the storage layer
#[cfg(not(feature = "serde_serialization"))]
pub trait Storable: Clone + Sync {
pub trait Storable: Clone + Sync + 'static {
/// This particular storage will have a key type
type StorageKey: Clone + Eq + Hash + Send + Sync + std::fmt::Debug;

Expand All @@ -93,7 +93,7 @@ pub trait Storable: Clone + Sync {

/// A database implementation backing storage for the AKD
#[async_trait]
pub trait Database: Clone + Send + Sync {
pub trait Database: Send + Sync {
/// Set a record in the database
async fn set(&self, record: DbRecord) -> Result<(), StorageError>;

Expand Down
19 changes: 9 additions & 10 deletions akd/src/storage/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,23 @@ mod memory_storage_tests {
#[serial]
async fn test_in_memory_db() {
let db = AsyncInMemoryDatabase::new();
crate::storage::tests::run_test_cases_for_storage_impl(&db).await;
crate::storage::tests::run_test_cases_for_storage_impl(db).await;
}
}

// *** Run the test cases for a given data-layer impl *** //
/// Run the storage-layer test suite for a given storage implementation.
/// This is public because it can be used by other implemented storage layers
/// for consistency checks (e.g. mysql, memcached, etc)
pub async fn run_test_cases_for_storage_impl<S: Database>(db: &S) {
test_get_and_set_item(db).await;
test_user_data(db).await;
test_transactions(db).await;
test_batch_get_items(db).await;
pub async fn run_test_cases_for_storage_impl<S: Database>(db: S) -> StorageManager<S> {
test_get_and_set_item(&db).await;
test_user_data(&db).await;
test_batch_get_items(&db).await;

let manager = StorageManager::new_no_cache(db.clone());
let manager = StorageManager::new_no_cache(db);
test_transactions(&manager).await;
test_tombstoning_data(&manager).await.unwrap();
manager
}

// *** New Test Helper Functions *** //
Expand Down Expand Up @@ -312,9 +313,7 @@ async fn test_batch_get_items<Ns: Database>(storage: &Ns) {
}
}

async fn test_transactions<S: Database>(db: &S) {
let storage = crate::storage::manager::StorageManager::new_no_cache(db.clone());

async fn test_transactions<S: Database>(storage: &StorageManager<S>) {
let mut rand_users: Vec<Vec<u8>> = vec![];
for _ in 0..20 {
let str: String = thread_rng()
Expand Down
Loading

0 comments on commit 7bb93b7

Please sign in to comment.