Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Jun 16, 2024
1 parent 0da4655 commit aff467e
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 79 deletions.
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func GetStorageFactory(name string, host component.Host) (storage.Factory, error
f, ok := comp.(Extension).Factory(name)
if !ok {
return nil, fmt.Errorf(
"cannot find storage '%s' declared by '%s' extension",
"cannot find definition of storage '%s' in the configuration for extension '%s'",

Check warning on line 58 in cmd/jaeger/internal/extension/jaegerstorage/extension.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/extension.go#L58

Added line #L58 was not covered by tests
name, componentType,
)
}
Expand Down
133 changes: 75 additions & 58 deletions cmd/jaeger/internal/extension/remotesampling/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ var _ extension.Extension = (*rsExtension)(nil)
const defaultResourceName = "sampling_store_leader"

type rsExtension struct {
cfg *Config
telemetry component.TelemetrySettings
httpServer *http.Server
grpcServer *grpc.Server
strategyStore strategystore.StrategyStore
shutdownWG sync.WaitGroup
cfg *Config
telemetry component.TelemetrySettings
httpServer *http.Server
grpcServer *grpc.Server
strategyProvider strategystore.StrategyStore // TODO we should rename this to Provider, not "store"
adaptiveStore samplingstore.Store
distLock *leaderelection.DistributedElectionParticipant
shutdownWG sync.WaitGroup
}

func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtension {
Expand All @@ -51,58 +53,35 @@ func newExtension(cfg *Config, telemetry component.TelemetrySettings) *rsExtensi
}
}

func GetExtensionConfig(host component.Host) (AdaptiveConfig, error) {
// GetAdaptiveSamplingStore locates the `remotesampling` extension in Host
// and returns the sampling store and a leader/follower implementation, provided
// that the extension is configured with adaptive sampling (vs. file-based config).
func GetAdaptiveSamplingStore(
host component.Host,
) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) {
var comp component.Component
var compID component.ID
for id, ext := range host.GetExtensions() {
if id.Type() == componentType {
comp = ext
compID = id
break
}
}
if comp == nil {
return AdaptiveConfig{}, fmt.Errorf(
return nil, nil, fmt.Errorf(
"cannot find extension '%s' (make sure it's defined earlier in the config)",
componentType,
)
}
return comp.(*rsExtension).cfg.Adaptive, nil
}

// GetSamplingStorage retrieves a storage factory from the `jaegerstorage` extension
// and uses it to create a sampling store and a leader/follower implementation.
func GetSamplingStorage(
samplingStorage string,
host component.Host,
opts adaptive.Options,
logger *zap.Logger,
) (samplingstore.Store, *leaderelection.DistributedElectionParticipant, error) {
f, err := jaegerstorage.GetStorageFactory(samplingStorage, host)
if err != nil {
return nil, nil, fmt.Errorf("cannot find storage factory: %w", err)
}

ssStore, ok := f.(storage.SamplingStoreFactory)
ext, ok := comp.(*rsExtension)
if !ok {
return nil, nil, fmt.Errorf("storage factory of type %s does not support sampling store", samplingStorage)
return nil, nil, fmt.Errorf("extension '%s' is not of type '%s'", compID, componentType)
}

lock, err := ssStore.CreateLock()
if err != nil {
return nil, nil, err
if ext.adaptiveStore == nil || ext.distLock == nil {
return nil, nil, fmt.Errorf("extension '%s' is not configured for adaptive sampling", compID)
}

store, err := ssStore.CreateSamplingStore(opts.AggregationBuckets)
if err != nil {
return nil, nil, err
}

ep := leaderelection.NewElectionParticipant(lock, defaultResourceName, leaderelection.ElectionParticipantOptions{
FollowerLeaseRefreshInterval: opts.FollowerLeaseRefreshInterval,
LeaderLeaseRefreshInterval: opts.LeaderLeaseRefreshInterval,
Logger: logger,
})

return store, ep, nil
return ext.adaptiveStore, ext.distLock, nil
}

func (ext *rsExtension) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -142,9 +121,15 @@ func (ext *rsExtension) Shutdown(ctx context.Context) error {
ext.grpcServer.GracefulStop()
}

if ext.strategyStore != nil {
if err := ext.strategyStore.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to stop strategy store: %w", err))
if ext.distLock != nil {
if err := ext.distLock.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to stop the distributed lock: %w", err))
}
}

if ext.strategyProvider != nil {
if err := ext.strategyProvider.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to stop strategy provider: %w", err))
}
}
return errors.Join(errs...)
Expand All @@ -157,32 +142,64 @@ func (ext *rsExtension) startFileStrategyStore(_ context.Context) error {

// contextcheck linter complains about next line that context is not passed.
//nolint
ss, err := static.NewStrategyStore(opts, ext.telemetry.Logger)
provider, err := static.NewStrategyStore(opts, ext.telemetry.Logger)
if err != nil {
return fmt.Errorf("failed to create the local file strategy store: %w", err)
}

ext.strategyStore = ss
ext.strategyProvider = provider
return nil
}

func (ext *rsExtension) startAdaptiveStrategyStore(host component.Host) error {
storageName := ext.cfg.Adaptive.StrategyStore
f, err := jaegerstorage.GetStorageFactory(storageName, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

storeFactory, ok := f.(storage.SamplingStoreFactory)
if !ok {
return fmt.Errorf("storage '%s' does not support sampling store", storageName)
}

store, err := storeFactory.CreateSamplingStore(ext.cfg.Adaptive.AggregationBuckets)
if err != nil {
return fmt.Errorf("failed to create the sampling store: %w", err)
}
ext.adaptiveStore = store

{
lock, err := storeFactory.CreateLock()
if err != nil {
return fmt.Errorf("failed to create the distributed lock: %w", err)
}

ep := leaderelection.NewElectionParticipant(lock, defaultResourceName,
leaderelection.ElectionParticipantOptions{
LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval,
FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval,
Logger: ext.telemetry.Logger,
})
if err := ep.Start(); err != nil {
return fmt.Errorf("failed to start the leader election participant: %w", err)
}
ext.distLock = ep
}

// TODO it is unlikely all these options are needed, we should refactor more
opts := adaptive.Options{
InitialSamplingProbability: ext.cfg.Adaptive.InitialSamplingProbability,
MinSamplesPerSecond: ext.cfg.Adaptive.MinSamplesPerSecond,
LeaderLeaseRefreshInterval: ext.cfg.Adaptive.LeaderLeaseRefreshInterval,
FollowerLeaseRefreshInterval: ext.cfg.Adaptive.FollowerLeaseRefreshInterval,
AggregationBuckets: ext.cfg.Adaptive.AggregationBuckets,
}

store, ep, err := GetSamplingStorage(ext.cfg.Adaptive.StrategyStore, host, opts, ext.telemetry.Logger)
if err != nil {
return err
provider := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ext.distLock, store)
if err := provider.Start(); err != nil {
return fmt.Errorf("failed to start the adaptive strategy store: %w", err)
}

ss := adaptive.NewStrategyStore(opts, ext.telemetry.Logger, ep, store)
ss.Start()
ext.strategyStore = ss
ext.strategyProvider = provider
return nil
}

Expand All @@ -193,7 +210,7 @@ func (ext *rsExtension) startGRPCServer(ctx context.Context, host component.Host
}

healthServer := health.NewServer()
api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyStore))
api_v2.RegisterSamplingManagerServer(ext.grpcServer, sampling.NewGRPCHandler(ext.strategyProvider))
healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(ext.grpcServer, healthServer)
ext.telemetry.Logger.Info("Starting GRPC server", zap.String("endpoint", ext.cfg.GRPC.NetAddr.Endpoint))
Expand All @@ -220,7 +237,7 @@ func (ext *rsExtension) startHTTPServer(ctx context.Context, host component.Host

handler := clientcfghttp.NewHTTPHandler(clientcfghttp.HTTPHandlerParams{
ConfigManager: &clientcfghttp.ConfigManager{
SamplingStrategyStore: ext.strategyStore,
SamplingStrategyStore: ext.strategyProvider,
},
MetricsFactory: metrics.NullFactory,
BasePath: "/api",
Expand Down
30 changes: 10 additions & 20 deletions cmd/jaeger/internal/processors/adaptivesampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,24 @@ func newTraceProcessor(cfg Config, otel component.TelemetrySettings) *traceProce
}

func (tp *traceProcessor) start(_ context.Context, host component.Host) error {
extCfg, err := remotesampling.GetExtensionConfig(host)

tp.logger.Info("Adaptive sampling extension config", zap.Any("config", extCfg))

store, ep, err := remotesampling.GetAdaptiveSamplingStore(host)
if err != nil {
return err
}
opts := adaptive.Options{
InitialSamplingProbability: extCfg.InitialSamplingProbability,
TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond,
DeltaTolerance: tp.config.DeltaTolerance,
CalculationInterval: tp.config.CalculationInterval,
AggregationBuckets: extCfg.AggregationBuckets,
BucketsForCalculation: tp.config.BucketsForCalculation,
Delay: tp.config.Delay,
MinSamplingProbability: tp.config.MinSamplingProbability,
LeaderLeaseRefreshInterval: extCfg.LeaderLeaseRefreshInterval,
FollowerLeaseRefreshInterval: extCfg.FollowerLeaseRefreshInterval,
}

store, ep, err := remotesampling.GetSamplingStorage(tp.config.StrategyStore, host, opts, tp.logger)
if err != nil {
return err
opts := adaptive.Options{
TargetSamplesPerSecond: tp.config.TargetSamplesPerSecond,
DeltaTolerance: tp.config.DeltaTolerance,
CalculationInterval: tp.config.CalculationInterval,
BucketsForCalculation: tp.config.BucketsForCalculation,
Delay: tp.config.Delay,
MinSamplingProbability: tp.config.MinSamplingProbability,
}

// TODO it is unlikely that aggregator needs the full Options object, we need to refactor.
agg, err := adaptive.NewAggregator(opts, tp.logger, metrics.NullFactory, ep, store)
if err != nil {
return err
return fmt.Errorf("failed to create the adpative sampling aggregator : %w", err)
}

agg.Start()
Expand Down

0 comments on commit aff467e

Please sign in to comment.