diff --git a/src/Nethermind/Nethermind.State/Snap/AccountsToRefreshRequest.cs b/src/Nethermind/Nethermind.State/Snap/AccountsToRefreshRequest.cs
index 2d752ed573e..ff1882853d7 100644
--- a/src/Nethermind/Nethermind.State/Snap/AccountsToRefreshRequest.cs
+++ b/src/Nethermind/Nethermind.State/Snap/AccountsToRefreshRequest.cs
@@ -31,5 +31,6 @@ public class AccountWithStorageStartingHash
     {
         public PathWithAccount PathAndAccount { get; set; }
         public ValueHash256 StorageStartingHash { get; set; }
+        public ValueHash256 StorageHashLimit { get; set; }
     }
 }
diff --git a/src/Nethermind/Nethermind.Synchronization.Test/SnapSync/ProgressTrackerTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/SnapSync/ProgressTrackerTests.cs
index 216f970cf18..811b6f94baf 100644
--- a/src/Nethermind/Nethermind.Synchronization.Test/SnapSync/ProgressTrackerTests.cs
+++ b/src/Nethermind/Nethermind.Synchronization.Test/SnapSync/ProgressTrackerTests.cs
@@ -161,10 +161,139 @@ public void Will_mark_progress_and_flush_when_finished()
         memDb[ProgressTracker.ACC_PROGRESS_KEY].Should().BeEquivalentTo(Keccak.MaxValue.BytesToArray());
     }
 
+    [TestCase("0x0000000000000000000000000000000000000000000000000000000000000000", "0x2000000000000000000000000000000000000000000000000000000000000000", null, "0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")]
+    [TestCase("0x2000000000000000000000000000000000000000000000000000000000000000", "0x4000000000000000000000000000000000000000000000000000000000000000", "0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "0x67ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")]
+    [TestCase("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "0xbfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", null, "0xdfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")]
+    public void Should_partition_storage_request_if_last_processed_less_than_threshold(string start, string lastProcessed, string? limit, string expectedSplit)
+    {
+        using ProgressTracker progressTracker = CreateProgressTracker();
+
+        var startHash = new ValueHash256(start);
+        var lastProcessedHash = new ValueHash256(lastProcessed);
+        ValueHash256? limitHash = limit is null ? (ValueHash256?)null : new ValueHash256(limit);
+
+        progressTracker.EnqueueStorageRange(TestItem.Tree.AccountsWithPaths[0], startHash, lastProcessedHash, limitHash);
+
+        // ignore account range
+        bool isFinished = progressTracker.IsFinished(out _);
+
+        // expecting 2 batches
+        isFinished = progressTracker.IsFinished(out SnapSyncBatch? batch1);
+        isFinished.Should().BeFalse();
+        batch1.Should().NotBeNull();
+
+        isFinished = progressTracker.IsFinished(out SnapSyncBatch? batch2);
+        isFinished.Should().BeFalse();
+        batch2.Should().NotBeNull();
+
+        batch2?.StorageRangeRequest?.StartingHash.Should().Be(batch1?.StorageRangeRequest?.LimitHash);
+        batch1?.StorageRangeRequest?.StartingHash.Should().Be(lastProcessedHash);
+        batch2?.StorageRangeRequest?.LimitHash.Should().Be(limitHash ?? Keccak.MaxValue);
+
+        batch1?.StorageRangeRequest?.LimitHash.Should().Be(new ValueHash256(expectedSplit));
+    }
+
+
+    [TestCase("0x0000000000000000000000000000000000000000000000000000000000000000", "0xb100000000000000000000000000000000000000000000000000000000000000", null)]
+    [TestCase("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "0xdfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", null)]
+    public void Should_not_partition_storage_request_if_last_processed_more_than_threshold(string start, string lastProcessed, string? limit)
+    {
+        using ProgressTracker progressTracker = CreateProgressTracker();
+
+        var startHash = new ValueHash256(start);
+        var lastProcessedHash = new ValueHash256(lastProcessed);
+        ValueHash256? limitHash = limit is null ? (ValueHash256?)null : new ValueHash256(limit);
+
+        progressTracker.EnqueueStorageRange(TestItem.Tree.AccountsWithPaths[0], startHash, lastProcessedHash, limitHash);
+
+        // ignore account range
+        bool isFinished = progressTracker.IsFinished(out _);
+
+        // expecting 1 batch
+        isFinished = progressTracker.IsFinished(out SnapSyncBatch? batch1);
+        isFinished.Should().BeFalse();
+        batch1.Should().NotBeNull();
+
+        batch1?.StorageRangeRequest?.StartingHash.Should().Be(lastProcessedHash);
+        batch1?.StorageRangeRequest?.LimitHash.Should().Be(limitHash ?? Keccak.MaxValue);
+    }
+
+    [Test]
+    public void Will_process_with_storage_range_request_locks()
+    {
+        using ProgressTracker progressTracker = CreateProgressTracker();
+        var accountPath = TestItem.Tree.AccountAddress0;
+
+        int threadNumber = 4;
+        var tasks = new Task[threadNumber];
+        CounterWrapper testValue = new(0);
+        for (int i = 0; i < threadNumber; i++)
+        {
+            tasks[i] = Task.Run(() => EnqueueRange(progressTracker, testValue, true));
+        }
+
+        Task.WaitAll(tasks);
+
+        var rangeLock = progressTracker.GetLockObjectForPath(accountPath);
+
+        // all lock entries should have been removed, leaving the pass-through default
+        rangeLock.Should().BeOfType<ProgressTracker.StorageRangeLockPassThrough>();
+        testValue.Counter.Should().Be(threadNumber * 20);
+
+        // same test, but don't remove the lock structs at the end
+        testValue = new(0);
+        for (int i = 0; i < threadNumber; i++)
+        {
+            tasks[i] = Task.Run(() => EnqueueRange(progressTracker, testValue, false));
+        }
+
+        Task.WaitAll(tasks);
+
+        rangeLock = progressTracker.GetLockObjectForPath(accountPath);
+        rangeLock.Should().BeOfType<ProgressTracker.StorageRangeLock>();
+        ((ProgressTracker.StorageRangeLock)rangeLock).Counter.Should().Be(0);
+        testValue.Counter.Should().Be(threadNumber * 20);
+
+
+        void EnqueueRange(ProgressTracker pt, CounterWrapper checkValue, bool shouldRemove)
+        {
+            ProgressTracker.IStorageRangeLock? rangeLock = null;
+            bool canRemove = false;
+            int loopCount = 20;
+
+            // assume AddStorageRange does a split
+            pt?.IncrementStorageRangeLock(accountPath, 1);
+
+            for (int i = 0; i < loopCount; i++)
+            {
+                try
+                {
+                    rangeLock = pt?.GetLockObjectForPath(accountPath);
+
+                    rangeLock?.ExecuteSafe(() => checkValue.Counter++);
+
+                    canRemove = shouldRemove && i == loopCount - 1;
+
+                    if (i < loopCount - 1) // no more ranges on the last iteration
+                        pt?.IncrementStorageRangeLockIfExists(accountPath, 1);
+                }
+                finally
+                {
+                    rangeLock?.Decrement(accountPath, canRemove);
+                }
+            }
+        }
+
+    }
+
     private ProgressTracker CreateProgressTracker(int accountRangePartition = 1)
     {
         BlockTree blockTree = Build.A.BlockTree().WithBlocks(Build.A.Block.WithStateRoot(Keccak.EmptyTreeHash).TestObject).TestObject;
         SyncConfig syncConfig = new SyncConfig() { SnapSyncAccountRangePartitionCount = accountRangePartition };
         return new(new MemDb(), syncConfig, new StateSyncPivot(blockTree, syncConfig, LimboLogs.Instance), LimboLogs.Instance);
     }
+
+    private class CounterWrapper(int c)
+    {
+        public int Counter = c;
+    };
 }
diff --git a/src/Nethermind/Nethermind.Synchronization/SnapSync/ProgressTracker.cs b/src/Nethermind/Nethermind.Synchronization/SnapSync/ProgressTracker.cs
index f4dafca343a..181980cb395 100644
--- a/src/Nethermind/Nethermind.Synchronization/SnapSync/ProgressTracker.cs
+++ b/src/Nethermind/Nethermind.Synchronization/SnapSync/ProgressTracker.cs
@@ -15,6 +15,7 @@
 using Nethermind.Core.Collections;
 using Nethermind.Core.Crypto;
 using Nethermind.Db;
+using Nethermind.Int256;
 using Nethermind.Logging;
 using Nethermind.State.Snap;
 
@@ -28,6 +29,7 @@ public class ProgressTracker : IDisposable
         public const int HIGH_STORAGE_QUEUE_SIZE = STORAGE_BATCH_SIZE * 100;
         private const int CODES_BATCH_SIZE = 1_000;
         public const int HIGH_CODES_QUEUE_SIZE = CODES_BATCH_SIZE * 5;
+        private const uint StorageRangeSplitFactor = 2;
 
         internal static readonly byte[] ACC_PROGRESS_KEY = Encoding.ASCII.GetBytes("AccountProgressKey");
 
         // This does not need to be a lot as it spawn other requests. In fact 8 is probably too much. It is severely
@@ -57,6 +59,8 @@ public class ProgressTracker : IDisposable
         private ConcurrentQueue<ValueHash256> CodesToRetrieve { get; set; } = new();
         private ConcurrentQueue<AccountWithStorageStartingHash> AccountsToRefresh { get; set; } = new();
 
+        private readonly ConcurrentDictionary<ValueHash256, IStorageRangeLock> _storageRangeLocks = new();
+        private readonly StorageRangeLockPassThrough _emptyStorageLock = new();
 
         private readonly FastSync.StateSyncPivot _pivot;
 
@@ -311,10 +315,10 @@ public void EnqueueAccountStorage(PathWithAccount pwa)
             StoragesToRetrieve.Enqueue(pwa);
         }
 
-        public void EnqueueAccountRefresh(PathWithAccount pathWithAccount, in ValueHash256? startingHash)
+        public void EnqueueAccountRefresh(PathWithAccount pathWithAccount, in ValueHash256? startingHash, in ValueHash256? hashLimit)
         {
             _pivot.UpdatedStorages.Add(pathWithAccount.Path);
-            AccountsToRefresh.Enqueue(new AccountWithStorageStartingHash() { PathAndAccount = pathWithAccount, StorageStartingHash = startingHash.GetValueOrDefault() });
+            AccountsToRefresh.Enqueue(new AccountWithStorageStartingHash() { PathAndAccount = pathWithAccount, StorageStartingHash = startingHash.GetValueOrDefault(), StorageHashLimit = hashLimit ?? Keccak.MaxValue });
         }
 
         public void ReportFullStorageRequestFinished(IEnumerable<PathWithAccount> storages = default)
@@ -338,6 +342,57 @@ public void EnqueueStorageRange(StorageRange storageRange)
             }
         }
 
+        public void EnqueueStorageRange(PathWithAccount account, ValueHash256? startingHash, ValueHash256 lastProcessedHash, ValueHash256? limitHash)
+        {
+            limitHash ??= Keccak.MaxValue;
+
+            if (lastProcessedHash > limitHash)
+                return;
+
+            UInt256 limit = new UInt256(limitHash.Value.Bytes, true);
+            UInt256 lastProcessed = new UInt256(lastProcessedHash.Bytes, true);
+            UInt256 start = startingHash.HasValue ? new UInt256(startingHash.Value.Bytes, true) : UInt256.Zero;
+
+            var fullRange = limit - start;
+
+            if (lastProcessed < fullRange / StorageRangeSplitFactor + start)
+            {
+                var halfOfLeft = (limit - lastProcessed) / 2 + lastProcessed;
+                var halfOfLeftHash = new ValueHash256(halfOfLeft);
+
+                IncrementStorageRangeLock(account.Path, 2);
+
+                NextSlotRange.Enqueue(new StorageRange()
+                {
+                    Accounts = new ArrayPoolList<PathWithAccount>(1) { account },
+                    StartingHash = lastProcessedHash,
+                    LimitHash = halfOfLeftHash
+                });
+
+                NextSlotRange.Enqueue(new StorageRange()
+                {
+                    Accounts = new ArrayPoolList<PathWithAccount>(1) { account },
+                    StartingHash = halfOfLeftHash,
+                    LimitHash = limitHash
+                });
+
+                if (_logger.IsTrace)
+                    _logger.Trace($"EnqueueStorageRange account {account.Path} start hash: {startingHash} | last processed: {lastProcessedHash} | limit: {limitHash} | split {halfOfLeftHash}");
+
+                return;
+            }
+
+            // default - no split
+            IncrementStorageRangeLockIfExists(account.Path); // if this storage trie was split before, continue execution under the shared lock
+            var storageRange = new StorageRange()
+            {
+                Accounts = new ArrayPoolList<PathWithAccount>(1) { account },
+                StartingHash = lastProcessedHash,
+                LimitHash = limitHash
+            };
+            NextSlotRange.Enqueue(storageRange);
+        }
+
         public void ReportStorageRangeRequestFinished(StorageRange storageRange = null)
         {
             EnqueueStorageRange(storageRange);
@@ -367,14 +422,14 @@ public void UpdateAccountRangePartitionProgress(in ValueHash256 hashLimit, in Va
         public bool IsSnapGetRangesFinished()
         {
             return AccountRangeReadyForRequest.IsEmpty
-                && StoragesToRetrieve.IsEmpty
-                && NextSlotRange.IsEmpty
-                && CodesToRetrieve.IsEmpty
-                && AccountsToRefresh.IsEmpty
-                && _activeAccountRequests == 0
-                && _activeStorageRequests == 0
-                && _activeCodeRequests == 0
-                && _activeAccRefreshRequests == 0;
+                   && StoragesToRetrieve.IsEmpty
+                   && NextSlotRange.IsEmpty
+                   && CodesToRetrieve.IsEmpty
+                   && AccountsToRefresh.IsEmpty
+                   && _activeAccountRequests == 0
+                   && _activeStorageRequests == 0
+                   && _activeCodeRequests == 0
+                   && _activeAccRefreshRequests == 0;
         }
 
         private void GetSyncProgress()
@@ -455,6 +510,22 @@ private bool TryDequeNextSlotRange(out StorageRange item)
             return true;
         }
 
+        public IStorageRangeLock GetLockObjectForPath(ValueHash256 accountPath)
+        {
+            return _storageRangeLocks.GetValueOrDefault(accountPath, _emptyStorageLock);
+        }
+
+        public void IncrementStorageRangeLock(ValueHash256 accountPath, uint number = 1)
+        {
+            _storageRangeLocks.AddOrUpdate(accountPath, k => new StorageRangeLock(number, _storageRangeLocks), (k, old) => old.Increment(number));
+        }
+
+        public void IncrementStorageRangeLockIfExists(ValueHash256 accountPath, uint number = 1)
+        {
+            if (!_storageRangeLocks.TryGetValue(accountPath, out IStorageRangeLock lockInfo)) return;
+            lockInfo.Increment(number);
+        }
+
         // A partition of the top level account range starting from `AccountPathStart` to `AccountPathLimit` (exclusive).
         private class AccountRangePartition
         {
@@ -471,5 +542,55 @@ public void Dispose()
                 range?.Dispose();
             }
         }
+
+        public interface IStorageRangeLock
+        {
+            IStorageRangeLock Increment(uint value = 1);
+            void Decrement(ValueHash256 key, bool removeFromOwner);
+            void ExecuteSafe(Action action);
+        }
+
+        public class StorageRangeLock(uint counter, ConcurrentDictionary<ValueHash256, IStorageRangeLock> owner)
+            : IStorageRangeLock
+        {
+            private readonly object _lock = new();
+
+            public IStorageRangeLock Increment(uint value = 1)
+            {
+                lock (_lock)
+                {
+                    Counter += value;
+                }
+                return this;
+            }
+
+            public void Decrement(ValueHash256 key, bool removeFromOwner)
+            {
+                lock (_lock)
+                {
+                    if (--Counter == 0 && removeFromOwner)
+                        owner.TryRemove(key, out _);
+                }
+            }
+
+            public void ExecuteSafe(Action action)
+            {
+                lock (_lock)
+                {
+                    action();
+                }
+            }
+
+            public uint Counter { get; private set; } = counter;
+        }
+
+        public class StorageRangeLockPassThrough : IStorageRangeLock
+        {
+            public IStorageRangeLock Increment(uint value = 1) => this;
+
+            public void Decrement(ValueHash256 key, bool removeFromOwner) { }
+
+            public void ExecuteSafe(Action action) => action();
+        }
     }
 }
diff --git a/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProvider.cs b/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProvider.cs
index f5cf339b388..95b822ba648 100644
--- a/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProvider.cs
+++ b/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProvider.cs
@@ -170,7 +170,7 @@ public AddRangeResult AddStorageRange(StorageRange request, SlotsAndProofs respo
                     }
 
                     PathWithAccount account = request.Accounts[i];
-                    result = AddStorageRange(request.BlockNumber.Value, account, account.Account.StorageRoot, request.StartingHash, responses[i], proofs);
+                    result = AddStorageRange(request.BlockNumber.Value, account, account.Account.StorageRoot, request.StartingHash, responses[i], proofs, request.LimitHash);
 
                     slotCount += responses[i].Count;
                 }
@@ -194,44 +194,42 @@ public AddRangeResult AddStorageRange(StorageRange request, SlotsAndProofs respo
             return result;
         }
 
-        public AddRangeResult AddStorageRange(long blockNumber, PathWithAccount pathWithAccount, in ValueHash256 expectedRootHash, in ValueHash256? startingHash, IReadOnlyList<PathWithStorageSlot> slots, IReadOnlyList<byte[]>? proofs = null)
+        public AddRangeResult AddStorageRange(long blockNumber, PathWithAccount pathWithAccount, in ValueHash256 expectedRootHash, in ValueHash256? startingHash, IReadOnlyList<PathWithStorageSlot> slots, IReadOnlyList<byte[]>? proofs = null, ValueHash256? hashLimit = null)
         {
             ITrieStore store = _trieStorePool.Get();
             StorageTree tree = new(store.GetTrieStore(pathWithAccount.Path.ToCommitment()), _logManager);
+            bool canRemoveFurtherLock = false;
+            ProgressTracker.IStorageRangeLock? rangeLock = null;
 
             try
             {
-                (AddRangeResult result, bool moreChildrenToRight) = SnapProviderHelper.AddStorageRange(tree, blockNumber, startingHash, slots, expectedRootHash, proofs);
+                (AddRangeResult result, bool moreChildrenToRight, rangeLock) = SnapProviderHelper.AddStorageRange(tree, blockNumber, startingHash, slots, expectedRootHash, hashLimit, pathWithAccount.Path, _progressTracker, proofs);
 
                 if (result == AddRangeResult.OK)
                 {
+                    canRemoveFurtherLock = (moreChildrenToRight && slots[^1].Path > hashLimit) || !moreChildrenToRight;
                     if (moreChildrenToRight)
                     {
-                        StorageRange range = new()
-                        {
-                            Accounts = new ArrayPoolList<PathWithAccount>(1) { pathWithAccount },
-                            StartingHash = slots[^1].Path
-                        };
-
-                        _progressTracker.EnqueueStorageRange(range);
+                        _progressTracker.EnqueueStorageRange(pathWithAccount, startingHash, slots[^1].Path, hashLimit);
                     }
                 }
                 else if (result == AddRangeResult.MissingRootHashInProofs)
                 {
                     _logger.Trace($"SNAP - AddStorageRange failed, missing root hash {expectedRootHash} in the proofs, startingHash:{startingHash}");
-                    _progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash);
+                    _progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash, hashLimit);
                 }
                 else if (result == AddRangeResult.DifferentRootHash)
                 {
                     _logger.Trace($"SNAP - AddStorageRange failed, expected storage root hash:{expectedRootHash} but was {tree.RootHash}, startingHash:{startingHash}");
-                    _progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash);
+                    _progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash, hashLimit);
                 }
 
                 return result;
             }
             finally
             {
+                rangeLock?.Decrement(pathWithAccount.Path, canRemoveFurtherLock);
                 _trieStorePool.Return(store);
             }
         }
@@ -272,7 +270,8 @@ public void RefreshAccounts(AccountsToRefreshRequest request, IOwnedReadOnlyList
                     StorageRange range = new()
                     {
                         Accounts = new ArrayPoolList<PathWithAccount>(1) { requestedPath.PathAndAccount },
-                        StartingHash = requestedPath.StorageStartingHash
+                        StartingHash = requestedPath.StorageStartingHash,
+                        LimitHash = requestedPath.StorageHashLimit
                     };
 
                     _progressTracker.EnqueueStorageRange(range);
@@ -305,7 +304,7 @@ public void RefreshAccounts(AccountsToRefreshRequest request, IOwnedReadOnlyList
 
         private void RetryAccountRefresh(AccountWithStorageStartingHash requestedPath)
        {
-            _progressTracker.EnqueueAccountRefresh(requestedPath.PathAndAccount, requestedPath.StorageStartingHash);
+            _progressTracker.EnqueueAccountRefresh(requestedPath.PathAndAccount, requestedPath.StorageStartingHash, requestedPath.StorageHashLimit);
         }
 
         public void AddCodes(IReadOnlyList<ValueHash256> requestedHashes, IOwnedReadOnlyList<byte[]> codes)
diff --git a/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProviderHelper.cs b/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProviderHelper.cs
index 52dff9f353b..09c0266fbb6 100644
--- a/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProviderHelper.cs
+++ b/src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProviderHelper.cs
@@ -108,12 +108,15 @@ public static (AddRangeResult result, bool moreChildrenToRight, List<PathWithA
         }
 
-        public static (AddRangeResult result, bool moreChildrenToRight) AddStorageRange(
+        public static (AddRangeResult result, bool moreChildrenToRight, ProgressTracker.IStorageRangeLock? storageLock) AddStorageRange(
             StorageTree tree,
             long blockNumber,
             in ValueHash256? startingHash,
             IReadOnlyList<PathWithStorageSlot> slots,
             in ValueHash256 expectedRootHash,
+            in ValueHash256? limitHash,
+            ValueHash256 accountHash,
+            ProgressTracker progressTracker,
             IReadOnlyList<byte[]>? proofs = null
         )
         {
@@ -122,11 +125,11 @@ public static (AddRangeResult result, bool moreChildrenToRight) AddStorageRange(
             ValueHash256 lastHash = slots[^1].Path;
 
             (AddRangeResult result, List<(TrieNode, TreePath)> sortedBoundaryList, bool moreChildrenToRight) = FillBoundaryTree(
-                tree, startingHash, lastHash, ValueKeccak.MaxValue, expectedRootHash, proofs);
+                tree, startingHash, lastHash, limitHash ?? Keccak.MaxValue, expectedRootHash, proofs);
 
             if (result != AddRangeResult.OK)
             {
-                return (result, true);
+                return (result, true, null);
             }
 
             for (var index = 0; index < slots.Count; index++)
@@ -140,14 +143,17 @@ public static (AddRangeResult result, bool moreChildrenToRight) AddStorageRange(
 
             if (tree.RootHash != expectedRootHash)
             {
-                return (AddRangeResult.DifferentRootHash, true);
+                return (AddRangeResult.DifferentRootHash, true, null);
             }
 
-            StitchBoundaries(sortedBoundaryList, tree.TrieStore);
-
-            tree.Commit(writeFlags: WriteFlags.DisableWAL);
+            var storageLock = progressTracker.GetLockObjectForPath(accountHash);
+            storageLock.ExecuteSafe(() =>
+            {
+                StitchBoundaries(sortedBoundaryList, tree.TrieStore);
+                tree.Commit(writeFlags: WriteFlags.DisableWAL);
+            });
 
-            return (AddRangeResult.OK, moreChildrenToRight);
+            return (AddRangeResult.OK, moreChildrenToRight, storageLock);
         }
 
         [SkipLocalsInit]
@@ -332,6 +338,17 @@ private static void StitchBoundaries(List<(TrieNode, TreePath)> sortedBoundaryLi
 
                     node.IsBoundaryProofNode = isBoundaryProofNode;
                 }
+
+                // Leaves that are part of a boundary are only added when they fall outside the processed range,
+                // so they will not be persisted by the current Commit. They may, however, already have been
+                // persisted while processing the range they belong to, hence the check here. Without it,
+                // there is a risk that the whole dependent path (including the root) is never stitched and
+                // persisted, leading to a TrieNodeException after sync (healing may never reach this storage trie).
+                if (node.IsLeaf)
+                {
+                    node.IsPersisted = store.IsPersisted(path, node.Keccak);
+                    node.IsBoundaryProofNode = !node.IsPersisted;
+                }
             }
         }
     }
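
Note on the split rule exercised by Should_partition_storage_request_if_last_processed_less_than_threshold: EnqueueStorageRange splits the remaining work whenever less than half of the requested range (start, limit] has been processed, and the split point is the midpoint of what is left, (limit - lastProcessed) / 2 + lastProcessed. Below is a minimal standalone sketch of that arithmetic, using BigInteger in place of Nethermind's UInt256; the names StorageRangeSplitSketch, TrySplit and Hex are illustrative and not part of this change.

    using System;
    using System.Globalization;
    using System.Numerics;

    class StorageRangeSplitSketch
    {
        // 2^256 - 1, i.e. Keccak.MaxValue -- stands in for a null limit hash.
        static readonly BigInteger Max = BigInteger.Parse("0" + new string('f', 64), NumberStyles.HexNumber);

        // "0x..." -> positive BigInteger (the prepended "0" keeps the parse unsigned).
        static BigInteger Hex(string s) => BigInteger.Parse("0" + s[2..], NumberStyles.HexNumber);

        // Mirrors ProgressTracker.EnqueueStorageRange: split only while less than half of the
        // full range is done; the split point halves whatever is left up to the limit.
        static BigInteger? TrySplit(BigInteger start, BigInteger lastProcessed, BigInteger limit)
        {
            BigInteger fullRange = limit - start;
            if (lastProcessed >= fullRange / 2 + start) return null; // no split: one follow-up range
            return (limit - lastProcessed) / 2 + lastProcessed;      // LimitHash of the first batch
        }

        static void Main()
        {
            // First TestCase above: start 0x00..00, lastProcessed 0x20..00, limit null -> Max.
            BigInteger? split = TrySplit(
                Hex("0x0000000000000000000000000000000000000000000000000000000000000000"),
                Hex("0x2000000000000000000000000000000000000000000000000000000000000000"),
                Max);
            Console.WriteLine(split == Hex("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")); // True
        }
    }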
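
Note on the lock lifecycle driven by Will_process_with_storage_range_request_locks: the entry behind GetLockObjectForPath is reference-counted per storage trie. EnqueueStorageRange increments it by 2 on a split (or by 1 via IncrementStorageRangeLockIfExists when a previously split trie continues), each processed response decrements it, and once the count reaches zero with removal requested the entry is dropped, so later callers get the no-op StorageRangeLockPassThrough. The sketch below condenses the consumer side of that protocol as SnapProvider.AddStorageRange uses it; it assumes the namespaces touched by this PR, and processAndCommit / moreRangesComing are stand-in names.

    using System;
    using Nethermind.Core.Crypto;
    using Nethermind.Synchronization.SnapSync;

    static class StorageRangeLockUsageSketch
    {
        // Sketch, not part of the diff: one storage-range response under the new lock API.
        static void ProcessOneStorageRangeResponse(
            ProgressTracker progressTracker,
            ValueHash256 accountPath,
            bool moreRangesComing,   // a follow-up request stays within this trie's limit
            Action processAndCommit) // StitchBoundaries + tree.Commit in the real code
        {
            ProgressTracker.IStorageRangeLock? rangeLock = null;
            bool removeLockEntry = false;
            try
            {
                // Serialize boundary stitching and commit against sibling ranges of the same trie.
                rangeLock = progressTracker.GetLockObjectForPath(accountPath);
                rangeLock.ExecuteSafe(processAndCommit);

                if (moreRangesComing)
                {
                    // In the PR this increment happens inside ProgressTracker.EnqueueStorageRange
                    // when the follow-up request is queued; shown inline here for clarity.
                    progressTracker.IncrementStorageRangeLockIfExists(accountPath, 1);
                }

                // Mirrors canRemoveFurtherLock: remove the entry once no successor range
                // will be enqueued for this trie within the current limit.
                removeLockEntry = !moreRangesComing;
            }
            finally
            {
                // Drop this response's reference; at zero (and with removal requested) the
                // dictionary entry disappears and GetLockObjectForPath returns the pass-through.
                rangeLock?.Decrement(accountPath, removeLockEntry);
            }
        }
    }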