Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement/sync node broadcast txs #1651

Merged
merged 19 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ jobs:
./testeth -t BlockQueueSuite -- --express && touch /tmp/BlockQueueSuitePassed
./testeth -t ClientBase -- --express && touch /tmp/ClientBasePassed
./testeth -t EstimateGas -- --express && touch /tmp/EstimateGasPassed
./testeth -t IMABLSPublicKey -- --express && touch /tmp/IMABLSPublicKeyPassed
./testeth -t getHistoricNodesData -- --express && touch /tmp/getHistoricNodesDataPassed
./testeth -t ExtVmSuite -- --express && touch /tmp/ExtVmSuitePassed
./testeth -t GasPricer -- --express && touch /tmp/GasPricerPassed
./testeth -t BasicTests -- --express && touch /tmp/BasicTestsPassed
Expand Down Expand Up @@ -239,7 +239,7 @@ jobs:
ls /tmp/BlockQueueSuitePassed || ./testeth -t BlockQueueSuite -- --express --verbosity 4
ls /tmp/ClientBasePassed || ./testeth -t ClientBase -- --express --verbosity 4
ls /tmp/EstimateGasPassed || ./testeth -t EstimateGas -- --express --verbosity 4
ls /tmp/IMABLSPublicKeyPassed || ./testeth -t IMABLSPublicKey -- --express --verbosity 4
ls /tmp/getHistoricNodesDataPassed || ./testeth -t getHistoricNodesData -- --express --verbosity 4
ls /tmp/ExtVmSuitePassed || ./testeth -t ExtVmSuite -- --express --verbosity 4
ls /tmp/GasPricerPassed || ./testeth -t GasPricer -- --express --verbosity 4
ls /tmp/BasicTestsPassed || ./testeth -t BasicTests -- --express --verbosity 4
Expand Down
2 changes: 1 addition & 1 deletion libconsensus
133 changes: 61 additions & 72 deletions libethereum/SkaleHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,12 @@
std::lock_guard< std::recursive_mutex > lock( m_pending_createMutex );

if ( m_ignoreNewBlocks ) {
clog( VerbosityWarning, "skale-host" ) << "WARNING: skaled got new block #" << _blockID
<< " after timestamp-related exit initiated!";
LOG( m_warningLogger ) << "WARNING: skaled got new block #" << _blockID
<< " after timestamp-related exit initiated!";

Check warning on line 575 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L574-L575

Added lines #L574 - L575 were not covered by tests
return;
}

LOG( m_debugLogger ) << cc::debug( "createBlock " ) << cc::notice( "ID" ) << cc::debug( " = " )
<< cc::warn( "#" ) << cc::num10( _blockID ) << std::endl;
LOG( m_debugLogger ) << "createBlock ID = #" << _blockID;
m_debugTracer.tracepoint( "create_block" );

// convert bytes back to transactions (using caching), delete them from q and push results into
Expand All @@ -587,23 +586,18 @@
dev::h256 stCurrent =
this->m_client.blockInfo( this->m_client.hashFromNumber( _blockID - 1 ) ).stateRoot();

LOG( m_traceLogger ) << cc::debug( "STATE ROOT FOR BLOCK: " )
<< cc::debug( std::to_string( _blockID - 1 ) ) << ' '
<< cc::debug( stCurrent.hex() ) << std::endl;
LOG( m_traceLogger ) << "STATE ROOT FOR BLOCK: " << std::to_string( _blockID - 1 ) << " "
<< stCurrent.hex();

Check warning on line 590 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L590

Added line #L590 was not covered by tests

// FATAL if mismatch in non-default
if ( _winningNodeIndex != 0 && dev::h256::Arith( stCurrent ) != _stateRoot &&
!this->m_client.chainParams().nodeInfo.syncNode ) {
clog( VerbosityError, "skale-host" )
<< cc::fatal( "FATAL STATE ROOT MISMATCH ERROR:" )
<< cc::error( " current state root " )
<< cc::warn( dev::h256::Arith( stCurrent ).str() )
<< cc::error( " is not equal to arrived state root " )
<< cc::warn( _stateRoot.str() ) << cc::error( " with block ID " )
<< cc::notice( "#" ) << cc::num10( _blockID ) << cc::warn( ", " )
<< cc::p( "/data_dir" )
<< cc::error( " cleanup is recommended, exiting with code " )
<< cc::num10( int( ExitHandler::ec_state_root_mismatch ) ) << "...";
LOG( m_errorLogger ) << "FATAL STATE ROOT MISMATCH ERROR: current state root "
<< dev::h256::Arith( stCurrent ).str()
<< " is not equal to arrived state root " << _stateRoot.str()
<< " with block ID #" << _blockID
<< ", /data_dir cleanup is recommended, exiting with code "
<< int( ExitHandler::ec_state_root_mismatch ) << "...";

Check warning on line 600 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L595-L600

Added lines #L595 - L600 were not covered by tests
if ( AmsterdamFixPatch::stateRootCheckingEnabled( m_client ) ) {
m_ignoreNewBlocks = true;
m_consensus->exitGracefully();
Expand All @@ -613,16 +607,14 @@

// WARN if default but non-zero
if ( _winningNodeIndex == 0 && _stateRoot != u256() )
clog( VerbosityWarning, "skale-host" )
<< cc::warn( "WARNING: STATE ROOT MISMATCH!" )
<< cc::warn( " Current block is DEFAULT BUT arrived state root is " )
<< cc::warn( _stateRoot.str() ) << cc::warn( " with block ID " )
<< cc::notice( "#" ) << cc::num10( _blockID );
LOG( m_warningLogger ) << "WARNING: STATE ROOT MISMATCH!"

Check warning on line 610 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L610

Added line #L610 was not covered by tests
<< "Current block is DEFAULT BUT arrived state root is "
<< _stateRoot.str() << " with block ID #" << _blockID;

Check warning on line 612 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L612

Added line #L612 was not covered by tests
}

std::vector< Transaction > out_txns; // resultant Transaction vector

std::atomic_bool have_consensus_born = false; // means we need to re-verify old txns
std::atomic_bool haveConsensusBorn = false; // means we need to re-verify old txns

// HACK this is for not allowing new transactions in tq between deletion and block creation!
// TODO decouple SkaleHost and Client!!!
Expand All @@ -636,11 +628,11 @@
for ( auto it = _approvedTransactions.begin(); it != _approvedTransactions.end(); ++it ) {
const bytes& data = *it;
h256 sha = sha3( data );
LOG( m_traceLogger ) << cc::debug( "Arrived txn: " ) << sha << std::endl;
LOG( m_traceLogger ) << "Arrived txn: " << sha;
jarrProcessedTxns.push_back( toJS( sha ) );
#ifdef DEBUG_TX_BALANCE
if ( sent.count( sha ) != m_transaction_cache.count( sha.asArray() ) ) {
std::cerr << cc::error( "createBlock assert" ) << std::endl;
LOG( m_errorLogger ) << "createBlock assert";
// sleep(200);
assert( sent.count( sha ) == m_transaction_cache.count( sha.asArray() ) );
}
Expand All @@ -667,14 +659,14 @@
Transaction t( data, CheckTransaction::Everything, true );
t.checkOutExternalGas( m_client.chainParams().externalGasDifficulty );
out_txns.push_back( t );
LOG( m_debugLogger ) << "Will import consensus-born txn!";
LOG( m_debugLogger ) << "Will import consensus-born txn";
m_debugTracer.tracepoint( "import_consensus_born" );
have_consensus_born = true;
haveConsensusBorn = true;
}

if ( m_tq.knownTransactions().count( sha ) != 0 ) {
// TODO fix this!!?
clog( VerbosityWarning, "skale-host" )
<< "Consensus returned 'future'' transaction that we didn't yet send!!";
LOG( m_traceLogger )
<< "Consensus returned future transaction that we didn't yet send";

Check warning on line 669 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L669

Added line #L669 was not covered by tests
m_debugTracer.tracepoint( "import_future" );
}

Expand Down Expand Up @@ -707,51 +699,49 @@
boost::chrono::high_resolution_clock::time_point skaledTimeFinish =
boost::chrono::high_resolution_clock::now();
if ( latestBlockTime != boost::chrono::high_resolution_clock::time_point() ) {
clog( VerbosityInfo, "skale-host" )
<< "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count()
<< ':' << "BFT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - latestBlockTime )
.count();
LOG( m_infoLogger ) << "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count()
<< ':' << "BFT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - latestBlockTime )
.count();

Check warning on line 709 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L703-L709

Added lines #L703 - L709 were not covered by tests
} else {
clog( VerbosityInfo, "skale-host" )
<< "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count();
LOG( m_infoLogger ) << "SWT:"
<< boost::chrono::duration_cast< boost::chrono::milliseconds >(
skaledTimeFinish - skaledTimeStart )
.count();

Check warning on line 714 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L712-L714

Added lines #L712 - L714 were not covered by tests
}
latestBlockTime = skaledTimeFinish;
LOG( m_debugLogger ) << cc::success( "Successfully imported " ) << n_succeeded
<< cc::success( " of " ) << out_txns.size()
<< cc::success( " transactions" ) << std::endl;
LOG( m_debugLogger ) << "Successfully imported " << n_succeeded << " of " << out_txns.size()
<< " transactions";

Check warning on line 718 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L718

Added line #L718 was not covered by tests

if ( have_consensus_born )
if ( haveConsensusBorn )
this->m_lastBlockWithBornTransactions = _blockID;

logState();

clog( VerbosityInfo, "skale-host" )
<< "TQBYTES:CTQ:" << m_tq.status().currentBytes << ":FTQ:" << m_tq.status().futureBytes
<< ":TQSIZE:CTQ:" << m_tq.status().current << ":FTQ:" << m_tq.status().future;
LOG( m_infoLogger ) << "TQBYTES:CTQ:" << m_tq.status().currentBytes
<< ":FTQ:" << m_tq.status().futureBytes
<< ":TQSIZE:CTQ:" << m_tq.status().current
<< ":FTQ:" << m_tq.status().future;

Check warning on line 728 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L726-L728

Added lines #L726 - L728 were not covered by tests

if ( m_instanceMonitor != nullptr ) {
if ( m_instanceMonitor->isTimeToRotate( _timeStamp ) ) {
m_instanceMonitor->prepareRotation();
m_ignoreNewBlocks = true;
m_consensus->exitGracefully();
ExitHandler::exitHandler( -1, ExitHandler::ec_rotation_complete );
clog( VerbosityInfo, "skale-host" ) << "Rotation is completed. Instance is exiting";
LOG( m_infoLogger ) << "Rotation is completed. Instance is exiting";

Check warning on line 736 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L736

Added line #L736 was not covered by tests
}
}
} catch ( const std::exception& ex ) {
cerror << "CRITICAL " << ex.what() << " (in createBlock)";
cerror << "\n" << skutils::signal::generate_stack_trace() << "\n" << std::endl;
LOG( m_errorLogger ) << "CRITICAL " << ex.what() << " (in createBlock)";
LOG( m_errorLogger ) << "\n" << skutils::signal::generate_stack_trace() << "\n";

Check warning on line 741 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L740-L741

Added lines #L740 - L741 were not covered by tests
} catch ( ... ) {
cerror << "CRITICAL unknown exception (in createBlock)";
cerror << "\n" << skutils::signal::generate_stack_trace() << "\n" << std::endl;
LOG( m_errorLogger ) << "CRITICAL unknown exception (in createBlock)";
LOG( m_errorLogger ) << "\n" << skutils::signal::generate_stack_trace() << "\n";

Check warning on line 744 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L743-L744

Added lines #L743 - L744 were not covered by tests
}

void SkaleHost::startWorking() {
Expand All @@ -762,27 +752,26 @@
// recursively calls this func - so working is still false!)
working = true;

if ( !this->m_client.chainParams().nodeInfo.syncNode ) {
try {
m_broadcaster->startService();
} catch ( const Broadcaster::StartupException& ) {
working = false;
std::throw_with_nested( SkaleHost::CreationException() );
}

auto bcast_func = std::bind( &SkaleHost::broadcastFunc, this );
m_broadcastThread = std::thread( bcast_func );
try {
m_broadcaster->startService();
} catch ( const Broadcaster::StartupException& ) {
working = false;
std::throw_with_nested( SkaleHost::CreationException() );
} catch ( ... ) {
working = false;
std::throw_with_nested( std::runtime_error( "Error in starting broadcaster service" ) );

Check warning on line 762 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L757-L762

Added lines #L757 - L762 were not covered by tests
}

auto csus_func = [&]() {
auto broadcastFunction = std::bind( &SkaleHost::broadcastFunc, this );
m_broadcastThread = std::thread( broadcastFunction );

auto consensusFunction = [&]() {
try {
m_consensus->startAll();
} catch ( const std::exception& ) {
} catch ( ... ) {

Check warning on line 771 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L771

Added line #L771 was not covered by tests
// cleanup
m_exitNeeded = true;
if ( !this->m_client.chainParams().nodeInfo.syncNode ) {
m_broadcastThread.join();
}
m_broadcastThread.join();

Check warning on line 774 in libethereum/SkaleHost.cpp

View check run for this annotation

Codecov / codecov/patch

libethereum/SkaleHost.cpp#L774

Added line #L774 was not covered by tests
ExitHandler::exitHandler( -1, ExitHandler::ec_termninated_by_signal );
return;
}
Expand Down Expand Up @@ -813,7 +802,7 @@
// m_consensus->setEmptyBlockIntervalMs( tmp_interval );
}; // func

m_consensusThread = std::thread( csus_func );
m_consensusThread = std::thread( consensusFunction );
}

// TODO finish all gracefully to allow all undone jobs be finished
Expand Down
4 changes: 4 additions & 0 deletions libethereum/SkaleHost.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ class SkaleHost {

bool m_broadcastEnabled;


dev::Logger m_errorLogger{ dev::createLogger( dev::VerbosityError, "skale-host" ) };
dev::Logger m_warningLogger{ dev::createLogger( dev::VerbosityWarning, "skale-host" ) };
dev::Logger m_infoLogger{ dev::createLogger( dev::VerbosityInfo, "skale-host" ) };
dev::Logger m_debugLogger{ dev::createLogger( dev::VerbosityDebug, "skale-host" ) };
dev::Logger m_traceLogger{ dev::createLogger( dev::VerbosityTrace, "skale-host" ) };
void logState();
Expand Down
26 changes: 0 additions & 26 deletions libweb3jsonrpc/Eth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,28 +135,6 @@ Eth::Eth( const std::string& configPath, eth::Interface& _eth, eth::AccountHolde
{
}

bool Eth::isEnabledTransactionSending() const {
bool isEnabled = true;
try {
nlohmann::json joConfig = getConfigJSON();
if ( joConfig.count( "skaleConfig" ) == 0 )
throw std::runtime_error( "error config.json file, cannot find \"skaleConfig\"" );
const nlohmann::json& joSkaleConfig = joConfig["skaleConfig"];
if ( joSkaleConfig.count( "nodeInfo" ) == 0 )
throw std::runtime_error(
"error config.json file, cannot find \"skaleConfig\"/\"nodeInfo\"" );
const nlohmann::json& joSkaleConfig_nodeInfo = joSkaleConfig["nodeInfo"];
if ( joSkaleConfig_nodeInfo.count( "syncNode" ) == 0 )
throw std::runtime_error(
"error config.json file, cannot find "
"\"skaleConfig\"/\"nodeInfo\"/\"syncNode\"" );
const nlohmann::json& joSkaleConfig_nodeInfo_syncNode = joSkaleConfig_nodeInfo["syncNode"];
isEnabled = joSkaleConfig_nodeInfo_syncNode.get< bool >() ? false : true;
} catch ( ... ) {
}
return isEnabled;
}

string Eth::eth_protocolVersion() {
return toJS( eth::c_protocolVersion );
}
Expand Down Expand Up @@ -384,8 +362,6 @@ void Eth::setTransactionDefaults( TransactionSkeleton& _t ) {

string Eth::eth_sendTransaction( Json::Value const& _json ) {
try {
if ( !isEnabledTransactionSending() )
throw std::runtime_error( "transacton sending feature is disabled on this instance" );
TransactionSkeleton t = toTransactionSkeleton( _json );
setTransactionDefaults( t );
pair< bool, Secret > ar = m_ethAccounts.authenticate( t );
Expand Down Expand Up @@ -453,8 +429,6 @@ Json::Value Eth::eth_inspectTransaction( std::string const& _rlp ) {
// TODO Catch exceptions for all calls other eth_-calls in outer scope!
/// skale
string Eth::eth_sendRawTransaction( std::string const& _rlp ) {
if ( !isEnabledTransactionSending() )
throw JsonRpcException( "transacton sending feature is disabled on this instance" );
// Don't need to check the transaction signature (CheckTransaction::None) since it
// will be checked as a part of transaction import
Transaction t( jsToBytes( _rlp, OnFailed::Throw ), CheckTransaction::None );
Expand Down
2 changes: 1 addition & 1 deletion libweb3jsonrpc/Net.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class Net : public NetFace {
virtual bool net_listening() override;

private:
const dev::eth::ChainParams& m_chainParams;
const dev::eth::ChainParams m_chainParams;
};

} // namespace rpc
Expand Down
Loading
Loading