Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 12, 2023
1 parent 5789eb4 commit 6a24dd1
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 33 deletions.
23 changes: 23 additions & 0 deletions fullrt.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package zikade
import (
"context"
"fmt"
"time"

"github.com/ipfs/go-cid"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
Expand All @@ -20,6 +22,27 @@ import (

type FullRT struct {
*DHT

cfg *FullRTConfig
}

type FullRTConfig struct {
*Config
CrawlInterval time.Duration
}

func NewFullRT(h host.Host, cfg *FullRTConfig) (*FullRT, error) {
d, err := New(h, cfg.Config)
if err != nil {
return nil, fmt.Errorf("new DHT: %w", err)
}

frt := &FullRT{
DHT: d,
cfg: cfg,
}

return frt, nil
}

var _ routing.Routing = (*FullRT)(nil)
Expand Down
3 changes: 2 additions & 1 deletion internal/coord/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ func (*EventStopQuery) queryCommand() {}

// EventAddNode notifies the routing behaviour of a potential new peer.
type EventAddNode struct {
NodeID kadt.PeerID
NodeID kadt.PeerID
Checked bool // indicates whether this node has already passed a connectivity check and should be added to the routing table right away
}

func (*EventAddNode) behaviourEvent() {}
Expand Down
30 changes: 22 additions & 8 deletions internal/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
cmd := &routing.EventBootstrapStart[kadt.Key, kadt.PeerID]{
KnownClosestNodes: ev.SeedNodes,
}
// attempt to advance the bootstrap
// attempt to advance the bootstrap state machine
next, ok := r.advanceBootstrap(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
Expand All @@ -443,7 +443,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
cmd := &routing.EventCrawlStart[kadt.Key, kadt.PeerID]{
Seed: ev.Seed,
}
// attempt to advance the bootstrap
// attempt to advance the crawl state machine
next, ok := r.advanceCrawl(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
Expand All @@ -454,11 +454,20 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
if r.self.Equal(ev.NodeID) {
break
}
// TODO: apply ttl
cmd := &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,

var cmd routing.IncludeEvent
if ev.Checked {
cmd = &routing.EventIncludeNode[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
} else {
// TODO: apply ttl
cmd = &routing.EventIncludeAddCandidate[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
}
// attempt to advance the include

// attempt to advance the include state machine
next, ok := r.advanceInclude(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
Expand Down Expand Up @@ -549,6 +558,11 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
}

case routing.CrawlQueryID:
r.pending = append(r.pending, &EventAddNode{
NodeID: ev.To,
Checked: true,
})

cmd := &routing.EventCrawlNodeResponse[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Target: ev.Target,
Expand Down Expand Up @@ -642,7 +656,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
r.pending = append(r.pending, next)
}

// tell the probe state machine in case there is are connectivity checks that could satisfied
// tell the probe state machine in case there are connectivity checks that could be satisfied
cmdProbe := &routing.EventProbeNotifyConnectivity[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
Expand Down Expand Up @@ -912,7 +926,7 @@ func (r *RoutingBehaviour) advanceCrawl(ctx context.Context, ev routing.CrawlEve
// crawl waiting for a message response but has capacity to do more
case *routing.StateCrawlWaitingAtCapacity:
// crawl waiting for a message response but has no capacity to do more
case *routing.StateCrawlFinished:
case *routing.StateCrawlFinished[kadt.Key, kadt.PeerID]:
r.cfg.Logger.Info("crawl finished")
case *routing.StateCrawlIdle:
// bootstrap not running, nothing to do
Expand Down
35 changes: 29 additions & 6 deletions internal/coord/routing/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package routing
import (
"context"
"fmt"
"time"

"github.com/plprobelab/go-libdht/kad"
"github.com/plprobelab/go-libdht/kad/key"
Expand All @@ -19,9 +20,10 @@ const CrawlQueryID = coordt.QueryID("crawl")

// CrawlConfig specifies optional configuration for a Crawl
type CrawlConfig struct {
MaxCPL int // the maximum CPL until we should crawl the peer
Concurrency int // the maximum number of concurrent peers that we may query
Tracer trace.Tracer // Tracer is the tracer that should be used to trace execution.
MaxCPL int // the maximum CPL until we should crawl the peer
Interval time.Duration // the interval in which the network should be crawled (0 means no crawling)
Concurrency int // the maximum number of concurrent peers that we may query
Tracer trace.Tracer // Tracer is the tracer that should be used to trace execution.
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand All @@ -33,6 +35,13 @@ func (cfg *CrawlConfig) Validate() error {
}
}

if cfg.Interval < 0 {
return &errs.ConfigurationError{
Component: "CrawlConfig",
Err: fmt.Errorf("crawl interval must be zero or positive"),
}
}

if cfg.Concurrency < 1 {
return &errs.ConfigurationError{
Component: "CrawlConfig",
Expand Down Expand Up @@ -242,8 +251,20 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat
return &StateCrawlWaitingWithCapacity{}
}

// generate list of new nodes for the routing table
nodes := make([]N, len(c.info.success))
i := 0
for _, node := range c.info.success {
nodes[i] = node
i += 1
}

// clear info to indicate that we're idle
c.info = nil
return &StateCrawlFinished{}

return &StateCrawlFinished[K, N]{
Nodes: nodes,
}
}

func (c *Crawl[K, N]) setMapSizes(span trace.Span, prefix string) {
Expand Down Expand Up @@ -281,7 +302,9 @@ type CrawlState interface {

type StateCrawlIdle struct{}

type StateCrawlFinished struct{}
type StateCrawlFinished[K kad.Key[K], N kad.NodeID[K]] struct {
Nodes []N
}

type (
StateCrawlWaitingAtCapacity struct{}
Expand All @@ -294,7 +317,7 @@ type StateCrawlFindCloser[K kad.Key[K], N kad.NodeID[K]] struct {
}

// crawlState() ensures that only [Crawl] states can be assigned to a CrawlState.
func (*StateCrawlFinished) crawlState() {}
func (*StateCrawlFinished[K, N]) crawlState() {}
func (*StateCrawlFindCloser[K, N]) crawlState() {}
func (*StateCrawlWaitingAtCapacity) crawlState() {}
func (*StateCrawlWaitingWithCapacity) crawlState() {}
Expand Down
21 changes: 18 additions & 3 deletions internal/coord/routing/crawl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ func TestCrawlConfig_Validate(t *testing.T) {
require.NoError(t, cfg.Validate())
})

t.Run("crawl interval must be 0 or positive", func(t *testing.T) {
cfg := DefaultCrawlConfig()
cfg.Interval = 0
require.NoError(t, cfg.Validate())

cfg = DefaultCrawlConfig()
cfg.Interval = -1
require.Error(t, cfg.Validate())
})

t.Run("tracer is not nil", func(t *testing.T) {
cfg := DefaultCrawlConfig()
cfg.Tracer = nil
Expand Down Expand Up @@ -83,7 +93,7 @@ func TestNewCrawl_Start(t *testing.T) {
Seed: []tiny.Node{self},
})
require.Nil(t, qry.info)
require.IsType(t, &StateCrawlFinished{}, state)
require.IsType(t, &StateCrawlFinished[tiny.Key, tiny.Node]{}, state)
})

t.Run("handles duplicate starts (does not panic)", func(t *testing.T) {
Expand Down Expand Up @@ -254,8 +264,9 @@ func TestCrawl_Advance(t *testing.T) {
continue
}

if _, ok = state.(*StateCrawlFinished); ok {
if tstate, ok := state.(*StateCrawlFinished[tiny.Key, tiny.Node]); ok {
require.Nil(t, qry.info)
require.Len(t, tstate.Nodes, 11)
break
}
}
Expand Down Expand Up @@ -307,6 +318,10 @@ func TestCrawl_Advance_unrelated_response(t *testing.T) {
Target: tstate.Target,
CloserNodes: []tiny.Node{},
})
require.IsType(t, &StateCrawlFinished{}, state)
fstate, ok := state.(*StateCrawlFinished[tiny.Key, tiny.Node])
require.True(t, ok, "type is %T", state)

require.Nil(t, qry.info)
require.Len(t, fstate.Nodes, 1)
require.Equal(t, tstate.NodeID, fstate.Nodes[0])
}
62 changes: 47 additions & 15 deletions internal/coord/routing/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl
}
in.candidates.Enqueue(ctx, tev.NodeID)

case *EventIncludeNode[K, N]:
delete(in.checks, key.HexString(tev.NodeID.Key()))
if in.rt.AddNode(tev.NodeID) {
return &StateIncludeRoutingUpdated[K, N]{
NodeID: tev.NodeID,
}
}

// no need to remove the node from the candidate queue because we
// will enqueue as many nodes from the queue until we find one that
// is not yet included in the routing table.

case *EventIncludeConnectivityCheckSuccess[K, N]:
in.counterChecksPassed.Add(ctx, 1)
ch, ok := in.checks[key.HexString(tev.NodeID.Key())]
Expand Down Expand Up @@ -252,25 +264,37 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl
return &StateIncludeWaitingAtCapacity{}
}

candidate, ok := in.candidates.Dequeue(ctx)
if !ok {
// No candidate in queue
if len(in.checks) > 0 {
return &StateIncludeWaitingWithCapacity{}
// dequeue multiple candidates and check if they are already in the routing
// table. If they are, we won't start a check for them. This can happen
// we have added them directly to the routing table via [EventIncludeNode].
for {
candidate, ok := in.candidates.Dequeue(ctx)
if !ok {
break
}
return &StateIncludeIdle{}
}

in.checks[key.HexString(candidate.Key())] = check[K, N]{
NodeID: candidate,
Started: in.cfg.Clock.Now(),
if _, exists := in.rt.GetNode(candidate.Key()); exists {
continue
}

in.checks[key.HexString(candidate.Key())] = check[K, N]{
NodeID: candidate,
Started: in.cfg.Clock.Now(),
}

// Ask the node to find itself
in.counterChecksSent.Add(ctx, 1)
return &StateIncludeConnectivityCheck[K, N]{
NodeID: candidate,
}
}

// Ask the node to find itself
in.counterChecksSent.Add(ctx, 1)
return &StateIncludeConnectivityCheck[K, N]{
NodeID: candidate,
// No candidate in queue
if len(in.checks) > 0 {
return &StateIncludeWaitingWithCapacity{}
}

return &StateIncludeIdle{}
}

// nodeQueue is a bounded queue of unique NodeIDs
Expand Down Expand Up @@ -304,7 +328,7 @@ func (q *nodeQueue[K, N]) Enqueue(ctx context.Context, id N) bool {
return true
}

// Dequeue reads an node from the queue. It returns the node and a true value
// Dequeue reads a node from the queue. It returns the node and a true value
// if a node was read or nil and false if no node was read.
func (q *nodeQueue[K, N]) Dequeue(ctx context.Context) (N, bool) {
if len(q.nodes) == 0 {
Expand Down Expand Up @@ -379,6 +403,13 @@ type EventIncludeAddCandidate[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N // the candidate node
}

// EventIncludeNode notifies an [Include] that a node should be added to the
// routing table straight away. This means this node will skip the candidate
// queue and potential checks.
type EventIncludeNode[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N // the node to be added
}

// EventIncludeConnectivityCheckSuccess notifies an [Include] that a requested connectivity check has received a successful response.
type EventIncludeConnectivityCheckSuccess[K kad.Key[K], N kad.NodeID[K]] struct {
NodeID N // the node the message was sent to
Expand All @@ -393,5 +424,6 @@ type EventIncludeConnectivityCheckFailure[K kad.Key[K], N kad.NodeID[K]] struct
// includeEvent() ensures that only events accepted by an [Include] can be assigned to the [IncludeEvent] interface.
func (*EventIncludePoll) includeEvent() {}
func (*EventIncludeAddCandidate[K, N]) includeEvent() {}
func (*EventIncludeNode[K, N]) includeEvent() {}
func (*EventIncludeConnectivityCheckSuccess[K, N]) includeEvent() {}
func (*EventIncludeConnectivityCheckFailure[K, N]) includeEvent() {}
Loading

0 comments on commit 6a24dd1

Please sign in to comment.