Skip to content

Commit

Permalink
Adding original message size to the large message header value (#172)
Browse files Browse the repository at this point in the history
LargeMessageHeaderValue is missing messageSizeInBytes, which indicates the size of the original message. This is useful to validate the segments during assembling on the consumer side.
  • Loading branch information
Navina Ramesh authored May 4, 2020
1 parent 8443a98 commit 841dfc8
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testLargeMessage() throws Exception {
LargeMessageHeaderValue largeMessageHeaderValue = LargeMessageHeaderValue.fromBytes(headers.get(Constants.LARGE_MESSAGE_HEADER));
assertEquals(largeMessageHeaderValue.getSegmentNumber(), -1);
assertEquals(largeMessageHeaderValue.getNumberOfSegments(), 6);
assertEquals(largeMessageHeaderValue.getType(), LargeMessageHeaderValue.LEGACY);
assertEquals(largeMessageHeaderValue.getType(), LargeMessageHeaderValue.LEGACY_V2);

String messageId = consumerRecord.value().substring(0, 32);
String origMessage = messages.get(messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,46 @@

/**
* This class represents the header value for a large message.
* Every large message header takes up 25 bytes and is structured as follows
* Every large message header takes up 29 bytes and is structured as follows
*
* | Type | UUID | segmentNumber | numberOfSegments |
* | 1 byte | 16 bytes | 4 bytes | 4 bytes |
* | Type | UUID | segmentNumber | numberOfSegments | messageSizeInBytes |
* | 1 byte | 16 bytes | 4 bytes | 4 bytes | 4 bytes |
*
* The Large message header values will be used to support large messages eventually.
* (as opposed to encoding large segment metadata info inside the payload)
*/
public class LargeMessageHeaderValue {
public static final UUID EMPTY_UUID = new UUID(0L, 0L);
public static final int INVALID_SEGMENT_ID = -1;
public static final int INVALID_MESSAGE_SIZE = -1;

private static final int LEGACY_HEADER_SIZE = 1 + PrimitiveEncoderDecoder.LONG_SIZE +
PrimitiveEncoderDecoder.LONG_SIZE + PrimitiveEncoderDecoder.INT_SIZE + PrimitiveEncoderDecoder.INT_SIZE;
// new field added in LEGACY_V2 - messageSizeInBytes
private static final int LEGACY_V2_HEADER_SIZE = LEGACY_HEADER_SIZE + PrimitiveEncoderDecoder.INT_SIZE;
private final byte _type;
private final UUID _uuid;
private final int _segmentNumber;
private final int _numberOfSegments;
private final int _messageSizeInBytes;

// This indicates that the large message framework is using
// SegmentSerializer/SegmentDeserializer interface to split
// and assemble large message segments.
public static final byte LEGACY = (byte) 0;
// Added new field - messageSizeInBytes to the header value
public static final byte LEGACY_V2 = (byte) 1;

public LargeMessageHeaderValue(byte type, UUID uuid, int segmentNumber, int numberOfSegments) {
public LargeMessageHeaderValue(byte type, UUID uuid, int segmentNumber, int numberOfSegments, int messageSizeInBytes) {
_type = type;
_uuid = uuid;
_segmentNumber = segmentNumber;
_numberOfSegments = numberOfSegments;
_messageSizeInBytes = messageSizeInBytes;
}

public int getMessageSizeInBytes() {
return _messageSizeInBytes;
}

public int getSegmentNumber() {
Expand All @@ -55,7 +69,9 @@ public byte getType() {
}

public static byte[] toBytes(LargeMessageHeaderValue largeMessageHeaderValue) {
byte[] serialized = new byte[25];
byte[] serialized = largeMessageHeaderValue.getType() == LEGACY ? new byte[LEGACY_HEADER_SIZE]
: new byte[LEGACY_V2_HEADER_SIZE];

int byteOffset = 0;
serialized[byteOffset] = largeMessageHeaderValue.getType();
byteOffset += 1; // for type
Expand All @@ -66,6 +82,10 @@ public static byte[] toBytes(LargeMessageHeaderValue largeMessageHeaderValue) {
PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getSegmentNumber(), serialized, byteOffset);
byteOffset += PrimitiveEncoderDecoder.INT_SIZE; // for segment number
PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getNumberOfSegments(), serialized, byteOffset);
if (largeMessageHeaderValue.getType() == LEGACY_V2) {
byteOffset += PrimitiveEncoderDecoder.INT_SIZE; // for message size
PrimitiveEncoderDecoder.encodeInt(largeMessageHeaderValue.getMessageSizeInBytes(), serialized, byteOffset);
}
return serialized;
}

Expand All @@ -81,6 +101,11 @@ public static LargeMessageHeaderValue fromBytes(byte[] bytes) {
int segmentNumber = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset);
byteOffset += PrimitiveEncoderDecoder.INT_SIZE;
int numberOfSegments = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset);
return new LargeMessageHeaderValue(type, new UUID(mostSignificantBits, leastSignificantBits), segmentNumber, numberOfSegments);
if (bytes.length == LEGACY_V2_HEADER_SIZE) {
byteOffset += PrimitiveEncoderDecoder.INT_SIZE;
int messageSizeInBytes = PrimitiveEncoderDecoder.decodeInt(bytes, byteOffset);
return new LargeMessageHeaderValue(type, new UUID(mostSignificantBits, leastSignificantBits), segmentNumber, numberOfSegments, messageSizeInBytes);
}
return new LargeMessageHeaderValue(type, new UUID(mostSignificantBits, leastSignificantBits), segmentNumber, numberOfSegments, INVALID_MESSAGE_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,9 @@ private ConsumerRecord<K, V> handleConsumerRecord(ConsumerRecord<byte[], byte[]>
largeMessageHeaderValue.getType(),
LargeMessageHeaderValue.EMPTY_UUID,
LargeMessageHeaderValue.INVALID_SEGMENT_ID,
largeMessageHeaderValue.getNumberOfSegments()
largeMessageHeaderValue.getNumberOfSegments(),
largeMessageHeaderValue.getType() == LargeMessageHeaderValue.LEGACY ?
LargeMessageHeaderValue.INVALID_MESSAGE_SIZE : largeMessageHeaderValue.getMessageSizeInBytes()
);
headers.add(Constants.LARGE_MESSAGE_HEADER, LargeMessageHeaderValue.toBytes(largeMessageHeaderValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ public List<ProducerRecord<byte[], byte[]>> split(String topic,
Headers temporaryHeaders = new RecordHeaders(headers);
temporaryHeaders.remove(Constants.LARGE_MESSAGE_HEADER);
LargeMessageHeaderValue largeMessageHeaderValue = new LargeMessageHeaderValue(
LargeMessageHeaderValue.LEGACY,
LargeMessageHeaderValue.LEGACY_V2,
messageId,
seq,
numberOfSegments
);
numberOfSegments,
messageSizeInBytes);
temporaryHeaders.add(Constants.LARGE_MESSAGE_HEADER, LargeMessageHeaderValue.toBytes(largeMessageHeaderValue));
ProducerRecord<byte[], byte[]> segmentProducerRecord =
new ProducerRecord<>(
Expand Down

0 comments on commit 841dfc8

Please sign in to comment.