ensuring the graceful stop waits for the gRPC streaming requests
StanChe committed Dec 26, 2023
1 parent 74e08f2 commit 3f4da7c
Showing 9 changed files with 76 additions and 39 deletions.
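For readers skimming the diff below: the change threads one shared task set through the whole ingester. A JoinSet wrapped in Arc<Mutex<...>> is created in main, handed to Storage::open, used by every component that spawns background work (including the gRPC asset-details streaming client), and finally drained by graceful_stop, so shutdown only completes once every registered task has finished. The following is a minimal, self-contained sketch of that pattern, not the repository's actual API; the names SharedTasks and spawn_worker are illustrative, and tokio with its full feature set is assumed.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::{
    sync::Mutex,
    task::{JoinError, JoinSet},
};

// One task set shared by everything that spawns work.
type SharedTasks = Arc<Mutex<JoinSet<Result<(), JoinError>>>>;

async fn spawn_worker(tasks: SharedTasks, keep_running: Arc<AtomicBool>) {
    // Same shape as in the commit: lock the shared set, then register a freshly spawned task.
    tasks.lock().await.spawn(tokio::spawn(async move {
        while keep_running.load(Ordering::SeqCst) {
            // do a unit of work here (parse a slot, stream a gRPC response, ...)
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    }));
}

async fn graceful_stop(tasks: SharedTasks, keep_running: Arc<AtomicBool>) {
    keep_running.store(false, Ordering::SeqCst);
    // Drain the set: shutdown returns only after every registered task has finished.
    while let Some(task) = tasks.lock().await.join_next().await {
        if let Err(e) = task {
            eprintln!("task failed: {e}");
        }
    }
}

#[tokio::main]
async fn main() {
    let tasks: SharedTasks = Arc::new(Mutex::new(JoinSet::new()));
    let keep_running = Arc::new(AtomicBool::new(true));

    spawn_worker(tasks.clone(), keep_running.clone()).await;
    graceful_stop(tasks, keep_running).await;
}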
7 changes: 4 additions & 3 deletions nft_ingester/src/bin/ingester/backfiller.rs
@@ -17,6 +17,7 @@ use std::collections::HashSet;
use std::num::ParseIntError;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
+use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};
use tokio::time::Duration;

@@ -78,7 +79,7 @@ impl Backfiller {

pub async fn start_backfill(
&self,
-tasks: &mut JoinSet<Result<(), JoinError>>,
+tasks: Arc<Mutex<JoinSet<core::result::Result<(), JoinError>>>>,
keep_running: Arc<AtomicBool>,
metrics: Arc<BackfillerMetricsConfig>,
) -> Result<(), IngesterError> {
@@ -94,7 +95,7 @@
.await;

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+tasks.lock().await.spawn(tokio::spawn(async move {
info!("Running slots parser...");

slots_collector.collect_slots(cloned_keep_running).await;
@@ -109,7 +110,7 @@
.await;

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+tasks.lock().await.spawn(tokio::spawn(async move {
info!("Running transactions parser...");

transactions_parser
25 changes: 14 additions & 11 deletions nft_ingester/src/bin/ingester/main.rs
@@ -4,6 +4,7 @@ use std::time::Duration;

use clap::Parser;
use log::{error, info};
+use tokio::sync::Mutex;
use tokio::task::JoinSet;

use metrics_utils::utils::setup_metrics;
@@ -130,12 +131,14 @@ pub async fn main() -> Result<(), IngesterError> {
}
}));

+let mutexed_tasks = Arc::new(Mutex::new(tasks));
// start parsers
let storage = Storage::open(
&config
.rocks_db_path_container
.clone()
.unwrap_or(DEFAULT_ROCKSDB_PATH.to_string()),
+mutexed_tasks.clone(),
)
.unwrap();

@@ -151,7 +154,7 @@
let cloned_metrics = metrics_state.ingester_metrics.clone();

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
backup_service.perform_backup(cloned_metrics, cloned_keep_running)
}));

@@ -176,7 +179,7 @@
let cloned_mplx_parser = mplx_accs_parser.clone();

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
cloned_mplx_parser
.process_metadata_accs(cloned_keep_running)
.await;
@@ -185,7 +188,7 @@
let cloned_token_parser = token_accs_parser.clone();

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
cloned_token_parser
.process_token_accs(cloned_keep_running)
.await;
@@ -194,7 +197,7 @@
let cloned_token_parser = token_accs_parser.clone();

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
cloned_token_parser
.process_mint_accs(cloned_keep_running)
.await;
@@ -205,7 +208,7 @@
let first_processed_slot_clone = first_processed_slot.clone();
let cloned_rocks_storage = rocks_storage.clone();
let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
while cloned_keep_running.load(Ordering::SeqCst) {
let slot = cloned_rocks_storage
.last_saved_slot()
@@ -225,7 +228,7 @@

let cloned_keep_running = keep_running.clone();
let cloned_rocks_storage = rocks_storage.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
match start_api(
cloned_rocks_storage.clone(),
cloned_keep_running,
@@ -247,7 +250,7 @@
);

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
bubblegum_updates_processor.run(cloned_keep_running).await;
}));

Expand All @@ -258,7 +261,7 @@ pub async fn main() -> Result<(), IngesterError> {
.await;

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
json_downloader.run(cloned_keep_running).await;
}));

Expand All @@ -271,7 +274,7 @@ pub async fn main() -> Result<(), IngesterError> {

backfiller
.start_backfill(
-&mut tasks,
+mutexed_tasks.clone(),
keep_running.clone(),
metrics_state.backfiller_metrics.clone(),
)
@@ -300,7 +303,7 @@ pub async fn main() -> Result<(), IngesterError> {
);

let cloned_keep_running = keep_running.clone();
-tasks.spawn(tokio::spawn(async move {
+mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
while cloned_keep_running.load(Ordering::SeqCst) {
let res = synchronizer
.synchronize_asset_indexes(cloned_keep_running.clone())
@@ -318,7 +321,7 @@
}));

// --stop
-graceful_stop(tasks, true, keep_running.clone()).await;
+graceful_stop(mutexed_tasks, true, keep_running.clone()).await;

Ok(())
}
4 changes: 3 additions & 1 deletion nft_ingester/src/bin/migrator/main.rs
@@ -2,7 +2,8 @@ use std::sync::Arc;

use log::error;
use sqlx::{QueryBuilder, Row};
-use tokio::sync::Semaphore;
+use tokio::sync::{Mutex, Semaphore};
+use tokio::task::JoinSet;
use tokio::time::Instant;

use nft_ingester::config::{setup_config, IngesterConfig};
@@ -21,6 +22,7 @@ pub async fn main() -> Result<(), IngesterError> {
.rocks_db_path_container
.clone()
.unwrap_or("./my_rocksdb".to_string()),
+Arc::new(Mutex::new(JoinSet::new())),
)
.unwrap();

5 changes: 3 additions & 2 deletions nft_ingester/src/init.rs
@@ -2,10 +2,11 @@ use log::{error, info};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::signal;
+use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};

pub async fn graceful_stop(
-mut tasks: JoinSet<Result<(), JoinError>>,
+tasks: Arc<Mutex<JoinSet<core::result::Result<(), JoinError>>>>,
wait_all: bool,
keep_running: Arc<AtomicBool>,
) {
@@ -17,7 +18,7 @@
}
keep_running.store(false, Ordering::SeqCst);

-while let Some(task) = tasks.join_next().await {
+while let Some(task) = tasks.lock().await.join_next().await {
match task {
Ok(_) => {
if wait_all {
11 changes: 4 additions & 7 deletions rocks-db/src/asset_streaming_client.rs
@@ -23,14 +23,11 @@ impl AssetDetailsStreamer for Storage {
) -> Result<AssetDetailsStream, AsyncError> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let backend = self.slot_asset_idx.backend.clone();
+let mut join_set = self.join_set.lock().await;

-tokio::spawn(async move {
-if let Err(e) =
-process_asset_details_range(backend, start_slot, end_slot, tx.clone()).await
-{
-let _ = tx.send(Err(e)).await;
-}
-});
+join_set.spawn(tokio::spawn(async move {
+let _ = process_asset_details_range(backend, start_slot, end_slot, tx.clone()).await;
+}));

Ok(Box::pin(ReceiverStream::new(rx)) as AssetDetailsStream)
}
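The hunk above is the heart of the fix: the streaming handler used to fire-and-forget a tokio::spawn, so a graceful stop could return while a gRPC stream was still being produced; now the producer task is registered on the shared join_set and gets joined on shutdown. A rough, hypothetical sketch of that shape follows, assuming the tokio and tokio-stream crates and substituting generic types for the crate's AssetDetailsStream and AsyncError, with a trivial per-slot payload instead of real asset details.

use std::{pin::Pin, sync::Arc};
use tokio::{
    sync::{mpsc, Mutex},
    task::{JoinError, JoinSet},
};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};

type SharedTasks = Arc<Mutex<JoinSet<Result<(), JoinError>>>>;
// Stand-in for the crate's AssetDetailsStream: a boxed stream of per-slot results.
type DetailsStream = Pin<Box<dyn Stream<Item = Result<u64, String>> + Send>>;

async fn stream_range(tasks: SharedTasks, start_slot: u64, end_slot: u64) -> DetailsStream {
    let (tx, rx) = mpsc::channel::<Result<u64, String>>(32);
    // Register the producer on the shared JoinSet instead of a detached tokio::spawn,
    // so graceful_stop joins it before the process exits.
    tasks.lock().await.spawn(tokio::spawn(async move {
        for slot in start_slot..=end_slot {
            // the real code reads asset details for `slot` from RocksDB here
            if tx.send(Ok(slot)).await.is_err() {
                break; // receiver dropped, stop producing
            }
        }
    }));
    Box::pin(ReceiverStream::new(rx)) as DetailsStream
}

#[tokio::main]
async fn main() {
    let tasks: SharedTasks = Arc::new(Mutex::new(JoinSet::new()));
    let mut stream = stream_range(tasks.clone(), 100, 105).await;
    while let Some(item) = stream.next().await {
        println!("{item:?}");
    }
    // Mirror graceful_stop: drain the set so the producer has finished before exit.
    while tasks.lock().await.join_next().await.is_some() {}
}

The stream is still handed back to the caller immediately; the behavioural change is only that shutdown now waits for the producer instead of abandoning it.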
9 changes: 8 additions & 1 deletion rocks-db/src/lib.rs
@@ -9,6 +9,8 @@ pub use asset::{
};
pub use column::columns;
use column::{Column, TypedColumn};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;

use crate::errors::StorageError;

@@ -42,10 +44,14 @@ pub struct Storage {
pub assets_update_idx: Column<AssetsUpdateIdx>,
pub slot_asset_idx: Column<SlotAssetIdx>,
assets_update_last_seq: AtomicU64,
+join_set: Arc<Mutex<JoinSet<core::result::Result<(), tokio::task::JoinError>>>>,
}

impl Storage {
-pub fn open(db_path: &str) -> Result<Self> {
+pub fn open(
+db_path: &str,
+join_set: Arc<Mutex<JoinSet<core::result::Result<(), tokio::task::JoinError>>>>,
+) -> Result<Self> {
let db = Arc::new(DB::open_cf_descriptors(
&Self::get_db_options(),
db_path,
@@ -96,6 +102,7 @@ impl Storage {
assets_update_idx,
slot_asset_idx,
assets_update_last_seq: AtomicU64::new(0),
+join_set,
})
}

38 changes: 27 additions & 11 deletions rocks-db/tests/asset_streaming_client_tests.rs
@@ -1,22 +1,26 @@
mod setup;
#[cfg(test)]
mod tests {
-use std::collections::HashSet;
+use std::{collections::HashSet, sync::Arc};

use interface::AssetDetailsStreamer;
use solana_sdk::pubkey::Pubkey;
use tempfile::TempDir;

use rocks_db::Storage;
+use tokio::{sync::Mutex, task::JoinSet};
use tokio_stream::StreamExt;

use crate::setup::setup::*;

#[tokio::test]
async fn test_get_asset_details_stream_in_range_empty_db() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
-let storage =
-Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
+let storage = Storage::open(
+temp_dir.path().to_str().unwrap(),
+Arc::new(Mutex::new(JoinSet::new())),
+)
+.expect("Failed to create a database");

// Call get_asset_details_stream_in_range on an empty database
let response = storage.get_asset_details_stream_in_range(100, 200).await;
@@ -31,8 +35,11 @@ mod tests {
#[tokio::test]
async fn test_get_asset_details_stream_in_range_data_only_before_target() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
-let storage =
-Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
+let storage = Storage::open(
+temp_dir.path().to_str().unwrap(),
+Arc::new(Mutex::new(JoinSet::new())),
+)
+.expect("Failed to create a database");
let pk = Pubkey::new_unique();

storage.asset_updated(10, pk.clone()).unwrap();
@@ -49,8 +56,11 @@ mod tests {
#[tokio::test]
async fn test_get_asset_details_stream_in_range_data_only_after_target() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
-let storage =
-Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
+let storage = Storage::open(
+temp_dir.path().to_str().unwrap(),
+Arc::new(Mutex::new(JoinSet::new())),
+)
+.expect("Failed to create a database");
let pk = Pubkey::new_unique();

storage.asset_updated(1000, pk.clone()).unwrap();
@@ -67,8 +77,11 @@ mod tests {
#[tokio::test]
async fn test_get_asset_details_stream_in_range_data_missing_data() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
-let storage =
-Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
+let storage = Storage::open(
+temp_dir.path().to_str().unwrap(),
+Arc::new(Mutex::new(JoinSet::new())),
+)
+.expect("Failed to create a database");
let pk = Pubkey::new_unique();

storage.asset_updated(100, pk.clone()).unwrap();
@@ -92,8 +105,11 @@ mod tests {
async fn test_get_asset_details_stream_in_range_data() {
let cnt = 1000;
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
-let storage =
-Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
+let storage = Storage::open(
+temp_dir.path().to_str().unwrap(),
+Arc::new(Mutex::new(JoinSet::new())),
+)
+.expect("Failed to create a database");
let pks = (0..cnt).map(|_| Pubkey::new_unique()).collect::<Vec<_>>();
let mut slot = 100;
for pk in pks.iter() {
10 changes: 8 additions & 2 deletions rocks-db/tests/batch_client_integration_tests.rs
@@ -2,6 +2,7 @@ mod setup;
#[cfg(test)]
mod tests {
use std::collections::HashSet;
+use std::sync::Arc;

use entities::models::Updated;
use solana_sdk::pubkey::Pubkey;
@@ -10,6 +11,8 @@ mod tests {
use rocks_db::key_encoders::encode_u64x2_pubkey;
use rocks_db::storage_traits::AssetUpdateIndexStorage;
use rocks_db::{AssetDynamicDetails, Storage};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;

use crate::setup::setup::{
create_test_dynamic_data, TestEnvironment, DEFAULT_PUBKEY_OF_ONES, PUBKEY_OF_TWOS,
@@ -18,8 +21,11 @@ mod tests {
#[test]
fn test_process_asset_updates_batch_empty_db() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
-let storage =
-Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database");
+let storage = Storage::open(
+temp_dir.path().to_str().unwrap(),
+Arc::new(Mutex::new(JoinSet::new())),
+)
+.expect("Failed to create a database");

// Call fetch_asset_updated_keys on an empty database
let (keys, last_key) = storage
6 changes: 5 additions & 1 deletion rocks-db/tests/setup.rs
@@ -1,11 +1,14 @@
#[cfg(test)]
pub mod setup {
+use std::sync::Arc;

use entities::models::Updated;
use rand::Rng;
use solana_sdk::pubkey::Pubkey;
use tempfile::TempDir;

use rocks_db::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage};
+use tokio::{sync::Mutex, task::JoinSet};

pub struct TestEnvironment {
pub storage: Storage,
@@ -14,7 +17,8 @@

impl TestEnvironment {
pub fn new(temp_dir: TempDir, keys: &[(u64, Pubkey)]) -> Self {
-let storage = Storage::open(temp_dir.path().to_str().unwrap())
+let join_set = Arc::new(Mutex::new(JoinSet::new()));
+let storage = Storage::open(temp_dir.path().to_str().unwrap(), join_set)
.expect("Failed to create a database");
for &(slot, ref pubkey) in keys {
storage.asset_updated(slot, pubkey.clone()).unwrap();
