From 81b7fb7b18cd8997bf9deca1eeee8dfcb82757bb Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Fri, 22 Nov 2024 11:55:31 +0000 Subject: [PATCH 1/8] Use channels rather than BlockingCollection for block processing --- .../Nethermind.Consensus.Ethash/Ethash.cs | 10 +- .../Processing/BlockchainProcessor.cs | 155 +++++++----------- .../Processing/ProcessingStats.cs | 98 +++++------ .../Threading/ThreadExtensions.cs | 9 +- src/Nethermind/Nethermind.Evm/Metrics.cs | 7 +- .../Nethermind.Evm/VirtualMachine.cs | 2 +- .../Nethermind.Trie/Pruning/TrieStore.cs | 4 +- 7 files changed, 139 insertions(+), 146 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus.Ethash/Ethash.cs b/src/Nethermind/Nethermind.Consensus.Ethash/Ethash.cs index ad57e69a64a..1ab94f174a8 100644 --- a/src/Nethermind/Nethermind.Consensus.Ethash/Ethash.cs +++ b/src/Nethermind/Nethermind.Consensus.Ethash/Ethash.cs @@ -247,7 +247,15 @@ private IEthashDataSet BuildCache(uint epoch) _cacheStopwatch.Restart(); IEthashDataSet dataSet = new EthashCache(cacheSize, seed.Bytes); _cacheStopwatch.Stop(); - if (_logger.IsInfo) _logger.Info($"Cache for epoch {epoch} with size {cacheSize} and seed {seed.Bytes.ToHexString()} built in {_cacheStopwatch.ElapsedMilliseconds}ms"); + if (_logger.IsInfo) + { + var seedText = seed.Bytes.ToHexString(withZeroX: true); + if (seedText.Length > 17) + { + seedText = $"{seedText[..8]}...{seedText[^6..]}"; + } + _logger.Info($"Cache for epoch {epoch} with size {cacheSize} and seed {seedText} built in {_cacheStopwatch.ElapsedMilliseconds}ms"); + } return dataSet; } diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs index b302da9177e..afca3e291db 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs @@ -2,11 +2,11 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Blockchain.Find; @@ -14,6 +14,7 @@ using Nethermind.Core.Attributes; using Nethermind.Core.Crypto; using Nethermind.Core.Extensions; +using Nethermind.Core.Threading; using Nethermind.Evm.Tracing; using Nethermind.Evm.Tracing.GethStyle; using Nethermind.Evm.Tracing.ParityStyle; @@ -27,10 +28,10 @@ namespace Nethermind.Consensus.Processing; public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessingQueue { public int SoftMaxRecoveryQueueSizeInTx = 10000; // adjust based on tx or gas - public const int MaxProcessingQueueSize = 2000; // adjust based on tx or gas + public const int MaxProcessingQueueSize = 2048; // adjust based on tx or gas - [ThreadStatic] private static bool _isMainProcessingThread; - public static bool IsMainProcessingThread => _isMainProcessingThread; + private static AsyncLocal _isMainProcessingThread = new(); + public static bool IsMainProcessingThread => _isMainProcessingThread.Value; public bool IsMainProcessor { get; init; } public ITracerBag Tracers => _compositeBlockTracer; @@ -42,10 +43,10 @@ public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessing private readonly IBlockTree _blockTree; private readonly ILogger _logger; - private readonly BlockingCollection _recoveryQueue = new(new ConcurrentQueue()); + private readonly Channel _recoveryQueue = Channel.CreateUnbounded(); + private bool _recoveryComplete = false; - private readonly BlockingCollection _blockQueue = new(new ConcurrentQueue(), - MaxProcessingQueueSize); + private readonly Channel _blockQueue = Channel.CreateBounded(MaxProcessingQueueSize); private int _queueCount; @@ -122,19 +123,19 @@ public void Enqueue(Block block, ProcessingOptions processingOptions) ? new BlockRef(blockHash, processingOptions) : new BlockRef(block, processingOptions); - if (!_recoveryQueue.IsAddingCompleted) + if (!_recoveryComplete) { Interlocked.Increment(ref _queueCount); try { - _recoveryQueue.Add(blockRef); + _recoveryQueue.Writer.TryWrite(blockRef); if (_logger.IsTrace) _logger.Trace($"A new block {block.ToString(Block.Format.Short)} enqueued for processing."); } catch (Exception e) { Interlocked.Decrement(ref _queueCount); BlockRemoved?.Invoke(this, new BlockRemovedEventArgs(blockHash, ProcessingResult.QueueException, e)); - if (e is not InvalidOperationException || !_recoveryQueue.IsAddingCompleted) + if (e is not InvalidOperationException || !_recoveryComplete) { throw; } @@ -151,59 +152,42 @@ public void Start() public async Task StopAsync(bool processRemainingBlocks = false) { + _recoveryComplete = true; if (processRemainingBlocks) { - _recoveryQueue.CompleteAdding(); + _recoveryQueue.Writer.TryComplete(); await (_recoveryTask ?? Task.CompletedTask); - _blockQueue.CompleteAdding(); + _blockQueue.Writer.TryComplete(); } else { _loopCancellationSource?.Cancel(); - _recoveryQueue.CompleteAdding(); - _blockQueue.CompleteAdding(); + _recoveryQueue.Writer.TryComplete(); + _blockQueue.Writer.TryComplete(); } await Task.WhenAll(_recoveryTask ?? Task.CompletedTask, _processorTask ?? Task.CompletedTask); if (_logger.IsInfo) _logger.Info("Blockchain Processor shutdown complete.. please wait for all components to close"); } - private Task RunRecovery() + private async Task RunRecovery() { - TaskCompletionSource tcs = new(); - - Thread thread = new(() => + try { - try - { - RunRecoveryLoop(); - if (_logger.IsDebug) _logger.Debug("Sender address recovery complete."); - } - catch (OperationCanceledException) - { - if (_logger.IsDebug) _logger.Debug("Sender address recovery stopped."); - } - catch (Exception ex) - { - if (_logger.IsError) _logger.Error("Sender address recovery encountered an exception.", ex); - } - finally - { - tcs.SetResult(); - } - }) + await RunRecoveryLoop(); + if (_logger.IsDebug) _logger.Debug("Sender address recovery complete."); + } + catch (OperationCanceledException) { - IsBackground = true, - Name = "Block Recovery", - // Boost priority to make sure we process blocks as fast as possible - Priority = ThreadPriority.AboveNormal, - }; - thread.Start(); - - return tcs.Task; + if (_logger.IsDebug) _logger.Debug("Sender address recovery stopped."); + } + catch (Exception ex) + { + if (_logger.IsError) _logger.Error("Sender address recovery encountered an exception.", ex); + } } - private void RunRecoveryLoop() + private async Task RunRecoveryLoop() { void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Exception? exception = null) { @@ -212,9 +196,9 @@ void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Except FireProcessingQueueEmpty(); } - if (_logger.IsDebug) _logger.Debug($"Starting recovery loop - {_blockQueue.Count} blocks waiting in the queue."); + if (_logger.IsDebug) _logger.Debug($"Starting recovery loop - {_blockQueue.Reader.Count} blocks waiting in the queue."); _lastProcessedBlock = DateTime.UtcNow; - foreach (BlockRef blockRef in _recoveryQueue.GetConsumingEnumerable(_loopCancellationSource.Token)) + await foreach (BlockRef blockRef in _recoveryQueue.Reader.ReadAllAsync(_loopCancellationSource.Token)) { try { @@ -226,9 +210,9 @@ void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Except try { - _blockQueue.Add(blockRef); + await _blockQueue.Writer.WriteAsync(blockRef); } - catch (Exception e) + catch (Exception e) when (e is not OperationCanceledException) { DecrementQueue(blockRef.BlockHash, ProcessingResult.QueueException, e); @@ -255,54 +239,37 @@ void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Except } } - private Task RunProcessing() + private async Task RunProcessing() { - TaskCompletionSource tcs = new(); + _isMainProcessingThread.Value = IsMainProcessor; - Thread thread = new(() => + try { - _isMainProcessingThread = IsMainProcessor; - - try - { - RunProcessingLoop(); - if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} complete."); - } - catch (OperationCanceledException) - { - if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} stopped."); - } - catch (Exception ex) - { - if (_logger.IsError) _logger.Error($"{nameof(BlockchainProcessor)} encountered an exception.", ex); - } - finally - { - tcs.SetResult(); - } - }) + await RunProcessingLoop(); + if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} complete."); + } + catch (OperationCanceledException) { - IsBackground = true, - Name = "Block Processor", - // Boost priority to make sure we process blocks as fast as possible - Priority = ThreadPriority.Highest, - }; - thread.Start(); - - return tcs.Task; + if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} stopped."); + } + catch (Exception ex) + { + if (_logger.IsError) _logger.Error($"{nameof(BlockchainProcessor)} encountered an exception.", ex); + } } - private void RunProcessingLoop() + private async Task RunProcessingLoop() { - if (_logger.IsDebug) _logger.Debug($"Starting block processor - {_blockQueue.Count} blocks waiting in the queue."); + if (_logger.IsDebug) _logger.Debug($"Starting block processor - {_blockQueue.Reader.Count} blocks waiting in the queue."); FireProcessingQueueEmpty(); GCScheduler.Instance.SwitchOnBackgroundGC(0); - foreach (BlockRef blockRef in _blockQueue.GetConsumingEnumerable(_loopCancellationSource.Token)) + await foreach (BlockRef blockRef in _blockQueue.Reader.ReadAllAsync(_loopCancellationSource.Token)) { + using var handle = Thread.CurrentThread.BoostPriorityHighest(); // Have block, switch off background GC timer - GCScheduler.Instance.SwitchOffBackgroundGC(_blockQueue.Count); + GCScheduler.Instance.SwitchOffBackgroundGC(_blockQueue.Reader.Count); try { @@ -329,7 +296,7 @@ private void RunProcessingLoop() BlockRemoved?.Invoke(this, new BlockRemovedEventArgs(blockRef.BlockHash, ProcessingResult.Success)); } } - catch (Exception exception) + catch (Exception exception) when (exception is not OperationCanceledException) { if (_logger.IsWarn) _logger.Warn($"Processing loop threw an exception. Block: {blockRef}, Exception: {exception}"); BlockRemoved?.Invoke(this, new BlockRemovedEventArgs(blockRef.BlockHash, ProcessingResult.Exception, exception)); @@ -339,10 +306,10 @@ private void RunProcessingLoop() Interlocked.Decrement(ref _queueCount); } - if (_logger.IsTrace) _logger.Trace($"Now {_blockQueue.Count} blocks waiting in the queue."); + if (_logger.IsTrace) _logger.Trace($"Now {_blockQueue.Reader.Count} blocks waiting in the queue."); FireProcessingQueueEmpty(); - GCScheduler.Instance.SwitchOnBackgroundGC(_blockQueue.Count); + GCScheduler.Instance.SwitchOnBackgroundGC(_blockQueue.Reader.Count); } if (_logger.IsInfo) _logger.Info("Block processor queue stopped."); @@ -389,6 +356,9 @@ private void FireProcessingQueueEmpty() return null; } + bool readonlyChain = options.ContainsFlag(ProcessingOptions.ReadOnlyChain); + if (!readonlyChain) _stats.CaptureStartStats(); + ProcessingBranch processingBranch = PrepareProcessingBranch(suggestedBlock, options); PrepareBlocksToProcess(suggestedBlock, options, processingBranch); @@ -412,14 +382,12 @@ private void FireProcessingQueueEmpty() if (_logger.IsDebug) _logger.Debug($"Skipped processing of {suggestedBlock.ToString(Block.Format.FullHashAndNumber)}, last processed is null: {true}, processedBlocks.Length: {processedBlocks.Length}"); } - bool readonlyChain = options.ContainsFlag(ProcessingOptions.ReadOnlyChain); if (!readonlyChain) { long blockProcessingTimeInMicrosecs = _stopwatch.ElapsedMicroseconds(); Metrics.LastBlockProcessingTimeInMs = blockProcessingTimeInMicrosecs / 1000; - Metrics.RecoveryQueueSize = _recoveryQueue.Count; - Metrics.ProcessingQueueSize = _blockQueue.Count; - + Metrics.RecoveryQueueSize = _recoveryQueue.Reader.Count; + Metrics.ProcessingQueueSize = _blockQueue.Reader.Count; _stats.UpdateStats(lastProcessed, processingBranch.Root, blockProcessingTimeInMicrosecs); } @@ -761,8 +729,9 @@ private bool RunSimpleChecksAheadOfProcessing(Block suggestedBlock, ProcessingOp public void Dispose() { - _recoveryQueue.Dispose(); - _blockQueue.Dispose(); + _recoveryComplete = true; + _recoveryQueue.Writer.TryComplete(); + _blockQueue.Writer.TryComplete(); _loopCancellationSource?.Dispose(); _blockTree.NewBestSuggestedBlock -= OnNewBestBlock; _blockTree.NewHeadBlock -= OnNewHeadBlock; diff --git a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs index 3dbba850b58..e00e97f0c29 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs @@ -22,32 +22,35 @@ internal class ProcessingStats : IThreadPoolWorkItem private readonly IStateReader _stateReader; private readonly ILogger _logger; private readonly Stopwatch _runStopwatch = new(); + + private Block? _lastBlock; + private Hash256? _lastBranchRoot; + private double _lastTotalMGas; private long _lastBlockNumber; private long _lastElapsedRunningMicroseconds; - private double _lastTotalMGas; private long _lastTotalTx; - private long _lastTotalCalls; - private long _lastTotalEmptyCalls; - private long _lastTotalSLoad; - private long _lastTotalSStore; - private long _lastSelfDestructs; + private long _runningMicroseconds; + private long _runMicroseconds; private long _chunkProcessingMicroseconds; - private long _lastTotalCreates; + private long _currentReportMs; private long _lastReportMs; + + private long _lastCallOps; + private long _currentCallOps; + private long _lastEmptyCalls; + private long _currentEmptyCalls; + private long _lastSLoadOps; + private long _currentSLoadOps; + private long _lastSStoreOps; + private long _currentSStoreOps; + private long _lastSelfDestructOps; + private long _currentSelfDestructOps; + private long _lastCreateOps; + private long _currentCreatesOps; private long _lastContractsAnalyzed; + private long _currentContractsAnalyzed; private long _lastCachedContractsUsed; - private long _runningMicroseconds; - private long _runMicroseconds; - private long _reportMs; - private Block? _lastBlock; - private Hash256? _lastBranchRoot; - private long _sloadOpcodeProcessing; - private long _sstoreOpcodeProcessing; - private long _callsProcessing; - private long _emptyCallsProcessing; - private long _codeDbCacheProcessing; - private long _contractAnalysedProcessing; - private long _createsProcessing; + private long _currentCachedContractsUsed; public ProcessingStats(IStateReader stateReader, ILogger logger) { @@ -61,6 +64,18 @@ public ProcessingStats(IStateReader stateReader, ILogger logger) #endif } + public void CaptureStartStats() + { + _lastSLoadOps = Evm.Metrics.ThreadLocalSLoadOpcode; + _lastSStoreOps = Evm.Metrics.ThreadLocalSStoreOpcode; + _lastCallOps = Evm.Metrics.ThreadLocalCalls; + _lastEmptyCalls = Evm.Metrics.ThreadLocalEmptyCalls; + _lastCachedContractsUsed = Db.Metrics.ThreadLocalCodeDbCache; + _lastContractsAnalyzed = Evm.Metrics.ThreadLocalContractsAnalysed; + _lastCreateOps = Evm.Metrics.ThreadLocalCreates; + _lastSelfDestructOps = Evm.Metrics.ThreadLocalSelfDestructs; + } + public void UpdateStats(Block? block, Hash256 branchRoot, long blockProcessingTimeInMicros) { if (block is null) return; @@ -85,19 +100,20 @@ public void UpdateStats(Block? block, Hash256 branchRoot, long blockProcessingTi _runningMicroseconds = _runStopwatch.ElapsedMicroseconds(); _runMicroseconds = (_runningMicroseconds - _lastElapsedRunningMicroseconds); - long reportMs = _reportMs = Environment.TickCount64; + long reportMs = _currentReportMs = Environment.TickCount64; if (reportMs - _lastReportMs > 1000 || _logger.IsDebug) { - _lastReportMs = _reportMs; + _lastReportMs = _currentReportMs; _lastBlock = block; _lastBranchRoot = branchRoot; - _sloadOpcodeProcessing = Evm.Metrics.ThreadLocalSLoadOpcode; - _sstoreOpcodeProcessing = Evm.Metrics.ThreadLocalSStoreOpcode; - _callsProcessing = Evm.Metrics.ThreadLocalCalls; - _emptyCallsProcessing = Evm.Metrics.ThreadLocalEmptyCalls; - _codeDbCacheProcessing = Db.Metrics.ThreadLocalCodeDbCache; - _contractAnalysedProcessing = Evm.Metrics.ThreadLocalContractsAnalysed; - _createsProcessing = Evm.Metrics.ThreadLocalCreates; + _currentSLoadOps = Evm.Metrics.ThreadLocalSLoadOpcode; + _currentSStoreOps = Evm.Metrics.ThreadLocalSStoreOpcode; + _currentCallOps = Evm.Metrics.ThreadLocalCalls; + _currentEmptyCalls = Evm.Metrics.ThreadLocalEmptyCalls; + _currentCachedContractsUsed = Db.Metrics.ThreadLocalCodeDbCache; + _currentContractsAnalyzed = Evm.Metrics.ThreadLocalContractsAnalysed; + _currentCreatesOps = Evm.Metrics.ThreadLocalCreates; + _currentSelfDestructOps = Evm.Metrics.ThreadLocalSelfDestructs; GenerateReport(); } } @@ -138,8 +154,6 @@ void IThreadPoolWorkItem.Execute() UInt256 afterBalance = _stateReader.GetBalance(block.StateRoot, beneficiary); UInt256 rewards = beforeBalance < afterBalance ? afterBalance - beforeBalance : default; - long currentSelfDestructs = Evm.Metrics.SelfDestructs; - long chunkBlocks = Metrics.Blocks - _lastBlockNumber; double chunkMicroseconds = _chunkProcessingMicroseconds; @@ -162,13 +176,13 @@ void IThreadPoolWorkItem.Execute() if (_logger.IsInfo) { long chunkTx = Metrics.Transactions - _lastTotalTx; - long chunkCalls = _callsProcessing - _lastTotalCalls; - long chunkEmptyCalls = _emptyCallsProcessing - _lastTotalEmptyCalls; - long chunkCreates = _createsProcessing - _lastTotalCreates; - long chunkSload = _sloadOpcodeProcessing - _lastTotalSLoad; - long chunkSstore = _sstoreOpcodeProcessing - _lastTotalSStore; - long contractsAnalysed = _contractAnalysedProcessing - _lastContractsAnalyzed; - long cachedContractsUsed = _codeDbCacheProcessing - _lastCachedContractsUsed; + long chunkCalls = _currentCallOps - _lastCallOps; + long chunkEmptyCalls = _currentEmptyCalls - _lastEmptyCalls; + long chunkCreates = _currentCreatesOps - _lastCreateOps; + long chunkSload = _currentSLoadOps - _lastSLoadOps; + long chunkSstore = _currentSStoreOps - _lastSStoreOps; + long contractsAnalysed = _currentContractsAnalyzed - _lastContractsAnalyzed; + long cachedContractsUsed = _currentCachedContractsUsed - _lastCachedContractsUsed; double txps = chunkMicroseconds == 0 ? -1 : chunkTx / chunkMicroseconds * 1_000_000.0; double bps = chunkMicroseconds == 0 ? -1 : chunkBlocks / chunkMicroseconds * 1_000_000.0; double chunkMs = (chunkMicroseconds == 0 ? -1 : chunkMicroseconds / 1000.0); @@ -249,7 +263,7 @@ void IThreadPoolWorkItem.Execute() var recoveryQueue = Metrics.RecoveryQueueSize; var processingQueue = Metrics.ProcessingQueueSize; - _logger.Info($" Block{(chunkBlocks > 1 ? $"s x{chunkBlocks,-9:N0} " : $"{(isMev ? " mev" : " ")} {rewards.ToDecimal(null) / weiToEth,5:N4}{BlocksConfig.GasTokenTicker,4}")}{(chunkBlocks == 1 ? mgasColor : "")} {chunkMGas,7:F2}{resetColor} MGas | {chunkTx,8:N0} txs | calls {callsColor}{chunkCalls,6:N0}{resetColor} {darkGreyText}({chunkEmptyCalls,3:N0}){resetColor} | sload {chunkSload,7:N0} | sstore {sstoreColor}{chunkSstore,6:N0}{resetColor} | create {createsColor}{chunkCreates,3:N0}{resetColor}{(currentSelfDestructs - _lastSelfDestructs > 0 ? $"{darkGreyText}({-(currentSelfDestructs - _lastSelfDestructs),3:N0}){resetColor}" : "")}"); + _logger.Info($" Block{(chunkBlocks > 1 ? $"s x{chunkBlocks,-9:N0} " : $"{(isMev ? " mev" : " ")} {rewards.ToDecimal(null) / weiToEth,5:N4}{BlocksConfig.GasTokenTicker,4}")}{(chunkBlocks == 1 ? mgasColor : "")} {chunkMGas,7:F2}{resetColor} MGas | {chunkTx,8:N0} txs | calls {callsColor}{chunkCalls,6:N0}{resetColor} {darkGreyText}({chunkEmptyCalls,3:N0}){resetColor} | sload {chunkSload,7:N0} | sstore {sstoreColor}{chunkSstore,6:N0}{resetColor} | create {createsColor}{chunkCreates,3:N0}{resetColor}{(_currentSelfDestructOps - _lastSelfDestructOps > 0 ? $"{darkGreyText}({-(_currentSelfDestructOps - _lastSelfDestructOps),3:N0}){resetColor}" : "")}"); if (recoveryQueue > 0 || processingQueue > 0) { _logger.Info($" Block throughput {mgasPerSecondColor}{mgasPerSecond,11:F2}{resetColor} MGas/s{(mgasPerSecond > 1000 ? "🔥" : " ")}| {txps,10:N1} tps | {bps,7:F2} Blk/s | recover {recoveryQueue,5:N0} | process {processingQueue,5:N0}"); @@ -260,18 +274,10 @@ void IThreadPoolWorkItem.Execute() } } - _lastCachedContractsUsed = _codeDbCacheProcessing; - _lastContractsAnalyzed = _contractAnalysedProcessing; _lastBlockNumber = Metrics.Blocks; _lastTotalMGas = Metrics.Mgas; _lastElapsedRunningMicroseconds = _runningMicroseconds; _lastTotalTx = Metrics.Transactions; - _lastTotalCalls = _callsProcessing; - _lastTotalEmptyCalls = _emptyCallsProcessing; - _lastTotalCreates = _createsProcessing; - _lastTotalSLoad = _sloadOpcodeProcessing; - _lastTotalSStore = _sstoreOpcodeProcessing; - _lastSelfDestructs = currentSelfDestructs; _chunkProcessingMicroseconds = 0; } diff --git a/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs b/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs index 002c6dadbe5..4206df64442 100644 --- a/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs +++ b/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs @@ -13,11 +13,11 @@ public static class ThreadExtensions private readonly Thread? _thread; private readonly ThreadPriority _previousPriority; - internal Disposable(Thread thread) + internal Disposable(Thread thread, ThreadPriority priority = ThreadPriority.AboveNormal) { _thread = thread; _previousPriority = thread.Priority; - thread.Priority = ThreadPriority.AboveNormal; + thread.Priority = priority; } public void Dispose() @@ -33,4 +33,9 @@ public static Disposable BoostPriority(this Thread thread) { return new Disposable(thread); } + + public static Disposable BoostPriorityHighest(this Thread thread) + { + return new Disposable(thread, ThreadPriority.Highest); + } } diff --git a/src/Nethermind/Nethermind.Evm/Metrics.cs b/src/Nethermind/Nethermind.Evm/Metrics.cs index 40d0dcbeea3..3dadc87c2a9 100644 --- a/src/Nethermind/Nethermind.Evm/Metrics.cs +++ b/src/Nethermind/Nethermind.Evm/Metrics.cs @@ -16,8 +16,13 @@ public class Metrics [Description("Number of EVM exceptions thrown by contracts.")] public static long EvmExceptions { get; set; } + [CounterMetric] [Description("Number of SELFDESTRUCT calls.")] - public static long SelfDestructs { get; set; } + public static long SelfDestructs => _selfDestructs.GetTotalValue(); + private static readonly ZeroContentionCounter _selfDestructs = new(); + [Description("Number of calls to other contracts on thread.")] + public static long ThreadLocalSelfDestructs => _selfDestructs.ThreadLocalValue; + public static void IncrementSelfDestructs() => _selfDestructs.Increment(); [CounterMetric] [Description("Number of calls to other contracts.")] diff --git a/src/Nethermind/Nethermind.Evm/VirtualMachine.cs b/src/Nethermind/Nethermind.Evm/VirtualMachine.cs index 101b88d120c..01d4eaf5e19 100644 --- a/src/Nethermind/Nethermind.Evm/VirtualMachine.cs +++ b/src/Nethermind/Nethermind.Evm/VirtualMachine.cs @@ -2315,7 +2315,7 @@ private static EvmExceptionType InstructionReturn(EvmState vmState, re private EvmExceptionType InstructionSelfDestruct(EvmState vmState, ref EvmStack stack, ref long gasAvailable, IReleaseSpec spec) where TTracing : struct, IIsTracing { - Metrics.SelfDestructs++; + Metrics.IncrementSelfDestructs(); Address inheritor = stack.PopAddress(); if (inheritor is null) return EvmExceptionType.StackUnderflow; diff --git a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs index cdccc13d808..6207eeee1b7 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs @@ -771,11 +771,11 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path) if (_pruningStrategy.PruningEnabled) { - if (_logger.IsInfo) _logger.Info($"Persisting from root {commitSet.Root?.Keccak} in block {commitSet.BlockNumber}"); + if (_logger.IsInfo) _logger.Info($"Persisting from root {commitSet.Root?.Keccak?.ToShortString()} in block {commitSet.BlockNumber}"); } else { - if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root?.Keccak} in block {commitSet.BlockNumber}"); + if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root?.Keccak?.ToShortString()} in block {commitSet.BlockNumber}"); } long start = Stopwatch.GetTimestamp(); From 8064c50843e7c45b62eecacf77452b4784b99dd0 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 30 Nov 2024 13:47:06 +0000 Subject: [PATCH 2/8] CliqueBlockProducer to Channel --- .../CliqueBlockProducer.cs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs b/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs index 53b97a680b4..0122714fc10 100644 --- a/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs +++ b/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Text; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using System.Timers; using Nethermind.Blockchain; @@ -67,8 +68,7 @@ public CliqueBlockProducerRunner( _timer.Start(); } - private readonly BlockingCollection _signalsQueue = - new(new ConcurrentQueue()); + private readonly Channel _signalsQueue = Channel.CreateUnbounded(); private Block? _scheduledBlock; @@ -91,7 +91,7 @@ public void UncastVote(Address signer) public void ProduceOnTopOf(Hash256 hash) { - _signalsQueue.Add(_blockTree.FindBlock(hash, BlockTreeLookupOptions.None)); + _signalsQueue.Writer.TryWrite(_blockTree.FindBlock(hash, BlockTreeLookupOptions.None)); } public IReadOnlyDictionary GetProposals() => _blockProducer.Proposals.ToDictionary(); @@ -111,7 +111,7 @@ private void TimerOnElapsed(object sender, ElapsedEventArgs e) { if (_blockTree.Head.Timestamp + _config.BlockPeriod < _timestamper.UnixTime.Seconds) { - _signalsQueue.Add(_blockTree.FindBlock(_blockTree.Head.Hash, BlockTreeLookupOptions.None)); + _signalsQueue.Writer.TryWrite(_blockTree.FindBlock(_blockTree.Head.Hash, BlockTreeLookupOptions.None)); } _timer.Enabled = true; @@ -216,17 +216,17 @@ private Task RunConsumeSignal() private void BlockTreeOnNewHeadBlock(object? sender, BlockEventArgs e) { - _signalsQueue.Add(e.Block); + _signalsQueue.Writer.TryWrite(e.Block); } private async Task ConsumeSignal() { _lastProducedBlock = DateTime.UtcNow; - foreach (Block signal in _signalsQueue.GetConsumingEnumerable(_cancellationTokenSource.Token)) + await foreach (Block signal in _signalsQueue.Reader.ReadAllAsync(_cancellationTokenSource.Token)) { // TODO: Maybe use IBlockProducer specific to clique? Block parentBlock = signal; - while (_signalsQueue.TryTake(out Block? nextSignal)) + while (_signalsQueue.Reader.TryRead(out Block? nextSignal)) { if (parentBlock.Number <= nextSignal.Number) { @@ -258,6 +258,7 @@ public async Task StopAsync() { _blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock; _cancellationTokenSource?.Cancel(); + _signalsQueue.Writer.TryComplete(); await (_producerTask ?? Task.CompletedTask); } @@ -273,11 +274,12 @@ bool IBlockProducerRunner.IsProducingBlocks(ulong? maxProducingInterval) public event EventHandler? BlockProduced; - public void Dispose() { _cancellationTokenSource?.Dispose(); _timer?.Dispose(); + _signalsQueue.Writer.TryComplete(); + BlockProduced = null; } } From 724a067c67de5684249e8fc94741f41ad13ff4fa Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 30 Nov 2024 13:47:43 +0000 Subject: [PATCH 3/8] SyncPeerPool to Channels --- .../Nethermind.Synchronization/Peers/SyncPeerPool.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs b/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs index 263617effbe..119c37b5c27 100644 --- a/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs +++ b/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Blockchain.Find; @@ -37,7 +38,7 @@ public class SyncPeerPool : ISyncPeerPool, IPeerDifficultyRefreshPool private readonly IBlockTree _blockTree; private readonly ILogger _logger; - private readonly BlockingCollection _peerRefreshQueue = new(); + private readonly Channel _peerRefreshQueue = Channel.CreateUnbounded(); private readonly ConcurrentDictionary _peers = new(); @@ -218,7 +219,7 @@ internal IEnumerable ReplaceableAllocations public void RefreshTotalDifficulty(ISyncPeer syncPeer, Hash256 blockHash) { RefreshTotalDiffTask task = new(blockHash, syncPeer); - _peerRefreshQueue.Add(task); + _peerRefreshQueue.Writer.TryWrite(task); } public void AddPeer(ISyncPeer syncPeer) @@ -257,7 +258,7 @@ public void AddPeer(ISyncPeer syncPeer) { if (_logger.IsDebug) _logger.Debug($"Adding {syncPeer.Node:c} to refresh queue"); if (NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportInterestingEvent(syncPeer.Node.Address, "adding node to refresh queue"); - _peerRefreshQueue.Add(new RefreshTotalDiffTask(syncPeer)); + _peerRefreshQueue.Writer.TryWrite(new RefreshTotalDiffTask(syncPeer)); } } @@ -354,7 +355,6 @@ public async Task Allocate( } } - private bool TryAllocateOnce(IPeerAllocationStrategy peerAllocationStrategy, AllocationContexts allocationContexts, SyncPeerAllocation allocation) { lock (_isAllocatedChecks) @@ -392,7 +392,7 @@ public void Free(SyncPeerAllocation syncPeerAllocation) private async Task RunRefreshPeerLoop() { - foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.GetConsumingEnumerable(_refreshLoopCancellation.Token)) + await foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.Reader.ReadAllAsync(_refreshLoopCancellation.Token)) { ISyncPeer syncPeer = refreshTask.SyncPeer; if (_logger.IsTrace) _logger.Trace($"Refreshing info for {syncPeer}."); @@ -713,7 +713,7 @@ public async ValueTask DisposeAsync() await (_refreshLoopTask ?? Task.CompletedTask); Parallel.ForEach(_peers, p => { p.Value.SyncPeer.Disconnect(DisconnectReason.AppClosing, "App Close"); }); - _peerRefreshQueue.Dispose(); + _peerRefreshQueue.Writer.TryComplete(); _refreshLoopCancellation.Dispose(); _refreshLoopTask?.Dispose(); _signals.Dispose(); From 1cfaf161f45c80d56d7b4f564fd1a340a6564cb7 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 30 Nov 2024 15:08:23 +0000 Subject: [PATCH 4/8] TrieStore to Channels --- .../Nethermind.Trie/Pruning/TrieStore.cs | 47 +++++++------ src/Nethermind/Nethermind.Trie/TrieNode.cs | 67 +++++++++++++++++++ 2 files changed, 93 insertions(+), 21 deletions(-) diff --git a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs index 993126ab29d..c9ad1136c06 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs @@ -3,12 +3,12 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Collections; @@ -790,26 +790,31 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path) // However, anything that we are trying to persist here should still be in dirty cache. // So parallel read should go there first instead of to the database for these dataset, // so it should be fine for these to be non atomic. - using BlockingCollection disposeQueue = new BlockingCollection(4); - - for (int index = 0; index < _disposeTasks.Length; index++) + Channel disposeQueue = Channel.CreateBounded(4); + try { - _disposeTasks[index] = Task.Run(() => + for (int index = 0; index < _disposeTasks.Length; index++) { - while (disposeQueue.TryTake(out INodeStorage.WriteBatch disposable, Timeout.Infinite)) + _disposeTasks[index] = Task.Run(async () => { - disposable.Dispose(); - } - }); - } + await foreach (var disposable in disposeQueue.Reader.ReadAllAsync()) + { + disposable.Dispose(); + } + }); + } - using ArrayPoolList persistNodeStartingFromTasks = parallelStartNodes.Select( - entry => Task.Run(() => PersistNodeStartingFrom(entry.trieNode, entry.address2, entry.path, persistedNodeRecorder, writeFlags, disposeQueue))) - .ToPooledList(parallelStartNodes.Count); + using ArrayPoolList persistNodeStartingFromTasks = parallelStartNodes.Select( + entry => Task.Run(() => PersistNodeStartingFrom(entry.trieNode, entry.address2, entry.path, persistedNodeRecorder, writeFlags, disposeQueue))) + .ToPooledList(parallelStartNodes.Count); - Task.WaitAll(persistNodeStartingFromTasks.AsSpan()); + Task.WaitAll(persistNodeStartingFromTasks.AsSpan()); + } + finally + { + disposeQueue.Writer.Complete(); + } - disposeQueue.CompleteAdding(); Task.WaitAll(_disposeTasks); // Dispose top level last in case something goes wrong, at least the root won't be stored @@ -824,14 +829,14 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path) LastPersistedBlockNumber = commitSet.BlockNumber; } - private void PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path, + private async Task PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path, Action? persistedNodeRecorder, - WriteFlags writeFlags, BlockingCollection disposeQueue) + WriteFlags writeFlags, Channel disposeQueue) { long persistedNodeCount = 0; INodeStorage.WriteBatch writeBatch = _nodeStorage.StartWriteBatch(); - void DoPersist(TrieNode node, Hash256? address3, TreePath path2) + async ValueTask DoPersist(TrieNode node, Hash256? address3, TreePath path2) { persistedNodeRecorder?.Invoke(path2, address3, node); PersistNode(address3, path2, node, writeBatch, writeFlags); @@ -839,13 +844,13 @@ void DoPersist(TrieNode node, Hash256? address3, TreePath path2) persistedNodeCount++; if (persistedNodeCount % 512 == 0) { - disposeQueue.Add(writeBatch); + await disposeQueue.Writer.WriteAsync(writeBatch); writeBatch = _nodeStorage.StartWriteBatch(); } } - tn.CallRecursively(DoPersist, address2, ref path, GetTrieStore(address2), true, _logger); - disposeQueue.Add(writeBatch); + await tn.CallRecursivelyAsync(DoPersist, address2, path, GetTrieStore(address2), true, _logger); + await disposeQueue.Writer.WriteAsync(writeBatch); } private void PersistNode(Hash256? address, in TreePath path, TrieNode currentNode, INodeStorage.WriteBatch writeBatch, WriteFlags writeFlags = WriteFlags.None) diff --git a/src/Nethermind/Nethermind.Trie/TrieNode.cs b/src/Nethermind/Nethermind.Trie/TrieNode.cs index b207653aedf..02d8e373dc2 100644 --- a/src/Nethermind/Nethermind.Trie/TrieNode.cs +++ b/src/Nethermind/Nethermind.Trie/TrieNode.cs @@ -7,6 +7,7 @@ using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; +using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Buffers; using Nethermind.Core.Crypto; @@ -976,6 +977,72 @@ public void CallRecursively( action(this, storageAddress, currentPath); } + public async ValueTask CallRecursivelyAsync( + Func action, + Hash256? storageAddress, + TreePath currentPath, + ITrieNodeResolver resolver, + bool skipPersisted, + ILogger logger, + int maxPathLength = Int32.MaxValue, + bool resolveStorageRoot = true) + { + if (skipPersisted && IsPersisted) + { + if (logger.IsTrace) logger.Trace($"Skipping {this} - already persisted"); + return; + } + + if (currentPath.Length >= maxPathLength) + { + await action(this, storageAddress, currentPath); + return; + } + + if (!IsLeaf) + { + if (_data is not null) + { + for (int i = 0; i < _data.Length; i++) + { + object o = _data[i]; + if (o is TrieNode child) + { + if (logger.IsTrace) logger.Trace($"Persist recursively on child {i} {child} of {this}"); + int previousLength = AppendChildPath(ref currentPath, i); + await child.CallRecursivelyAsync(action, storageAddress, currentPath, resolver, skipPersisted, logger, maxPathLength, resolveStorageRoot); + currentPath.TruncateMut(previousLength); + } + } + } + } + else + { + TrieNode? storageRoot = _storageRoot; + if (storageRoot is not null || (resolveStorageRoot && TryResolveStorageRoot(resolver, ref currentPath, out storageRoot))) + { + if (logger.IsTrace) logger.Trace($"Persist recursively on storage root {_storageRoot} of {this}"); + Hash256 storagePathAddr; + using (currentPath.ScopedAppend(Key)) + { + if (currentPath.Length != 64) throw new Exception("unexpected storage path length. Total nibble count should add up to 64."); + storagePathAddr = currentPath.Path.ToCommitment(); + } + + TreePath emptyPath = TreePath.Empty; + await storageRoot!.CallRecursivelyAsync( + action, + storagePathAddr, + emptyPath, + resolver.GetStorageTrieNodeResolver(storagePathAddr), + skipPersisted, + logger); + } + } + + await action(this, storageAddress, currentPath); + } + /// /// Imagine a branch like this: /// B From 323c2ffcbebf44f63215f5806de6cab10457e5e7 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Dec 2024 11:00:40 +0000 Subject: [PATCH 5/8] Revert "TrieStore to Channels" This reverts commit 1cfaf161f45c80d56d7b4f564fd1a340a6564cb7. --- .../Nethermind.Trie/Pruning/TrieStore.cs | 47 ++++++------- src/Nethermind/Nethermind.Trie/TrieNode.cs | 67 ------------------- 2 files changed, 21 insertions(+), 93 deletions(-) diff --git a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs index c9ad1136c06..993126ab29d 100644 --- a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs +++ b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs @@ -3,12 +3,12 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Collections; @@ -790,31 +790,26 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path) // However, anything that we are trying to persist here should still be in dirty cache. // So parallel read should go there first instead of to the database for these dataset, // so it should be fine for these to be non atomic. - Channel disposeQueue = Channel.CreateBounded(4); - try + using BlockingCollection disposeQueue = new BlockingCollection(4); + + for (int index = 0; index < _disposeTasks.Length; index++) { - for (int index = 0; index < _disposeTasks.Length; index++) + _disposeTasks[index] = Task.Run(() => { - _disposeTasks[index] = Task.Run(async () => + while (disposeQueue.TryTake(out INodeStorage.WriteBatch disposable, Timeout.Infinite)) { - await foreach (var disposable in disposeQueue.Reader.ReadAllAsync()) - { - disposable.Dispose(); - } - }); - } + disposable.Dispose(); + } + }); + } - using ArrayPoolList persistNodeStartingFromTasks = parallelStartNodes.Select( - entry => Task.Run(() => PersistNodeStartingFrom(entry.trieNode, entry.address2, entry.path, persistedNodeRecorder, writeFlags, disposeQueue))) - .ToPooledList(parallelStartNodes.Count); + using ArrayPoolList persistNodeStartingFromTasks = parallelStartNodes.Select( + entry => Task.Run(() => PersistNodeStartingFrom(entry.trieNode, entry.address2, entry.path, persistedNodeRecorder, writeFlags, disposeQueue))) + .ToPooledList(parallelStartNodes.Count); - Task.WaitAll(persistNodeStartingFromTasks.AsSpan()); - } - finally - { - disposeQueue.Writer.Complete(); - } + Task.WaitAll(persistNodeStartingFromTasks.AsSpan()); + disposeQueue.CompleteAdding(); Task.WaitAll(_disposeTasks); // Dispose top level last in case something goes wrong, at least the root won't be stored @@ -829,14 +824,14 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path) LastPersistedBlockNumber = commitSet.BlockNumber; } - private async Task PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path, + private void PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path, Action? persistedNodeRecorder, - WriteFlags writeFlags, Channel disposeQueue) + WriteFlags writeFlags, BlockingCollection disposeQueue) { long persistedNodeCount = 0; INodeStorage.WriteBatch writeBatch = _nodeStorage.StartWriteBatch(); - async ValueTask DoPersist(TrieNode node, Hash256? address3, TreePath path2) + void DoPersist(TrieNode node, Hash256? address3, TreePath path2) { persistedNodeRecorder?.Invoke(path2, address3, node); PersistNode(address3, path2, node, writeBatch, writeFlags); @@ -844,13 +839,13 @@ async ValueTask DoPersist(TrieNode node, Hash256? address3, TreePath path2) persistedNodeCount++; if (persistedNodeCount % 512 == 0) { - await disposeQueue.Writer.WriteAsync(writeBatch); + disposeQueue.Add(writeBatch); writeBatch = _nodeStorage.StartWriteBatch(); } } - await tn.CallRecursivelyAsync(DoPersist, address2, path, GetTrieStore(address2), true, _logger); - await disposeQueue.Writer.WriteAsync(writeBatch); + tn.CallRecursively(DoPersist, address2, ref path, GetTrieStore(address2), true, _logger); + disposeQueue.Add(writeBatch); } private void PersistNode(Hash256? address, in TreePath path, TrieNode currentNode, INodeStorage.WriteBatch writeBatch, WriteFlags writeFlags = WriteFlags.None) diff --git a/src/Nethermind/Nethermind.Trie/TrieNode.cs b/src/Nethermind/Nethermind.Trie/TrieNode.cs index 02d8e373dc2..b207653aedf 100644 --- a/src/Nethermind/Nethermind.Trie/TrieNode.cs +++ b/src/Nethermind/Nethermind.Trie/TrieNode.cs @@ -7,7 +7,6 @@ using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; -using System.Threading.Tasks; using Nethermind.Core; using Nethermind.Core.Buffers; using Nethermind.Core.Crypto; @@ -977,72 +976,6 @@ public void CallRecursively( action(this, storageAddress, currentPath); } - public async ValueTask CallRecursivelyAsync( - Func action, - Hash256? storageAddress, - TreePath currentPath, - ITrieNodeResolver resolver, - bool skipPersisted, - ILogger logger, - int maxPathLength = Int32.MaxValue, - bool resolveStorageRoot = true) - { - if (skipPersisted && IsPersisted) - { - if (logger.IsTrace) logger.Trace($"Skipping {this} - already persisted"); - return; - } - - if (currentPath.Length >= maxPathLength) - { - await action(this, storageAddress, currentPath); - return; - } - - if (!IsLeaf) - { - if (_data is not null) - { - for (int i = 0; i < _data.Length; i++) - { - object o = _data[i]; - if (o is TrieNode child) - { - if (logger.IsTrace) logger.Trace($"Persist recursively on child {i} {child} of {this}"); - int previousLength = AppendChildPath(ref currentPath, i); - await child.CallRecursivelyAsync(action, storageAddress, currentPath, resolver, skipPersisted, logger, maxPathLength, resolveStorageRoot); - currentPath.TruncateMut(previousLength); - } - } - } - } - else - { - TrieNode? storageRoot = _storageRoot; - if (storageRoot is not null || (resolveStorageRoot && TryResolveStorageRoot(resolver, ref currentPath, out storageRoot))) - { - if (logger.IsTrace) logger.Trace($"Persist recursively on storage root {_storageRoot} of {this}"); - Hash256 storagePathAddr; - using (currentPath.ScopedAppend(Key)) - { - if (currentPath.Length != 64) throw new Exception("unexpected storage path length. Total nibble count should add up to 64."); - storagePathAddr = currentPath.Path.ToCommitment(); - } - - TreePath emptyPath = TreePath.Empty; - await storageRoot!.CallRecursivelyAsync( - action, - storagePathAddr, - emptyPath, - resolver.GetStorageTrieNodeResolver(storagePathAddr), - skipPersisted, - logger); - } - } - - await action(this, storageAddress, currentPath); - } - /// /// Imagine a branch like this: /// B From dd22aa073047914abf3cc7b5774999f0d0d0fbff Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Dec 2024 11:00:46 +0000 Subject: [PATCH 6/8] Revert "SyncPeerPool to Channels" This reverts commit 724a067c67de5684249e8fc94741f41ad13ff4fa. --- .../Nethermind.Synchronization/Peers/SyncPeerPool.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs b/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs index 119c37b5c27..263617effbe 100644 --- a/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs +++ b/src/Nethermind/Nethermind.Synchronization/Peers/SyncPeerPool.cs @@ -7,7 +7,6 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Blockchain.Find; @@ -38,7 +37,7 @@ public class SyncPeerPool : ISyncPeerPool, IPeerDifficultyRefreshPool private readonly IBlockTree _blockTree; private readonly ILogger _logger; - private readonly Channel _peerRefreshQueue = Channel.CreateUnbounded(); + private readonly BlockingCollection _peerRefreshQueue = new(); private readonly ConcurrentDictionary _peers = new(); @@ -219,7 +218,7 @@ internal IEnumerable ReplaceableAllocations public void RefreshTotalDifficulty(ISyncPeer syncPeer, Hash256 blockHash) { RefreshTotalDiffTask task = new(blockHash, syncPeer); - _peerRefreshQueue.Writer.TryWrite(task); + _peerRefreshQueue.Add(task); } public void AddPeer(ISyncPeer syncPeer) @@ -258,7 +257,7 @@ public void AddPeer(ISyncPeer syncPeer) { if (_logger.IsDebug) _logger.Debug($"Adding {syncPeer.Node:c} to refresh queue"); if (NetworkDiagTracer.IsEnabled) NetworkDiagTracer.ReportInterestingEvent(syncPeer.Node.Address, "adding node to refresh queue"); - _peerRefreshQueue.Writer.TryWrite(new RefreshTotalDiffTask(syncPeer)); + _peerRefreshQueue.Add(new RefreshTotalDiffTask(syncPeer)); } } @@ -355,6 +354,7 @@ public async Task Allocate( } } + private bool TryAllocateOnce(IPeerAllocationStrategy peerAllocationStrategy, AllocationContexts allocationContexts, SyncPeerAllocation allocation) { lock (_isAllocatedChecks) @@ -392,7 +392,7 @@ public void Free(SyncPeerAllocation syncPeerAllocation) private async Task RunRefreshPeerLoop() { - await foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.Reader.ReadAllAsync(_refreshLoopCancellation.Token)) + foreach (RefreshTotalDiffTask refreshTask in _peerRefreshQueue.GetConsumingEnumerable(_refreshLoopCancellation.Token)) { ISyncPeer syncPeer = refreshTask.SyncPeer; if (_logger.IsTrace) _logger.Trace($"Refreshing info for {syncPeer}."); @@ -713,7 +713,7 @@ public async ValueTask DisposeAsync() await (_refreshLoopTask ?? Task.CompletedTask); Parallel.ForEach(_peers, p => { p.Value.SyncPeer.Disconnect(DisconnectReason.AppClosing, "App Close"); }); - _peerRefreshQueue.Writer.TryComplete(); + _peerRefreshQueue.Dispose(); _refreshLoopCancellation.Dispose(); _refreshLoopTask?.Dispose(); _signals.Dispose(); From 69a3590bb38fb33f7ed1948391b220b21fe10175 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Dec 2024 11:00:54 +0000 Subject: [PATCH 7/8] Revert "CliqueBlockProducer to Channel" This reverts commit 8064c50843e7c45b62eecacf77452b4784b99dd0. --- .../CliqueBlockProducer.cs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs b/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs index 0122714fc10..53b97a680b4 100644 --- a/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs +++ b/src/Nethermind/Nethermind.Consensus.Clique/CliqueBlockProducer.cs @@ -7,7 +7,6 @@ using System.Linq; using System.Text; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using System.Timers; using Nethermind.Blockchain; @@ -68,7 +67,8 @@ public CliqueBlockProducerRunner( _timer.Start(); } - private readonly Channel _signalsQueue = Channel.CreateUnbounded(); + private readonly BlockingCollection _signalsQueue = + new(new ConcurrentQueue()); private Block? _scheduledBlock; @@ -91,7 +91,7 @@ public void UncastVote(Address signer) public void ProduceOnTopOf(Hash256 hash) { - _signalsQueue.Writer.TryWrite(_blockTree.FindBlock(hash, BlockTreeLookupOptions.None)); + _signalsQueue.Add(_blockTree.FindBlock(hash, BlockTreeLookupOptions.None)); } public IReadOnlyDictionary GetProposals() => _blockProducer.Proposals.ToDictionary(); @@ -111,7 +111,7 @@ private void TimerOnElapsed(object sender, ElapsedEventArgs e) { if (_blockTree.Head.Timestamp + _config.BlockPeriod < _timestamper.UnixTime.Seconds) { - _signalsQueue.Writer.TryWrite(_blockTree.FindBlock(_blockTree.Head.Hash, BlockTreeLookupOptions.None)); + _signalsQueue.Add(_blockTree.FindBlock(_blockTree.Head.Hash, BlockTreeLookupOptions.None)); } _timer.Enabled = true; @@ -216,17 +216,17 @@ private Task RunConsumeSignal() private void BlockTreeOnNewHeadBlock(object? sender, BlockEventArgs e) { - _signalsQueue.Writer.TryWrite(e.Block); + _signalsQueue.Add(e.Block); } private async Task ConsumeSignal() { _lastProducedBlock = DateTime.UtcNow; - await foreach (Block signal in _signalsQueue.Reader.ReadAllAsync(_cancellationTokenSource.Token)) + foreach (Block signal in _signalsQueue.GetConsumingEnumerable(_cancellationTokenSource.Token)) { // TODO: Maybe use IBlockProducer specific to clique? Block parentBlock = signal; - while (_signalsQueue.Reader.TryRead(out Block? nextSignal)) + while (_signalsQueue.TryTake(out Block? nextSignal)) { if (parentBlock.Number <= nextSignal.Number) { @@ -258,7 +258,6 @@ public async Task StopAsync() { _blockTree.NewHeadBlock -= BlockTreeOnNewHeadBlock; _cancellationTokenSource?.Cancel(); - _signalsQueue.Writer.TryComplete(); await (_producerTask ?? Task.CompletedTask); } @@ -274,12 +273,11 @@ bool IBlockProducerRunner.IsProducingBlocks(ulong? maxProducingInterval) public event EventHandler? BlockProduced; + public void Dispose() { _cancellationTokenSource?.Dispose(); _timer?.Dispose(); - _signalsQueue.Writer.TryComplete(); - BlockProduced = null; } } From c63fb6f8beae5dd2c4cd3e2cd6d8ac96690c2aad Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 1 Dec 2024 11:26:55 +0000 Subject: [PATCH 8/8] Boost new payload also --- .../Nethermind.Merge.Plugin/Handlers/NewPayloadHandler.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Handlers/NewPayloadHandler.cs b/src/Nethermind/Nethermind.Merge.Plugin/Handlers/NewPayloadHandler.cs index 49651d8ae49..e0202c101b3 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Handlers/NewPayloadHandler.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Handlers/NewPayloadHandler.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Blockchain.Synchronization; @@ -12,6 +13,7 @@ using Nethermind.Core; using Nethermind.Core.Caching; using Nethermind.Core.Crypto; +using Nethermind.Core.Threading; using Nethermind.Crypto; using Nethermind.Int256; using Nethermind.JsonRpc; @@ -202,6 +204,7 @@ public async Task> HandleAsync(ExecutionPayload r // Otherwise, we can just process this block and we don't need to do BeaconSync anymore. _mergeSyncController.StopSyncing(); + using var handle = Thread.CurrentThread.BoostPriority(); // Try to execute block (ValidationResult result, string? message) = await ValidateBlockAndProcess(block, parentHeader, processingOptions);