From 520b87a2512e43beb1032a9f3d6b6b1f542f0144 Mon Sep 17 00:00:00 2001 From: Jaz Volpert Date: Fri, 10 Nov 2023 06:40:21 +0000 Subject: [PATCH] Handle channel closure gracefully without panicing --- events/events.go | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/events/events.go b/events/events.go index f89aec303..92c3417e4 100644 --- a/events/events.go +++ b/events/events.go @@ -77,15 +77,19 @@ func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) { default: log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident) go func(torem *Subscriber) { - select { - case torem.outgoing <- &XRPCStreamEvent{ - Error: &ErrorFrame{ - Error: "ConsumerTooSlow", - }, - }: - case <-time.After(time.Second * 5): - log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) + torem.lk.Lock() + if !torem.cleanedUp { + select { + case torem.outgoing <- &XRPCStreamEvent{ + Error: &ErrorFrame{ + Error: "ConsumerTooSlow", + }, + }: + case <-time.After(time.Second * 5): + log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident) + } } + torem.lk.Unlock() torem.cleanup() }(s) } @@ -112,6 +116,9 @@ type Subscriber struct { cleanup func() + lk sync.Mutex + cleanedUp bool + ident string enqueuedCounter prometheus.Counter broadcastCounter prometheus.Counter @@ -177,9 +184,12 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func } sub.cleanup = sync.OnceFunc(func() { + sub.lk.Lock() + defer sub.lk.Unlock() close(done) em.rmSubscriber(sub) close(sub.outgoing) + sub.cleanedUp = true }) if since == nil { @@ -260,6 +270,8 @@ func (em *EventManager) Subscribe(ctx context.Context, ident string, filter func func sequenceForEvent(evt *XRPCStreamEvent) int64 { switch { + case evt == nil: + return -1 case evt.RepoCommit != nil: return evt.RepoCommit.Seq case evt.RepoHandle != nil: @@ -270,6 +282,8 @@ func sequenceForEvent(evt *XRPCStreamEvent) int64 { return evt.RepoTombstone.Seq case evt.RepoInfo != nil: return -1 + case evt.Error != nil: + return -1 default: return -1 }