From 506e9195f856194cabb13a5a211f91bf3cf8c84d Mon Sep 17 00:00:00 2001 From: Argonus Date: Fri, 19 Jan 2024 21:41:04 +0100 Subject: [PATCH] Fix producing multiple messages in record batch --- lib/kayrock/record_batch.ex | 18 +- test/integration/producer_test.exs | 278 +++++++++++++++++++++++++++++ 2 files changed, 287 insertions(+), 9 deletions(-) diff --git a/lib/kayrock/record_batch.ex b/lib/kayrock/record_batch.ex index ea6d8aa..ea72292 100644 --- a/lib/kayrock/record_batch.ex +++ b/lib/kayrock/record_batch.ex @@ -100,7 +100,6 @@ defmodule Kayrock.RecordBatch do @spec serialize(t) :: iodata def serialize(%__MODULE__{} = record_batch) do [first_record | _] = record_batch.records - num_records = length(record_batch.records) max_timestamp = @@ -112,11 +111,13 @@ defmodule Kayrock.RecordBatch do base_timestamp = first_record.timestamp records = - for record <- record_batch.records do + record_batch.records + |> Enum.with_index() + |> Enum.map(fn {record, offset_delta} -> record - |> normalize_record(base_offset, base_timestamp) - |> serialize_record - end + |> normalize_record(offset_delta, base_timestamp) + |> serialize_record() + end) records = case compression_type(record_batch.attributes) do @@ -163,8 +164,7 @@ defmodule Kayrock.RecordBatch do nil end - def deserialize(msg_set_size, msg_set_data) - when byte_size(msg_set_data) == msg_set_size do + def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do case get_magic_byte(msg_set_data) do {2, batch_offset, batch_length, partition_leader_epoch, rest} -> <<>> @@ -405,11 +405,11 @@ defmodule Kayrock.RecordBatch do [encode_varint(IO.iodata_length(without_length)), without_length] end - defp normalize_record(record, base_offset, base_timestamp) do + defp normalize_record(record, offset_delta, base_timestamp) do %{ record | timestamp: maybe_delta(record.timestamp, base_timestamp), - offset: maybe_delta(record.offset, base_offset) + offset: offset_delta } end diff --git 
a/test/integration/producer_test.exs b/test/integration/producer_test.exs index 4e5fc89..bc748be 100644 --- a/test/integration/producer_test.exs +++ b/test/integration/producer_test.exs @@ -51,6 +51,65 @@ defmodule Kayrock.Integration.ProducerTest do [message] = List.first(response.partition_responses).record_set.messages assert message.value == "test" end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + key: "1", + value: "foo", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + key: "1", + value: "bar", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + key: "1", + value: "baz", + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_two.value == "bar" + assert message_three.value == "baz" 
+ end end for version <- [2, 3] do @@ -101,6 +160,112 @@ defmodule Kayrock.Integration.ProducerTest do assert message.value == "test" assert message.timestamp == timestamp end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + records = [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + attributes: 0 + } + ] + + # [WHEN] Produce message with timestamp + record_set = %Kayrock.RecordBatch{records: records} + + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0, log_start_offset: 0]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + + assert message_two.value == "bar" + assert 
message_two.offset == 1 + assert message_two.timestamp == timestamp + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end end for version <- [4, 5, 6, 7] do @@ -154,6 +319,119 @@ defmodule Kayrock.Integration.ProducerTest do assert message.timestamp == timestamp assert message.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + 
record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_three.value == "baz" + assert 
message_three.offset == 2 + assert message_three.timestamp == timestamp + assert message_three.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end end end