Skip to content

Commit

Permalink
Add simple single broker behaviour testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
Argonus committed Jan 24, 2024
1 parent 43d9124 commit 6cb722e
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
111 changes: 111 additions & 0 deletions test/integration/behaviour/single_broker_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
defmodule Kayrock.Integration.Behaviour.SingleBrokerTest do
use Kayrock.IntegrationCase
use ExUnit.Case, async: true

import Kayrock.TestSupport
import Kayrock.RequestFactory

container(:kafka, KafkaContainer.new(), shared: true)

test "for single broker lifecycle", %{kafka: kafka} do
# [WHEN] There is client connected to broker
{:ok, client_pid} = build_client(kafka)

# [AND] There is a topic created
topic_name = create_topic(client_pid, 0)

# [WHEN] Client can read from empty topic
fetch_request = fetch_messages_request([[topic: topic_name]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives empty response
[%{partition_responses: [%{record_set: record_set}]}] = resp.responses
assert is_nil(record_set)

# [WHEN] Client can write to topic
record_set = record_set([{"1", "test-one"}])
produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5)
{:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller)

# [AND] Fetch message from that topic
fetch_request = fetch_messages_request([[topic: topic_name]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives message
[%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses
[record] = record_set.records
assert record.key == "1"
assert record.value == "test-one"
assert record.offset == 0

# [WHEN] Client can write multiple messages to topic
record_set = record_set([{"2", "test-two"}, {"3", "test-three"}])
produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5)
{:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller)

# [AND] Fetch messages from that topic
fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 1]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives messages
[%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses
[record_one, record_two] = record_set.records
assert record_one.key == "2"
assert record_one.value == "test-two"
assert record_one.offset == 1

assert record_two.key == "3"
assert record_two.value == "test-three"
assert record_two.offset == 2

# [WHEN] Client is closed
:ok = Kayrock.Client.stop(client_pid)

# [AND] New client is created
{:ok, client_pid} = build_client(kafka)

# [AND] Fetch messages from that topic
fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 0]], [], 5)
{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

# [THEN] It receives messages
[%{partition_responses: [%{record_set: [record_set_one, record_set_two]}]}] = resp.responses
[record] = record_set_one.records
assert record.key == "1"
assert record.value == "test-one"
assert record.offset == 0

[record_one, record_two] = record_set_two.records
assert record_one.key == "2"
assert record_one.value == "test-two"
assert record_one.offset == 1

assert record_two.key == "3"
assert record_two.value == "test-three"
assert record_two.offset == 2
end

defp build_client(kafka) do
uris = [{"localhost", Container.mapped_port(kafka, 9092)}]

Check warning on line 89 in test/integration/behaviour/single_broker_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.10, 22.3)

Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)

Check warning on line 89 in test/integration/behaviour/single_broker_test.exs

View workflow job for this annotation

GitHub Actions / runner / Test (1.13, 24.3)

Container.mapped_port/2 is undefined (module Container is not available or is yet to be defined)
Kayrock.Client.start_link(uris)
end

defp create_topic(client_pid, api_version) do
topic_name = unique_string()
create_request = create_topic_request(topic_name, api_version)
{:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
topic_name
end

defp record_set(key_values) do
%Kayrock.RecordBatch{
records:
Enum.map(key_values, fn {key, value} ->
%Kayrock.RecordBatch.Record{
key: key,
value: value
}
end)
}
end
end
2 changes: 1 addition & 1 deletion test/support/request_factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ defmodule Kayrock.RequestFactory do
topic: Keyword.fetch!(partition, :topic),
partitions: [
%{
partition: Keyword.fetch!(partition, :partition),
partition: Keyword.get(partition, :partition, 0),
fetch_offset: Keyword.get(partition, :fetch_offset, 0),
max_bytes: Keyword.get(partition, :max_bytes, max_bytes),
log_start_offset: Keyword.get(partition, :log_start_offset, 0)
Expand Down

0 comments on commit 6cb722e

Please sign in to comment.