Skip to content

Commit

Permalink
Merge pull request kafkaex#24 from jfmyers9/jmyers/incomplete-record
Browse files Browse the repository at this point in the history
  • Loading branch information
dantswain authored Nov 17, 2023
2 parents fc41266 + e8be52d commit ff40891
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 7 deletions.
25 changes: 18 additions & 7 deletions lib/kayrock/record_batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ defmodule Kayrock.RecordBatch do
defp deserialize(rest, acc, batch_offset, batch_length, partition_leader_epoch) do
# we already parsed off 5 bytes in get_magic_byte
real_size = batch_length - 5

deserialize(real_size, rest, acc, batch_offset, batch_length, partition_leader_epoch)
end

# If the expected size to fetch is less than the rest of the body, we have
# fetched an incomplete record. Cowardly refuse to parse this record.
defp deserialize(real_size, rest, acc, _, _, _) when real_size > byte_size(rest) do
Enum.reverse(acc)
end

defp deserialize(real_size, rest, acc, batch_offset, batch_length, partition_leader_epoch) do
<<batch_data::size(real_size)-binary, body_rest::binary>> = rest

<<crc::32-signed, attributes::16-signed, last_offset_delta::32-signed,
Expand Down Expand Up @@ -236,15 +247,12 @@ defmodule Kayrock.RecordBatch do

acc = [record_batch | acc]

case body_rest do
"" ->
Enum.reverse(acc)
case get_magic_byte(body_rest) do
{2, batch_offset, batch_length, partition_leader_epoch, new_rest} ->
deserialize(new_rest, acc, batch_offset, batch_length, partition_leader_epoch)

_ ->
{2, batch_offset, batch_length, partition_leader_epoch, new_rest} =
get_magic_byte(body_rest)

deserialize(new_rest, acc, batch_offset, batch_length, partition_leader_epoch)
Enum.reverse(acc)
end
end

Expand Down Expand Up @@ -346,6 +354,9 @@ defmodule Kayrock.RecordBatch do
# message_size: int32
# first_message crc: int32
# first_message magic: int8
# Return early if we do not have a complete 17 bytes to parse from the record
defp get_magic_byte(msg_set_data) when byte_size(msg_set_data) < 17, do: nil

defp get_magic_byte(msg_set_data) do
<<first_offset::64-signed, batch_length_or_message_size::32-signed,
partition_leader_epoch_or_first_crc::32-signed, magic::8-signed, rest::bits>> = msg_set_data
Expand Down
116 changes: 116 additions & 0 deletions test/kayrock/fetch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,122 @@ defmodule Kayrock.FetchTest do
assert got == expect
end

test "test deserializing a record batch with an incomplete record" do
data =
<<0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 1, 0, 4, 102, 111, 111, 100, 0, 0, 0, 1, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 231, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0,
0, 255, 255, 255, 255, 0, 0, 1, 34, 0, 0, 0, 0, 0, 0, 0, 228, 0, 0, 0, 78, 0, 0, 0, 33, 2,
106, 8, 42, 102, 0, 2, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 0, 0, 0, 1, 27, 104, 52, 0, 0, 0, 0, 40, 75, 81, 76, 84, 67, 69, 67, 88, 68, 81, 66,
82, 81, 67, 89, 73, 65, 68, 84, 79, 0, 0, 0, 0, 0, 0, 0, 0, 229, 0, 0, 0, 76, 0, 0, 0, 33,
2, 232, 120, 45, 102, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 0, 0, 0, 1, 52, 0, 0, 0, 0, 40, 84, 87, 73, 73, 83, 71, 73, 73, 69, 72, 72, 72,
79, 88, 70, 79, 74, 73, 86, 79, 0, 0, 0, 0, 0, 0, 0, 0, 230, 0, 0, 0, 98, 0, 0, 0, 33, 2,
35, 151, 146, 115, 0, 2, 0, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 0, 0, 0, 1, 130, 83, 78, 65, 80, 80, 89, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 29,
27, 104, 52, 0, 0, 0, 0, 40, 72, 67, 85, 80, 72, 73, 85, 78, 75, 84, 67, 86, 82, 80, 66,
86, 70, 77, 73, 82, 0, 0, 0>>

expect = %Kayrock.Fetch.V5.Response{
correlation_id: 4,
responses: [
%{
partition_responses: [
%{
partition_header: %{
aborted_transactions: [],
error_code: 0,
high_watermark: 231,
last_stable_offset: -1,
log_start_offset: 0,
partition: 0
},
record_set: [
%Kayrock.RecordBatch{
attributes: 2,
base_sequence: -1,
batch_length: 78,
batch_offset: 228,
crc: 1_778_920_038,
first_timestamp: -1,
last_offset_delta: 0,
max_timestamp: -1,
partition_leader_epoch: 33,
producer_epoch: -1,
producer_id: -1,
records: [
%Kayrock.RecordBatch.Record{
attributes: 0,
headers: [],
key: "",
offset: 228,
timestamp: -1,
value: "KQLTCECXDQBRQCYIADTO"
}
]
},
%Kayrock.RecordBatch{
attributes: 0,
base_sequence: -1,
batch_length: 76,
batch_offset: 229,
crc: -394_777_242,
first_timestamp: -1,
last_offset_delta: 0,
max_timestamp: -1,
partition_leader_epoch: 33,
producer_epoch: -1,
producer_id: -1,
records: [
%Kayrock.RecordBatch.Record{
attributes: 0,
headers: [],
key: "",
offset: 229,
timestamp: -1,
value: "TWIISGIIEHHHOXFOJIVO"
}
]
},
%Kayrock.RecordBatch{
attributes: 2,
base_sequence: -1,
batch_length: 98,
batch_offset: 230,
crc: 597_135_987,
first_timestamp: -1,
last_offset_delta: 0,
max_timestamp: -1,
partition_leader_epoch: 33,
producer_epoch: -1,
producer_id: -1,
records: [
%Kayrock.RecordBatch.Record{
attributes: 0,
headers: [],
key: "",
offset: 230,
timestamp: -1,
value: "HCUPHIUNKTCVRPBVFMIR"
}
]
}
]
}
],
topic: "food"
}
],
throttle_time_ms: 0
}

{got, ""} = Kayrock.Fetch.V5.Response.deserialize(data)
assert got == expect
end

test "correctly handle timestamps for LogAppend" do
data =
<<0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 1, 0, 25, 116, 101, 115, 116, 95, 108, 111, 103, 95, 97,
Expand Down

0 comments on commit ff40891

Please sign in to comment.