Skip to content

Commit

Permalink
refactor(publisher): Remove limit on fuel-core runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed Nov 7, 2024
1 parent fc4b6c0 commit 8a2fb61
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ NATS_PUBLIC_PASS=temp-public-pass
SURREALDB_URL=127.0.0.1:8000
SURREALDB_USER=root
SURREALDB_PASS=root
CONCURRENCY_LIMIT=16
PUBLISHER_MAX_THREADS=16
ELASTICSEARCH_URL=http://127.0.0.1:9200
ELASTICSEARCH_USERNAME=elastic
ELASTICSEARCH_PASSWORD=generated-secret
Expand Down
14 changes: 2 additions & 12 deletions crates/fuel-streams-publisher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,11 @@ use fuel_streams_core::prelude::*;
pub use publisher::{Publisher, Streams};
use sha2::{Digest, Sha256};

pub static FUEL_CORE_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
let available_cpus = num_cpus::get();
let default_threads = (available_cpus * 2 / 3).max(2); // Use 2/3 of CPUs, minimum 2

env::var("FUEL_CORE_CONCURRENCY_LIMIT")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(default_threads)
});

pub static PUBLISHER_CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
pub static PUBLISHER_MAX_THREADS: LazyLock<usize> = LazyLock::new(|| {
let available_cpus = num_cpus::get();
let default_threads = (available_cpus / 3).max(1); // Use 1/3 of CPUs, minimum 1

env::var("PUBLISHER_CONCURRENCY_LIMIT")
env::var("PUBLISHER_MAX_THREADS")
.ok()
.and_then(|val| val.parse().ok())
.unwrap_or(default_threads)
Expand Down
15 changes: 3 additions & 12 deletions crates/fuel-streams-publisher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use fuel_streams_publisher::{
server::create_web_server,
state::SharedState,
system::System,
FUEL_CORE_CONCURRENCY_LIMIT,
PUBLISHER_CONCURRENCY_LIMIT,
PUBLISHER_MAX_THREADS,
};
use parking_lot::RwLock;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -134,19 +133,11 @@ async fn setup_server(
fn main() -> anyhow::Result<()> {
fuel_core_bin::cli::init_logging();
let cli = Cli::parse();
let publisher_threads = *PUBLISHER_MAX_THREADS;

let fuel_core_threads = *FUEL_CORE_CONCURRENCY_LIMIT;
let publisher_threads = *PUBLISHER_CONCURRENCY_LIMIT;

println!("Fuel core threads: {}", fuel_core_threads);
println!("Publisher threads: {}", publisher_threads);

let fuel_core_runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(fuel_core_threads)
.max_blocking_threads(fuel_core_threads * 2)
.enable_all()
.build()?;

let fuel_core_runtime = tokio::runtime::Runtime::new()?;
let fuel_core = fuel_core_runtime
.block_on(async { setup_fuel_core(&cli.fuel_core_config).await })?;

Expand Down
4 changes: 2 additions & 2 deletions crates/fuel-streams-publisher/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
transactions,
FuelCore,
FuelCoreLike,
PUBLISHER_CONCURRENCY_LIMIT,
PUBLISHER_MAX_THREADS,
};

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -367,7 +367,7 @@ impl Publisher {
) -> anyhow::Result<()> {
let start_time = std::time::Instant::now();
let fuel_core = &*self.fuel_core;
let semaphore = Arc::new(Semaphore::new(*PUBLISHER_CONCURRENCY_LIMIT));
let semaphore = Arc::new(Semaphore::new(*PUBLISHER_MAX_THREADS));
let chain_id = Arc::new(*self.fuel_core.chain_id());
let block_producer = Arc::new(block_producer.clone());
let block_height = block.header().consensus().height;
Expand Down

0 comments on commit 8a2fb61

Please sign in to comment.