Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't start components until Run is called #2296

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/component/otelcol/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error {
})

// Schedule the components to run once our component is running.
a.sched.Schedule(a.ctx, host, components...)
a.sched.Schedule(a.ctx, func() {}, host, components...)
return nil
}

Expand Down
11 changes: 6 additions & 5 deletions internal/component/otelcol/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,
}
if err := p.Update(args); err != nil {
Expand Down Expand Up @@ -214,11 +214,12 @@ func (p *Connector) Update(args component.Arguments) error {
return errors.New("unsupported connector type")
}

updateConsumersFunc := func() {
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
}

// Schedule the components to run once our component is running.
p.consumer.Pause()
p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()
p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...)
return nil
}

Expand Down
11 changes: 6 additions & 5 deletions internal/component/otelcol/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,

supportedSignals: supportedSignals,
Expand Down Expand Up @@ -242,11 +242,12 @@ func (e *Exporter) Update(args component.Arguments) error {
}
}

updateConsumersFunc := func() {
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
}

// Schedule the components to run once our component is running.
e.consumer.Pause()
e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter)
e.sched.Schedule(e.ctx, host, components...)
e.consumer.Resume()
e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
e.sched.Schedule(e.ctx, host, components...)
e.sched.Schedule(e.ctx, func() {}, host, components...)
return nil
}

Expand Down
82 changes: 73 additions & 9 deletions internal/component/otelcol/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,99 @@ type Scheduler struct {
schedMut sync.Mutex
schedComponents []otelcomponent.Component // Most recently created components
host otelcomponent.Host
running bool

// onPause is called when scheduler is making changes to running components.
onPause func()
// onResume is called when scheduler is done making changes to running components.
onResume func()
}

// New creates a new unstarted Scheduler. Call Run to start it, and call
// Schedule to schedule components to run.
func New(l log.Logger) *Scheduler {
return &Scheduler{
log: l,
log: l,
onPause: func() {},
onResume: func() {},
}
}

// Schedule schedules a new set of OpenTelemetry Components to run. Components
// will only be scheduled when the Scheduler is running.
// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks.
// The callbacks are a useful way of pausing and resuming the ingestion of data by the components:
// * onPause() is called before the scheduler stops the components.
// * onResume() is called after the scheduler starts the components.
// The callbacks are used by the Schedule() and Run() functions.
// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran.
func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler {
return &Scheduler{
log: l,
onPause: onPause,
onResume: onResume,
}
}

// Schedule a new set of OpenTelemetry Components to run.
// Components will only be started when the Scheduler's Run() function has been called.
//
// Schedule completely overrides the set of previously running components;
// components which have been removed since the last call to Schedule will be
// stopped.
func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) {
// Schedule() completely overrides the set of previously running components.
// Components which have been removed since the last call to Schedule will be stopped.
func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) {
ptodev marked this conversation as resolved.
Show resolved Hide resolved
cs.schedMut.Lock()
defer cs.schedMut.Unlock()

// Stop the old components before running new scheduled ones.
// If the scheduler isn't running yet, just update the state.
// That way the Run function is ready to go.
if !cs.running {
cs.schedComponents = cc
cs.host = h
updateConsumers()
return
}

// The new components must be setup in this order:
//
// 1. Pause consumers
// 2. Stop the old components
// 3. Change the consumers
// 4. Start the new components
// 5. Start the consumer
//
// There could be race conditions if the order above is not followed.

// 1. Pause consumers
// This prevents them from accepting new data while we're shutting them down.
cs.onPause()

// 2. Stop the old components
cs.stopComponents(ctx, cs.schedComponents...)

level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc))
// 3. Change the consumers
// This is can only be done after stopping the pervious components and before starting the new ones.
updateConsumers()

// 4. Start the new components
level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents))
cs.schedComponents = cs.startComponents(ctx, h, cc...)
cs.host = h
//TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers?

// 5. Start the consumer
// The new components will now start accepting telemetry data.
cs.onResume()
}

// Run starts the Scheduler and stops the components when the context is cancelled.
func (cs *Scheduler) Run(ctx context.Context) error {
cs.schedMut.Lock()
cs.running = true

cs.onPause()
cs.startComponents(ctx, cs.host, cs.schedComponents...)
cs.onResume()

cs.schedMut.Unlock()

// Make sure we terminate all of our running components on shutdown.
defer func() {
cs.schedMut.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started trigger once it is
// running.
component, started, _ := newTriggerComponent()
cs.Schedule(context.Background(), h, component)
cs.Schedule(context.Background(), func() {}, h, component)
require.NoError(t, started.Wait(5*time.Second), "component did not start")
})

Expand All @@ -50,12 +50,12 @@ func TestScheduler(t *testing.T) {
// Schedule our component, which should notify the started and stopped
// trigger once it starts and stops respectively.
component, started, stopped := newTriggerComponent()
cs.Schedule(context.Background(), h, component)
cs.Schedule(context.Background(), func() {}, h, component)

// Wait for the component to start, and then unschedule all components, which
// should cause our running component to terminate.
require.NoError(t, started.Wait(5*time.Second), "component did not start")
cs.Schedule(context.Background(), h)
cs.Schedule(context.Background(), func() {}, h)
require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown")
})

Expand All @@ -78,7 +78,7 @@ func TestScheduler(t *testing.T) {
// Schedule our component which will notify our trigger when Shutdown gets
// called.
component, started, stopped := newTriggerComponent()
cs.Schedule(ctx, h, component)
cs.Schedule(ctx, func() {}, h, component)

// Wait for the component to start, and then stop our scheduler, which
// should cause our running component to terminate.
Expand Down
11 changes: 6 additions & 5 deletions internal/component/otelcol/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc
factory: f,
consumer: consumer,

sched: scheduler.New(opts.Logger),
sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume),
collector: collector,

liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID),
Expand Down Expand Up @@ -237,11 +237,12 @@ func (p *Processor) Update(args component.Arguments) error {
}
}

updateConsumersFunc := func() {
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
}

// Schedule the components to run once our component is running.
p.consumer.Pause()
p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor)
p.sched.Schedule(p.ctx, host, components...)
p.consumer.Resume()
p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/component/otelcol/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error {
}

// Schedule the components to run once our component is running.
r.sched.Schedule(r.ctx, host, components...)
r.sched.Schedule(r.ctx, func() {}, host, components...)
return nil
}

Expand Down
Loading