Skip to content

Commit

Permalink
Merge pull request #19 from metaplex-foundation/feature/rocks-gap-filler-server
Browse files Browse the repository at this point in the history

[MET-44] rocks gap filler server
  • Loading branch information
StanChe authored Dec 27, 2023
2 parents 3be9def + 663a5ab commit 2fccc00
Show file tree
Hide file tree
Showing 13 changed files with 870 additions and 434 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
~/.cargo/registry/
~/.cargo/git/
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
key: ${{ runner.os }}-cargo-utility-chain

- name: Install dependencies
run: sudo apt-get update && sudo apt-get install -y protobuf-compiler
Expand All @@ -38,6 +38,12 @@ jobs:
command: fmt
args: --all -- --check

- name: Clean up 3 workspaces with mock dependencies that may fail intermittently
uses: actions-rs/cargo@v1
with:
command: clean
args: -p rocks-db -p postgre-client -p interface

- name: Lint with Clippy
uses: actions-rs/clippy-check@v1
with:
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions nft_ingester/src/bin/ingester/backfiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashSet;
use std::num::ParseIntError;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};
use tokio::time::Duration;

Expand Down Expand Up @@ -78,7 +79,7 @@ impl Backfiller {

pub async fn start_backfill(
&self,
tasks: &mut JoinSet<Result<(), JoinError>>,
tasks: Arc<Mutex<JoinSet<core::result::Result<(), JoinError>>>>,
keep_running: Arc<AtomicBool>,
metrics: Arc<BackfillerMetricsConfig>,
) -> Result<(), IngesterError> {
Expand All @@ -94,7 +95,7 @@ impl Backfiller {
.await;

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
tasks.lock().await.spawn(tokio::spawn(async move {
info!("Running slots parser...");

slots_collector.collect_slots(cloned_keep_running).await;
Expand All @@ -109,7 +110,7 @@ impl Backfiller {
.await;

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
tasks.lock().await.spawn(tokio::spawn(async move {
info!("Running transactions parser...");

transactions_parser
Expand Down
25 changes: 14 additions & 11 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

use clap::Parser;
use log::{error, info};
use tokio::sync::Mutex;
use tokio::task::JoinSet;

use metrics_utils::utils::setup_metrics;
Expand Down Expand Up @@ -130,12 +131,14 @@ pub async fn main() -> Result<(), IngesterError> {
}
}));

let mutexed_tasks = Arc::new(Mutex::new(tasks));
// start parsers
let storage = Storage::open(
&config
.rocks_db_path_container
.clone()
.unwrap_or(DEFAULT_ROCKSDB_PATH.to_string()),
mutexed_tasks.clone(),
)
.unwrap();

Expand All @@ -151,7 +154,7 @@ pub async fn main() -> Result<(), IngesterError> {
let cloned_metrics = metrics_state.ingester_metrics.clone();

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
backup_service.perform_backup(cloned_metrics, cloned_keep_running)
}));

Expand All @@ -176,7 +179,7 @@ pub async fn main() -> Result<(), IngesterError> {
let cloned_mplx_parser = mplx_accs_parser.clone();

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
cloned_mplx_parser
.process_metadata_accs(cloned_keep_running)
.await;
Expand All @@ -185,7 +188,7 @@ pub async fn main() -> Result<(), IngesterError> {
let cloned_token_parser = token_accs_parser.clone();

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
cloned_token_parser
.process_token_accs(cloned_keep_running)
.await;
Expand All @@ -194,7 +197,7 @@ pub async fn main() -> Result<(), IngesterError> {
let cloned_token_parser = token_accs_parser.clone();

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
cloned_token_parser
.process_mint_accs(cloned_keep_running)
.await;
Expand All @@ -205,7 +208,7 @@ pub async fn main() -> Result<(), IngesterError> {
let first_processed_slot_clone = first_processed_slot.clone();
let cloned_rocks_storage = rocks_storage.clone();
let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
while cloned_keep_running.load(Ordering::SeqCst) {
let slot = cloned_rocks_storage
.last_saved_slot()
Expand All @@ -225,7 +228,7 @@ pub async fn main() -> Result<(), IngesterError> {

let cloned_keep_running = keep_running.clone();
let cloned_rocks_storage = rocks_storage.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
match start_api(
cloned_rocks_storage.clone(),
cloned_keep_running,
Expand All @@ -247,7 +250,7 @@ pub async fn main() -> Result<(), IngesterError> {
);

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
bubblegum_updates_processor.run(cloned_keep_running).await;
}));

Expand All @@ -258,7 +261,7 @@ pub async fn main() -> Result<(), IngesterError> {
.await;

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
json_downloader.run(cloned_keep_running).await;
}));

Expand All @@ -271,7 +274,7 @@ pub async fn main() -> Result<(), IngesterError> {

backfiller
.start_backfill(
&mut tasks,
mutexed_tasks.clone(),
keep_running.clone(),
metrics_state.backfiller_metrics.clone(),
)
Expand Down Expand Up @@ -300,7 +303,7 @@ pub async fn main() -> Result<(), IngesterError> {
);

let cloned_keep_running = keep_running.clone();
tasks.spawn(tokio::spawn(async move {
mutexed_tasks.lock().await.spawn(tokio::spawn(async move {
while cloned_keep_running.load(Ordering::SeqCst) {
let res = synchronizer
.synchronize_asset_indexes(cloned_keep_running.clone())
Expand All @@ -318,7 +321,7 @@ pub async fn main() -> Result<(), IngesterError> {
}));

// --stop
graceful_stop(tasks, true, keep_running.clone()).await;
graceful_stop(mutexed_tasks, true, keep_running.clone()).await;

Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion nft_ingester/src/bin/migrator/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::sync::Arc;

use log::error;
use sqlx::{QueryBuilder, Row};
use tokio::sync::Semaphore;
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use tokio::time::Instant;

use nft_ingester::config::{setup_config, IngesterConfig};
Expand All @@ -21,6 +22,7 @@ pub async fn main() -> Result<(), IngesterError> {
.rocks_db_path_container
.clone()
.unwrap_or("./my_rocksdb".to_string()),
Arc::new(Mutex::new(JoinSet::new())),
)
.unwrap();

Expand Down
5 changes: 3 additions & 2 deletions nft_ingester/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use log::{error, info};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::signal;
use tokio::sync::Mutex;
use tokio::task::{JoinError, JoinSet};

pub async fn graceful_stop(
mut tasks: JoinSet<Result<(), JoinError>>,
tasks: Arc<Mutex<JoinSet<core::result::Result<(), JoinError>>>>,
wait_all: bool,
keep_running: Arc<AtomicBool>,
) {
Expand All @@ -17,7 +18,7 @@ pub async fn graceful_stop(
}
keep_running.store(false, Ordering::SeqCst);

while let Some(task) = tasks.join_next().await {
while let Some(task) = tasks.lock().await.join_next().await {
match task {
Ok(_) => {
if wait_all {
Expand Down
7 changes: 6 additions & 1 deletion nft_ingester/tests/gapfiller_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use nft_ingester::gapfiller::process_asset_details_stream;
use solana_sdk::pubkey::Pubkey;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::{sync::Mutex, task::JoinSet};

use rocks_db::Storage;

Expand All @@ -20,7 +21,11 @@ fn create_test_complete_asset_details(pubkey: Pubkey) -> CompleteAssetDetails {
async fn test_process_asset_details_stream() {
let temp_dir = TempDir::new().expect("Failed to create a temporary directory");
let storage = Arc::new(
Storage::open(temp_dir.path().to_str().unwrap()).expect("Failed to create a database"),
Storage::open(
temp_dir.path().to_str().unwrap(),
Arc::new(Mutex::new(JoinSet::new())),
)
.expect("Failed to create a database"),
);

let first_key = Pubkey::new_unique();
Expand Down
4 changes: 4 additions & 0 deletions rocks-db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ serde_json = "1.0.81"
mockall = "0.12.0"
async-trait = "0.1.74"
itertools = "0.12.0"
tokio-stream = "0.1.14"
entities = { path = "../entities" }
interface = { path = "../interface" }

[dev-dependencies]
tempfile = "3.8.1"
rand = "0.8.5"

[features]
integration_tests = []
Loading

0 comments on commit 2fccc00

Please sign in to comment.