From eb8406441b410e41ac254f708b26f547939608b2 Mon Sep 17 00:00:00 2001 From: linuskendall Date: Mon, 27 Nov 2023 14:02:14 +0000 Subject: [PATCH 1/2] Make workers configurable Make workers fully configurable and remove reference to the plerkle plugin. --- nft_ingester/src/config.rs | 53 +++++++++----- nft_ingester/src/main.rs | 144 +++++++++++++------------------ 2 files changed, 86 insertions(+), 111 deletions(-) diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 6e11fb539..4f281fcc1 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -27,15 +27,25 @@ pub struct IngesterConfig { pub backfiller_trees: Option>, pub role: Option, pub max_postgres_connections: Option, - pub account_stream_worker_count: Option, - pub account_backfill_stream_worker_count: Option, - pub transaction_stream_worker_count: Option, - pub transaction_backfill_stream_worker_count: Option, + pub worker_config: Option>, pub code_version: Option<&'static str>, pub background_task_runner_config: Option, pub cl_audits: Option, // save transaction logs for compressed nfts } +#[derive(Deserialize, PartialEq, Debug, Clone)] +pub struct WorkerConfig { + pub stream_name: String, + pub worker_type: WorkerType, + pub worker_count: u32, +} + +#[derive(Deserialize, PartialEq, Debug, Clone)] +pub enum WorkerType { + Account, + Transaction, +} + impl IngesterConfig { /// Get the db url out of the dict, this is built a a dict so that future extra db parameters can be easily shoved in. 
/// this panics if the key is not present @@ -66,20 +76,31 @@ impl IngesterConfig { mc } - pub fn get_account_stream_worker_count(&self) -> u32 { - self.account_stream_worker_count.unwrap_or(2) + pub fn get_worker_config(&self) -> Vec { + return if let Some(wc) = &self.worker_config { + wc.to_vec() + } else { + vec![ + WorkerConfig { + stream_name: "ACC".to_string(), + worker_count: 2, + worker_type: WorkerType::Account, + }, + WorkerConfig { + stream_name: "TXN".to_string(), + worker_count: 2, + worker_type: WorkerType::Transaction, + }, + ] + }; } - pub fn get_account_backfill_stream_worker_count(&self) -> u32 { - self.account_backfill_stream_worker_count.unwrap_or(0) - } - - pub fn get_transaction_stream_worker_count(&self) -> u32 { - self.transaction_stream_worker_count.unwrap_or(2) - } - - pub fn get_transaction_backfill_stream_worker_count(&self) -> u32 { - self.transaction_backfill_stream_worker_count.unwrap_or(0) + pub fn get_worker_count(&self) -> u32 { + let mut count = 0; + for wc in self.get_worker_config() { + count += wc.worker_count; + } + count } } diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 3d08cafb1..f68b31d72 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -14,7 +14,7 @@ use crate::{ account_updates::account_worker, ack::ack_worker, backfiller::setup_backfiller, - config::{init_logger, rand_string, setup_config, IngesterRole}, + config::{init_logger, rand_string, setup_config, IngesterRole, WorkerType}, database::setup_database, error::IngesterError, metrics::setup_metrics, @@ -26,9 +26,7 @@ use cadence_macros::{is_global_default_set, statsd_count}; use chrono::Duration; use clap::{arg, command, value_parser}; use log::{error, info}; -use plerkle_messenger::{ - redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM -}; +use plerkle_messenger::{redis_messenger::RedisMessenger, ConsumptionType}; use 
std::{path::PathBuf, time}; use tokio::{signal, task::JoinSet}; @@ -97,102 +95,58 @@ pub async fn main() -> Result<(), IngesterError> { if role != IngesterRole::BackgroundTaskRunner { tasks.spawn(bg_task_listener); } - let mut timer_acc = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - ACCOUNT_STREAM, - )?; - let mut timer_backfiller_acc = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - ACCOUNT_BACKFILL_STREAM, - )?; - let mut timer_txn = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - TRANSACTION_STREAM, - )?; - let mut timer_backfiller_txn = StreamSizeTimer::new( - stream_metrics_timer, - config.messenger_config.clone(), - TRANSACTION_BACKFILL_STREAM, - )?; - - - if let Some(t) = timer_acc.start::().await { - tasks.spawn(t); - } - if let Some(t) = timer_backfiller_acc.start::().await { - tasks.spawn(t); - } - if let Some(t) = timer_txn.start::().await { - tasks.spawn(t); - } - if let Some(t) = timer_backfiller_txn.start::().await { - tasks.spawn(t); - } // Stream Consumers Setup ------------------------------------- if role == IngesterRole::Ingester || role == IngesterRole::All { + let workers = config.get_worker_config().clone(); + let (_ack_task, ack_sender) = ack_worker::(config.get_messneger_client_config()); - for i in 0..config.get_account_stream_worker_count() { - let _account = account_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - ACCOUNT_STREAM, - ); - } - for i in 0..config.get_account_backfill_stream_worker_count() { - let _account_backfill = account_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - ACCOUNT_BACKFILL_STREAM, - ); - } - 
for i in 0..config.get_transaction_stream_worker_count() { - let _txn = transaction_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - config.cl_audits.unwrap_or(false), - TRANSACTION_STREAM, - ); - } - for i in 0..config.get_transaction_backfill_stream_worker_count() { - let _txn_backfill = transaction_worker::( - database_pool.clone(), - config.get_messneger_client_config(), - bg_task_sender.clone(), - ack_sender.clone(), - if i == 0 { - ConsumptionType::Redeliver - } else { - ConsumptionType::New - }, - config.cl_audits.unwrap_or(false), - TRANSACTION_BACKFILL_STREAM, - ); + + // iterate all the workers + for worker in workers { + let stream_name = worker.stream_name.to_owned().as_str(); + + let mut timer_worker = StreamSizeTimer::new( + stream_metrics_timer, + config.messenger_config.clone(), + stream_name.clone(), + )?; + + if let Some(t) = timer_worker.start::().await { + tasks.spawn(t); + } + + for i in 0..worker.worker_count { + if worker.worker_type == WorkerType::Account { + let _account = account_worker::( + database_pool.clone(), + config.get_messneger_client_config(), + bg_task_sender.clone(), + ack_sender.clone(), + if i == 0 { + ConsumptionType::Redeliver + } else { + ConsumptionType::New + }, + stream_name, + ); + } else if worker.worker_type == WorkerType::Transaction { + let _txn = transaction_worker::( + database_pool.clone(), + config.get_messneger_client_config(), + bg_task_sender.clone(), + ack_sender.clone(), + if i == 0 { + ConsumptionType::Redeliver + } else { + ConsumptionType::New + }, + config.cl_audits.unwrap_or(false), + stream_name, + ); + } + } } } // Stream Size Timers ---------------------------------------- From 3fd93d569bde235932e0ebee5bb45a58325a9316 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Mon, 27 Nov 2023 09:41:33 -0500 Subject: [PATCH 2/2] fix lifetime --- 
nft_ingester/src/database.rs | 4 ++-- nft_ingester/src/main.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nft_ingester/src/database.rs b/nft_ingester/src/database.rs index 578f58cbc..3208923ac 100644 --- a/nft_ingester/src/database.rs +++ b/nft_ingester/src/database.rs @@ -8,8 +8,8 @@ const DEFAULT_MAX: u32 = 125; pub async fn setup_database(config: IngesterConfig) -> PgPool { let max = config.max_postgres_connections.unwrap_or(DEFAULT_MAX); if config.role == Some(IngesterRole::All) || config.role == Some(IngesterRole::Ingester) { - let relative_max = - config.get_account_stream_worker_count() + config.get_transaction_stream_worker_count(); + let relative_max: u32 = + config.get_worker_config().iter().map(|c| c.worker_count).sum(); let should_be_at_least = relative_max * 5; if should_be_at_least > max { panic!("Please increase max_postgres_connections to at least {}, at least 5 connections per worker process should be given", should_be_at_least); diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index f68b31d72..e69c4f9ee 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -105,12 +105,12 @@ pub async fn main() -> Result<(), IngesterError> { // iterate all the workers for worker in workers { - let stream_name = worker.stream_name.to_owned().as_str(); + let stream_name = Box::leak(Box::new(worker.stream_name.to_owned())); let mut timer_worker = StreamSizeTimer::new( stream_metrics_timer, config.messenger_config.clone(), - stream_name.clone(), + stream_name.as_str(), )?; if let Some(t) = timer_worker.start::().await {