Skip to content

Commit

Permalink
[api_v3][query] Change api_v3 grpc handler to use query service v2 (#…
Browse files Browse the repository at this point in the history
…6452)

## Which problem is this PR solving?
- Towards #6337

## Description of the changes
- This PR migrates the v3_api GRPC handler to use the new v2 query
service.

## How was this change tested?
- CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Jan 1, 2025
1 parent e3a883e commit 2fe9e21
Show file tree
Hide file tree
Showing 14 changed files with 352 additions and 53 deletions.
6 changes: 5 additions & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -171,6 +172,7 @@ by default uses only in-memory database.`,
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
qOpts.BuildQueryServiceOptionsV2(storageFactory, logger),
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)
Expand Down Expand Up @@ -218,15 +220,17 @@ func startQuery(
svc *flags.Service,
qOpts *queryApp.QueryOptions,
queryOpts *querysvc.QueryServiceOptions,
v2QueryOpts *v2querysvc.QueryServiceOptions,
traceReader tracestore.Reader,
depReader depstore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts)
v2qs := v2querysvc.NewQueryService(traceReader, depReader, *v2QueryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
server, err := queryApp.NewServer(context.Background(), qs, v2qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
Expand Down
22 changes: 20 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/storage/metricstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var (
Expand Down Expand Up @@ -91,11 +93,13 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
}

var opts querysvc.QueryServiceOptions
var v2opts v2querysvc.QueryServiceOptions
// TODO archive storage still uses v1 factory
if err := s.addArchiveStorage(&opts, host); err != nil {
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
}
qs := querysvc.NewQueryService(traceReader, depReader, opts)
v2qs := v2querysvc.NewQueryService(traceReader, depReader, v2opts)

mqs, err := s.createMetricReader(host)
if err != nil {
Expand All @@ -108,6 +112,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
ctx,
// TODO propagate healthcheck updates up to the collector's runtime
qs,
v2qs,
mqs,
&s.config.QueryOptions,
tm,
Expand All @@ -125,7 +130,11 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return nil
}

func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host component.Host) error {
func (s *server) addArchiveStorage(
opts *querysvc.QueryServiceOptions,
v2opts *v2querysvc.QueryServiceOptions,
host component.Host,
) error {
if s.config.Storage.TracesArchive == "" {
s.telset.Logger.Info("Archive storage not configured")
return nil
Expand All @@ -139,6 +148,15 @@ func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host comp
if !opts.InitArchiveStorage(f, s.telset.Logger) {
s.telset.Logger.Info("Archive storage not initialized")
}

ar, aw := v1adapter.InitializeArchiveStorage(f, s.telset.Logger)
if ar != nil && aw != nil {
v2opts.ArchiveTraceReader = ar
v2opts.ArchiveTraceWriter = aw
} else {
s.telset.Logger.Info("Archive storage not initialized")
}

return nil
}

Expand Down
36 changes: 34 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
Expand Down Expand Up @@ -62,6 +63,14 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (fakeFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return &spanstoremocks.Reader{}, nil
}

func (fakeFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
if ff.name == "need-initialize-error" {
return errors.New("test-error")
Expand Down Expand Up @@ -95,7 +104,13 @@ var _ jaegerstorage.Extension = (*fakeStorageExt)(nil)
func (fakeStorageExt) TraceStorageFactory(name string) (storage.Factory, bool) {
if name == "need-factory-error" {
return nil, false
} else if name == "no-archive" {
f := fakeFactory{name: name}
return struct {
storage.Factory
}{f}, true
}

return fakeFactory{name: name}, true
}

Expand Down Expand Up @@ -257,6 +272,7 @@ func TestServerAddArchiveStorage(t *testing.T) {
tests := []struct {
name string
qSvcOpts *querysvc.QueryServiceOptions
v2qSvcOpts *v2querysvc.QueryServiceOptions
config *Config
extension component.Component
expectedOutput string
Expand All @@ -266,6 +282,7 @@ func TestServerAddArchiveStorage(t *testing.T) {
name: "Archive storage unset",
config: &Config{},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
expectedOutput: `{"level":"info","msg":"Archive storage not configured"}` + "\n",
expectedErr: "",
},
Expand All @@ -277,21 +294,36 @@ func TestServerAddArchiveStorage(t *testing.T) {
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
expectedOutput: "",
expectedErr: "cannot find archive storage factory: cannot find extension",
},
{
name: "Archive storage not supported",
config: &Config{
Storage: Storage{
TracesArchive: "badger",
TracesArchive: "no-archive",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Archive storage not supported by the factory",
expectedErr: "",
},
{
name: "Archive storage supported",
config: &Config{
Storage: Storage{
TracesArchive: "some-archive-storage",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "",
expectedErr: "",
},
}

for _, tt := range tests {
Expand All @@ -306,7 +338,7 @@ func TestServerAddArchiveStorage(t *testing.T) {
if tt.extension != nil {
host = storagetest.NewStorageHost().WithExtension(jaegerstorage.ID, tt.extension)
}
err := server.addArchiveStorage(tt.qSvcOpts, host)
err := server.addArchiveStorage(tt.qSvcOpts, tt.v2qSvcOpts, host)
if tt.expectedErr == "" {
require.NoError(t, err)
} else {
Expand Down
71 changes: 41 additions & 30 deletions cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

// Handler implements api_v3.QueryServiceServer
Expand All @@ -32,21 +34,18 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
query := querysvc.GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{
{
TraceID: traceID.ToOTELTraceID(),
Start: request.GetStartTime(),
End: request.GetEndTime(),
},
},
RawTraces: request.GetRawTraces(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
return fmt.Errorf("cannot retrieve trace: %w", err)
}
td := modelToOTLP(trace.GetSpans())
tracesData := api_v3.TracesData(td)
return stream.Send(&tracesData)
getTracesIter := h.QueryService.GetTraces(stream.Context(), query)
return receiveTraces(getTracesIter, stream.Send)
}

// FindTraces implements api_v3.QueryServiceServer's FindTraces
Expand All @@ -69,8 +68,8 @@ func (h *Handler) internalFindTraces(
return errors.New("start time min and max are required parameters")
}

queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
queryParams := querysvc.TraceQueryParams{
TraceQueryParams: tracestore.TraceQueryParams{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
Expand All @@ -91,19 +90,8 @@ func (h *Handler) internalFindTraces(
queryParams.DurationMax = d
}

traces, err := h.QueryService.FindTraces(ctx, queryParams)
if err != nil {
return err
}
for _, t := range traces {
td := modelToOTLP(t.GetSpans())
tracesData := api_v3.TracesData(td)
if err := streamSend(&tracesData); err != nil {
return status.Error(codes.Internal,
fmt.Sprintf("failed to send response stream chunk to client: %v", err))
}
}
return nil
findTracesIter := h.QueryService.FindTraces(ctx, queryParams)
return receiveTraces(findTracesIter, streamSend)
}

// GetServices implements api_v3.QueryServiceServer's GetServices
Expand All @@ -119,7 +107,7 @@ func (h *Handler) GetServices(ctx context.Context, _ *api_v3.GetServicesRequest)

// GetOperations implements api_v3.QueryService's GetOperations
func (h *Handler) GetOperations(ctx context.Context, request *api_v3.GetOperationsRequest) (*api_v3.GetOperationsResponse, error) {
operations, err := h.QueryService.GetOperations(ctx, spanstore.OperationQueryParameters{
operations, err := h.QueryService.GetOperations(ctx, tracestore.OperationQueryParams{
ServiceName: request.GetService(),
SpanKind: request.GetSpanKind(),
})
Expand All @@ -137,3 +125,26 @@ func (h *Handler) GetOperations(ctx context.Context, request *api_v3.GetOperatio
Operations: apiOperations,
}, nil
}

func receiveTraces(
seq iter.Seq2[[]ptrace.Traces, error],
sendFn func(*api_v3.TracesData) error,
) error {
var capturedErr error
seq(func(traces []ptrace.Traces, err error) bool {
if err != nil {
capturedErr = err
return false
}
for _, trace := range traces {
tracesData := api_v3.TracesData(trace)
if err := sendFn(&tracesData); err != nil {
capturedErr = status.Error(codes.Internal,
fmt.Sprintf("failed to send response stream chunk to client: %v", err))
return false
}
}
return true
})
return capturedErr
}
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
Expand Down
19 changes: 19 additions & 0 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2adjuster "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const (
Expand Down Expand Up @@ -147,6 +150,22 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseF
return opts
}

func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(storageFactory storage.BaseFactory, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
opts := &v2querysvc.QueryServiceOptions{}

ar, aw := v1adapter.InitializeArchiveStorage(storageFactory, logger)
if ar != nil && aw != nil {
opts.ArchiveTraceReader = ar
opts.ArchiveTraceWriter = aw
} else {
logger.Info("Archive storage not initialized")
}

opts.Adjuster = v2adjuster.Sequence(v2adjuster.StandardAdjusters(qOpts.MaxClockSkewAdjust)...)

return opts
}

// stringSliceAsHeader parses a slice of strings and returns a http.Header.
// Each string in the slice is expected to be in the format "key: value"
func stringSliceAsHeader(slice []string) (http.Header, error) {
Expand Down
Loading

0 comments on commit 2fe9e21

Please sign in to comment.