Add utpTransfer bool to recursiveFindContent JSON-RPC result (#1710)
Rename the portal protocol ContentInfo type to ContentKV in the
process, both because of duplicate type names and because it is
simply a better name for this object.
kdeme authored Aug 24, 2023
1 parent 820525d commit 7aead61
Showing 5 changed files with 82 additions and 66 deletions.
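For reference, a minimal sketch (not part of the commit) of the result shape that portal_<network>RecursiveFindContent returns after this change, matching the ContentInfo object added in fluffy/rpc/rpc_portal_api.nim below; the field values are hypothetical:

type
  ContentInfo = object
    content: string    # hex-encoded content payload; "0x" when nothing is found
    utpTransfer: bool  # true when the content was received over a uTP stream

let exampleResult = ContentInfo(
  content: "0x0102abcd",  # hypothetical hex-encoded payload
  utpTransfer: true)      # delivered over uTP rather than directly in the response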
32 changes: 20 additions & 12 deletions fluffy/network/network_seed.nim
@@ -63,20 +63,23 @@ proc depthContentPropagate*(
seenOnly = true
)

proc worker(p: PortalProtocol, db: SeedDb, node: Node, radius: UInt256): Future[void] {.async.} =
proc worker(
p: PortalProtocol, db: SeedDb, node: Node, radius: UInt256):
Future[void] {.async.} =
var offset = 0
while true:
let content = db.getContentInRange(node.id, radius, batchSize, offset)

if len(content) == 0:
break

var contentInfo: seq[ContentInfo]
var contentKV: seq[ContentKV]
for e in content:
let info = ContentInfo(contentKey: ByteList.init(e.contentKey), content: e.content)
contentInfo.add(info)
let info = ContentKV(
contentKey: ByteList.init(e.contentKey), content: e.content)
contentKV.add(info)

let offerResult = await p.offer(node, contentInfo)
let offerResult = await p.offer(node, contentKV)

if offerResult.isErr() or len(content) < batchSize:
# peer failed or we reached the end of the database, stop offering more content
@@ -89,7 +92,8 @@ proc depthContentPropagate*(

var offset = 0
while true:
let content = db.getContentInRange(p.localNode.id, p.dataRadius, localBatchSize, offset)
let content = db.getContentInRange(
p.localNode.id, p.dataRadius, localBatchSize, offset)

if len(content) == 0:
break
@@ -127,7 +131,8 @@ proc depthContentPropagate*(

return ok()

func contentDataToKeys(contentData: seq[ContentDataDist]): (ContentKeysList, seq[seq[byte]]) =
func contentDataToKeys(
contentData: seq[ContentDataDist]): (ContentKeysList, seq[seq[byte]]) =
var contentKeys: seq[ByteList]
var content: seq[seq[byte]]
for cd in contentData:
@@ -176,7 +181,8 @@ proc breadthContentPropagate*(
while true:
# Setting radius to `UInt256.high` and using batchSize and offset means
# we will iterate over the whole database in batches of `maxItemsPerOffer` items
var contentData = db.getContentInRange(target, UInt256.high, batchSize, offset)
var contentData = db.getContentInRange(
target, UInt256.high, batchSize, offset)

if len(contentData) == 0:
break
@@ -237,16 +243,17 @@ proc offerContentInNodeRange*(
let
db = SeedDb.new(path = dbPath, name = dbName)
(node, radius) = maybeNodeAndRadius.unsafeGet()
content = db.getContentInRange(node.id, radius, int64(numberToToOffer), int64(starting))
content = db.getContentInRange(
node.id, radius, int64(numberToToOffer), int64(starting))

# We got all we wanted from seed_db, it can be closed now.
db.close()

var ci: seq[ContentInfo]
var ci: seq[ContentKV]

for cont in content:
let k = ByteList.init(cont.contentKey)
let info = ContentInfo(contentKey: k, content: cont.content)
let info = ContentKV(contentKey: k, content: cont.content)
ci.add(info)

# waiting for offer result, by the end of this call remote node should
@@ -274,7 +281,8 @@ proc storeContentInNodeRange*(
localRadius = p.dataRadius
db = SeedDb.new(path = dbPath, name = dbName)
localId = p.localNode.id
contentInRange = db.getContentInRange(localId, localRadius, int64(max), int64(starting))
contentInRange = db.getContentInRange(
localId, localRadius, int64(max), int64(starting))

db.close()

30 changes: 17 additions & 13 deletions fluffy/network/wire/portal_protocol.nim
@@ -138,7 +138,7 @@ type

RadiusCache* = LRUCache[NodeId, UInt256]

ContentInfo* = object
ContentKV* = object
contentKey*: ByteList
content*: seq[byte]

@@ -149,7 +149,7 @@ type
dst: Node
case kind: OfferRequestType
of Direct:
contentList: List[ContentInfo, contentKeysLimit]
contentList: List[ContentKV, contentKeysLimit]
of Database:
contentKeys: ContentKeysList

@@ -188,25 +188,28 @@ type

ContentLookupResult* = object
content*: seq[byte]
utpTransfer*: bool
# List of nodes which do not have the requested content, but for which
# the content is in their range
nodesInterestedInContent*: seq[Node]

proc init*(
T: type ContentInfo,
T: type ContentKV,
contentKey: ByteList,
content: seq[byte]): T =
ContentInfo(
ContentKV(
contentKey: contentKey,
content: content
)

proc init*(
T: type ContentLookupResult,
content: seq[byte],
utpTransfer: bool,
nodesInterestedInContent: seq[Node]): T =
ContentLookupResult(
content: content,
utpTransfer: utpTransfer,
nodesInterestedInContent: nodesInterestedInContent
)

@@ -832,12 +835,12 @@ proc offer*(p: PortalProtocol, dst: Node, contentKeys: ContentKeysList):
let req = OfferRequest(dst: dst, kind: Database, contentKeys: contentKeys)
return await p.offer(req)

proc offer*(p: PortalProtocol, dst: Node, content: seq[ContentInfo]):
proc offer*(p: PortalProtocol, dst: Node, content: seq[ContentKV]):
Future[PortalResult[ContentKeysBitList]] {.async.} =
if len(content) > contentKeysLimit:
return err("Cannot offer more than 64 content items")

let contentList = List[ContentInfo, contentKeysLimit].init(content)
let contentList = List[ContentKV, contentKeysLimit].init(content)
let req = OfferRequest(dst: dst, kind: Direct, contentList: contentList)
return await p.offer(req)

@@ -939,8 +942,8 @@ proc triggerPoke*(
if not p.offerQueue.full():
try:
let
ci = ContentInfo(contentKey: contentKey, content: content)
list = List[ContentInfo, contentKeysLimit].init(@[ci])
contentKV = ContentKV(contentKey: contentKey, content: content)
list = List[ContentKV, contentKeysLimit].init(@[contentKV])
req = OfferRequest(dst: node, kind: Direct, contentList: list)
p.offerQueue.putNoWait(req)
except AsyncQueueFullError as e:
@@ -1034,7 +1037,8 @@ proc contentLookup*(p: PortalProtocol, target: ByteList, targetId: UInt256):
for f in pendingQueries:
f.cancel()
portal_lookup_content_requests.observe(requestAmount)
return Opt.some(ContentLookupResult.init(content.content, nodesWithoutContent))
return Opt.some(ContentLookupResult.init(
content.content, content.utpTransfer, nodesWithoutContent))
else:
# TODO: Should we do something with the node that failed to respond to
# our query?
@@ -1120,11 +1124,11 @@ proc neighborhoodGossip*(
if content.len() == 0:
return 0

var contentList = List[ContentInfo, contentKeysLimit].init(@[])
var contentList = List[ContentKV, contentKeysLimit].init(@[])
for i, contentItem in content:
let contentInfo =
ContentInfo(contentKey: contentKeys[i], content: contentItem)
discard contentList.add(contentInfo)
let contentKV =
ContentKV(contentKey: contentKeys[i], content: contentItem)
discard contentList.add(contentKV)

# Just taking the first content item as target id.
# TODO: come up with something better?
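A minimal usage sketch of the renamed ContentKV type with the direct offer call; it is not part of the commit and assumes an async (chronos) context with a PortalProtocol instance p and a destination Node dst already set up; the key and content bytes are made up:

proc offerOne(p: PortalProtocol, dst: Node) {.async.} =
  # Build a single key/value pair; ContentKV replaces the former ContentInfo type.
  let contentKV = ContentKV(
    contentKey: ByteList.init(@[byte 0x01]),  # hypothetical content key
    content: @[byte 0xab, 0xcd])              # hypothetical content value
  # offer accepts at most contentKeysLimit (64) items per call.
  let offerResult = await p.offer(dst, @[contentKV])
  doAssert offerResult.isOk()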
22 changes: 13 additions & 9 deletions fluffy/rpc/rpc_portal_api.nim
@@ -19,6 +19,11 @@ export rpcserver
# Portal Network JSON-RPC implementation as per specification:
# https://github.com/ethereum/portal-network-specs/tree/master/jsonrpc

type
ContentInfo = object
content: string
utpTransfer: bool

# Note:
# Using a string for the network parameter will give an error in the rpc macro:
# Error: Invalid node kind nnkInfix for macros.`$`
@@ -114,10 +119,6 @@ proc installPortalApiHandlers*(

rpcServer.rpc("portal_" & network & "FindContent") do(
enr: Record, contentKey: string) -> JsonNode:
type ContentInfo = object
content: string
utpTransfer: bool

let
node = toNodeWithAddress(enr)
foundContentResult = await p.findContent(
@@ -144,8 +145,8 @@ proc installPortalApiHandlers*(
node = toNodeWithAddress(enr)
key = hexToSeqByte(contentKey)
content = hexToSeqByte(contentValue)
contentInfo = ContentInfo(contentKey: ByteList.init(key), content: content)
res = await p.offer(node, @[contentInfo])
contentKV = ContentKV(contentKey: ByteList.init(key), content: content)
res = await p.offer(node, @[contentKV])

if res.isOk():
return SSZ.encode(res.get()).to0xHex()
@@ -158,16 +159,19 @@ proc installPortalApiHandlers*(
return discovered.map(proc(n: Node): Record = n.record)

rpcServer.rpc("portal_" & network & "RecursiveFindContent") do(
contentKey: string) -> string:
contentKey: string) -> ContentInfo:
let
key = ByteList.init(hexToSeqByte(contentKey))
contentId = p.toContentId(key).valueOr:
raise newException(ValueError, "Invalid content key")

contentResult = (await p.contentLookup(key, contentId)).valueOr:
return "0x"
return ContentInfo(content: "0x", utpTransfer: false)

return contentResult.content.to0xHex()
return ContentInfo(
content: contentResult.content.to0xHex(),
utpTransfer: contentResult.utpTransfer
)

rpcServer.rpc("portal_" & network & "Store") do(
contentKey: string, contentValue: string) -> bool:
52 changes: 26 additions & 26 deletions fluffy/tests/test_history_network.nim
@@ -59,9 +59,9 @@ proc createEmptyHeaders(fromNum: int, toNum: int): seq[BlockHeader] =
headers.add(bh)
return headers

proc headersToContentInfo(
headersWithProof: seq[BlockHeaderWithProof]): seq[ContentInfo] =
var contentInfos: seq[ContentInfo]
proc headersToContentKV(
headersWithProof: seq[BlockHeaderWithProof]): seq[ContentKV] =
var contentKVs: seq[ContentKV]
for headerWithProof in headersWithProof:
let
# TODO: Decoding step could be avoided
@@ -70,10 +70,10 @@ proc headersToContentInfo(
blockKey = BlockKey(blockHash: headerHash)
contentKey = encode(ContentKey(
contentType: blockHeader, blockHeaderKey: blockKey))
contentInfo = ContentInfo(
contentKV = ContentKV(
contentKey: contentKey, content: SSZ.encode(headerWithProof))
contentInfos.add(contentInfo)
return contentInfos
contentKVs.add(contentKV)
return contentKVs

procSuite "History Content Network":
let rng = newRng()
@@ -198,45 +198,45 @@ procSuite "History Content Network":
check headersWithProof.isOk()

# This is one header more than maxOfferedHistoryContent
let contentInfos = headersToContentInfo(headersWithProof.get())
let contentKVs = headersToContentKV(headersWithProof.get())

# node 1 will offer the content so it needs to have it in its database
for contentInfo in contentInfos:
let id = toContentId(contentInfo.contentKey)
for contentKV in contentKVs:
let id = toContentId(contentKV.contentKey)
historyNode1.portalProtocol.storeContent(
contentInfo.contentKey,
contentKV.contentKey,
id,
contentInfo.content
contentKV.content
)

# Offering 1 content item too many, which should result in a discv5 packet
# that is too large and thus does not get any response.
block:
let offerResult = await historyNode1.portalProtocol.offer(
historyNode2.localNode(),
contentInfos
contentKVs
)

# Fail due to timeout, as the remote side must drop the too large discv5 packet
check offerResult.isErr()

for contentInfo in contentInfos:
let id = toContentId(contentInfo.contentKey)
for contentKV in contentKVs:
let id = toContentId(contentKV.contentKey)
check historyNode2.containsId(id) == false

# One content key less should make the offer successful and should result
# in the content being transferred and stored on the other node.
block:
let offerResult = await historyNode1.portalProtocol.offer(
historyNode2.localNode(),
contentInfos[0..<maxOfferedHistoryContent]
contentKVs[0..<maxOfferedHistoryContent]
)

check offerResult.isOk()

for i, contentInfo in contentInfos:
let id = toContentId(contentInfo.contentKey)
if i < len(contentInfos) - 1:
for i, contentKV in contentKVs:
let id = toContentId(contentKV.contentKey)
if i < len(contentKVs) - 1:
check historyNode2.containsId(id) == true
else:
check historyNode2.containsId(id) == false
@@ -283,23 +283,23 @@ procSuite "History Content Network":
selectedHeaders, epochAccumulators)
check headersWithProof.isOk()

let contentInfos = headersToContentInfo(headersWithProof.get())
let contentKVs = headersToContentKV(headersWithProof.get())

for contentInfo in contentInfos:
let id = toContentId(contentInfo.contentKey)
for contentKV in contentKVs:
let id = toContentId(contentKV.contentKey)
historyNode1.portalProtocol.storeContent(
contentInfo.contentKey,
contentKV.contentKey,
id,
contentInfo.content
contentKV.content
)

let offerResult = await historyNode1.portalProtocol.offer(
historyNode2.localNode(), @[contentInfo])
historyNode2.localNode(), @[contentKV])

check offerResult.isOk()

for contentInfo in contentInfos:
let id = toContentId(contentInfo.contentKey)
for contentKV in contentKVs:
let id = toContentId(contentKV.contentKey)
check historyNode2.containsId(id) == true

await historyNode1.stop()
12 changes: 6 additions & 6 deletions fluffy/tests/test_portal_wire_protocol.nim
@@ -165,11 +165,11 @@ procSuite "Portal Wire Protocol Tests":

asyncTest "Offer/Accept/Stream":
let (proto1, proto2) = defaultTestSetup(rng)
var content: seq[ContentInfo]
var content: seq[ContentKV]
for i in 0..<contentKeysLimit:
let contentItem = ContentInfo(
let contentKV = ContentKV(
contentKey: ByteList(@[byte i]), content: repeat(byte i, 5000))
content.add(contentItem)
content.add(contentKV)

let res = await proto1.offer(proto2.baseProtocol.localNode, content)

@@ -181,10 +181,10 @@ procSuite "Portal Wire Protocol Tests":
check contentItems.len() == content.len()

for i, contentItem in contentItems:
let contentInfo = content[i]
let contentKV = content[i]
check:
contentItem == contentInfo.content
contentKeys[i] == contentInfo.contentKey
contentItem == contentKV.content
contentKeys[i] == contentKV.contentKey

await proto1.stopPortalProtocol()
await proto2.stopPortalProtocol()
