Skip to content

Commit

Permalink
Added sampling store in integration test
Browse files Browse the repository at this point in the history
Signed-off-by: Alok Kumar Singh <[email protected]>
  • Loading branch information
akstron committed Dec 7, 2024
1 parent 50b8079 commit c753064
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 5 deletions.
3 changes: 3 additions & 0 deletions plugin/storage/factory_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
depStorageType = spanWriterTypes[0]
}
samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar)
if samplingStorageType == "" {
samplingStorageType = cassandraStorageType
}
// TODO support explicit configuration for readers
return FactoryConfig{
SpanWriterTypes: spanWriterTypes,
Expand Down
6 changes: 6 additions & 0 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)
Expand Down Expand Up @@ -161,6 +162,7 @@ func (f *Factory) newRemoteStorage(
Store: grpcClient,
ArchiveStore: grpcClient,
StreamingSpanWriter: grpcClient,
SamplingStore: grpcClient,
},
Capabilities: grpcClient,
}, nil
Expand Down Expand Up @@ -230,6 +232,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
return f.services.ArchiveStore.ArchiveSpanWriter(), nil
}

func (f *Factory) CreateSamplingStore() (samplingstore.Store, error) {
return f.services.SamplingStore.SamplingStore(), nil
}

// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
Expand Down
18 changes: 14 additions & 4 deletions plugin/storage/grpc/shared/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
_ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -53,6 +54,7 @@ func NewGRPCClient(tracedConn *grpc.ClientConn, untracedConn *grpc.ClientConn) *
capabilitiesClient: storage_v1.NewPluginCapabilitiesClient(tracedConn),
depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(tracedConn),
streamWriterClient: storage_v1.NewStreamingSpanWriterPluginClient(untracedConn),
samplingStoreClient: storage_v1.NewSamplingStorePluginClient(untracedConn),
}
}

Expand All @@ -71,6 +73,10 @@ func (c *GRPCClient) SpanWriter() spanstore.Writer {
return c
}

func (c *GRPCClient) SamplingStore() samplingstore.Store {
return c
}

func (c *GRPCClient) StreamingSpanWriter() spanstore.Writer {
return newStreamingSpanWriter(c.streamWriterClient)
}
Expand Down Expand Up @@ -270,7 +276,8 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace,
return &trace, nil
}

func (c *GRPCClient) InsertThroughput(ctx context.Context, throughputs []*samplingStoreModel.Throughput) error {
func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughput) error {
ctx := context.Background()
storageV1Throughput := []*storage_v1.Throughput{}
for _, throughput := range throughputs {
probsAsArray := []float64{}
Expand Down Expand Up @@ -300,7 +307,8 @@ func (c *GRPCClient) InsertThroughput(ctx context.Context, throughputs []*sampli
return nil
}

func (c *GRPCClient) InsertProbabilitiesAndQPS(ctx context.Context, hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error {
func (c *GRPCClient) InsertProbabilitiesAndQPS(hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error {
ctx := context.Background()
stringFloatMapToV1StringFloatMap := func(in map[string]float64) *storage_v1.StringFloatMap {
return &storage_v1.StringFloatMap{
StringFloatMap: in,
Expand Down Expand Up @@ -331,7 +339,8 @@ func (c *GRPCClient) InsertProbabilitiesAndQPS(ctx context.Context, hostname str
return nil
}

func (c *GRPCClient) GetThroughput(ctx context.Context, start, end time.Time) ([]*samplingStoreModel.Throughput, error) {
func (c *GRPCClient) GetThroughput(start, end time.Time) ([]*samplingStoreModel.Throughput, error) {
ctx := context.Background()
resp, err := c.samplingStoreClient.GetThroughput(ctx, &storage_v1.GetThroughputRequest{
StartTime: start,
EndTime: end,
Expand Down Expand Up @@ -359,7 +368,8 @@ func (c *GRPCClient) GetThroughput(ctx context.Context, start, end time.Time) ([
return resThroughput, nil
}

func (c *GRPCClient) GetLatestProbabilities(ctx context.Context) (samplingStoreModel.ServiceOperationProbabilities, error) {
func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperationProbabilities, error) {
ctx := context.Background()
resp, err := c.samplingStoreClient.GetLatestProbabilities(ctx, &storage_v1.GetLatestProbabilitiesRequest{})
if err != nil {
return nil, fmt.Errorf("plugin error: %w", err)
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/grpc/shared/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestNewGRPCClient(t *testing.T) {
assert.Implements(t, (*storage_v1.PluginCapabilitiesClient)(nil), client.capabilitiesClient)
assert.Implements(t, (*storage_v1.DependenciesReaderPluginClient)(nil), client.depsReaderClient)
assert.Implements(t, (*storage_v1.StreamingSpanWriterPluginClient)(nil), client.streamWriterClient)
assert.Implements(t, (*storage_v1.SamplingStorePluginClient)(nil), client.samplingStoreClient)
}

func TestGRPCClientGetServices(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/grpc/shared/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type StreamingSpanWriterPlugin interface {
}

type SamplingStorePlugin interface {
SamplingStoreRW() samplingstore.Store
SamplingStore() samplingstore.Store
}

// PluginCapabilities allow expose plugin its capabilities.
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
require.NoError(t, err)
s.SamplingStore, err = f.CreateSamplingStore()
require.NoError(t, err)

// TODO DependencyWriter is not implemented in grpc store

Expand Down

0 comments on commit c753064

Please sign in to comment.