Skip to content

Commit

Permalink
Merge pull request #1859 from AntelopeIO/GH-1690-performance
Browse files Browse the repository at this point in the history
Remove thread hop to producer thread for trx signature recovery
  • Loading branch information
heifner authored Nov 7, 2023
2 parents 3bbc463 + c5bb941 commit 1612650
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 131 deletions.
2 changes: 0 additions & 2 deletions docs/01_nodeos/03_plugins/producer_plugin/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,6 @@ Config Options for eosio::producer_plugin:
--disable-subjective-api-billing arg (=1)
Disable subjective CPU billing for API
transactions
--producer-threads arg (=2) Number of worker threads in producer
thread pool
--snapshots-dir arg (="snapshots") the location of the snapshots directory
(absolute path or relative to
application data dir)
Expand Down
6 changes: 6 additions & 0 deletions libraries/chain/include/eosio/chain/transaction_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ class transaction_metadata {
start_recover_keys( packed_transaction_ptr trx, boost::asio::io_context& thread_pool,
const chain_id_type& chain_id, fc::microseconds time_limit,
trx_type t, uint32_t max_variable_sig_size = UINT32_MAX );
/// Thread safe.
/// @returns transaction_metadata_ptr or throws
static transaction_metadata_ptr
recover_keys( packed_transaction_ptr trx,
const chain_id_type& chain_id, fc::microseconds time_limit,
trx_type t, uint32_t max_variable_sig_size = UINT32_MAX );

/// @returns constructed transaction_metadata with no key recovery (sig_cpu_usage=0, recovered_pub_keys=empty)
static transaction_metadata_ptr
Expand Down
26 changes: 17 additions & 9 deletions libraries/chain/transaction_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,23 @@ recover_keys_future transaction_metadata::start_recover_keys( packed_transaction
uint32_t max_variable_sig_size )
{
return post_async_task( thread_pool, [trx{std::move(trx)}, chain_id, time_limit, t, max_variable_sig_size]() mutable {
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ), t );
}
);
return recover_keys( std::move(trx), chain_id, time_limit, t, max_variable_sig_size );
});
}

transaction_metadata_ptr transaction_metadata::recover_keys( packed_transaction_ptr trx,
const chain_id_type& chain_id,
fc::microseconds time_limit,
trx_type t,
uint32_t max_variable_sig_size )
{
fc::time_point deadline = time_limit == fc::microseconds::maximum() ?
fc::time_point::maximum() : fc::time_point::now() + time_limit;
check_variable_sig_size( trx, max_variable_sig_size );
const signed_transaction& trn = trx->get_signed_transaction();
flat_set<public_key_type> recovered_pub_keys;
fc::microseconds cpu_usage = trn.get_signature_keys( chain_id, deadline, recovered_pub_keys );
return std::make_shared<transaction_metadata>( private_type(), std::move( trx ), cpu_usage, std::move( recovered_pub_keys ), t );
}

size_t transaction_metadata::get_estimated_size() const {
Expand Down
123 changes: 60 additions & 63 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,12 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
int64_t sub_bill,
uint32_t prev_billed_cpu_time_us);

void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace, const fc::time_point& start);
void log_trx_results(const transaction_metadata_ptr& trx, const transaction_trace_ptr& trace);
void log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr);
void log_trx_results(const packed_transaction_ptr& trx,
const transaction_trace_ptr& trace,
const fc::exception_ptr& except_ptr,
uint32_t billed_cpu_us,
const fc::time_point& start,
bool is_transient);

void add_greylist_accounts(const producer_plugin::greylist_params& params) {
Expand Down Expand Up @@ -495,8 +494,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
block_timing_util::producer_watermarks _producer_watermarks;
pending_block_mode _pending_block_mode = pending_block_mode::speculating;
unapplied_transaction_queue _unapplied_transactions;
size_t _thread_pool_size = config::default_controller_thread_pool_size;
named_thread_pool<struct prod> _thread_pool;
std::atomic<int32_t> _max_transaction_time_ms; // modified by app thread, read by net_plugin thread pool
std::atomic<uint32_t> _received_block{0}; // modified by net_plugin thread pool
fc::microseconds _max_irreversible_block_age_us;
Expand Down Expand Up @@ -798,17 +795,9 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
return;
}

chain::controller& chain = chain_plug->chain();
const auto max_trx_time_ms = (trx_type == transaction_metadata::trx_type::read_only) ? -1 : _max_transaction_time_ms.load();
fc::microseconds max_trx_cpu_usage = max_trx_time_ms < 0 ? fc::microseconds::maximum() : fc::milliseconds(max_trx_time_ms);

auto future = transaction_metadata::start_recover_keys(trx,
chain.get_thread_pool(),
chain.get_chain_id(),
fc::microseconds(max_trx_cpu_usage),
trx_type,
chain.configured_subjective_signature_length_limit());

auto is_transient = (trx_type == transaction_metadata::trx_type::read_only || trx_type == transaction_metadata::trx_type::dry_run);
if (!is_transient) {
next = [this, trx, next{std::move(next)}](const next_function_variant<transaction_trace_ptr>& response) {
Expand All @@ -825,38 +814,60 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
};
}

boost::asio::post(_thread_pool.get_executor(),
[self = this, future{std::move(future)}, api_trx, is_transient, return_failure_traces,
next{std::move(next)}, trx = trx]() mutable {
if (future.valid()) {
future.wait();
app().executor().post(priority::low, exec_queue::read_write,
[self, future{std::move(future)}, api_trx, is_transient, next{std::move(next)}, trx{std::move(trx)},
return_failure_traces]() mutable {
auto start = fc::time_point::now();
auto idle_time = self->_time_tracker.add_idle_time(start);
auto trx_tracker = self->_time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler =
[self, is_transient, &next, trx{std::move(trx)}, &start](fc::exception_ptr ex) {
self->log_trx_results(trx, nullptr, ex, 0, start, is_transient);
next(std::move(ex));
};
try {
auto result = future.get();
if (!self->process_incoming_transaction_async(result, api_trx, return_failure_traces, trx_tracker, next)) {
if (self->in_producing_mode()) {
self->schedule_maybe_produce_block(true);
} else {
self->restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
}
});
boost::asio::post(
chain_plug->chain().get_thread_pool(), // use chain thread pool for key recovery
[this, trx{trx}, time_limit{max_trx_cpu_usage}, trx_type, is_transient, next{std::move(next)}, api_trx, return_failure_traces]() mutable {

chain::controller& chain = chain_plug->chain();
transaction_metadata_ptr trx_meta;
try {
trx_meta = transaction_metadata::recover_keys(trx, chain.get_chain_id(), time_limit, trx_type,
chain.configured_subjective_signature_length_limit());
} catch (...) {
// use read_write when read is likely fine; maintains previous behavior of next() always being called from the main thread
app().executor().post(
priority::low, exec_queue::read_write,
[this, ex_ptr{std::current_exception()}, trx{std::move(trx)}, is_transient, next{std::move(next)}]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));
auto ex_handler = [this, is_transient, &next, &trx](fc::exception_ptr ex) {
log_trx_results(trx, nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
std::rethrow_exception(ex_ptr);
} CATCH_AND_CALL(ex_handler)
});
return;
}

// key recovery complete, continue execution on the main thread
app().executor().post(
priority::low, exec_queue::read_write,
[this, trx_meta{std::move(trx_meta)}, is_transient, next{std::move(next)}, api_trx, return_failure_traces]() {
auto start = fc::time_point::now();
auto idle_time = _time_tracker.add_idle_time(start);
auto trx_tracker = _time_tracker.start_trx(is_transient, start);
fc_tlog(_log, "Time since last trx: ${t}us", ("t", idle_time));

auto exception_handler = [this, is_transient, &next, &trx_meta](fc::exception_ptr ex) {
log_trx_results(trx_meta->packed_trx(), nullptr, ex, 0, is_transient);
next(std::move(ex));
};
try {
if (!process_incoming_transaction_async(trx_meta, api_trx, return_failure_traces, trx_tracker, next)) {
if (in_producing_mode()) {
schedule_maybe_produce_block(true);
} else {
restart_speculative_block();
}
}
}
CATCH_AND_CALL(exception_handler);
});
});
}

bool process_incoming_transaction_async(const transaction_metadata_ptr& trx,
Expand Down Expand Up @@ -1066,8 +1077,6 @@ void producer_plugin::set_program_options(
"Disable subjective CPU billing for P2P transactions")
("disable-subjective-api-billing", bpo::value<bool>()->default_value(true),
"Disable subjective CPU billing for API transactions")
("producer-threads", bpo::value<uint16_t>()->default_value(my->_thread_pool_size),
"Number of worker threads in producer thread pool")
("snapshots-dir", bpo::value<std::filesystem::path>()->default_value("snapshots"),
"the location of the snapshots directory (absolute path or relative to application data dir)")
("read-only-threads", bpo::value<uint32_t>(),
Expand Down Expand Up @@ -1185,9 +1194,6 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia
ilog("Subjective CPU billing of API trxs disabled ");
}

_thread_pool_size = options.at("producer-threads").as<uint16_t>();
EOS_ASSERT(_thread_pool_size > 0, plugin_config_exception, "producer-threads ${num} must be greater than 0", ("num", _thread_pool_size));

if (options.count("snapshots-dir")) {
auto sd = options.at("snapshots-dir").as<std::filesystem::path>();
if (sd.is_relative()) {
Expand Down Expand Up @@ -1313,12 +1319,6 @@ void producer_plugin_impl::plugin_startup() {
try {
ilog("producer plugin: plugin_startup() begin");

_thread_pool.start(_thread_pool_size, [](const fc::exception& e) {
fc_elog(_log, "Exception in producer thread pool, exiting: ${e}", ("e", e.to_detail_string()));
app().quit();
});


chain::controller& chain = chain_plug->chain();
EOS_ASSERT(_producers.empty() || chain.get_read_mode() != chain::db_read_mode::IRREVERSIBLE, plugin_config_exception,
"node cannot have any producer-name configured because block production is impossible when read_mode is \"irreversible\"");
Expand Down Expand Up @@ -1398,7 +1398,6 @@ void producer_plugin_impl::plugin_shutdown() {
_ro_timer.cancel(ec);
app().executor().stop();
_ro_thread_pool.stop();
_thread_pool.stop();
_unapplied_transactions.clear();

app().executor().post(0, [me = shared_from_this()]() {}); // keep my pointer alive until queue is drained
Expand Down Expand Up @@ -2043,22 +2042,20 @@ inline std::string get_detailed_contract_except_info(const packed_transaction_pt
}

void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx,
const transaction_trace_ptr& trace,
const fc::time_point& start) {
const transaction_trace_ptr& trace) {
uint32_t billed_cpu_time_us = (trace && trace->receipt) ? trace->receipt->cpu_usage_us : 0;
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, start, trx->is_transient());
log_trx_results(trx->packed_trx(), trace, nullptr, billed_cpu_time_us, trx->is_transient());
}

void producer_plugin_impl::log_trx_results(const transaction_metadata_ptr& trx, const fc::exception_ptr& except_ptr) {
uint32_t billed_cpu_time_us = trx ? trx->billed_cpu_time_us : 0;
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, fc::time_point::now(), trx->is_transient());
log_trx_results(trx->packed_trx(), nullptr, except_ptr, billed_cpu_time_us, trx->is_transient());
}

void producer_plugin_impl::log_trx_results(const packed_transaction_ptr& trx,
const transaction_trace_ptr& trace,
const fc::exception_ptr& except_ptr,
uint32_t billed_cpu_us,
const fc::time_point& start,
bool is_transient) {
chain::controller& chain = chain_plug->chain();

Expand Down Expand Up @@ -2226,7 +2223,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr&
if (!disable_subjective_enforcement) // subjectively bill failure when producing since not in objective cpu account billing
subjective_bill.subjective_bill_failure(first_auth, trace->elapsed, fc::time_point::now());

log_trx_results(trx, trace, start);
log_trx_results(trx, trace);
// this failed our configured maximum transaction time, we don't want to replay it
fc_tlog(_log, "Failed ${c} trx, auth: ${a}, prev billed: ${p}us, ran: ${r}us, id: ${id}, except: ${e}",
("c", e.code())("a", first_auth)("p", prev_billed_cpu_time_us)("r", end - start)("id", trx->id())("e", e));
Expand All @@ -2245,7 +2242,7 @@ producer_plugin_impl::handle_push_result(const transaction_metadata_ptr&
} else {
fc_tlog(_log, "Subjective bill for success ${a}: ${b} elapsed ${t}us, time ${r}us",
("a", first_auth)("b", sub_bill)("t", trace->elapsed)("r", end - start));
log_trx_results(trx, trace, start);
log_trx_results(trx, trace);
// if producing then trx is in objective cpu account billing
if (!disable_subjective_enforcement && !in_producing_mode()) {
subjective_bill.subjective_bill(trx->id(), trx->packed_trx()->expiration(), first_auth, trace->elapsed);
Expand Down
Loading

0 comments on commit 1612650

Please sign in to comment.