Skip to content

Commit

Permalink
Query simplified
Browse files Browse the repository at this point in the history
Signed-off-by: Manik2708 <[email protected]>
  • Loading branch information
Manik2708 committed Jan 5, 2025
1 parent e6d0a69 commit 286b17f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 110 deletions.
1 change: 1 addition & 0 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type SearchService interface {
Aggregation(name string, aggregation elastic.Aggregation) SearchService
IgnoreUnavailable(ignoreUnavailable bool) SearchService
Query(query elastic.Query) SearchService
FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) SearchService
Do(ctx context.Context) (*elastic.SearchResult, error)
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/es/mocks/SearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,10 @@ func (s SearchServiceWrapper) Do(ctx context.Context) (*elastic.SearchResult, er
return s.searchService.Do(ctx)
}

func (s SearchServiceWrapper) FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) es.SearchService {
return WrapESSearchService(s.searchService.FetchSourceContext(fetchSourceContext))
}

// MultiSearchServiceWrapper is a wrapper around elastic.ESMultiSearchService
type MultiSearchServiceWrapper struct {
multiSearchService *elastic.MultiSearchService
Expand Down
9 changes: 2 additions & 7 deletions plugin/storage/es/spanstore/internal/dbmodel/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,9 @@ type KeyValue struct {
Value any `json:"value"`
}

// Service is the JSON struct for service:operation documents in ElasticSearch
// Service is the JSON struct for service:kind:operation documents in ElasticSearch
type Service struct {
ServiceName string `json:"serviceName"`
Kind string `json:"spanKind,omitempty"`
OperationName string `json:"operationName"`
}

// ServiceWithKind is the JSON struct service:kind:operation documents in ElasticSearch
type ServiceWithKind struct {
Service
Kind string `json:"spanKind"`
}
69 changes: 43 additions & 26 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,27 +645,7 @@ func testGetWithKind(typ string, t *testing.T, testKind bool) {
goodAggregations[typ] = (*json.RawMessage)(&rawMessage)
var filterRawMessage json.RawMessage
if typ == operationsAggregation {
if !testKind {
filterRawMessage = json.RawMessage(`
{
"buckets": {
"distinct_operations_without_kind": {
"doc_count": 1,
"operationName": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "123",
"doc_count": 16
}
]
}
}
}
}
`)
} else {
if testKind {
filterRawMessage = rawMessage
}
goodAggregations[typ] = &filterRawMessage
Expand All @@ -687,14 +667,19 @@ func testGetWithKind(typ string, t *testing.T, testKind bool) {
return "search services failed: Search failure"
},
},
{
}

if (typ == operationsAggregation && testKind) || (typ != operationsAggregation) {
testCase := testGetStruct{
caption: typ + " search error",
searchResult: &elastic.SearchResult{Aggregations: elastic.Aggregations(badAggregations)},
expectedError: func() string {
return "could not find aggregation of " + typ
},
},
}
testCases = append(testCases, testCase)
}

if testKind {
testCases = append(testCases, testGetStruct{
caption: typ + " full behavior with kind",
Expand All @@ -707,7 +692,38 @@ func testGetWithKind(typ string, t *testing.T, testKind bool) {
return ""
},
})
} else {
}

if typ == operationsAggregation && !testKind {
score := 0.6931471
msg := json.RawMessage(`{"operationName": "123"}`)
hitModel := &elastic.SearchHits{
TotalHits: 1,
MaxScore: &score,
Hits: []*elastic.SearchHit{
{
Score: &score,
SeqNo: nil,
Id: "e232b0fbe5cebc85",
PrimaryTerm: nil,
Source: &msg,
},
},
}
testCases = append(testCases, testGetStruct{
caption: typ + " full behavior",
searchResult: &elastic.SearchResult{Hits: hitModel},
expectedOutput: map[string]any{
operationsAggregation: []spanstore.Operation{{Name: "123"}},
"default": []string{"123"},
},
expectedError: func() string {
return ""
},
})
}

if typ != operationsAggregation {
testCases = append(testCases, testGetStruct{
caption: typ + " full behavior",
searchResult: &elastic.SearchResult{Aggregations: goodAggregations},
Expand Down Expand Up @@ -1076,15 +1092,16 @@ func mockSearchService(r *spanReaderTest) *mock.Call {
func mockSearchServiceWithSpanKind(r *spanReaderTest, inputHasSpanKind bool) *mock.Call {
searchService := &mocks.SearchService{}
searchService.On("Query", mock.Anything).Return(searchService)
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Size", mock.MatchedBy(func(size int) bool {
return size == 0 // Aggregations apply size (bucket) limits in their own query objects, and do not apply at the parent query level.
})).Return(searchService)
searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService)
searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService)
if inputHasSpanKind {
searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService)
} else {
searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.AnythingOfType("*elastic.FiltersAggregation")).Return(searchService)
searchService.On("FetchSourceContext", mock.AnythingOfType("*elastic.FetchSourceContext"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService)
searchService.On("Size", mock.AnythingOfType("int")).Return(searchService)
}
searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService)
r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService)
Expand Down
99 changes: 23 additions & 76 deletions plugin/storage/es/spanstore/service_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const (
serviceName = "serviceName"
spanKind = "spanKind"
operationsAggregation = "distinct_operations"
operationsWithoutKind = "distinct_operations_without_kind"
servicesAggregation = "distinct_services"
)

Expand Down Expand Up @@ -58,27 +57,16 @@ func NewServiceOperationStorage(

// Write saves a service to operation pair.
func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span, kind model.SpanKind) {
// Insert serviceName:operationName document
// Insert serviceName:kind:operationName document
service := dbmodel.Service{
ServiceName: jsonSpan.Process.ServiceName,
OperationName: jsonSpan.OperationName,
Kind: string(kind),
}
if kind != model.SpanKindUnspecified {
serviceWithKind := dbmodel.ServiceWithKind{
Service: service,
Kind: string(kind),
}
cacheKey := hashCodeWithKind(serviceWithKind)
if !keyInCache(cacheKey, s.serviceCache) {
s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(serviceWithKind).Add()
writeCache(cacheKey, s.serviceCache)
}
} else {
cacheKey := hashCode(service)
if !keyInCache(cacheKey, s.serviceCache) {
s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add()
writeCache(cacheKey, s.serviceCache)
}
cacheKey := hashCode(service)
if !keyInCache(cacheKey, s.serviceCache) {
s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add()
writeCache(cacheKey, s.serviceCache)
}
}

Expand Down Expand Up @@ -137,32 +125,20 @@ func (s *ServiceOperationStorage) getOperations(ctx context.Context, indices []s
operationNamesBucket := bucket.Buckets
return bucketOfOperationNamesToOperationsArray(operationNamesBucket, kind)
}
serviceFilter := elastic.NewFiltersAggregation().
FilterWithName(string(model.SpanKindClient), elastic.NewTermQuery(spanKind, string(model.SpanKindClient))).
FilterWithName(string(model.SpanKindServer), elastic.NewTermQuery(spanKind, string(model.SpanKindServer))).
FilterWithName(string(model.SpanKindProducer), elastic.NewTermQuery(spanKind, string(model.SpanKindProducer))).
FilterWithName(string(model.SpanKindConsumer), elastic.NewTermQuery(spanKind, string(model.SpanKindConsumer))).
FilterWithName(string(model.SpanKindInternal), elastic.NewTermQuery(spanKind, string(model.SpanKindInternal))).
FilterWithName(operationsWithoutKind, elastic.NewBoolQuery().MustNot(elastic.NewExistsQuery(spanKind))).
SubAggregation(operationNameField, elastic.NewTermsAggregation().Field(operationNameField).Size(maxDocCount))
serviceQuery := elastic.NewTermQuery(serviceName, service)
searchService = s.client().Search(indices...).
Size(0).
Query(serviceQuery).
IgnoreUnavailable(true).
Aggregation(operationsAggregation, serviceFilter)
FetchSourceContext(elastic.NewFetchSourceContext(true).Include(spanKind, operationNameField)).
Size(maxDocCount)
searchResult, err := searchService.Do(ctx)
if err != nil {
return nil, fmt.Errorf("search operations failed: %w", es.DetailedError(err))
}
if searchResult.Aggregations == nil {
if searchResult.Hits == nil {
return []spanstore.Operation{}, nil
}
bucket, found := searchResult.Aggregations.Filters(operationsAggregation)
if !found {
return nil, errors.New("could not find aggregation of " + operationsAggregation)
}
return bucketOfOperationsToOperationsArray(bucket)
return bucketOfOperationsToOperationsArray(searchResult.Hits)
}

func getOperationsAggregation(maxDocCount int) elastic.Query {
Expand All @@ -186,57 +162,28 @@ func bucketOfOperationNamesToOperationsArray(buckets []*elastic.AggregationBucke
return result, nil
}

func bucketOfOperationsToOperationsArray(searchResult *elastic.AggregationBucketFilters) ([]spanstore.Operation, error) {
var result []spanstore.Operation
for name, bucket := range searchResult.NamedBuckets {
if kind, err := model.SpanKindFromString(name); err == nil {
if v, ok := bucket.Aggregations[operationNameField]; ok {
result, err = addOperationsFromRawData(v, string(kind), result)
if err != nil {
return nil, err
}
}
} else {
if name == operationsWithoutKind {
if v, ok := bucket.Aggregations[operationNameField]; ok {
result, err = addOperationsFromRawData(v, "", result)
if err != nil {
return nil, err
}
}
}
func bucketOfOperationsToOperationsArray(searchResult *elastic.SearchHits) ([]spanstore.Operation, error) {
result := make([]spanstore.Operation, len(searchResult.Hits))
for i, hit := range searchResult.Hits {
data := hit.Source
op, err := rawMessageToOperation(data)
if err != nil {
return nil, err
}
result[i] = op
}
return result, nil
}

func addOperationsFromRawData(raw *json.RawMessage, kind string, result []spanstore.Operation) ([]spanstore.Operation, error) {
var items *elastic.AggregationBucketKeyItems
err := json.Unmarshal(*raw, &items)
if err != nil {
return nil, err
func rawMessageToOperation(data *json.RawMessage) (spanstore.Operation, error) {
var service dbmodel.Service
if err := json.Unmarshal(*data, &service); err != nil {
return spanstore.Operation{}, err
}
for _, item := range items.Buckets {
str, ok := item.Key.(string)
if !ok {
return nil, errors.New("non-string key found in aggregation")
}
result = append(result, spanstore.Operation{
Name: str,
SpanKind: kind,
})
}
return result, nil
return spanstore.Operation{Name: service.OperationName, SpanKind: service.Kind}, nil
}

func hashCode(s dbmodel.Service) string {
h := fnv.New64a()
h.Write([]byte(s.ServiceName))
h.Write([]byte(s.OperationName))
return strconv.FormatUint(h.Sum64(), 16)
}

func hashCodeWithKind(s dbmodel.ServiceWithKind) string {
h := fnv.New64a()
h.Write([]byte(s.ServiceName))
h.Write([]byte(s.Kind))
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/spanstore/service_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestWriteServiceWithKind(t *testing.T) {
indexService.On("Index", stringMatcher(indexName)).Return(indexService)
indexService.On("Type", stringMatcher(serviceType)).Return(indexService)
indexService.On("Id", stringMatcher(serviceHash)).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("dbmodel.ServiceWithKind")).Return(indexService)
indexService.On("BodyJson", mock.AnythingOfType("dbmodel.Service")).Return(indexService)
indexService.On("Add")

w.client.On("Index").Return(indexService)
Expand Down

0 comments on commit 286b17f

Please sign in to comment.