Skip to content

Commit

Permalink
Implement distributed backend access management
Browse files Browse the repository at this point in the history
details:
  Implemented and tested as described in chapter 5 of the `README.md`
  file.
  • Loading branch information
mjfh committed Aug 17, 2023
1 parent e109055 commit ef3cb1b
Show file tree
Hide file tree
Showing 8 changed files with 632 additions and 32 deletions.
18 changes: 11 additions & 7 deletions nimbus/db/aristo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ should be allocated in the structural table associated with the zero key.
db | stack[n] | |
desc | : | | optional passive delta layers, handled by
obj | stack[1] | | transaction management (can be used to
| | stack[0] | | successively replace the top layer)
| | stack[0] | | successively recover the top layer)
| +----------+ v
| +----------+
| | roFilter | optional read-only backend filter
Expand Down Expand Up @@ -449,11 +449,15 @@ Nevertheless, *(8)* can alse be transformed by committing and saving *tx2*
| ø, ø | tx2+PBE
| tx3, ~tx2 |

As *(11)* and *(13)* represent the same API, one has
As *(11)* and *(13)* represent the same API, one has

tx2+PBE == tx1+(tx2+~tx1)+PBE because of the middle rows (14)
~tx2 == ~tx1+~(tx2+~tx1) because of (14) (15)
tx2+PBE =~ tx1+(tx2+~tx1)+PBE because of the middle rows (14)
~tx2 =~ ~tx1+~(tx2+~tx1) because of (14) (15)

which shows some distributive property in *(14)* and commutative property in
*(15)* for this example. In particulat it might be handy for testing/verifying
against this example.
which looks like some distributive property in *(14)* and commutative
property in *(15)* for this example (although it does not hold as a strict
algebraic identity.)
The *=~* operator above indicates that the representations are equivalent in
the sense that they have the same effect on the backend database (looks a
bit like residue classes.)

It might be handy for testing/verifying an implementation using this example.
15 changes: 11 additions & 4 deletions nimbus/db/aristo/aristo_desc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
## Aristo DB -- a Patricia Trie with labeled edges
## ===============================================
##
## These data structures allows to overlay the *Patricia Trie* with *Merkel
## These data structures allow to overlay the *Patricia Trie* with *Merkel
## Trie* hashes. See the `README.md` in the `aristo` folder for documentation.
##
## Some semantic explanations;
Expand All @@ -22,7 +22,7 @@
{.push raises: [].}

import
std/[sets, tables],
std/[hashes, sets, tables],
eth/common,
./aristo_constants,
./aristo_desc/[
Expand Down Expand Up @@ -53,7 +53,7 @@ type

AristoDbRef* = ref AristoDbObj
AristoDbObj* = object
## Set of database layers, supporting transaction frames
## Three tier database object supporting distributed instances.
top*: AristoLayerRef ## Database working layer, mutable
stack*: seq[AristoLayerRef] ## Stashed immutable parent layers
roFilter*: AristoFilterRef ## Apply read filter (locks writing)
Expand All @@ -66,6 +66,9 @@ type
# Debugging data below, might go away in future
xMap*: Table[HashLabel,VertexID] ## For pretty printing, extends `pAmk`

AristoDbAction* = proc(db: AristoDbRef) {.gcsafe, raises: [CatchableError].}
## Generic call back function/closure.

# ------------------------------------------------------------------------------
# Public helpers
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -106,9 +109,13 @@ func isValid*(vid: VertexID): bool =
# Public functions, miscellaneous
# ------------------------------------------------------------------------------

# Hash set helper
func hash*(db: AristoDbRef): Hash =
  ## Table/KeyedQueue/HashSet mixin. Hashes the descriptor by reference
  ## identity (its memory address), not by contents.
  hash(cast[pointer](db))

# Note that the below `init()` function cannot go into
# `aristo_types_identifiers` as this would result in a circular import.

func init*(key: var HashKey; data: openArray[byte]): bool =
## Import argument `data` into `key` which must have length either `32`, or
## `0`. The latter case is equivalent to an all zero byte array of size `32`.
Expand Down
1 change: 1 addition & 0 deletions nimbus/db/aristo/aristo_desc/aristo_error.nim
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type
FilStateRootMismatch
FilPrettyPointlessLayer
FilDudeFilterUpdateError
FilNotReadOnlyDude

# Get functions form `aristo_get.nim`
GetLeafNotFound
Expand Down
47 changes: 47 additions & 0 deletions nimbus/db/aristo/aristo_filter.nim
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,53 @@ proc resolveBE*(db: AristoDbRef): Result[void,(VertexID,AristoError)] =

ok()


proc ackqRwMode*(db: AristoDbRef): Result[void,AristoError] =
  ## Re-focus the `db` argument descriptor to backend read-write permission.
  ##
  ## Only applicable when `db` is currently a read-only dude. The former
  ## rw-parent is demoted to a read-only dude and `db` takes over the shared
  ## dudes list, with every remaining dude re-pointed at `db`. Returns
  ## `FilNotReadOnlyDude` if `db` is not a read-only dude.
  if not db.dudes.isNil and not db.dudes.rwOk:
    # Steal dudes list, make the rw-parent a read-only dude
    let parent = db.dudes.rwDb
    db.dudes = parent.dudes
    parent.dudes = AristoDudesRef(rwOk: false, rwDb: db)

    # Exclude self -- `db` must not appear in its own read-only dudes set
    db.dudes.roDudes.excl db

    # Update dudes
    for w in db.dudes.roDudes:
      # Let all other dudes refer to this one
      w.dudes.rwDb = db

    # Update dudes list (parent was already updated above)
    db.dudes.roDudes.incl parent
    return ok()

  err(FilNotReadOnlyDude)


proc dispose*(db: AristoDbRef): Result[void,AristoError] =
  ## Terminate usage of the `db` argument descriptor with backend read-only
  ## permission.
  ##
  ## This type of descriptor should always be terminated after use. Otherwise
  ## it would always be updated when running `resolveBE()` which costs
  ## unnecessary computing resources. Also, the read-only backend filter
  ## copies might grow big when it could be avoided.
  ##
  ## Returns `FilNotReadOnlyDude` unless `db` is a read-only dude.
  if not db.isNil and
     not db.dudes.isNil and
     not db.dudes.rwOk:
    # Unlink argument `db` from the rw-parent's read-only dudes list
    db.dudes.rwDb.dudes.roDudes.excl db

    # Unlink more so it would not do harm if used wrongly
    db.stack.setLen(0)                 # NEP1 casing (was `setlen`)
    db.backend = AristoBackendRef(nil)
    db.txRef = AristoTxRef(nil)
    db.dudes = AristoDudesRef(nil)
    return ok()

  err(FilNotReadOnlyDude)

# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
21 changes: 16 additions & 5 deletions nimbus/db/aristo/aristo_init/memory_only.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
{.push raises: [].}

import
std/sets,
results,
../aristo_desc,
../aristo_desc/aristo_types_backend,
Expand Down Expand Up @@ -57,12 +58,22 @@ proc finish*(db: AristoDbRef; flush = false) =
## depending on the type of backend (e.g. the `BackendMemory` backend will
## always flush on close.)
##
## In case of distributed descriptors accessing the same backend, all
## distributed descriptors will be destroyed.
##
  ## This destructor may be used on already *destructed* descriptors.
if not db.backend.isNil:
db.backend.closeFn flush
db.backend = AristoBackendRef(nil)
db.top = AristoLayerRef(nil)
db.stack.setLen(0)
##
if not db.isNil:
if not db.backend.isNil:
db.backend.closeFn flush

if db.dudes.isNil:
db[] = AristoDbObj()
else:
let lebo = if db.dudes.rwOk: db else: db.dudes.rwDb
for w in lebo.dudes.roDudes:
w[] = AristoDbObj()
lebo[] = AristoDbObj()

# -----------------

Expand Down
130 changes: 115 additions & 15 deletions nimbus/db/aristo/aristo_tx.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
{.push raises: [].}

import
std/options,
std/[options, sets],
results,
"."/[aristo_desc, aristo_filter, aristo_hashify]
"."/[aristo_desc, aristo_filter, aristo_get, aristo_hashify]

func isTop*(tx: AristoTxRef): bool

Expand All @@ -38,6 +38,18 @@ proc getTxUid(db: AristoDbRef): uint =
db.txUidGen.inc
db.txUidGen

proc linkClone(db: AristoDbRef; clone: AristoDbRef) =
  ## Register `clone` as a read-only dude of the rw-parent `db`, creating
  ## the parent's dudes list on first use.
  clone.dudes = AristoDudesRef(rwOk: false, rwDb: db)
  if db.dudes.isNil:
    var members = initHashSet[AristoDbRef]()
    members.incl clone
    db.dudes = AristoDudesRef(rwOk: true, roDudes: members)
  else:
    db.dudes.roDudes.incl clone

# ------------------------------------------------------------------------------
# Public functions, getters
# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -71,19 +83,107 @@ func to*(tx: AristoTxRef; T: type[AristoDbRef]): T =
## Getter, retrieves the parent database descriptor from argument `tx`
tx.db

proc rebase*(
tx: AristoTxRef; # Some transaction on database
): Result[void,AristoError] =
## Revert transaction stack to an earlier point in time.
if not tx.isTop():
let
db = tx.db
inx = tx.level
if db.stack.len <= inx or db.stack[inx].txUid != tx.txUid:
return err(TxArgStaleTx)
# Roll back to some earlier layer.
db.top = db.stack[inx]
db.stack.setLen(inx)

proc copyCat*(tx: AristoTxRef): Result[AristoDbRef,AristoError] =
  ## Clone a transaction into a new DB descriptor. The new descriptor is linked
  ## to the transaction parent and will be updated with functions like
  ## `aristo_filter.resolveBE()` or `aristo_filter.ackqRwMode()`. The new
  ## descriptor is fully functional apart from the fact that the physical
  ## backend cannot be updated (but see `aristo_filter.ackqRwMode()`.)
  ##
  ## The new DB descriptor contains a copy of the argument transaction `tx` as
  ## top layer of level 1 (i.e. this is the only transaction.) Rolling back
  ## will end up at the backend layer (incl. backend filter.)
  ##
  ## Use `aristo_filter.dispose()` to clean up the new DB descriptor.
  ##
  ## Errors with `TxArgStaleTx` when `tx` no longer matches a layer on `db`.
  let db = tx.db

  # Provide new top layer: a deep copy of the layer `tx` refers to -- either
  # the current working layer or a stashed one on the stack
  var topLayer: AristoLayerRef
  if db.txRef == tx:
    topLayer = db.top.dup
  elif tx.level < db.stack.len:
    topLayer = db.stack[tx.level].dup
  else:
    return err(TxArgStaleTx)
  if topLayer.txUid != tx.txUid:
    return err(TxArgStaleTx)
  topLayer.txUid = 1

  # Empty stack: a single base layer seeded with the backend's vertex ID
  # generator state (missing generator record is treated as empty)
  let stackLayer = block:
    let rc = db.getIdgBE()
    if rc.isOk:
      AristoLayerRef(vGen: rc.value)
    elif rc.error == GetIdgNotFound:
      AristoLayerRef()
    else:
      return err(rc.error)

  # Set up clone associated to `db`
  let txClone = AristoDbRef(
    top: topLayer, # is a deep copy
    stack: @[stackLayer],
    roFilter: db.roFilter, # no need to copy contents (done when updated)
    backend: db.backend,
    txUidGen: 1,
    # NOTE(review): redundant -- `linkClone()` below re-assigns an
    # equivalent `dudes` record; confirm and drop one of the two
    dudes: AristoDudesRef(
      rwOk: false,
      rwDb: db))

  # Link clone to parent
  db.linkClone txClone

  # Install transaction similar to `tx` on clone
  txClone.txRef = AristoTxRef(
    db: txClone,
    txUid: 1,
    level: 1)

  ok(txClone)

proc copyCat*(db: AristoDbRef): Result[AristoDbRef,AristoError] =
  ## Variant of `copyCat()` for a DB descriptor. With a transaction pending,
  ## the call is delegated to `db.txTop.value.copyCat()`. Otherwise a clone
  ## of the current top layer is returned.
  ##
  ## Use `aristo_filter.dispose()` to clean up the copy cat descriptor.
  ##
  if not db.txRef.isNil:
    return db.txRef.copyCat()

  let clone = AristoDbRef(
    top: db.top.dup,       # deep copy of the working layer
    roFilter: db.roFilter, # shared; contents copied only when updated
    backend: db.backend)

  # Register the clone as a read-only dude of `db`
  db.linkClone clone
  ok(clone)


proc exec*(
    tx: AristoTxRef;                  # Transaction to clone from
    action: AristoDbAction;           # Call back to run on the clone
      ): Result[void,AristoError]
      {.gcsafe, raises: [CatchableError].} =
  ## Execute function argument `action()` on a temporary `tx.copyCat()`
  ## transaction database. After return, the temporary database gets
  ## destroyed.
  ##
  ## The temporary database is also disposed of when `action()` raises an
  ## exception (the previous version leaked the clone in that case); the
  ## exception is re-raised after clean up.
  let db = block:
    let rc = tx.copyCat()
    if rc.isErr:
      return err(rc.error)
    rc.value

  # Remember a raised exception instead of propagating it immediately, so
  # that `dispose()` is guaranteed to run on the clone.
  var pending: ref CatchableError = nil
  try:
    db.action()
  except CatchableError as e:
    pending = e

  let rc = db.dispose()
  if not pending.isNil:
    raise pending                     # `action()` error takes precedence
  if rc.isErr:
    return err(rc.error)
  ok()

# ------------------------------------------------------------------------------
Expand Down
9 changes: 8 additions & 1 deletion tests/test_aristo.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import
../nimbus/sync/snap/worker/db/[rocky_bulk_load, snapdb_accounts, snapdb_desc],
./replay/[pp, undump_accounts, undump_storages],
./test_sync_snap/[snap_test_xx, test_accounts, test_types],
./test_aristo/[test_backend, test_helpers, test_transcode, test_tx]
./test_aristo/[
test_backend, test_filter, test_helpers, test_transcode, test_tx]

const
baseDir = [".", "..", ".."/"..", $DirSep]
Expand Down Expand Up @@ -220,6 +221,9 @@ proc accountsRunner(
test &"Delete accounts database, successively {accLst.len} entries":
check noisy.testTxMergeAndDelete(accLst, dbDir)

test &"Distributed backend access {accLst.len} entries":
check noisy.testDistributedAccess(accLst, dbDir)


proc storagesRunner(
noisy = true;
Expand Down Expand Up @@ -253,6 +257,9 @@ proc storagesRunner(
test &"Delete storage database, successively {stoLst.len} entries":
check noisy.testTxMergeAndDelete(stoLst, dbDir)

test &"Distributed backend access {stoLst.len} entries":
check noisy.testDistributedAccess(stoLst, dbDir)

# ------------------------------------------------------------------------------
# Main function(s)
# ------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit ef3cb1b

Please sign in to comment.