From b9d84a94c68aaafb7acf0dc04b25ce8c2941d92a Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Fri, 6 Oct 2023 17:51:24 +0200 Subject: [PATCH] WIP --- internal/coord/event.go | 6 ++ internal/coord/routing.go | 93 +++++++++++++++++++++++++--- internal/coord/routing/crawl.go | 52 +++++----------- internal/coord/routing/crawl_test.go | 4 +- internal/coord/routing_test.go | 55 +++++++++++++--- 5 files changed, 154 insertions(+), 56 deletions(-) diff --git a/internal/coord/event.go b/internal/coord/event.go index 04019c2..37c81f1 100644 --- a/internal/coord/event.go +++ b/internal/coord/event.go @@ -238,3 +238,9 @@ type EventRoutingPoll struct{} func (*EventRoutingPoll) behaviourEvent() {} func (*EventRoutingPoll) routingCommand() {} + +type EventStartCrawl struct { + Seed []kadt.PeerID +} + +func (*EventStartCrawl) behaviourEvent() {} diff --git a/internal/coord/routing.go b/internal/coord/routing.go index c0f9fdc..91cb209 100644 --- a/internal/coord/routing.go +++ b/internal/coord/routing.go @@ -303,7 +303,7 @@ type RoutingBehaviour struct { explore coordt.StateMachine[routing.ExploreEvent, routing.ExploreState] // crawl is the state machine that can crawl the network from a set of seed nodes - crawl coordt.StateMachine[routing.ExploreEvent, routing.ExploreState] + crawl coordt.StateMachine[routing.CrawlEvent, routing.CrawlState] pendingMu sync.Mutex pending []BehaviourEvent @@ -374,9 +374,14 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, return nil, fmt.Errorf("explore: %w", err) } - // crawl, err := routing.NewCrawl(self) + crawlCfg := routing.DefaultCrawlConfig() - return ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, cfg) + crawl, err := routing.NewCrawl(self, cplutil.GenRandPeerID, crawlCfg) + if err != nil { + return nil, fmt.Errorf("crawl: %w", err) + } + + return ComposeRoutingBehaviour(self, bootstrap, include, probe, explore, crawl, cfg) } // ComposeRoutingBehaviour creates a [RoutingBehaviour] composed of the supplied state machines. @@ -387,6 +392,7 @@ func ComposeRoutingBehaviour( include coordt.StateMachine[routing.IncludeEvent, routing.IncludeState], probe coordt.StateMachine[routing.ProbeEvent, routing.ProbeState], explore coordt.StateMachine[routing.ExploreEvent, routing.ExploreState], + crawl coordt.StateMachine[routing.CrawlEvent, routing.CrawlState], cfg *RoutingConfig, ) (*RoutingBehaviour, error) { if cfg == nil { @@ -402,6 +408,7 @@ func ComposeRoutingBehaviour( include: include, probe: probe, explore: explore, + crawl: crawl, ready: make(chan struct{}, 1), } return r, nil @@ -418,12 +425,11 @@ func (r *RoutingBehaviour) Notify(ctx context.Context, ev BehaviourEvent) { // notify must only be called while r.pendingMu is held func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { - ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.notify", trace.WithAttributes(attribute.String("event", fmt.Sprintf("%T", ev)))) + ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.notify", trace.WithAttributes(tele.AttrInEvent(ev))) defer span.End() switch ev := ev.(type) { case *EventStartBootstrap: - span.SetAttributes(attribute.String("event", "EventStartBootstrap")) cmd := &routing.EventBootstrapStart[kadt.Key, kadt.PeerID]{ KnownClosestNodes: ev.SeedNodes, } @@ -433,8 +439,17 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { r.pending = append(r.pending, next) } + case *EventStartCrawl: + cmd := &routing.EventCrawlStart[kadt.Key, kadt.PeerID]{ + Seed: ev.Seed, + } + // attempt to advance the bootstrap + next, ok := r.advanceCrawl(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + case *EventAddNode: - span.SetAttributes(attribute.String("event", "EventAddAddrInfo")) // Ignore self if r.self.Equal(ev.NodeID) { break @@ -450,7 +465,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { } case *EventRoutingUpdated: - span.SetAttributes(attribute.String("event", "EventRoutingUpdated"), attribute.String("nodeid", ev.NodeID.String())) + span.SetAttributes(attribute.String("nodeid", ev.NodeID.String())) cmd := &routing.EventProbeAdd[kadt.Key, kadt.PeerID]{ NodeID: ev.NodeID, } @@ -533,9 +548,23 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { r.pending = append(r.pending, next) } + case routing.CrawlQueryID: + cmd := &routing.EventCrawlNodeResponse[kadt.Key, kadt.PeerID]{ + NodeID: ev.To, + Target: ev.Target, + CloserNodes: ev.CloserNodes, + } + + // attempt to advance the crawl + next, ok := r.advanceCrawl(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } + default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) } + case *EventGetCloserNodesFailure: span.SetAttributes(attribute.String("event", "EventGetCloserNodesFailure"), attribute.String("queryid", string(ev.QueryID)), attribute.String("nodeid", ev.To.String())) span.RecordError(ev.Err) @@ -580,10 +609,22 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { if ok { r.pending = append(r.pending, next) } + case routing.CrawlQueryID: + cmd := &routing.EventCrawlNodeFailure[kadt.Key, kadt.PeerID]{ + NodeID: ev.To, + Target: ev.Target, + Error: ev.Err, + } + // attempt to advance the crawl + next, ok := r.advanceCrawl(ctx, cmd) + if ok { + r.pending = append(r.pending, next) + } default: panic(fmt.Sprintf("unexpected query id: %s", ev.QueryID)) } + case *EventNotifyConnectivity: span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String())) // ignore self @@ -609,6 +650,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { if ok { r.pending = append(r.pending, nextProbe) } + case *EventNotifyNonConnectivity: span.SetAttributes(attribute.String("event", "EventNotifyConnectivity"), attribute.String("nodeid", ev.NodeID.String())) @@ -620,6 +662,7 @@ func (r *RoutingBehaviour) notify(ctx context.Context, ev BehaviourEvent) { if ok { r.pending = append(r.pending, nextProbe) } + case *EventRoutingPoll: r.pollChildren(ctx) @@ -693,6 +736,11 @@ func (r *RoutingBehaviour) pollChildren(ctx context.Context) { if ok { r.pending = append(r.pending, ev) } + + ev, ok = r.advanceCrawl(ctx, &routing.EventCrawlPoll{}) + if ok { + r.pending = append(r.pending, ev) + } } func (r *RoutingBehaviour) advanceBootstrap(ctx context.Context, ev routing.BootstrapEvent) (BehaviourEvent, bool) { @@ -817,9 +865,9 @@ func (r *RoutingBehaviour) advanceProbe(ctx context.Context, ev routing.ProbeEve func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.ExploreEvent) (BehaviourEvent, bool) { ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.advanceExplore") defer span.End() + bstate := r.explore.Advance(ctx, ev) switch st := bstate.(type) { - case *routing.StateExploreFindCloser[kadt.Key, kadt.PeerID]: r.cfg.Logger.Debug("starting explore", slog.Int("cpl", st.Cpl), tele.LogAttrPeerID(st.NodeID)) return &EventOutboundGetCloserNodes{ @@ -845,3 +893,32 @@ func (r *RoutingBehaviour) advanceExplore(ctx context.Context, ev routing.Explor return nil, false } + +func (r *RoutingBehaviour) advanceCrawl(ctx context.Context, ev routing.CrawlEvent) (BehaviourEvent, bool) { + ctx, span := r.cfg.Tracer.Start(ctx, "RoutingBehaviour.advanceCrawl") + defer span.End() + + cstate := r.crawl.Advance(ctx, ev) + switch st := cstate.(type) { + case *routing.StateCrawlFindCloser[kadt.Key, kadt.PeerID]: + return &EventOutboundGetCloserNodes{ + QueryID: routing.CrawlQueryID, + To: st.NodeID, + Target: st.Target, + Notify: r, + }, true + + case *routing.StateCrawlWaitingWithCapacity: + // crawl waiting for a message response but has capacity to do more + case *routing.StateCrawlWaitingAtCapacity: + // crawl waiting for a message response but has no capacity to do more + case *routing.StateCrawlFinished: + r.cfg.Logger.Info("crawl finished") + case *routing.StateCrawlIdle: + // bootstrap not running, nothing to do + default: + panic(fmt.Sprintf("unexpected explore state: %T", st)) + } + + return nil, false +} diff --git a/internal/coord/routing/crawl.go b/internal/coord/routing/crawl.go index ecbf551..5e8a706 100644 --- a/internal/coord/routing/crawl.go +++ b/internal/coord/routing/crawl.go @@ -14,6 +14,9 @@ import ( "github.com/plprobelab/zikade/tele" ) +// CrawlQueryID is the id for the query operated by the crawl state machine +const CrawlQueryID = coordt.QueryID("crawl") + // CrawlConfig specifies optional configuration for a Crawl type CrawlConfig struct { MaxCPL int // the maximum CPL until we should crawl the peer @@ -65,7 +68,6 @@ type Crawl[K kad.Key[K], N kad.NodeID[K]] struct { } type crawlInformation[K kad.Key[K], N kad.NodeID[K]] struct { - queryID coordt.QueryID todo []crawlJob[K, N] cpls map[string]int waiting map[string]N @@ -109,7 +111,6 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat span.SetAttributes(attribute.Int("seed", len(tev.Seed))) ci := &crawlInformation[K, N]{ - queryID: tev.QueryID, todo: []crawlJob[K, N]{}, cpls: map[string]int{}, waiting: map[string]N{}, @@ -149,9 +150,6 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat if c.info == nil { return &StateCrawlIdle{} - } else if c.info.queryID != tev.QueryID { - // if we don't know this query, pretend it was a poll by breaking - break } job := crawlJob[K, N]{ @@ -193,9 +191,6 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat case *EventCrawlNodeFailure[K, N]: if c.info == nil { return &StateCrawlIdle{} - } else if c.info.queryID != tev.QueryID { - // if we don't know this query, pretend it was a poll by breaking - break } span.RecordError(tev.Error) @@ -224,9 +219,7 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat } if len(c.info.waiting) >= c.cfg.MaxCPL*c.cfg.Concurrency { - return &StateCrawlWaitingAtCapacity{ - QueryID: c.info.queryID, - } + return &StateCrawlWaitingAtCapacity{} } if len(c.info.todo) > 0 { @@ -240,16 +233,13 @@ func (c *Crawl[K, N]) Advance(ctx context.Context, ev CrawlEvent) (out CrawlStat c.info.waiting[mapKey] = job.node return &StateCrawlFindCloser[K, N]{ - QueryID: c.info.queryID, - Target: job.target, - NodeID: job.node, + Target: job.target, + NodeID: job.node, } } if len(c.info.waiting) > 0 { - return &StateCrawlWaitingWithCapacity{ - QueryID: c.info.queryID, - } + return &StateCrawlWaitingWithCapacity{} } c.info = nil @@ -291,21 +281,14 @@ type CrawlState interface { type StateCrawlIdle struct{} -type StateCrawlFinished struct { - QueryID coordt.QueryID -} +type StateCrawlFinished struct{} -type StateCrawlWaitingAtCapacity struct { - QueryID coordt.QueryID -} -type StateCrawlWaitingWithCapacity struct { - QueryID coordt.QueryID -} +type StateCrawlWaitingAtCapacity struct{} +type StateCrawlWaitingWithCapacity struct{} type StateCrawlFindCloser[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID coordt.QueryID - Target K // the key that the query wants to find closer nodes for - NodeID N // the node to send the message to + Target K // the key that the query wants to find closer nodes for + NodeID N // the node to send the message to } // crawlState() ensures that only [Crawl] states can be assigned to a CrawlState. @@ -325,22 +308,19 @@ type EventCrawlPoll struct{} // type EventCrawlCancel struct{} // TODO: implement type EventCrawlStart[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID coordt.QueryID - Seed []N + Seed []N } type EventCrawlNodeResponse[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID coordt.QueryID NodeID N // the node the message was sent to Target K // the key that the node was asked for CloserNodes []N // the closer nodes sent by the node } type EventCrawlNodeFailure[K kad.Key[K], N kad.NodeID[K]] struct { - QueryID coordt.QueryID - NodeID N // the node the message was sent to - Target K // the key that the node was asked for - Error error // the error that caused the failure, if any + NodeID N // the node the message was sent to + Target K // the key that the node was asked for + Error error // the error that caused the failure, if any } // crawlEvent() ensures that only events accepted by [Crawl] can be assigned to a [CrawlEvent]. diff --git a/internal/coord/routing/crawl_test.go b/internal/coord/routing/crawl_test.go index 4ddb2e5..54ea12d 100644 --- a/internal/coord/routing/crawl_test.go +++ b/internal/coord/routing/crawl_test.go @@ -305,7 +305,7 @@ func TestCrawl_Advance_unrelated_response(t *testing.T) { // send it an unrelated response state = qry.Advance(ctx, &EventCrawlNodeResponse[tiny.Key, tiny.Node]{ - QueryID: coordt.QueryID("another"), + QueryID: "another", NodeID: tstate.NodeID, Target: tstate.Target, CloserNodes: []tiny.Node{}, @@ -314,7 +314,7 @@ func TestCrawl_Advance_unrelated_response(t *testing.T) { // send it an unrelated response state = qry.Advance(ctx, &EventCrawlNodeFailure[tiny.Key, tiny.Node]{ - QueryID: coordt.QueryID("another"), + QueryID: "another", NodeID: tstate.NodeID, Target: tstate.Target, Error: fmt.Errorf("some error"), diff --git a/internal/coord/routing_test.go b/internal/coord/routing_test.go index c2f8dd5..952b66b 100644 --- a/internal/coord/routing_test.go +++ b/internal/coord/routing_test.go @@ -37,6 +37,11 @@ func idleExplore() *RecordingSM[routing.ExploreEvent, routing.ExploreState] { return NewRecordingSM[routing.ExploreEvent, routing.ExploreState](&routing.StateExploreIdle{}) } +// idleCrawl returns an crawl state machine that is always idle +func idleCrawl() *RecordingSM[routing.CrawlEvent, routing.CrawlState] { + return NewRecordingSM[routing.CrawlEvent, routing.CrawlState](&routing.StateCrawlIdle{}) +} + func TestRoutingConfigValidate(t *testing.T) { t.Run("default is valid", func(t *testing.T) { cfg := DefaultRoutingConfig() @@ -225,7 +230,7 @@ func TestRoutingStartBootstrapSendsEvent(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, bootstrap, idleInclude(), idleProbe(), idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, bootstrap, idleInclude(), idleProbe(), idleExplore(), idleCrawl(), cfg) require.NoError(t, err) ev := &EventStartBootstrap{ @@ -255,7 +260,7 @@ func TestRoutingBootstrapGetClosestNodesSuccess(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, bootstrap, idleInclude(), idleProbe(), idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, bootstrap, idleInclude(), idleProbe(), idleExplore(), idleCrawl(), cfg) require.NoError(t, err) ev := &EventGetCloserNodesSuccess{ @@ -289,7 +294,7 @@ func TestRoutingBootstrapGetClosestNodesFailure(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, bootstrap, idleInclude(), idleProbe(), idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, bootstrap, idleInclude(), idleProbe(), idleExplore(), idleCrawl(), cfg) require.NoError(t, err) failure := errors.New("failed") @@ -324,7 +329,7 @@ func TestRoutingAddNodeInfoSendsEvent(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, idleProbe(), idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, idleProbe(), idleExplore(), idleCrawl(), cfg) require.NoError(t, err) ev := &EventAddNode{ @@ -354,7 +359,7 @@ func TestRoutingIncludeGetClosestNodesSuccess(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, idleProbe(), idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, idleProbe(), idleExplore(), idleCrawl(), cfg) require.NoError(t, err) ev := &EventGetCloserNodesSuccess{ @@ -387,7 +392,7 @@ func TestRoutingIncludeGetClosestNodesFailure(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, idleProbe(), idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, idleProbe(), idleExplore(), idleCrawl(), cfg) require.NoError(t, err) failure := errors.New("failed") @@ -431,7 +436,7 @@ func TestRoutingIncludedNodeAddToProbeList(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, probe, idleExplore(), cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), include, probe, idleExplore(), idleCrawl(), cfg) require.NoError(t, err) // a new node to be included @@ -510,7 +515,7 @@ func TestRoutingExploreSendsEvent(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), explore, cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), explore, idleCrawl(), cfg) require.NoError(t, err) routingBehaviour.Notify(ctx, &EventRoutingPoll{}) @@ -543,7 +548,7 @@ func TestRoutingExploreGetClosestNodesSuccess(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), explore, cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), explore, idleCrawl(), cfg) require.NoError(t, err) ev := &EventGetCloserNodesSuccess{ @@ -576,7 +581,7 @@ func TestRoutingExploreGetClosestNodesFailure(t *testing.T) { cfg := DefaultRoutingConfig() cfg.Clock = clk - routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), explore, cfg) + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), explore, idleCrawl(), cfg) require.NoError(t, err) failure := errors.New("failed") @@ -596,3 +601,33 @@ func TestRoutingExploreGetClosestNodesFailure(t *testing.T) { require.Equal(t, peer.ID(nodes[1].NodeID), peer.ID(rev.NodeID)) require.Equal(t, failure, rev.Error) } + +func TestRoutingStartCrawlSendsEvent(t *testing.T) { + ctx := kadtest.CtxShort(t) + + clk := clock.NewMock() + _, nodes, err := nettest.LinearTopology(4, clk) + require.NoError(t, err) + + self := nodes[0].NodeID + + // records the event passed to bootstrap + crawl := NewRecordingSM[routing.CrawlEvent, routing.CrawlState](&routing.StateCrawlIdle{}) + + cfg := DefaultRoutingConfig() + cfg.Clock = clk + routingBehaviour, err := ComposeRoutingBehaviour(self, idleBootstrap(), idleInclude(), idleProbe(), idleExplore(), crawl, cfg) + require.NoError(t, err) + + ev := &EventStartCrawl{ + Seed: []kadt.PeerID{nodes[1].NodeID}, + } + + routingBehaviour.Notify(ctx, ev) + + // the event that should be passed to the bootstrap state machine + expected := &routing.EventCrawlStart[kadt.Key, kadt.PeerID]{ + Seed: ev.Seed, + } + require.Equal(t, expected, crawl.Received) +}