diff --git a/events/events.go b/events/events.go index 915beaf19..83438ee44 100644 --- a/events/events.go +++ b/events/events.go @@ -290,7 +290,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func case <-done: return ErrPlaybackShutdown case out <- e: - seq := sequenceForEvent(e) + seq := SequenceForEvent(e) if seq > 0 { lastSeq = seq } @@ -315,8 +315,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func // run playback again to get us to the events that have started buffering if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error { - seq := sequenceForEvent(e) - if seq > sequenceForEvent(first) { + seq := SequenceForEvent(e) + if seq > SequenceForEvent(first) { return ErrCaughtUp } @@ -351,7 +351,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func return out, sub.cleanup, nil } -func sequenceForEvent(evt *XRPCStreamEvent) int64 { +func SequenceForEvent(evt *XRPCStreamEvent) int64 { switch { case evt == nil: return -1 diff --git a/splitter/ringbuf.go b/splitter/ringbuf.go index 9168cdf04..2417f4eb0 100644 --- a/splitter/ringbuf.go +++ b/splitter/ringbuf.go @@ -96,7 +96,7 @@ func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb fu for ; i >= 0; i-- { c := chunks[i] evts := c.events() - if since > sequenceForEvent(evts[len(evts)-1]) { + if since > events.SequenceForEvent(evts[len(evts)-1]) { i++ break } @@ -112,7 +112,7 @@ func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb fu for nread < len(evts) { for _, e := range evts[nread:] { nread++ - seq := sequenceForEvent(e) + seq := events.SequenceForEvent(e) if seq <= since { continue } diff --git a/splitter/splitter.go b/splitter/splitter.go index 0f4a6cf8c..c39fbb5e3 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -383,7 +383,7 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso defer cancel() sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error { - seq := sequenceForEvent(evt) + seq := events.SequenceForEvent(evt) if seq < 0 { // ignore info events and other unsupported types return nil @@ -402,24 +402,8 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso *lastCursor = seq return nil }) - return events.HandleRepoStream(ctx, con, sched) -} -func sequenceForEvent(evt *events.XRPCStreamEvent) int64 { - switch { - case evt.RepoCommit != nil: - return evt.RepoCommit.Seq - case evt.RepoHandle != nil: - return evt.RepoHandle.Seq - case evt.RepoMigrate != nil: - return evt.RepoMigrate.Seq - case evt.RepoTombstone != nil: - return evt.RepoTombstone.Seq - case evt.RepoInfo != nil: - return -1 - default: - return -1 - } + return events.HandleRepoStream(ctx, con, sched) } func (s *Splitter) getLastCursor() (int64, error) {