From 4aef47af2d77f07cfd15260c1bee738d3c838356 Mon Sep 17 00:00:00 2001 From: Argonus Date: Tue, 23 Jan 2024 12:05:11 +0100 Subject: [PATCH] Fix produce & deserialize s --- lib/kayrock/message_set.ex | 20 +++++++++++++++++++- lib/kayrock/record_batch.ex | 20 +++++++++----------- test/kayrock/message_serde_test.exs | 24 ++++++++++++------------ 3 files changed, 40 insertions(+), 24 deletions(-) diff --git a/lib/kayrock/message_set.ex b/lib/kayrock/message_set.ex index 7e0aadf..3ecb4cd 100644 --- a/lib/kayrock/message_set.ex +++ b/lib/kayrock/message_set.ex @@ -84,7 +84,10 @@ defmodule Kayrock.MessageSet do decompressed = Kayrock.Compression.decompress(c, value) if magic == 1 do - Enum.reverse(do_deserialize(decompressed, [], offset)) + decompressed + |> do_deserialize([], 0) + |> correct_offsets(offset) + |> Enum.reverse() else Enum.reverse(do_deserialize(decompressed, [], 0)) end @@ -97,6 +100,21 @@ defmodule Kayrock.MessageSet do Enum.reverse(List.flatten(acc)) end + # All other cases, compressed inner messages should have relative offset, with below attributes: + # - The container message should have the 'real' offset + # - The container message's offset should be the 'real' offset of the last message in the compressed batch + defp correct_offsets(messages, real_offset) do + max_relative_offset = messages |> List.last() |> Map.fetch!(:offset) + + if max_relative_offset == real_offset do + messages + else + Enum.map(messages, fn msg -> + Map.update!(msg, :offset, &(&1 + real_offset)) + end) + end + end + defp create_message_set([], _compression_type), do: {"", 0} defp create_message_set(messages, :none) do diff --git a/lib/kayrock/record_batch.ex b/lib/kayrock/record_batch.ex index ea6d8aa..7ebe8d3 100644 --- a/lib/kayrock/record_batch.ex +++ b/lib/kayrock/record_batch.ex @@ -100,7 +100,6 @@ defmodule Kayrock.RecordBatch do @spec serialize(t) :: iodata def serialize(%__MODULE__{} = record_batch) do [first_record | _] = record_batch.records - num_records = length(record_batch.records) max_timestamp = @@ -108,15 +107,16 @@ defmodule Kayrock.RecordBatch do |> Enum.map(fn r -> r.timestamp end) |> Enum.max() - base_offset = first_record.offset base_timestamp = first_record.timestamp records = - for record <- record_batch.records do + record_batch.records + |> Enum.with_index() + |> Enum.map(fn {record, offset_delta} -> record - |> normalize_record(base_offset, base_timestamp) - |> serialize_record - end + |> normalize_record(offset_delta, base_timestamp) + |> serialize_record() + end) records = case compression_type(record_batch.attributes) do @@ -163,11 +163,9 @@ defmodule Kayrock.RecordBatch do nil end - def deserialize(msg_set_size, msg_set_data) - when byte_size(msg_set_data) == msg_set_size do + def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do case get_magic_byte(msg_set_data) do {2, batch_offset, batch_length, partition_leader_epoch, rest} -> - <<>> deserialize(rest, [], batch_offset, batch_length, partition_leader_epoch) {magic, _, _, _, _} -> @@ -405,11 +403,11 @@ defmodule Kayrock.RecordBatch do [encode_varint(IO.iodata_length(without_length)), without_length] end - defp normalize_record(record, base_offset, base_timestamp) do + defp normalize_record(record, offset_delta, base_timestamp) do %{ record | timestamp: maybe_delta(record.timestamp, base_timestamp), - offset: maybe_delta(record.offset, base_offset) + offset: offset_delta } end diff --git a/test/kayrock/message_serde_test.exs b/test/kayrock/message_serde_test.exs index 05ac249..a663954 100644 --- a/test/kayrock/message_serde_test.exs +++ b/test/kayrock/message_serde_test.exs @@ -173,7 +173,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 1, timestamp: -1, value: "bar" }, @@ -181,7 +181,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 2, timestamp: -1, value: "baz" } @@ -189,11 +189,11 @@ defmodule Kayrock.MessageSerdeTest do } expect = - <<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168, - 31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + <<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91, + 168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, - 0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0, - 0, 0, 1, 6, 98, 97, 122, 0>> + 0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114, + 0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>> got = IO.iodata_to_binary(RecordBatch.serialize(record_batch)) assert got == expect, compare_binaries(got, expect) @@ -227,7 +227,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 1, timestamp: -1, value: "bar" }, @@ -235,7 +235,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 2, timestamp: -1, value: "baz" } @@ -243,11 +243,11 @@ defmodule Kayrock.MessageSerdeTest do } expect = - <<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168, - 31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + <<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91, + 168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, - 0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0, - 0, 0, 1, 6, 98, 97, 122, 0>> + 0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114, + 0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>> got = IO.iodata_to_binary(RecordBatch.serialize(record_batch)) assert got == expect, compare_binaries(got, expect)