Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][storage] Implement read path for v2 storage interface #6170

Merged
merged 24 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c5600ba
Add Reader With V2 Interface
mahadzaryab1 Nov 16, 2024
23c7700
Hold TraceReader In Query Service
mahadzaryab1 Nov 16, 2024
2b53471
Generate Mocks For V2 Storage
mahadzaryab1 Nov 16, 2024
617c30c
Fix Unit Tests
mahadzaryab1 Nov 16, 2024
dcf6357
Implement Create Trace Reader For V2 Factory
mahadzaryab1 Nov 17, 2024
1175e6e
Initialize V2 Factories And Use CreateTraceReader
mahadzaryab1 Nov 17, 2024
1683007
Address Feedback For Error Variable
mahadzaryab1 Nov 17, 2024
840a679
Merge branch 'main' into v2-reader
mahadzaryab1 Nov 19, 2024
24a53df
Merge branch 'main' into v2-reader
mahadzaryab1 Nov 30, 2024
df9875a
Fix Errors From Merge Commit
mahadzaryab1 Nov 30, 2024
51bffb4
Move V2 Factory Initialization After V1
mahadzaryab1 Nov 30, 2024
226e3ca
Add Missing Tests In Query Service
mahadzaryab1 Nov 30, 2024
f4f6cfb
Merge branch 'main' into v2-reader
mahadzaryab1 Nov 30, 2024
4b712d1
Merge branch 'main' into v2-reader
mahadzaryab1 Dec 2, 2024
d76eeba
Remove Unused Imports
mahadzaryab1 Dec 2, 2024
a76d544
Merge branch 'main' into v2-reader
mahadzaryab1 Dec 2, 2024
5cad28e
Fix Package Renaming
mahadzaryab1 Dec 2, 2024
2bf30b6
Regenerate Mocks
mahadzaryab1 Dec 2, 2024
a0ccfe3
Remove Old Mocks
mahadzaryab1 Dec 2, 2024
9cbbce8
Merge branch 'main' into v2-reader
mahadzaryab1 Dec 2, 2024
9b181e4
Apply suggestions from code review
yurishkuro Dec 5, 2024
cbfeb0e
Merge branch 'main' into v2-reader
yurishkuro Dec 5, 2024
d30b889
regen-mock
yurishkuro Dec 5, 2024
6828667
fix test
yurishkuro Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,6 @@ packages:
github.com/jaegertracing/jaeger/storage/spanstore:
config:
all: true
github.com/jaegertracing/jaeger/storage_v2/tracestore:
config:
all: true
12 changes: 7 additions & 5 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

// all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store
Expand Down Expand Up @@ -104,7 +105,8 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

spanReader, err := storageFactory.CreateSpanReader()
v2Factory := factoryadapter.NewFactory(storageFactory)
traceReader, err := v2Factory.CreateTraceReader()
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
Expand Down Expand Up @@ -168,7 +170,7 @@ by default uses only in-memory database.`,
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
traceReader, dependencyReader, metricsQueryService,
tm, queryTelset,
)

Expand Down Expand Up @@ -215,13 +217,13 @@ func startQuery(
svc *flags.Service,
qOpts *queryApp.QueryOptions,
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
traceReader tracestore.Reader,
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -55,11 +56,12 @@ type testServer struct {

func newTestServer(t *testing.T) *testServer {
spanReader := &spanstoremocks.Reader{}
traceReader := factoryadapter.NewTraceReader(spanReader)
metricsReader, err := disabled.NewMetricsReader()
require.NoError(t, err)

q := querysvc.NewQueryService(
spanReader,
traceReader,
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
18 changes: 12 additions & 6 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,32 @@
Namespace(metrics.NSOptions{Name: "jaeger"}).
Namespace(metrics.NSOptions{Name: "query"})

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
// TODO currently v1 is still needed because of dependency storage
v1Factory, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err)
return fmt.Errorf("cannot find v1 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err)
}
f, err := jaegerstorage.GetStorageFactoryV2(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find v2 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err)

Check warning on line 82 in cmd/jaeger/internal/extension/jaegerquery/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerquery/server.go#L82

Added line #L82 was not covered by tests
}

spanReader, err := f.CreateSpanReader()
traceReader, err := f.CreateTraceReader()
if err != nil {
return fmt.Errorf("cannot create span reader: %w", err)
return fmt.Errorf("cannot create trace reader: %w", err)
}

depReader, err := f.CreateDependencyReader()
depReader, err := v1Factory.CreateDependencyReader()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to create storage_v2/depstore/ before continuing, so that we don't have to keep this bifurcation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Sure. What do we want to put in that package?

if err != nil {
return fmt.Errorf("cannot create dependencies reader: %w", err)
}

var opts querysvc.QueryServiceOptions
// TODO archive storage still uses v1 factory
if err := s.addArchiveStorage(&opts, host); err != nil {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return err
}
qs := querysvc.NewQueryService(spanReader, depReader, opts)
qs := querysvc.NewQueryService(traceReader, depReader, opts)

mqs, err := s.createMetricReader(host)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "need-factory-error",
},
},
expectedErr: "cannot find primary storage",
expectedErr: "cannot find v1 factory for primary storage",
},
{
name: "span reader error",
Expand All @@ -159,7 +159,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "need-span-reader-error",
},
},
expectedErr: "cannot create span reader",
expectedErr: "cannot create trace reader",
},
{
name: "dependency error",
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -57,7 +58,7 @@ func newTestServerClient(t *testing.T) *testServerClient {
}

q := querysvc.NewQueryService(
tsc.reader,
factoryadapter.NewTraceReader(tsc.reader),
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

func setupHTTPGatewayNoServer(
Expand All @@ -35,7 +36,7 @@ func setupHTTPGatewayNoServer(
reader: &spanstoremocks.Reader{},
}

q := querysvc.NewQueryService(gw.reader,
q := querysvc.NewQueryService(factoryadapter.NewTraceReader(gw.reader),
&dependencyStoreMocks.Reader{},
querysvc.QueryServiceOptions{},
)
Expand Down
5 changes: 3 additions & 2 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

var (
Expand Down Expand Up @@ -901,7 +902,7 @@ func initializeTenantedTestServerGRPCWithOptions(t *testing.T, tm *tenancy.Manag
require.NoError(t, err)

q := querysvc.NewQueryService(
spanReader,
factoryadapter.NewTraceReader(spanReader),
dependencyReader,
querysvc.QueryServiceOptions{
ArchiveSpanReader: archiveSpanReader,
Expand Down Expand Up @@ -1165,7 +1166,7 @@ func TestNewGRPCHandlerWithEmptyOptions(t *testing.T) {
require.NoError(t, err)

q := querysvc.NewQueryService(
&spanstoremocks.Reader{},
factoryadapter.NewTraceReader(&spanstoremocks.Reader{}),
&depsmocks.Reader{},
querysvc.QueryServiceOptions{})

Expand Down
7 changes: 5 additions & 2 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond)
Expand Down Expand Up @@ -119,7 +120,8 @@ func initializeTestServerWithOptions(
options = append(options, HandlerOptions.Logger(zaptest.NewLogger(t)))
readStorage := &spanstoremocks.Reader{}
dependencyStorage := &depsmocks.Reader{}
qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions)
traceReader := factoryadapter.NewTraceReader(readStorage)
qs := querysvc.NewQueryService(traceReader, dependencyStorage, queryOptions)
r := NewRouter()
apiHandler := NewAPIHandler(qs, options...)
apiHandler.RegisterRoutes(r)
Expand Down Expand Up @@ -198,8 +200,9 @@ func TestLogOnServerError(t *testing.T) {
zapCore, logs := observer.New(zap.InfoLevel)
logger := zap.New(zapCore)
readStorage := &spanstoremocks.Reader{}
traceReader := factoryadapter.NewTraceReader(readStorage)
dependencyStorage := &depsmocks.Reader{}
qs := querysvc.NewQueryService(readStorage, dependencyStorage, querysvc.QueryServiceOptions{})
qs := querysvc.NewQueryService(traceReader, dependencyStorage, querysvc.QueryServiceOptions{})
h := NewAPIHandler(qs, HandlerOptions.Logger(logger))
e := errors.New("test error")
h.handleError(&httptest.ResponseRecorder{}, e, http.StatusInternalServerError)
Expand Down
32 changes: 25 additions & 7 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

var errNoArchiveSpanStorage = errors.New("archive span storage was not configured")
Expand All @@ -40,15 +42,15 @@ type StorageCapabilities struct {

// QueryService contains span utils required by the query-service.
type QueryService struct {
spanReader spanstore.Reader
traceReader tracestore.Reader
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about archive reader/writer, any reason not to upgrade them too? (or PR is too large?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro I initially thought that we couldn't do this yet because I thought we were using the ArchiveFactory interface here. However, it seems like we just use the normal reader interface on the archive reader as well (see https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/querysvc/query_service.go#L69) so this can be done. I can do it in this PR or a separate one. Let me know if you have a preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the other question (not having v2 reader fully implemented) it's ok to do these separately, no particular benefit afaict.

dependencyReader dependencystore.Reader
options QueryServiceOptions
}

// NewQueryService returns a new QueryService.
func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
func NewQueryService(traceReader tracestore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
qsvc := &QueryService{
spanReader: spanReader,
traceReader: traceReader,
dependencyReader: dependencyReader,
options: options,
}
Expand All @@ -61,7 +63,11 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto

// GetTrace is the queryService implementation of spanstore.Reader.GetTrace
func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
trace, err := qs.spanReader.GetTrace(ctx, traceID)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
trace, err := spanReader.GetTrace(ctx, traceID)
if errors.Is(err, spanstore.ErrTraceNotFound) {
if qs.options.ArchiveSpanReader == nil {
return nil, err
Expand All @@ -73,20 +79,32 @@ func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*mo

// GetServices is the queryService implementation of spanstore.Reader.GetServices
func (qs QueryService) GetServices(ctx context.Context) ([]string, error) {
return qs.spanReader.GetServices(ctx)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.GetServices(ctx)
}

// GetOperations is the queryService implementation of spanstore.Reader.GetOperations
func (qs QueryService) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
return qs.spanReader.GetOperations(ctx, query)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.GetOperations(ctx, query)
}

// FindTraces is the queryService implementation of spanstore.Reader.FindTraces
func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
return qs.spanReader.FindTraces(ctx, query)
spanReader, err := factoryadapter.GetV1Reader(qs.traceReader)
if err != nil {
return nil, err
}
return spanReader.FindTraces(ctx, query)
}

// ArchiveTrace is the queryService utility to archive traces.
Expand Down
Loading
Loading