Skip to content

Commit

Permalink
Merge pull request #83 from sumup-oss/modify-rabbitmq-add-headers
Browse files Browse the repository at this point in the history
Add ability to receive headers when consuming messages from RabbitMQ.
  • Loading branch information
syndbg authored Feb 19, 2024
2 parents 249d0ff + 8f95ed9 commit 54413df
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
22 changes: 16 additions & 6 deletions rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func (c *Consumer) Run(ctx context.Context) error {
c.logger.Info("Received context cancel. Going to close RMQ connections.")
err = channel.Cancel(c.handler.GetConsumerTag(), false)
if err != nil {
c.logger.Warn("failed to cancel the RMQ channel while stopping handler", logger.ErrorField(err))
c.logger.Warn(
"failed to cancel the RMQ channel while stopping handler",
logger.ErrorField(err),
)
}

// NOTE: We must process the events before we close the channel
Expand All @@ -112,7 +115,11 @@ func (c *Consumer) Run(ctx context.Context) error {

err = channel.Qos(c.cfg.PrefetchCount, 0, false)
if err != nil {
return stacktrace.Propagate(err, "failed to set RMQ channel's QoS prefetch count to: %d", c.cfg.PrefetchCount)
return stacktrace.Propagate(
err,
"failed to set RMQ channel's QoS prefetch count to: %d",
c.cfg.PrefetchCount,
)
}

deliveries, err := channel.Consume(
Expand Down Expand Up @@ -164,10 +171,13 @@ func (c *Consumer) handleDeliveries(
func (c *Consumer) handleSingleDelivery(ctx context.Context, d *amqp.Delivery) error {
c.metric.ObserveMsgDelivered()

acknowledgement, err := c.handler.ReceiveMessage(ctx, &Message{
Body: d.Body,
CorrelationID: d.CorrelationId,
})
acknowledgement, err := c.handler.ReceiveMessage(
ctx, &Message{
Body: d.Body,
CorrelationID: d.CorrelationId,
Headers: d.Headers,
},
)
if err != nil {
return stacktrace.Propagate(err, "handler returned error")
}
Expand Down
2 changes: 2 additions & 0 deletions rabbitmq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,6 @@ type Message struct {

// Correlation identifier
CorrelationID string
// Message headers
Headers map[string]interface{}
}

0 comments on commit 54413df

Please sign in to comment.