Skip to content

Commit

Permalink
tests for the streaming rocks client
Browse files Browse the repository at this point in the history
  • Loading branch information
StanChe committed Dec 26, 2023
1 parent d0d5c3d commit 6d40393
Show file tree
Hide file tree
Showing 5 changed files with 610 additions and 408 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rocks-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,7 @@ interface = { path = "../interface" }

[dev-dependencies]
tempfile = "3.8.1"
rand = "0.8.5"

[features]
integration_tests = []
151 changes: 151 additions & 0 deletions rocks-db/tests/asset_streaming_client_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
mod setup;
#[cfg(test)]
mod tests {
use std::collections::HashSet;

use interface::AssetDetailsStreamer;
use solana_sdk::pubkey::Pubkey;
use tempfile::TempDir;

use rocks_db::Storage;
use tokio_stream::StreamExt;

use crate::setup::setup::*;

#[tokio::test]
async fn test_get_asset_details_stream_in_range_empty_db() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let storage =
Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");

// Call get_asset_details_stream_in_range on an empty database
let response = storage.get_asset_details_stream_in_range(100, 200).await;

assert!(response.is_ok());
let mut stream = response.unwrap();

// Check that the stream is empty
assert!(stream.next().await.is_none());
}

#[tokio::test]
async fn test_get_asset_details_stream_in_range_data_only_before_target() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let storage =
Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
let pk = Pubkey::new_unique();

storage.asset_updated(10, pk.clone()).unwrap();
// Call get_asset_details_stream_in_range on a database
let response = storage.get_asset_details_stream_in_range(100, 200).await;

assert!(response.is_ok());
let mut stream = response.unwrap();

// Check that the stream is empty
assert!(stream.next().await.is_none());
}

#[tokio::test]
async fn test_get_asset_details_stream_in_range_data_only_after_target() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let storage =
Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
let pk = Pubkey::new_unique();

storage.asset_updated(1000, pk.clone()).unwrap();
// Call get_asset_details_stream_in_range on a database
let response = storage.get_asset_details_stream_in_range(100, 200).await;

assert!(response.is_ok());
let mut stream = response.unwrap();

// Check that the stream is empty
assert!(stream.next().await.is_none());
}

#[tokio::test]
async fn test_get_asset_details_stream_in_range_data_missing_data() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let storage =
Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
let pk = Pubkey::new_unique();

storage.asset_updated(100, pk.clone()).unwrap();
// Call get_asset_details_stream_in_range on a database
let response = storage.get_asset_details_stream_in_range(100, 200).await;

assert!(response.is_ok());
let mut stream = response.unwrap();

// Check that the stream contains an error
let first_resp = stream.next().await;
assert!(first_resp.is_some());
let first_resp = first_resp.unwrap();
assert!(first_resp.is_err());

// Check that the stream is closed
assert!(stream.next().await.is_none());
}

#[tokio::test]
async fn test_get_asset_details_stream_in_range_data() {
let cnt = 1000;
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let storage =
Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
let pks = (0..cnt).map(|_| Pubkey::new_unique()).collect::<Vec<_>>();
let mut slot = 100;

Check warning on line 98 in rocks-db/tests/asset_streaming_client_tests.rs

View workflow job for this annotation

GitHub Actions / build

variable does not need to be mutable
for pk in pks.iter() {
storage.asset_updated(slot, pk.clone()).unwrap();
}
// generate 1000 units of data using generate_test_static_data,generate_test_authority,generate_test_owner and create_test_dynamic_data for a 1000 unique pubkeys
let static_data = pks
.iter()
.map(|pk| generate_test_static_data(pk.clone(), slot))
.collect::<Vec<_>>();
let authority_data = pks
.iter()
.map(|pk| generate_test_authority(pk.clone()))
.collect::<Vec<_>>();
let owner_data = pks
.iter()
.map(|pk| generate_test_owner(pk.clone()))
.collect::<Vec<_>>();
let dynamic_data = pks
.iter()
.map(|pk| create_test_dynamic_data(pk.clone(), slot))
.collect::<Vec<_>>();
// put everything in the database
for ((((pk, static_data), authority_data), owner_data), dynamic_data) in pks
.iter()
.zip(static_data.iter())
.zip(authority_data.iter())
.zip(owner_data.iter())
.zip(dynamic_data.iter())
{
storage
.asset_authority_data
.put(*pk, authority_data)
.unwrap();
storage.asset_owner_data.put(*pk, owner_data).unwrap();
storage.asset_static_data.put(*pk, static_data).unwrap();
storage.asset_owner_data.put(*pk, owner_data).unwrap();
storage.asset_dynamic_data.put(*pk, dynamic_data).unwrap();
}
// Call get_asset_details_stream_in_range on a database
let response = storage.get_asset_details_stream_in_range(100, 200).await;

assert!(response.is_ok());
let mut stream = response.unwrap();

// Check that the stream contains all the data
let mut pk_set = HashSet::new();
while let Some(resp) = stream.next().await {
let resp = resp.unwrap();
pk_set.insert(resp.pubkey);
}
assert_eq!(pk_set.len(), cnt);
assert_eq!(pk_set, pks.into_iter().collect::<HashSet<_>>());
}
}
Loading

0 comments on commit 6d40393

Please sign in to comment.