From e109055f0a44c33ee3c4e277ff8f56aeeb2a0f43 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Fri, 11 Aug 2023 18:29:28 +0100 Subject: [PATCH] Implement distributed merge of backend filters --- nimbus/db/aristo/aristo_debug.nim | 22 +- nimbus/db/aristo/aristo_desc.nim | 12 +- nimbus/db/aristo/aristo_desc/aristo_error.nim | 5 +- .../aristo_desc/aristo_types_structural.nim | 10 + nimbus/db/aristo/aristo_filter.nim | 301 +++++++++++++----- nimbus/db/aristo/aristo_tx.nim | 51 ++- tests/test_aristo/test_tx.nim | 4 +- 7 files changed, 290 insertions(+), 115 deletions(-) diff --git a/nimbus/db/aristo/aristo_debug.nim b/nimbus/db/aristo/aristo_debug.nim index 3b3fde84dc..0603453d6a 100644 --- a/nimbus/db/aristo/aristo_debug.nim +++ b/nimbus/db/aristo/aristo_debug.nim @@ -200,7 +200,7 @@ proc ppSTab( "{" & sTab.sortedKeys .mapIt((it, sTab.getOrVoid it)) .mapIt("(" & it[0].ppVid & "," & it[1].ppVtx(db,it[0]) & ")") - .join("," & indent.toPfx(1)) & "}" + .join(indent.toPfx(2)) & "}" proc ppLTab( lTab: Table[LeafTie,VertexID]; @@ -210,7 +210,7 @@ proc ppLTab( "{" & lTab.sortedKeys .mapIt((it, lTab.getOrVoid it)) .mapIt("(" & it[0].ppLeafTie(db) & "," & it[1].ppVid & ")") - .join("," & indent.toPfx(1)) & "}" + .join(indent.toPfx(2)) & "}" proc ppPPrf(pPrf: HashSet[VertexID]): string = "{" & pPrf.sortedKeys.mapIt(it.ppVid).join(",") & "}" @@ -324,9 +324,11 @@ proc ppFilter(fl: AristoFilterRef; db: AristoDbRef; indent: int): string = pfx1 = indent.toPfx(1) pfx2 = indent.toPfx(2) result = "" - if db.roFilter.isNil: + if fl.isNil: result &= " n/a" return + result &= pfx & "trg(" & fl.trg.ppKey & ")" + result &= pfx & "src(" & fl.src.ppKey & ")" result &= pfx & "vGen" & pfx1 & "[" if fl.vGen.isSome: result &= fl.vGen.unsafeGet.mapIt(it.ppVid).join(",") @@ -361,7 +363,7 @@ proc ppBeOnly[T](be: T; db: AristoDbRef; indent: int): string = proc ppBe[T](be: T; db: AristoDbRef; indent: int): string = ## backend + filter - db.roFilter.ppFilter(db, indent) & indent.toPfx & be.ppBeOnly(db,indent) + db.roFilter.ppFilter(db, indent+1) & indent.toPfx & be.ppBeOnly(db,indent+1) proc ppLayer( layer: AristoLayerRef; @@ -374,8 +376,8 @@ proc ppLayer( indent = 4; ): string = let - pfx1 = indent.toPfx - pfx2 = indent.toPfx(1) + pfx1 = indent.toPfx(1) + pfx2 = indent.toPfx(2) nOKs = sTabOk.ord + lTabOk.ord + kMapOk.ord + pPrfOk.ord + vGenOk.ord tagOk = 1 < nOKs var @@ -392,6 +394,8 @@ proc ppLayer( rc if not layer.isNil: + if 2 < nOKs: + result &= "".doPrefix(false) if vGenOk: let tLen = layer.vGen.len @@ -613,6 +617,12 @@ proc pp*( ): string = db.top.pp(db, xTabOk=xTabOk, kMapOk=kMapOk, other=other, indent=indent) +proc pp*( + filter: AristoFilterRef; + db = AristoDbRef(); + indent = 4; + ): string = + filter.ppFilter(db, indent) proc pp*( be: TypedBackendRef; diff --git a/nimbus/db/aristo/aristo_desc.nim b/nimbus/db/aristo/aristo_desc.nim index e53d10484f..80392caac4 100644 --- a/nimbus/db/aristo/aristo_desc.nim +++ b/nimbus/db/aristo/aristo_desc.nim @@ -22,7 +22,7 @@ {.push raises: [].} import - std/tables, + std/[sets, tables], eth/common, ./aristo_constants, ./aristo_desc/[ @@ -31,8 +31,8 @@ import from ./aristo_desc/aristo_types_backend import AristoBackendRef +# Not auto-exporting backend export - # Not auto-exporting backend aristo_constants, aristo_error, aristo_types_identifiers, aristo_types_structural @@ -44,6 +44,13 @@ type txUid*: uint ## Unique ID among transactions level*: int ## Stack index for this transaction + AristoDudesRef* = ref object + case rwOk*: bool + of true: + roDudes*: HashSet[AristoDbRef] ## 
Read-only peers + else: + rwDb*: AristoDbRef ## Link to writable descriptor + AristoDbRef* = ref AristoDbObj AristoDbObj* = object ## Set of database layers, supporting transaction frames @@ -54,6 +61,7 @@ type txRef*: AristoTxRef ## Latest active transaction txUidGen*: uint ## Tx-relative unique number generator + dudes*: AristoDudesRef ## Related DB descriptors # Debugging data below, might go away in future xMap*: Table[HashLabel,VertexID] ## For pretty printing, extends `pAmk` diff --git a/nimbus/db/aristo/aristo_desc/aristo_error.nim b/nimbus/db/aristo/aristo_desc/aristo_error.nim index c70b29ce23..89a9f8f4a7 100644 --- a/nimbus/db/aristo/aristo_desc/aristo_error.nim +++ b/nimbus/db/aristo/aristo_desc/aristo_error.nim @@ -169,9 +169,11 @@ type DelVidStaleVtx # Functions from `aristo_filter.nim` + FilRoBackendOrMissing FilStateRootMissing FilStateRootMismatch FilPrettyPointlessLayer + FilDudeFilterUpdateError # Get functions form `aristo_get.nim` GetLeafNotFound @@ -194,8 +196,9 @@ type # Transaction wrappers TxArgStaleTx - TxBackendMissing + TxRoBackendOrMissing TxNoPendingTx + TxPendingTx TxNotTopTx TxStackGarbled TxStackUnderflow diff --git a/nimbus/db/aristo/aristo_desc/aristo_types_structural.nim b/nimbus/db/aristo/aristo_desc/aristo_types_structural.nim index 51f61b8384..f0b0bff372 100644 --- a/nimbus/db/aristo/aristo_desc/aristo_types_structural.nim +++ b/nimbus/db/aristo/aristo_desc/aristo_types_structural.nim @@ -250,6 +250,16 @@ proc dup*(layer: AristoLayerRef): AristoLayerRef = for (k,v) in layer.sTab.pairs: result.sTab[k] = v.dup +proc dup*(filter: AristoFilterRef): AristoFilterRef = + ## Duplicate layer. + result = AristoFilterRef( + src: filter.src, + kMap: filter.kMap, + vGen: filter.vGen, + trg: filter.trg) + for (k,v) in filter.sTab.pairs: + result.sTab[k] = v.dup + proc to*(node: NodeRef; T: type VertexRef): T = ## Extract a copy of the `VertexRef` part from a `NodeRef`. node.VertexRef.dup diff --git a/nimbus/db/aristo/aristo_filter.nim b/nimbus/db/aristo/aristo_filter.nim index 83c1edc0c9..da33e22858 100644 --- a/nimbus/db/aristo/aristo_filter.nim +++ b/nimbus/db/aristo/aristo_filter.nim @@ -13,8 +13,9 @@ ## import - std/[options, sequtils, tables], + std/[options, sequtils, sets, tables], results, + ./aristo_desc/aristo_types_backend, "."/[aristo_desc, aristo_get, aristo_vid] type @@ -26,16 +27,6 @@ type # Private helpers # ------------------------------------------------------------------------------ -proc getBeStateRoot( - db: AristoDbRef; - ): Result[HashKey,AristoError] = - let rc = db.getKeyBE VertexID(1) - if rc.isOk: - return ok(rc.value) - if rc.error == GetKeyNotFound: - return ok(VOID_HASH_KEY) - err(rc.error) - proc getLayerStateRoots( db: AristoDbRef; layer: AristoLayerRef; @@ -44,26 +35,115 @@ proc getLayerStateRoots( ## Get the Merkle hash key for target state root to arrive at after this ## reverse filter was applied. 
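  ##
  ## Clarifying note (not in the upstream comment): the `be` member of the
  ## returned pair holds the state root currently stored for `VertexID(1)`
  ## on the backend, while `fg` is the state root that the argument `layer`
  ## produces on top of it; the pair supplies the `src`/`trg` labels for a
  ## filter built from this layer (see `fwdFilter()` below.)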
var spr: StateRootPair - block: - let rc = db.getBeStateRoot() - if rc.isErr: + + spr.be = block: + let rc = db.getKeyBE VertexID(1) + if rc.isOk: + rc.value + elif rc.error == GetKeyNotFound: + VOID_HASH_KEY + else: return err(rc.error) - spr.be = rc.value + block: spr.fg = layer.kMap.getOrVoid(VertexID 1).key if spr.fg.isValid: return ok(spr) + if chunkedMpt: let vid = layer.pAmk.getOrVoid HashLabel(root: VertexID(1), key: spr.be) if vid == VertexID(1): spr.fg = spr.be return ok(spr) + if layer.sTab.len == 0 and layer.kMap.len == 0 and layer.pAmk.len == 0: return err(FilPrettyPointlessLayer) + err(FilStateRootMismatch) +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ + +proc merge( + db: AristoDbRef; + upper: AristoFilterRef; # Src filter, `nil` is ok + lower: AristoFilterRef; # Trg filter, `nil` is ok + beStateRoot: HashKey; # Merkle hash key + ): Result[AristoFilterRef,(VertexID,AristoError)] = + ## Merge argument `upper` into the `lower` filter instance. + ## + ## Comparing before and after merge + ## :: + ## current | merged + ## ----------------------------+-------------------------------- + ## trg2 --upper-- (src2==trg1) | + ## | trg2 --newFilter-- (src1==trg0) + ## trg1 --lower-- (src1==trg0) | + ## | + ## trg0 --beStateRoot | trg0 --beStateRoot + ## | + ## + # Degenerate case: `upper` is void + if lower.isNil or lower.vGen.isNone: + if upper.isNil or upper.vGen.isNone: + # Even more degenerate case when both filters are void + return ok AristoFilterRef( + src: beStateRoot, + trg: beStateRoot, + vGen: none(seq[VertexID])) + if upper.src != beStateRoot: + return err((VertexID(1),FilStateRootMismatch)) + return ok(upper) + + # Degenerate case: `upper` is non-trivial and `lower` is void + if upper.isNil or upper.vGen.isNone: + if lower.src != beStateRoot: + return err((VertexID(0), FilStateRootMismatch)) + return ok(lower) + + # Verify stackability + if upper.src != lower.trg or + lower.src != beStateRoot: + return err((VertexID(0), FilStateRootMismatch)) + + # There is no need to deep copy table vertices as they will not be modified. + let newFilter = AristoFilterRef( + src: lower.src, + sTab: lower.sTab, + kMap: lower.kMap, + vGen: upper.vGen, + trg: upper.trg) + + for (vid,vtx) in upper.sTab.pairs: + if vtx.isValid or not newFilter.sTab.hasKey vid: + newFilter.sTab[vid] = vtx + elif newFilter.sTab.getOrVoid(vid).isValid: + let rc = db.getVtxUBE vid + if rc.isOk: + newFilter.sTab[vid] = vtx # VertexRef(nil) + elif rc.error == GetVtxNotFound: + newFilter.sTab.del vid + else: + return err((vid,rc.error)) + + for (vid,key) in upper.kMap.pairs: + if key.isValid or not newFilter.kMap.hasKey vid: + newFilter.kMap[vid] = key + elif newFilter.kMap.getOrVoid(vid).isValid: + let rc = db.getKeyUBE vid + if rc.isOk: + newFilter.kMap[vid] = key # VOID_HASH_KEY + elif rc.error == GetKeyNotFound: + newFilter.kMap.del vid + else: + return err((vid,rc.error)) + + ok newFilter + + # ------------------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------------------ @@ -122,6 +202,51 @@ proc fwdFilter*( vGen: some(layer.vGen.vidReorg), # Compact recycled IDs trg: trgRoot) + +proc revFilter*( + db: AristoDbRef; + filter: AristoFilterRef; + ): Result[AristoFilterRef,(VertexID,AristoError)] = + ## Assemble reverse filter for the `filter` argument, i.e. 
changes to the + ## backend that reverse the effect of applying the this read-only filter. + ## + ## This read-only filter is calculated against the current unfiltered + ## backend (excluding optionally installed read-only filter.) + ## + # Register MPT state roots for reverting back + let rev = AristoFilterRef( + src: filter.trg, + trg: filter.src) + + # Get vid generator state on backend + block: + let rc = db.getIdgUBE() + if rc.isErr: + return err((VertexID(0), rc.error)) + rev.vGen = some rc.value + + # Calculate reverse changes for the `sTab[]` structural table + for vid in filter.sTab.keys: + let rc = db.getVtxUBE vid + if rc.isOk: + rev.sTab[vid] = rc.value + elif rc.error == GetVtxNotFound: + rev.sTab[vid] = VertexRef(nil) + else: + return err((vid,rc.error)) + + # Calculate reverse changes for the `kMap` sequence. + for vid in filter.kMap.keys: + let rc = db.getKeyUBE vid + if rc.isOk: + rev.kMap[vid] = rc.value + elif rc.error == GetKeyNotFound: + rev.kMap[vid] = VOID_HASH_KEY + else: + return err((vid,rc.error)) + + ok(rev) + # ------------------------------------------------------------------------------ # Public functions, apply/install filters # ------------------------------------------------------------------------------ @@ -130,79 +255,99 @@ proc merge*( db: AristoDbRef; filter: AristoFilterRef; ): Result[void,(VertexID,AristoError)] = - ## Merge argument `filter` to the filter layer. - ## - ## Comparing before and after merge - ## :: - ## current | merged - ## ----------------------------------+-------------------------------- - ## trg2 --filter-- (src2==trg1) | - ## | trg2 --newFilter-- (src1==trg0) - ## trg1 --db.roFilter-- (src1==trg0) | - ## | - ## trg0 --db.backend | trg0 --db.backend - ## | - let beRoot = block: - let rc = db.getBeStateRoot() + ## Merge the argument `filter` into the read-only filter layer. Note that + ## this function has no control of the filter source. Having merged the + ## argument `filter`, all the `top` and `stack` layers should be cleared. + let ubeRootKey = block: + let rc = db.getKeyUBE VertexID(1) + if rc.isOk: + rc.value + elif rc.error == GetKeyNotFound: + VOID_HASH_KEY + else: + return err((VertexID(1),rc.error)) + + db.roFilter = block: + let rc = db.merge(filter, db.roFilter, ubeRootKey) if rc.isErr: - return err((VertexID(1),FilStateRootMissing)) + return err(rc.error) rc.value - if filter.vGen.isNone: - # Blind argument filter - if db.roFilter.isNil: - # Force read-only system - db.roFilter = AristoFilterRef( - src: beRoot, - trg: beRoot, - vGen: none(seq[VertexID])) - return ok() + ok() - # Simple case: no read-only filter yet - if db.roFilter.isNil or db.roFilter.vGen.isNone: - if filter.src != beRoot: - return err((VertexID(1),FilStateRootMismatch)) - db.roFilter = filter + +proc canResolveBE*(db: AristoDbRef): bool = + ## Check whether the read-only filter can be merged into the backend + if not db.backend.isNil: + if db.dudes.isNil or db.dudes.rwOk: + return true + + +proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] = + ## Resolve the backend filter into the physical backend. This requires that + ## the argument `db` descriptor has read-write permission for the backend + ## (see also the below function `ackqRwMode()`.) + ## + ## For any associated descriptors working on the same backend, their backend + ## filters will be updated so that the change of the backend DB remains + ## unnoticed. 
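  ##
  ## Outline of the steps below: check that this descriptor may write to the
  ## backend, compute the reverse filter of `db.roFilter` against the
  ## unfiltered backend, stack that reverse filter underneath every read-only
  ## peer's filter (so the peers' view of the database stays unchanged), and
  ## only then write `db.roFilter` out via the backend `put` functions,
  ## rolling the peers' filters back again if that write fails.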
+ if not db.canResolveBE(): + return err((VertexID(1),FilRoBackendOrMissing)) + + # Blind or missing filter + if db.roFilter.isNil: + return ok() + if db.roFilter.vGen.isNone: + db.roFilter = AristoFilterRef(nil) return ok() - # Verify merge stackability into existing read-only filter - if filter.src != db.roFilter.trg: - return err((VertexID(1),FilStateRootMismatch)) + let ubeRootKey = block: + let rc = db.getKeyUBE VertexID(1) + if rc.isOk: + rc.value + elif rc.error == GetKeyNotFound: + VOID_HASH_KEY + else: + return err((VertexID(1),rc.error)) - # Merge `filter` into `roFilter` as `newFilter`. There is no need to deep - # copy table vertices as they will not be modified. - let newFilter = AristoFilterRef( - src: db.roFilter.src, - sTab: db.roFilter.sTab, - kMap: db.roFilter.kMap, - vGen: filter.vGen, - trg: filter.trg) + # Filters rollback helper + var roFilters: seq[(AristoDbRef,AristoFilterRef)] + proc rollback() = + for (d,f) in roFilters: + d.roFilter = f - for (vid,vtx) in filter.sTab.pairs: - if vtx.isValid or not newFilter.sTab.hasKey vid: - newFilter.sTab[vid] = vtx - elif newFilter.sTab.getOrVoid(vid).isValid: - let rc = db.getVtxUBE vid - if rc.isOk: - newFilter.sTab[vid] = vtx # VertexRef(nil) - elif rc.error == GetVtxNotFound: - newFilter.sTab.del vid - else: - return err((vid,rc.error)) + # Update dudes + if not db.dudes.isNil: + # Calculate reverse filter from current filter + let rev = block: + let rc = db.revFilter db.roFilter + if rc.isErr: + return err(rc.error) + rc.value - for (vid,key) in filter.kMap.pairs: - if key.isValid or not newFilter.kMap.hasKey vid: - newFilter.kMap[vid] = key - elif newFilter.kMap.getOrVoid(vid).isValid: - let rc = db.getKeyUBE vid - if rc.isOk: - newFilter.kMap[vid] = key # VOID_HASH_KEY - elif rc.error == GetKeyNotFound: - newFilter.kMap.del vid - else: - return err((vid,rc.error)) + # Update distributed filters. Note that the physical backend database + # has not been updated, yet. So the new root key for the backend will + # be `db.roFilter.trg`. 
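    # Stacking order: the peer's existing filter is the upper layer and the
    # reverse filter `rev` is the lower one, so after the physical update the
    # composed filter still maps the new backend root (`db.roFilter.trg`)
    # back to whatever state root the peer was seeing before.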
+ for dude in db.dudes.roDudes.items: + let rc = db.merge(dude.roFilter, rev, db.roFilter.trg) + if rc.isErr: + rollback() + return err(rc.error) + roFilters.add (dude, dude.roFilter) + dude.roFilter = rc.value + + # Save structural and other table entries + let + be = db.backend + txFrame = be.putBegFn() + be.putVtxFn(txFrame, db.roFilter.sTab.pairs.toSeq) + be.putKeyFn(txFrame, db.roFilter.kMap.pairs.toSeq) + be.putIdgFn(txFrame, db.roFilter.vGen.unsafeGet) + let w = be.putEndFn txFrame + if w != AristoError(0): + rollback() + return err((VertexID(0),w)) - db.roFilter = newFilter ok() # ------------------------------------------------------------------------------ diff --git a/nimbus/db/aristo/aristo_tx.nim b/nimbus/db/aristo/aristo_tx.nim index a0a2395f02..5b126851b8 100644 --- a/nimbus/db/aristo/aristo_tx.nim +++ b/nimbus/db/aristo/aristo_tx.nim @@ -14,7 +14,7 @@ {.push raises: [].} import - std/[options, sequtils, tables], + std/options, results, "."/[aristo_desc, aristo_filter, aristo_hashify] @@ -32,10 +32,6 @@ func getDbDescFromTopTx(tx: AristoTxRef): Result[AristoDbRef,AristoError] = return err(TxStackUnderflow) ok db -# ------------------------------------------------------------------------------ -# Private functions -# ------------------------------------------------------------------------------ - proc getTxUid(db: AristoDbRef): uint = if db.txUidGen == high(uint): db.txUidGen = 0 @@ -158,7 +154,7 @@ proc commit*( return err((VertexID(0),rc.error)) rc.value - if not dontHashify: + if db.top.dirty and not dontHashify: let rc = db.hashify() if rc.isErr: return err(rc.error) @@ -196,8 +192,8 @@ proc collapse*( if not commit: db.stack[0].swap db.top - if not dontHashify: - var rc = db.hashify() + if db.top.dirty and not dontHashify: + let rc = db.hashify() if rc.isErr: if not commit: db.stack[0].swap db.top # restore @@ -218,8 +214,8 @@ proc stow*( chunkedMpt = false; # Partial data (e.g. from `snap`) ): Result[void,(VertexID,AristoError)] = ## If there is no backend while the `persistent` argument is set `true`, - ## the function returns immediately with an error.The same happens if the - ## backend is locked while `persistent` is set (e.g. by an `exec()` call.) + ## the function returns immediately with an error.The same happens if there + ## is a pending transaction. ## ## The `dontHashify` is treated as described for `commit()`. ## @@ -234,9 +230,17 @@ proc stow*( ## If the argument `persistent` is set `true`, all the staged data are merged ## into the physical backend database and the staged data area is cleared. 
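  ##
  ## Typical call sequence once the `top` layer holds the desired changes
  ## (sketch only, assuming an already opened descriptor `db`; no transaction
  ## may be pending):
  ## ::
  ##    let rc = db.stow(persistent = true)
  ##    doAssert rc.isOk, "stow failed"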
## - let be = db.backend - if be.isNil and persistent: - return err((VertexID(0),TxBackendMissing)) + if not db.txRef.isNil: + return err((VertexID(0),TxPendingTx)) + if 0 < db.stack.len: + return err((VertexID(0),TxStackGarbled)) + if persistent and not db.canResolveBE(): + return err((VertexID(0),TxRoBackendOrMissing)) + + if db.top.dirty and not dontHashify: + let rc = db.hashify() + if rc.isErr: + return err(rc.error) let fwd = block: let rc = db.fwdFilter(db.top, chunkedMpt) @@ -246,22 +250,17 @@ proc stow*( if fwd.vGen.isSome: # Otherwise this layer is pointless block: + # Merge `top` layer into `roFilter` let rc = db.merge fwd if rc.isErr: return err(rc.error) - rc.value - - if persistent: - # Save structural and other table entries - let txFrame = be.putBegFn() - be.putVtxFn(txFrame, db.roFilter.sTab.pairs.toSeq) - be.putKeyFn(txFrame, db.roFilter.kMap.pairs.toSeq) - be.putIdgFn(txFrame, db.roFilter.vGen.unsafeGet) - let w = be.putEndFn txFrame - if w != AristoError(0): - return err((VertexID(0),w)) - - db.roFilter = AristoFilterRef(nil) + db.top = AristoLayerRef(vGen: db.roFilter.vGen.unsafeGet) + + if persistent: + let rc = db.resolveBE() + if rc.isErr: + return err(rc.error) + db.roFilter = AristoFilterRef(nil) # Delete or clear stack and clear top db.stack.setLen(0) diff --git a/tests/test_aristo/test_tx.nim b/tests/test_aristo/test_tx.nim index 18bac22a92..b03a020289 100644 --- a/tests/test_aristo/test_tx.nim +++ b/tests/test_aristo/test_tx.nim @@ -9,12 +9,12 @@ # at your option. This file may not be copied, modified, or # distributed except according to those terms. -## Aristo (aka Patricia) DB records merge test +## Aristo (aka Patricia) DB records transaction based merge test import std/[algorithm, bitops, sequtils, sets, tables], eth/common, - stew/results, + results, unittest2, ../../nimbus/db/aristo/[ aristo_check, aristo_delete, aristo_desc, aristo_get, aristo_merge],
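
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch above): a small, self-contained
# Nim model of the two ideas the patch builds on -- stacking one filter on
# top of another, and deriving a reverse filter so that read-only peers keep
# their view after the backend has been physically updated. `Delta`, `stack`
# and `reverseOf` are simplified stand-ins, not Aristo API; deletions and
# absent entries are modelled with the empty string instead of nil refs.
# ---------------------------------------------------------------------------
import std/tables

type
  Delta = object
    src, trg: string          # models the filter's before/after state roots
    tab: Table[int, string]   # models the sTab/kMap deltas ("" = absent)

proc stack(lower, upper: Delta): Delta =
  ## Merge `upper` on top of `lower`: keep `lower.src` and `upper.trg`,
  ## entries from `upper` override entries from `lower`.
  doAssert upper.src == lower.trg       # stackability check, as in the patch
  result = Delta(src: lower.src, trg: upper.trg, tab: lower.tab)
  for k, v in upper.tab:
    result.tab[k] = v

proc reverseOf(fwd: Delta; backend: Table[int, string]): Delta =
  ## Build the delta that undoes `fwd` relative to the current `backend`
  ## content -- the same idea as `revFilter()` in the patch.
  result = Delta(src: fwd.trg, trg: fwd.src)
  for k in fwd.tab.keys:
    result.tab[k] = backend.getOrDefault(k, "")

when isMainModule:
  var backend = {1: "a", 2: "b"}.toTable
  let
    fwd = Delta(src: "root0", trg: "root1", tab: {2: "B", 3: "c"}.toTable)
    rev = fwd.reverseOf backend         # reverse filter, taken before writing
  for k, v in fwd.tab:                  # now resolve `fwd` into the backend
    backend[k] = v
  # A read-only peer with an (empty) filter labelled root0 -> root0 gets
  # `rev` stacked underneath, so its composed filter maps the new backend
  # root "root1" back to the view it had before the write.
  let
    peer   = Delta(src: "root0", trg: "root0")
    merged = rev.stack peer
  var view = backend
  for k, v in merged.tab:
    view[k] = v
  doAssert merged.src == "root1" and merged.trg == "root0"
  doAssert view == {1: "a", 2: "b", 3: ""}.toTable   # "" stands for "absent"

# (The real `merge()` additionally drops a deletion entry again when the
# unfiltered backend holds no such vertex/key at all -- omitted here.)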