Skip to content

Commit

Permalink
Merge pull request #369 from xuqingyun/git-xqy/search_from_extension
Browse files Browse the repository at this point in the history
feat(frontend): search spans from extension tracers when object name is specified
  • Loading branch information
SOF3 authored Oct 14, 2024
2 parents f8d189a + e6c150f commit c3cea2f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 4 deletions.
69 changes: 65 additions & 4 deletions pkg/frontend/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace
return nil, err
}

fetchFromExtension := false

if len(tts) == 0 {
// searched object has no events during this period,
// try to discover any possible spans from extensions.
if query.Tags["resource"] != "" && query.Tags["name"] != "" {
tts = append(tts, &jaegerbackend.TraceThumbnail{
Identifier: nil,
Spans: tftree.NewSpanTree(fakeObjectSpans(query)),
})
fetchFromExtension = true
}
}

twmList := make([]merge.TraceWithMetadata[any], len(tts))
for i, tt := range tts {
twmList[i] = merge.TraceWithMetadata[any]{
Expand Down Expand Up @@ -245,14 +259,24 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace
for _, mergeTree := range mergeTrees {
cacheId := generateCacheId(config.Id)

var cachedSpans []*model.Span
if fetchFromExtension {
rootSpan, _ := tftree.CopySpan(mergeTree.Tree.Root)
cachedSpans = []*model.Span{rootSpan}
}

trace, extensionCache, err := reader.prepareEntry(ctx, rootKey, query, mergeTree.Tree, cacheId)
if err != nil {
return nil, err
}

if fetchFromExtension && len(trace.Spans) <= 1 {
continue
}

traces = append(traces, trace)

cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache)
cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache, cachedSpans)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -318,6 +342,7 @@ func (reader *spanReader) prepareCache(
identifiers []any,
cacheId model.TraceID,
extensionCache []tracecache.ExtensionCache,
cachedSpans []*model.Span,
) (tracecache.Entry, error) {
identifiersJson := make([]json.RawMessage, len(identifiers))
for i, identifier := range identifiers {
Expand All @@ -336,6 +361,7 @@ func (reader *spanReader) prepareCache(
StartTime: query.StartTimeMin,
EndTime: query.StartTimeMax,
RootObject: rootKey,
Spans: cachedSpans,
},
}
if reader.options.cacheExtensions {
Expand All @@ -360,6 +386,9 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) (

traces := make([]merge.TraceWithMetadata[struct{}], 0, len(entry.Identifiers))
for _, identifier := range entry.Identifiers {
if identifier == nil || string(identifier) == "null" {
continue
}
trace, err := reader.Backend.Get(ctx, identifier, cacheId, entry.StartTime, entry.EndTime)
if err != nil {
return nil, fmt.Errorf("cannot fetch trace pointed by the cache: %w", err)
Expand Down Expand Up @@ -394,15 +423,21 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) (
}

// if spans were connected, they should continue to be connected since link spans cannot be deleted, so assume there is only one trace
if len(mergedTrees) != 1 {
if len(traces) != 0 && len(mergedTrees) != 1 {
return nil, fmt.Errorf("inconsistent linked trace count %d", len(mergedTrees))
}
mergedTree := mergedTrees[0]

aggTrace := &model.Trace{
ProcessMap: []model.Trace_ProcessMapping{{
ProcessID: "0",
}},
Spans: mergedTree.Tree.GetSpans(),
}

if len(mergedTrees) > 0 {
aggTrace.Spans = mergedTrees[0].Tree.GetSpans()
} else {
spans, _ := tftree.CopySpans(entry.Spans)
aggTrace.Spans = spans
}

var extensions transform.ExtensionProcessor = &transform.FetchExtensionsAndStoreCache{}
Expand Down Expand Up @@ -533,3 +568,29 @@ func mergeListWithBackend[M any](backend jaegerbackend.Backend, convertMetadata
return twmList, nil
}
}

func fakeObjectSpans(query *spanstore.TraceQueryParameters) []*model.Span {
tags := []model.KeyValue{
model.String(zconstants.TraceSource, zconstants.TraceSourceObject),
model.String(zconstants.PseudoType, string(zconstants.PseudoTypeObject)),
}
for tagKey, tagVal := range query.Tags {
tags = append(tags, model.String(tagKey, tagVal))
}

for _, requiredKey := range []string{"cluster", "group", "version", "resource", "namespace", "name"} {
if _, ok := query.Tags[requiredKey]; !ok {
tags = append(tags, model.String(requiredKey, query.Tags[requiredKey]))
}
}

return []*model.Span{{
SpanID: model.SpanID(1),
Flags: 0,
StartTime: query.StartTimeMin,
Duration: query.StartTimeMax.Sub(query.StartTimeMin),
Process: &model.Process{ServiceName: fmt.Sprintf("%s %s", query.Tags["cluster"], query.Tags["resource"])},
ProcessID: "0",
Tags: tags,
}}
}
14 changes: 14 additions & 0 deletions pkg/frontend/tf/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ func (tree *SpanTree) Clone() (*SpanTree, error) {
return NewSpanTree(copiedSpans), nil
}

func CopySpans(spans []*model.Span) ([]*model.Span, error) {
copiedSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
spanCopy, err := CopySpan(span)
if err != nil {
return nil, err
}

copiedSpans = append(copiedSpans, spanCopy)
}

return copiedSpans, nil
}

func CopySpan(span *model.Span) (*model.Span, error) {
spanJson, err := json.Marshal(span)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/tracecache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type EntryValue struct {
RootObject *utilobject.Key `json:"rootObject"`

Extensions []ExtensionCache `json:"extensions"`
Spans []*model.Span `json:"spans"`
}

type ExtensionCache struct {
Expand Down

0 comments on commit c3cea2f

Please sign in to comment.