Skip to content

Commit

Permalink
Missing tests and deps (#36)
Browse files Browse the repository at this point in the history
* Add missing test for incomplete data

* Update test containers to support different elixir versions
  • Loading branch information
Argonus authored Nov 29, 2024
1 parent 1204a83 commit 156d79c
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 67 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
elixir: ['1.16']
otp: ['26.2']
otp: ['25.3.2.9']

steps:
- name: Cancel previous runs
Expand Down Expand Up @@ -63,7 +63,7 @@ jobs:
fail-fast: false
matrix:
elixir: ['1.16']
otp: ['26.2']
otp: ['25.3.2.9']

steps:
- name: Cancel Previous Runs
Expand Down
1 change: 0 additions & 1 deletion lib/kayrock/record_batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ defmodule Kayrock.RecordBatch do

@doc """
Direct deserializer
Supplied for compatibility with the Request protocol
"""
@spec deserialize(binary) :: nil | MessageSet.t() | t
Expand Down
12 changes: 2 additions & 10 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,9 @@ defmodule Kayrock.MixProject do
{:excoveralls, "~> 0.18", only: :test},
{:kafka_protocol, "~> 2.4.1", only: [:dev, :test]},
{:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]},
{:snappyer, "~> 1.2", only: [:dev, :test]}
{:snappyer, "~> 1.2", only: [:dev, :test]},
{:testcontainers, "~> 1.6.0", only: [:dev, :test]}
]
|> integration_test_deps()
end

defp integration_test_deps(deps_list) do
if Version.match?(System.version(), ">= 1.15.0") do
[{:testcontainers, "~> 1.6", only: :test} | deps_list]
else
deps_list
end
end

defp elixirc_paths(:test), do: ["lib", "test/support"]
Expand Down
10 changes: 6 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,29 @@
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"ex_docker_engine_api": {:hex, :ex_docker_engine_api, "1.43.1", "1161e34b6bea5cef84d8fdc1d5d510fcb0c463941ce84c36f4a0f44a9096eb96", [:mix], [{:hackney, "~> 1.20", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "ec8fc499389aeef56ddca67e89e9e98098cff50587b56e8b4613279f382793b1"},
"excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"fs": {:hex, :fs, "8.6.1", "7c9c0d0211e8c520e4e9eda63b960605c2711839f47285e6166c332d973be8ea", [:rebar3], [], "hexpm", "61ea2bdaedae4e2024d0d25c63e44dccf65622d4402db4a2df12868d1546503f"},
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"kafka_protocol": {:hex, :kafka_protocol, "2.4.1", "9e89afc740f57d17bec33f0f21dc23e7cbfd8c9ed3b0d6b9fc3a6bd4a827c088", [:rebar, :rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "86c9e1c6496273a6d1a02dc0e6ef7479d70441d526970abe667e2cf4deb1df21"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
"snappy": {:git, "https://github.com/fdmanana/snappy-erlang-nif", "e8907ee8e37cfa07d933a070669a88798082c3d7", []},
"snappyer": {:hex, :snappyer, "1.2.6", "34181e3233f68a92044e176fe96e54fee7957acc2be554f0460d799c495166c2", [:rebar3], [], "hexpm", "d538d1e8892af09dc8b2771b2652c9d70f009cd1556246b3e22706df643f47b4"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"},
"tesla": {:hex, :tesla, "1.13.2", "85afa342eb2ac0fee830cf649dbd19179b6b359bec4710d02a3d5d587f016910", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:mox, "~> 1.0", [hex: :mox, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "960609848f1ef654c3cdfad68453cd84a5febecb6ed9fed9416e36cd9cd724f9"},
"testcontainers": {:hex, :testcontainers, "1.6.0", "14b3251f01ce0b1ada716130d371ba0b6cb1ce2904aa38bd58e5ff4194f4d88f", [:mix], [{:ecto, "~> 3.3", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.3", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:ex_docker_engine_api, "~> 1.43.1", [hex: :ex_docker_engine_api, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "3f812407f232954999a3a2e05b2802e1d8d1afba120533c42b32c7cc91d35daf"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
Expand Down
87 changes: 75 additions & 12 deletions test/integration/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,8 @@ defmodule Kayrock.Integration.ProducerTest do
} do
api_version = unquote(version)
{:ok, client_pid} = build_client(kafka)
long_header = ?a..?z |> Enum.to_list() |> Enum.take_random(12) |> to_string()
message_content = ?a..?z |> Enum.to_list() |> Enum.take_random(50) |> to_string()

# Create Topic
topic_name = create_topic(client_pid, api_version)
Expand All @@ -341,23 +343,23 @@ defmodule Kayrock.Integration.ProducerTest do
records: [
%Kayrock.RecordBatch.Record{
key: "1",
value: "foo",
value: "#{message_content} 1",
timestamp: timestamp,
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}],
attributes: 0
},
%Kayrock.RecordBatch.Record{
key: "1",
value: "bar",
value: "#{message_content} 2",
timestamp: timestamp,
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}],
attributes: 0
},
%Kayrock.RecordBatch.Record{
key: "1",
value: "baz",
value: "#{message_content} 3",
timestamp: timestamp,
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}],
attributes: 0
}
]
Expand Down Expand Up @@ -387,20 +389,29 @@ defmodule Kayrock.Integration.ProducerTest do
[message_one, message_two, message_three] =
List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records)

assert message_one.value == "foo"
assert message_one.value == "#{message_content} 1"
assert message_one.offset == 0
assert message_one.timestamp == timestamp
assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]

assert message_two.value == "bar"
assert message_one.headers == [
%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}
]

assert message_two.value == "#{message_content} 2"
assert message_two.offset == 1
assert message_two.timestamp == timestamp
assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]

assert message_three.value == "baz"
assert message_two.headers == [
%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}
]

assert message_three.value == "#{message_content} 3"
assert message_three.offset == 2
assert message_three.timestamp == timestamp
assert message_three.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]

assert message_three.headers == [
%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}
]

# [THEN] Produce another message
record = %Kayrock.RecordBatch.Record{
Expand Down Expand Up @@ -439,6 +450,58 @@ defmodule Kayrock.Integration.ProducerTest do
assert message.value == "zab"
assert message.offset == 3
assert message.timestamp == timestamp

# [THEN] Fetch incomplete messages from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0]]
fetch_request = fetch_messages_request(partition_data, [max_bytes: 100], api_version)

{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[%{records: records}] = List.first(response.partition_responses).record_set
assert length(records) == 3

assert List.first(records).value == "#{message_content} 1"
assert List.first(records).offset == 0

assert List.first(records).headers == [
%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}
]

assert List.last(records).value == "#{message_content} 3"
assert List.last(records).offset == 2

assert List.last(records).headers == [
%Kayrock.RecordBatch.RecordHeader{key: "1", value: long_header}
]

# [THEN] Fetch complete messages from topic
partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0]]
fetch_request = fetch_messages_request(partition_data, [], api_version)

{:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)

[response] = resp.responses
assert response.topic == topic_name

# [THEN] Verify message data
[%{records: records}, %{records: records_two}] =
List.first(response.partition_responses).record_set

assert length(records) == 3

assert List.first(records).value == "#{message_content} 1"
assert List.first(records).offset == 0

assert List.last(records).value == "#{message_content} 3"
assert List.last(records).offset == 2

assert length(records_two) == 1
assert List.first(records_two).value == "zab"
assert List.first(records_two).offset == 3
end
end
end
Expand Down
Loading

0 comments on commit 156d79c

Please sign in to comment.