From 430611d3bc8f938220f2c28c9e4d7b2d9b814911 Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Fri, 1 Nov 2024 19:18:41 +0000
Subject: [PATCH] Beacon sync updates tbc (#2818)

* Clear rejected sync target so that it would not be processed again

* Use in-memory table to stash headers after FCU import has started

why:
  After block import has started, there is no way to save/stash block
  headers persistently.

  The FCU handlers always maintain a positive transaction level and in
  some instances the current transaction is flushed and re-opened.

  This patch fixes an exception thrown when a block header has gone
  missing.

* When resuming sync, delete stale headers and state

why:
  Deleting headers saves some persistent space that would otherwise be
  lost.

  Deleting the state after resuming prevents race conditions.

* On clean start, hibernate sync `daemon` entity before first update from CL

details:
  Only reduced services are running
  * accept FCU from CL
  * fetch finalised header after accepting FCU (provides hash only)

* Improve text/meaning of some log messages

* Revisit error handling for useless peers

why:
  A peer is abandoned if the error score is too high. This was not
  properly handled for a fringe case where the error was detected at
  staging time but fetching via eth/xx was OK.

* Clarify `break` meaning by using labelled `break` statements

* Fix the commit action when the sync target has been reached

why:
  The sync target block number might precede the latest FCU block number.
  This happens when the engine API squeezes in some request to execute
  and import subsequent blocks.

  This patch fixes an assert thrown when, after reaching the target, the
  latest FCU block number is higher than the expected target block number.

* Update TODO list
---
 nimbus/sync/beacon/TODO.md                    |  34 +++++
 nimbus/sync/beacon/worker.nim                 |  31 +++--
 nimbus/sync/beacon/worker/blocks_staged.nim   | 101 +++++++++------
 nimbus/sync/beacon/worker/db.nim              | 119 +++++++++++++-----
 nimbus/sync/beacon/worker/headers_staged.nim  |  41 +++---
 .../worker/headers_staged/linked_hchain.nim   |   2 -
 nimbus/sync/beacon/worker/helpers.nim         |   6 +-
 nimbus/sync/beacon/worker/start_stop.nim      |  22 ++--
 nimbus/sync/beacon/worker/update.nim          |  46 ++++---
 nimbus/sync/beacon/worker_config.nim          |   4 +-
 nimbus/sync/beacon/worker_desc.nim            |  35 +++++-
 nimbus/sync/sync_desc.nim                     |   1 +
 nimbus/sync/sync_sched.nim                    |  23 ++--
 13 files changed, 331 insertions(+), 134 deletions(-)

diff --git a/nimbus/sync/beacon/TODO.md b/nimbus/sync/beacon/TODO.md
index 4c8326445..600ee7f31 100644
--- a/nimbus/sync/beacon/TODO.md
+++ b/nimbus/sync/beacon/TODO.md
@@ -1 +1,35 @@
+## General TODO items
+
 * Update/resolve code fragments which are tagged FIXME
+
+## Open issues
+
+### 1. Weird behaviour of the RPC/engine API
+
+See issue [#2816](https://github.com/status-im/nimbus-eth1/issues/2816)
+
+### 2. Some assert
+
+    Error: unhandled exception: key not found: 0x441a0f..027bc96a [AssertionDefect]
+
+which happened on several `holesky` tests immediately after logging something like
+
+    NTC 2024-10-31 21:37:34.728 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=044d22843cbe baseNumber=2646764 baseHash=21ec11c1deac
+
+or from another machine with literally the same exception text (but the stack-trace differs)
+
+    NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b
+
+
+### 3. Some assert
+
+Seen on `holesky`, sometimes the header chain cannot be joined with its
+lower end after completing due to different hashes, leading to an assert failure
+
+    Error: unhandled exception: header chains C-D joining hashes do not match L=#2646126 lHash=3bc2beb1b565 C=#2646126 cHash=3bc2beb1b565 D=#2646127 dParent=671c7c6cb904
+
+which was preceded somewhat earlier by log entries
+
+    INF 2024-10-31 18:21:16.464 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=3bc2beb1b565
+    [..]
+    INF 2024-10-31 18:21:25.872 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=671c7c6cb904
diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim
index 9832363d3..a4c5aec1a 100644
--- a/nimbus/sync/beacon/worker.nim
+++ b/nimbus/sync/beacon/worker.nim
@@ -34,12 +34,14 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool =
 
 proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} =
   ## When idle, save cpu cycles waiting for something to do.
-  if buddy.ctx.pool.importRunningOk or
-     not (buddy.headersToFetchOk() or
+  if buddy.ctx.pool.blockImportOk or           # currently importing blocks
+     buddy.ctx.hibernate or                    # not activated yet?
+     not (buddy.headersToFetchOk() or          # something on TODO list
          buddy.bodiesToFetchOk()):
     await sleepAsync workerIdleWaitInterval
     return true
-  return false
+  else:
+    return false
 
 # ------------------------------------------------------------------------------
 # Public start/stop and admin functions
@@ -54,9 +56,6 @@ proc setup*(ctx: BeaconCtxRef; info: static[string]): bool =
 
   # Debugging stuff, might be an empty template
   ctx.setupTicker()
-
-  # Enable background daemon
-  ctx.daemon = true
   true
 
 proc release*(ctx: BeaconCtxRef; info: static[string]) =
@@ -70,20 +69,20 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
   let peer = buddy.peer
 
   if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies:
-    debug info & ": peers limit reached", peer
+    if not buddy.ctx.hibernate: debug info & ": peers limit reached", peer
     return false
 
   if not buddy.startBuddy():
-    debug info & ": failed", peer
+    if not buddy.ctx.hibernate: debug info & ": failed", peer
     return false
 
-  debug info & ": new peer", peer
+  if not buddy.ctx.hibernate: debug info & ": new peer", peer
   true
 
 proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
   ## Clean up this peer
-  debug info & ": release peer", peer=buddy.peer,
-    nInvocations=buddy.only.nMultiLoop,
+  if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
+    ctrl=buddy.ctrl.state, nInvocations=buddy.only.nMultiLoop,
     lastIdleGap=buddy.only.multiRunIdle.toStr
   buddy.stopBuddy()
 
@@ -98,8 +97,14 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
   ## as `true` not before there is some activity on the `runPool()`,
   ## `runSingle()`, or `runMulti()` functions.
   ##
+  ## On a fresh start, the flag `ctx.daemon` will not be set `true` before the
+  ## first usable request from the CL (via RPC) stumbles in.
+  ##
   # Check for a possible header layout and body request changes
   ctx.updateSyncStateLayout info
+  if ctx.hibernate:
+    return
+
   ctx.updateBlockRequests info
 
   # Execute staged block records.
@@ -110,8 +115,8 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
     # place. So there might be some peers active. If they are waiting for
     # a message reply, this will most probably time out as all processing
     # power is usurped by the import task here.
- ctx.pool.importRunningOk = true - defer: ctx.pool.importRunningOk = false + ctx.pool.blockImportOk = true + defer: ctx.pool.blockImportOk = false # Import from staged queue. while await ctx.blocksStagedImport(info): diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index 1fc8c01bb..79d3abb21 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -18,7 +18,14 @@ import ../worker_desc, ./blocks_staged/bodies, ./update/metrics, - "."/[blocks_unproc, db] + "."/[blocks_unproc, db, helpers] + +# ------------------------------------------------------------------------------ +# Private debugging & logging helpers +# ------------------------------------------------------------------------------ + +formatIt(Hash32): + it.data.short # ------------------------------------------------------------------------------ # Private functions @@ -50,11 +57,16 @@ proc fetchAndCheck( blk.blocks.setLen(offset + ivReq.len) var blockHash = newSeq[Hash32](ivReq.len) for n in 1u ..< ivReq.len: - let header = ctx.dbPeekHeader(ivReq.minPt + n).expect "stashed header" + let header = ctx.dbPeekHeader(ivReq.minPt + n).valueOr: + # There is nothing one can do here + raiseAssert info & " stashed header missing: n=" & $n & + " ivReq=" & $ivReq & " nth=" & (ivReq.minPt + n).bnStr blockHash[n - 1] = header.parentHash blk.blocks[offset + n].header = header - blk.blocks[offset].header = - ctx.dbPeekHeader(ivReq.minPt).expect "stashed header" + blk.blocks[offset].header = ctx.dbPeekHeader(ivReq.minPt).valueOr: + # There is nothing one can do here + raiseAssert info & " stashed header missing: n=0" & + " ivReq=" & $ivReq & " nth=" & ivReq.minPt.bnStr blockHash[ivReq.len - 1] = rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccak256 @@ -177,11 +189,17 @@ proc blocksStagedCollect*( # Fetch and extend staging record if not await buddy.fetchAndCheck(ivReq, blk, info): + + # Throw away first time block fetch data. Keep other data for a + # partially assembled list. if nBlkBlocks == 0: - if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped: + buddy.only.nBdyRespErrors.inc + + if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or + fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true - trace info & ": list completely failed", peer, iv, ivReq, + trace info & ": current block list discarded", peer, iv, ivReq, ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors ctx.blocksUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try @@ -238,7 +256,8 @@ proc blocksStagedImport*( let imported = ctx.chain.latestNumber() if qItem.key != imported + 1: trace info & ": there is a gap L vs. 
staged", - B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr + B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr, + C=ctx.layout.coupler.bnStr doAssert imported < qItem.key return false @@ -253,45 +272,49 @@ proc blocksStagedImport*( B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr var maxImport = iv.maxPt - for n in 0 ..< nBlocks: - let nBn = qItem.data.blocks[n].header.number - ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: - warn info & ": import block error", iv, B=ctx.chain.baseNumber.bnStr, - L=ctx.chain.latestNumber.bnStr, nBn=nBn.bnStr, `error`=error - # Restore what is left over below - maxImport = ctx.chain.latestNumber() - break - - # Allow pseudo/async thread switch. - await sleepAsync asyncThreadSwitchTimeSlot - if not ctx.daemon: - # Shutdown? - maxImport = ctx.chain.latestNumber() - break - - # Update, so it can be followed nicely - ctx.updateMetrics() - - # Occasionally mark the chain finalized - if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks: - let - nthHash = qItem.data.getNthHash(n) - finHash = if nBn < ctx.layout.final: nthHash else: ctx.layout.finalHash - - doAssert nBn == ctx.chain.latestNumber() - ctx.pool.chain.forkChoice(headHash=nthHash, finalizedHash=finHash).isOkOr: - warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr, - L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr, nthHash, - finHash=(if finHash == nthHash: "nHash" else: "F"), `error`=error + block importLoop: + for n in 0 ..< nBlocks: + let nBn = qItem.data.blocks[n].header.number + ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: + warn info & ": import block error", n, iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error # Restore what is left over below maxImport = ctx.chain.latestNumber() - break + break importLoop # Allow pseudo/async thread switch. await sleepAsync asyncThreadSwitchTimeSlot if not ctx.daemon: + # Shutdown? maxImport = ctx.chain.latestNumber() - break + break importLoop + + # Update, so it can be followed nicely + ctx.updateMetrics() + + # Occasionally mark the chain finalized + if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks: + let + nthHash = qItem.data.getNthHash(n) + finHash = if nBn < ctx.layout.final: nthHash + else: ctx.layout.finalHash + + doAssert nBn == ctx.chain.latestNumber() + ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr: + warn info & ": fork choice error", n, iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash, + finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error + # Restore what is left over below + maxImport = ctx.chain.latestNumber() + break importLoop + + # Allow pseudo/async thread switch. 
+ await sleepAsync asyncThreadSwitchTimeSlot + if not ctx.daemon: + maxImport = ctx.chain.latestNumber() + break importLoop # Import probably incomplete, so a partial roll back may be needed if maxImport < iv.maxPt: diff --git a/nimbus/sync/beacon/worker/db.nim b/nimbus/sync/beacon/worker/db.nim index 143ca418b..d8ae4c2f0 100644 --- a/nimbus/sync/beacon/worker/db.nim +++ b/nimbus/sync/beacon/worker/db.nim @@ -13,7 +13,7 @@ import pkg/[chronicles, chronos], pkg/eth/[common, rlp], - pkg/stew/[byteutils, interval_set, sorted_set], + pkg/stew/[interval_set, sorted_set], pkg/results, "../../.."/[common, core/chain, db/storage_types], ../worker_desc, @@ -22,13 +22,6 @@ import const LhcStateKey = 1.beaconStateKey -# ------------------------------------------------------------------------------ -# Private debugging & logging helpers -# ------------------------------------------------------------------------------ - -formatIt(Hash32): - it.data.toHex - # ------------------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------------------ @@ -42,6 +35,40 @@ proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] = discard err() + +proc deleteStaleHeadersAndState( + ctx: BeaconCtxRef; + upTo: BlockNumber; + info: static[string]; + ) = + ## Delete stale headers and state + let + kvt = ctx.db.ctx.getKvt() + stateNum = ctx.db.getSavedStateBlockNumber() # for persisting + + var bn = upTo + while 0 < bn and kvt.hasKey(beaconHeaderKey(bn).toOpenArray): + discard kvt.del(beaconHeaderKey(bn).toOpenArray) + bn.dec + + # Occasionallly persist the deleted headers. This will succeed if + # this function is called early enough after restart when there is + # no database transaction pending. + if (upTo - bn) mod 8192 == 0: + ctx.db.persistent(stateNum).isOkOr: + debug info & ": cannot persist deleted sync headers", error=($$error) + # So be it, stop here. + return + + # Delete persistent state, there will be no use of it anymore + discard kvt.del(LhcStateKey.toOpenArray) + ctx.db.persistent(stateNum).isOkOr: + debug info & ": cannot persist deleted sync headers", error=($$error) + return + + if bn < upTo: + debug info & ": deleted stale sync headers", iv=BnRange.new(bn+1,upTo) + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -57,30 +84,32 @@ proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = # While executing blocks there are frequent save cycles. Otherwise, an # extra save request might help to pick up an interrupted sync session. - let txLevel = ctx.db.level() - if txLevel == 0: + if ctx.db.level() == 0 and ctx.stash.len == 0: let number = ctx.db.getSavedStateBlockNumber() ctx.db.persistent(number).isOkOr: - debug info & ": failed to save sync state persistently", error=($$error) - return - else: - trace info & ": sync state not saved, tx pending", txLevel - return - - trace info & ": saved sync state persistently" + raiseAssert info & " persistent() failed: " & $$error -proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = - ## Restore chain layout from persistent db +proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]): bool = + ## Restore chain layout from persistent db. It returns `true` if a previous + ## state could be loaded, and `false` if a new state was created. 
let rc = ctx.fetchSyncStateLayout() latest = ctx.chain.latestNumber() - # See `dbLoadSyncStateAvailable()` for comments + # If there was a manual import after a previous sync, then saved state + # might be outdated. if rc.isOk and + # The base number is the least record of the FCU chains/tree. So the + # finalised entry must not be smaller. ctx.chain.baseNumber() <= rc.value.final and + # If the latest FCU number is not larger than the head, there is nothing + # to do (might also happen after a manual import.) latest < rc.value.head: + + # Assign saved sync state ctx.sst.layout = rc.value + ctx.sst.lastLayout = rc.value # Add interval of unprocessed block range `(L,C]` from `README.md` ctx.blocksUnprocSet(latest+1, ctx.layout.coupler) @@ -93,6 +122,8 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr, F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr + true + else: let latestHash = ctx.chain.latestHash() @@ -115,9 +146,25 @@ proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = headHash: latestHash, headLocked: false) - trace info & ": new sync state", L="C", C="D", D="F", F="H", H=latest.bnStr - - ctx.sst.lastLayout = ctx.layout + ctx.sst.lastLayout = ctx.layout + + if rc.isOk: + # Some stored headers might have become stale, so delete them. Even + # though it is not critical, stale headers just stay on the database + # forever occupying space without purpose. Also, delete the state record. + # After deleting headers, the state record becomes stale as well. + if rc.value.head <= latest: + # After manual import, the `latest` state might be ahead of the old + # `head` which leaves a gap `(rc.value.head,latest)` of missing headers. + # So the `deleteStaleHeadersAndState()` clean up routine needs to start + # at the `head` and work backwards. + ctx.deleteStaleHeadersAndState(rc.value.head, info) + else: + # Delete stale headers with block numbers starting at to `latest` wile + # working backwards. + ctx.deleteStaleHeadersAndState(latest, info) + + false # ------------------ @@ -139,15 +186,28 @@ proc dbStashHeaders*( ## .. ## let - kvt = ctx.db.ctx.getKvt() + txLevel = ctx.db.level() last = first + revBlobs.len.uint64 - 1 - for n,data in revBlobs: - let key = beaconHeaderKey(last - n.uint64) - kvt.put(key.toOpenArray, data).isOkOr: - raiseAssert info & ": put() failed: " & $$error + if 0 < txLevel: + # Need to cache it because FCU has blocked writing through to disk. + for n,data in revBlobs: + ctx.stash[last - n.uint64] = data + else: + let kvt = ctx.db.ctx.getKvt() + for n,data in revBlobs: + let key = beaconHeaderKey(last - n.uint64) + kvt.put(key.toOpenArray, data).isOkOr: + raiseAssert info & ": put() failed: " & $$error proc dbPeekHeader*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Header] = ## Retrieve some stashed header. 
+ # Try cache first + ctx.stash.withValue(num, val): + try: + return ok(rlp.decode(val[], Header)) + except RlpError: + discard + # Use persistent storage next let key = beaconHeaderKey(num) rc = ctx.db.ctx.getKvt().get(key.toOpenArray) @@ -164,6 +224,9 @@ proc dbPeekParentHash*(ctx: BeaconCtxRef; num: BlockNumber): Opt[Hash32] = proc dbUnstashHeader*(ctx: BeaconCtxRef; bn: BlockNumber) = ## Remove header from temporary DB list + ctx.stash.withValue(bn, val): + ctx.stash.del bn + return discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray) # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index 83b850047..5c79169bc 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -43,7 +43,7 @@ proc fetchAndCheck( # While assembling a `LinkedHChainRef`, verify that the `revHeaders` list # was sound, i.e. contiguous, linked, etc. - if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc, info): + if not revHeaders.extendLinkedHChain(buddy, ivReq.maxPt, lhc): return false return true @@ -81,10 +81,18 @@ proc headerStagedUpdateTarget*( let final = rc.value[0].number if final < ctx.chain.baseNumber(): trace info & ": finalised number too low", peer, - B=ctx.chain.baseNumber.bnStr, finalised=rc.value[0].number.bnStr + B=ctx.chain.baseNumber.bnStr, finalised=final.bnStr, + delta=(ctx.chain.baseNumber - final) + ctx.target.reset else: ctx.target.final = final + # Activate running (unless done yet) + if ctx.hibernate: + ctx.hibernate = false + trace info & ": activated syncer", peer, + finalised=final.bnStr, head=ctx.layout.head.bnStr + # Update, so it can be followed nicely ctx.updateMetrics() @@ -155,13 +163,17 @@ proc headersStagedCollect*( # Fetch and extend chain record if not await buddy.fetchAndCheck(ivReq, lhc, info): - # Throw away opportunistic data (or first time header fetch.) Turn back - # unused data. + # Throw away opportunistic data (or first time header fetch.) Keep + # other data for a partially assembled list. if isOpportunistic or nLhcHeaders == 0: - if 0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped: + buddy.only.nHdrRespErrors.inc + + if (0 < buddy.only.nHdrRespErrors and buddy.ctrl.stopped) or + fetchHeadersReqThresholdCount < buddy.only.nHdrRespErrors: # Make sure that this peer does not immediately reconnect buddy.ctrl.zombie = true - trace info & ": completely failed", peer, iv, ivReq, isOpportunistic, + trace info & ": current header list discarded", peer, iv, ivReq, + isOpportunistic, ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nHdrRespErrors ctx.headersUnprocCommit(iv.len, iv) # At this stage allow a task switch so that some other peer might try @@ -197,7 +209,7 @@ proc headersStagedCollect*( raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = lhc[] - trace info & ": staged headers", peer, + trace info & ": staged header list", peer, topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len, nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state @@ -209,16 +221,15 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = ## chains layout and the persistent tables. The function returns the number ## of records processed and saved. 
while true: - # Fetch largest block + # Fetch list with largest block numbers let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: - trace info & ": no staged headers", error=error break # all done let dangling = ctx.layout.dangling iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint64 + 1, qItem.key) if iv.maxPt+1 < dangling: - trace info & ": there is a gap", iv, D=dangling.bnStr, nSaved=result + trace info & ": there is a gap", iv, D=dangling.bnStr, nStashed=result break # there is a gap -- come back later # Overlap must not happen @@ -235,8 +246,8 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = if qItem.data.hash != ctx.layout.danglingParent: # Discard wrong chain and merge back the range into the `unproc` list. ctx.headersUnprocCommit(0,iv) - trace info & ": discarding staged record", - iv, D=dangling.bnStr, lap=result + trace info & ": discarding staged header list", iv, D=dangling.bnStr, + nStashed=result, nDiscarded=qItem.data.revHdrs.len break # Store headers on database @@ -245,10 +256,10 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = ctx.layout.danglingParent = qItem.data.parentHash ctx.dbStoreSyncStateLayout info - result.inc # count records + result += qItem.data.revHdrs.len # count headers - trace info & ": staged header lists saved", - nStaged=ctx.hdr.staged.len, nSaved=result + trace info & ": consecutive headers stashed", + nListsLeft=ctx.hdr.staged.len, nStashed=result if headersStagedQueueLengthLwm < ctx.hdr.staged.len: ctx.poolMode = true diff --git a/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim b/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim index 2edcb5f94..1116abf20 100644 --- a/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim +++ b/nimbus/sync/beacon/worker/headers_staged/linked_hchain.nim @@ -12,7 +12,6 @@ import pkg/eth/[common, p2p, rlp], - pkg/stew/byteutils, ../../../../common, ../../worker_desc @@ -25,7 +24,6 @@ proc extendLinkedHChain*( buddy: BeaconBuddyRef; topNumber: BlockNumber; lhc: ref LinkedHChain; # update in place - info: static[string]; ): bool = ## Returns sort of `lhc[] += rev[]` where `lhc[]` is updated in place. diff --git a/nimbus/sync/beacon/worker/helpers.nim b/nimbus/sync/beacon/worker/helpers.nim index 2ed702866..59b5e53e5 100644 --- a/nimbus/sync/beacon/worker/helpers.nim +++ b/nimbus/sync/beacon/worker/helpers.nim @@ -15,7 +15,11 @@ import pkg/chronos, pkg/eth/common, - pkg/stew/interval_set + pkg/stew/interval_set, + ../../../utils/utils + +export + short func bnStr*(w: BlockNumber): string = "#" & $w diff --git a/nimbus/sync/beacon/worker/start_stop.nim b/nimbus/sync/beacon/worker/start_stop.nim index 16dc878c8..9bda33677 100644 --- a/nimbus/sync/beacon/worker/start_stop.nim +++ b/nimbus/sync/beacon/worker/start_stop.nim @@ -57,16 +57,19 @@ when enableTicker: reorg: ctx.pool.nReorg, nBuddies: ctx.pool.nBuddies) + proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB = ## Update beacon header. This function is intended as a call back function ## for the RPC module. 
return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} = + # Check whether there is an update running (otherwise take next upate) - if not ctx.target.locked: - # Rpc checks empty header against a zero hash rather than `emptyRoot` - if f != zeroHash32 and - ctx.layout.head < h.number and - ctx.target.consHead.number < h.number: + if not ctx.target.locked and # ignore if currently updating + ctx.target.final == 0 and # ignore if complete already + f != zeroHash32 and # finalised hash is set + ctx.layout.head < h.number and # update is advancing + ctx.target.consHead.number < h.number: # .. ditto + ctx.target.consHead = h ctx.target.final = BlockNumber(0) ctx.target.finalHash = f @@ -101,8 +104,13 @@ proc setupDatabase*(ctx: BeaconCtxRef; info: static[string]) = ctx.headersUnprocInit() ctx.blocksUnprocInit() - # Load initial state from database if there is any - ctx.dbLoadSyncStateLayout info + # Load initial state from database if there is any. If the loader returns + # `true`, then the syncer will resume from previous sync in which case the + # system becomes fully active. Otherwise there is some polling only waiting + # for a new target so there is reduced service (aka `hibernate`.). + ctx.hibernate = not ctx.dbLoadSyncStateLayout info + if ctx.hibernate: + trace info & ": hibernating", latest=ctx.chain.latestNumber.bnStr # Set blocks batch import value for block import if ctx.pool.nBodiesBatch < nFetchBodiesRequest: diff --git a/nimbus/sync/beacon/worker/update.nim b/nimbus/sync/beacon/worker/update.nim index 080509708..2ae6b4fa0 100644 --- a/nimbus/sync/beacon/worker/update.nim +++ b/nimbus/sync/beacon/worker/update.nim @@ -13,12 +13,12 @@ import pkg/[chronicles, chronos], pkg/eth/[common, rlp], - pkg/stew/sorted_set, + pkg/stew/[byteutils, sorted_set], ../../../core/chain, ../worker_desc, ./update/metrics, ./headers_staged/staged_queue, - "."/[blocks_unproc, db, headers_unproc] + "."/[blocks_unproc, db, headers_unproc, helpers] # ------------------------------------------------------------------------------ # Private functions @@ -88,7 +88,7 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) = doAssert ctx.headersStagedQueueIsEmpty() ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1) - trace info & ": updated sync state", C=ctx.layout.coupler.bnStr, + trace info & ": updated sync state/new target", C=ctx.layout.coupler.bnStr, uTop=ctx.headersUnprocTop(), D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr @@ -109,10 +109,15 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = # Verify adjacent chains if ctx.layout.couplerHash != ctx.layout.danglingParent: # FIXME: Oops -- any better idea than to defect? 
- raiseAssert info & ": hashes do not match" & - " C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr - - trace info & ": merging adjacent chains", C=ctx.layout.coupler.bnStr, + raiseAssert info & ": header chains C-D joining hashes do not match" & + " L=" & ctx.chain.latestNumber().bnStr & + " lHash=" & ctx.chain.latestHash.short & + " C=" & ctx.layout.coupler.bnStr & + " cHash=" & ctx.layout.couplerHash.short & + " D=" & $ctx.layout.dangling.bnStr & + " dParent=" & ctx.layout.danglingParent.short + + trace info & ": merging adjacent header chains", C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr # Merge adjacent linked chains @@ -133,6 +138,21 @@ proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = # Update, so it can be followed nicely ctx.updateMetrics() + +proc updateTargetReached(ctx: BeaconCtxRef; info: static[string]) = + # Open up layout for update + ctx.layout.headLocked = false + + # Clean up target bucket and await a new target. + ctx.target.reset + ctx.hibernate = true + + let + latest {.used.} = ctx.chain.latestNumber() + head {.used.} = ctx.layout.head + trace info & ": hibernating, awaiting new sync target", + L=(if head == latest: "H" else: latest.bnStr), H=head.bnStr + # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ @@ -143,13 +163,11 @@ proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = # Check whether the target has been reached. In that case, unlock the # consensus head `H` from the current layout so that it can be updated # in time. - if ctx.layout.headLocked: - # So we have a session - let latest= ctx.chain.latestNumber() - if ctx.layout.head <= latest: - doAssert ctx.layout.head == latest - ctx.layout.headLocked = false - + if ctx.layout.headLocked and # there is an active session + ctx.layout.head <= ctx.chain.latestNumber(): # and target has been reached + # Note that `latest` might exceed the `head`. This will happen when the + # engine API got some request to execute and import subsequent blocks. + ctx.updateTargetReached info # Check whether there is something to do regarding beacon node change if not ctx.layout.headLocked and # there was an active import request diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index cd1aa80f7..f41029e65 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -92,8 +92,8 @@ const nFetchBodiesRequest* = 128 ## Similar to `nFetchHeadersRequest` - fetchBodiesReqThresholdZombie* = chronos.seconds(2) - fetchBodiesReqThresholdCount* = 3 + fetchBodiesReqThresholdZombie* = chronos.seconds(4) + fetchBodiesReqThresholdCount* = 5 ## Similar to `fetchHeadersReqThreshold*` fetchBodiesReqMinResponsePC* = 10 diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index 8251f3149..ca55ae1ef 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -52,6 +52,16 @@ type ## Block request item sorted by least block number (i.e. from `blocks[0]`.) blocks*: seq[EthBlock] ## List of blocks for import + KvtCache* = Table[BlockNumber,seq[byte]] + ## This cache type is intended for holding block headers that cannot be + ## reliably saved persistently. 
This is the situation after blocks are + ## imported as the FCU handlers always maintain a positive transaction + ## level and in some instances the current transaction is flushed and + ## re-opened. + ## + ## The number of block headers to hold in memory after block import has + ## started is the distance to the new `canonical execution head`. + # ------------------- SyncStateTarget* = object @@ -133,8 +143,9 @@ type # Blocks import/execution settings for importing with # `nBodiesBatch` blocks in each round (minimum value is # `nFetchBodiesRequest`.) - chain*: ForkedChainRef ## Database - importRunningOk*: bool ## Advisory lock, fetch vs. import + chain*: ForkedChainRef ## Core database, FCU support + stash*: KvtCache ## Temporary header and state table + blockImportOk*: bool ## Don't fetch data while block importing nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault` blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault` @@ -179,10 +190,30 @@ func chain*(ctx: BeaconCtxRef): ForkedChainRef = ## Getter ctx.pool.chain +func stash*(ctx: BeaconCtxRef): var KvtCache = + ## Getter + ctx.pool.stash + func db*(ctx: BeaconCtxRef): CoreDbRef = ## Getter ctx.pool.chain.db +# ----- + +func hibernate*(ctx: BeaconCtxRef): bool = + ## Getter, re-interpretation of the daemon flag for reduced service mode + # No need for running the daemon with reduced service mode. So it is + # convenient to use this flag for indicating this. + not ctx.daemon + +proc `hibernate=`*(ctx: BeaconCtxRef; val: bool) = + ## Setter + ctx.daemon = not val + + # Control some error messages on the scheduler (e.g. zombie/banned-peer + # reconnection attempts, LRU flushing out oldest peer etc.) + ctx.noisyLog = not val + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/sync_desc.nim b/nimbus/sync/sync_desc.nim index 236d795f0..c1c179b3d 100644 --- a/nimbus/sync/sync_desc.nim +++ b/nimbus/sync/sync_desc.nim @@ -38,6 +38,7 @@ type CtxRef*[S] = ref object ## Shared state among all syncing peer workers (aka buddies.) 
+ noisyLog*: bool ## Hold back `trace` and `debug` msgs if `false` poolMode*: bool ## Activate `runPool()` workers if set `true` daemon*: bool ## Enable global background job pool*: S ## Shared context for all worker peers diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index d6a0011ea..41cb7c326 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -341,12 +341,12 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = now = Moment.now() ttz = zombie.value.zombified + zombieTimeToLinger if ttz < Moment.now(): - trace "Reconnecting zombie peer ignored", peer, + if dsc.ctx.noisyLog: trace "Reconnecting zombie peer ignored", peer, nPeers, nWorkers=dsc.buddies.len, maxWorkers, canRequeue=(now-ttz) return # Zombie can be removed from the database dsc.buddies.del peer.key - trace "Zombie peer timeout, ready for requeing", peer, + if dsc.ctx.noisyLog: trace "Zombie peer timeout, ready for requeing", peer, nPeers, nWorkers=dsc.buddies.len, maxWorkers # Initialise worker for this peer @@ -357,7 +357,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = ctrl: BuddyCtrlRef(), peer: peer)) if not buddy.worker.runStart(): - trace "Ignoring useless peer", peer, nPeers, + if dsc.ctx.noisyLog: trace "Ignoring useless peer", peer, nPeers, nWorkers=dsc.buddies.len, maxWorkers buddy.worker.ctrl.zombie = true return @@ -373,7 +373,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = leastVal = dsc.buddies.shift.value # unqueue first/least item oldest = leastVal.data.worker if oldest.isNil: - trace "Dequeuing zombie peer", + if dsc.ctx.noisyLog: trace "Dequeuing zombie peer", # Fake `Peer` pretty print for `oldest` oldest=("Node[" & $leastVal.key.address & "]"), since=leastVal.data.zombified, nPeers, nWorkers=dsc.buddies.len, @@ -382,8 +382,8 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = else: # This could happen if there are idle entries in the table, i.e. # somehow hanging runners. - trace "Peer table full! Dequeuing least used entry", oldest, - nPeers, nWorkers=dsc.buddies.len, maxWorkers + if dsc.ctx.noisyLog: trace "Peer table full! Dequeuing least used entry", + oldest, nPeers, nWorkers=dsc.buddies.len, maxWorkers # Setting to `zombie` will trigger the worker to terminate (if any.) oldest.ctrl.zombie = true @@ -400,12 +400,12 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = nWorkers = dsc.buddies.len rc = dsc.buddies.eq peer.key if rc.isErr: - debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers - discard + if dsc.ctx.noisyLog: debug "Disconnected, unregistered peer", peer, + nPeers, nWorkers, maxWorkers elif rc.value.worker.isNil: # Re-visiting zombie - trace "Ignore zombie", peer, nPeers, nWorkers, maxWorkers - discard + if dsc.ctx.noisyLog: trace "Ignore zombie", peer, + nPeers, nWorkers, maxWorkers elif rc.value.worker.ctrl.zombie: # Don't disconnect, leave them fall out of the LRU cache. The effect is, # that reconnecting might be blocked, for a while. For few peers cases, @@ -414,7 +414,8 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = rc.value.worker = nil rc.value.dsc = nil rc.value.zombified = Moment.now() - trace "Disconnected, zombie", peer, nPeers, nWorkers, maxWorkers + if dsc.ctx.noisyLog: trace "Disconnected, zombie", peer, + nPeers, nWorkers, maxWorkers else: rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere dsc.buddies.del peer.key
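The gist of the new header stash can be summarised by the minimal, self-contained
sketch below. It is not part of the patch: `MockDb`, `txLevel`, `kvt` and `stash`
are hypothetical stand-ins for the real `CoreDbRef`/Kvt machinery. While a
transaction is pending, `stashHeaders()` parks the encoded headers in an
in-memory table, and `peekHeader()` consults that table before the persistent
store, mirroring `dbStashHeaders()`/`dbPeekHeader()` above. The real code
additionally RLP-decodes the blob in `dbPeekHeader()` and clears the stash
entry again in `dbUnstashHeader()`.

import std/[options, tables]

type
  MockDb = object
    txLevel: int                     # pretend transaction nesting level
    kvt: Table[uint64, seq[byte]]    # pretend persistent header table
    stash: Table[uint64, seq[byte]]  # in-memory fallback while txLevel > 0

proc stashHeaders(db: var MockDb; first: uint64; revBlobs: seq[seq[byte]]) =
  ## Store encoded headers for block numbers first .. first+revBlobs.len-1,
  ## where `revBlobs` is ordered by decreasing block number.
  let last = first + revBlobs.len.uint64 - 1
  for n, data in revBlobs:
    if db.txLevel > 0:
      db.stash[last - n.uint64] = data   # cannot write through, so cache it
    else:
      db.kvt[last - n.uint64] = data     # no transaction pending, write through

proc peekHeader(db: MockDb; bn: uint64): Option[seq[byte]] =
  ## Lookup order: in-memory stash first, then the persistent table.
  if bn in db.stash:
    return some(db.stash[bn])
  if bn in db.kvt:
    return some(db.kvt[bn])
  none(seq[byte])

when isMainModule:
  var db = MockDb(txLevel: 1)                               # simulate a pending FCU transaction
  db.stashHeaders(100, @[@[byte 3], @[byte 2], @[byte 1]])  # headers #102, #101, #100
  doAssert db.peekHeader(101).get() == @[byte 2]            # served from the stash
  doAssert db.kvt.len == 0                                  # nothing hit persistent storage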