Skip to content

Commit

Permalink
Handle channel closure gracefully without panicing (#427)
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping authored Nov 10, 2023
2 parents 668f3da + 520b87a commit f654c94
Showing 1 changed file with 22 additions and 8 deletions.
30 changes: 22 additions & 8 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,19 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
default:
log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident)
go func(torem *Subscriber) {
select {
case torem.outgoing <- &XRPCStreamEvent{
Error: &ErrorFrame{
Error: "ConsumerTooSlow",
},
}:
case <-time.After(time.Second * 5):
log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident)
torem.lk.Lock()
if !torem.cleanedUp {
select {
case torem.outgoing <- &XRPCStreamEvent{
Error: &ErrorFrame{
Error: "ConsumerTooSlow",
},
}:
case <-time.After(time.Second * 5):
log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident)
}
}
torem.lk.Unlock()
torem.cleanup()
}(s)
}
Expand All @@ -112,6 +116,9 @@ type Subscriber struct {

cleanup func()

lk sync.Mutex
cleanedUp bool

ident string
enqueuedCounter prometheus.Counter
broadcastCounter prometheus.Counter
Expand Down Expand Up @@ -177,9 +184,12 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func
}

sub.cleanup = sync.OnceFunc(func() {
sub.lk.Lock()
defer sub.lk.Unlock()
close(done)
em.rmSubscriber(sub)
close(sub.outgoing)
sub.cleanedUp = true
})

if since == nil {
Expand Down Expand Up @@ -260,6 +270,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func

func sequenceForEvent(evt *XRPCStreamEvent) int64 {
switch {
case evt == nil:
return -1
case evt.RepoCommit != nil:
return evt.RepoCommit.Seq
case evt.RepoHandle != nil:
Expand All @@ -270,6 +282,8 @@ func sequenceForEvent(evt *XRPCStreamEvent) int64 {
return evt.RepoTombstone.Seq
case evt.RepoInfo != nil:
return -1
case evt.Error != nil:
return -1
default:
return -1
}
Expand Down

0 comments on commit f654c94

Please sign in to comment.