diff --git a/src/kafka.c b/src/kafka.c index ee422c0..c4213dd 100644 --- a/src/kafka.c +++ b/src/kafka.c @@ -491,31 +491,60 @@ void kafka_producer_produce(Producer p, Message msg) { char *buf = (char *) message_get_data(msg); + rd_kafka_headers_t *hdrs = (rd_kafka_headers_t *) message_get_headers(msg); size_t len = message_get_len(msg); rd_kafka_t *rk = ((Meta) p->meta)->rk; rd_kafka_topic_t *rkt = ((Meta)p->meta)->rkt; retry: - if (rd_kafka_produce( - rkt, - RD_KAFKA_PARTITION_UA, - RD_KAFKA_MSG_F_COPY, - buf, len, - NULL, 0, + if (hdrs) { + rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs); + if(rd_kafka_producev( + rk, + RD_KAFKA_V_RKT(rkt), + RD_KAFKA_V_PARTITION(RD_KAFKA_PARTITION_UA), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_VALUE(buf, len), + RD_KAFKA_V_HEADERS(hdrs_copy), NULL) == -1) - { - if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) { - rd_kafka_poll(rk, 10*1000); - goto retry; + if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) + { + rd_kafka_poll(rk, 10*1000); + goto retry; + } + else + { + logger_log( + "%s %d Failed to produce to topic %s: %s\n", + __FILE__, __LINE__, + rd_kafka_topic_name(rkt), + rd_kafka_err2str(rd_kafka_last_error()) + ); + } } - else + } else { + if (rd_kafka_produce( + rkt, + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, + buf, len, + NULL, 0, + NULL) == -1) { - logger_log( - "%s %d Failed to produce to topic %s: %s\n", - __FILE__, __LINE__, - rd_kafka_topic_name(rkt), - rd_kafka_err2str(rd_kafka_last_error()) - ); + if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) + { + rd_kafka_poll(rk, 10*1000); + goto retry; + } + else + { + logger_log( + "%s %d Failed to produce to topic %s: %s\n", + __FILE__, __LINE__, + rd_kafka_topic_name(rkt), + rd_kafka_err2str(rd_kafka_last_error()) + ); + } } } rd_kafka_poll(rk, 0); @@ -595,6 +624,7 @@ kafka_simple_consumer_consume(Consumer c, Message msg) { rd_kafka_queue_t *rkqu = ((Meta) c->meta)->rkqu; rd_kafka_message_t *rkmessage; + rd_kafka_headers_t *hdrs; rkmessage = rd_kafka_consume_queue(rkqu, 10000); @@ -621,6 +651,15 @@ kafka_simple_consumer_consume(Consumer c, Message msg) return 0; } + // Here we don't detach the headers so the memory gets cleared when + // the message is destroyed. + if (!rd_kafka_message_headers(rkmessage, &hdrs)) { + if (hdrs){ + rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs); + message_set_headers(msg, hdrs_copy); + } + } + char *cpy = SCALLOC((int)rkmessage->len + 1, sizeof(*cpy)); memcpy(cpy, (char *)rkmessage->payload, (size_t)rkmessage->len); message_set_data(msg, cpy); @@ -635,6 +674,7 @@ kafka_transactional_consumer_consume(Consumer c, Message msg) { rd_kafka_t *rk = ((Meta) c->meta)->rk; rd_kafka_message_t *rkmessage; + rd_kafka_headers_t *hdrs; rkmessage = rd_kafka_consumer_poll(rk, 10000); @@ -661,6 +701,15 @@ kafka_transactional_consumer_consume(Consumer c, Message msg) return 0; } + // Here we don't detach the headers so the memory gets cleared when + // the message is destroyed. + if (!rd_kafka_message_headers(rkmessage, &hdrs)) { + if (hdrs){ + rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs); + message_set_headers(msg, hdrs_copy); + } + } + message_set_data(msg, rkmessage->payload); message_set_len(msg, (size_t)rkmessage->len); @@ -702,6 +751,7 @@ kafka_consumer_consume(Consumer c, Message msg) { rd_kafka_t *rk = ((Meta) c->meta)->rk; rd_kafka_message_t *rkmessage; + rd_kafka_headers_t *hdrs; rkmessage = rd_kafka_consumer_poll(rk, 10000); @@ -728,6 +778,15 @@ kafka_consumer_consume(Consumer c, Message msg) return 0; } + // Here we don't detach the headers so the memory gets cleared when + // the message is destroyed. + if (!rd_kafka_message_headers(rkmessage, &hdrs)) { + if (hdrs){ + rd_kafka_headers_t *hdrs_copy = rd_kafka_headers_copy(hdrs); + message_set_headers(msg, hdrs_copy); + } + } + char *cpy = SCALLOC((int)rkmessage->len + 1, sizeof(*cpy)); memcpy(cpy, (char *)rkmessage->payload, (size_t)rkmessage->len); message_set_data(msg, cpy); diff --git a/src/main.c b/src/main.c index 3f42668..5eba577 100644 --- a/src/main.c +++ b/src/main.c @@ -144,7 +144,7 @@ consume(void *config) continue; //todo: check result of queue_add() - queue_add(q, message_get_data(msg), message_get_len(msg), + queue_add(q, message_get_data(msg), message_get_len(msg), message_get_headers(msg), message_get_xmark(msg),message_get_metadata(msg)); //give up ownership message_set_data(msg, NULL); @@ -201,7 +201,9 @@ produce(void *config) metadata_callback_run(m,msg); //message was handled: free it free(message_get_data(msg)); + free(message_get_headers(msg)); message_set_data(msg, NULL); + message_set_headers(msg, NULL); metadata_free(message_get_metadata(msg)); } } diff --git a/src/queue.c b/src/queue.c index b3faad1..0db5653 100644 --- a/src/queue.c +++ b/src/queue.c @@ -12,6 +12,7 @@ typedef struct Message { void *data; size_t datalen; + void *headers; int64_t xmark; Metadata metadata; } *Message; @@ -68,6 +69,22 @@ message_get_len(Message msg) return msg->datalen; } +void +message_set_headers(Message msg, void *headers) +{ + if (msg) + msg->headers = headers; +} + +void * +message_get_headers(Message msg) +{ + if (msg == NULL){ + return NULL; + } + return msg->headers; +} + int64_t message_get_xmark(Message msg) { @@ -237,7 +254,7 @@ _xmark_find(Queue q, uint32_t mark) } int -queue_add(Queue q, void *data, size_t datalen, int64_t xmark, Metadata *md) +queue_add(Queue q, void *data, size_t datalen, void *headers, int64_t xmark, Metadata *md) { MessageList newmsg; /* We can afford to allocate the message before @@ -252,6 +269,7 @@ queue_add(Queue q, void *data, size_t datalen, int64_t xmark, Metadata *md) } newmsg->msg->datalen = datalen; newmsg->msg->data = data; + newmsg->msg->headers = headers; newmsg->msg->xmark = xmark; newmsg->msg->metadata = *md; newmsg->next = NULL; @@ -414,6 +432,7 @@ queue_get(Queue q, Message msg) msg->data = firstrec->msg->data; msg->datalen = firstrec->msg->datalen; msg->metadata = firstrec->msg->metadata; + msg->headers = firstrec->msg->headers; /* this line can cause an unfinishable queue * consumers do not need xmark anylonger diff --git a/src/queue.h b/src/queue.h index f0744b6..4d065df 100644 --- a/src/queue.h +++ b/src/queue.h @@ -16,15 +16,17 @@ void message_set_metadata(Message msg, Metadata md); void *message_get_data(Message msg); void message_set_data(Message msg, void *data); size_t message_get_len(Message msg); +void message_set_len(Message msg, size_t len); +void *message_get_headers(Message msg); +void message_set_headers(Message msg, void *data); int64_t message_get_xmark(Message msg); void message_set_xmark(Message msg, int64_t xmark); -void message_set_len(Message msg, size_t len); void message_free(Message *msg); typedef struct Queue *Queue; Queue queue_init(config_setting_t *config); -int queue_add(Queue q, void *data, size_t datalen, int64_t msgtype, Metadata *md); +int queue_add(Queue q, void *data, size_t datalen, void *headers, int64_t msgtype, Metadata *md); int queue_get(Queue q, Message msg); long queue_length(Queue q); long queue_added(Queue q); diff --git a/t/queue_test.c b/t/queue_test.c index 7483728..d1bdabf 100644 --- a/t/queue_test.c +++ b/t/queue_test.c @@ -21,13 +21,13 @@ main(void) Metadata *md = message_get_metadata(msg); message_set_xmark(msg,1); char *data = "moep"; - queue_add(q, data, strlen(data), 1, md); + queue_add(q, data, strlen(data), NULL, 1, md); Message msg2 = message_init(); md = message_get_metadata(msg2); message_set_xmark(msg2,65535); char *data2 = "huuuurz"; - queue_add(q, data2, strlen(data2), 65535, md); + queue_add(q, data2, strlen(data2), NULL, 65535, md); queue_get(q, msg2); queue_get(q, msg);