Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][storage] Add dependency store to v2 storage interface #6297

Merged
merged 21 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,6 @@ packages:
github.com/jaegertracing/jaeger/storage_v2/tracestore:
config:
all: true
github.com/jaegertracing/jaeger/storage_v2/depstore:
config:
all: true
6 changes: 3 additions & 3 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)
Expand Down Expand Up @@ -114,7 +114,7 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to create span writer", zap.Error(err))
}
dependencyReader, err := storageFactory.CreateDependencyReader()
dependencyReader, err := v2Factory.CreateDependencyReader()
Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Dec 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro for v1, the factory adapter is created directly so we have direct access to this function - is this okay?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func startQuery(
qOpts *queryApp.QueryOptions,
queryOpts *querysvc.QueryServiceOptions,
traceReader tracestore.Reader,
depReader dependencystore.Reader,
depReader depstore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Settings,
Expand Down
2 changes: 1 addition & 1 deletion cmd/anonymizer/app/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor
}

func (exp *storageExporter) start(_ context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactoryV2(exp.config.TraceStorage, host)
f, err := jaegerstorage.GetTraceStoreFactory(exp.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}
Expand Down
17 changes: 7 additions & 10 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,20 @@
telset.Metrics = telset.Metrics.
Namespace(metrics.NSOptions{Name: "jaeger"}).
Namespace(metrics.NSOptions{Name: "query"})

// TODO currently v1 is still needed because of dependency storage
v1Factory, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
tf, err := jaegerstorage.GetTraceStoreFactory(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find v1 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err)
return fmt.Errorf("cannot find factory for trace storage %s: %w", s.config.Storage.TracesPrimary, err)
}
f, err := jaegerstorage.GetStorageFactoryV2(s.config.Storage.TracesPrimary, host)
traceReader, err := tf.CreateTraceReader()
if err != nil {
return fmt.Errorf("cannot find v2 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err)
return fmt.Errorf("cannot create trace reader: %w", err)
}

traceReader, err := f.CreateTraceReader()
df, err := jaegerstorage.GetDependencyStoreFactory(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot create trace reader: %w", err)
return fmt.Errorf("cannot find factory for dependency storage %s: %w", s.config.Storage.TracesPrimary, err)

Check warning on line 85 in cmd/jaeger/internal/extension/jaegerquery/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerquery/server.go#L85

Added line #L85 was not covered by tests
}

depReader, err := v1Factory.CreateDependencyReader()
depReader, err := df.CreateDependencyReader()
if err != nil {
return fmt.Errorf("cannot create dependencies reader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestServerStart(t *testing.T) {
TracesPrimary: "need-factory-error",
},
},
expectedErr: "cannot find v1 factory for primary storage",
expectedErr: "cannot find factory for trace storage",
},
{
name: "span reader error",
Expand Down
11 changes: 10 additions & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)
Expand Down Expand Up @@ -74,7 +75,15 @@
return mf, nil
}

func GetStorageFactoryV2(name string, host component.Host) (tracestore.Factory, error) {
func GetTraceStoreFactory(name string, host component.Host) (tracestore.Factory, error) {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return getV2FactoryAdapter(name, host)
}

func GetDependencyStoreFactory(name string, host component.Host) (depstore.Factory, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been looking at missing test coverage and realized that it's hard to get error coverage for this code path. We have this Extension API:

type Extension interface {
	extension.Extension
	TraceStorageFactory(name string) (storage.Factory, bool)
	MetricStorageFactory(name string) (storage.MetricStoreFactory, bool)
}

Shouldn't it be exposing v2 factory instead of v1? And either it should have deps-related method OR there shouldn't be a static function for deps and instead it should be a runtime interface check, as we discussed on the ticket.

Maybe this is transitional? I actually don't like that the adapter is being applied by the static functions - I think the right way would be to have the extension itself expose v2 API (instead of v1 API - we could get to v1 via downgreading, if needed).

return getV2FactoryAdapter(name, host)

Check warning on line 83 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L82-L83

Added lines #L82 - L83 were not covered by tests
}

func getV2FactoryAdapter(name string, host component.Host) (*factoryadapter.Factory, error) {
f, err := GetStorageFactory(name, host)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestStorageFactoryBadShutdownError(t *testing.T) {

func TestGetFactoryV2Error(t *testing.T) {
host := componenttest.NewNopHost()
_, err := GetStorageFactoryV2("something", host)
_, err := GetTraceStoreFactory("something", host)
require.ErrorContains(t, err, "cannot find extension")
}

Expand All @@ -112,7 +112,7 @@ func TestGetFactory(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, f)

f2, err := GetStorageFactoryV2(name, host)
f2, err := GetTraceStoreFactory(name, host)
require.NoError(t, err)
require.NotNil(t, f2)

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/apiv3/http_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
dependencyStoreMocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

Expand Down
21 changes: 14 additions & 7 deletions cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/metricstore"
metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

Expand Down Expand Up @@ -511,10 +512,13 @@ func TestGetDependenciesSuccessGRPC(t *testing.T) {
withServerAndClient(t, func(server *grpcServer, client *grpcClient) {
expectedDependencies := []model.DependencyLink{{Parent: "killer", Child: "queen", CallCount: 12}}
endTs := time.Now().UTC()
expectedEndTs := endTs.Add(time.Duration(-1) * defaultDependencyLookbackDuration)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is expectedEndTs offset from endTs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be called startTs perhaps?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, does Add(-defaultDependencyLookbackDuration) work?

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Dec 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Its a bit weird because here startTime is passed into endTs (https://github.com/jaegertracing/jaeger/blob/main/cmd/query/app/grpc_handler.go#L226). Do you know why that is? I thought it would be passed the endTs and then the lookback

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like a bug

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened a patch at #6324

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro should be good to go now

server.depReader.On("GetDependencies",
mock.Anything, // context.Context
endTs.Add(time.Duration(-1)*defaultDependencyLookbackDuration),
defaultDependencyLookbackDuration,
depstore.QueryParameters{
StartTime: expectedEndTs.Add(-defaultDependencyLookbackDuration),
EndTime: expectedEndTs,
},
).Return(expectedDependencies, nil).Times(1)

res, err := client.GetDependencies(context.Background(), &api_v2.GetDependenciesRequest{
Expand All @@ -529,11 +533,14 @@ func TestGetDependenciesSuccessGRPC(t *testing.T) {
func TestGetDependenciesFailureGRPC(t *testing.T) {
withServerAndClient(t, func(server *grpcServer, client *grpcClient) {
endTs := time.Now().UTC()
server.depReader.On(
"GetDependencies",
expectedEndTs := endTs.Add(time.Duration(-1) * defaultDependencyLookbackDuration)
server.depReader.On("GetDependencies",
mock.Anything, // context.Context
endTs.Add(time.Duration(-1)*defaultDependencyLookbackDuration),
defaultDependencyLookbackDuration).Return(nil, errStorageGRPC).Times(1)
depstore.QueryParameters{
StartTime: expectedEndTs.Add(-defaultDependencyLookbackDuration),
EndTime: expectedEndTs,
},
).Return(nil, errStorageGRPC).Times(1)

_, err := client.GetDependencies(context.Background(), &api_v2.GetDependenciesRequest{
StartTime: endTs.Add(time.Duration(-1) * defaultDependencyLookbackDuration),
Expand Down
14 changes: 10 additions & 4 deletions cmd/query/app/handler_deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/jaegertracing/jaeger/model"
ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
)

func TestDeduplicateDependencies(t *testing.T) {
Expand Down Expand Up @@ -304,8 +305,10 @@ func TestGetDependenciesSuccess(t *testing.T) {
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
ts.dependencyReader.On("GetDependencies",
mock.Anything, // context
endTs,
defaultDependencyLookbackDuration,
depstore.QueryParameters{
StartTime: endTs.Add(-defaultDependencyLookbackDuration),
EndTime: endTs,
},
).Return(expectedDependencies, nil).Times(1)

var response structuredResponse
Expand All @@ -324,8 +327,11 @@ func TestGetDependenciesCassandraFailure(t *testing.T) {
endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier)
ts.dependencyReader.On("GetDependencies",
mock.Anything, // context
endTs,
defaultDependencyLookbackDuration).Return(nil, errStorage).Times(1)
depstore.QueryParameters{
StartTime: endTs.Add(-defaultDependencyLookbackDuration),
EndTime: endTs,
},
).Return(nil, errStorage).Times(1)

var response structuredResponse
err := getJSON(ts.server.URL+"/api/dependencies?endTs=1476374248550&service=testing", &response)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metricstore/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

Expand Down
11 changes: 7 additions & 4 deletions cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/adjuster"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)
Expand Down Expand Up @@ -43,12 +43,12 @@ type StorageCapabilities struct {
// QueryService contains span utils required by the query-service.
type QueryService struct {
traceReader tracestore.Reader
dependencyReader dependencystore.Reader
dependencyReader depstore.Reader
options QueryServiceOptions
}

// NewQueryService returns a new QueryService.
func NewQueryService(traceReader tracestore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService {
func NewQueryService(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService {
qsvc := &QueryService{
traceReader: traceReader,
dependencyReader: dependencyReader,
Expand Down Expand Up @@ -134,7 +134,10 @@ func (qs QueryService) Adjust(trace *model.Trace) (*model.Trace, error) {

// GetDependencies implements dependencystore.Reader.GetDependencies
func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
return qs.dependencyReader.GetDependencies(ctx, endTs, lookback)
return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{
StartTime: endTs.Add(-lookback),
EndTime: endTs,
})
}

// GetCapabilities returns the features supported by the query service.
Expand Down
9 changes: 6 additions & 3 deletions cmd/query/app/querysvc/query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/depstore"
depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)
Expand Down Expand Up @@ -345,8 +346,10 @@ func TestGetDependencies(t *testing.T) {
tqs.depsReader.On(
"GetDependencies",
mock.Anything, // context.Context
endTs,
defaultDependencyLookbackDuration).Return(expectedDependencies, nil).Times(1)
depstore.QueryParameters{
StartTime: endTs.Add(-defaultDependencyLookbackDuration),
EndTime: endTs,
}).Return(expectedDependencies, nil).Times(1)

actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks"
spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
depsmocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/factoryadapter"
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create trace reader", zap.Error(err))
}
dependencyReader, err := storageFactory.CreateDependencyReader()
dependencyReader, err := v2Factory.CreateDependencyReader()
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}
Expand Down
11 changes: 11 additions & 0 deletions storage_v2/depstore/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package depstore

import "github.com/jaegertracing/jaeger/storage_v2"

type Factory interface {
storage_v2.FactoryBase
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
CreateDependencyReader() (Reader, error)
}
Loading
Loading