Beacon sync updates tbc (#2818)
* Clear rejected sync target so that it would not be processed again

* Use in-memory table to stash headers after FCU import has started

why:
  After block import has started, there is no way to save/stash block
  headers persistently. The FCU handlers always maintain a positive
  transaction level and in some instances the current transaction is
  flushed and re-opened.

  This patch fixes an exception thrown when a block header has gone
  missing.
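
  A minimal, self-contained sketch of the idea; the names `Header`,
  `HeaderStash`, `stashHeader` and `peekHeader` are illustrative
  stand-ins, not the actual nimbus-eth1 code:

    import std/[options, tables]

    type
      Header = object                  # stand-in for the real header type
        number: uint64
        parentHash: array[32, byte]

      HeaderStash = object
        mem: Table[uint64, Header]     # block number -> header, memory only

    proc stashHeader(s: var HeaderStash; h: Header) =
      # No persistent write: the FCU handlers keep a transaction open that
      # may be flushed and re-opened underneath us at any time.
      s.mem[h.number] = h

    proc peekHeader(s: HeaderStash; bn: uint64): Option[Header] =
      # Callers must handle a missing entry instead of assuming presence.
      if bn in s.mem: some(s.mem[bn]) else: none(Header)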

* When resuming sync, delete stale headers and state

why:
  Deleting headers saves some persistent space that would get lost
  otherwise. Deleting the state after resuming prevents race
  conditions.

* On clean start, hibernate the sync `daemon` entity before the first update from the CL

details:
  Only reduced services are running (see the sketch below):
  * accept FCU from CL
  * fetch finalised header after accepting FCU (provides hash only)
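
  A simplified sketch of the hibernation gate; `SyncCtx` and
  `runDaemonStep` are hypothetical names that only mirror the shape of
  the `if ctx.hibernate: return` check added to the daemon:

    type
      SyncCtx = object
        hibernate: bool   # true until the first usable FCU arrives from the CL

    proc runDaemonStep(ctx: SyncCtx) =
      if ctx.hibernate:
        # Reduced service mode: only accept FCU from the CL and fetch the
        # finalised header for the hash it provides; everything else waits.
        return
      # ... full sync work (header/body fetch, block import) goes here ...
      discard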

* Improve text/meaning of some log messages

* Revisit error handling for useless peers

why:
  A peer is abandoned if the error score is too high. This was not
  properly handled for a fringe case where the error was detected at
  staging time but fetching via eth/xx was OK.
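
  A hedged sketch of the scoring idea; the threshold constant and the
  `Peer` fields are assumptions (the real code works with
  `buddy.only.nBdyRespErrors`, `buddy.ctrl` and a threshold constant):

    const errorThreshold = 3           # assumed value, not the real constant

    type
      Peer = object
        nRespErrors: int               # accumulated fetch/staging errors
        stopped: bool                  # peer control state says "stopped"
        zombie: bool                   # abandoned, will not be re-accepted

    proc noteFetchFailure(p: var Peer) =
      # Count the failure even when it was only detected at staging time
      # while the eth/xx fetch itself succeeded ...
      inc p.nRespErrors
      # ... then retire the peer once the error score is too high.
      if (p.stopped and p.nRespErrors > 1) or p.nRespErrors > errorThreshold:
        p.zombie = true                # do not let this peer reconnect at once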

* Clarify `break` meaning by using labelled `break` statements
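
  In Nim a labelled `break` targets a named `block`, which makes it
  unambiguous which construct is being left; a minimal standalone
  example of the pattern:

    block importLoop:
      for n in 0 ..< 10:
        if n == 3:
          break importLoop             # leaves the whole labelled block
        # a plain `break` here would only leave the innermost loop
    echo "continued after importLoop"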

* Fix the commit action taken when the sync target has been reached

why:
  The sync target block number might precede the latest FCU block number.
  This happens when the engine API squeezes in some request to execute
  and import subsequent blocks.

  This patch fixes an assert thrown when, after reaching the target, the
  latest FCU block number is higher than the expected target block number.
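
  A hedged sketch of the relaxed check; the `targetReached` helper is
  illustrative, not the actual patch. The point is to treat the target as
  reached when the latest FCU block number is at or beyond it, rather than
  asserting strict equality:

    proc targetReached(latest, target: uint64): bool =
      # Tolerate `latest > target`: the engine API may already have imported
      # blocks past the sync target.
      latest >= target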

* Update TODO list
mjfh authored Nov 1, 2024
1 parent 73661fd commit 430611d
Showing 13 changed files with 331 additions and 134 deletions.
34 changes: 34 additions & 0 deletions nimbus/sync/beacon/TODO.md
@@ -1 +1,35 @@
## General TODO items

* Update/resolve code fragments which are tagged FIXME

## Open issues

### 1. Weird behaviour of the RPC/engine API

See issue [#2816](https://github.com/status-im/nimbus-eth1/issues/2816)

### 2. Some assert

Error: unhandled exception: key not found: 0x441a0f..027bc96a [AssertionDefect]

which happened on several `holesky` tests immediately after logging something like

NTC 2024-10-31 21:37:34.728 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=044d22843cbe baseNumber=2646764 baseHash=21ec11c1deac

or from another machine with literally the same exception text (but the stack-trace differs)

NTC 2024-10-31 21:58:07.616 Finalized blocks persisted file=forked_chain.nim:231 numberOfBlocks=129 last=9cbcc52953a8 baseNumber=2646857 baseHash=9db5c2ac537b


### 3. Some assert

Seen on `holesky`: sometimes the header chain cannot be joined with its
lower end after completion because the hashes differ, leading to an assert failure

Error: unhandled exception: header chains C-D joining hashes do not match L=#2646126 lHash=3bc2beb1b565 C=#2646126 cHash=3bc2beb1b565 D=#2646127 dParent=671c7c6cb904

which was preceded somewhat earlier by log entries

INF 2024-10-31 18:21:16.464 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=3bc2beb1b565
[..]
INF 2024-10-31 18:21:25.872 Forkchoice requested sync to new head file=api_forkchoice.nim:107 number=2646126 hash=671c7c6cb904
31 changes: 18 additions & 13 deletions nimbus/sync/beacon/worker.nim
@@ -34,12 +34,14 @@ proc bodiesToFetchOk(buddy: BeaconBuddyRef): bool =

proc napUnlessSomethingToFetch(buddy: BeaconBuddyRef): Future[bool] {.async.} =
## When idle, save cpu cycles waiting for something to do.
if buddy.ctx.pool.importRunningOk or
not (buddy.headersToFetchOk() or
if buddy.ctx.pool.blockImportOk or # currently importing blocks
buddy.ctx.hibernate or # not activated yet?
not (buddy.headersToFetchOk() or # something on TODO list
buddy.bodiesToFetchOk()):
await sleepAsync workerIdleWaitInterval
return true
return false
else:
return false

# ------------------------------------------------------------------------------
# Public start/stop and admin functions
@@ -54,9 +56,6 @@ proc setup*(ctx: BeaconCtxRef; info: static[string]): bool =

# Debugging stuff, might be an empty template
ctx.setupTicker()

# Enable background daemon
ctx.daemon = true
true

proc release*(ctx: BeaconCtxRef; info: static[string]) =
@@ -70,20 +69,20 @@ proc start*(buddy: BeaconBuddyRef; info: static[string]): bool =
let peer = buddy.peer

if runsThisManyPeersOnly <= buddy.ctx.pool.nBuddies:
debug info & ": peers limit reached", peer
if not buddy.ctx.hibernate: debug info & ": peers limit reached", peer
return false

if not buddy.startBuddy():
debug info & ": failed", peer
if not buddy.ctx.hibernate: debug info & ": failed", peer
return false

debug info & ": new peer", peer
if not buddy.ctx.hibernate: debug info & ": new peer", peer
true

proc stop*(buddy: BeaconBuddyRef; info: static[string]) =
## Clean up this peer
debug info & ": release peer", peer=buddy.peer,
nInvocations=buddy.only.nMultiLoop,
if not buddy.ctx.hibernate: debug info & ": release peer", peer=buddy.peer,
ctrl=buddy.ctrl.state, nInvocations=buddy.only.nMultiLoop,
lastIdleGap=buddy.only.multiRunIdle.toStr
buddy.stopBuddy()

@@ -98,8 +97,14 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
## as `true` not before there is some activity on the `runPool()`,
## `runSingle()`, or `runMulti()` functions.
##
## On a fresh start, the flag `ctx.daemon` will not be set `true` before the
## first usable request from the CL (via RPC) stumbles in.
##
# Check for a possible header layout and body request changes
ctx.updateSyncStateLayout info
if ctx.hibernate:
return

ctx.updateBlockRequests info

# Execute staged block records.
@@ -110,8 +115,8 @@ proc runDaemon*(ctx: BeaconCtxRef; info: static[string]) {.async.} =
# place. So there might be some peers active. If they are waiting for
# a message reply, this will most probably time out as all processing
# power is usurped by the import task here.
ctx.pool.importRunningOk = true
defer: ctx.pool.importRunningOk = false
ctx.pool.blockImportOk = true
defer: ctx.pool.blockImportOk = false

# Import from staged queue.
while await ctx.blocksStagedImport(info):
101 changes: 62 additions & 39 deletions nimbus/sync/beacon/worker/blocks_staged.nim
@@ -18,7 +18,14 @@ import
../worker_desc,
./blocks_staged/bodies,
./update/metrics,
"."/[blocks_unproc, db]
"."/[blocks_unproc, db, helpers]

# ------------------------------------------------------------------------------
# Private debugging & logging helpers
# ------------------------------------------------------------------------------

formatIt(Hash32):
it.data.short

# ------------------------------------------------------------------------------
# Private functions
@@ -50,11 +57,16 @@ proc fetchAndCheck(
blk.blocks.setLen(offset + ivReq.len)
var blockHash = newSeq[Hash32](ivReq.len)
for n in 1u ..< ivReq.len:
let header = ctx.dbPeekHeader(ivReq.minPt + n).expect "stashed header"
let header = ctx.dbPeekHeader(ivReq.minPt + n).valueOr:
# There is nothing one can do here
raiseAssert info & " stashed header missing: n=" & $n &
" ivReq=" & $ivReq & " nth=" & (ivReq.minPt + n).bnStr
blockHash[n - 1] = header.parentHash
blk.blocks[offset + n].header = header
blk.blocks[offset].header =
ctx.dbPeekHeader(ivReq.minPt).expect "stashed header"
blk.blocks[offset].header = ctx.dbPeekHeader(ivReq.minPt).valueOr:
# There is nothing one can do here
raiseAssert info & " stashed header missing: n=0" &
" ivReq=" & $ivReq & " nth=" & ivReq.minPt.bnStr
blockHash[ivReq.len - 1] =
rlp.encode(blk.blocks[offset + ivReq.len - 1].header).keccak256

@@ -177,11 +189,17 @@ proc blocksStagedCollect*(

# Fetch and extend staging record
if not await buddy.fetchAndCheck(ivReq, blk, info):

# Throw away first time block fetch data. Keep other data for a
# partially assembled list.
if nBlkBlocks == 0:
if 0 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped:
buddy.only.nBdyRespErrors.inc

if (1 < buddy.only.nBdyRespErrors and buddy.ctrl.stopped) or
fetchBodiesReqThresholdCount < buddy.only.nBdyRespErrors:
# Make sure that this peer does not immediately reconnect
buddy.ctrl.zombie = true
trace info & ": list completely failed", peer, iv, ivReq,
trace info & ": current block list discarded", peer, iv, ivReq,
ctrl=buddy.ctrl.state, nRespErrors=buddy.only.nBdyRespErrors
ctx.blocksUnprocCommit(iv.len, iv)
# At this stage allow a task switch so that some other peer might try
@@ -238,7 +256,8 @@ proc blocksStagedImport*(
let imported = ctx.chain.latestNumber()
if qItem.key != imported + 1:
trace info & ": there is a gap L vs. staged",
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr
B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr,
C=ctx.layout.coupler.bnStr
doAssert imported < qItem.key
return false

@@ -253,45 +272,49 @@
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr

var maxImport = iv.maxPt
for n in 0 ..< nBlocks:
let nBn = qItem.data.blocks[n].header.number
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
warn info & ": import block error", iv, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, nBn=nBn.bnStr, `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break

# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
# Shutdown?
maxImport = ctx.chain.latestNumber()
break

# Update, so it can be followed nicely
ctx.updateMetrics()

# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nthHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nthHash else: ctx.layout.finalHash

doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(headHash=nthHash, finalizedHash=finHash).isOkOr:
warn info & ": fork choice error", n, iv, B=ctx.chain.baseNumber.bnStr,
L=ctx.chain.latestNumber.bnStr, F=ctx.layout.final.bnStr, nthHash,
finHash=(if finHash == nthHash: "nHash" else: "F"), `error`=error
block importLoop:
for n in 0 ..< nBlocks:
let nBn = qItem.data.blocks[n].header.number
ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr:
warn info & ": import block error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
nthBn=nBn.bnStr, nthHash=qItem.data.getNthHash(n), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break
break importLoop

# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
# Shutdown?
maxImport = ctx.chain.latestNumber()
break
break importLoop

# Update, so it can be followed nicely
ctx.updateMetrics()

# Occasionally mark the chain finalized
if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks:
let
nthHash = qItem.data.getNthHash(n)
finHash = if nBn < ctx.layout.final: nthHash
else: ctx.layout.finalHash

doAssert nBn == ctx.chain.latestNumber()
ctx.pool.chain.forkChoice(nthHash, finHash).isOkOr:
warn info & ": fork choice error", n, iv,
B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr,
F=ctx.layout.final.bnStr, nthBn=nBn.bnStr, nthHash,
finHash=(if finHash == nthHash: "nthHash" else: "F"), `error`=error
# Restore what is left over below
maxImport = ctx.chain.latestNumber()
break importLoop

# Allow pseudo/async thread switch.
await sleepAsync asyncThreadSwitchTimeSlot
if not ctx.daemon:
maxImport = ctx.chain.latestNumber()
break importLoop

# Import probably incomplete, so a partial roll back may be needed
if maxImport < iv.maxPt: