
Commit

WIP
dennis-tra committed Oct 6, 2023
1 parent 5d11616 commit b9d84a9
Showing 5 changed files with 154 additions and 56 deletions.
6 changes: 6 additions & 0 deletions internal/coord/event.go
@@ -238,3 +238,9 @@ type EventRoutingPoll struct{}

func (*EventRoutingPoll) behaviourEvent() {}
func (*EventRoutingPoll) routingCommand() {}

type EventStartCrawl struct {
Seed []kadt.PeerID
}

func (*EventStartCrawl) behaviourEvent() {}
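
For orientation, a minimal sketch (not part of this commit) of how the new event could be handed to the routing behaviour from inside internal/coord. The helper name, the behaviour variable, and the seed slice are assumptions; EventStartCrawl, RoutingBehaviour.Notify, and kadt.PeerID are taken from this diff.

// startCrawl is a hypothetical helper: it hands the new event to the routing
// behaviour, whose notify() turns it into an EventCrawlStart command for the
// crawl state machine (see routing.go below).
func startCrawl(ctx context.Context, r *RoutingBehaviour, seed []kadt.PeerID) {
	r.Notify(ctx, &EventStartCrawl{Seed: seed})
}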
93 changes: 85 additions & 8 deletions internal/coord/routing.go
@@ -303,7 +303,7 @@ type RoutingBehaviour struct {
explore coordt.StateMachine[routing.ExploreEvent, routing.ExploreState]

// crawl is the state machine that can crawl the network from a set of seed nodes
crawl coordt.StateMachine[routing.ExploreEvent, routing.ExploreState]
crawl coordt.StateMachine[routing.CrawlEvent, routing.CrawlState]

pendingMu sync.Mutex
pending []BehaviourEvent
@@ -374,9 +374,14 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,
return nil, fmt.Errorf("explore: %w", err)
}

// crawl, err := routing.NewCrawl(self)
crawlCfg := routing.DefaultCrawlConfig()

return ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, cfg)
crawl, err := routing.NewCrawl(self, cplutil.GenRandPeerID, crawlCfg)
if err != nil {
return nil, fmt.Errorf("crawl: %w", err)
}

return ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, crawl, cfg)
}

// ComposeRoutingBehaviour creates a [RoutingBehaviour] composed of the supplied state machines.
@@ -387,6 +392,7 @@ func ComposeRoutingBehaviour(
include coordt.StateMachine[routing.IncludeEvent, routing.IncludeState],
probe coordt.StateMachine[routing.ProbeEvent, routing.ProbeState],
explore coordt.StateMachine[routing.ExploreEvent, routing.ExploreState],
crawl coordt.StateMachine[routing.CrawlEvent, routing.CrawlState],
cfg *RoutingConfig,
) (*RoutingBehaviour, error) {
if cfg == nil {
@@ -402,6 +408,7 @@ func ComposeRoutingBehaviour(
include: include,
probe: probe,
explore: explore,
crawl: crawl,
ready: make(chan struct{}, 1),
}
return r, nil
@@ -418,12 +425,11 @@ func (r *RoutingBehaviour) Notify(ctx context.Context, ev BehaviourEvent) {

// notify must only be called while r.pendingMu is held
func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.notify", trace.WithAttributes(attribute.String("event", fmt.Sprintf("%T", ev))))
ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.notify", trace.WithAttributes(tele.AttrInEvent(ev)))
defer span.End()

switch ev := ev.(type) {
case *EventStartBootstrap:
span.SetAttributes(attribute.String("event", "EventStartBootstrap"))
cmd := &routing.EventBootstrapStart[kadt.Key, kadt.PeerID]{
KnownClosestNodes: ev.SeedNodes,
}
@@ -433,8 +439,17 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
r.pending = append(r.pending, next)
}

case *EventStartCrawl:
cmd := &routing.EventCrawlStart[kadt.Key, kadt.PeerID]{
Seed: ev.Seed,
}
// attempt to advance the crawl
next, ok := r.advanceCrawl(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

case *EventAddNode:
span.SetAttributes(attribute.String("event", "EventAddAddrInfo"))
// Ignore self
if r.self.Equal(ev.NodeID) {
break
@@ -450,7 +465,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
}

case *EventRoutingUpdated:
span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeID.String()))
span.SetAttributes(attribute.String("nodeid", ev.NodeID.String()))
cmd := &routing.EventProbeAdd[kadt.Key, kadt.PeerID]{
NodeID: ev.NodeID,
}
@@ -533,9 +548,23 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
r.pending = append(r.pending, next)
}

case routing.CrawlQueryID:
cmd := &routing.EventCrawlNodeResponse[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Target: ev.Target,
CloserNodes: ev.CloserNodes,
}

// attempt to advance the crawl
next, ok := r.advanceCrawl(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
}

case *EventGetCloserNodesFailure:
span.SetAttributes(attribute.String("event", "EventGetCloserNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String()))
span.RecordError(ev.Err)
@@ -580,10 +609,22 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
if ok {
r.pending = append(r.pending, next)
}
case routing.CrawlQueryID:
cmd := &routing.EventCrawlNodeFailure[kadt.Key, kadt.PeerID]{
NodeID: ev.To,
Target: ev.Target,
Error: ev.Err,
}
// attempt to advance the crawl
next, ok := r.advanceCrawl(ctx, cmd)
if ok {
r.pending = append(r.pending, next)
}

default:
panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID))
}

case *EventNotifyConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))
// ignore self
@@ -609,6 +650,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
if ok {
r.pending = append(r.pending, nextProbe)
}

case *EventNotifyNonConnectivity:
span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String()))

@@ -620,6 +662,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) {
if ok {
r.pending = append(r.pending, nextProbe)
}

case *EventRoutingPoll:
r.pollChildren(ctx)

@@ -693,6 +736,11 @@ func (r *RoutingBehaviour) pollChildren(ctx context.Context) {
if ok {
r.pending = append(r.pending, ev)
}

ev, ok = r.advanceCrawl(ctx, &routing.EventCrawlPoll{})
if ok {
r.pending = append(r.pending, ev)
}
}

func (r *RoutingBehaviour) advanceBootstrap(ctx context.Context, ev routing.BootstrapEvent) (BehaviourEvent, bool) {
@@ -817,9 +865,9 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve
func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.ExploreEvent) (BehaviourEvent, bool) {
ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.advanceExplore")
defer span.End()

bstate := r.explore.Advance(ctx, ev)
switch st := bstate.(type) {

case *routing.StateExploreFindCloser[kadt.Key, kadt.PeerID]:
r.cfg.Logger.Debug("starting explore", slog.Int("cpl", st.Cpl), tele.LogAttrPeerID(st.NodeID))
return &EventOutboundGetCloserNodes{
@@ -845,3 +893,32 @@ func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.Explor

return nil, false
}

func (r *RoutingBehaviour) advanceCrawl(ctx context.Context, ev routing.CrawlEvent) (BehaviourEvent, bool) {
ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.advanceCrawl")
defer span.End()

cstate := r.crawl.Advance(ctx, ev)
switch st := cstate.(type) {
case *routing.StateCrawlFindCloser[kadt.Key, kadt.PeerID]:
return &EventOutboundGetCloserNodes{
QueryID: routing.CrawlQueryID,
To: st.NodeID,
Target: st.Target,
Notify: r,
}, true

case *routing.StateCrawlWaitingWithCapacity:
// crawl waiting for a message response but has capacity to do more
case *routing.StateCrawlWaitingAtCapacity:
// crawl waiting for a message response but has no capacity to do more
case *routing.StateCrawlFinished:
r.cfg.Logger.Info("crawl finished")
case *routing.StateCrawlIdle:
// crawl not running, nothing to do
default:
panic(fmt.Sprintf("unexpected crawl state: %T", st))
}

return nil, false
}
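
Any other call sites of ComposeRoutingBehaviour (for example in tests) now have to build and pass the crawl state machine as the additional argument, mirroring the change to NewRoutingBehaviour above. A call-site fragment sketching that migration; it reuses only names that appear in this diff and assumes bootstrap, include, probe, explore, and cfg are constructed exactly as before.

// Hypothetical call-site fragment: construct the crawl state machine and pass
// it to the composed routing behaviour alongside the existing machines.
crawlCfg := routing.DefaultCrawlConfig()
crawl, err := routing.NewCrawl(self, cplutil.GenRandPeerID, crawlCfg)
if err != nil {
	return nil, fmt.Errorf("crawl: %w", err)
}
rb, err := ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, crawl, cfg)
if err != nil {
	return nil, fmt.Errorf("compose: %w", err)
}
return rb, nil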
52 changes: 16 additions & 36 deletions internal/coord/routing/crawl.go
@@ -14,6 +14,9 @@ import (
"github.com/plprobelab/zikade/tele"
)

// CrawlQueryID is the id for the query operated by the crawl state machine
const CrawlQueryID = coordt.QueryID("crawl")

// CrawlConfig specifies optional configuration for a Crawl
type CrawlConfig struct {
MaxCPL int // the maximum CPL until we should crawl the peer
@@ -65,7 +68,6 @@ type Crawl[K kad.Key[K], N kad.NodeID[K]] struct {
}

type crawlInformation[K kad.Key[K], N kad.NodeID[K]] struct {
queryID coordt.QueryID
todo []crawlJob[K, N]
cpls map[string]int
waiting map[string]N
@@ -109,7 +111,6 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat
span.SetAttributes(attribute.Int("seed", len(tev.Seed)))

ci := &crawlInformation[K, N]{
queryID: tev.QueryID,
todo: []crawlJob[K, N]{},
cpls: map[string]int{},
waiting: map[string]N{},
@@ -149,9 +150,6 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat

if c.info == nil {
return &StateCrawlIdle{}
} else if c.info.queryID != tev.QueryID {
// if we don't know this query, pretend it was a poll by breaking
break
}

job := crawlJob[K, N]{
@@ -193,9 +191,6 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat
case *EventCrawlNodeFailure[K, N]:
if c.info == nil {
return &StateCrawlIdle{}
} else if c.info.queryID != tev.QueryID {
// if we don't know this query, pretend it was a poll by breaking
break
}

span.RecordError(tev.Error)
@@ -224,9 +219,7 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat
}

if len(c.info.waiting) >= c.cfg.MaxCPL*c.cfg.Concurrency {
return &StateCrawlWaitingAtCapacity{
QueryID: c.info.queryID,
}
return &StateCrawlWaitingAtCapacity{}
}

if len(c.info.todo) > 0 {
@@ -240,16 +233,13 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat
c.info.waiting[mapKey] = job.node

return &StateCrawlFindCloser[K, N]{
QueryID: c.info.queryID,
Target: job.target,
NodeID: job.node,
Target: job.target,
NodeID: job.node,
}
}

if len(c.info.waiting) > 0 {
return &StateCrawlWaitingWithCapacity{
QueryID: c.info.queryID,
}
return &StateCrawlWaitingWithCapacity{}
}

c.info = nil
@@ -291,21 +281,14 @@ type CrawlState interface {

type StateCrawlIdle struct{}

type StateCrawlFinished struct {
QueryID coordt.QueryID
}
type StateCrawlFinished struct{}

type StateCrawlWaitingAtCapacity struct {
QueryID coordt.QueryID
}
type StateCrawlWaitingWithCapacity struct {
QueryID coordt.QueryID
}
type StateCrawlWaitingAtCapacity struct{}
type StateCrawlWaitingWithCapacity struct{}

type StateCrawlFindCloser[K kad.Key[K], N kad.NodeID[K]] struct {
QueryID coordt.QueryID
Target K // the key that the query wants to find closer nodes for
NodeID N // the node to send the message to
Target K // the key that the query wants to find closer nodes for
NodeID N // the node to send the message to
}

// crawlState() ensures that only [Crawl] states can be assigned to a CrawlState.
@@ -325,22 +308,19 @@ type EventCrawlPoll struct{}
// type EventCrawlCancel struct{} // TODO: implement

type EventCrawlStart[K kad.Key[K], N kad.NodeID[K]] struct {
QueryID coordt.QueryID
Seed []N
Seed []N
}

type EventCrawlNodeResponse[K kad.Key[K], N kad.NodeID[K]] struct {
QueryID coordt.QueryID
NodeID N // the node the message was sent to
Target K // the key that the node was asked for
CloserNodes []N // the closer nodes sent by the node
}

type EventCrawlNodeFailure[K kad.Key[K], N kad.NodeID[K]] struct {
QueryID coordt.QueryID
NodeID N // the node the message was sent to
Target K // the key that the node was asked for
Error error // the error that caused the failure, if any
NodeID N // the node the message was sent to
Target K // the key that the node was asked for
Error error // the error that caused the failure, if any
}

// crawlEvent() ensures that only events accepted by [Crawl] can be assigned to a [CrawlEvent].
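
To make the crawl state machine's surface above concrete, here is a rough synchronous driver sketch. It is not part of this commit and not how zikade uses the machine (the RoutingBehaviour drives it through advanceCrawl and pollChildren); the crawlAll name and the findNode RPC helper are hypothetical, while every routing.* and kadt.* identifier used below appears elsewhere in this diff.

// crawlAll drives a Crawl instance to completion in a single goroutine.
// findNode is a hypothetical helper performing one FIND_NODE RPC and returning
// the closer peers reported by the queried node.
func crawlAll(ctx context.Context, c *routing.Crawl[kadt.Key, kadt.PeerID], seed []kadt.PeerID,
	findNode func(context.Context, kadt.PeerID, kadt.Key) ([]kadt.PeerID, error),
) {
	state := c.Advance(ctx, &routing.EventCrawlStart[kadt.Key, kadt.PeerID]{Seed: seed})
	for {
		switch st := state.(type) {
		case *routing.StateCrawlFindCloser[kadt.Key, kadt.PeerID]:
			// Perform the requested RPC and feed the outcome straight back in.
			closer, err := findNode(ctx, st.NodeID, st.Target)
			if err != nil {
				state = c.Advance(ctx, &routing.EventCrawlNodeFailure[kadt.Key, kadt.PeerID]{
					NodeID: st.NodeID, Target: st.Target, Error: err,
				})
				continue
			}
			state = c.Advance(ctx, &routing.EventCrawlNodeResponse[kadt.Key, kadt.PeerID]{
				NodeID: st.NodeID, Target: st.Target, CloserNodes: closer,
			})
		case *routing.StateCrawlFinished, *routing.StateCrawlIdle:
			return
		default:
			// StateCrawlWaitingAtCapacity / StateCrawlWaitingWithCapacity: a
			// synchronous driver never observes these because every response
			// is fed back before the next Advance; the event-driven
			// RoutingBehaviour handles them via EventRoutingPoll instead.
			return
		}
	}
}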
4 changes: 2 additions & 2 deletions internal/coord/routing/crawl_test.go
@@ -305,7 +305,7 @@ func TestCrawl_Advance_unrelated_response(t *testing.T) {

// send it an unrelated response
state = qry.Advance(ctx, &EventCrawlNodeResponse[tiny.Key, tiny.Node]{
QueryID: coordt.QueryID("another"),
QueryID: "another",
NodeID: tstate.NodeID,
Target: tstate.Target,
CloserNodes: []tiny.Node{},
@@ -314,7 +314,7 @@

// send it an unrelated response
state = qry.Advance(ctx, &EventCrawlNodeFailure[tiny.Key, tiny.Node]{
QueryID: coordt.QueryID("another"),
QueryID: "another",
NodeID: tstate.NodeID,
Target: tstate.Target,
Error: fmt.Errorf("some error"),