This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

WIP: Migrate to OTEL tracing #830

Draft
wants to merge 17 commits into base: main
40 changes: 40 additions & 0 deletions migrate-to-otel.gopatch
@@ -0,0 +1,40 @@
@@
var a expression
var b expression
var s identifier
var t identifier
@@
-s, t := opentracing.StartSpanFromContext(a,b)
-...
- defer s.Finish()
+import "go.opentelemetry.io/otel"
+t, s := otel.Tracer("github.com/grafana/pyroscope").Start(a,b)
+defer s.End()

@@
var foo,x identifier
@@

-import foo "github.com/opentracing/opentracing-go/log"
+import foo "go.opentelemetry.io/otel/attribute"
foo.x

@@
@@
- otlog
+ attribute

@@
var span identifier
var x expression
@@
- span.LogFields(...)
+import "go.opentelemetry.io/otel/trace"
+ span.AddEvent("TODO", trace.WithAttributes(...))


@@
@@
-opentracing.Span
+import "go.opentelemetry.io/otel/trace"
+trace.Span
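
The rules above appear intended for uber-go/gopatch (typically applied with something like `gopatch -p migrate-to-otel.gopatch ./...`, though the PR does not say how it is run). As a reference point, here is a minimal, self-contained sketch of what the first rule does to a typical call site; the function name `doWork` is illustrative, only the tracing calls and the tracer name come from the patch:

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel"
)

func doWork(ctx context.Context) {
	// Before the patch:
	//   sp, ctx := opentracing.StartSpanFromContext(ctx, "doWork")
	//   defer sp.Finish()
	//
	// After the patch: the span comes from a named OTel tracer, Start
	// returns (ctx, span) rather than (span, ctx), and End replaces Finish.
	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "doWork")
	defer sp.End()

	_ = ctx // pass ctx to callees so their spans nest under sp
}
```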
143 changes: 45 additions & 98 deletions pkg/phlaredb/block_querier.go
@@ -21,14 +21,15 @@ import (
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/samber/lo"
"github.com/segmentio/parquet-go"
"github.com/thanos-io/objstore"
"golang.org/x/exp/constraints"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

@@ -462,58 +463,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) {
return b.meta.MinTime, b.meta.MaxTime
}

type mapPredicate[K constraints.Integer, V any] struct {
min K
max K
m map[K]V
}

func newMapPredicate[K constraints.Integer, V any](m map[K]V) query.Predicate {
p := &mapPredicate[K, V]{
m: m,
}

first := true
for k := range m {
if first || p.max < k {
p.max = k
}
if first || p.min > k {
p.min = k
}
first = false
}

return p
}

func (m *mapPredicate[K, V]) KeepColumnChunk(c parquet.ColumnChunk) bool {
if ci := c.ColumnIndex(); ci != nil {
for i := 0; i < ci.NumPages(); i++ {
min := K(ci.MinValue(i).Int64())
max := K(ci.MaxValue(i).Int64())
if m.max >= min && m.min <= max {
return true
}
}
return false
}

return true
}

func (m *mapPredicate[K, V]) KeepPage(page parquet.Page) bool {
if min, max, ok := page.Bounds(); ok {
return m.max >= K(min.Int64()) && m.min <= K(max.Int64())
}
return true
}

func (m *mapPredicate[K, V]) KeepValue(v parquet.Value) bool {
_, exists := m.m[K(v.Int64())]
return exists
}

type labelsInfo struct {
fp model.Fingerprint
lbs phlaremodel.Labels
@@ -610,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
}

func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesStacktraces")
defer sp.End()

r, err := stream.Receive()
if err != nil {
@@ -625,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
}
request := r.Request
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID)))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
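
opentracing's span.LogFields has no direct OTel equivalent, so the patched call sites record the former log fields as attributes on a span event, with "TODO" left as a placeholder event name that still needs a descriptive replacement. A small self-contained sketch of the shape this produces (the function name, event name, and parameters here are illustrative):

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// logRequestFields records what used to be span.LogFields(otlog.String(...))
// as a single span event with attributes, using the span already stored in
// the context by Tracer.Start.
func logRequestFields(ctx context.Context, selector, profileID string) {
	sp := trace.SpanFromContext(ctx)
	sp.AddEvent("request received", trace.WithAttributes( // "TODO" in the patched code
		attribute.String("selector", selector),
		attribute.String("profile_id", profileID),
	))
}
```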
@@ -674,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in

// Signals the end of the profile streaming by sending an empty response.
// This allows the client to not block other streaming ingesters.
sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "signaling the end of the profile streaming")))
if err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{}); err != nil {
return err
}
@@ -684,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
}

// sends the final result to the client.
sp.LogFields(otlog.String("msg", "sending the final result to the client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending the final result to the client")))
err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{
Result: &ingestv1.MergeProfilesStacktracesResult{
Format: ingestv1.StacktracesMergeFormat_MERGE_FORMAT_TREE,
@@ -702,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
}

func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesLabels")
defer sp.End()

r, err := stream.Receive()
if err != nil {
@@ -719,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
request := r.Request
by := r.By
sort.Strings(by)
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
otlog.String("by", strings.Join(by, ",")),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID),
attribute.String("by", strings.Join(by, ","))))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
@@ -796,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
}

func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesPprof")
defer sp.End()

r, err := stream.Receive()
if err != nil {
@@ -811,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
}
request := r.Request
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID)))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
@@ -942,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
}

func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - Block")
defer sp.End()

if err := b.Open(ctx); err != nil {
return nil, err
}
@@ -988,26 +935,26 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params
}

var (
buf [][]parquet.Value
joinIters []query.Iterator
buf [][]parquet.Value
)

pIt := query.NewBinaryJoinIterator(
0,
b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"),
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
)

if b.meta.Version >= 2 {
joinIters = []query.Iterator{
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
pIt = query.NewBinaryJoinIterator(
0,
pIt,
b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"),
}
)
buf = make([][]parquet.Value, 3)
} else {
joinIters = []query.Iterator{
b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"),
b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"),
}
buf = make([][]parquet.Value, 2)
}

pIt := query.NewJoinIterator(0, joinIters, nil)
iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef))
defer pIt.Close()

@@ -1098,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open")
defer func() {
q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds())
sp.LogFields(
otlog.String("block_ulid", q.meta.ULID.String()),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("block_ulid", q.meta.ULID.String())))

sp.Finish()
}()
g, ctx := errgroup.WithContext(ctx)
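
In this hunk the span is still created with opentracing.StartSpanFromContext and closed with sp.Finish(), yet the deferred block now calls sp.AddEvent, which only exists on an OTel trace.Span, so openFiles presumably still needs the same rewrite as the other functions. A hedged sketch of what the fully migrated version might look like; observeLatency and blockULID stand in for q.metrics.blockOpeningLatency.Observe and q.meta.ULID.String(), which are not shown here:

```go
package example

import (
	"context"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

// openFiles mirrors the structure of the hunk above, fully migrated: the
// OTel span is started up front and ended inside the same deferred function
// that records the block-opening latency.
func openFiles(ctx context.Context, observeLatency func(seconds float64), blockULID string) error {
	start := time.Now()
	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "BlockQuerier - open")
	defer func() {
		observeLatency(time.Since(start).Seconds())
		sp.AddEvent("TODO", trace.WithAttributes(
			attribute.String("block_ulid", blockULID)))
		sp.End()
	}()

	_ = ctx // pass ctx to the goroutines that open the individual files
	return nil
}
```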
@@ -1206,7 +1153,7 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string,
return query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath()))
}
ctx = query.AddMetricsToContext(ctx, r.metrics.query)
return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias)
}

func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] {
1 change: 0 additions & 1 deletion pkg/phlaredb/block_querier_test.go
@@ -190,5 +190,4 @@ func TestBlockCompatability(t *testing.T) {
})

}

}
14 changes: 8 additions & 6 deletions pkg/phlaredb/head.go
@@ -17,13 +17,15 @@ import (
"github.com/google/pprof/profile"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

@@ -533,8 +535,8 @@ func (h *Head) Queriers() Queriers {

// add the location IDs to the stacktraces
func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stacktracesByMapping) *ingestv1.MergeProfilesStacktracesResult {
sp, _ := opentracing.StartSpanFromContext(ctx, "resolveStacktraces - Head")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolveStacktraces - Head")
defer sp.End()

names := []string{}
functions := map[int64]int{}
@@ -548,7 +550,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
h.strings.lock.RUnlock()
}()

sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building MergeProfilesStacktracesResult")))
_ = stacktracesByMapping.ForEach(
func(mapping uint64, stacktraceSamples stacktraceSampleMap) error {
mp, ok := h.symbolDB.MappingReader(mapping)
@@ -595,8 +597,8 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
}

func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSampleByMapping) *profile.Profile {
sp, _ := opentracing.StartSpanFromContext(ctx, "resolvePprof - Head")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolvePprof - Head")
defer sp.End()

locations := map[int32]*profile.Location{}
functions := map[uint64]*profile.Function{}