Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api_v3][query] Change api_v3 grpc handler to use query service v2 #6452

Merged
merged 19 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -171,6 +172,7 @@ by default uses only in-memory database.`,
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
qOpts.BuildQueryServiceOptionsV2(storageFactory, logger),
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)
Expand Down Expand Up @@ -218,15 +220,17 @@ func startQuery(
svc *flags.Service,
qOpts *queryApp.QueryOptions,
queryOpts *querysvc.QueryServiceOptions,
v2QueryOpts *v2querysvc.QueryServiceOptions,
traceReader tracestore.Reader,
depReader depstore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts)
v2qs := v2querysvc.NewQueryService(traceReader, depReader, *v2QueryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
server, err := queryApp.NewServer(context.Background(), qs, v2qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
Expand Down
22 changes: 20 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/storage/metricstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var (
Expand Down Expand Up @@ -91,11 +93,13 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
}

var opts querysvc.QueryServiceOptions
var v2opts v2querysvc.QueryServiceOptions
// TODO archive storage still uses v1 factory
if err := s.addArchiveStorage(&opts, host); err != nil {
if err := s.addArchiveStorage(&opts, &v2opts, host); err != nil {
return err
}
qs := querysvc.NewQueryService(traceReader, depReader, opts)
v2qs := v2querysvc.NewQueryService(traceReader, depReader, v2opts)

mqs, err := s.createMetricReader(host)
if err != nil {
Expand All @@ -108,6 +112,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
ctx,
// TODO propagate healthcheck updates up to the collector's runtime
qs,
v2qs,
mqs,
&s.config.QueryOptions,
tm,
Expand All @@ -125,7 +130,11 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return nil
}

func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host component.Host) error {
func (s *server) addArchiveStorage(
opts *querysvc.QueryServiceOptions,
v2opts *v2querysvc.QueryServiceOptions,
host component.Host,
) error {
if s.config.Storage.TracesArchive == "" {
s.telset.Logger.Info("Archive storage not configured")
return nil
Expand All @@ -139,6 +148,15 @@ func (s *server) addArchiveStorage(opts *querysvc.QueryServiceOptions, host comp
if !opts.InitArchiveStorage(f, s.telset.Logger) {
s.telset.Logger.Info("Archive storage not initialized")
}

ar, aw := v1adapter.InitializeArchiveStorage(f, s.telset.Logger)
if ar != nil && aw != nil {
v2opts.ArchiveTraceReader = ar
v2opts.ArchiveTraceWriter = aw
} else {
s.telset.Logger.Info("Archive storage not initialized")
}

return nil
}

Expand Down
36 changes: 34 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
Expand Down Expand Up @@ -62,6 +63,14 @@ func (ff fakeFactory) CreateSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (fakeFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
return &spanstoremocks.Reader{}, nil
}

func (fakeFactory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return &spanstoremocks.Writer{}, nil
}

func (ff fakeFactory) Initialize(metrics.Factory, *zap.Logger) error {
if ff.name == "need-initialize-error" {
return errors.New("test-error")
Expand Down Expand Up @@ -95,7 +104,13 @@ var _ jaegerstorage.Extension = (*fakeStorageExt)(nil)
func (fakeStorageExt) TraceStorageFactory(name string) (storage.Factory, bool) {
if name == "need-factory-error" {
return nil, false
} else if name == "no-archive" {
f := fakeFactory{name: name}
return struct {
storage.Factory
}{f}, true
}

return fakeFactory{name: name}, true
}

Expand Down Expand Up @@ -257,6 +272,7 @@ func TestServerAddArchiveStorage(t *testing.T) {
tests := []struct {
name string
qSvcOpts *querysvc.QueryServiceOptions
v2qSvcOpts *v2querysvc.QueryServiceOptions
config *Config
extension component.Component
expectedOutput string
Expand All @@ -266,6 +282,7 @@ func TestServerAddArchiveStorage(t *testing.T) {
name: "Archive storage unset",
config: &Config{},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
expectedOutput: `{"level":"info","msg":"Archive storage not configured"}` + "\n",
expectedErr: "",
},
Expand All @@ -277,21 +294,36 @@ func TestServerAddArchiveStorage(t *testing.T) {
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
expectedOutput: "",
expectedErr: "cannot find archive storage factory: cannot find extension",
},
{
name: "Archive storage not supported",
config: &Config{
Storage: Storage{
TracesArchive: "badger",
TracesArchive: "no-archive",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "Archive storage not supported by the factory",
expectedErr: "",
},
{
name: "Archive storage supported",
config: &Config{
Storage: Storage{
TracesArchive: "some-archive-storage",
},
},
qSvcOpts: &querysvc.QueryServiceOptions{},
v2qSvcOpts: &v2querysvc.QueryServiceOptions{},
extension: fakeStorageExt{},
expectedOutput: "",
expectedErr: "",
},
}

for _, tt := range tests {
Expand All @@ -306,7 +338,7 @@ func TestServerAddArchiveStorage(t *testing.T) {
if tt.extension != nil {
host = storagetest.NewStorageHost().WithExtension(jaegerstorage.ID, tt.extension)
}
err := server.addArchiveStorage(tt.qSvcOpts, host)
err := server.addArchiveStorage(tt.qSvcOpts, tt.v2qSvcOpts, host)
if tt.expectedErr == "" {
require.NoError(t, err)
} else {
Expand Down
71 changes: 41 additions & 30 deletions cmd/query/app/apiv3/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"errors"
"fmt"

"go.opentelemetry.io/collector/pdata/ptrace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

// Handler implements api_v3.QueryServiceServer
Expand All @@ -32,21 +34,18 @@ func (h *Handler) GetTrace(request *api_v3.GetTraceRequest, stream api_v3.QueryS
return fmt.Errorf("malform trace ID: %w", err)
}

query := querysvc.GetTraceParameters{
GetTraceParameters: spanstore.GetTraceParameters{
TraceID: traceID,
StartTime: request.GetStartTime(),
EndTime: request.GetEndTime(),
query := querysvc.GetTraceParams{
TraceIDs: []tracestore.GetTraceParams{
{
TraceID: traceID.ToOTELTraceID(),
Start: request.GetStartTime(),
End: request.GetEndTime(),
},
},
RawTraces: request.GetRawTraces(),
}
trace, err := h.QueryService.GetTrace(stream.Context(), query)
if err != nil {
return fmt.Errorf("cannot retrieve trace: %w", err)
}
td := modelToOTLP(trace.GetSpans())
tracesData := api_v3.TracesData(td)
return stream.Send(&tracesData)
getTracesIter := h.QueryService.GetTraces(stream.Context(), query)
return receiveTraces(getTracesIter, stream.Send)
}

// FindTraces implements api_v3.QueryServiceServer's FindTraces
Expand All @@ -69,8 +68,8 @@ func (h *Handler) internalFindTraces(
return errors.New("start time min and max are required parameters")
}

queryParams := &querysvc.TraceQueryParameters{
TraceQueryParameters: spanstore.TraceQueryParameters{
queryParams := querysvc.TraceQueryParams{
TraceQueryParams: tracestore.TraceQueryParams{
ServiceName: query.GetServiceName(),
OperationName: query.GetOperationName(),
Tags: query.GetAttributes(),
Expand All @@ -91,19 +90,8 @@ func (h *Handler) internalFindTraces(
queryParams.DurationMax = d
}

traces, err := h.QueryService.FindTraces(ctx, queryParams)
if err != nil {
return err
}
for _, t := range traces {
td := modelToOTLP(t.GetSpans())
tracesData := api_v3.TracesData(td)
if err := streamSend(&tracesData); err != nil {
return status.Error(codes.Internal,
fmt.Sprintf("failed to send response stream chunk to client: %v", err))
}
}
return nil
findTracesIter := h.QueryService.FindTraces(ctx, queryParams)
return receiveTraces(findTracesIter, streamSend)
}

// GetServices implements api_v3.QueryServiceServer's GetServices
Expand All @@ -119,7 +107,7 @@ func (h *Handler) GetServices(ctx context.Context, _ *api_v3.GetServicesRequest)

// GetOperations implements api_v3.QueryService's GetOperations
func (h *Handler) GetOperations(ctx context.Context, request *api_v3.GetOperationsRequest) (*api_v3.GetOperationsResponse, error) {
operations, err := h.QueryService.GetOperations(ctx, spanstore.OperationQueryParameters{
operations, err := h.QueryService.GetOperations(ctx, tracestore.OperationQueryParams{
ServiceName: request.GetService(),
SpanKind: request.GetSpanKind(),
})
Expand All @@ -137,3 +125,26 @@ func (h *Handler) GetOperations(ctx context.Context, request *api_v3.GetOperatio
Operations: apiOperations,
}, nil
}

func receiveTraces(
seq iter.Seq2[[]ptrace.Traces, error],
sendFn func(*api_v3.TracesData) error,
) error {
var capturedErr error
seq(func(traces []ptrace.Traces, err error) bool {
if err != nil {
capturedErr = err
return false
}
for _, trace := range traces {
tracesData := api_v3.TracesData(trace)
if err := sendFn(&tracesData); err != nil {
capturedErr = status.Error(codes.Internal,
fmt.Sprintf("failed to send response stream chunk to client: %v", err))
return false
}
}
return true
})
return capturedErr
}
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/proto/api_v3"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
Expand Down
19 changes: 19 additions & 0 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2adjuster "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const (
Expand Down Expand Up @@ -147,6 +150,22 @@ func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseF
return opts
}

func (qOpts *QueryOptions) BuildQueryServiceOptionsV2(storageFactory storage.BaseFactory, logger *zap.Logger) *v2querysvc.QueryServiceOptions {
opts := &v2querysvc.QueryServiceOptions{}

ar, aw := v1adapter.InitializeArchiveStorage(storageFactory, logger)
if ar != nil && aw != nil {
opts.ArchiveTraceReader = ar
opts.ArchiveTraceWriter = aw
} else {
logger.Info("Archive storage not initialized")
}

opts.Adjuster = v2adjuster.Sequence(v2adjuster.StandardAdjusters(qOpts.MaxClockSkewAdjust)...)

return opts
}

// stringSliceAsHeader parses a slice of strings and returns a http.Header.
// Each string in the slice is expected to be in the format "key: value"
func stringSliceAsHeader(slice []string) (http.Header, error) {
Expand Down
Loading
Loading