Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add Adaptive Sampling Support for gRPC Remote Storage #6308

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-e2e-grpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ jobs:
run: |
case ${{ matrix.version }} in
v1)
SPAN_STORAGE_TYPE=memory make grpc-storage-integration-test
SAMPLING_STORAGE_TYPE=memory SPAN_STORAGE_TYPE=memory make grpc-storage-integration-test
;;
v2)
STORAGE=grpc SPAN_STORAGE_TYPE=memory make jaeger-v2-storage-integration-test
SAMPLING_STORAGE_TYPE=memory STORAGE=grpc SPAN_STORAGE_TYPE=memory make jaeger-v2-storage-integration-test
;;
esac

Expand Down
12 changes: 9 additions & 3 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -36,8 +37,8 @@
}

// NewServer creates and initializes Server.
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, telset.Logger)
func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings, samplingStoreFactory storage.SamplingStoreFactory) (*Server, error) {
handler, err := createGRPCHandler(storageFactory, samplingStoreFactory, telset.Logger)
if err != nil {
return nil, err
}
Expand All @@ -54,7 +55,7 @@
}, nil
}

func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
func createGRPCHandler(f storage.BaseFactory, samplingStoreFactory storage.SamplingStoreFactory, logger *zap.Logger) (*shared.GRPCHandler, error) {
reader, err := f.CreateSpanReader()
if err != nil {
return nil, err
Expand All @@ -67,12 +68,17 @@
if err != nil {
return nil, err
}
samplingStore, err := samplingStoreFactory.CreateSamplingStore(1)
Copy link
Member

@yurishkuro yurishkuro Dec 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for readability, define named constant instead of passing unnamed 1

if err != nil {
return nil, err
}

Check warning on line 74 in cmd/remote-storage/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/remote-storage/app/server.go#L73-L74

Added lines #L73 - L74 were not covered by tests

impl := &shared.GRPCHandlerStorageImpl{
SpanReader: func() spanstore.Reader { return reader },
SpanWriter: func() spanstore.Writer { return writer },
DependencyReader: func() dependencystore.Reader { return depReader },
StreamingSpanWriter: func() spanstore.Writer { return nil },
SamplingStore: func() samplingstore.Store { return samplingStore },
}

// borrow code from Query service for archive storage
Expand Down
6 changes: 5 additions & 1 deletion cmd/remote-storage/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestNewServer_CreateStorageErrors(t *testing.T) {
factory,
tenancy.NewManager(&tenancy.Options{}),
telemetry.NoopSettings(),
nil,
)
}
_, err := f()
Expand Down Expand Up @@ -118,13 +119,14 @@ func TestNewServer_TLSConfigError(t *testing.T) {
storageMocks.factory,
tenancy.NewManager(&tenancy.Options{}),
telset,
nil,
)
assert.ErrorContains(t, err, "invalid TLS config")
}

func TestCreateGRPCHandler(t *testing.T) {
storageMocks := newStorageMocks()
h, err := createGRPCHandler(storageMocks.factory, zap.NewNop())
h, err := createGRPCHandler(storageMocks.factory, nil, zap.NewNop())
require.NoError(t, err)

storageMocks.writer.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("writer error"))
Expand Down Expand Up @@ -325,6 +327,7 @@ func TestServerGRPCTLS(t *testing.T) {
storageMocks.factory,
tm,
telset,
nil,
)
require.NoError(t, err)
require.NoError(t, server.Start())
Expand Down Expand Up @@ -374,6 +377,7 @@ func TestServerHandlesPortZero(t *testing.T) {
storageMocks.factory,
tenancy.NewManager(&tenancy.Options{}),
telset,
nil,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a test where something real is passed

)
require.NoError(t, err)

Expand Down
2 changes: 1 addition & 1 deletion cmd/remote-storage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func main() {
tm := tenancy.NewManager(&opts.Tenancy)
telset := baseTelset // copy
telset.Metrics = metricsFactory
server, err := app.NewServer(opts, storageFactory, tm, telset)
server, err := app.NewServer(opts, storageFactory, tm, telset, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how will it work if you pass nil?

if err != nil {
logger.Fatal("Failed to create server", zap.Error(err))
}
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)
Expand Down Expand Up @@ -161,6 +162,7 @@ func (f *Factory) newRemoteStorage(
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
SamplingStore: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down Expand Up @@ -230,6 +232,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return f.services.ArchiveStore.ArchiveSpanWriter(), nil
}

func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return f.services.SamplingStore.SamplingStore(), nil
}

// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,63 @@ message FindTraceIDsResponse {
];
}

message Throughput {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Schema should be documented. Wherever you are copying this from, we have comments explaining all types.

string Service = 1;
string Operation = 2;
int64 Count = 3;
repeated double Probabilities = 4;
}

message InsertThroughputRequest {
repeated Throughput throughput = 1;
}

message InsertThroughputResponse {
}

message StringFloatMap {
map<string, double> stringFloatMap = 1;
}

message ServiceOperationProbabilities {
map<string, StringFloatMap> serviceOperationProbabilities = 1;
}

message ServiceOperationQPS {
map<string, StringFloatMap> serviceOperationQPS = 1;
}

message InsertProbabilitiesAndQPSRequest {
string hostname = 1;
ServiceOperationProbabilities probabilities = 2;
ServiceOperationQPS qps = 3;
}

message InsertProbabilitiesAndQPSResponse {
}

message GetThroughputRequest {
google.protobuf.Timestamp start_time = 1[
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end_time = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = false
];
}

message GetThroughputResponse {
repeated Throughput throughput = 1;
}

message GetLatestProbabilitiesRequest {
}

message GetLatestProbabilitiesResponse {
ServiceOperationProbabilities serviceOperationProbabilities = 1;
}

service SpanWriterPlugin {
// spanstore/Writer
rpc WriteSpan(WriteSpanRequest) returns (WriteSpanResponse);
Expand Down Expand Up @@ -182,6 +239,16 @@ service DependenciesReaderPlugin {
rpc GetDependencies(GetDependenciesRequest) returns (GetDependenciesResponse);
}

service SamplingStorePlugin{
rpc InsertThroughput(InsertThroughputRequest) returns (InsertThroughputResponse);

rpc InsertProbabilitiesAndQPS(InsertProbabilitiesAndQPSRequest) returns (InsertProbabilitiesAndQPSResponse);

rpc GetThroughput(GetThroughputRequest) returns (GetThroughputResponse);

rpc GetLatestProbabilities(GetLatestProbabilitiesRequest) returns (GetLatestProbabilitiesResponse);
}

// empty; extensible in the future
message CapabilitiesRequest {

Expand Down
67 changes: 67 additions & 0 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

samplingStoreModel "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/model"
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand All @@ -39,6 +41,7 @@
capabilitiesClient storage_v1.PluginCapabilitiesClient
depsReaderClient storage_v1.DependenciesReaderPluginClient
streamWriterClient storage_v1.StreamingSpanWriterPluginClient
samplingStoreClient storage_v1.SamplingStorePluginClient
}

func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *GRPCClient {
Expand All @@ -50,6 +53,7 @@
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(tracedConn),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(tracedConn),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(untracedConn),
samplingStoreClient: storage_v1.NewSamplingStorePluginClient(untracedConn),
}
}

Expand All @@ -68,6 +72,10 @@
return c
}

func (c *GRPCClient) SamplingStore() samplingstore.Store {
return c
}

func (c *GRPCClient) StreamingSpanWriter() spanstore.Writer {
return newStreamingSpanWriter(c.streamWriterClient)
}
Expand Down Expand Up @@ -266,3 +274,62 @@

return &trace, nil
}

func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughput) error {
ctx := context.Background()
storageV1Throughput, err := samplingStoreThroughputsToStorageV1Throughputs(throughputs)
if err != nil {
return err
}

Check warning on line 283 in plugin/storage/grpc/shared/grpc_client.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/shared/grpc_client.go#L282-L283

Added lines #L282 - L283 were not covered by tests

_, err = c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{
Throughput: storageV1Throughput,
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

Check warning on line 290 in plugin/storage/grpc/shared/grpc_client.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/shared/grpc_client.go#L289-L290

Added lines #L289 - L290 were not covered by tests

return nil
}

func (c *GRPCClient) InsertProbabilitiesAndQPS(hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error {
ctx := context.Background()

_, err := c.samplingStoreClient.InsertProbabilitiesAndQPS(ctx, &storage_v1.InsertProbabilitiesAndQPSRequest{
Hostname: hostname,
Probabilities: &storage_v1.ServiceOperationProbabilities{
ServiceOperationProbabilities: sSFloatMapToStorageV1SSFloatMap(probabilities),
},
Qps: &storage_v1.ServiceOperationQPS{
ServiceOperationQPS: sSFloatMapToStorageV1SSFloatMap(qps),
},
})
if err != nil {
return fmt.Errorf("plugin error: %w", err)
}

Check warning on line 309 in plugin/storage/grpc/shared/grpc_client.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/shared/grpc_client.go#L308-L309

Added lines #L308 - L309 were not covered by tests

return nil
}

func (c *GRPCClient) GetThroughput(start, end time.Time) ([]*samplingStoreModel.Throughput, error) {
ctx := context.Background()
resp, err := c.samplingStoreClient.GetThroughput(ctx, &storage_v1.GetThroughputRequest{
StartTime: start,
EndTime: end,
})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

Check warning on line 322 in plugin/storage/grpc/shared/grpc_client.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/shared/grpc_client.go#L321-L322

Added lines #L321 - L322 were not covered by tests

return storageV1ThroughputsToSamplingStoreThroughputs(resp.Throughput), nil
}

func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperationProbabilities, error) {
ctx := context.Background()
resp, err := c.samplingStoreClient.GetLatestProbabilities(ctx, &storage_v1.GetLatestProbabilitiesRequest{})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
}

Check warning on line 332 in plugin/storage/grpc/shared/grpc_client.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/grpc/shared/grpc_client.go#L331-L332

Added lines #L331 - L332 were not covered by tests

return storageV1SSFloatMapToSSFloatMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil
}
1 change: 1 addition & 0 deletions plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestNewGRPCClient(t *testing.T) {
assert.Implements(t, (*storage_v1.PluginCapabilitiesClient)(nil), client.capabilitiesClient)
assert.Implements(t, (*storage_v1.DependenciesReaderPluginClient)(nil), client.depsReaderClient)
assert.Implements(t, (*storage_v1.StreamingSpanWriterPluginClient)(nil), client.streamWriterClient)
assert.Implements(t, (*storage_v1.SamplingStorePluginClient)(nil), client.samplingStoreClient)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no unit tests for new functions?

}

func TestGRPCClientGetServices(t *testing.T) {
Expand Down
Loading
Loading