Add special header accessors under li-apache-kafka-clients (#175)
* Add special header accessors under li-apache-kafka-clients

Additional methods provide an easier way to access special headers.

Co-authored-by: Abhishek Mendhekar <[email protected]>
Abhishek Mendhekar authored Jun 10, 2020
1 parent 4cd8eed commit 157ed93
Showing 4 changed files with 55 additions and 40 deletions.
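
In short, callers no longer need to pull the raw special-header map and decode bytes by hand; the new accessors do both steps. A minimal before/after sketch, assuming a ConsumerRecord<String, String> record in scope and imports elided:

// Before: fetch the raw map, then decode the timestamp ("_t") bytes manually
// (fetchSpecialHeaders previously lived in the test-only LiKafkaClientsTestUtils).
Map<String, byte[]> headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(record.headers());
long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0);

// After: one call per header; a null return means the header is absent.
Long maybeTimestamp = LiKafkaClientsUtils.fetchTimestampHeader(record.headers());
LargeMessageHeaderValue largeMessageHeader = LiKafkaClientsUtils.fetchLargeMessageHeader(record.headers());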
@@ -7,10 +7,7 @@
 import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
 import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
 import com.linkedin.kafka.clients.producer.LiKafkaProducer;
-import com.linkedin.kafka.clients.utils.Constants;
-import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
 import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
-import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
 import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
 import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils;
 import java.util.Collections;
@@ -38,9 +35,7 @@
 import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.LARGE_MESSAGE_SEGMENT_WRAPPING_REQUIRED_CONFIG;
 import static com.linkedin.kafka.clients.producer.LiKafkaProducerConfig.MAX_MESSAGE_SEGMENT_BYTES_CONFIG;
 import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.*;


 /**
@@ -162,13 +157,10 @@ public void testLargeMessage() throws Exception {
     }
     for (ConsumerRecord<String, String> consumerRecord : records) {
       // Verify headers
-      Map<String, byte[]> headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(consumerRecord.headers());
-      assertTrue(headers.containsKey(Constants.TIMESTAMP_HEADER));
-      assertEquals(PrimitiveEncoderDecoder.LONG_SIZE, headers.get(Constants.TIMESTAMP_HEADER).length);
-      long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0);
+      Long eventTimestamp = LiKafkaClientsUtils.fetchTimestampHeader(consumerRecord.headers());
+      assertNotNull(eventTimestamp);
       assertTrue(eventTimestamp >= startTime && eventTimestamp <= System.currentTimeMillis());
-      assertTrue(headers.containsKey(Constants.LARGE_MESSAGE_HEADER));
-      LargeMessageHeaderValue largeMessageHeaderValue = LargeMessageHeaderValue.fromBytes(headers.get(Constants.LARGE_MESSAGE_HEADER));
+      LargeMessageHeaderValue largeMessageHeaderValue = LiKafkaClientsUtils.fetchLargeMessageHeader(consumerRecord.headers());
       assertEquals(largeMessageHeaderValue.getSegmentNumber(), -1);
       assertEquals(largeMessageHeaderValue.getNumberOfSegments(), 6);
       assertEquals(largeMessageHeaderValue.getType(), LargeMessageHeaderValue.LEGACY_V2);
@@ -7,7 +7,7 @@
 import com.linkedin.kafka.clients.consumer.LiKafkaConsumer;
 import com.linkedin.kafka.clients.largemessage.errors.SkippableException;
 import com.linkedin.kafka.clients.utils.Constants;
-import com.linkedin.kafka.clients.utils.LiKafkaClientsTestUtils;
+import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
 import com.linkedin.kafka.clients.utils.PrimitiveEncoderDecoder;
 import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
 import java.time.Duration;
@@ -89,7 +89,7 @@ public void testSend() throws Exception {
     while (messageCount < RECORD_COUNT && System.currentTimeMillis() < startMs + 30000) {
       ConsumerRecords<String, String> records = consumer.poll(100);
       for (ConsumerRecord<String, String> record : records) {
-        Map<String, byte[]> headers = LiKafkaClientsTestUtils.fetchSpecialHeaders(record.headers());
+        Map<String, byte[]> headers = LiKafkaClientsUtils.fetchSpecialHeaders(record.headers());
         assertTrue(headers.containsKey(Constants.TIMESTAMP_HEADER));
         long eventTimestamp = PrimitiveEncoderDecoder.decodeLong(headers.get(Constants.TIMESTAMP_HEADER), 0);
         assertTrue(eventTimestamp >= startTime && eventTimestamp <= System.currentTimeMillis());
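This test keeps the manual decode path, so a note on decodeLong may help. What follows is a hypothetical re-implementation for illustration only; the real PrimitiveEncoderDecoder ships with the library, and big-endian byte order is an assumption here, not something this diff confirms:

// Illustrative stand-in for PrimitiveEncoderDecoder.decodeLong(bytes, offset).
static long decodeLong(byte[] bytes, int offset) {
  long value = 0L;
  for (int i = 0; i < 8; i++) {   // 8 bytes per long (PrimitiveEncoderDecoder.LONG_SIZE)
    value = (value << 8) | (bytes[offset + i] & 0xFFL);   // assumed big-endian order
  }
  return value;
}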
@@ -8,12 +8,8 @@

 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;

 import static org.testng.Assert.assertEquals;

@@ -56,26 +52,4 @@ public static String getRandomString(int length) {
     }
     return stringBuiler.toString();
   }
-
-  /**
-   * Special header keys have a "_" prefix and are managed internally by the clients.
-   * @param headers
-   * @return
-   */
-  public static Map<String, byte[]> fetchSpecialHeaders(Headers headers) {
-    Map<String, byte[]> map = new HashMap<>();
-    for (Header header : headers) {
-
-      if (!header.key().startsWith("_")) {
-        // skip any non special header
-        continue;
-      }
-
-      if (map.containsKey(header.key())) {
-        throw new IllegalStateException("Duplicate special header found " + header.key());
-      }
-      map.put(header.key(), header.value());
-    }
-    return map;
-  }
 }
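
The helper is not deleted here; it moves into LiKafkaClientsUtils (next file) so production code, not just tests, can call it. A short sketch of its filtering contract; the trace-id header is hypothetical, and imports (RecordHeaders, StandardCharsets) are elided:

Headers headers = new RecordHeaders();   // org.apache.kafka.common.header.internals.RecordHeaders
headers.add("_t", new byte[8]);          // "_" prefix marks a special header; placeholder timestamp bytes
headers.add("trace-id", "x-1".getBytes(StandardCharsets.UTF_8));   // no "_" prefix, so it is skipped
Map<String, byte[]> special = LiKafkaClientsUtils.fetchSpecialHeaders(headers);
// special now contains only "_t"; a duplicate "_t" would throw IllegalStateException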
@@ -4,6 +4,7 @@

 package com.linkedin.kafka.clients.utils;

+import com.linkedin.kafka.clients.common.LargeMessageHeaderValue;
 import com.linkedin.mario.common.versioning.VersioningUtils;
 import java.nio.ByteBuffer;
 import java.security.NoSuchAlgorithmException;
@@ -22,6 +23,8 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -205,4 +208,50 @@ private static String fishForClientId(Map<MetricName, ? extends Metric> metrics)
     }
     return candidates.iterator().next();
   }
+
+  /**
+   * Special header keys have a "_" prefix and are managed internally by the clients.
+   * @param headers the record headers to scan
+   * @return a map from each special header key to its raw byte[] value
+   */
+  public static Map<String, byte[]> fetchSpecialHeaders(Headers headers) {
+    Map<String, byte[]> map = new HashMap<>();
+    for (Header header : headers) {
+
+      if (!header.key().startsWith("_")) {
+        // skip any non-special header
+        continue;
+      }
+
+      if (map.containsKey(header.key())) {
+        throw new IllegalStateException("Duplicate special header found " + header.key());
+      }
+      map.put(header.key(), header.value());
+    }
+    return map;
+  }
+
+  /**
+   * Fetch the value of the special timestamp header (_t).
+   * @param headers ConsumerRecord headers
+   * @return the decoded long timestamp, or null if the _t header does not exist
+   */
+  public static Long fetchTimestampHeader(Headers headers) {
+    Map<String, byte[]> specialHeaders = fetchSpecialHeaders(headers);
+    return specialHeaders.containsKey(Constants.TIMESTAMP_HEADER)
+        ? PrimitiveEncoderDecoder.decodeLong(specialHeaders.get(Constants.TIMESTAMP_HEADER), 0)
+        : null;
+  }
+
+  /**
+   * Fetch the value of the special large message header (_lm).
+   * @param headers ConsumerRecord headers
+   * @return the decoded LargeMessageHeaderValue, or null if the _lm header does not exist
+   */
+  public static LargeMessageHeaderValue fetchLargeMessageHeader(Headers headers) {
+    Map<String, byte[]> specialHeaders = fetchSpecialHeaders(headers);
+    return specialHeaders.containsKey(Constants.LARGE_MESSAGE_HEADER)
+        ? LargeMessageHeaderValue.fromBytes(specialHeaders.get(Constants.LARGE_MESSAGE_HEADER))
+        : null;
+  }
 }
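
Putting the new accessors together on the consume path, a usage sketch assuming a subscribed LiKafkaConsumer<String, String> consumer as in the tests above; the handle* callbacks are hypothetical:

ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
  Long eventTimestamp = LiKafkaClientsUtils.fetchTimestampHeader(record.headers());
  if (eventTimestamp != null) {
    handleTimestamp(eventTimestamp);   // "_t" present: event time in epoch milliseconds
  }
  LargeMessageHeaderValue largeMessageHeader = LiKafkaClientsUtils.fetchLargeMessageHeader(record.headers());
  if (largeMessageHeader != null) {
    handleSegments(largeMessageHeader.getNumberOfSegments());   // "_lm" present: segment metadata
  }
}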
