Skip to content

Commit

Permalink
Separate timeouts for two sub-synchronizers
Browse files Browse the repository at this point in the history
  • Loading branch information
kstepanovdev committed Nov 15, 2024
1 parent 20a4572 commit a39c1bc
Showing 1 changed file with 42 additions and 37 deletions.
79 changes: 42 additions & 37 deletions nft_ingester/src/bin/synchronizer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,52 +152,57 @@ pub async fn main() -> Result<(), IngesterError> {
}
while let Some(_) = mutexed_tasks.lock().await.join_next().await {}

while shutdown_rx.is_empty() {
if let Err(e) = rocks_storage.db.try_catch_up_with_primary() {
tracing::error!("Sync rocksdb error: {}", e);
let shutdown_rx_clone = shutdown_rx.resubscribe();
let synchronizer_clone = synchronizer.clone();
mutexed_tasks.lock().await.spawn(async move {
let shutdown_rx = shutdown_rx_clone.resubscribe();
let synchronizer = synchronizer_clone.clone();
while shutdown_rx.is_empty() {
let result = synchronizer
.synchronize_non_fungible_asset_indexes(&shutdown_rx, config.dump_sync_threshold)
.await;

match result {
Ok(_) => {
tracing::info!("Non Fungible Synchronization finished successfully",)
}
Err(e) => tracing::error!("Non Fungible Synchronization failed: {:?}", e),
}

tokio::time::sleep(tokio::time::Duration::from_secs(
config.timeout_between_syncs_sec,
))
.await;
}

for asset_type in asset_types.into_iter() {
let synchronizer = synchronizer.clone();
let shutdown_rx = shutdown_rx.resubscribe();

mutexed_tasks.lock().await.spawn(async move {
let result = match asset_type {
AssetType::NonFungible => {
synchronizer
.synchronize_non_fungible_asset_indexes(
&shutdown_rx,
config.dump_sync_threshold,
)
.await
}
AssetType::Fungible => {
synchronizer
.synchronize_fungible_asset_indexes(
&shutdown_rx,
config.dump_sync_threshold,
)
.await
}
};
Ok(())
});

match result {
Ok(_) => {
tracing::info!("{:?} Synchronization finished successfully", asset_type)
}
Err(e) => tracing::error!("{:?} Synchronization failed: {:?}", asset_type, e),
mutexed_tasks.lock().await.spawn(async move {
let shutdown_rx = shutdown_rx.resubscribe();
let synchronizer = synchronizer.clone();
while shutdown_rx.is_empty() {
let result = synchronizer
.synchronize_non_fungible_asset_indexes(&shutdown_rx, config.dump_sync_threshold)
.await;

match result {
Ok(_) => {
tracing::info!("Fungible Synchronization finished successfully",)
}

Ok(())
});

while let Some(_) = mutexed_tasks.lock().await.join_next().await {}
Err(e) => tracing::error!("Fungible Synchronization failed: {:?}", e),
}

tokio::time::sleep(tokio::time::Duration::from_secs(
config.timeout_between_syncs_sec,
))
.await;
}
}

Ok(())
});

while let Some(_) = mutexed_tasks.lock().await.join_next().await {}

Ok(())
}

0 comments on commit a39c1bc

Please sign in to comment.