Skip to content

Commit

Permalink
Merge pull request #1651 from skalenetwork/enhancement/sync-node-broa…
Browse files Browse the repository at this point in the history
…dcast-txs

Enhancement/sync node broadcast txs
  • Loading branch information
DmytroNazarenko authored Dec 29, 2023
2 parents f7c560a + a2d5363 commit c18ad39
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 107 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ jobs:
./testeth -t BlockQueueSuite -- --express && touch /tmp/BlockQueueSuitePassed
./testeth -t ClientBase -- --express && touch /tmp/ClientBasePassed
./testeth -t EstimateGas -- --express && touch /tmp/EstimateGasPassed
./testeth -t IMABLSPublicKey -- --express && touch /tmp/IMABLSPublicKeyPassed
./testeth -t getHistoricNodesData -- --express && touch /tmp/getHistoricNodesDataPassed
./testeth -t ExtVmSuite -- --express && touch /tmp/ExtVmSuitePassed
./testeth -t GasPricer -- --express && touch /tmp/GasPricerPassed
./testeth -t BasicTests -- --express && touch /tmp/BasicTestsPassed
Expand Down Expand Up @@ -239,7 +239,7 @@ jobs:
ls /tmp/BlockQueueSuitePassed || ./testeth -t BlockQueueSuite -- --express --verbosity 4
ls /tmp/ClientBasePassed || ./testeth -t ClientBase -- --express --verbosity 4
ls /tmp/EstimateGasPassed || ./testeth -t EstimateGas -- --express --verbosity 4
ls /tmp/IMABLSPublicKeyPassed || ./testeth -t IMABLSPublicKey -- --express --verbosity 4
ls /tmp/getHistoricNodesDataPassed || ./testeth -t getHistoricNodesData -- --express --verbosity 4
ls /tmp/ExtVmSuitePassed || ./testeth -t ExtVmSuite -- --express --verbosity 4
ls /tmp/GasPricerPassed || ./testeth -t GasPricer -- --express --verbosity 4
ls /tmp/BasicTestsPassed || ./testeth -t BasicTests -- --express --verbosity 4
Expand Down
2 changes: 1 addition & 1 deletion libconsensus
133 changes: 61 additions & 72 deletions libethereum/SkaleHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,12 @@ void SkaleHost::createBlock( const ConsensusExtFace::transactions_vector& _appro
std::lock_guard< std::recursive_mutex > lock( m_pending_createMutex );

if ( m_ignoreNewBlocks ) {
clog( VerbosityWarning, "skale-host" ) << "WARNING: skaled got new block #" << _blockID
<< " after timestamp-related exit initiated!";
LOG( m_warningLogger ) << "WARNING: skaled got new block #" << _blockID
<< " after timestamp-related exit initiated!";
return;
}

LOG( m_debugLogger ) << cc::debug( "createBlock " ) << cc::notice( "ID" ) << cc::debug( " = " )
<< cc::warn( "#" ) << cc::num10( _blockID ) << std::endl;
LOG( m_debugLogger ) << "createBlock ID = #" << _blockID;
m_debugTracer.tracepoint( "create_block" );

// convert bytes back to transactions (using caching), delete them from q and push results into
Expand All @@ -587,23 +586,18 @@ void SkaleHost::createBlock( const ConsensusExtFace::transactions_vector& _appro
dev::h256 stCurrent =
this->m_client.blockInfo( this->m_client.hashFromNumber( _blockID - 1 ) ).stateRoot();

LOG( m_traceLogger ) << cc::debug( "STATE ROOT FOR BLOCK: " )
<< cc::debug( std::to_string( _blockID - 1 ) ) << ' '
<< cc::debug( stCurrent.hex() ) << std::endl;
LOG( m_traceLogger ) << "STATE ROOT FOR BLOCK: " << std::to_string( _blockID - 1 ) << " "
<< stCurrent.hex();

// FATAL if mismatch in non-default
if ( _winningNodeIndex != 0 && dev::h256::Arith( stCurrent ) != _stateRoot &&
!this->m_client.chainParams().nodeInfo.syncNode ) {
clog( VerbosityError, "skale-host" )
<< cc::fatal( "FATAL STATE ROOT MISMATCH ERROR:" )
<< cc::error( " current state root " )
<< cc::warn( dev::h256::Arith( stCurrent ).str() )
<< cc::error( " is not equal to arrived state root " )
<< cc::warn( _stateRoot.str() ) << cc::error( " with block ID " )
<< cc::notice( "#" ) << cc::num10( _blockID ) << cc::warn( ", " )
<< cc::p( "/data_dir" )
<< cc::error( " cleanup is recommended, exiting with code " )
<< cc::num10( int( ExitHandler::ec_state_root_mismatch ) ) << "...";
LOG( m_errorLogger ) << "FATAL STATE ROOT MISMATCH ERROR: current state root "
<< dev::h256::Arith( stCurrent ).str()
<< " is not equal to arrived state root " << _stateRoot.str()
<< " with block ID #" << _blockID
<< ", /data_dir cleanup is recommended, exiting with code "
<< int( ExitHandler::ec_state_root_mismatch ) << "...";
if ( AmsterdamFixPatch::stateRootCheckingEnabled( m_client ) ) {
m_ignoreNewBlocks = true;
m_consensus->exitGracefully();
Expand All @@ -613,16 +607,14 @@ void SkaleHost::createBlock( const ConsensusExtFace::transactions_vector& _appro

// WARN if default but non-zero
if ( _winningNodeIndex == 0 && _stateRoot != u256() )
clog( VerbosityWarning, "skale-host" )
<< cc::warn( "WARNING: STATE ROOT MISMATCH!" )
<< cc::warn( " Current block is DEFAULT BUT arrived state root is " )
<< cc::warn( _stateRoot.str() ) << cc::warn( " with block ID " )
<< cc::notice( "#" ) << cc::num10( _blockID );
LOG( m_warningLogger ) << "WARNING: STATE ROOT MISMATCH!"
<< "Current block is DEFAULT BUT arrived state root is "
<< _stateRoot.str() << " with block ID #" << _blockID;
}

std::vector< Transaction > out_txns; // resultant Transaction vector

std::atomic_bool have_consensus_born = false; // means we need to re-verify old txns
std::atomic_bool haveConsensusBorn = false; // means we need to re-verify old txns

// HACK this is for not allowing new transactions in tq between deletion and block creation!
// TODO decouple SkaleHost and Client!!!
Expand All @@ -636,11 +628,11 @@ void SkaleHost::createBlock( const ConsensusExtFace::transactions_vector& _appro
for ( auto it = _approvedTransactions.begin(); it != _approvedTransactions.end(); ++it ) {
const bytes& data = *it;
h256 sha = sha3( data );
LOG( m_traceLogger ) << cc::debug( "Arrived txn: " ) << sha << std::endl;
LOG( m_traceLogger ) << "Arrived txn: " << sha;
jarrProcessedTxns.push_back( toJS( sha ) );
#ifdef DEBUG_TX_BALANCE
if ( sent.count( sha ) != m_transaction_cache.count( sha.asArray() ) ) {
std::cerr << cc::error( "createBlock assert" ) << std::endl;
LOG( m_errorLogger ) << "createBlock assert";
// sleep(200);
assert( sent.count( sha ) == m_transaction_cache.count( sha.asArray() ) );
}
Expand All @@ -667,14 +659,14 @@ void SkaleHost::createBlock( const ConsensusExtFace::transactions_vector& _appro
Transaction t( data, CheckTransaction::Everything, true );
t.checkOutExternalGas( m_client.chainParams().externalGasDifficulty );
out_txns.push_back( t );
LOG( m_debugLogger ) << "Will import consensus-born txn!";
LOG( m_debugLogger ) << "Will import consensus-born txn";
m_debugTracer.tracepoint( "import_consensus_born" );
have_consensus_born = true;
haveConsensusBorn = true;
}

if ( m_tq.knownTransactions().count( sha ) != 0 ) {
// TODO fix this!!?
clog( VerbosityWarning, "skale-host" )
<< "Consensus returned 'future'' transaction that we didn't yet send!!";
LOG( m_traceLogger )
<< "Consensus returned future transaction that we didn't yet send";
m_debugTracer.tracepoint( "import_future" );
}

Expand Down Expand Up @@ -707,51 +699,49 @@ void SkaleHost::createBlock( const ConsensusExtFace::transactions_vector& _appro
boost::chrono::high_resolution_clock::time_point skaledTimeFinish =
boost::chrono::high_resolution_clock::now();
if ( latestBlockTime != boost::chrono::high_resolution_clock::time_point() ) {
clog( VerbosityInfo, "skale-host" )
<< "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count()
<< ':' << "BFT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - latestBlockTime )
.count();
LOG( m_infoLogger ) << "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count()
<< ':' << "BFT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - latestBlockTime )
.count();
} else {
clog( VerbosityInfo, "skale-host" )
<< "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count();
LOG( m_infoLogger ) << "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count();
}
latestBlockTime = skaledTimeFinish;
LOG( m_debugLogger ) << cc::success( "Successfully imported " ) << n_succeeded
<< cc::success( " of " ) << out_txns.size()
<< cc::success( " transactions" ) << std::endl;
LOG( m_debugLogger ) << "Successfully imported " << n_succeeded << " of " << out_txns.size()
<< " transactions";

if ( have_consensus_born )
if ( haveConsensusBorn )
this->m_lastBlockWithBornTransactions = _blockID;

logState();

clog( VerbosityInfo, "skale-host" )
<< "TQBYTES:CTQ:" << m_tq.status().currentBytes << ":FTQ:" << m_tq.status().futureBytes
<< ":TQSIZE:CTQ:" << m_tq.status().current << ":FTQ:" << m_tq.status().future;
LOG( m_infoLogger ) << "TQBYTES:CTQ:" << m_tq.status().currentBytes
<< ":FTQ:" << m_tq.status().futureBytes
<< ":TQSIZE:CTQ:" << m_tq.status().current
<< ":FTQ:" << m_tq.status().future;

if ( m_instanceMonitor != nullptr ) {
if ( m_instanceMonitor->isTimeToRotate( _timeStamp ) ) {
m_instanceMonitor->prepareRotation();
m_ignoreNewBlocks = true;
m_consensus->exitGracefully();
ExitHandler::exitHandler( -1, ExitHandler::ec_rotation_complete );
clog( VerbosityInfo, "skale-host" ) << "Rotation is completed. Instance is exiting";
LOG( m_infoLogger ) << "Rotation is completed. Instance is exiting";
}
}
} catch ( const std::exception& ex ) {
cerror << "CRITICAL " << ex.what() << " (in createBlock)";
cerror << "\n" << skutils::signal::generate_stack_trace() << "\n" << std::endl;
LOG( m_errorLogger ) << "CRITICAL " << ex.what() << " (in createBlock)";
LOG( m_errorLogger ) << "\n" << skutils::signal::generate_stack_trace() << "\n";
} catch ( ... ) {
cerror << "CRITICAL unknown exception (in createBlock)";
cerror << "\n" << skutils::signal::generate_stack_trace() << "\n" << std::endl;
LOG( m_errorLogger ) << "CRITICAL unknown exception (in createBlock)";
LOG( m_errorLogger ) << "\n" << skutils::signal::generate_stack_trace() << "\n";
}

void SkaleHost::startWorking() {
Expand All @@ -762,27 +752,26 @@ void SkaleHost::startWorking() {
// recursively calls this func - so working is still false!)
working = true;

if ( !this->m_client.chainParams().nodeInfo.syncNode ) {
try {
m_broadcaster->startService();
} catch ( const Broadcaster::StartupException& ) {
working = false;
std::throw_with_nested( SkaleHost::CreationException() );
}

auto bcast_func = std::bind( &SkaleHost::broadcastFunc, this );
m_broadcastThread = std::thread( bcast_func );
try {
m_broadcaster->startService();
} catch ( const Broadcaster::StartupException& ) {
working = false;
std::throw_with_nested( SkaleHost::CreationException() );
} catch ( ... ) {
working = false;
std::throw_with_nested( std::runtime_error( "Error in starting broadcaster service" ) );
}

auto csus_func = [&]() {
auto broadcastFunction = std::bind( &SkaleHost::broadcastFunc, this );
m_broadcastThread = std::thread( broadcastFunction );

auto consensusFunction = [&]() {
try {
m_consensus->startAll();
} catch ( const std::exception& ) {
} catch ( ... ) {
// cleanup
m_exitNeeded = true;
if ( !this->m_client.chainParams().nodeInfo.syncNode ) {
m_broadcastThread.join();
}
m_broadcastThread.join();
ExitHandler::exitHandler( -1, ExitHandler::ec_termninated_by_signal );
return;
}
Expand Down Expand Up @@ -813,7 +802,7 @@ void SkaleHost::startWorking() {
// m_consensus->setEmptyBlockIntervalMs( tmp_interval );
}; // func

m_consensusThread = std::thread( csus_func );
m_consensusThread = std::thread( consensusFunction );
}

// TODO finish all gracefully to allow all undone jobs be finished
Expand Down
4 changes: 4 additions & 0 deletions libethereum/SkaleHost.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ class SkaleHost {

bool m_broadcastEnabled;


dev::Logger m_errorLogger{ dev::createLogger( dev::VerbosityError, "skale-host" ) };
dev::Logger m_warningLogger{ dev::createLogger( dev::VerbosityWarning, "skale-host" ) };
dev::Logger m_infoLogger{ dev::createLogger( dev::VerbosityInfo, "skale-host" ) };
dev::Logger m_debugLogger{ dev::createLogger( dev::VerbosityDebug, "skale-host" ) };
dev::Logger m_traceLogger{ dev::createLogger( dev::VerbosityTrace, "skale-host" ) };
void logState();
Expand Down
26 changes: 0 additions & 26 deletions libweb3jsonrpc/Eth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,28 +135,6 @@ Eth::Eth( const std::string& configPath, eth::Interface& _eth, eth::AccountHolde
{
}

bool Eth::isEnabledTransactionSending() const {
bool isEnabled = true;
try {
nlohmann::json joConfig = getConfigJSON();
if ( joConfig.count( "skaleConfig" ) == 0 )
throw std::runtime_error( "error config.json file, cannot find \"skaleConfig\"" );
const nlohmann::json& joSkaleConfig = joConfig["skaleConfig"];
if ( joSkaleConfig.count( "nodeInfo" ) == 0 )
throw std::runtime_error(
"error config.json file, cannot find \"skaleConfig\"/\"nodeInfo\"" );
const nlohmann::json& joSkaleConfig_nodeInfo = joSkaleConfig["nodeInfo"];
if ( joSkaleConfig_nodeInfo.count( "syncNode" ) == 0 )
throw std::runtime_error(
"error config.json file, cannot find "
"\"skaleConfig\"/\"nodeInfo\"/\"syncNode\"" );
const nlohmann::json& joSkaleConfig_nodeInfo_syncNode = joSkaleConfig_nodeInfo["syncNode"];
isEnabled = joSkaleConfig_nodeInfo_syncNode.get< bool >() ? false : true;
} catch ( ... ) {
}
return isEnabled;
}

string Eth::eth_protocolVersion() {
return toJS( eth::c_protocolVersion );
}
Expand Down Expand Up @@ -384,8 +362,6 @@ void Eth::setTransactionDefaults( TransactionSkeleton& _t ) {

string Eth::eth_sendTransaction( Json::Value const& _json ) {
try {
if ( !isEnabledTransactionSending() )
throw std::runtime_error( "transacton sending feature is disabled on this instance" );
TransactionSkeleton t = toTransactionSkeleton( _json );
setTransactionDefaults( t );
pair< bool, Secret > ar = m_ethAccounts.authenticate( t );
Expand Down Expand Up @@ -453,8 +429,6 @@ Json::Value Eth::eth_inspectTransaction( std::string const& _rlp ) {
// TODO Catch exceptions for all calls other eth_-calls in outer scope!
/// skale
string Eth::eth_sendRawTransaction( std::string const& _rlp ) {
if ( !isEnabledTransactionSending() )
throw JsonRpcException( "transacton sending feature is disabled on this instance" );
// Don't need to check the transaction signature (CheckTransaction::None) since it
// will be checked as a part of transaction import
Transaction t( jsToBytes( _rlp, OnFailed::Throw ), CheckTransaction::None );
Expand Down
2 changes: 1 addition & 1 deletion libweb3jsonrpc/Net.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Net : public NetFace {
virtual bool net_listening() override;

private:
const dev::eth::ChainParams& m_chainParams;
const dev::eth::ChainParams m_chainParams;
};

} // namespace rpc
Expand Down
Loading

0 comments on commit c18ad39

Please sign in to comment.