diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3d232dda..a786978f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -19,7 +19,7 @@ jobs: ~/.cargo/registry/ ~/.cargo/git/ target/ - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} + key: ${{ runner.os }}-cargo-utility-chain - name: Install dependencies run: sudo apt-get update && sudo apt-get install -y protobuf-compiler @@ -38,6 +38,12 @@ jobs: command: fmt args: --all -- --check + - name: Clean up workspace crates with mock dependencies that may fail intermittently + uses: actions-rs/cargo@v1 + with: + command: clean + args: -p rocks-db -p postgre-client -p interface + - name: Lint with Clippy uses: actions-rs/clippy-check@v1 with: diff --git a/Cargo.lock b/Cargo.lock index 029d1ed8..d01173c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4381,12 +4381,14 @@ dependencies = [ "env_logger 0.10.1", "figment", "futures-util", + "interface", "itertools 0.12.0", "log", "lz4", "metrics-utils", "mockall 0.12.0", "num_cpus", + "rand 0.8.5", "reqwest", "rocksdb", "serde", @@ -4397,6 +4399,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", ] [[package]] diff --git a/nft_ingester/src/bin/ingester/backfiller.rs b/nft_ingester/src/bin/ingester/backfiller.rs index 501a9707..bcdb4892 100644 --- a/nft_ingester/src/bin/ingester/backfiller.rs +++ b/nft_ingester/src/bin/ingester/backfiller.rs @@ -17,6 +17,7 @@ use std::collections::HashSet; use std::num::ParseIntError; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use tokio::sync::Mutex; use tokio::task::{JoinError, JoinSet}; use tokio::time::Duration; @@ -78,7 +79,7 @@ impl Backfiller { pub async fn start_backfill( &self, - tasks: &mut JoinSet<Result<(), JoinError>>, + tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>, keep_running: Arc<AtomicBool>, metrics: Arc<BackfillerMetricsConfig>, ) -> Result<(), IngesterError> { @@ -94,7 +95,7 @@ impl Backfiller { .await; let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + tasks.lock().await.spawn(tokio::spawn(async move { info!("Running slots parser..."); slots_collector.collect_slots(cloned_keep_running).await; @@ -109,7 +110,7 @@ impl Backfiller { .await; let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + tasks.lock().await.spawn(tokio::spawn(async move { info!("Running transactions parser..."); transactions_parser diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 6c4579b4..2342cb91 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -4,6 +4,7 @@ use std::time::Duration; use clap::Parser; use log::{error, info}; +use tokio::sync::Mutex; use tokio::task::JoinSet; use metrics_utils::utils::setup_metrics; @@ -130,12 +131,14 @@ pub async fn main() -> Result<(), IngesterError> { } })); + let mutexed_tasks = Arc::new(Mutex::new(tasks)); // start parsers let storage = Storage::open( &config .rocks_db_path_container .clone() .unwrap_or(DEFAULT_ROCKSDB_PATH.to_string()), + mutexed_tasks.clone(), ) .unwrap(); @@ -151,7 +154,7 @@ pub async fn main() -> Result<(), IngesterError> { let cloned_metrics = metrics_state.ingester_metrics.clone(); let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { backup_service.perform_backup(cloned_metrics, cloned_keep_running) })); @@ -176,7 +179,7 @@ pub async fn main() -> Result<(), IngesterError> { let cloned_mplx_parser = mplx_accs_parser.clone(); let
cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { cloned_mplx_parser .process_metadata_accs(cloned_keep_running) .await; @@ -185,7 +188,7 @@ pub async fn main() -> Result<(), IngesterError> { let cloned_token_parser = token_accs_parser.clone(); let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { cloned_token_parser .process_token_accs(cloned_keep_running) .await; @@ -194,7 +197,7 @@ pub async fn main() -> Result<(), IngesterError> { let cloned_token_parser = token_accs_parser.clone(); let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { cloned_token_parser .process_mint_accs(cloned_keep_running) .await; @@ -205,7 +208,7 @@ pub async fn main() -> Result<(), IngesterError> { let first_processed_slot_clone = first_processed_slot.clone(); let cloned_rocks_storage = rocks_storage.clone(); let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { while cloned_keep_running.load(Ordering::SeqCst) { let slot = cloned_rocks_storage .last_saved_slot() @@ -225,7 +228,7 @@ pub async fn main() -> Result<(), IngesterError> { let cloned_keep_running = keep_running.clone(); let cloned_rocks_storage = rocks_storage.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { match start_api( cloned_rocks_storage.clone(), cloned_keep_running, @@ -247,7 +250,7 @@ pub async fn main() -> Result<(), IngesterError> { ); let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { bubblegum_updates_processor.run(cloned_keep_running).await; })); @@ -258,7 +261,7 @@ pub async fn main() -> Result<(), IngesterError> { .await; let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { json_downloader.run(cloned_keep_running).await; })); @@ -271,7 +274,7 @@ pub async fn main() -> Result<(), IngesterError> { backfiller .start_backfill( - &mut tasks, + mutexed_tasks.clone(), keep_running.clone(), metrics_state.backfiller_metrics.clone(), ) @@ -300,7 +303,7 @@ pub async fn main() -> Result<(), IngesterError> { ); let cloned_keep_running = keep_running.clone(); - tasks.spawn(tokio::spawn(async move { + mutexed_tasks.lock().await.spawn(tokio::spawn(async move { while cloned_keep_running.load(Ordering::SeqCst) { let res = synchronizer .synchronize_asset_indexes(cloned_keep_running.clone()) @@ -318,7 +321,7 @@ pub async fn main() -> Result<(), IngesterError> { })); // --stop - graceful_stop(tasks, true, keep_running.clone()).await; + graceful_stop(mutexed_tasks, true, keep_running.clone()).await; Ok(()) } diff --git a/nft_ingester/src/bin/migrator/main.rs b/nft_ingester/src/bin/migrator/main.rs index f17df2d8..226aa7cb 100644 --- a/nft_ingester/src/bin/migrator/main.rs +++ b/nft_ingester/src/bin/migrator/main.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use log::error; use sqlx::{QueryBuilder, Row}; -use tokio::sync::Semaphore; +use tokio::sync::{Mutex, Semaphore}; +use tokio::task::JoinSet; use tokio::time::Instant; use nft_ingester::config::{setup_config, IngesterConfig}; @@ -21,6 +22,7 @@ pub async fn main() -> Result<(), 
IngesterError> { .rocks_db_path_container .clone() .unwrap_or("./my_rocksdb".to_string()), + Arc::new(Mutex::new(JoinSet::new())), ) .unwrap(); diff --git a/nft_ingester/src/init.rs b/nft_ingester/src/init.rs index 39d862a5..af2a73e0 100644 --- a/nft_ingester/src/init.rs +++ b/nft_ingester/src/init.rs @@ -2,10 +2,11 @@ use log::{error, info}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::signal; +use tokio::sync::Mutex; use tokio::task::{JoinError, JoinSet}; pub async fn graceful_stop( - mut tasks: JoinSet<Result<(), JoinError>>, + tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>, wait_all: bool, keep_running: Arc<AtomicBool>, ) { @@ -17,7 +18,7 @@ pub async fn graceful_stop( } keep_running.store(false, Ordering::SeqCst); - while let Some(task) = tasks.join_next().await { + while let Some(task) = tasks.lock().await.join_next().await { match task { Ok(_) => { if wait_all { diff --git a/nft_ingester/tests/gapfiller_tests.rs b/nft_ingester/tests/gapfiller_tests.rs index 5d15f239..8291fe41 100644 --- a/nft_ingester/tests/gapfiller_tests.rs +++ b/nft_ingester/tests/gapfiller_tests.rs @@ -5,6 +5,7 @@ use nft_ingester::gapfiller::process_asset_details_stream; use solana_sdk::pubkey::Pubkey; use std::sync::Arc; use tempfile::TempDir; +use tokio::{sync::Mutex, task::JoinSet}; use rocks_db::Storage; @@ -20,7 +21,11 @@ fn create_test_complete_asset_details(pubkey: Pubkey) -> CompleteAssetDetails { async fn test_process_asset_details_stream() { let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); let storage = Arc::new( - Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database"), + Storage::open( + temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"), ); let first_key = Pubkey::new_unique(); diff --git a/rocks-db/Cargo.toml b/rocks-db/Cargo.toml index 6554e796..bad83168 100644 --- a/rocks-db/Cargo.toml +++ b/rocks-db/Cargo.toml @@ -28,9 +28,13 @@ serde_json = "1.0.81" mockall = "0.12.0" async-trait = "0.1.74" itertools = "0.12.0" +tokio-stream = "0.1.14" entities = { path = "../entities" } +interface = { path = "../interface" } [dev-dependencies] tempfile = "3.8.1" +rand = "0.8.5" + [features] integration_tests = [] \ No newline at end of file diff --git a/rocks-db/src/asset_streaming_client.rs b/rocks-db/src/asset_streaming_client.rs new file mode 100644 index 00000000..96a391c6 --- /dev/null +++ b/rocks-db/src/asset_streaming_client.rs @@ -0,0 +1,178 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use entities::models::{CompleteAssetDetails, Updated}; +use interface::{AssetDetailsStream, AssetDetailsStreamer, AsyncError}; +use rocksdb::DB; +use solana_sdk::pubkey::Pubkey; +use tokio_stream::wrappers::ReceiverStream; + +use crate::{ + asset::{AssetCollection, AssetLeaf, SlotAssetIdx}, + column::TypedColumn, + errors::StorageError, + AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage, +}; + +#[async_trait] +impl AssetDetailsStreamer for Storage { + async fn get_asset_details_stream_in_range( + &self, + start_slot: u64, + end_slot: u64, + ) -> Result<AssetDetailsStream, AsyncError> { + let (tx, rx) = tokio::sync::mpsc::channel(32); + let backend = self.slot_asset_idx.backend.clone(); + let mut join_set = self.join_set.lock().await; + + join_set.spawn(tokio::spawn(async move { + let _ = process_asset_details_range(backend, start_slot, end_slot, tx.clone()).await; + })); + + Ok(Box::pin(ReceiverStream::new(rx)) as AssetDetailsStream) + } +} + +async fn process_asset_details_range( + backend: Arc<DB>, + start_slot: u64,
+ end_slot: u64, + tx: tokio::sync::mpsc::Sender<Result<CompleteAssetDetails, AsyncError>>, +) -> Result<(), AsyncError> { + let slot_asset_idx = Storage::column::<SlotAssetIdx>(backend.clone()); + let iterator = slot_asset_idx.iter((start_slot, solana_sdk::pubkey::Pubkey::default())); + + for pair in iterator { + let (idx_key, _) = pair.map_err(|e| Box::new(e) as AsyncError)?; + let (slot, pubkey) = + SlotAssetIdx::decode_key(idx_key.to_vec()).map_err(|e| Box::new(e) as AsyncError)?; + if slot > end_slot { + break; + } + + let details = get_complete_asset_details(backend.clone(), pubkey); + match details { + Err(e) => { + if tx.send(Err(Box::new(e) as AsyncError)).await.is_err() { + break; // Receiver is dropped + } + } + Ok(details) => { + if tx.send(Ok(details)).await.is_err() { + break; // Receiver is dropped + } + } + } + } + + Ok(()) +} + +fn get_complete_asset_details( + backend: Arc<DB>, + pubkey: Pubkey, +) -> crate::Result<CompleteAssetDetails> { + let static_data = Storage::column::<AssetStaticDetails>(backend.clone()).get(pubkey)?; + let static_data = match static_data { + None => { + return Err(crate::errors::StorageError::Common( + "Asset static data not found".to_string(), + )); + } + Some(static_data) => static_data, + }; + + let dynamic_data = Storage::column::<AssetDynamicDetails>(backend.clone()).get(pubkey)?; + let dynamic_data = match dynamic_data { + None => { + return Err(crate::errors::StorageError::Common( + "Asset dynamic data not found".to_string(), + )); + } + Some(dynamic_data) => dynamic_data, + }; + let authority = Storage::column::<AssetAuthority>(backend.clone()).get(pubkey)?; + let authority = match authority { + None => { + return Err(crate::errors::StorageError::Common( + "Asset authority not found".to_string(), + )); + } + Some(authority) => authority, + }; + let owner = Storage::column::<AssetOwner>(backend.clone()).get(pubkey)?; + let owner = match owner { + None => { + return Err(crate::errors::StorageError::Common( + "Asset owner not found".to_string(), + )); + } + Some(owner) => owner, + }; + + let leaves = Storage::column::<AssetLeaf>(backend.clone()).get(pubkey)?; + let collection = Storage::column::<AssetCollection>(backend).get(pubkey)?; + + let onchain_data = match dynamic_data.onchain_data { + None => None, + Some(onchain_data) => { + let v = serde_json::from_str(&onchain_data.value) + .map_err(|e| StorageError::Common(e.to_string()))?; + Some(Updated::new(onchain_data.slot_updated, onchain_data.seq, v)) + } + }; + + Ok(CompleteAssetDetails { + pubkey: static_data.pubkey, + specification_asset_class: static_data.specification_asset_class, + royalty_target_type: static_data.royalty_target_type, + slot_created: static_data.created_at as u64, + is_compressible: dynamic_data.is_compressible, + is_compressed: dynamic_data.is_compressed, + is_frozen: dynamic_data.is_frozen, + supply: dynamic_data.supply, + seq: dynamic_data.seq, + is_burnt: dynamic_data.is_burnt, + was_decompressed: dynamic_data.was_decompressed, + onchain_data, + creators: dynamic_data.creators, + royalty_amount: dynamic_data.royalty_amount, + authority: Updated::new( + authority.slot_updated, + None, //todo: where do we get seq? + authority.authority, + ), + owner: owner.owner, + delegate: owner.delegate, + owner_type: owner.owner_type, + owner_delegate_seq: owner.owner_delegate_seq, + collection: collection.map(|collection| { + Updated::new( + collection.slot_updated, + None, //todo: where do we get seq?
+ entities::models::AssetCollection { + collection: collection.collection, + is_collection_verified: collection.is_collection_verified, + collection_seq: collection.collection_seq, + }, + ) + }), + leaves: leaves + .into_iter() + .map(|leaf| { + Updated::new( + leaf.slot_updated, + None, + entities::models::AssetLeaf { + leaf: leaf.leaf, + tree_id: leaf.tree_id, + nonce: leaf.nonce, + data_hash: leaf.data_hash, + creator_hash: leaf.creator_hash, + leaf_seq: leaf.leaf_seq, + }, + ) + }) + .collect(), + }) +} diff --git a/rocks-db/src/lib.rs b/rocks-db/src/lib.rs index e7c20e6f..446744c2 100644 --- a/rocks-db/src/lib.rs +++ b/rocks-db/src/lib.rs @@ -9,11 +9,14 @@ pub use asset::{ }; pub use column::columns; use column::{Column, TypedColumn}; +use tokio::sync::Mutex; +use tokio::task::JoinSet; use crate::errors::StorageError; pub mod asset; mod asset_client; +pub mod asset_streaming_client; pub mod backup_service; mod batch_client; pub mod bubblegum_slots; @@ -41,10 +44,14 @@ pub struct Storage { pub assets_update_idx: Column<AssetsUpdateIdx>, pub slot_asset_idx: Column<SlotAssetIdx>, assets_update_last_seq: AtomicU64, + join_set: Arc<Mutex<JoinSet<core::result::Result<(), tokio::task::JoinError>>>>, } impl Storage { - pub fn open(db_path: &str) -> Result<Self> { + pub fn open( + db_path: &str, + join_set: Arc<Mutex<JoinSet<core::result::Result<(), tokio::task::JoinError>>>>, + ) -> Result<Self> { let db = Arc::new(DB::open_cf_descriptors( &Self::get_db_options(), db_path, @@ -95,6 +102,7 @@ impl Storage { assets_update_idx, slot_asset_idx, assets_update_last_seq: AtomicU64::new(0), + join_set, }) } diff --git a/rocks-db/tests/asset_streaming_client_tests.rs b/rocks-db/tests/asset_streaming_client_tests.rs new file mode 100644 index 00000000..ed6fc8bb --- /dev/null +++ b/rocks-db/tests/asset_streaming_client_tests.rs @@ -0,0 +1,167 @@ +mod setup; +#[cfg(test)] +mod tests { + use std::{collections::HashSet, sync::Arc}; + + use interface::AssetDetailsStreamer; + use solana_sdk::pubkey::Pubkey; + use tempfile::TempDir; + + use rocks_db::Storage; + use tokio::{sync::Mutex, task::JoinSet}; + use tokio_stream::StreamExt; + + use crate::setup::setup::*; + + #[tokio::test] + async fn test_get_asset_details_stream_in_range_empty_db() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = Storage::open( + temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"); + + // Call get_asset_details_stream_in_range on an empty database + let response = storage.get_asset_details_stream_in_range(100, 200).await; + + assert!(response.is_ok()); + let mut stream = response.unwrap(); + + // Check that the stream is empty + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_get_asset_details_stream_in_range_data_only_before_target() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = Storage::open( + temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"); + let pk = Pubkey::new_unique(); + + storage.asset_updated(10, pk.clone()).unwrap(); + // Call get_asset_details_stream_in_range on a database + let response = storage.get_asset_details_stream_in_range(100, 200).await; + + assert!(response.is_ok()); + let mut stream = response.unwrap(); + + // Check that the stream is empty + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_get_asset_details_stream_in_range_data_only_after_target() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = Storage::open(
temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"); + let pk = Pubkey::new_unique(); + + storage.asset_updated(1000, pk.clone()).unwrap(); + // Call get_asset_details_stream_in_range on a database + let response = storage.get_asset_details_stream_in_range(100, 200).await; + + assert!(response.is_ok()); + let mut stream = response.unwrap(); + + // Check that the stream is empty + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_get_asset_details_stream_in_range_data_missing_data() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = Storage::open( + temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"); + let pk = Pubkey::new_unique(); + + storage.asset_updated(100, pk.clone()).unwrap(); + // Call get_asset_details_stream_in_range on a database + let response = storage.get_asset_details_stream_in_range(100, 200).await; + + assert!(response.is_ok()); + let mut stream = response.unwrap(); + + // Check that the stream contains an error + let first_resp = stream.next().await; + assert!(first_resp.is_some()); + let first_resp = first_resp.unwrap(); + assert!(first_resp.is_err()); + + // Check that the stream is closed + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn test_get_asset_details_stream_in_range_data() { + let cnt = 1000; + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = Storage::open( + temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"); + let pks = (0..cnt).map(|_| Pubkey::new_unique()).collect::<Vec<_>>(); + let slot = 100; + for pk in pks.iter() { + storage.asset_updated(slot, pk.clone()).unwrap(); + } + // generate 1000 units of data using generate_test_static_data, generate_test_authority, generate_test_owner and create_test_dynamic_data for 1000 unique pubkeys + let static_data = pks + .iter() + .map(|pk| generate_test_static_data(pk.clone(), slot)) + .collect::<Vec<_>>(); + let authority_data = pks + .iter() + .map(|pk| generate_test_authority(pk.clone())) + .collect::<Vec<_>>(); + let owner_data = pks + .iter() + .map(|pk| generate_test_owner(pk.clone())) + .collect::<Vec<_>>(); + let dynamic_data = pks + .iter() + .map(|pk| create_test_dynamic_data(pk.clone(), slot)) + .collect::<Vec<_>>(); + // put everything in the database + for ((((pk, static_data), authority_data), owner_data), dynamic_data) in pks + .iter() + .zip(static_data.iter()) + .zip(authority_data.iter()) + .zip(owner_data.iter()) + .zip(dynamic_data.iter()) + { + storage + .asset_authority_data + .put(*pk, authority_data) + .unwrap(); + storage.asset_owner_data.put(*pk, owner_data).unwrap(); + storage.asset_static_data.put(*pk, static_data).unwrap(); + storage.asset_dynamic_data.put(*pk, dynamic_data).unwrap(); + } + // Call get_asset_details_stream_in_range on a database + let response = storage.get_asset_details_stream_in_range(100, 200).await; + + assert!(response.is_ok()); + let mut stream = response.unwrap(); + + // Check that the stream contains all the data + let mut pk_set = HashSet::new(); + while let Some(resp) = stream.next().await { + let resp = resp.unwrap(); + pk_set.insert(resp.pubkey); + } + assert_eq!(pk_set.len(), cnt); + assert_eq!(pk_set, pks.into_iter().collect::<HashSet<_>>()); + } +} diff --git
a/rocks-db/tests/batch_client_integration_tests.rs b/rocks-db/tests/batch_client_integration_tests.rs index b8fd7ddd..bac43acc 100644 --- a/rocks-db/tests/batch_client_integration_tests.rs +++ b/rocks-db/tests/batch_client_integration_tests.rs @@ -1,448 +1,425 @@ -use std::collections::HashSet; - -use entities::models::Updated; -use solana_sdk::pubkey::Pubkey; -use tempfile::TempDir; - -use rocks_db::key_encoders::encode_u64x2_pubkey; -use rocks_db::storage_traits::AssetUpdateIndexStorage; -use rocks_db::{AssetDynamicDetails, Storage}; - -struct TestEnvironment { - storage: Storage, - _temp_dir: TempDir, -} - -impl TestEnvironment { - fn new(temp_dir: TempDir, keys: &[(u64, Pubkey)]) -> Self { - let storage = - Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database"); - for &(slot, ref pubkey) in keys { - storage.asset_updated(slot, pubkey.clone()).unwrap(); - } +mod setup; +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::sync::Arc; + + use entities::models::Updated; + use solana_sdk::pubkey::Pubkey; + use tempfile::TempDir; + + use rocks_db::key_encoders::encode_u64x2_pubkey; + use rocks_db::storage_traits::AssetUpdateIndexStorage; + use rocks_db::{AssetDynamicDetails, Storage}; + use tokio::sync::Mutex; + use tokio::task::JoinSet; + + use crate::setup::setup::{ + create_test_dynamic_data, TestEnvironment, DEFAULT_PUBKEY_OF_ONES, PUBKEY_OF_TWOS, + }; - TestEnvironment { - storage, - _temp_dir: temp_dir, - } + #[test] + fn test_process_asset_updates_batch_empty_db() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = Storage::open( + temp_dir.path().to_str().unwrap(), + Arc::new(Mutex::new(JoinSet::new())), + ) + .expect("Failed to create a database"); + + // Call fetch_asset_updated_keys on an empty database + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, None, 10, None) + .expect("Failed to fetch asset updated keys"); + // Assertions + assert!(keys.is_empty(), "Expected no keys from an empty database"); + assert!( + last_key.is_none(), + "Expected no last key from an empty database" + ); } -} - -const DEFAULT_PUBKEY_OF_ONES: Pubkey = Pubkey::new_from_array([1u8; 32]); -const PUBKEY_OF_TWOS: Pubkey = Pubkey::new_from_array([2u8; 32]); - -#[test] -fn test_process_asset_updates_batch_empty_db() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = - Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database"); - - // Call fetch_asset_updated_keys on an empty database - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, None, 10, None) - .expect("Failed to fetch asset updated keys"); - // Assertions - assert!(keys.is_empty(), "Expected no keys from an empty database"); - assert!( - last_key.is_none(), - "Expected no last key from an empty database" - ); -} -#[test] -fn test_process_asset_updates_batch_with_same_key_records_iteration_order() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), - (2, DEFAULT_PUBKEY_OF_ONES.clone()), - ], - ) - .storage; - // Verify iteration order - let mut iter = storage.assets_update_idx.iter_start(); - let first_key = iter.next().unwrap().unwrap().0; // Get the first key - let expected_key = encode_u64x2_pubkey(1, 4, DEFAULT_PUBKEY_OF_ONES.clone()); - assert_eq!( - first_key.as_ref(), - expected_key.as_slice(), - "The first key does 
not match the expected key" - ); - - let second_key = iter.next().unwrap().unwrap().0; // Get the second key - let expected_key = encode_u64x2_pubkey(2, 2, DEFAULT_PUBKEY_OF_ONES.clone()); - assert_eq!( - second_key.as_ref(), - expected_key.as_slice(), - "The second key does not match the expected key" - ); -} - -#[test] -fn test_process_asset_updates_batch_with_same_key_records() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), - (2, DEFAULT_PUBKEY_OF_ONES.clone()), - ], - ) - .storage; - // Verify fetch_asset_updated_keys with None as last key - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, None, 10, None) - .unwrap(); - assert_eq!(keys.len(), 1, "Expected a single key"); - assert_eq!( - keys.iter().next().unwrap(), - &DEFAULT_PUBKEY_OF_ONES, - "Expected the specific pubkey" - ); - assert!(last_key.is_some(), "Expected a last key"); - // Verify fetch_asset_updated_keys with the last key from previous call - let (new_keys, new_last_key) = storage - .fetch_asset_updated_keys(last_key, None, 10, None) - .unwrap(); - assert!( - new_keys.is_empty(), - "Expected no new keys, but found: {:?}", - new_keys - ); - assert_eq!(new_last_key, last_key, "Expected no new last key"); -} + #[test] + fn test_process_asset_updates_batch_with_same_key_records_iteration_order() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), + (2, DEFAULT_PUBKEY_OF_ONES.clone()), + ], + ) + .storage; + // Verify iteration order + let mut iter = storage.assets_update_idx.iter_start(); + let first_key = iter.next().unwrap().unwrap().0; // Get the first key + let expected_key = encode_u64x2_pubkey(1, 4, DEFAULT_PUBKEY_OF_ONES.clone()); + assert_eq!( + first_key.as_ref(), + expected_key.as_slice(), + "The first key does not match the expected key" + ); -#[test] -fn test_fetch_asset_updated_keys_with_limit_and_skip() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), - (2, DEFAULT_PUBKEY_OF_ONES.clone()), - ], - ) - .storage; - // Verify fetch_asset_updated_keys with None as last key - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, None, 1, None) - .unwrap(); - assert_eq!(keys.len(), 1, "Expected a single key"); - assert_eq!( - keys.iter().next().unwrap(), - &DEFAULT_PUBKEY_OF_ONES, - "Expected the specific pubkey" - ); - assert!(last_key.is_some(), "Expected a last key"); - // Verify fetch_asset_updated_keys with the last key from previous call - let (new_keys, new_last_key) = storage - .fetch_asset_updated_keys(last_key, None, 1, Some(keys)) - .unwrap(); - assert!( - new_keys.is_empty(), - "Expected no new keys, but found: {:?}", - new_keys - ); - assert_ne!(new_last_key, last_key, "Expected a new last key"); -} + let second_key = iter.next().unwrap().unwrap().0; // Get the second key + let expected_key = encode_u64x2_pubkey(2, 2, DEFAULT_PUBKEY_OF_ONES.clone()); + assert_eq!( + second_key.as_ref(), + expected_key.as_slice(), + "The second key does not match the expected key" + ); + } -#[test] -fn test_fetch_asset_updated_keys_with_skip() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), 
- (2, DEFAULT_PUBKEY_OF_ONES.clone()), - ], - ) - .storage; - // Verify fetch_asset_updated_keys with None as last key - let (keys, last_key) = storage - .fetch_asset_updated_keys( - None, - None, - 1, - Some(HashSet::from_iter(vec![DEFAULT_PUBKEY_OF_ONES.clone()])), + #[test] + fn test_process_asset_updates_batch_with_same_key_records() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), + (2, DEFAULT_PUBKEY_OF_ONES.clone()), + ], ) - .unwrap(); - assert_eq!(keys.len(), 0, "Expected no keys"); - assert!(last_key.is_some(), "Expected a last key"); - let expected_key = (2, 2, DEFAULT_PUBKEY_OF_ONES.clone()); - assert_eq!( - last_key.unwrap(), - expected_key, - "Expected the specific last key" - ); -} + .storage; + // Verify fetch_asset_updated_keys with None as last key + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, None, 10, None) + .unwrap(); + assert_eq!(keys.len(), 1, "Expected a single key"); + assert_eq!( + keys.iter().next().unwrap(), + &DEFAULT_PUBKEY_OF_ONES, + "Expected the specific pubkey" + ); + assert!(last_key.is_some(), "Expected a last key"); + // Verify fetch_asset_updated_keys with the last key from previous call + let (new_keys, new_last_key) = storage + .fetch_asset_updated_keys(last_key, None, 10, None) + .unwrap(); + assert!( + new_keys.is_empty(), + "Expected no new keys, but found: {:?}", + new_keys + ); + assert_eq!(new_last_key, last_key, "Expected no new last key"); + } -#[test] -fn test_up_to_filter() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 1 - (2, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 2 - (5, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 3 - (5, PUBKEY_OF_TWOS.clone()), // seq = 4 - ], - ) - .storage; + #[test] + fn test_fetch_asset_updated_keys_with_limit_and_skip() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), + (2, DEFAULT_PUBKEY_OF_ONES.clone()), + ], + ) + .storage; + // Verify fetch_asset_updated_keys with None as last key + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, None, 1, None) + .unwrap(); + assert_eq!(keys.len(), 1, "Expected a single key"); + assert_eq!( + keys.iter().next().unwrap(), + &DEFAULT_PUBKEY_OF_ONES, + "Expected the specific pubkey" + ); + assert!(last_key.is_some(), "Expected a last key"); + // Verify fetch_asset_updated_keys with the last key from previous call + let (new_keys, new_last_key) = storage + .fetch_asset_updated_keys(last_key, None, 1, Some(keys)) + .unwrap(); + assert!( + new_keys.is_empty(), + "Expected no new keys, but found: {:?}", + new_keys + ); + assert_ne!(new_last_key, last_key, "Expected a new last key"); + } - // Verify fetch_asset_updated_keys with up to key which is less then the first key - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, Some((0, 2, DEFAULT_PUBKEY_OF_ONES.clone())), 10, None) - .unwrap(); - assert_eq!(keys.len(), 0, "Expected no keys"); - assert!(last_key.is_none(), "Expected an empty last key"); + #[test] + fn test_fetch_asset_updated_keys_with_skip() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), + (2, 
DEFAULT_PUBKEY_OF_ONES.clone()), + ], + ) + .storage; + // Verify fetch_asset_updated_keys with None as last key + let (keys, last_key) = storage + .fetch_asset_updated_keys( + None, + None, + 1, + Some(HashSet::from_iter(vec![DEFAULT_PUBKEY_OF_ONES.clone()])), + ) + .unwrap(); + assert_eq!(keys.len(), 0, "Expected no keys"); + assert!(last_key.is_some(), "Expected a last key"); + let expected_key = (2, 2, DEFAULT_PUBKEY_OF_ONES.clone()); + assert_eq!( + last_key.unwrap(), + expected_key, + "Expected the specific last key" + ); + } - // verify fetch_asset_updated_keys with up to key which is equal to the first key - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, Some((1, 4, DEFAULT_PUBKEY_OF_ONES.clone())), 10, None) - .unwrap(); - assert_eq!(keys.len(), 1, "Expected a single key"); - assert_eq!( - keys.iter().next().unwrap(), - &DEFAULT_PUBKEY_OF_ONES, - "Expected the specific pubkey" - ); - assert!(last_key.is_some(), "Expected a last key"); - let expected_key = (1, 4, DEFAULT_PUBKEY_OF_ONES.clone()); - assert_eq!( - last_key.unwrap(), - expected_key, - "Expected the specific last key {:?}, got {:?}", - expected_key, - last_key - ); + #[test] + fn test_up_to_filter() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 1 + (2, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 2 + (5, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 3 + (5, PUBKEY_OF_TWOS.clone()), // seq = 4 + ], + ) + .storage; - // verify fetch_asset_updated_keys with up to key which is equal to the last key returns all the keys - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, Some((4, 5, PUBKEY_OF_TWOS.clone())), 10, None) - .unwrap(); - assert_eq!(keys.len(), 2, "Expected 2 keys, got {:?}", keys); - assert!( - keys.contains(&DEFAULT_PUBKEY_OF_ONES), - "Expected the specific pubkey" - ); - assert!( - keys.contains(&PUBKEY_OF_TWOS), - "Expected the specific pubkey" - ); - assert!(last_key.is_some(), "Expected a last key"); - let expected_key = (4, 5, PUBKEY_OF_TWOS.clone()); - assert_eq!( - last_key.unwrap(), - expected_key, - "Expected the specific last key {:?}, got {:?}", - expected_key, - last_key - ); -} + // Verify fetch_asset_updated_keys with up to key which is less then the first key + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, Some((0, 2, DEFAULT_PUBKEY_OF_ONES.clone())), 10, None) + .unwrap(); + assert_eq!(keys.len(), 0, "Expected no keys"); + assert!(last_key.is_none(), "Expected an empty last key"); -#[test] -fn test_last_known_asset_updated_key_on_empty_db() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new(temp_dir, &[]).storage; - let last_key = storage - .last_known_asset_updated_key() - .expect("Failed to get last known asset updated key"); - assert!(last_key.is_none(), "Expected no last key"); -} + // verify fetch_asset_updated_keys with up to key which is equal to the first key + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, Some((1, 4, DEFAULT_PUBKEY_OF_ONES.clone())), 10, None) + .unwrap(); + assert_eq!(keys.len(), 1, "Expected a single key"); + assert_eq!( + keys.iter().next().unwrap(), + &DEFAULT_PUBKEY_OF_ONES, + "Expected the specific pubkey" + ); + assert!(last_key.is_some(), "Expected a last key"); + let expected_key = (1, 4, DEFAULT_PUBKEY_OF_ONES.clone()); + assert_eq!( + last_key.unwrap(), + expected_key, + "Expected the 
specific last key {:?}, got {:?}", + expected_key, + last_key + ); -#[test] -fn test_last_known_asset_updated_key_on_non_empty_db() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 1 - (2, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 2 - (5, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 3 - (5, PUBKEY_OF_TWOS.clone()), // seq = 4 - ], - ) - .storage; - let last_key = storage - .last_known_asset_updated_key() - .expect("Failed to get last known asset updated key"); - assert!(last_key.is_some(), "Expected a last key"); - let expected_key = (4, 5, PUBKEY_OF_TWOS.clone()); - assert_eq!( - last_key.unwrap(), - expected_key, - "Expected the specific last key {:?}, got {:?}", - expected_key, - last_key - ); -} + // verify fetch_asset_updated_keys with up to key which is equal to the last key returns all the keys + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, Some((4, 5, PUBKEY_OF_TWOS.clone())), 10, None) + .unwrap(); + assert_eq!(keys.len(), 2, "Expected 2 keys, got {:?}", keys); + assert!( + keys.contains(&DEFAULT_PUBKEY_OF_ONES), + "Expected the specific pubkey" + ); + assert!( + keys.contains(&PUBKEY_OF_TWOS), + "Expected the specific pubkey" + ); + assert!(last_key.is_some(), "Expected a last key"); + let expected_key = (4, 5, PUBKEY_OF_TWOS.clone()); + assert_eq!( + last_key.unwrap(), + expected_key, + "Expected the specific last key {:?}, got {:?}", + expected_key, + last_key + ); + } -#[test] -fn test_process_asset_updates_batch_iteration_results() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new( - temp_dir, - &[ - (4, DEFAULT_PUBKEY_OF_ONES.clone()), - (2, PUBKEY_OF_TWOS.clone()), - ], - ) - .storage; - let (keys, last_key) = storage - .fetch_asset_updated_keys(None, None, 10, None) - .unwrap(); + #[test] + fn test_last_known_asset_updated_key_on_empty_db() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new(temp_dir, &[]).storage; + let last_key = storage + .last_known_asset_updated_key() + .expect("Failed to get last known asset updated key"); + assert!(last_key.is_none(), "Expected no last key"); + } - assert_eq!(keys.len(), 2, "Expected 2 keys"); - assert!( - keys.contains(&DEFAULT_PUBKEY_OF_ONES), - "Expected the specific pubkey" - ); - assert!( - keys.contains(&PUBKEY_OF_TWOS), - "Expected the specific pubkey" - ); - assert!(last_key.is_some(), "Expected a last key"); - let expected_key = (2, 2, PUBKEY_OF_TWOS.clone()); - assert_eq!( - last_key.unwrap(), - expected_key, - "Expected the specific last key {:?}, got {:?}", - expected_key, - last_key - ); - let key = Pubkey::new_unique(); - storage.asset_updated(5, key.clone()).unwrap(); - let (keys, last_key) = storage - .fetch_asset_updated_keys(last_key, None, 10, None) - .unwrap(); + #[test] + fn test_last_known_asset_updated_key_on_non_empty_db() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 1 + (2, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 2 + (5, DEFAULT_PUBKEY_OF_ONES.clone()), // seq = 3 + (5, PUBKEY_OF_TWOS.clone()), // seq = 4 + ], + ) + .storage; + let last_key = storage + .last_known_asset_updated_key() + .expect("Failed to get last known asset updated key"); + assert!(last_key.is_some(), 
"Expected a last key"); + let expected_key = (4, 5, PUBKEY_OF_TWOS.clone()); + assert_eq!( + last_key.unwrap(), + expected_key, + "Expected the specific last key {:?}, got {:?}", + expected_key, + last_key + ); + } - assert_eq!(keys.len(), 1, "Expected 1 key"); - assert!( - keys.contains(&key), - "Expected the specific pubkey {:?}, got {:?}", - key, - keys - ); - assert!(last_key.is_some(), "Expected a last key"); - let expected_key = (3, 5, key.clone()); - assert_eq!( - last_key.unwrap(), - expected_key, - "Expected the specific last key {:?}, got {:?}", - expected_key, - last_key - ); + #[test] + fn test_process_asset_updates_batch_iteration_results() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new( + temp_dir, + &[ + (4, DEFAULT_PUBKEY_OF_ONES.clone()), + (2, PUBKEY_OF_TWOS.clone()), + ], + ) + .storage; + let (keys, last_key) = storage + .fetch_asset_updated_keys(None, None, 10, None) + .unwrap(); - // generate 10k more records and then batch read those - let mut keys = Vec::new(); - for i in 0..10000 { + assert_eq!(keys.len(), 2, "Expected 2 keys"); + assert!( + keys.contains(&DEFAULT_PUBKEY_OF_ONES), + "Expected the specific pubkey" + ); + assert!( + keys.contains(&PUBKEY_OF_TWOS), + "Expected the specific pubkey" + ); + assert!(last_key.is_some(), "Expected a last key"); + let expected_key = (2, 2, PUBKEY_OF_TWOS.clone()); + assert_eq!( + last_key.unwrap(), + expected_key, + "Expected the specific last key {:?}, got {:?}", + expected_key, + last_key + ); let key = Pubkey::new_unique(); - storage.asset_updated(i, key.clone()).unwrap(); - keys.push((4 + i, i, key.clone())); - } - let mut last_seen_key = last_key.clone(); - for i in 0..10 { - let (new_keys, last_key) = storage - .fetch_asset_updated_keys(last_seen_key, None, 1000, None) + storage.asset_updated(5, key.clone()).unwrap(); + let (keys, last_key) = storage + .fetch_asset_updated_keys(last_key, None, 10, None) .unwrap(); - assert_eq!(new_keys.len(), 1000, "Expected 1000 keys"); + + assert_eq!(keys.len(), 1, "Expected 1 key"); + assert!( + keys.contains(&key), + "Expected the specific pubkey {:?}, got {:?}", + key, + keys + ); assert!(last_key.is_some(), "Expected a last key"); - let expected_key = keys[i * 1000 + 999].clone(); + let expected_key = (3, 5, key.clone()); assert_eq!( last_key.unwrap(), expected_key, - "Expected the specific last key {:?}, got {:?} for {:?} iteration", + "Expected the specific last key {:?}, got {:?}", expected_key, - last_key, - i + last_key ); - for j in 0..1000 { - assert!( - new_keys.contains(&keys[i * 1000 + j].2), - "Expected the specific pubkey {:?}, got {:?}", - keys[i * 1000 + j].2, - new_keys + + // generate 10k more records and then batch read those + let mut keys = Vec::new(); + for i in 0..10000 { + let key = Pubkey::new_unique(); + storage.asset_updated(i, key.clone()).unwrap(); + keys.push((4 + i, i, key.clone())); + } + let mut last_seen_key = last_key.clone(); + for i in 0..10 { + let (new_keys, last_key) = storage + .fetch_asset_updated_keys(last_seen_key, None, 1000, None) + .unwrap(); + assert_eq!(new_keys.len(), 1000, "Expected 1000 keys"); + assert!(last_key.is_some(), "Expected a last key"); + let expected_key = keys[i * 1000 + 999].clone(); + assert_eq!( + last_key.unwrap(), + expected_key, + "Expected the specific last key {:?}, got {:?} for {:?} iteration", + expected_key, + last_key, + i ); + for j in 0..1000 { + assert!( + new_keys.contains(&keys[i * 1000 + j].2), + "Expected the specific pubkey 
{:?}, got {:?}", + keys[i * 1000 + j].2, + new_keys + ); + } + last_seen_key = last_key.clone(); } - last_seen_key = last_key.clone(); - } -} - -fn create_test_dynamic_data(pubkey: Pubkey, slot: u64) -> AssetDynamicDetails { - AssetDynamicDetails { - pubkey, - is_compressible: Updated::new(slot, None, false), - is_compressed: Updated::new(slot, None, false), - is_frozen: Updated::new(slot, None, false), - is_burnt: Updated::new(slot, None, false), - was_decompressed: Updated::new(slot, None, false), - creators: Updated::new(slot, None, Vec::new()), - royalty_amount: Updated::new(slot, None, 0), - ..Default::default() } -} -#[test] -fn test_multiple_slot_updates() { - let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); - let storage = TestEnvironment::new(temp_dir, &[]).storage; - let pk = Pubkey::new_unique(); - let dynamic_data = create_test_dynamic_data(pk, 0); + #[test] + fn test_multiple_slot_updates() { + let temp_dir = TempDir::new().expect("Failed to create a temporary directory"); + let storage = TestEnvironment::new(temp_dir, &[]).storage; + let pk = Pubkey::new_unique(); + let dynamic_data = create_test_dynamic_data(pk, 0); - storage - .asset_dynamic_data - .merge(dynamic_data.pubkey, &dynamic_data) - .unwrap(); - - let new_data = AssetDynamicDetails { - pubkey: pk, - is_compressible: Updated::new(10, None, true), - is_compressed: Updated::new(0, None, true), - supply: Some(Updated::new(0, None, 5)), - ..Default::default() - }; - storage - .asset_dynamic_data - .merge(dynamic_data.pubkey, &new_data) - .unwrap(); - - let selected_data = storage.asset_dynamic_data.get(pk).unwrap().unwrap(); - assert_eq!(selected_data.is_compressible, Updated::new(10, None, true)); - assert_eq!(selected_data.is_compressed, Updated::new(0, None, false)); // slot in new_data not greater than slot in start data, so that field must not change - assert_eq!(selected_data.supply, None); // slot in new_data not greater than slot in start data, so that field must not change + storage + .asset_dynamic_data + .merge(dynamic_data.pubkey, &dynamic_data) + .unwrap(); - let new_data = AssetDynamicDetails { - pubkey: pk, - is_compressible: Updated::new(5, None, false), - is_compressed: Updated::new(0, None, true), - supply: Some(Updated::new(3, None, 5)), - ..Default::default() - }; - storage - .asset_dynamic_data - .merge(dynamic_data.pubkey, &new_data) - .unwrap(); + let new_data = AssetDynamicDetails { + pubkey: pk, + is_compressible: Updated::new(10, None, true), + is_compressed: Updated::new(0, None, true), + supply: Some(Updated::new(0, None, 5)), + ..Default::default() + }; + storage + .asset_dynamic_data + .merge(dynamic_data.pubkey, &new_data) + .unwrap(); - let selected_data = storage.asset_dynamic_data.get(pk).unwrap().unwrap(); - assert_eq!(selected_data.is_compressible, Updated::new(10, None, true)); - assert_eq!(selected_data.is_compressed, Updated::new(0, None, false)); - assert_eq!(selected_data.supply, Some(Updated::new(3, None, 5))); + let selected_data = storage.asset_dynamic_data.get(pk).unwrap().unwrap(); + assert_eq!(selected_data.is_compressible, Updated::new(10, None, true)); + assert_eq!(selected_data.is_compressed, Updated::new(0, None, false)); // slot in new_data not greater than slot in start data, so that field must not change + assert_eq!(selected_data.supply, None); // slot in new_data not greater than slot in start data, so that field must not change + + let new_data = AssetDynamicDetails { + pubkey: pk, + is_compressible: Updated::new(5, None, false), + 
is_compressed: Updated::new(0, None, true), + supply: Some(Updated::new(3, None, 5)), + ..Default::default() + }; + storage + .asset_dynamic_data + .merge(dynamic_data.pubkey, &new_data) + .unwrap(); - let new_data = AssetDynamicDetails { - pubkey: pk, - is_compressible: Updated::new(5, Some(1), false), - ..Default::default() - }; - storage - .asset_dynamic_data - .merge(dynamic_data.pubkey, &new_data) - .unwrap(); + let selected_data = storage.asset_dynamic_data.get(pk).unwrap().unwrap(); + assert_eq!(selected_data.is_compressible, Updated::new(10, None, true)); + assert_eq!(selected_data.is_compressed, Updated::new(0, None, false)); + assert_eq!(selected_data.supply, Some(Updated::new(3, None, 5))); + + let new_data = AssetDynamicDetails { + pubkey: pk, + is_compressible: Updated::new(5, Some(1), false), + ..Default::default() + }; + storage + .asset_dynamic_data + .merge(dynamic_data.pubkey, &new_data) + .unwrap(); - let selected_data = storage.asset_dynamic_data.get(pk).unwrap().unwrap(); - assert_eq!( - selected_data.is_compressible, - Updated::new(5, Some(1), false) - ); + let selected_data = storage.asset_dynamic_data.get(pk).unwrap().unwrap(); + assert_eq!( + selected_data.is_compressible, + Updated::new(5, Some(1), false) + ); + } } diff --git a/rocks-db/tests/setup.rs b/rocks-db/tests/setup.rs new file mode 100644 index 00000000..b8f69bd9 --- /dev/null +++ b/rocks-db/tests/setup.rs @@ -0,0 +1,81 @@ +#[cfg(test)] +pub mod setup { + use std::sync::Arc; + + use entities::models::Updated; + use rand::Rng; + use solana_sdk::pubkey::Pubkey; + use tempfile::TempDir; + + use rocks_db::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage}; + use tokio::{sync::Mutex, task::JoinSet}; + + pub struct TestEnvironment { + pub storage: Storage, + _temp_dir: TempDir, + } + + impl TestEnvironment { + pub fn new(temp_dir: TempDir, keys: &[(u64, Pubkey)]) -> Self { + let join_set = Arc::new(Mutex::new(JoinSet::new())); + let storage = Storage::open(temp_dir.path().to_str().unwrap(), join_set) + .expect("Failed to create a database"); + for &(slot, ref pubkey) in keys { + storage.asset_updated(slot, pubkey.clone()).unwrap(); + } + + TestEnvironment { + storage, + _temp_dir: temp_dir, + } + } + } + + pub const DEFAULT_PUBKEY_OF_ONES: Pubkey = Pubkey::new_from_array([1u8; 32]); + pub const PUBKEY_OF_TWOS: Pubkey = Pubkey::new_from_array([2u8; 32]); + + pub fn create_test_dynamic_data(pubkey: Pubkey, slot: u64) -> AssetDynamicDetails { + AssetDynamicDetails { + pubkey, + is_compressible: Updated::new(slot, None, false), + is_compressed: Updated::new(slot, None, false), + is_frozen: Updated::new(slot, None, false), + is_burnt: Updated::new(slot, None, false), + was_decompressed: Updated::new(slot, None, false), + creators: Updated::new(slot, None, Vec::new()), + royalty_amount: Updated::new(slot, None, 0), + ..Default::default() + } + } + + pub fn generate_test_static_data(pubkey: Pubkey, slot: u64) -> AssetStaticDetails { + AssetStaticDetails { + pubkey, + created_at: slot as i64, + specification_asset_class: entities::enums::SpecificationAssetClass::IdentityNft, + royalty_target_type: entities::enums::RoyaltyTargetType::Creators, + } + } + + pub fn generate_test_authority(pubkey: Pubkey) -> AssetAuthority { + AssetAuthority { + pubkey, + authority: Pubkey::new_unique(), + slot_updated: rand::thread_rng().gen_range(0..100), + } + } + + pub fn generate_test_owner(pubkey: Pubkey) -> AssetOwner { + AssetOwner { + pubkey, + owner: generate_test_updated(Pubkey::new_unique()), + 
owner_type: generate_test_updated(entities::enums::OwnerType::Single), + owner_delegate_seq: Some(generate_test_updated(rand::thread_rng().gen_range(0..100))), + delegate: Some(generate_test_updated(Pubkey::new_unique())), + } + } + + fn generate_test_updated<T>(v: T) -> Updated<T> { + Updated::new(rand::thread_rng().gen_range(0..100), None, v) + } +}
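For reference, a minimal self-contained sketch of the shared-task-set pattern this diff threads through Storage::open, start_backfill, and graceful_stop: a JoinSet behind Arc<Mutex<...>> so any component can spawn background work and a single shutdown routine can drain it. The worker loop below is hypothetical; only the Arc<Mutex<JoinSet>> shape comes from this PR.

use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinSet};

#[tokio::main]
async fn main() {
    // Shared set: every component clones the Arc and spawns into it.
    let tasks: Arc<Mutex<JoinSet<()>>> = Arc::new(Mutex::new(JoinSet::new()));

    for i in 0..3u32 {
        tasks.lock().await.spawn(async move {
            // ... long-running worker, e.g. a parser loop (hypothetical) ...
            println!("worker {i} finished");
        });
    }

    // Shutdown: re-acquire the lock each iteration; note the guard is held
    // while awaiting join_next, so new spawns wait until a task completes.
    while let Some(res) = tasks.lock().await.join_next().await {
        if let Err(e) = res {
            eprintln!("task failed: {e}");
        }
    }
}

A note on the element type: the PR spawns with tasks.lock().await.spawn(tokio::spawn(...)), so the set stores the result of awaiting an inner JoinHandle, i.e. Result<(), JoinError>; spawning the future directly, as in this sketch, would make the element type a plain ().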
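Likewise, a compact sketch of the channel-backed streaming used in asset_streaming_client.rs: a producer task walks a range and pushes Result items into a bounded mpsc channel, and the receiver is handed back wrapped in ReceiverStream so callers consume an ordinary Stream. Item and AnyError are placeholders standing in for CompleteAssetDetails and AsyncError; the numeric range stands in for the slot-index iteration.

use tokio_stream::{wrappers::ReceiverStream, StreamExt};

type Item = u64; // placeholder for CompleteAssetDetails
type AnyError = Box<dyn std::error::Error + Send + Sync>; // placeholder for AsyncError

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<Item, AnyError>>(32);

    // Producer: push each result; a failed send means the receiver was
    // dropped, so stop early rather than treating it as an error.
    tokio::spawn(async move {
        for slot in 100..105u64 {
            if tx.send(Ok(slot)).await.is_err() {
                break; // receiver dropped
            }
        }
    });

    // Consumer sees a plain Stream, decoupled from the storage iteration.
    let mut stream = ReceiverStream::new(rx);
    while let Some(item) = stream.next().await {
        println!("{item:?}");
    }
}

The bounded capacity (32, matching the PR) provides backpressure: if the consumer stalls, the producer parks on send instead of buffering the whole range in memory.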