diff --git a/pkg/frontend/reader/reader.go b/pkg/frontend/reader/reader.go index 27a46190..2ae6c723 100644 --- a/pkg/frontend/reader/reader.go +++ b/pkg/frontend/reader/reader.go @@ -205,6 +205,20 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace return nil, err } + fetchFromExtension := false + + if len(tts) == 0 { + // searched object has no events during this period, + // try to discover any possible spans from extensions. + if query.Tags["resource"] != "" && query.Tags["name"] != "" { + tts = append(tts, &jaegerbackend.TraceThumbnail{ + Identifier: nil, + Spans: tftree.NewSpanTree(fakeObjectSpans(query)), + }) + fetchFromExtension = true + } + } + twmList := make([]merge.TraceWithMetadata[any], len(tts)) for i, tt := range tts { twmList[i] = merge.TraceWithMetadata[any]{ @@ -245,14 +259,24 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace for _, mergeTree := range mergeTrees { cacheId := generateCacheId(config.Id) + var cachedSpans []*model.Span + if fetchFromExtension { + rootSpan, _ := tftree.CopySpan(mergeTree.Tree.Root) + cachedSpans = []*model.Span{rootSpan} + } + trace, extensionCache, err := reader.prepareEntry(ctx, rootKey, query, mergeTree.Tree, cacheId) if err != nil { return nil, err } + if fetchFromExtension && len(trace.Spans) <= 1 { + continue + } + traces = append(traces, trace) - cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache) + cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache, cachedSpans) if err != nil { return nil, err } @@ -318,6 +342,7 @@ func (reader *spanReader) prepareCache( identifiers []any, cacheId model.TraceID, extensionCache []tracecache.ExtensionCache, + cachedSpans []*model.Span, ) (tracecache.Entry, error) { identifiersJson := make([]json.RawMessage, len(identifiers)) for i, identifier := range identifiers { @@ -336,6 +361,7 @@ func (reader *spanReader) prepareCache( StartTime: query.StartTimeMin, EndTime: query.StartTimeMax, RootObject: rootKey, + Spans: cachedSpans, }, } if reader.options.cacheExtensions { @@ -360,6 +386,9 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) ( traces := make([]merge.TraceWithMetadata[struct{}], 0, len(entry.Identifiers)) for _, identifier := range entry.Identifiers { + if identifier == nil || string(identifier) == "null" { + continue + } trace, err := reader.Backend.Get(ctx, identifier, cacheId, entry.StartTime, entry.EndTime) if err != nil { return nil, fmt.Errorf("cannot fetch trace pointed by the cache: %w", err) @@ -394,15 +423,21 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) ( } // if spans were connected, they should continue to be connected since link spans cannot be deleted, so assume there is only one trace - if len(mergedTrees) != 1 { + if len(traces) != 0 && len(mergedTrees) != 1 { return nil, fmt.Errorf("inconsistent linked trace count %d", len(mergedTrees)) } - mergedTree := mergedTrees[0] + aggTrace := &model.Trace{ ProcessMap: []model.Trace_ProcessMapping{{ ProcessID: "0", }}, - Spans: mergedTree.Tree.GetSpans(), + } + + if len(mergedTrees) > 0 { + aggTrace.Spans = mergedTrees[0].Tree.GetSpans() + } else { + spans, _ := tftree.CopySpans(entry.Spans) + aggTrace.Spans = spans } var extensions transform.ExtensionProcessor = &transform.FetchExtensionsAndStoreCache{} @@ -533,3 +568,29 @@ func mergeListWithBackend[M any](backend jaegerbackend.Backend, convertMetadata return twmList, nil } } + +func fakeObjectSpans(query *spanstore.TraceQueryParameters) []*model.Span { + tags := []model.KeyValue{ + model.String(zconstants.TraceSource, zconstants.TraceSourceObject), + model.String(zconstants.PseudoType, string(zconstants.PseudoTypeObject)), + } + for tagKey, tagVal := range query.Tags { + tags = append(tags, model.String(tagKey, tagVal)) + } + + for _, requiredKey := range []string{"cluster", "group", "version", "resource", "namespace", "name"} { + if _, ok := query.Tags[requiredKey]; !ok { + tags = append(tags, model.String(requiredKey, query.Tags[requiredKey])) + } + } + + return []*model.Span{{ + SpanID: model.SpanID(1), + Flags: 0, + StartTime: query.StartTimeMin, + Duration: query.StartTimeMax.Sub(query.StartTimeMin), + Process: &model.Process{ServiceName: fmt.Sprintf("%s %s", query.Tags["cluster"], query.Tags["resource"])}, + ProcessID: "0", + Tags: tags, + }} +} diff --git a/pkg/frontend/tf/tree/tree.go b/pkg/frontend/tf/tree/tree.go index dfd2ce61..3ff8125b 100644 --- a/pkg/frontend/tf/tree/tree.go +++ b/pkg/frontend/tf/tree/tree.go @@ -83,6 +83,20 @@ func (tree *SpanTree) Clone() (*SpanTree, error) { return NewSpanTree(copiedSpans), nil } +func CopySpans(spans []*model.Span) ([]*model.Span, error) { + copiedSpans := make([]*model.Span, 0, len(spans)) + for _, span := range spans { + spanCopy, err := CopySpan(span) + if err != nil { + return nil, err + } + + copiedSpans = append(copiedSpans, spanCopy) + } + + return copiedSpans, nil +} + func CopySpan(span *model.Span) (*model.Span, error) { spanJson, err := json.Marshal(span) if err != nil { diff --git a/pkg/frontend/tracecache/interface.go b/pkg/frontend/tracecache/interface.go index 6e061403..5f2590bc 100644 --- a/pkg/frontend/tracecache/interface.go +++ b/pkg/frontend/tracecache/interface.go @@ -48,6 +48,7 @@ type EntryValue struct { RootObject *utilobject.Key `json:"rootObject"` Extensions []ExtensionCache `json:"extensions"` + Spans []*model.Span `json:"spans"` } type ExtensionCache struct {