From 6257f3b28a940c3502dd3450eac1a3101988302e Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 18 Dec 2024 09:52:16 +0200 Subject: [PATCH 1/3] Don't start components until Run is called --- .../otelcol/internal/scheduler/scheduler.go | 51 +++++++++++++++++-- .../component/otelcol/processor/processor.go | 6 +-- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 4649bda527..9eeec8da43 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,13 +37,33 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host + running bool + + // onPause is called when scheduler is making changes to running components. + onPause func() + // onResume is called when scheduler is done making changes to running components. + onResume func() } +// TODO: Delete this function? I don't think it's used anywhere. // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, + log: l, + onPause: func() {}, + onResume: func() {}, + } +} + +// TODO: Rename to "New"? +// TODO: Write a new comment to explain what this method does. +func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { + //TODO: Instead of assuming that the scheduler is paused, just call onPause() here. + return &Scheduler{ + log: l, + onPause: onPause, + onResume: onResume, } } @@ -57,15 +77,23 @@ func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...o cs.schedMut.Lock() defer cs.schedMut.Unlock() - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, cs.schedComponents...) 
+ cs.schedComponents = cc + cs.host = h + + if !cs.running { + return + } - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cc)) - cs.schedComponents = cs.startComponents(ctx, h, cc...) + cs.runScheduled(ctx) } // Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { + cs.schedMut.Lock() + cs.running = true + cs.runScheduled(ctx) + cs.schedMut.Unlock() + // Make sure we terminate all of our running components on shutdown. defer func() { cs.schedMut.Lock() @@ -77,6 +105,19 @@ func (cs *Scheduler) Run(ctx context.Context) error { return nil } +func (cs *Scheduler) runScheduled(ctx context.Context) { + cs.onPause() + + // Stop the old components before running new scheduled ones. + cs.stopComponents(ctx, cs.schedComponents...) + + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) + cs.schedComponents = cs.startComponents(ctx, cs.host, cs.schedComponents...) + //TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers? 
+ + cs.onResume() +} + func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { for _, c := range cc { if err := c.Shutdown(ctx); err != nil { diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 312369f079..089b4ca6d2 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelprocessor.Factory, args Arguments) (*Proc factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, liveDebuggingConsumer: livedebuggingconsumer.New(debugDataPublisher.(livedebugging.DebugDataPublisher), opts.ID), @@ -139,6 +139,8 @@ func (p *Processor) Run(ctx context.Context) error { // configuration for OpenTelemetry Collector processor configuration and manage // the underlying OpenTelemetry Collector processor. func (p *Processor) Update(args component.Arguments) error { + //TODO: Lock a mutex? There could be a race condition with multiple calls to Update + p.args = args.(Arguments) host := scheduler.NewHost( @@ -238,10 +240,8 @@ func (p *Processor) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - p.consumer.Pause() p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) p.sched.Schedule(p.ctx, host, components...) 
- p.consumer.Resume() return nil } From 3a0b4f69495092fc0934cd39271fff7cb93b6636 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 18 Dec 2024 13:31:32 +0200 Subject: [PATCH 2/3] Update consumers after stopping the component --- internal/component/otelcol/auth/auth.go | 2 +- .../component/otelcol/connector/connector.go | 9 ++-- .../component/otelcol/exporter/exporter.go | 9 ++-- .../component/otelcol/extension/extension.go | 2 +- .../otelcol/internal/scheduler/scheduler.go | 43 +++++++++++-------- .../internal/scheduler/scheduler_test.go | 8 ++-- .../component/otelcol/processor/processor.go | 7 ++- .../component/otelcol/receiver/receiver.go | 2 +- 8 files changed, 46 insertions(+), 36 deletions(-) diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index e333e61122..9e9cc5fdf2 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(a.ctx, host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index a0eda435f0..5b858afc41 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -214,11 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.consumer.Pause() - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) - p.sched.Schedule(p.ctx, host, components...) 
- p.consumer.Resume() + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index b63d0c3487..a5b58fe143 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -242,11 +242,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.consumer.Pause() - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) - e.sched.Schedule(e.ctx, host, components...) - e.consumer.Resume() + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 457e0a9b17..7eca6e7349 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(e.ctx, host, components...) + e.sched.Schedule(e.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 9eeec8da43..12fee6b63d 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -73,25 +73,43 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched // Schedule completely overrides the set of previously running components; // components which have been removed since the last call to Schedule will be // stopped. 
-func (cs *Scheduler) Schedule(ctx context.Context, h otelcomponent.Host, cc ...otelcomponent.Component) { +func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - cs.schedComponents = cc - cs.host = h - + // If the scheduler isn't running yet, just update the state. + // That way the Run function is ready to go. if !cs.running { + cs.schedComponents = cc + cs.host = h + updateConsumers() return } - cs.runScheduled(ctx) + cs.onPause() + + // Stop the old components before running new scheduled ones. + cs.stopComponents(ctx, cs.schedComponents...) + + updateConsumers() + + level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) + cs.schedComponents = cs.startComponents(ctx, h, cc...) + cs.host = h + //TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers? + + cs.onResume() } // Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { cs.schedMut.Lock() cs.running = true - cs.runScheduled(ctx) + + cs.onPause() + cs.startComponents(ctx, cs.host, cs.schedComponents...) + cs.onResume() + cs.schedMut.Unlock() // Make sure we terminate all of our running components on shutdown. @@ -105,19 +123,6 @@ func (cs *Scheduler) Run(ctx context.Context) error { return nil } -func (cs *Scheduler) runScheduled(ctx context.Context) { - cs.onPause() - - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, cs.schedComponents...) - - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) - cs.schedComponents = cs.startComponents(ctx, cs.host, cs.schedComponents...) - //TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers? 
- - cs.onResume() -} - func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { for _, c := range cc { if err := c.Shutdown(ctx); err != nil { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 5a9a59bd78..477809c960 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -30,7 +30,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(context.Background(), h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -50,12 +50,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(context.Background(), h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(context.Background(), h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -78,7 +78,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(ctx, h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. 
diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 089b4ca6d2..c7b91d5053 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -239,9 +239,12 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) - p.sched.Schedule(p.ctx, host, components...) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 673552d271..ff4c3c21d1 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(r.ctx, host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) 
return nil } From 60aeb74060ca6975fc2ed45f25d95d0f75fcd129 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Wed, 18 Dec 2024 15:41:06 +0200 Subject: [PATCH 3/3] Minor fixes --- .../component/otelcol/connector/connector.go | 2 +- .../component/otelcol/exporter/exporter.go | 2 +- .../otelcol/internal/scheduler/scheduler.go | 40 ++++++++++++++----- .../component/otelcol/processor/processor.go | 2 - 4 files changed, 31 insertions(+), 15 deletions(-) diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index 5b858afc41..643730000f 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -117,7 +117,7 @@ func New(opts component.Options, f otelconnector.Factory, args Arguments) (*Conn factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, } if err := p.Update(args); err != nil { diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index a5b58fe143..a4b9e15dcc 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -131,7 +131,7 @@ func New(opts component.Options, f otelexporter.Factory, args Arguments, support factory: f, consumer: consumer, - sched: scheduler.New(opts.Logger), + sched: scheduler.NewWithPauseCallbacks(opts.Logger, consumer.Pause, consumer.Resume), collector: collector, supportedSignals: supportedSignals, diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 12fee6b63d..b519db4b32 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -45,7 +45,6 @@ type Scheduler struct { onResume func() } -// TODO: Delete this function? 
I don't think it's used anywhere. // New creates a new unstarted Scheduler. Call Run to start it, and call // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { @@ -56,10 +55,13 @@ func New(l log.Logger) *Scheduler { } } -// TODO: Rename to "New"? -// TODO: Write a new comment to explain what this method does. +// NewWithPauseCallbacks is like New, but allows specifying onPause() and onResume() callbacks. +// The callbacks are a useful way of pausing and resuming the ingestion of data by the components: +// * onPause() is called before the scheduler stops the components. +// * onResume() is called after the scheduler starts the components. +// The callbacks are used by the Schedule() and Run() functions. +// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never called. func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { - //TODO: Instead of assuming that the scheduler is paused, just call onPause() here. return &Scheduler{ log: l, onPause: onPause, @@ -67,12 +69,11 @@ func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Sched } } -// Schedule schedules a new set of OpenTelemetry Components to run. Components -// will only be scheduled when the Scheduler is running. +// Schedule schedules a new set of OpenTelemetry Components to run. +// Components will only be started when the Scheduler's Run() function has been called. // -// Schedule completely overrides the set of previously running components; -// components which have been removed since the last call to Schedule will be -// stopped. 
func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() @@ -86,18 +87,35 @@ func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h ote return } + // The new components must be set up in this order: + // + // 1. Pause consumers + // 2. Stop the old components + // 3. Change the consumers + // 4. Start the new components + // 5. Resume consumers + // + // There could be race conditions if the order above is not followed. + + // 1. Pause consumers + // This prevents them from accepting new data while we're shutting them down. cs.onPause() - // Stop the old components before running new scheduled ones. + // 2. Stop the old components cs.stopComponents(ctx, cs.schedComponents...) + // 3. Change the consumers + // This can only be done after stopping the previous components and before starting the new ones. updateConsumers() + // 4. Start the new components level.Debug(cs.log).Log("msg", "scheduling components", "count", len(cs.schedComponents)) cs.schedComponents = cs.startComponents(ctx, h, cc...) cs.host = h - //TODO: Check if there were errors? What if the trace component failed but the metrics one didn't? Should we resume all consumers? + //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? + // 5. Resume consumers + // The new components will now start accepting telemetry data. cs.onResume() } diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index c7b91d5053..08a7a6181c 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -139,8 +139,6 @@ func (p *Processor) Run(ctx context.Context) error { // configuration for OpenTelemetry Collector processor configuration and manage // the underlying OpenTelemetry Collector processor. 
func (p *Processor) Update(args component.Arguments) error { - //TODO: Lock a mutex? There could be a race condition with multiple calls to Update - p.args = args.(Arguments) host := scheduler.NewHost(