diff --git a/cmd/jetstream/main.go b/cmd/jetstream/main.go index ef5ba46..cfb9cec 100644 --- a/cmd/jetstream/main.go +++ b/cmd/jetstream/main.go @@ -332,17 +332,49 @@ func Jetstream(cctx *cli.Context) error { } log.Info("shutting down, waiting for workers to clean up...") + close(shutdownRepoStream) close(shutdownLivenessChecker) close(shutdownCursorManager) close(shutdownEcho) close(shutdownMetrics) - <-repoStreamShutdown - <-livenessCheckerShutdown - <-cursorManagerShutdown - <-echoShutdown - <-metricsShutdown + shutdownTimeout := time.After(10 * time.Second) + + select { + case <-repoStreamShutdown: + log.Info("Repo stream shutdown completed") + case <-shutdownTimeout: + log.Warn("Shutdown timeout reached for repo stream") + } + + select { + case <-livenessCheckerShutdown: + log.Info("Liveness checker shutdown completed") + case <-shutdownTimeout: + log.Warn("Shutdown timeout reached for liveness checker") + } + + select { + case <-cursorManagerShutdown: + log.Info("Cursor manager shutdown completed") + case <-shutdownTimeout: + log.Warn("Shutdown timeout reached for cursor manager") + } + + select { + case <-echoShutdown: + log.Info("Echo shutdown completed") + case <-shutdownTimeout: + log.Warn("Shutdown timeout reached for echo server") + } + + select { + case <-metricsShutdown: + log.Info("Metrics shutdown completed") + case <-shutdownTimeout: + log.Warn("Shutdown timeout reached for metrics server") + } c.Shutdown() diff --git a/pkg/consumer/consumer.go b/pkg/consumer/consumer.go index bf44cf6..2d53e1c 100644 --- a/pkg/consumer/consumer.go +++ b/pkg/consumer/consumer.go @@ -116,6 +116,13 @@ func NewConsumer( func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error { ctx, span := tracer.Start(ctx, "HandleStreamEvent") defer span.End() + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + switch { case xe.RepoCommit != nil: eventsProcessedCounter.WithLabelValues("commit", c.SocketURL).Inc() @@ -373,7 +380,14 @@ func (c *Consumer) RunSequencer(ctx context.Context) error { } func (c *Consumer) Shutdown() { + shutdownTimeout := time.After(10 * time.Second) shutdown := make(chan struct{}) c.sequencerShutdown <- shutdown - <-shutdown + + select { + case <-shutdownTimeout: + c.logger.Warn("sequencer shutdown timed out") + case <-shutdown: + c.logger.Info("sequencer shutdown complete") + } }