Skip to content

Commit

Permalink
feat(ingest): error when ingestion is not possible
Browse files Browse the repository at this point in the history
  • Loading branch information
aybabtme committed Oct 22, 2024
1 parent 0233994 commit ce5c214
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 11 deletions.
7 changes: 4 additions & 3 deletions cmd/humanlog/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func ingest(
getState func(*cli.Context) *state.State,
getTokenSource func(cctx *cli.Context) *auth.UserRefreshableTokenSource,
getHTTPClient func(*cli.Context) *http.Client,
notifyUnableToIngest func(error),
) (sink.Sink, error) {
state := getState(cctx)
tokenSource := getTokenSource(cctx)
Expand Down Expand Up @@ -65,13 +66,13 @@ func ingest(
var snk sink.Sink
switch sinkType := os.Getenv("HUMANLOG_SINK_TYPE"); sinkType {
case "unary":
snk = logsvcsink.StartUnarySink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true)
snk = logsvcsink.StartUnarySink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
case "bidi":
snk = logsvcsink.StartBidiStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true)
snk = logsvcsink.StartBidiStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
case "stream":
fallthrough // use the stream sink as default, it's the best tradeoff for performance and compatibility
default:
snk = logsvcsink.StartStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true)
snk = logsvcsink.StartStreamSink(ctx, ll, client, "api", uint64(*state.MachineID), 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
}

return snk, nil
Expand Down
9 changes: 7 additions & 2 deletions cmd/humanlog/localhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,13 @@ func startLocalhostServer(
localhostHttpClient *http.Client,
ownVersion *typesv1.Version,
) (localsink sink.Sink, done func(context.Context) error, err error) {
localhostAddr := net.JoinHostPort("localhost", strconv.Itoa(port))

notifyUnableToIngest := func(err error) {
ll.ErrorContext(ctx, "localhost ingestor is unable to ingest", slog.Any("err", err))
// TODO: take this as a hint to become the localhost ingestor
}

localhostAddr := net.JoinHostPort("localhost", strconv.Itoa(port))
l, err := net.Listen("tcp", localhostAddr)
if err != nil && !isEADDRINUSE(err) {
return nil, nil, fmt.Errorf("listening on host/port: %v", err)
Expand All @@ -78,7 +83,7 @@ func startLocalhostServer(
}
logdebug("sending logs to localhost forwarder")
client := ingestv1connect.NewIngestServiceClient(localhostHttpClient, addr.String())
localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true)
localhostSink := logsvcsink.StartStreamSink(ctx, ll, client, "local", machineID, 1<<20, 100*time.Millisecond, true, notifyUnableToIngest)
return localhostSink, func(ctx context.Context) error {
logdebug("flushing localhost sink")
return localhostSink.Flush(ctx)
Expand Down
8 changes: 7 additions & 1 deletion cmd/humanlog/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,20 @@ func newApp() *cli.App {
ll := getLogger(cctx)
apiURL := getAPIUrl(cctx)

notifyUnableToIngest := func(err error) {
// TODO: notify using system notification?
logerror("configured to ingest, but unable to do so: %v", err)
os.Exit(1)
}

flushTimeout := 300 * time.Millisecond
ingestctx, ingestcancel := context.WithCancel(context.WithoutCancel(ctx))
go func() {
<-ctx.Done()
time.Sleep(2 * flushTimeout) // give it 2x timeout to flush before nipping the ctx entirely
ingestcancel()
}()
remotesink, err := ingest(ingestctx, ll, cctx, apiURL, getCfg, getState, getTokenSource, getHTTPClient)
remotesink, err := ingest(ingestctx, ll, cctx, apiURL, getCfg, getState, getTokenSource, getHTTPClient, notifyUnableToIngest)
if err != nil {
return fmt.Errorf("can't send logs: %v", err)
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/sink/logsvcsink/bidistream_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logsvcsink

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand All @@ -28,7 +29,17 @@ type ConnectBidiStreamSink struct {
doneFlushing chan struct{}
}

func StartBidiStreamSink(ctx context.Context, ll *slog.Logger, client ingestv1connect.IngestServiceClient, name string, machineID uint64, bufferSize int, drainBufferFor time.Duration, dropIfFull bool) *ConnectBidiStreamSink {
func StartBidiStreamSink(
ctx context.Context,
ll *slog.Logger,
client ingestv1connect.IngestServiceClient,
name string,
machineID uint64,
bufferSize int,
drainBufferFor time.Duration,
dropIfFull bool,
notifyUnableToIngest func(err error),
) *ConnectBidiStreamSink {
snk := &ConnectBidiStreamSink{
ll: ll.With(
slog.String("sink", name),
Expand All @@ -53,6 +64,12 @@ func StartBidiStreamSink(ctx context.Context, ll *slog.Logger, client ingestv1co
close(snk.doneFlushing)
return
}
var cerr *connect.Error
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
close(snk.doneFlushing)
notifyUnableToIngest(err)
return
}
if err != nil {
ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err))
}
Expand Down Expand Up @@ -92,6 +109,10 @@ func (snk *ConnectBidiStreamSink) connectAndHandleBuffer(
stream = client.IngestBidiStream(ctx)
firstReq := &v1.IngestBidiStreamRequest{Events: buffered, MachineId: machineID, ResumeSessionId: resumeSessionID}
if err := stream.Send(firstReq); err != nil {
var cerr *connect.Error
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
return false, cerr
}
return true, fmt.Errorf("creating ingestion stream: %w", err)
}
return false, nil
Expand Down
27 changes: 23 additions & 4 deletions pkg/sink/logsvcsink/stream_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logsvcsink

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand Down Expand Up @@ -37,6 +38,7 @@ func StartStreamSink(
bufferSize int,
drainBufferFor time.Duration,
dropIfFull bool,
notifyUnableToIngest func(err error),
) *ConnectStreamSink {

snk := &ConnectStreamSink{
Expand Down Expand Up @@ -64,6 +66,12 @@ func StartStreamSink(
close(snk.doneFlushing)
return
}
var cerr *connect.Error
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
close(snk.doneFlushing)
notifyUnableToIngest(err)
return
}
if err != nil {
ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err))
}
Expand Down Expand Up @@ -96,14 +104,18 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
buffered []*typesv1.LogEvent,
sessionID uint64,
heartbeatEvery time.Duration,
) (lastBuffer []*typesv1.LogEvent, _ uint64, _ time.Duration, _ error) {
) (lastBuffer []*typesv1.LogEvent, _ uint64, _ time.Duration, sendErr error) {
ll := snk.ll
ll.DebugContext(ctx, "contacting log ingestor")
var stream *connect.ClientStreamForClient[v1.IngestStreamRequest, v1.IngestStreamResponse]
err := retry.Do(ctx, func(ctx context.Context) (bool, error) {

hbRes, err := client.GetHeartbeat(ctx, connect.NewRequest(&v1.GetHeartbeatRequest{MachineId: &machineID}))
if err != nil {
var cerr *connect.Error
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
return false, cerr
}
return true, fmt.Errorf("requesting heartbeat config from ingestor: %v", err)
}
heartbeatEvery = hbRes.Msg.HeartbeatIn.AsDuration()
Expand All @@ -124,6 +136,13 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
defer func() {
res, err := stream.CloseAndReceive()
if err != nil {
var cerr *connect.Error
if errors.Is(sendErr, io.EOF) && errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
sendErr = cerr
ll.ErrorContext(ctx, "no active plan, can't ingest logs", slog.Any("err", err))
return
}

ll.ErrorContext(ctx, "closing and receiving response for log ingestor session", slog.Any("err", err))
return
}
Expand Down Expand Up @@ -186,7 +205,7 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
// until it's empty, then send what we have
}
start := time.Now()
err := stream.Send(req)
sendErr = stream.Send(req)
dur := time.Since(start)
ll.DebugContext(ctx, "sent logs",
slog.String("sink", snk.name),
Expand All @@ -196,8 +215,8 @@ func (snk *ConnectStreamSink) connectAndHandleBuffer(
slog.Int("buffer_size", bufferSize),
slog.Int64("drain_for_ms", drainBufferFor.Milliseconds()),
)
if err != nil {
return req.Events, sessionID, heartbeatEvery, err
if sendErr != nil {
return req.Events, sessionID, heartbeatEvery, sendErr
}
req.Events = req.Events[:0:len(req.Events)]
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/sink/logsvcsink/unary_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logsvcsink

import (
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand Down Expand Up @@ -37,6 +38,7 @@ func StartUnarySink(
bufferSize int,
drainBufferFor time.Duration,
dropIfFull bool,
notifyUnableToIngest func(err error),
) *ConnectUnarySink {
snk := &ConnectUnarySink{
ll: ll.With(
Expand All @@ -63,6 +65,12 @@ func StartUnarySink(
close(snk.doneFlushing)
return
}
var cerr *connect.Error
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
close(snk.doneFlushing)
notifyUnableToIngest(err)
return
}
if err != nil {
ll.ErrorContext(ctx, "failed to send logs", slog.Any("err", err))
}
Expand Down Expand Up @@ -101,6 +109,10 @@ func (snk *ConnectUnarySink) connectAndHandleBuffer(
err := retry.Do(ctx, func(ctx context.Context) (bool, error) {
hbRes, err := client.GetHeartbeat(ctx, connect.NewRequest(&v1.GetHeartbeatRequest{MachineId: &machineID}))
if err != nil {
var cerr *connect.Error
if errors.As(err, &cerr) && cerr.Code() == connect.CodeResourceExhausted {
return false, cerr
}
return true, fmt.Errorf("requesting heartbeat config from ingestor: %v", err)
}
heartbeatEvery = hbRes.Msg.HeartbeatIn.AsDuration()
Expand Down

0 comments on commit ce5c214

Please sign in to comment.