From a39c1bc8875251c0761685b1c4f0d032edcd97dc Mon Sep 17 00:00:00 2001 From: Kyrylo Stepanov Date: Fri, 15 Nov 2024 17:40:42 +0200 Subject: [PATCH] Separate timeouts for two sub-synchronizers --- nft_ingester/src/bin/synchronizer/main.rs | 79 ++++++++++++----------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/nft_ingester/src/bin/synchronizer/main.rs b/nft_ingester/src/bin/synchronizer/main.rs index 4c2d92b1..7e679b3e 100644 --- a/nft_ingester/src/bin/synchronizer/main.rs +++ b/nft_ingester/src/bin/synchronizer/main.rs @@ -152,52 +152,57 @@ pub async fn main() -> Result<(), IngesterError> { } while let Some(_) = mutexed_tasks.lock().await.join_next().await {} - while shutdown_rx.is_empty() { - if let Err(e) = rocks_storage.db.try_catch_up_with_primary() { - tracing::error!("Sync rocksdb error: {}", e); + let shutdown_rx_clone = shutdown_rx.resubscribe(); + let synchronizer_clone = synchronizer.clone(); + mutexed_tasks.lock().await.spawn(async move { + let shutdown_rx = shutdown_rx_clone.resubscribe(); + let synchronizer = synchronizer_clone.clone(); + while shutdown_rx.is_empty() { + let result = synchronizer + .synchronize_non_fungible_asset_indexes(&shutdown_rx, config.dump_sync_threshold) + .await; + + match result { + Ok(_) => { + tracing::info!("Non Fungible Synchronization finished successfully",) + } + Err(e) => tracing::error!("Non Fungible Synchronization failed: {:?}", e), + } + + tokio::time::sleep(tokio::time::Duration::from_secs( + config.timeout_between_syncs_sec, + )) + .await; } - for asset_type in asset_types.into_iter() { - let synchronizer = synchronizer.clone(); - let shutdown_rx = shutdown_rx.resubscribe(); - - mutexed_tasks.lock().await.spawn(async move { - let result = match asset_type { - AssetType::NonFungible => { - synchronizer - .synchronize_non_fungible_asset_indexes( - &shutdown_rx, - config.dump_sync_threshold, - ) - .await - } - AssetType::Fungible => { - synchronizer - .synchronize_fungible_asset_indexes( - &shutdown_rx, - config.dump_sync_threshold, - ) - .await - } - }; + Ok(()) + }); - match result { - Ok(_) => { - tracing::info!("{:?} Synchronization finished successfully", asset_type) - } - Err(e) => tracing::error!("{:?} Synchronization failed: {:?}", asset_type, e), + mutexed_tasks.lock().await.spawn(async move { + let shutdown_rx = shutdown_rx.resubscribe(); + let synchronizer = synchronizer.clone(); + while shutdown_rx.is_empty() { + let result = synchronizer + .synchronize_non_fungible_asset_indexes(&shutdown_rx, config.dump_sync_threshold) + .await; + + match result { + Ok(_) => { + tracing::info!("Fungible Synchronization finished successfully",) } - - Ok(()) - }); - - while let Some(_) = mutexed_tasks.lock().await.join_next().await {} + Err(e) => tracing::error!("Fungible Synchronization failed: {:?}", e), + } tokio::time::sleep(tokio::time::Duration::from_secs( config.timeout_between_syncs_sec, )) .await; } - } + + Ok(()) + }); + + while let Some(_) = mutexed_tasks.lock().await.join_next().await {} + Ok(()) }