Skip to content

Commit

Permalink
[v2][storage] Implement read path for v2 storage interface (jaegertra…
Browse files Browse the repository at this point in the history
…cing#6170)

## Which problem is this PR solving?
- Towards jaegertracing#5079

## Description of the changes
- Implemented the read path for the v2 storage interface. This path
currently just wraps a v1 span reader and exposes a static method to
access the v1 reader.
- Change the jaeger query extension to initialize a v2 storage factory
and obtain the v1 span reader from it.
- This will unblock the development of more efficient v2 storage
implementations, like ClickHouse.

## How was this change tested?
- Added unit tests for new functionality

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
3 people authored Dec 6, 2024
1 parent c367225 commit 4f45182
Show file tree
Hide file tree
Showing 18 changed files with 520 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@ packages:
github.com/jaegertracing/jaeger/storage/spanstore:
config:
all: true
github.com/jaegertracing/jaeger/storage_v2/tracestore:
config:
all: true
12 changes: 7 additions & 5 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

// all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store
Expand Down Expand Up @@ -104,7 +105,8 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

spanReader, err := storageFactory.CreateSpanReader()
v2Factory := factoryadapter.NewFactory(storageFactory)
traceReader, err := v2Factory.CreateTraceReader()
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
Expand Down Expand Up @@ -168,7 +170,7 @@ by default uses only in-memory database.`,
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)

Expand Down Expand Up @@ -215,13 +217,13 @@ func startQuery(
svc *flags.Service,
qOpts *queryApp.QueryOptions,
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
traceReader tracestore.Reader,
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -55,11 +56,12 @@ type testServer struct {

func newTestServer(t *testing.T) *testServer {
spanReader := &spanstoremocks.Reader{}
traceReader := factoryadapter.NewTraceReader(spanReader)
metricsReader, err := disabled.NewMetricsReader()
require.NoError(t, err)

q := querysvc.NewQueryService(
spanReader,
traceReader,
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
18 changes: 12 additions & 6 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,32 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
Namespace(metrics.NSOptions{Name: "jaeger"}).
Namespace(metrics.NSOptions{Name: "query"})

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
// TODO currently v1 is still needed because of dependency storage
v1Factory, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err)
return fmt.Errorf("cannot find v1 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err)
}
f, err := jaegerstorage.GetStorageFactoryV2(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find v2 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err)
}

spanReader, err := f.CreateSpanReader()
traceReader, err := f.CreateTraceReader()
if err != nil {
return fmt.Errorf("cannot create span reader: %w", err)
return fmt.Errorf("cannot create trace reader: %w", err)
}

depReader, err := f.CreateDependencyReader()
depReader, err := v1Factory.CreateDependencyReader()
if err != nil {
return fmt.Errorf("cannot create dependencies reader: %w", err)
}

var opts querysvc.QueryServiceOptions
// TODO archive storage still uses v1 factory
if err := s.addArchiveStorage(&opts, host); err != nil {
return err
}
qs := querysvc.NewQueryService(spanReader, depReader, opts)
qs := querysvc.NewQueryService(traceReader, depReader, opts)

mqs, err := s.createMetricReader(host)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "need-factory-error",
},
},
expectedErr: "cannot find primary storage",
expectedErr: "cannot find v1 factory for primary storage",
},
{
name: "span reader error",
Expand All @@ -159,7 +159,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "need-span-reader-error",
},
},
expectedErr: "cannot create span reader",
expectedErr: "cannot create trace reader",
},
{
name: "dependency error",
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -57,7 +58,7 @@ func newTestServerClient(t *testing.T) *testServerClient {
}

q := querysvc.NewQueryService(
tsc.reader,
factoryadapter.NewTraceReader(tsc.reader),
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

func setupHTTPGatewayNoServer(
Expand All @@ -35,7 +36,7 @@ func setupHTTPGatewayNoServer(
reader: &spanstoremocks.Reader{},
}

q := querysvc.NewQueryService(gw.reader,
q := querysvc.NewQueryService(factoryadapter.NewTraceReader(gw.reader),
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
5 changes: 3 additions & 2 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -901,7 +902,7 @@ func initializeTenantedTestServerGRPCWithOptions(t *testing.T, tm *tenancy.Manag
require.NoError(t, err)

q := querysvc.NewQueryService(
spanReader,
factoryadapter.NewTraceReader(spanReader),
dependencyReader,
querysvc.QueryServiceOptions{
ArchiveSpanReader: archiveSpanReader,
Expand Down Expand Up @@ -1165,7 +1166,7 @@ func TestNewGRPCHandlerWithEmptyOptions(t *testing.T) {
require.NoError(t, err)

q := querysvc.NewQueryService(
&spanstoremocks.Reader{},
factoryadapter.NewTraceReader(&spanstoremocks.Reader{}),
&depsmocks.Reader{},
querysvc.QueryServiceOptions{})

Expand Down
7 changes: 5 additions & 2 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond)
Expand Down Expand Up @@ -119,7 +120,8 @@ func initializeTestServerWithOptions(
options = append(options, HandlerOptions.Logger(zaptest.NewLogger(t)))
readStorage := &spanstoremocks.Reader{}
dependencyStorage := &depsmocks.Reader{}
qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions)
traceReader := factoryadapter.NewTraceReader(readStorage)
qs := querysvc.NewQueryService(traceReader, dependencyStorage, queryOptions)
r := NewRouter()
apiHandler := NewAPIHandler(qs, options...)
apiHandler.RegisterRoutes(r)
Expand Down Expand Up @@ -198,8 +200,9 @@ func TestLogOnServerError(t *testing.T) {
zapCore, logs := observer.New(zap.InfoLevel)
logger := zap.New(zapCore)
readStorage := &spanstoremocks.Reader{}
traceReader := factoryadapter.NewTraceReader(readStorage)
dependencyStorage := &depsmocks.Reader{}
qs := querysvc.NewQueryService(readStorage, dependencyStorage, querysvc.QueryServiceOptions{})
qs := querysvc.NewQueryService(traceReader, dependencyStorage, querysvc.QueryServiceOptions{})
h := NewAPIHandler(qs, HandlerOptions.Logger(logger))
e := errors.New("test error")
h.handleError(&httptest.ResponseRecorder{}, e, http.StatusInternalServerError)
Expand Down
32 changes: 25 additions & 7 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")
Expand All @@ -40,15 +42,15 @@ type StorageCapabilities struct {

// QueryService contains span utils required by the query-service.
type QueryService struct {
spanReader spanstore.Reader
traceReader tracestore.Reader
dependencyReader dependencystore.Reader
options QueryServiceOptions
}

// NewQueryService returns a new QueryService.
func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
func NewQueryService(traceReader tracestore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
qsvc := &QueryService{
spanReader: spanReader,
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}
Expand All @@ -61,7 +63,11 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto

// GetTrace is the queryService implementation of spanstore.Reader.GetTrace
func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
trace, err := qs.spanReader.GetTrace(ctx, traceID)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
trace, err := spanReader.GetTrace(ctx, traceID)
if errors.Is(err, spanstore.ErrTraceNotFound) {
if qs.options.ArchiveSpanReader == nil {
return nil, err
Expand All @@ -73,20 +79,32 @@ func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*mo

// GetServices is the queryService implementation of spanstore.Reader.GetServices
func (qs QueryService) GetServices(ctx context.Context) ([]string, error) {
return qs.spanReader.GetServices(ctx)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.GetServices(ctx)
}

// GetOperations is the queryService implementation of spanstore.Reader.GetOperations
func (qs QueryService) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
return qs.spanReader.GetOperations(ctx, query)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.GetOperations(ctx, query)
}

// FindTraces is the queryService implementation of spanstore.Reader.FindTraces
func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
return qs.spanReader.FindTraces(ctx, query)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.FindTraces(ctx, query)
}

// ArchiveTrace is the queryService utility to archive traces.
Expand Down
Loading

0 comments on commit 4f45182

Please sign in to comment.