Skip to content

Commit

Permalink
dont duplicate sequence for event func
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Nov 14, 2024
1 parent da9457b commit 54f0388
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 24 deletions.
8 changes: 4 additions & 4 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
case <-done:
return ErrPlaybackShutdown
case out <- e:
seq := sequenceForEvent(e)
seq := SequenceForEvent(e)
if seq > 0 {
lastSeq = seq
}
Expand All @@ -315,8 +315,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func

// run playback again to get us to the events that have started buffering
if err := em.persister.Playback(ctx, lastSeq, func(e *XRPCStreamEvent) error {
seq := sequenceForEvent(e)
if seq > sequenceForEvent(first) {
seq := SequenceForEvent(e)
if seq > SequenceForEvent(first) {
return ErrCaughtUp
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
return out, sub.cleanup, nil
}

func sequenceForEvent(evt *XRPCStreamEvent) int64 {
func SequenceForEvent(evt *XRPCStreamEvent) int64 {
switch {
case evt == nil:
return -1
Expand Down
4 changes: 2 additions & 2 deletions splitter/ringbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb fu
for ; i >= 0; i-- {
c := chunks[i]
evts := c.events()
if since > sequenceForEvent(evts[len(evts)-1]) {
if since > events.SequenceForEvent(evts[len(evts)-1]) {
i++
break
}
Expand All @@ -112,7 +112,7 @@ func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb fu
for nread < len(evts) {
for _, e := range evts[nread:] {
nread++
seq := sequenceForEvent(e)
seq := events.SequenceForEvent(e)
if seq <= since {
continue
}
Expand Down
20 changes: 2 additions & 18 deletions splitter/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso
defer cancel()

sched := sequential.NewScheduler("splitter", func(ctx context.Context, evt *events.XRPCStreamEvent) error {
seq := sequenceForEvent(evt)
seq := events.SequenceForEvent(evt)
if seq < 0 {
// ignore info events and other unsupported types
return nil
Expand All @@ -402,24 +402,8 @@ func (s *Splitter) handleConnection(ctx context.Context, host string, con *webso
*lastCursor = seq
return nil
})
return events.HandleRepoStream(ctx, con, sched)
}

func sequenceForEvent(evt *events.XRPCStreamEvent) int64 {
switch {
case evt.RepoCommit != nil:
return evt.RepoCommit.Seq
case evt.RepoHandle != nil:
return evt.RepoHandle.Seq
case evt.RepoMigrate != nil:
return evt.RepoMigrate.Seq
case evt.RepoTombstone != nil:
return evt.RepoTombstone.Seq
case evt.RepoInfo != nil:
return -1
default:
return -1
}
return events.HandleRepoStream(ctx, con, sched)
}

func (s *Splitter) getLastCursor() (int64, error) {
Expand Down

0 comments on commit 54f0388

Please sign in to comment.