Skip to content

Commit

Permalink
Merge pull request #1330 from qstokkink/add_dht_docs
Browse files Browse the repository at this point in the history
Added documentation for the DHT(Discovery)Community
  • Loading branch information
qstokkink authored Nov 26, 2024
2 parents c7b733b + 6e315e7 commit 8957f45
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 23 deletions.
44 changes: 44 additions & 0 deletions doc/further-reading/dht.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
DHT(Discovery)Community
=======================

This document contains a description of how to use the ``DHTCommunity`` class for distributed hash table (DHT) data storage and the ``DHTDiscoveryCommunity`` extension of this functionality, which provides functionality to connect given public keys.

In particular this document will **not** discuss how distributed hash table work, for this we refer the reader to other resources on the Internet.


Storing values and finding keys
-------------------------------

The ``DHTCommunity`` is the main overlay that allows for decentralized key-value storage.
There are two main functions in this overlay: the ``store_value()`` function and the ``find_values()`` function.

When you call ``store_value()``, you choose the globally unique ``key`` that your given value ``data`` is stored under.
You can, but are not required to, sign this new stored value with your public key to provide it with authenticity.
Note that this function may lead to a ``ipv8.dht.DHTError``: in this case, you will have to try again later.
An example of a call that stores the signed value ``b"my value"`` under the key ``b"my key"``, is the following.

.. literalinclude:: dht_1.py
:lines: 43-47

The value can later be retrieved from the network by calling ``find_values()`` with the key that the information was stored under.
The following snippet retrieves the value that was stored in the previous snippet, under the ``b"my key"`` key.

.. literalinclude:: dht_1.py
:lines: 49-55

Note that multiple peers may respond with answers and if (a) the orginal value is not signed or (b) multiple values are published under the same key, the reported values may be different.
In this example, only one value is published and it is signed so only a single value is ever returned.

Finding peers
-------------

The ``DHTDiscoveryCommunity`` allows for peers to be found by their public key.
You can search for public keys by their SHA-1 hash (conveniently available as ``Peer.mid``).
To do so, you can call ``connect_peer()`` with the hash/mid as shown in the following example.


.. literalinclude:: dht_1.py
:lines: 58-65

Note that you may need a few attempts to find the peer you are looking for.
Of course, if the peer you are looking for is not online, you may be waiting forever.
67 changes: 67 additions & 0 deletions doc/further-reading/dht_1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from asyncio import run, sleep
from itertools import combinations
from typing import cast

from ipv8.configuration import ConfigBuilder
from ipv8.dht import DHTError
from ipv8.dht.community import DHTCommunity
from ipv8.dht.discovery import DHTDiscoveryCommunity
from ipv8.peer import Peer
from ipv8_service import IPv8


async def main() -> None:
instances = []

# Put some peers in the network
for _ in range(10):
config = ConfigBuilder().clear_keys()
config.config["overlays"] = [o for o in config.config["overlays"] if o["class"] == "DHTDiscoveryCommunity"]
config.add_ephemeral_key("anonymous id")
config.set_address("127.0.0.1") # We don't want this test to connect to the actual network!
ipv8 = IPv8(config.finalize())
instances.append(ipv8)
await ipv8.start()

# Supercharge introductions, normally this takes longer
for id1, id2 in combinations(range(10), 2):
overlay1 = instances[id1].get_overlay(DHTCommunity)
overlay2 = instances[id2].get_overlay(DHTCommunity)
peer1 = Peer(overlay2.my_peer.public_key.key_to_bin(), ("127.0.0.1", overlay2.my_estimated_lan[1]))
peer1.address_frozen = True
peer2 = Peer(overlay1.my_peer.public_key.key_to_bin(), ("127.0.0.1", overlay1.my_estimated_lan[1]))
peer2.address_frozen = True
overlay1.network.add_verified_peer(peer2)
overlay1.get_requesting_node(peer2)
overlay2.network.add_verified_peer(peer1)
overlay2.get_requesting_node(peer1)
for i in range(10):
await instances[i].get_overlay(DHTDiscoveryCommunity).store_peer()
instances[i].get_overlay(DHTDiscoveryCommunity).ping_all()

dht_community = cast(DHTCommunity, instances[0].get_overlay(DHTCommunity))
try:
await dht_community.store_value(b"my key", b"my value", True)
print(dht_community.my_peer.public_key.key_to_bin(), "published b'my value' under b'my key'!")
except DHTError as e:
print("Failed to store my value under my key!", e)

try:
results = await dht_community.find_values(b"my key")
print(f"We got results from {len(results)} peers!")
for value, signer_key in results:
print(f"The value {value} was found, signed by {signer_key}")
except DHTError as e:
print("Failed to find key!", e)

dht_discovery_community = cast(DHTDiscoveryCommunity, instances[7].get_overlay(DHTDiscoveryCommunity))
some_peer_mid = instances[2].keys["anonymous id"].mid
while True:
try:
await sleep(0.5)
await dht_discovery_community.connect_peer(some_peer_mid)
break
except DHTError as e:
print("Failed to connect to peer!", e)

run(main())
8 changes: 1 addition & 7 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,7 @@ Table of contents
further-reading/advanced_identity.rst
further-reading/advanced_peer_discovery.rst
further-reading/anonymization.rst

.. toctree::
:maxdepth: 2
:caption: Deprecated/Archive:

deprecated/attestation_prototype.rst
deprecated/attestation_tutorial.rst
further-reading/dht.rst


Search
Expand Down
58 changes: 42 additions & 16 deletions ipv8/dht/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import defaultdict, deque
from collections.abc import Coroutine, Iterator, Sequence
from itertools import zip_longest
from typing import TYPE_CHECKING, Any, Optional, cast
from typing import TYPE_CHECKING, Any, Literal, Optional, TypeVar, cast, overload

from ..community import Community, CommunitySettings
from ..lazy_community import lazy_wrapper, lazy_wrapper_wd
Expand Down Expand Up @@ -83,6 +83,19 @@ async def gather_without_errors(*futures: Future) -> list:
return [r for r in results if not isinstance(r, Exception)]


FindResultType = TypeVar("FindResultType", Node, DHTValue)


def merge_results(results: tuple[list[FindResultType], ...]) -> tuple[FindResultType, ...]:
"""
Merge the results from a tuple of lists into a flat tuple.
"""
out: list[FindResultType] = []
for result in results:
out.extend(result)
return tuple(out)


class Request(RandomNumberCache):
"""
This request cache keeps track of all outstanding requests within the DHTCommunity.
Expand Down Expand Up @@ -624,36 +637,49 @@ def post_process_values(self, values: list[bytes]) -> list[DHTValue]:
# Unsigned data
return [*results, *((data[1], None) for data in unpacked[None])]

@overload
async def find(self, target: bytes, force_nodes: bool, offset: int,
debug: Literal[False]) -> tuple[DHTValue, ...] | tuple[Node, ...]:
...

@overload
async def find(self, target: bytes, force_nodes: bool, offset: int,
debug: Literal[True]) -> tuple[tuple[DHTValue, ...], list[Crawl]]:
...

@overload
async def find(self, target: bytes, force_nodes: bool, offset: int,
debug: bool) -> tuple[DHTValue, ...] | tuple[Node, ...] | tuple[tuple[DHTValue, ...], list[Crawl]]:
...

async def find(self, target: bytes, force_nodes: bool, offset: int,
debug: bool) -> Sequence[DHTValue] | \
tuple[Sequence[DHTValue], list[Crawl]] | \
Sequence[Node]:
debug: bool) -> tuple[DHTValue, ...] | tuple[Node, ...] | tuple[tuple[DHTValue, ...], list[Crawl]]:
"""
Get the values belonging to the given target key.
"""
futures: list[Coroutine[Any, Any, list[Node] | \
list[DHTValue] | \
futures: list[Coroutine[Any, Any, list[Node] |
list[DHTValue] |
tuple[list[DHTValue], Crawl]]] = []
for routing_table in self.routing_tables.values():
crawl = Crawl(target, routing_table, force_nodes=force_nodes, offset=offset)
futures.append(self._find(crawl, debug=debug))
results: list[list[Any] |
list[DHTValue] |
tuple[list[DHTValue], Crawl]] = await gather(*futures)
results = await gather(*futures)

if debug:
results_debug = cast(list[tuple[list[DHTValue], Crawl]], results)
return tuple(*[r[0] for r in results]), [r[1] for r in results_debug]
return tuple(*results)
results_debug = cast(tuple[tuple[list[DHTValue], Crawl], ...], results)
return tuple(*[r[0] for r in results_debug]), cast(list[Crawl], [r[1] for r in results_debug])
# ``results`` is of type ``tuple[list[Node] | list[DHTValue], ...]``
# However, mypy 1.13.0 is not yet powerful enough to infer the argument type from this.
return merge_results(results) # type: ignore[arg-type]

async def find_values(self, target: bytes, offset: int = 0,
debug: bool = False) -> Sequence[DHTValue] | tuple[Sequence[DHTValue], list[Crawl]]:
debug: bool = False) -> (tuple[DHTValue, ...]
| tuple[Node, ...]
| tuple[tuple[DHTValue, ...], list[Crawl]]):
"""
Find the values belonging to the target key.
"""
values = await self.find(target, False, offset, debug)
return (cast(tuple[Sequence[tuple[bytes, Optional[bytes]]], list[Crawl]], values) if debug
else cast(Sequence[tuple[bytes, Optional[bytes]]], values))
return await self.find(target, False, offset, debug)

async def find_nodes(self, target: bytes, debug: bool = False) -> Sequence[Node]:
"""
Expand Down

0 comments on commit 8957f45

Please sign in to comment.