From ea5084b38771c8a7ed10ed6a910360b92650ce51 Mon Sep 17 00:00:00 2001 From: Piotr Rybarczyk Date: Fri, 19 Jan 2024 14:06:57 +0100 Subject: [PATCH] Produce integration tests (#31) * Add base produce integration test cases * Add more complex integration test cases * Migrate compression tests --- test/integration/compression_test.exs | 139 +++++++ test/integration/producer_test.exs | 370 +++++++++++++++++ test/integration/topic_management_test.exs | 17 +- test/kayrock/client/produce_test.exs | 443 --------------------- test/support/integration_case.ex | 9 +- test/support/request_factory.ex | 84 ++++ 6 files changed, 601 insertions(+), 461 deletions(-) create mode 100644 test/integration/compression_test.exs create mode 100644 test/integration/producer_test.exs delete mode 100644 test/kayrock/client/produce_test.exs create mode 100644 test/support/request_factory.ex diff --git a/test/integration/compression_test.exs b/test/integration/compression_test.exs new file mode 100644 index 0000000..ed819fa --- /dev/null +++ b/test/integration/compression_test.exs @@ -0,0 +1,139 @@ +defmodule Kayrock.Client.CompressionTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import Kayrock.TestSupport + import Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + describe "with compression" do + setup do + on_exit(fn -> + Application.put_env(:kayrock, :snappy_module, :snappy) + end) + + :ok + end + + test "gzip produce works", %{kafka: kafka} do + {:ok, client_pid} = build_client(kafka) + topic_name = create_topic(client_pid) + + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_batch = %Kayrock.RecordBatch{ + attributes: 1, + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + headers: [], + timestamp: timestamp, + attributes: 1 + } + ] + } + + {:ok, resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0) + + partition_resp = + resp.responses |> List.first() |> Map.get(:partition_responses) |> List.first() + + partition = partition_resp |> Map.get(:partition) + offset = partition_resp |> Map.get(:base_offset) + + {:ok, resp} = Kayrock.fetch(client_pid, topic_name, partition, offset) + + assert_record_batch(resp, %Kayrock.RecordBatch.Record{key: "1", value: "foo"}) + end + + test "using snappyer produce works", %{kafka: kafka} do + Application.put_env(:kayrock, :snappy_module, :snappyer) + {:ok, client_pid} = build_client(kafka) + topic_name = create_topic(client_pid) + + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_batch = %Kayrock.RecordBatch{ + attributes: 2, + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + headers: [], + timestamp: timestamp, + attributes: 1 + } + ] + } + + {:ok, resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0) + + partition_resp = + resp.responses |> List.first() |> Map.get(:partition_responses) |> List.first() + + partition = partition_resp |> Map.get(:partition) + offset = partition_resp |> Map.get(:base_offset) + + {:ok, resp} = Kayrock.fetch(client_pid, topic_name, partition, offset) + + assert_record_batch(resp, %Kayrock.RecordBatch.Record{key: "1", value: "foo"}) + end + + test "using snappy-erlang-nif produce works", %{kafka: kafka} do + {:ok, client_pid} = build_client(kafka) + topic_name = create_topic(client_pid) + + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_batch = %Kayrock.RecordBatch{ + attributes: 2, + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + headers: [], + timestamp: timestamp, + attributes: 1 + } + ] + } + + {:ok, resp} = Kayrock.produce(client_pid, record_batch, topic_name, 0) + + partition_resp = + resp.responses |> List.first() |> Map.get(:partition_responses) |> List.first() + + partition = partition_resp |> Map.get(:partition) + offset = partition_resp |> Map.get(:base_offset) + + {:ok, resp} = Kayrock.fetch(client_pid, topic_name, partition, offset) + + assert_record_batch(resp, %Kayrock.RecordBatch.Record{key: "1", value: "foo"}) + end + end + + defp build_client(kafka) do + uris = [{"localhost", Container.mapped_port(kafka, 9092)}] + Kayrock.Client.start_link(uris) + end + + defp create_topic(client_pid) do + topic_name = unique_string() + create_request = create_topic_request(topic_name, 5) + {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) + topic_name + end + + defp assert_record_batch(response, record) do + responses = + response.responses |> List.first() |> Map.get(:partition_responses) |> List.first() + + record_set = responses |> Map.get(:record_set) |> List.first() + [received_record] = record_set.records + + assert received_record.key == record.key + assert received_record.value == record.value + end +end diff --git a/test/integration/producer_test.exs b/test/integration/producer_test.exs new file mode 100644 index 0000000..4e5fc89 --- /dev/null +++ b/test/integration/producer_test.exs @@ -0,0 +1,370 @@ +defmodule Kayrock.Integration.ProducerTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import Kayrock.TestSupport + import Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + describe "Produce API & Fetch API" do + for version <- [0, 1] do + test "v#{version} - produce and reads data using message set", %{kafka: kafka} do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + key: "1", + value: "test", + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "test" + end + end + + for version <- [2, 3] do + test "v#{version} - produce and reads data using record batch", %{kafka: kafka} do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "test" + assert message.timestamp == timestamp + end + end + + for version <- [4, 5, 6, 7] do + test "v#{version} - produce and reads data using record batch", %{kafka: kafka} do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message.value == "test" + assert message.timestamp == timestamp + assert message.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + end + end + end + + describe "with non existing topic" do + test "it will return error code", %{kafka: kafka} do + api_version = 5 + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = unique_string() + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 3 + end + end + + describe "with multiple topics and partitions" do + test "with multiple partitions for single topic", %{kafka: kafka} do + api_version = 5 + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set_one = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test-one", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + record_set_two = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "2", + value: "test-two", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_data = [ + [record_set: record_set_one, partition: 0], + [record_set: record_set_two, partition: 1] + ] + + produce_message_request = + produce_messages_request(topic_name, produce_data, 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_one_resp, partition_two_resp] = + response.partition_responses |> Enum.sort_by(& &1.partition) + + assert partition_one_resp.error_code == 0 + assert partition_two_resp.error_code == 0 + + partition_one_offset = partition_one_resp.base_offset + partition_two_offset = partition_two_resp.base_offset + + # [THEN] Fetch message from topic + partition_data = [ + [topic: topic_name, partition: 0, fetch_offset: partition_one_offset], + [topic: topic_name, partition: 1, fetch_offset: partition_two_offset] + ] + + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [record_batch_one, record_batch_two] = + Enum.sort_by(response.partition_responses, & &1.partition_header.partition) + + assert record_batch_one.partition_header.partition == 0 + assert record_batch_two.partition_header.partition == 1 + + [message_one] = record_batch_one.record_set |> List.first() |> Map.get(:records) + assert message_one.value == "test-one" + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + [message_two] = record_batch_two.record_set |> List.first() |> Map.get(:records) + assert message_two.value == "test-two" + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + end + + test "with multiple topics for single partition", %{kafka: kafka} do + api_version = 5 + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name_one = create_topic(client_pid, api_version) + topic_name_two = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set_one = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test-one", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + record_set_two = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "2", + value: "test-two", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_request_one = + produce_messages_request(topic_name_one, [[record_set: record_set_one]], 1, api_version) + + produce_request_two = + produce_messages_request(topic_name_two, [[record_set: record_set_two]], 1, api_version) + + {:ok, _resp} = Kayrock.client_call(client_pid, produce_request_one, :controller) + {:ok, _resp} = Kayrock.client_call(client_pid, produce_request_two, :controller) + + # [THEN] Fetch message from topic + partition_data = [ + [topic: topic_name_one, partition: 0, fetch_offset: 0], + [topic: topic_name_two, partition: 0, fetch_offset: 0] + ] + + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + response_one = Enum.find(resp.responses, &(&1.topic == topic_name_one)) + response_two = Enum.find(resp.responses, &(&1.topic == topic_name_two)) + + # [THEN] Verify message data + [record_batch_one] = response_one.partition_responses + assert record_batch_one.partition_header.partition == 0 + + [message_one] = record_batch_one.record_set |> List.first() |> Map.get(:records) + assert message_one.value == "test-one" + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + [record_batch_two] = response_two.partition_responses + assert record_batch_two.partition_header.partition == 0 + + [message_two] = record_batch_two.record_set |> List.first() |> Map.get(:records) + assert message_two.value == "test-two" + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + end + end + + defp build_client(kafka) do + uris = [{"localhost", Container.mapped_port(kafka, 9092)}] + Kayrock.Client.start_link(uris) + end + + defp create_topic(client_pid, api_version) do + topic_name = unique_string() + create_request = create_topic_request(topic_name, api_version) + {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) + topic_name + end +end diff --git a/test/integration/topic_management_test.exs b/test/integration/topic_management_test.exs index d2aa362..395aa33 100644 --- a/test/integration/topic_management_test.exs +++ b/test/integration/topic_management_test.exs @@ -4,6 +4,7 @@ defmodule Kayrock.Integration.TopicManagementTest do import Kayrock.TestSupport import Kayrock.Convenience + import Kayrock.RequestFactory container(:kafka, KafkaContainer.new(), shared: true) @@ -60,22 +61,6 @@ defmodule Kayrock.Integration.TopicManagementTest do end end - # Helpers - defp create_topic_request(topic_name, api_version) do - api_version = min(Kayrock.CreateTopics.max_vsn(), api_version) - request = Kayrock.CreateTopics.get_request_struct(api_version) - - topic_config = %{ - topic: topic_name, - num_partitions: 3, - replication_factor: 1, - replica_assignment: [], - config_entries: [] - } - - %{request | create_topic_requests: [topic_config], timeout: 1000} - end - defp create_topic_partition(topic_name, api_version) do api_version = min(Kayrock.CreatePartitions.max_vsn(), api_version) request = Kayrock.CreatePartitions.get_request_struct(api_version) diff --git a/test/kayrock/client/produce_test.exs b/test/kayrock/client/produce_test.exs deleted file mode 100644 index 30c8708..0000000 --- a/test/kayrock/client/produce_test.exs +++ /dev/null @@ -1,443 +0,0 @@ -defmodule Kayrock.Client.ProduceTest do - use Kayrock.ClientCase - - alias Kayrock.RecordBatch - alias Kayrock.RecordBatch.Record - alias Kayrock.RecordBatch.RecordHeader - - test "Simple produce works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"]) - {:ok, _} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 0, - base_sequence: -1, - batch_length: 79, - batch_offset: first_offset, - crc: -784_342_914, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - - test "gzip produce works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :gzip) - {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 1, - base_sequence: -1, - batch_length: 94, - batch_offset: first_offset, - crc: 1_821_682_799, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - - describe "with snappy compression" do - setup do - on_exit(fn -> - Application.put_env(:kayrock, :snappy_module, :snappy) - end) - - :ok - end - - test "using snappyer produce works", %{client: client} do - Application.put_env(:kayrock, :snappy_module, :snappyer) - - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy) - {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 2, - base_sequence: -1, - batch_length: 101, - batch_offset: first_offset, - crc: 468_182_773, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - - test "using snappy-erlang-nif produce works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy) - {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 2, - base_sequence: -1, - batch_length: 101, - batch_offset: first_offset, - crc: 468_182_773, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - end - - test "Produce with key, value and headers works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "full_record_produce") - - headers = [ - %RecordHeader{key: "source", value: "System-X"}, - %RecordHeader{key: "type", value: "HeaderCreatedEvent"} - ] - - records = [%Record{headers: headers, key: "rd-k", value: "record-value-here"}] - - record_batch = %RecordBatch{ - attributes: 0, - records: records - } - - {:ok, _} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 0, - base_sequence: -1, - batch_length: 118, - batch_offset: first_offset, - crc: -1_972_253_040, - first_timestamp: -1, - last_offset_delta: 0, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [ - %Kayrock.RecordBatch.RecordHeader{ - key: "source", - value: "System-X" - }, - %Kayrock.RecordBatch.RecordHeader{ - key: "type", - value: "HeaderCreatedEvent" - } - ], - key: "rd-k", - offset: first_offset, - timestamp: -1, - value: "record-value-here" - } - ] - } - ] - } - ], - topic: "full_record_produce" - } - ], - throttle_time_ms: 0 - } - end -end diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex index cc65095..b3b91d7 100644 --- a/test/support/integration_case.ex +++ b/test/support/integration_case.ex @@ -16,8 +16,13 @@ defmodule Kayrock.IntegrationCase do end setup_all do - {:ok, _pid} = Testcontainers.start_link() - :ok + case Testcontainers.start_link() do + {:ok, _} -> + :ok + + {:error, {:already_started, _}} -> + :ok + end end else defmodule TestcontainersStub do diff --git a/test/support/request_factory.ex b/test/support/request_factory.ex new file mode 100644 index 0000000..cfb052a --- /dev/null +++ b/test/support/request_factory.ex @@ -0,0 +1,84 @@ +defmodule Kayrock.RequestFactory do + @moduledoc """ + Documentation for Kayrock.RequestFactory. + """ + + @doc """ + Creates a request to create a topic + Uses min of api_version and max supported version + """ + def create_topic_request(topic_name, api_version) do + api_version = min(Kayrock.CreateTopics.max_vsn(), api_version) + request = Kayrock.CreateTopics.get_request_struct(api_version) + + topic_config = %{ + topic: topic_name, + num_partitions: 3, + replication_factor: 1, + replica_assignment: [], + config_entries: [] + } + + %{request | create_topic_requests: [topic_config], timeout: 1000} + end + + @doc """ + Creates a request to produce a message to a topic + Uses min of api_version and max supported version + """ + def produce_messages_request(topic_name, data, ack, api_version) do + api_version = min(Kayrock.Produce.max_vsn(), api_version) + request = Kayrock.Produce.get_request_struct(api_version) + + topic_data = [ + %{ + topic: topic_name, + data: + Enum.map(data, fn datum -> + %{ + partition: Keyword.get(datum, :partition, 0), + record_set: Keyword.get(datum, :record_set, []) + } + end) + } + ] + + %{request | topic_data: topic_data, acks: ack, timeout: 1000} + end + + @doc """ + Creates a request to fetch messages from a topic + Uses min of api_version and max supported version + """ + def fetch_messages_request(partition_data, opts, api_version) do + api_version = min(Kayrock.Fetch.max_vsn(), api_version) + request = Kayrock.Fetch.get_request_struct(api_version) + max_bytes = Keyword.get(opts, :max_bytes, 1_000_000) + + request_date = %{ + replica_id: -1, + max_wait_time: Keyword.get(opts, :max_wait_time, 1000), + min_bytes: Keyword.get(opts, :min_bytes, 0), + max_bytes: Keyword.get(opts, :max_bytes, max_bytes), + isolation_level: Keyword.get(opts, :isolation_level, 1), + session_id: Keyword.get(opts, :session_id, 0), + epoch: Keyword.get(opts, :epoch, 0), + topics: + Enum.map(partition_data, fn partition -> + %{ + topic: Keyword.fetch!(partition, :topic), + partitions: [ + %{ + partition: Keyword.fetch!(partition, :partition), + fetch_offset: Keyword.get(partition, :fetch_offset, 0), + max_bytes: Keyword.get(partition, :max_bytes, max_bytes), + log_start_offset: Keyword.get(partition, :log_start_offset, 0) + } + ] + } + end) + } + + %{Map.merge(request, request_date) | replica_id: -1} + end +end