Skip to content

Commit

Permalink
feat: Improve pattern ingester tracing (backport k226) (#14731)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Nov 1, 2024
1 parent 88d516f commit 8e67b52
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ func (t *Loki) initPatternIngesterTee() (services.Service, error) {
t.Cfg.Pattern,
t.Overrides,
t.PatternRingClient,
t.tenantConfigs,
t.Cfg.MetricsNamespace,
prometheus.DefaultRegisterer,
logger,
Expand Down
37 changes: 35 additions & 2 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"io"
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -160,7 +162,13 @@ func (p *Push) Stop() {
}

// buildPayload creates the snappy compressed protobuf to send to Loki
func (p *Push) buildPayload() ([]byte, error) {
func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {
sp, _ := opentracing.StartSpanFromContext(
ctx,
"patternIngester.aggregation.Push.buildPayload",
)
defer sp.Finish()

entries := p.entries.reset()

entriesByStream := make(map[string][]logproto.Entry)
Expand All @@ -179,6 +187,14 @@ func (p *Push) buildPayload() ([]byte, error) {
}

streams := make([]logproto.Stream, 0, len(entriesByStream))

// limit the number of services to log to 1000
serviceLimit := len(entriesByStream)
if serviceLimit > 1000 {
serviceLimit = 1000
}

services := make([]string, 0, serviceLimit)
for s, entries := range entriesByStream {
lbls, err := syntax.ParseLabels(s)
if err != nil {
Expand All @@ -190,6 +206,10 @@ func (p *Push) buildPayload() ([]byte, error) {
Entries: entries,
Hash: lbls.Hash(),
})

if len(services) < serviceLimit {
services = append(services, lbls.Get(push.AggregatedMetricLabel))
}
}

req := &logproto.PushRequest{
Expand All @@ -202,6 +222,14 @@ func (p *Push) buildPayload() ([]byte, error) {

payload = snappy.Encode(nil, payload)

sp.LogKV(
"event", "build aggregated metrics payload",
"num_service", len(entriesByStream),
"first_1k_services", strings.Join(services, ","),
"num_streams", len(streams),
"num_entries", len(entries),
)

return payload, nil
}

Expand All @@ -221,7 +249,7 @@ func (p *Push) run(pushPeriod time.Duration) {
cancel()
return
case <-pushTicker.C:
payload, err := p.buildPayload()
payload, err := p.buildPayload(ctx)
if err != nil {
level.Error(p.logger).Log("msg", "failed to build payload", "err", err)
continue
Expand Down Expand Up @@ -265,9 +293,14 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {
err error
resp *http.Response
)

// Set a timeout for the request
ctx, cancel := context.WithTimeout(ctx, p.httpClient.Timeout)
defer cancel()

sp, ctx := opentracing.StartSpanFromContext(ctx, "patternIngester.aggregation.Push.send")
defer sp.Finish()

req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload))
if err != nil {
return -1, fmt.Errorf("failed to create push request: %w", err)
Expand Down
17 changes: 15 additions & 2 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/ring"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -95,7 +96,7 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
for _, reqStream := range req.Streams {
// All streams are observed for metrics
// TODO(twhitney): this would be better as a queue that drops in response to backpressure
i.Observe(reqStream.Labels, reqStream.Entries)
i.Observe(ctx, reqStream.Labels, reqStream.Entries)

// But only owned streams are processed for patterns
ownedStream, err := i.isOwnedStream(i.ingesterID, reqStream.Labels)
Expand Down Expand Up @@ -252,10 +253,22 @@ func (i *instance) removeStream(s *stream) {
}
}

func (i *instance) Observe(stream string, entries []logproto.Entry) {
func (i *instance) Observe(ctx context.Context, stream string, entries []logproto.Entry) {
i.aggMetricsLock.Lock()
defer i.aggMetricsLock.Unlock()

sp, _ := opentracing.StartSpanFromContext(
ctx,
"patternIngester.Observe",
)
defer sp.Finish()

sp.LogKV(
"event", "observe stream for metrics",
"stream", stream,
"entries", len(entries),
)

for _, entry := range entries {
lvl := constants.LogLevelUnknown
structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata)
Expand Down
43 changes: 42 additions & 1 deletion pkg/pattern/tee_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util/spanlogger"

ring_client "github.com/grafana/dskit/ring/client"
)

type TeeService struct {
cfg Config
limits Limits
tenantCfgs *runtime.TenantConfigs
logger log.Logger
ringClient RingClient
wg *sync.WaitGroup
Expand All @@ -51,6 +54,7 @@ func NewTeeService(
cfg Config,
limits Limits,
ringClient RingClient,
tenantCfgs *runtime.TenantConfigs,
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
Expand Down Expand Up @@ -86,6 +90,7 @@ func NewTeeService(
),
cfg: cfg,
limits: limits,
tenantCfgs: tenantCfgs,
ringClient: ringClient,

wg: &sync.WaitGroup{},
Expand Down Expand Up @@ -293,10 +298,11 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
// are gathered by this request
_ = instrument.CollectedRequest(
ctx,
"FlushTeedLogsToPatternIngested",
"FlushTeedLogsToPatternIngester",
ts.sendDuration,
instrument.ErrorCode,
func(ctx context.Context) error {
sp := spanlogger.FromContext(ctx)
client, err := ts.ringClient.GetClientFor(clientRequest.ingesterAddr)
if err != nil {
return err
Expand All @@ -313,6 +319,41 @@ func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest
// Success here means the stream will be processed for both metrics and patterns
ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "success").Inc()
ts.ingesterMetricAppends.WithLabelValues("success").Inc()

// limit logged labels to 1000
labelsLimit := len(req.Streams)
if labelsLimit > 1000 {
labelsLimit = 1000
}

labels := make([]string, 0, labelsLimit)
for _, stream := range req.Streams {
if len(labels) >= 1000 {
break
}

labels = append(labels, stream.Labels)
}

sp.LogKV(
"event", "forwarded push request to pattern ingester",
"num_streams", len(req.Streams),
"first_1k_labels", strings.Join(labels, ", "),
"tenant", clientRequest.tenant,
)

// this is basically the same as logging push request streams,
// so put it behind the same flag
if ts.tenantCfgs.LogPushRequestStreams(clientRequest.tenant) {
level.Debug(ts.logger).
Log(
"msg", "forwarded push request to pattern ingester",
"num_streams", len(req.Streams),
"first_1k_labels", strings.Join(labels, ", "),
"tenant", clientRequest.tenant,
)
}

return nil
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/pattern/tee_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"

"github.com/grafana/loki/pkg/push"
)
Expand Down Expand Up @@ -51,6 +52,7 @@ func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) {
metricAggregationEnabled: true,
},
ringClient,
runtime.DefaultTenantConfigs(),
"test",
nil,
log.NewNopLogger(),
Expand Down

0 comments on commit 8e67b52

Please sign in to comment.