diff --git a/plugin/storage/factory_config.go b/plugin/storage/factory_config.go index 9807c8fa524..aaa40cd7256 100644 --- a/plugin/storage/factory_config.go +++ b/plugin/storage/factory_config.go @@ -68,6 +68,9 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig { depStorageType = spanWriterTypes[0] } samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar) + if samplingStorageType == "" { + samplingStorageType = cassandraStorageType + } // TODO support explicit configuration for readers return FactoryConfig{ SpanWriterTypes: spanWriterTypes, diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 28edfa6f2ae..ccaa35b3542 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics" ) @@ -161,6 +162,7 @@ func (f *Factory) newRemoteStorage( Store: grpcClient, ArchiveStore: grpcClient, StreamingSpanWriter: grpcClient, + SamplingStore: grpcClient, }, Capabilities: grpcClient, }, nil @@ -230,6 +232,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { return f.services.ArchiveStore.ArchiveSpanWriter(), nil } +func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) { + return f.services.SamplingStore.SamplingStore(), nil +} + // Close closes the resources held by the factory func (f *Factory) Close() error { var errs []error diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index ab2e73bac10..3b15f39a422 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -20,6 +20,7 @@ import ( _ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration "github.com/jaegertracing/jaeger/proto-gen/storage_v1" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -53,6 +54,7 @@ func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) * capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(tracedConn), depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(tracedConn), streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(untracedConn), + samplingStoreClient: storage_v1.NewSamplingStorePluginClient(untracedConn), } } @@ -71,6 +73,10 @@ func (c *GRPCClient) SpanWriter() spanstore.Writer { return c } +func (c *GRPCClient) SamplingStore() samplingstore.Store { + return c +} + func (c *GRPCClient) StreamingSpanWriter() spanstore.Writer { return newStreamingSpanWriter(c.streamWriterClient) } @@ -270,7 +276,8 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace, return &trace, nil } -func (c *GRPCClient) InsertThroughput(ctx context.Context, throughputs []*samplingStoreModel.Throughput) error { +func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughput) error { + ctx := context.Background() storageV1Throughput := []*storage_v1.Throughput{} for _, throughput := range throughputs { probsAsArray := []float64{} @@ -300,7 +307,8 @@ func (c *GRPCClient) InsertThroughput(ctx context.Context, throughputs []*sampli return nil } -func (c *GRPCClient) InsertProbabilitiesAndQPS(ctx context.Context, hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error { +func (c *GRPCClient) InsertProbabilitiesAndQPS(hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error { + ctx := context.Background() stringFloatMapToV1StringFloatMap := func(in map[string]float64) *storage_v1.StringFloatMap { return &storage_v1.StringFloatMap{ StringFloatMap: in, @@ -331,7 +339,8 @@ func (c *GRPCClient) InsertProbabilitiesAndQPS(ctx context.Context, hostname str return nil } -func (c *GRPCClient) GetThroughput(ctx context.Context, start, end time.Time) ([]*samplingStoreModel.Throughput, error) { +func (c *GRPCClient) GetThroughput(start, end time.Time) ([]*samplingStoreModel.Throughput, error) { + ctx := context.Background() resp, err := c.samplingStoreClient.GetThroughput(ctx, &storage_v1.GetThroughputRequest{ StartTime: start, EndTime: end, @@ -359,7 +368,8 @@ func (c *GRPCClient) GetThroughput(ctx context.Context, start, end time.Time) ([ return resThroughput, nil } -func (c *GRPCClient) GetLatestProbabilities(ctx context.Context) (samplingStoreModel.ServiceOperationProbabilities, error) { +func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperationProbabilities, error) { + ctx := context.Background() resp, err := c.samplingStoreClient.GetLatestProbabilities(ctx, &storage_v1.GetLatestProbabilitiesRequest{}) if err != nil { return nil, fmt.Errorf("plugin error: %w", err) diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go index a8bb88fcf34..a0e97bc2cad 100644 --- a/plugin/storage/grpc/shared/grpc_client_test.go +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -112,6 +112,7 @@ func TestNewGRPCClient(t *testing.T) { assert.Implements(t, (*storage_v1.PluginCapabilitiesClient)(nil), client.capabilitiesClient) assert.Implements(t, (*storage_v1.DependenciesReaderPluginClient)(nil), client.depsReaderClient) assert.Implements(t, (*storage_v1.StreamingSpanWriterPluginClient)(nil), client.streamWriterClient) + assert.Implements(t, (*storage_v1.SamplingStorePluginClient)(nil), client.samplingStoreClient) } func TestGRPCClientGetServices(t *testing.T) { diff --git a/plugin/storage/grpc/shared/interface.go b/plugin/storage/grpc/shared/interface.go index 9c5b9fd8c2e..60d4011743b 100644 --- a/plugin/storage/grpc/shared/interface.go +++ b/plugin/storage/grpc/shared/interface.go @@ -28,7 +28,7 @@ type StreamingSpanWriterPlugin interface { } type SamplingStorePlugin interface { - SamplingStoreRW() samplingstore.Store + SamplingStore() samplingstore.Store } // PluginCapabilities allow expose plugin its capabilities. diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 77e273cf826..8e7e949d125 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -43,6 +43,8 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() require.NoError(t, err) + s.SamplingStore, err = f.CreateSamplingStore() + require.NoError(t, err) // TODO DependencyWriter is not implemented in grpc store