diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index 546d5c0b16c..e4b9c081bf1 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -22,6 +22,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -36,8 +37,8 @@ type Server struct { } // NewServer creates and initializes Server. -func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) { - handler, err := createGRPCHandler(storageFactory, telset.Logger) +func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings, samplingStoreFactory storage.SamplingStoreFactory) (*Server, error) { + handler, err := createGRPCHandler(storageFactory, samplingStoreFactory, telset.Logger) if err != nil { return nil, err } @@ -54,7 +55,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy }, nil } -func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCHandler, error) { +func createGRPCHandler(f storage.BaseFactory, samplingStoreFactory storage.SamplingStoreFactory, logger *zap.Logger) (*shared.GRPCHandler, error) { reader, err := f.CreateSpanReader() if err != nil { return nil, err @@ -67,12 +68,17 @@ func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCH if err != nil { return nil, err } + samplingStore, err := samplingStoreFactory.CreateSamplingStore(1) + if err != nil { + return nil, err + } impl := &shared.GRPCHandlerStorageImpl{ SpanReader: func() spanstore.Reader { return reader }, SpanWriter: func() spanstore.Writer { return writer }, DependencyReader: func() dependencystore.Reader { return depReader }, StreamingSpanWriter: func() spanstore.Writer { return nil }, + SamplingStore: func() samplingstore.Store { return samplingStore }, } // borrow code from Query service for archive storage diff --git a/cmd/remote-storage/main.go b/cmd/remote-storage/main.go index 8f1ed50e9a7..dacd5a7aa66 100644 --- a/cmd/remote-storage/main.go +++ b/cmd/remote-storage/main.go @@ -77,7 +77,7 @@ func main() { tm := tenancy.NewManager(&opts.Tenancy) telset := baseTelset // copy telset.Metrics = metricsFactory - server, err := app.NewServer(opts, storageFactory, tm, telset) + server, err := app.NewServer(opts, storageFactory, tm, telset, nil) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } diff --git a/plugin/storage/factory_config.go b/plugin/storage/factory_config.go index aaa40cd7256..9807c8fa524 100644 --- a/plugin/storage/factory_config.go +++ b/plugin/storage/factory_config.go @@ -68,9 +68,6 @@ func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig { depStorageType = spanWriterTypes[0] } samplingStorageType := os.Getenv(SamplingStorageTypeEnvVar) - if samplingStorageType == "" { - samplingStorageType = cassandraStorageType - } // TODO support explicit configuration for readers return FactoryConfig{ SpanWriterTypes: spanWriterTypes, diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go index 3b15f39a422..efecf227bcd 100644 --- a/plugin/storage/grpc/shared/grpc_client.go +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -8,7 +8,6 @@ import ( "errors" "fmt" "io" - "strconv" "time" "google.golang.org/grpc" @@ -278,26 +277,12 @@ func readTrace(stream storage_v1.SpanReaderPlugin_GetTraceClient) (*model.Trace, func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughput) error { ctx := context.Background() - storageV1Throughput := []*storage_v1.Throughput{} - for _, throughput := range throughputs { - probsAsArray := []float64{} - for prob := range throughput.Probabilities { - probInFloat, err := strconv.ParseFloat(prob, 64) - if err != nil { - return err - } - probsAsArray = append(probsAsArray, probInFloat) - } - - storageV1Throughput = append(storageV1Throughput, &storage_v1.Throughput{ - Service: throughput.Service, - Operation: throughput.Operation, - Count: throughput.Count, - Probabilities: probsAsArray, - }) + storageV1Throughput, err := samplingStoreThroughpusToStorageV1Throughputs(throughputs) + if err != nil { + return err } - _, err := c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{ + _, err = c.samplingStoreClient.InsertThroughput(ctx, &storage_v1.InsertThroughputRequest{ Throughput: storageV1Throughput, }) if err != nil { @@ -309,27 +294,14 @@ func (c *GRPCClient) InsertThroughput(throughputs []*samplingStoreModel.Throughp func (c *GRPCClient) InsertProbabilitiesAndQPS(hostname string, probabilities samplingStoreModel.ServiceOperationProbabilities, qps samplingStoreModel.ServiceOperationQPS) error { ctx := context.Background() - stringFloatMapToV1StringFloatMap := func(in map[string]float64) *storage_v1.StringFloatMap { - return &storage_v1.StringFloatMap{ - StringFloatMap: in, - } - } - - convertToV1Map := func(in map[string]map[string]float64) map[string]*storage_v1.StringFloatMap { - res := make(map[string]*storage_v1.StringFloatMap) - for k, v := range in { - res[k] = stringFloatMapToV1StringFloatMap(v) - } - return res - } _, err := c.samplingStoreClient.InsertProbabilitiesAndQPS(ctx, &storage_v1.InsertProbabilitiesAndQPSRequest{ Hostname: hostname, Probabilities: &storage_v1.ServiceOperationProbabilities{ - ServiceOperationProbabilities: convertToV1Map(probabilities), + ServiceOperationProbabilities: sSFloatMapToStorageV1SSFloatMap(probabilities), }, Qps: &storage_v1.ServiceOperationQPS{ - ServiceOperationQPS: convertToV1Map(qps), + ServiceOperationQPS: sSFloatMapToStorageV1SSFloatMap(qps), }, }) if err != nil { @@ -349,23 +321,7 @@ func (c *GRPCClient) GetThroughput(start, end time.Time) ([]*samplingStoreModel. return nil, fmt.Errorf("plugin error: %w", err) } - resThroughput := []*samplingStoreModel.Throughput{} - - for _, throughput := range resp.Throughput { - probsAsSet := make(map[string]struct{}) - for _, prob := range throughput.Probabilities { - probsAsSet[strconv.FormatFloat(prob, 'E', -1, 64)] = struct{}{} - } - - resThroughput = append(resThroughput, &samplingStoreModel.Throughput{ - Service: throughput.Service, - Operation: throughput.Operation, - Count: throughput.Count, - Probabilities: probsAsSet, - }) - } - - return resThroughput, nil + return storageV1ThroughputsToSamplingStoreThroughputs(resp.Throughput), nil } func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperationProbabilities, error) { @@ -375,17 +331,5 @@ func (c *GRPCClient) GetLatestProbabilities() (samplingStoreModel.ServiceOperati return nil, fmt.Errorf("plugin error: %w", err) } - v1StringFloatMapToStringFloatMap := func(in *storage_v1.StringFloatMap) map[string]float64 { - return in.StringFloatMap - } - - convertToMap := func(in map[string]*storage_v1.StringFloatMap) map[string]map[string]float64 { - res := make(map[string]map[string]float64) - for k, v := range in { - res[k] = v1StringFloatMapToStringFloatMap(v) - } - return res - } - - return convertToMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil + return storageV1SSFloatMapToSSFloatMap(resp.ServiceOperationProbabilities.ServiceOperationProbabilities), nil } diff --git a/plugin/storage/grpc/shared/grpc_handler.go b/plugin/storage/grpc/shared/grpc_handler.go index 564a625ebf0..ddb0ef47199 100644 --- a/plugin/storage/grpc/shared/grpc_handler.go +++ b/plugin/storage/grpc/shared/grpc_handler.go @@ -19,6 +19,7 @@ import ( _ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration "github.com/jaegertracing/jaeger/proto-gen/storage_v1" "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -42,6 +43,8 @@ type GRPCHandlerStorageImpl struct { ArchiveSpanWriter func() spanstore.Writer StreamingSpanWriter func() spanstore.Writer + + SamplingStore func() samplingstore.Store } // NewGRPCHandler creates a handler given individual storage implementations. @@ -83,6 +86,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error { storage_v1.RegisterPluginCapabilitiesServer(ss, s) storage_v1.RegisterDependenciesReaderPluginServer(ss, s) storage_v1.RegisterStreamingSpanWriterPluginServer(ss, s) + storage_v1.RegisterSamplingStorePluginServer(ss, s) hs.SetServingStatus("jaeger.storage.v1.SpanReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING) hs.SetServingStatus("jaeger.storage.v1.SpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING) @@ -91,6 +95,7 @@ func (s *GRPCHandler) Register(ss *grpc.Server, hs *health.Server) error { hs.SetServingStatus("jaeger.storage.v1.PluginCapabilities", grpc_health_v1.HealthCheckResponse_SERVING) hs.SetServingStatus("jaeger.storage.v1.DependenciesReaderPlugin", grpc_health_v1.HealthCheckResponse_SERVING) hs.SetServingStatus("jaeger.storage.v1.StreamingSpanWriterPlugin", grpc_health_v1.HealthCheckResponse_SERVING) + hs.SetServingStatus("jaeger.storage.v1.SamplingStorePlugin", grpc_health_v1.HealthCheckResponse_SERVING) grpc_health_v1.RegisterHealthServer(ss, hs) return nil @@ -303,3 +308,46 @@ func (s *GRPCHandler) WriteArchiveSpan(ctx context.Context, r *storage_v1.WriteS } return &storage_v1.WriteSpanResponse{}, nil } + +func (s *GRPCHandler) InsertThroughput(_ context.Context, r *storage_v1.InsertThroughputRequest) (*storage_v1.InsertThroughputResponse, error) { + err := s.impl.SamplingStore().InsertThroughput(storageV1ThroughputsToSamplingStoreThroughputs(r.Throughput)) + if err != nil { + return nil, err + } + return &storage_v1.InsertThroughputResponse{}, nil +} + +func (s *GRPCHandler) InsertProbabilitiesAndQPS(_ context.Context, r *storage_v1.InsertProbabilitiesAndQPSRequest) (*storage_v1.InsertProbabilitiesAndQPSResponse, error) { + err := s.impl.SamplingStore().InsertProbabilitiesAndQPS(r.Hostname, storageV1SSFloatMapToSSFloatMap(r.Probabilities.ServiceOperationProbabilities), storageV1SSFloatMapToSSFloatMap(r.Qps.ServiceOperationQPS)) + if err != nil { + return nil, err + } + return &storage_v1.InsertProbabilitiesAndQPSResponse{}, nil +} + +func (s *GRPCHandler) GetThroughput(_ context.Context, r *storage_v1.GetThroughputRequest) (*storage_v1.GetThroughputResponse, error) { + throughput, err := s.impl.SamplingStore().GetThroughput(r.StartTime, r.EndTime) + if err != nil { + return nil, err + } + storageV1Throughput, err := samplingStoreThroughpusToStorageV1Throughputs(throughput) + if err != nil { + return nil, err + } + + return &storage_v1.GetThroughputResponse{ + Throughput: storageV1Throughput, + }, nil +} + +func (s *GRPCHandler) GetLatestProbabilities(context.Context, *storage_v1.GetLatestProbabilitiesRequest) (*storage_v1.GetLatestProbabilitiesResponse, error) { + probabilities, err := s.impl.SamplingStore().GetLatestProbabilities() + if err != nil { + return nil, err + } + return &storage_v1.GetLatestProbabilitiesResponse{ + ServiceOperationProbabilities: &storage_v1.ServiceOperationProbabilities{ + ServiceOperationProbabilities: sSFloatMapToStorageV1SSFloatMap(probabilities), + }, + }, nil +} diff --git a/plugin/storage/integration/remote_memory_storage.go b/plugin/storage/integration/remote_memory_storage.go index 74f813fa2b9..a02c8ac8674 100644 --- a/plugin/storage/integration/remote_memory_storage.go +++ b/plugin/storage/integration/remote_memory_storage.go @@ -43,6 +43,9 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage { storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr)) require.NoError(t, err) + samplingStoreFactory, err := storageFactory.CreateSamplingStoreFactory() + require.NoError(t, err) + v, _ := config.Viperize(storageFactory.AddFlags) storageFactory.InitFromViper(v, logger) require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger)) @@ -51,7 +54,7 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage { telset := telemetry.NoopSettings() telset.Logger = logger telset.ReportStatus = telemetry.HCAdapter(healthcheck.New()) - server, err := app.NewServer(opts, storageFactory, tm, telset) + server, err := app.NewServer(opts, storageFactory, tm, telset, samplingStoreFactory) require.NoError(t, err) require.NoError(t, server.Start())