From afd4222d1714b7462bf0638b2c4706dfa3180168 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 28 Nov 2024 10:22:32 +0100 Subject: [PATCH] bugfix: idle publishers would terminate too early --- pkg/amqp10_client/publisher.go | 2 +- pkg/mqtt_client/publisher.go | 2 +- pkg/mqtt_client/publisher_v5.go | 2 +- pkg/stomp_client/publisher.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/amqp10_client/publisher.go b/pkg/amqp10_client/publisher.go index d883305..1dad4e9 100644 --- a/pkg/amqp10_client/publisher.go +++ b/pkg/amqp10_client/publisher.go @@ -185,7 +185,7 @@ func (p *Amqp10Publisher) StartFullSpeed(ctx context.Context) { } func (p *Amqp10Publisher) StartIdle(ctx context.Context) { - _ = ctx.Done() + <-ctx.Done() } func (p *Amqp10Publisher) StartRateLimited(ctx context.Context) { diff --git a/pkg/mqtt_client/publisher.go b/pkg/mqtt_client/publisher.go index f77c00f..46e5fd1 100644 --- a/pkg/mqtt_client/publisher.go +++ b/pkg/mqtt_client/publisher.go @@ -57,7 +57,7 @@ func (p MqttPublisher) StartFullSpeed(ctx context.Context) { func (p MqttPublisher) StartIdle(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic) - _ = ctx.Done() + <-ctx.Done() } func (p MqttPublisher) StartRateLimited(ctx context.Context) { diff --git a/pkg/mqtt_client/publisher_v5.go b/pkg/mqtt_client/publisher_v5.go index 614dd2d..7af3885 100644 --- a/pkg/mqtt_client/publisher_v5.go +++ b/pkg/mqtt_client/publisher_v5.go @@ -60,7 +60,7 @@ func (p Mqtt5Publisher) StartFullSpeed(ctx context.Context) { func (p Mqtt5Publisher) StartIdle(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic) - _ = ctx.Done() + <-ctx.Done() } func (p Mqtt5Publisher) StartRateLimited(ctx context.Context) { diff --git a/pkg/stomp_client/publisher.go b/pkg/stomp_client/publisher.go index 1ab8991..2cff9ec 100644 --- a/pkg/stomp_client/publisher.go +++ b/pkg/stomp_client/publisher.go @@ -109,7 +109,7 @@ func (p *StompPublisher) StartFullSpeed(ctx context.Context) { func (p *StompPublisher) StartIdle(ctx context.Context) { log.Info("publisher started", "id", p.Id, "rate", "-", "destination", p.Topic) - _ = ctx.Done() + <-ctx.Done() } func (p *StompPublisher) StartRateLimited(ctx context.Context) {