diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index e333e6112..9e9cc5fdf 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(a.ctx, host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index a0eda435f..643730000 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, } if err := p.Update(args); err != nil { @@ -214,11 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.consumer.Pause() - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) - p.sched.Schedule(p.ctx, host, components...) - p.consumer.Resume() + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index b63d0c348..a4b9e15dc 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, supportedSignals: supportedSignals, @@ -242,11 +242,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.consumer.Pause() - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) - e.sched.Schedule(e.ctx, host, components...) - e.consumer.Resume() + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 457e0a9b1..7eca6e734 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(e.ctx, host, components...) + e.sched.Schedule(e.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 4649bda52..b519db4b3 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,35 +37,99 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host + running bool + + // onPause is called when scheduler is making changes to running components. + onPause func() + // onResume is called when scheduler is done making changes to running components. + onResume func() } // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, + log: l, + onPause: func() {}, + onResume: func() {}, } } -// Schedule schedules a new set of OpenTelemetry Components to run. Components -// will only be scheduled when the Scheduler is running. +// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks. +// The callbacks are a useful way of pausing and resuming the ingestion of data by the components: +// * onPause() is called before the scheduler stops the components. +// * onResume() is called after the scheduler starts the components. +// The callbacks are used by the Schedule() and Run() functions. +// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran. +func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { + return &Scheduler{ + log: l, + onPause: onPause, + onResume: onResume, + } +} + +// Schedule a new set of OpenTelemetry Components to run. +// Components will only be started when the Scheduler's Run() function has been called. // -// Schedule completely overrides the set of previously running components; -// components which have been removed since the last call to Schedule will be -// stopped. -func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) { +// Schedule() completely overrides the set of previously running components. +// Components which have been removed since the last call to Schedule will be stopped. +func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - // Stop the old components before running new scheduled ones. + // If the scheduler isn't running yet, just update the state. + // That way the Run function is ready to go. + if !cs.running { + cs.schedComponents = cc + cs.host = h + updateConsumers() + return + } + + // The new components must be setup in this order: + // + // 1. Pause consumers + // 2. Stop the old components + // 3. Change the consumers + // 4. Start the new components + // 5. Start the consumer + // + // There could be race conditions if the order above is not followed. + + // 1. Pause consumers + // This prevents them from accepting new data while we're shutting them down. + cs.onPause() + + // 2. Stop the old components cs.stopComponents(ctx, cs.schedComponents...) - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc)) + // 3. Change the consumers + // This is can only be done after stopping the pervious components and before starting the new ones. + updateConsumers() + + // 4. Start the new components + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) cs.schedComponents = cs.startComponents(ctx, h, cc...) + cs.host = h + //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? + + // 5. Start the consumer + // The new components will now start accepting telemetry data. + cs.onResume() } // Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { + cs.schedMut.Lock() + cs.running = true + + cs.onPause() + cs.startComponents(ctx, cs.host, cs.schedComponents...) + cs.onResume() + + cs.schedMut.Unlock() + // Make sure we terminate all of our running components on shutdown. defer func() { cs.schedMut.Lock() diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 5a9a59bd7..477809c96 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -30,7 +30,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(context.Background(), h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -50,12 +50,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(context.Background(), h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(context.Background(), h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -78,7 +78,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(ctx, h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 312369f07..08a7a6181 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), @@ -237,11 +237,12 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.consumer.Pause() - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) - p.sched.Schedule(p.ctx, host, components...) - p.consumer.Resume() + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 673552d27..ff4c3c21d 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(r.ctx, host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) return nil }