Skip to content

Commit

Permalink
Add headers to go.delivery.report.fields (disabled by default)
Browse files Browse the repository at this point in the history
Disabled due to the extra CGo calls required (costly) and the
deserialization of C headers to Go Headers.
  • Loading branch information
edenhill committed Apr 28, 2021
1 parent 8bb300e commit 0c8f5b8
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 75 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

## v1.7.0

confluent-kafka-go is based on librdkafka v1.7.0, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v1.7.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.

### Enhancements

* The produced message headers are now available in the delivery report
`Message.Headers` if the Producer's `go.delivery.report.fields`
configuration property is set to include `headers`, e.g.:
`"go.delivery.report.fields": "key,value,headers"`
This comes at a performance cost and is thus disabled by default.


### Fixes

* AdminClient.CreateTopics() previously did not accept the default value (-1) of
Expand Down
74 changes: 37 additions & 37 deletions kafka/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,32 @@ import (
#include "glue_rdkafka.h"
#ifdef RD_KAFKA_V_HEADERS
void chdrs_to_tmphdrs (rd_kafka_headers_t *chdrs, tmphdr_t *tmphdrs) {
size_t i = 0;
const char *name;
const void *val;
size_t size;
while (!rd_kafka_header_get_all(chdrs, i,
&tmphdrs[i].key,
&tmphdrs[i].val,
(size_t *)&tmphdrs[i].size))
i++;
void chdrs_to_tmphdrs (glue_msg_t *gMsg) {
size_t i = 0;
const char *name;
const void *val;
size_t size;
rd_kafka_headers_t *chdrs;
if (rd_kafka_message_headers(gMsg->msg, &chdrs)) {
gMsg->tmphdrs = NULL;
gMsg->tmphdrsCnt = 0;
return;
}
gMsg->tmphdrsCnt = rd_kafka_header_cnt(chdrs);
gMsg->tmphdrs = malloc(sizeof(*gMsg->tmphdrs) * gMsg->tmphdrsCnt);
while (!rd_kafka_header_get_all(chdrs, i,
&gMsg->tmphdrs[i].key,
&gMsg->tmphdrs[i].val,
(size_t *)&gMsg->tmphdrs[i].size))
i++;
}
#endif
rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs,
rd_kafka_event_type_t *evtype,
fetched_c_msg_t *fcMsg,
glue_msg_t *gMsg,
rd_kafka_event_t *prev_rkev) {
rd_kafka_event_t *rkev;
Expand All @@ -56,31 +64,22 @@ rd_kafka_event_t *_rk_queue_poll (rd_kafka_queue_t *rkq, int timeoutMs,
*evtype = rd_kafka_event_type(rkev);
if (*evtype == RD_KAFKA_EVENT_FETCH) {
#ifdef RD_KAFKA_V_HEADERS
rd_kafka_headers_t *hdrs;
#endif
fcMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev);
fcMsg->ts = rd_kafka_message_timestamp(fcMsg->msg, &fcMsg->tstype);
#ifdef RD_KAFKA_V_HEADERS
if (!rd_kafka_message_headers(fcMsg->msg, &hdrs)) {
fcMsg->tmphdrsCnt = rd_kafka_header_cnt(hdrs);
fcMsg->tmphdrs = malloc(sizeof(*fcMsg->tmphdrs) * fcMsg->tmphdrsCnt);
chdrs_to_tmphdrs(hdrs, fcMsg->tmphdrs);
} else {
#else
if (1) {
#endif
fcMsg->tmphdrs = NULL;
fcMsg->tmphdrsCnt = 0;
}
gMsg->msg = (rd_kafka_message_t *)rd_kafka_event_message_next(rkev);
gMsg->ts = rd_kafka_message_timestamp(gMsg->msg, &gMsg->tstype);
if (gMsg->want_hdrs)
chdrs_to_tmphdrs(gMsg);
}
return rkev;
}
*/
import "C"

func chdrsToTmphdrs(gMsg *C.glue_msg_t) {
C.chdrs_to_tmphdrs(gMsg)
}

// Event generic interface
type Event interface {
// String returns a human-readable representation of the event
Expand Down Expand Up @@ -164,8 +163,9 @@ func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, ter
out:
for evcnt := 0; evcnt < maxEvents; evcnt++ {
var evtype C.rd_kafka_event_type_t
var fcMsg C.fetched_c_msg_t
rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &fcMsg, prevRkev)
var gMsg C.glue_msg_t
gMsg.want_hdrs = C.int8_t(bool2cint(h.msgFields.Headers))
rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &gMsg, prevRkev)
prevRkev = rkev
timeoutMs = 0

Expand All @@ -174,8 +174,8 @@ out:
switch evtype {
case C.RD_KAFKA_EVENT_FETCH:
// Consumer fetch event, new message.
// Extracted into temporary fcMsg for optimization
retval = h.newMessageFromFcMsg(&fcMsg)
// Extracted into temporary gMsg for optimization
retval = h.newMessageFromGlueMsg(&gMsg)

case C.RD_KAFKA_EVENT_REBALANCE:
// Consumer rebalance event
Expand Down
12 changes: 7 additions & 5 deletions kafka/glue_rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ typedef struct tmphdr_s {


/**
* Represents a fetched C message, with all extra fields extracted
* to struct fields.
* @struct This is a glue struct used by the C code in this client to
* effectively map fields from a librdkafka rd_kafka_message_t
* to something usable in Go with as few CGo calls as possible.
*/
typedef struct fetched_c_msg {
typedef struct glue_msg_s {
rd_kafka_message_t *msg;
rd_kafka_timestamp_type_t tstype;
int64_t ts;
int64_t ts;
tmphdr_t *tmphdrs;
size_t tmphdrsCnt;
} fetched_c_msg_t;
int8_t want_hdrs; /**< If true, copy headers */
} glue_msg_t;
17 changes: 11 additions & 6 deletions kafka/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type handle struct {
// Forward delivery reports on Producer.Events channel
fwdDr bool

// Enabled fields for delivery reports
// Enabled message fields for delivery reports and consumed messages.
msgFields *messageFields

//
Expand Down Expand Up @@ -328,24 +328,27 @@ func (h *handle) setOAuthBearerTokenFailure(errstr string) error {
return newError(cErr)
}

// messageFields controls which fields are made available for producer delivery reports & incoming messages
// messageFields controls which fields are made available for producer delivery reports & consumed messages.
// true values indicate that the field should be included
type messageFields struct {
Key bool
Value bool
Key bool
Value bool
Headers bool
}

// disableAll disable all fields
func (mf *messageFields) disableAll() {
mf.Key = false
mf.Value = false
mf.Headers = false
}

// newMessageFields returns a new messageFields with all fields enabled
func newMessageFields() *messageFields {
return &messageFields{
Key: true,
Value: true,
Key: true,
Value: true,
Headers: true,
}
}

Expand All @@ -365,6 +368,8 @@ func newMessageFieldsFrom(v ConfigValue) (*messageFields, error) {
msgFields.Key = true
case "value":
msgFields.Value = true
case "headers":
msgFields.Headers = true
default:
return nil, fmt.Errorf("unknown message field: %s", value)
}
Expand Down
50 changes: 33 additions & 17 deletions kafka/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,30 +100,37 @@ func (h *handle) getRktFromMessage(msg *Message) (crkt *C.rd_kafka_topic_t) {
return h.getRkt(*msg.TopicPartition.Topic)
}

func (h *handle) newMessageFromFcMsg(fcMsg *C.fetched_c_msg_t) (msg *Message) {
// setupHeadersFromGlueMsg converts the C tmp headers in gMsg to
// Go Headers in msg.
// gMsg.tmphdrs will be freed.
func setupHeadersFromGlueMsg(msg *Message, gMsg *C.glue_msg_t) {
msg.Headers = make([]Header, gMsg.tmphdrsCnt)
for n := range msg.Headers {
tmphdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(gMsg.tmphdrs))[n]
msg.Headers[n].Key = C.GoString(tmphdr.key)
if tmphdr.val != nil {
msg.Headers[n].Value = C.GoBytes(unsafe.Pointer(tmphdr.val), C.int(tmphdr.size))
} else {
msg.Headers[n].Value = nil
}
}
C.free(unsafe.Pointer(gMsg.tmphdrs))
}

func (h *handle) newMessageFromGlueMsg(gMsg *C.glue_msg_t) (msg *Message) {
msg = &Message{}

if fcMsg.ts != -1 {
ts := int64(fcMsg.ts)
msg.TimestampType = TimestampType(fcMsg.tstype)
if gMsg.ts != -1 {
ts := int64(gMsg.ts)
msg.TimestampType = TimestampType(gMsg.tstype)
msg.Timestamp = time.Unix(ts/1000, (ts%1000)*1000000)
}

if fcMsg.tmphdrsCnt > 0 {
msg.Headers = make([]Header, fcMsg.tmphdrsCnt)
for n := range msg.Headers {
tmphdr := (*[1 << 30]C.tmphdr_t)(unsafe.Pointer(fcMsg.tmphdrs))[n]
msg.Headers[n].Key = C.GoString(tmphdr.key)
if tmphdr.val != nil {
msg.Headers[n].Value = C.GoBytes(unsafe.Pointer(tmphdr.val), C.int(tmphdr.size))
} else {
msg.Headers[n].Value = nil
}
}
C.free(unsafe.Pointer(fcMsg.tmphdrs))
if gMsg.tmphdrsCnt > 0 {
setupHeadersFromGlueMsg(msg, gMsg)
}

h.setupMessageFromC(msg, fcMsg.msg)
h.setupMessageFromC(msg, gMsg.msg)

return msg
}
Expand All @@ -141,6 +148,15 @@ func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
if cmsg.key != nil && h.msgFields.Key {
msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len))
}
if h.msgFields.Headers {
var gMsg C.glue_msg_t
gMsg.msg = cmsg
gMsg.want_hdrs = C.int8_t(1)
chdrsToTmphdrs(&gMsg)
if gMsg.tmphdrsCnt > 0 {
setupHeadersFromGlueMsg(msg, &gMsg)
}
}
msg.TopicPartition.Offset = Offset(cmsg.offset)
if cmsg.err != 0 {
msg.TopicPartition.Error = newError(cmsg.err)
Expand Down
12 changes: 6 additions & 6 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,14 +417,15 @@ func (p *Producer) Purge(flags int) error {
//
// conf is a *ConfigMap with standard librdkafka configuration properties.
//
// Supported special configuration properties:
// Supported special configuration properties (type, default):
// go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance).
// These batches do not relate to Kafka message batches in any way.
// Note: timestamps and headers are not supported with this interface.
// go.delivery.reports (bool, true) - Forward per-message delivery reports to the
// Events() channel.
// go.delivery.report.fields (string, all) - Comma separated list of fields to enable for delivery reports.
// Allowed values: all, none (or empty string), key, value
// go.delivery.report.fields (string, "key,value") - Comma separated list of fields to enable for delivery reports.
// Allowed values: all, none (or empty string), key, value, headers
// Warning: There is a performance penalty to include headers in the delivery report.
// go.events.channel.size (int, 1000000) - Events().
// go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
// go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
Expand Down Expand Up @@ -463,16 +464,15 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
}
p.handle.fwdDr = v.(bool)

v, err = confCopy.extract("go.delivery.report.fields", "all")
v, err = confCopy.extract("go.delivery.report.fields", "key,value")
if err != nil {
return nil, err
}

msgFields, err := newMessageFieldsFrom(v)
p.handle.msgFields, err = newMessageFieldsFrom(v)
if err != nil {
return nil, err
}
p.handle.msgFields = msgFields

v, err = confCopy.extract("go.events.channel.size", 1000000)
if err != nil {
Expand Down
Loading

0 comments on commit 0c8f5b8

Please sign in to comment.