Skip to content

Commit

Permalink
Rename strategy store to sampling strategy provider
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Jun 16, 2024
1 parent 082d2e0 commit 252980d
Show file tree
Hide file tree
Showing 60 changed files with 336 additions and 330 deletions.
38 changes: 19 additions & 19 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -68,13 +68,13 @@ func main() {
if err != nil {
log.Fatalf("Cannot initialize storage factory: %v", err)
}
strategyStoreFactoryConfig, err := ss.FactoryConfigFromEnv()
samplingStrategyFactoryConfig, err := ss.FactoryConfigFromEnv()
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory config: %v", err)
log.Fatalf("Cannot initialize sampling strategy factory config: %v", err)
}
strategyStoreFactory, err := ss.NewFactory(*strategyStoreFactoryConfig)
samplingStrategyFactory, err := ss.NewFactory(*samplingStrategyFactoryConfig)
if err != nil {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
log.Fatalf("Cannot initialize sampling strategy factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
Expand Down Expand Up @@ -133,13 +133,13 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create sampling store factory", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v, logger)
if err := strategyStoreFactory.Initialize(collectorMetricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
samplingStrategyFactory.InitFromViper(v, logger)
if err := samplingStrategyFactory.Initialize(collectorMetricsFactory, ssFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy factory", zap.Error(err))
}
strategyStore, aggregator, err := strategyStoreFactory.CreateStrategyStore()
samplingProvider, samplingAggregator, err := samplingStrategyFactory.CreateStrategyProvider()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
logger.Fatal("Failed to create sampling strategy provider", zap.Error(err))
}

aOpts := new(agentApp.Builder).InitFromViper(v)
Expand All @@ -161,14 +161,14 @@ by default uses only in-memory database.`,

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
Aggregator: aggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
SamplingAggregator: samplingAggregator,
HealthCheck: svc.HC(),
TenancyMgr: tm,
})
if err := c.Start(cOpts); err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -238,7 +238,7 @@ by default uses only in-memory database.`,
agentGrpcRep.AddFlags,
collectorFlags.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
samplingStrategyFactory.AddFlags,
metricsReaderFactory.AddFlags,
)

Expand Down
78 changes: 39 additions & 39 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -46,16 +46,16 @@ const (
// Collector returns the collector as a manageable unit of work
type Collector struct {
// required to start a new collector
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
strategyStore strategystore.StrategyStore
aggregator strategystore.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.Manager
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
samplingProvider samplingstrategy.Provider
samplingAggregator samplingstrategy.Aggregator
hCheck *healthcheck.HealthCheck
spanProcessor processor.SpanProcessor
spanHandlers *SpanHandlers
tenancyMgr *tenancy.Manager

// state, read only
hServer *http.Server
Expand All @@ -69,27 +69,27 @@ type Collector struct {

// CollectorParams to construct a new Jaeger Collector.
type CollectorParams struct {
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
StrategyStore strategystore.StrategyStore
Aggregator strategystore.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.Manager
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
SamplingProvider samplingstrategy.Provider
SamplingAggregator samplingstrategy.Aggregator
HealthCheck *healthcheck.HealthCheck
TenancyMgr *tenancy.Manager
}

// New constructs a new collector component, ready to be started
func New(params *CollectorParams) *Collector {
return &Collector{
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
strategyStore: params.StrategyStore,
aggregator: params.Aggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
samplingProvider: params.SamplingProvider,
samplingAggregator: params.SamplingAggregator,
hCheck: params.HealthCheck,
tenancyMgr: params.TenancyMgr,
}
}

Expand All @@ -104,9 +104,9 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
}

var additionalProcessors []ProcessSpan
if c.aggregator != nil {
if c.samplingAggregator != nil {
additionalProcessors = append(additionalProcessors, func(span *model.Span, _ /* tenant */ string) {
c.aggregator.HandleRootSpan(span, c.logger)
c.samplingAggregator.HandleRootSpan(span, c.logger)
})
}

Expand All @@ -117,7 +117,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
HostPort: options.GRPC.HostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: options.GRPC.TLS,
SamplingStore: c.strategyStore,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
Expand All @@ -129,13 +129,13 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Logger: c.logger,
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start HTTP server: %w", err)
Expand Down Expand Up @@ -213,8 +213,8 @@ func (c *Collector) Close() error {
}

// aggregator does not exist for all strategy stores. only Close() if exists.
if c.aggregator != nil {
if err := c.aggregator.Close(); err != nil {
if c.samplingAggregator != nil {
if err := c.samplingAggregator.Close(); err != nil {
c.logger.Error("failed to close aggregator.", zap.Error(err))
}
}
Expand Down
72 changes: 36 additions & 36 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func TestNewCollector(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})

collectorOpts := optionsForEphemeralPorts()
Expand All @@ -102,17 +102,17 @@ func TestCollector_StartErrors(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})
err := c.Start(options)
require.Error(t, err)
Expand Down Expand Up @@ -144,13 +144,13 @@ func TestCollector_StartErrors(t *testing.T) {
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct{}
type mockSamplingProvider struct{}

func (*mockStrategyStore) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
func (*mockSamplingProvider) GetSamplingStrategy(context.Context, string /* serviceName */) (*api_v2.SamplingStrategyResponse, error) {
return &api_v2.SamplingStrategyResponse{}, nil
}

func (*mockStrategyStore) Close() error {
func (*mockSamplingProvider) Close() error {
return nil
}

Expand All @@ -161,17 +161,17 @@ func TestCollector_PublishOpts(t *testing.T) {
metricsFactory := metricstest.NewFactory(time.Second)
defer metricsFactory.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
Expand All @@ -191,19 +191,19 @@ func TestAggregator(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
defer baseMetrics.Backend.Stop()
spanWriter := &fakeSpanWriter{}
strategyStore := &mockStrategyStore{}
samplingProvider := &mockSamplingProvider{}
agg := &mockAggregator{}
tm := &tenancy.Manager{}

c := New(&CollectorParams{
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
StrategyStore: strategyStore,
HealthCheck: hc,
Aggregator: agg,
TenancyMgr: tm,
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
SamplingProvider: samplingProvider,
HealthCheck: hc,
SamplingAggregator: agg,
TenancyMgr: tm,
})
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/app/sampling/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ package sampling
import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler is sampling strategy handler for gRPC.
type GRPCHandler struct {
store strategystore.StrategyStore
samplingProvider samplingstrategy.Provider
}

// NewGRPCHandler creates a handler that controls sampling strategies for services.
func NewGRPCHandler(store strategystore.StrategyStore) GRPCHandler {
func NewGRPCHandler(provider samplingstrategy.Provider) GRPCHandler {
return GRPCHandler{
store: store,
samplingProvider: provider,
}
}

// GetSamplingStrategy returns sampling decision from store.
func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) {
return s.store.GetSamplingStrategy(ctx, param.GetServiceName())
return s.samplingProvider.GetSamplingStrategy(ctx, param.GetServiceName())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package strategystore
package samplingstrategy

import (
"testing"
Expand Down
Loading

0 comments on commit 252980d

Please sign in to comment.