diff --git a/lib/kayrock/message_set.ex b/lib/kayrock/message_set.ex index 7e0aadf..d5e218d 100644 --- a/lib/kayrock/message_set.ex +++ b/lib/kayrock/message_set.ex @@ -81,13 +81,11 @@ defmodule Kayrock.MessageSet do } c -> - decompressed = Kayrock.Compression.decompress(c, value) - - if magic == 1 do - Enum.reverse(do_deserialize(decompressed, [], offset)) - else - Enum.reverse(do_deserialize(decompressed, [], 0)) - end + c + |> Kayrock.Compression.decompress(value) + |> do_deserialize([], 0) + |> correct_offsets(offset) + |> Enum.reverse() end do_deserialize(orig_rest, [msg | acc], add_offset) @@ -97,6 +95,21 @@ defmodule Kayrock.MessageSet do Enum.reverse(List.flatten(acc)) end + # All other cases, compressed inner messages should have relative offset, with below attributes: + # - The container message should have the 'real' offset + # - The container message's offset should be the 'real' offset of the last message in the compressed batch + defp correct_offsets(messages, real_offset) do + max_relative_offset = messages |> List.last() |> Map.fetch!(:offset) + + if max_relative_offset == real_offset do + messages + else + Enum.map(messages, fn msg -> + Map.update!(msg, :offset, &(&1 + real_offset)) + end) + end + end + defp create_message_set([], _compression_type), do: {"", 0} defp create_message_set(messages, :none) do diff --git a/lib/kayrock/record_batch.ex b/lib/kayrock/record_batch.ex index ea6d8aa..7ebe8d3 100644 --- a/lib/kayrock/record_batch.ex +++ b/lib/kayrock/record_batch.ex @@ -100,7 +100,6 @@ defmodule Kayrock.RecordBatch do @spec serialize(t) :: iodata def serialize(%__MODULE__{} = record_batch) do [first_record | _] = record_batch.records - num_records = length(record_batch.records) max_timestamp = @@ -108,15 +107,16 @@ defmodule Kayrock.RecordBatch do |> Enum.map(fn r -> r.timestamp end) |> Enum.max() - base_offset = first_record.offset base_timestamp = first_record.timestamp records = - for record <- record_batch.records do + record_batch.records + |> Enum.with_index() + |> Enum.map(fn {record, offset_delta} -> record - |> normalize_record(base_offset, base_timestamp) - |> serialize_record - end + |> normalize_record(offset_delta, base_timestamp) + |> serialize_record() + end) records = case compression_type(record_batch.attributes) do @@ -163,11 +163,9 @@ defmodule Kayrock.RecordBatch do nil end - def deserialize(msg_set_size, msg_set_data) - when byte_size(msg_set_data) == msg_set_size do + def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do case get_magic_byte(msg_set_data) do {2, batch_offset, batch_length, partition_leader_epoch, rest} -> - <<>> deserialize(rest, [], batch_offset, batch_length, partition_leader_epoch) {magic, _, _, _, _} -> @@ -405,11 +403,11 @@ defmodule Kayrock.RecordBatch do [encode_varint(IO.iodata_length(without_length)), without_length] end - defp normalize_record(record, base_offset, base_timestamp) do + defp normalize_record(record, offset_delta, base_timestamp) do %{ record | timestamp: maybe_delta(record.timestamp, base_timestamp), - offset: maybe_delta(record.offset, base_offset) + offset: offset_delta } end diff --git a/test/integration/behaviour/single_broker_test.exs b/test/integration/behaviour/single_broker_test.exs new file mode 100644 index 0000000..05c274c --- /dev/null +++ b/test/integration/behaviour/single_broker_test.exs @@ -0,0 +1,111 @@ +defmodule Kayrock.Integration.Behaviour.SingleBrokerTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import Kayrock.TestSupport + import Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + test "for single broker lifecycle", %{kafka: kafka} do + # [WHEN] There is client connected to broker + {:ok, client_pid} = build_client(kafka) + + # [AND] There is a topic created + topic_name = create_topic(client_pid, 0) + + # [WHEN] Client can read from empty topic + fetch_request = fetch_messages_request([[topic: topic_name]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives empty response + [%{partition_responses: [%{record_set: record_set}]}] = resp.responses + assert is_nil(record_set) + + # [WHEN] Client can write to topic + record_set = record_set([{"1", "test-one"}]) + produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5) + {:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller) + + # [AND] Fetch message from that topic + fetch_request = fetch_messages_request([[topic: topic_name]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives message + [%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses + [record] = record_set.records + assert record.key == "1" + assert record.value == "test-one" + assert record.offset == 0 + + # [WHEN] Client can write multiple messages to topic + record_set = record_set([{"2", "test-two"}, {"3", "test-three"}]) + produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5) + {:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller) + + # [AND] Fetch messages from that topic + fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 1]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives messages + [%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses + [record_one, record_two] = record_set.records + assert record_one.key == "2" + assert record_one.value == "test-two" + assert record_one.offset == 1 + + assert record_two.key == "3" + assert record_two.value == "test-three" + assert record_two.offset == 2 + + # [WHEN] Client is closed + :ok = Kayrock.Client.stop(client_pid) + + # [AND] New client is created + {:ok, client_pid} = build_client(kafka) + + # [AND] Fetch messages from that topic + fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 0]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives messages + [%{partition_responses: [%{record_set: [record_set_one, record_set_two]}]}] = resp.responses + [record] = record_set_one.records + assert record.key == "1" + assert record.value == "test-one" + assert record.offset == 0 + + [record_one, record_two] = record_set_two.records + assert record_one.key == "2" + assert record_one.value == "test-two" + assert record_one.offset == 1 + + assert record_two.key == "3" + assert record_two.value == "test-three" + assert record_two.offset == 2 + end + + defp build_client(kafka) do + uris = [{"localhost", Container.mapped_port(kafka, 9092)}] + Kayrock.Client.start_link(uris) + end + + defp create_topic(client_pid, api_version) do + topic_name = unique_string() + create_request = create_topic_request(topic_name, api_version) + {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) + topic_name + end + + defp record_set(key_values) do + %Kayrock.RecordBatch{ + records: + Enum.map(key_values, fn {key, value} -> + %Kayrock.RecordBatch.Record{ + key: key, + value: value + } + end) + } + end +end diff --git a/test/integration/compression_test.exs b/test/integration/compression_test.exs index ed819fa..786c606 100644 --- a/test/integration/compression_test.exs +++ b/test/integration/compression_test.exs @@ -7,110 +7,316 @@ defmodule Kayrock.Client.CompressionTest do container(:kafka, KafkaContainer.new(), shared: true) - describe "with compression" do - setup do - on_exit(fn -> - Application.put_env(:kayrock, :snappy_module, :snappy) - end) - - :ok - end - - test "gzip produce works", %{kafka: kafka} do - {:ok, client_pid} = build_client(kafka) - topic_name = create_topic(client_pid) - - timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + describe "Produce API & Fetch API with compression" do + for {compression, compression_num} <- [{"gzip", 1}, {"snappy", 2}] do + for version <- [0, 1] do + test "v#{version} - produce and reads data using message set with compression: #{compression}", + %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + compression: unquote(compression) |> String.to_atom(), + key: "1", + value: "foo", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + compression: unquote(compression) |> String.to_atom(), + key: "1", + value: "bar", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + compression: unquote(compression) |> String.to_atom(), + key: "1", + value: "baz", + attributes: 0 + } + ] + } - record_batch = %Kayrock.RecordBatch{ - attributes: 1, - records: [ - %Kayrock.RecordBatch.Record{ + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + + assert message_two.value == "bar" + assert message_two.offset == 1 + + assert message_three.value == "baz" + assert message_three.offset == 2 + end + end + + for version <- [2, 3] do + test "v#{version} - produce and reads data with compression: #{compression}", %{ + kafka: kafka + } do + compression_num = unquote(compression_num) + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + records = [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + attributes: 0 + } + ] + + # [WHEN] Produce message with timestamp + record_set = %Kayrock.RecordBatch{attributes: compression_num, records: records} + + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + + # [THEN] Fetch message from topic + partition_data = [ + [topic: topic_name, partition: 0, fetch_offset: 0, log_start_offset: 0] + ] + + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ key: "1", - value: "foo", - headers: [], + value: "zab", timestamp: timestamp, - attributes: 1 + attributes: 0 } - ] - } - - {:ok, resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0) - partition_resp = - resp.responses |> List.first() |> Map.get(:partition_responses) |> List.first() - - partition = partition_resp |> Map.get(:partition) - offset = partition_resp |> Map.get(:base_offset) - - {:ok, resp} = Kayrock.fetch(client_pid, topic_name, partition, offset) - - assert_record_batch(resp, %Kayrock.RecordBatch.Record{key: "1", value: "foo"}) - end + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end + end + + for version <- [4, 5, 6, 7] do + test "v#{version} - produce and reads data using message set with compression: #{compression}", + %{ + kafka: kafka + } do + compression_num = unquote(compression_num) + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.RecordBatch{ + attributes: compression_num, + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + } + ] + } - test "using snappyer produce works", %{kafka: kafka} do - Application.put_env(:kayrock, :snappy_module, :snappyer) - {:ok, client_pid} = build_client(kafka) - topic_name = create_topic(client_pid) + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) - timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name - record_batch = %Kayrock.RecordBatch{ - attributes: 2, - records: [ - %Kayrock.RecordBatch.Record{ - key: "1", - value: "foo", - headers: [], - timestamp: timestamp, - attributes: 1 - } - ] - } + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset - {:ok, resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0) + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) - partition_resp = - resp.responses |> List.first() |> Map.get(:partition_responses) |> List.first() + [response] = resp.responses + assert response.topic == topic_name - partition = partition_resp |> Map.get(:partition) - offset = partition_resp |> Map.get(:base_offset) + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set + |> List.first() + |> Map.get(:records) - {:ok, resp} = Kayrock.fetch(client_pid, topic_name, partition, offset) + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] - assert_record_batch(resp, %Kayrock.RecordBatch.Record{key: "1", value: "foo"}) - end + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] - test "using snappy-erlang-nif produce works", %{kafka: kafka} do - {:ok, client_pid} = build_client(kafka) - topic_name = create_topic(client_pid) + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp - timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + assert message_three.headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"} + ] - record_batch = %Kayrock.RecordBatch{ - attributes: 2, - records: [ - %Kayrock.RecordBatch.Record{ + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ key: "1", - value: "foo", - headers: [], + value: "zab", timestamp: timestamp, - attributes: 1 + attributes: 0 } - ] - } - {:ok, resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0) + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name - partition_resp = - resp.responses |> List.first() |> Map.get(:partition_responses) |> List.first() + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset - partition = partition_resp |> Map.get(:partition) - offset = partition_resp |> Map.get(:base_offset) + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) - {:ok, resp} = Kayrock.fetch(client_pid, topic_name, partition, offset) + [response] = resp.responses + assert response.topic == topic_name - assert_record_batch(resp, %Kayrock.RecordBatch.Record{key: "1", value: "foo"}) + # [THEN] Verify message data + [message] = + List.first(response.partition_responses).record_set + |> List.first() + |> Map.get(:records) + + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end + end end end @@ -119,21 +325,10 @@ defmodule Kayrock.Client.CompressionTest do Kayrock.Client.start_link(uris) end - defp create_topic(client_pid) do + defp create_topic(client_pid, api_version) do topic_name = unique_string() - create_request = create_topic_request(topic_name, 5) + create_request = create_topic_request(topic_name, api_version) {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) topic_name end - - defp assert_record_batch(response, record) do - responses = - response.responses |> List.first() |> Map.get(:partition_responses) |> List.first() - - record_set = responses |> Map.get(:record_set) |> List.first() - [received_record] = record_set.records - - assert received_record.key == record.key - assert received_record.value == record.value - end end diff --git a/test/integration/producer_test.exs b/test/integration/producer_test.exs index 4e5fc89..8d65558 100644 --- a/test/integration/producer_test.exs +++ b/test/integration/producer_test.exs @@ -50,6 +50,71 @@ defmodule Kayrock.Integration.ProducerTest do # [THEN] Verify message data [message] = List.first(response.partition_responses).record_set.messages assert message.value == "test" + assert message.offset == offset + end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + key: "1", + value: "foo", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + key: "1", + value: "bar", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + key: "1", + value: "baz", + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == offset + + assert message_two.value == "bar" + assert message_two.offset == offset + 1 + + assert message_three.value == "baz" + assert message_three.offset == offset + 2 end end @@ -101,6 +166,112 @@ defmodule Kayrock.Integration.ProducerTest do assert message.value == "test" assert message.timestamp == timestamp end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + records = [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + attributes: 0 + } + ] + + # [WHEN] Produce message with timestamp + record_set = %Kayrock.RecordBatch{records: records} + + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0, log_start_offset: 0]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end end for version <- [4, 5, 6, 7] do @@ -154,6 +325,121 @@ defmodule Kayrock.Integration.ProducerTest do assert message.timestamp == timestamp assert message.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + assert message_three.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end end end diff --git a/test/kayrock/message_serde_test.exs b/test/kayrock/message_serde_test.exs index 05ac249..229beaf 100644 --- a/test/kayrock/message_serde_test.exs +++ b/test/kayrock/message_serde_test.exs @@ -173,7 +173,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 1, timestamp: -1, value: "bar" }, @@ -181,7 +181,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 2, timestamp: -1, value: "baz" } @@ -189,14 +189,29 @@ defmodule Kayrock.MessageSerdeTest do } expect = - <<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168, - 31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + <<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91, + 168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, - 0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0, - 0, 0, 1, 6, 98, 97, 122, 0>> + 0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114, + 0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>> got = IO.iodata_to_binary(RecordBatch.serialize(record_batch)) assert got == expect, compare_binaries(got, expect) + + <> = got + assert byte_size(rest) == size + + [got_batch] = RecordBatch.deserialize(rest) + + record_batch = + %{ + record_batch + | batch_length: got_batch.batch_length, + crc: got_batch.crc, + last_offset_delta: 2 + } + + assert got_batch == record_batch end test "using snappyer dependency" do @@ -227,7 +242,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 1, timestamp: -1, value: "bar" }, @@ -235,7 +250,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 2, timestamp: -1, value: "baz" } @@ -243,14 +258,29 @@ defmodule Kayrock.MessageSerdeTest do } expect = - <<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168, - 31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + <<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91, + 168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0, - 0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0, - 0, 0, 1, 6, 98, 97, 122, 0>> + 0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114, + 0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>> got = IO.iodata_to_binary(RecordBatch.serialize(record_batch)) assert got == expect, compare_binaries(got, expect) + + <> = got + assert byte_size(rest) == size + + [got_batch] = RecordBatch.deserialize(rest) + + record_batch = + %{ + record_batch + | batch_length: got_batch.batch_length, + crc: got_batch.crc, + last_offset_delta: 2 + } + + assert got_batch == record_batch end end diff --git a/test/support/request_factory.ex b/test/support/request_factory.ex index cfb052a..ff66e65 100644 --- a/test/support/request_factory.ex +++ b/test/support/request_factory.ex @@ -69,7 +69,7 @@ defmodule Kayrock.RequestFactory do topic: Keyword.fetch!(partition, :topic), partitions: [ %{ - partition: Keyword.fetch!(partition, :partition), + partition: Keyword.get(partition, :partition, 0), fetch_offset: Keyword.get(partition, :fetch_offset, 0), max_bytes: Keyword.get(partition, :max_bytes, max_bytes), log_start_offset: Keyword.get(partition, :log_start_offset, 0)