Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make workers configurable #106

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ pub struct IngesterConfig {
pub backfiller_trees: Option<Vec<String>>,
pub role: Option<IngesterRole>,
pub max_postgres_connections: Option<u32>,
pub account_stream_worker_count: Option<u32>,
pub account_backfill_stream_worker_count: Option<u32>,
pub transaction_stream_worker_count: Option<u32>,
pub transaction_backfill_stream_worker_count: Option<u32>,
pub worker_config: Option<Vec<WorkerConfig>>,
pub code_version: Option<&'static str>,
pub background_task_runner_config: Option<BackgroundTaskRunnerConfig>,
pub cl_audits: Option<bool>, // save transaction logs for compressed nfts
}

#[derive(Deserialize, PartialEq, Debug, Clone)]
pub struct WorkerConfig {
pub stream_name: String,
pub worker_type: WorkerType,
pub worker_count: u32,
}

#[derive(Deserialize, PartialEq, Debug, Clone)]
pub enum WorkerType {
Account,
Transaction,
}

impl IngesterConfig {
/// Get the db url out of the dict, this is built a a dict so that future extra db parameters can be easily shoved in.
/// this panics if the key is not present
Expand Down Expand Up @@ -66,20 +76,31 @@ impl IngesterConfig {
mc
}

pub fn get_account_stream_worker_count(&self) -> u32 {
self.account_stream_worker_count.unwrap_or(2)
pub fn get_worker_config(&self) -> Vec<WorkerConfig> {
return if let Some(wc) = &self.worker_config {
wc.to_vec()
} else {
vec![
WorkerConfig {
stream_name: "ACC".to_string(),
worker_count: 2,
worker_type: WorkerType::Account,
},
WorkerConfig {
stream_name: "TXN".to_string(),
worker_count: 2,
worker_type: WorkerType::Transaction,
},
]
};
}

pub fn get_account_backfill_stream_worker_count(&self) -> u32 {
self.account_backfill_stream_worker_count.unwrap_or(0)
}

pub fn get_transaction_stream_worker_count(&self) -> u32 {
self.transaction_stream_worker_count.unwrap_or(2)
}

pub fn get_transaction_backfill_stream_worker_count(&self) -> u32 {
self.transaction_backfill_stream_worker_count.unwrap_or(0)
pub fn get_worker_count(&self) -> u32 {
let mut count = 0;
for wc in self.get_worker_config() {
count += wc.worker_count;
}
count
}
}

Expand Down
4 changes: 2 additions & 2 deletions nft_ingester/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ const DEFAULT_MAX: u32 = 125;
pub async fn setup_database(config: IngesterConfig) -> PgPool {
let max = config.max_postgres_connections.unwrap_or(DEFAULT_MAX);
if config.role == Some(IngesterRole::All) || config.role == Some(IngesterRole::Ingester) {
let relative_max =
config.get_account_stream_worker_count() + config.get_transaction_stream_worker_count();
let relative_max: u32 =
config.get_worker_config().iter().map(|c| c.worker_count).sum();
let should_be_at_least = relative_max * 5;
if should_be_at_least > max {
panic!("Please increase max_postgres_connections to at least {}, at least 5 connections per worker process should be given", should_be_at_least);
Expand Down
144 changes: 49 additions & 95 deletions nft_ingester/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
account_updates::account_worker,
ack::ack_worker,
backfiller::setup_backfiller,
config::{init_logger, rand_string, setup_config, IngesterRole},
config::{init_logger, rand_string, setup_config, IngesterRole, WorkerType},
database::setup_database,
error::IngesterError,
metrics::setup_metrics,
Expand All @@ -26,9 +26,7 @@ use cadence_macros::{is_global_default_set, statsd_count};
use chrono::Duration;
use clap::{arg, command, value_parser};
use log::{error, info};
use plerkle_messenger::{
redis_messenger::RedisMessenger, ConsumptionType, ACCOUNT_STREAM, ACCOUNT_BACKFILL_STREAM, TRANSACTION_STREAM, TRANSACTION_BACKFILL_STREAM
};
use plerkle_messenger::{redis_messenger::RedisMessenger, ConsumptionType};
use std::{path::PathBuf, time};
use tokio::{signal, task::JoinSet};

Expand Down Expand Up @@ -97,102 +95,58 @@ pub async fn main() -> Result<(), IngesterError> {
if role != IngesterRole::BackgroundTaskRunner {
tasks.spawn(bg_task_listener);
}
let mut timer_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_STREAM,
)?;
let mut timer_backfiller_acc = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
ACCOUNT_BACKFILL_STREAM,
)?;
let mut timer_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_STREAM,
)?;
let mut timer_backfiller_txn = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
TRANSACTION_BACKFILL_STREAM,
)?;


if let Some(t) = timer_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_acc.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}
if let Some(t) = timer_backfiller_txn.start::<RedisMessenger>().await {
tasks.spawn(t);
}

// Stream Consumers Setup -------------------------------------
if role == IngesterRole::Ingester || role == IngesterRole::All {
let workers = config.get_worker_config().clone();

let (_ack_task, ack_sender) =
ack_worker::<RedisMessenger>(config.get_messneger_client_config());
for i in 0..config.get_account_stream_worker_count() {
let _account = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_STREAM,
);
}
for i in 0..config.get_account_backfill_stream_worker_count() {
let _account_backfill = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
ACCOUNT_BACKFILL_STREAM,
);
}
for i in 0..config.get_transaction_stream_worker_count() {
let _txn = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_STREAM,
);
}
for i in 0..config.get_transaction_backfill_stream_worker_count() {
let _txn_backfill = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
TRANSACTION_BACKFILL_STREAM,
);

// iterate all the workers
for worker in workers {
let stream_name = Box::leak(Box::new(worker.stream_name.to_owned()));

let mut timer_worker = StreamSizeTimer::new(
stream_metrics_timer,
config.messenger_config.clone(),
stream_name.as_str(),
)?;

if let Some(t) = timer_worker.start::<RedisMessenger>().await {
tasks.spawn(t);
}

for i in 0..worker.worker_count {
if worker.worker_type == WorkerType::Account {
let _account = account_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
stream_name,
);
} else if worker.worker_type == WorkerType::Transaction {
let _txn = transaction_worker::<RedisMessenger>(
database_pool.clone(),
config.get_messneger_client_config(),
bg_task_sender.clone(),
ack_sender.clone(),
if i == 0 {
ConsumptionType::Redeliver
} else {
ConsumptionType::New
},
config.cl_audits.unwrap_or(false),
stream_name,
);
}
}
}
}
// Stream Size Timers ----------------------------------------
Expand Down
Loading