diff --git a/cmd/humanlog/ingest.go b/cmd/humanlog/ingest.go index df54cf5f..86ba39d2 100644 --- a/cmd/humanlog/ingest.go +++ b/cmd/humanlog/ingest.go @@ -32,6 +32,7 @@ func ingest( getState func(*cli.Context) *state.State, getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource, getHTTPClient func(*cli.Context) *http.Client, + notifyUnableToIngest func(error), ) (sink.Sink, error) { state := getState(cctx) tokenSource := getTokenSource(cctx) @@ -65,13 +66,13 @@ func ingest( var snk sink.Sink switch sinkType := os.Getenv("HUMANLOG_SINK_TYPE"); sinkType { case "unary": - snk = logsvcsink.StartUnarySink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true) + snk = logsvcsink.StartUnarySink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest) case "bidi": - snk = logsvcsink.StartBidiStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true) + snk = logsvcsink.StartBidiStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest) case "stream": fallthrough // use the stream sink as default, it's the best tradeoff for performance and compatibility default: - snk = logsvcsink.StartStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true) + snk = logsvcsink.StartStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest) } return snk, nil diff --git a/cmd/humanlog/localhost.go b/cmd/humanlog/localhost.go index 131e01e9..f591a3a3 100644 --- a/cmd/humanlog/localhost.go +++ b/cmd/humanlog/localhost.go @@ -61,8 +61,13 @@ func startLocalhostServer( localhostHttpClient *http.Client, ownVersion *typesv1.Version, ) (localsink sink.Sink, done func(context.Context) error, err error) { - localhostAddr := net.JoinHostPort("localhost", strconv.Itoa(port)) + notifyUnableToIngest := func(err error) { + ll.ErrorContext(ctx, "localhost ingestor is unable to ingest", slog.Any("err", err)) + // TODO: take this as a hint to become the localhost ingestor + } + + localhostAddr := net.JoinHostPort("localhost", strconv.Itoa(port)) l, err := net.Listen("tcp", localhostAddr) if err != nil && !isEADDRINUSE(err) { return nil, nil, fmt.Errorf("listening on host/port: %v", err) @@ -78,7 +83,7 @@ func startLocalhostServer( } logdebug("sending logs to localhost forwarder") client := ingestv1connect.NewIngestServiceClient(localhostHttpClient, addr.String()) - localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true) + localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true, notifyUnableToIngest) return localhostSink, func(ctx context.Context) error { logdebug("flushing localhost sink") return localhostSink.Flush(ctx) diff --git a/cmd/humanlog/main.go b/cmd/humanlog/main.go index 3c2b7cc2..4f39d6d5 100644 --- a/cmd/humanlog/main.go +++ b/cmd/humanlog/main.go @@ -444,6 +444,12 @@ func newApp() *cli.App { ll := getLogger(cctx) apiURL := getAPIUrl(cctx) + notifyUnableToIngest := func(err error) { + // TODO: notify using system notification? + logerror("configured to ingest, but unable to do so: %v", err) + os.Exit(1) + } + flushTimeout := 300 * time.Millisecond ingestctx, ingestcancel := context.WithCancel(context.WithoutCancel(ctx)) go func() { @@ -451,7 +457,7 @@ func newApp() *cli.App { time.Sleep(2 * flushTimeout) // give it 2x timeout to flush before nipping the ctx entirely ingestcancel() }() - remotesink, err := ingest(ingestctx, ll, cctx, apiURL, getCfg, getState, getTokenSource, getHTTPClient) + remotesink, err := ingest(ingestctx, ll, cctx, apiURL, getCfg, getState, getTokenSource, getHTTPClient, notifyUnableToIngest) if err != nil { return fmt.Errorf("can't send logs: %v", err) } diff --git a/pkg/sink/logsvcsink/bidistream_sink.go b/pkg/sink/logsvcsink/bidistream_sink.go index 088c808c..351a6e52 100644 --- a/pkg/sink/logsvcsink/bidistream_sink.go +++ b/pkg/sink/logsvcsink/bidistream_sink.go @@ -2,6 +2,7 @@ package logsvcsink import ( "context" + "errors" "fmt" "io" "log/slog" @@ -28,7 +29,17 @@ type ConnectBidiStreamSink struct { doneFlushing chan struct{} } -func StartBidiStreamSink(ctx context.Context, ll *slog.Logger, client ingestv1connect.IngestServiceClient, name string, machineID uint64, bufferSize int, drainBufferFor time.Duration, dropIfFull bool) *ConnectBidiStreamSink { +func StartBidiStreamSink( + ctx context.Context, + ll *slog.Logger, + client ingestv1connect.IngestServiceClient, + name string, + machineID uint64, + bufferSize int, + drainBufferFor time.Duration, + dropIfFull bool, + notifyUnableToIngest func(err error), +) *ConnectBidiStreamSink { snk := &ConnectBidiStreamSink{ ll: ll.With( slog.String("sink", name), @@ -53,6 +64,12 @@ func StartBidiStreamSink(ctx context.Context, ll *slog.Logger, client ingestv1co close(snk.doneFlushing) return } + var cerr *connect.Error + if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + close(snk.doneFlushing) + notifyUnableToIngest(err) + return + } if err != nil { ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err)) } @@ -92,6 +109,10 @@ func (snk *ConnectBidiStreamSink) connectAndHandleBuffer( stream = client.IngestBidiStream(ctx) firstReq := &v1.IngestBidiStreamRequest{Events: buffered, MachineId: machineID, ResumeSessionId: resumeSessionID} if err := stream.Send(firstReq); err != nil { + var cerr *connect.Error + if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + return false, cerr + } return true, fmt.Errorf("creating ingestion stream: %w", err) } return false, nil diff --git a/pkg/sink/logsvcsink/stream_sink.go b/pkg/sink/logsvcsink/stream_sink.go index 096a28f0..cc5318e8 100644 --- a/pkg/sink/logsvcsink/stream_sink.go +++ b/pkg/sink/logsvcsink/stream_sink.go @@ -2,6 +2,7 @@ package logsvcsink import ( "context" + "errors" "fmt" "io" "log/slog" @@ -37,6 +38,7 @@ func StartStreamSink( bufferSize int, drainBufferFor time.Duration, dropIfFull bool, + notifyUnableToIngest func(err error), ) *ConnectStreamSink { snk := &ConnectStreamSink{ @@ -64,6 +66,12 @@ func StartStreamSink( close(snk.doneFlushing) return } + var cerr *connect.Error + if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + close(snk.doneFlushing) + notifyUnableToIngest(err) + return + } if err != nil { ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err)) } @@ -96,7 +104,7 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer( buffered []*typesv1.LogEvent, sessionID uint64, heartbeatEvery time.Duration, -) (lastBuffer []*typesv1.LogEvent, _ uint64, _ time.Duration, _ error) { +) (lastBuffer []*typesv1.LogEvent, _ uint64, _ time.Duration, sendErr error) { ll := snk.ll ll.DebugContext(ctx, "contacting log ingestor") var stream *connect.ClientStreamForClient[v1.IngestStreamRequest, v1.IngestStreamResponse] @@ -104,6 +112,10 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer( hbRes, err := client.GetHeartbeat(ctx, connect.NewRequest(&v1.GetHeartbeatRequest{MachineId: &machineID})) if err != nil { + var cerr *connect.Error + if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + return false, cerr + } return true, fmt.Errorf("requesting heartbeat config from ingestor: %v", err) } heartbeatEvery = hbRes.Msg.HeartbeatIn.AsDuration() @@ -124,6 +136,13 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer( defer func() { res, err := stream.CloseAndReceive() if err != nil { + var cerr *connect.Error + if errors.Is(sendErr, io.EOF) && errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + sendErr = cerr + ll.ErrorContext(ctx, "no active plan, can't ingest logs", slog.Any("err", err)) + return + } + ll.ErrorContext(ctx, "closing and receiving response for log ingestor session", slog.Any("err", err)) return } @@ -186,7 +205,7 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer( // until it's empty, then send what we have } start := time.Now() - err := stream.Send(req) + sendErr = stream.Send(req) dur := time.Since(start) ll.DebugContext(ctx, "sent logs", slog.String("sink", snk.name), @@ -196,8 +215,8 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer( slog.Int("buffer_size", bufferSize), slog.Int64("drain_for_ms", drainBufferFor.Milliseconds()), ) - if err != nil { - return req.Events, sessionID, heartbeatEvery, err + if sendErr != nil { + return req.Events, sessionID, heartbeatEvery, sendErr } req.Events = req.Events[:0:len(req.Events)] } diff --git a/pkg/sink/logsvcsink/unary_sink.go b/pkg/sink/logsvcsink/unary_sink.go index a21b99ad..eb03708d 100644 --- a/pkg/sink/logsvcsink/unary_sink.go +++ b/pkg/sink/logsvcsink/unary_sink.go @@ -2,6 +2,7 @@ package logsvcsink import ( "context" + "errors" "fmt" "io" "log/slog" @@ -37,6 +38,7 @@ func StartUnarySink( bufferSize int, drainBufferFor time.Duration, dropIfFull bool, + notifyUnableToIngest func(err error), ) *ConnectUnarySink { snk := &ConnectUnarySink{ ll: ll.With( @@ -63,6 +65,12 @@ func StartUnarySink( close(snk.doneFlushing) return } + var cerr *connect.Error + if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + close(snk.doneFlushing) + notifyUnableToIngest(err) + return + } if err != nil { ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err)) } @@ -101,6 +109,10 @@ func (snk *ConnectUnarySink) connectAndHandleBuffer( err := retry.Do(ctx, func(ctx context.Context) (bool, error) { hbRes, err := client.GetHeartbeat(ctx, connect.NewRequest(&v1.GetHeartbeatRequest{MachineId: &machineID})) if err != nil { + var cerr *connect.Error + if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted { + return false, cerr + } return true, fmt.Errorf("requesting heartbeat config from ingestor: %v", err) } heartbeatEvery = hbRes.Msg.HeartbeatIn.AsDuration()