diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 46db08e..4e3088c 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -15,8 +15,8 @@ jobs: ACTIONS_ALLOW_UNSECURE_COMMANDS: true strategy: matrix: - elixir: ['1.12.3'] - otp: ['24.3.4'] + elixir: ['1.15'] + otp: ['26.1'] steps: - name: Cancel previous runs @@ -64,8 +64,8 @@ jobs: strategy: fail-fast: false matrix: - elixir: ['1.12.3'] - otp: ['24.3.4'] + elixir: ['1.15'] + otp: ['26.1'] steps: - name: Cancel Previous Runs diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml new file mode 100644 index 0000000..b236419 --- /dev/null +++ b/.github/workflows/integration.yml @@ -0,0 +1,93 @@ +name: CI Integration + +on: + pull_request: [] + +jobs: + dependencies: + name: integration | setup dependencies + runs-on: ubuntu-20.04 + env: + MIX_ENV: test + ACTIONS_ALLOW_UNSECURE_COMMANDS: true + strategy: + matrix: + elixir: ['1.15'] + otp: ['26.1'] + + steps: + - name: Cancel previous runs + uses: styfle/cancel-workflow-action@0.9.0 + with: + access_token: ${{ github.token }} + + - name: Checkout Github repo + uses: actions/checkout@v2 + + - name: Setup elixir & erlang environment + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{matrix.elixir}} # Define the elixir version [required] + otp-version: ${{matrix.otp}} # Define the OTP version [required] + experimental-otp: true # More info https://github.com/actions/setup-elixir/issues/31 + + - name: Retrieve Cached Dependencies + uses: actions/cache@v2 + id: mix-cache + with: + path: | + deps + _build + key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }} + + - name: Install Dependencies + if: steps.mix-cache.outputs.cache-hit != 'true' + run: | + mkdir -p priv/plts + mix local.rebar --force + mix local.hex --force + mix deps.get + mix deps.compile + + integration_test: + name: Integration Test + runs-on: ubuntu-20.04 + needs: [dependencies] + env: + MIX_ENV: test + + strategy: + fail-fast: false + matrix: + elixir: ['1.15'] + otp: ['26.1'] + + steps: + - name: Cancel Previous Runs + uses: styfle/cancel-workflow-action@0.6.0 + with: + access_token: ${{ github.token }} + + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Setup elixir & erlang environment + uses: erlef/setup-beam@v1 + with: + elixir-version: ${{matrix.elixir}} # Define the elixir version [required] + otp-version: ${{matrix.otp}} # Define the OTP version [required] + experimental-otp: true # More info https://github.com/actions/setup-elixir/issues/31 + + - name: Retrieve Cached Dependencies + uses: actions/cache@v2 + id: mix-cache + with: + path: | + deps + _build + key: ${{ runner.os }}-${{ matrix.otp }}-${{ matrix.elixir }}-${{ hashFiles('mix.lock') }} + + - name: Run Test + run: mix test.integration diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f957d13..272e172 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,21 +8,19 @@ on: jobs: setup: - name: test / setup + name: test | setup dependencies runs-on: ubuntu-20.04 env: MIX_ENV: test strategy: matrix: pair: - - elixir: 1.14 - otp: 25.2 + - elixir: 1.15 + otp: 26.1 - elixir: 1.13 otp: 24.3 - - elixir: 1.11 - otp: 21.3 - - elixir: 1.8 - otp: 20.3 + - elixir: '1.10' + otp: 22.3 steps: - name: Cancel previous runs @@ -74,14 +72,12 @@ jobs: fail-fast: false matrix: pair: - - elixir: 1.14 - otp: 25.2 + - elixir: 1.15 + otp: 26.1 - elixir: 1.13 otp: 24.3 - - elixir: 1.11 - 
otp: 21.3 - - elixir: 1.8 - otp: 20.3 + - elixir: '1.10' + otp: 22.3 steps: - uses: actions/checkout@v2 @@ -107,4 +103,4 @@ jobs: key: ${{ runner.os }}-${{ matrix.pair.otp }}-${{ matrix.pair.elixir }}-mix-deps-compile-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} - name: Run unit tests - run: mix test \ No newline at end of file + run: mix test diff --git a/README.md b/README.md index a4ccbe7..9ad53b6 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Kayrock -![Elixir CI](https://github.com/dantswain/kayrock/workflows/Elixir%20CI/badge.svg) +[![CI Checks](https://github.com/kafkaex/kayrock/actions/workflows/checks.yml/badge.svg?branch=master)](https://github.com/kafkaex/kayrock/actions/workflows/checks.yml) +[![CI Tests](https://github.com/kafkaex/kayrock/actions/workflows/test.yml/badge.svg?branch=master)](https://github.com/kafkaex/kayrock/actions/workflows/test.yml) [![Module Version](https://img.shields.io/hexpm/v/kayrock.svg)](https://hex.pm/packages/kayrock) [![Hex Docs](https://img.shields.io/badge/hex-docs-lightgreen.svg)](https://hexdocs.pm/kayrock/) [![Total Download](https://img.shields.io/hexpm/dt/kayrock.svg)](https://hex.pm/packages/kayrock) @@ -107,3 +108,51 @@ a Kafka client. This serves a few purposes: Regardless, it is not a production-ready implementation. I would refer you to KafkaEx or brod for that. + +## Testing + +Kayrock includes a test suite that checks that the generated code is correct. +We have both unit tests and integration tests. + +The unit tests run as part of the normal `mix test` process. To run the unit tests: + +```shell +mix test +``` + +The integration tests require a running Docker daemon; currently we use docker-compose to run a Kafka cluster. +We are in the process of migrating to [testcontainers](https://github.com/testcontainers/testcontainers-elixir) +to run the Kafka cluster.
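+
+A testcontainers-based test boots a Kafka container and connects a client to
+its mapped host port. A minimal sketch (the module name is hypothetical),
+following the pattern used in `test/integration/producer_test.exs`:
+
+```elixir
+defmodule Kayrock.Integration.SmokeTest do
+  use Kayrock.IntegrationCase
+  use ExUnit.Case, async: true
+
+  # Start one shared Kafka container for this test module
+  container(:kafka, KafkaContainer.new(), shared: true)
+
+  test "client connects to the broker", %{kafka: kafka} do
+    # Port 9092 inside the container is mapped to a random host port
+    uris = [{"localhost", Container.mapped_port(kafka, 9092)}]
+    {:ok, client_pid} = Kayrock.Client.start_link(uris)
+    :ok = Kayrock.Client.stop(client_pid)
+  end
+end
+```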
+ +To run the original integration tests: + +```shell +mix test --include integration +``` + +To run the integration tests with testcontainers: + +```shell +mix test.integration +``` diff --git a/config/test.exs b/config/test.exs new file mode 100644 index 0000000..ef657aa --- /dev/null +++ b/config/test.exs @@ -0,0 +1 @@ +config :testcontainers, enabled: true diff --git a/lib/kayrock/message_set.ex b/lib/kayrock/message_set.ex index 862206f..975c5ff 100644 --- a/lib/kayrock/message_set.ex +++ b/lib/kayrock/message_set.ex @@ -81,13 +81,11 @@ defmodule Kayrock.MessageSet do } c -> - decompressed = Kayrock.Compression.decompress(c, value) - - if magic == 1 do - Enum.reverse(do_deserialize(decompressed, [], offset)) - else - Enum.reverse(do_deserialize(decompressed, [], 0)) - end + c + |> Kayrock.Compression.decompress(value) + |> do_deserialize([], 0) + |> correct_offsets(offset) + |> Enum.reverse() end do_deserialize(orig_rest, [msg | acc], add_offset) @@ -97,6 +95,21 @@ defmodule Kayrock.MessageSet do Enum.reverse(List.flatten(acc)) end + # In all other cases, the compressed inner messages carry relative offsets: + # - the container message holds the 'real' offset + # - the container's offset is the 'real' offset of the last message in the compressed batch + defp correct_offsets(messages, real_offset) do + max_relative_offset = messages |> List.last() |> Map.fetch!(:offset) + + if max_relative_offset == real_offset do + messages + else + Enum.map(messages, fn msg -> + Map.update!(msg, :offset, &(&1 + real_offset)) + end) + end + end + defp create_message_set([], _compression_type), do: {"", 0} defp create_message_set(messages, :none) do diff --git a/lib/kayrock/record_batch.ex b/lib/kayrock/record_batch.ex index 51dec63..33ed4c2 100644 --- a/lib/kayrock/record_batch.ex +++ b/lib/kayrock/record_batch.ex @@ -100,7 +100,6 @@ defmodule Kayrock.RecordBatch do @spec serialize(t) :: iodata def serialize(%__MODULE__{} = record_batch) do [first_record | _] = record_batch.records - num_records = length(record_batch.records) max_timestamp = @@ -108,15 +107,16 @@ defmodule Kayrock.RecordBatch do |> Enum.map(fn r -> r.timestamp end) |> Enum.max() - base_offset = first_record.offset base_timestamp = first_record.timestamp records = - for record <- record_batch.records do + record_batch.records + |> Enum.with_index() + |> Enum.map(fn {record, offset_delta} -> record - |> normalize_record(base_offset, base_timestamp) - |> serialize_record - end + |> normalize_record(offset_delta, base_timestamp) + |> serialize_record() + end) records = case compression_type(record_batch.attributes) do @@ -163,11 +163,9 @@ defmodule Kayrock.RecordBatch do nil end - def deserialize(msg_set_size, msg_set_data) - when byte_size(msg_set_data) == msg_set_size do + def deserialize(msg_set_size, msg_set_data) when byte_size(msg_set_data) == msg_set_size do case get_magic_byte(msg_set_data) do {2, batch_offset, batch_length, partition_leader_epoch, rest} -> - <<>> deserialize(rest, [], batch_offset, batch_length, partition_leader_epoch) {magic, _, _, _, _} -> @@ -405,11 +403,11 @@ defmodule Kayrock.RecordBatch do [encode_varint(IO.iodata_length(without_length)), without_length] end - defp normalize_record(record, base_offset, base_timestamp) do + defp normalize_record(record, offset_delta, base_timestamp) do %{ record | timestamp: maybe_delta(record.timestamp, base_timestamp), - offset: maybe_delta(record.offset, base_offset) + offset: offset_delta } end diff --git a/mix.exs b/mix.exs index 
cf0cd4b..6b80f0e 100644 --- a/mix.exs +++ b/mix.exs @@ -1,21 +1,22 @@ defmodule Kayrock.MixProject do use Mix.Project - @source_url "https://github.com/dantswain/kayrock" + @source_url "https://github.com/kafkaex/kayrock" def project do [ app: :kayrock, - version: "0.1.15", - elixir: "~> 1.1", + version: "0.2.0", + elixir: "~> 1.10", elixirc_paths: elixirc_paths(Mix.env()), test_coverage: [tool: ExCoveralls], - preferred_cli_env: [coveralls: :test], + preferred_cli_env: [coveralls: :test, "test.integration": :test], start_permanent: Mix.env() == :prod, deps: deps(), + aliases: aliases(), dialyzer: [ plt_add_apps: [:mix], - flags: [:error_handling, :race_conditions] + flags: [:error_handling] ], description: "Elixir interface to the Kafka protocol", package: package(), @@ -41,19 +42,28 @@ defmodule Kayrock.MixProject do defp deps do [ # Core - {:crc32cer, "~>0.1.8"}, - {:varint, "~>1.2.0"}, - {:connection, "~>1.1.0"}, + {:crc32cer, "~> 0.1"}, + {:varint, "~> 1.2"}, + {:connection, "~> 1.1"}, # Dev/Test - {:credo, "~>1.5.0", only: [:dev, :test], runtime: false}, - {:dialyxir, "~> 1.0.0-rc.6", only: [:dev], runtime: false}, - {:ex_doc, "~>0.23.0", only: [:dev], runtime: false}, - {:excoveralls, "~>0.13.3", only: :test}, + {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.4", only: [:dev], runtime: false}, + {:ex_doc, "~> 0.30", only: [:dev], runtime: false}, + {:excoveralls, "~> 0.18", only: :test}, {:kafka_protocol, "~> 2.4.1", only: [:dev, :test]}, {:snappy, git: "https://github.com/fdmanana/snappy-erlang-nif", only: [:dev, :test]}, {:snappyer, "~> 1.2", only: [:dev, :test]} ] + |> integration_test_deps() + end + + defp integration_test_deps(deps_list) do + if Version.match?(System.version(), ">= 1.15.0") do + [{:testcontainers, "~> 1.5"} | deps_list] + else + deps_list + end end defp elixirc_paths(:test), do: ["lib", "test/support"] @@ -62,10 +72,16 @@ defmodule Kayrock.MixProject do defp package do [ - maintainers: ["Dan Swain"], + maintainers: ["Dan Swain", "Argonus"], files: ["lib", "config/config.exs", "mix.exs", "README.md"], licenses: ["MIT"], links: %{"GitHub" => @source_url} ] end + + defp aliases do + [ + "test.integration": "test --only integration_v2" + ] + end end diff --git a/mix.lock b/mix.lock index da1cfd7..70e62b3 100644 --- a/mix.lock +++ b/mix.lock @@ -1,29 +1,35 @@ %{ - "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"}, - "certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"}, + "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, + "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"}, "crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", 
"251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"}, - "credo": {:hex, :credo, "1.5.0", "bf6af2ae803575376d6f6fae5470557706718503b677ef2ef7ba1ea15428ddb6", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "0bcedaa097dfb4d20d6b4bee05da40b0b07f0a21e2fda60d7ef81797591c7575"}, - "dialyxir": {:hex, :dialyxir, "1.0.0", "6a1fa629f7881a9f5aaf3a78f094b2a51a0357c843871b8bc98824e7342d00a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "aeb06588145fac14ca08d8061a142d52753dbc2cf7f0d00fc1013f53f8654654"}, + "credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"}, + "dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"}, "earmark": {:hex, :earmark, "1.3.2", "b840562ea3d67795ffbb5bd88940b1bed0ed9fa32834915125ea7d02e35888a5", [:mix], [], "hexpm", "e3be2bc3ae67781db529b80aa7e7c49904a988596e2dbff897425b48b3581161"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"}, - "excoveralls": {:hex, :excoveralls, "0.13.3", "edc5f69218f84c2bf61b3609a22ddf1cec0fbf7d1ba79e59f4c16d42ea4347ed", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cc26f48d2f68666380b83d8aafda0fffc65dafcc8d8650358e0b61f6a99b1154"}, - "file_system": {:hex, :file_system, "0.2.9", "545b9c9d502e8bfa71a5315fac2a923bd060fd9acb797fe6595f54b0f975fd32", [:mix], [], "hexpm", "3cf87a377fe1d93043adeec4889feacf594957226b4f19d5897096d6f61345d8"}, - "hackney": {:hex, :hackney, "1.16.0", "5096ac8e823e3a441477b2d187e30dd3fff1a82991a806b2003845ce72ce2d84", [:rebar3], [{:certifi, "2.5.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.0", 
[hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3bf0bebbd5d3092a3543b783bf065165fa5d3ad4b899b836810e513064134e18"}, - "idna": {:hex, :idna, "6.0.1", "1d038fb2e7668ce41fbf681d2c45902e52b3cb9e9c77b55334353b222c2ee50c", [:rebar3], [{:unicode_util_compat, "0.5.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a02c8a1c4fd601215bb0b0324c8a6986749f807ce35f25449ec9e69758708122"}, - "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, + "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, + "ex_docker_engine_api": {:hex, :ex_docker_engine_api, "1.43.0", "a0dbcb509732247ab6925f0429a3516c1fe27561f21b29182cfc69d7b32fc516", [:mix], [{:hackney, "~> 1.20", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "278a05f8f1d5f5b5738801cd96287583228fbaf8e6a9aef30176a5b37544f8ba"}, + "excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"}, + "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, + "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "kafka_protocol": {:hex, :kafka_protocol, "2.4.1", 
"9e89afc740f57d17bec33f0f21dc23e7cbfd8c9ed3b0d6b9fc3a6bd4a827c088", [:rebar, :rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:snappyer, "1.2.6", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "86c9e1c6496273a6d1a02dc0e6ef7479d70441d526970abe667e2cf4deb1df21"}, - "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"}, + "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, + "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, - "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm", "17ef63abde837ad30680ea7f857dd9e7ced9476cdd7b0394432af4bfc241b960"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "snappy": {:git, "https://github.com/fdmanana/snappy-erlang-nif", "e8907ee8e37cfa07d933a070669a88798082c3d7", []}, "snappyer": {:hex, :snappyer, "1.2.6", "34181e3233f68a92044e176fe96e54fee7957acc2be554f0460d799c495166c2", [:rebar3], [], 
"hexpm", "d538d1e8892af09dc8b2771b2652c9d70f009cd1556246b3e22706df643f47b4"}, - "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"}, - "unicode_util_compat": {:hex, :unicode_util_compat, "0.5.0", "8516502659002cec19e244ebd90d312183064be95025a319a6c7e89f4bccd65b", [:rebar3], [], "hexpm", "d48d002e15f5cc105a696cf2f1bbb3fc72b4b770a184d8420c8db20da2674b38"}, - "varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"}, + "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, + "tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"}, + "testcontainers": {:hex, :testcontainers, "1.5.0", "a7c24a67a78dd0bff3e4abe8f15997564ef4007860746362778f759995c70c83", [:mix], [{:ecto, "~> 3.10", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.10", [hex: :ecto_sql, repo: "hexpm", optional: true]}, {:ex_docker_engine_api, "~> 1.43", [hex: :ex_docker_engine_api, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "87f57e8b2878a7cf9f13a52c3c776000ec50569e9bb8d1b9782e8da817a6e736"}, + "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, + "varint": {:hex, :varint, "1.3.0", "e5901ea7fafdbb28cd2c402b5836878fe9480614d1152e8dbbafc44335d723aa", [:mix], [], "hexpm", "f32c332ddf730ea6a9c80e5bda774651dc9e9090552deff99f6e3fc1b4dc6519"}, } diff --git a/test/integration/behaviour/single_broker_test.exs b/test/integration/behaviour/single_broker_test.exs new file mode 100644 index 0000000..05c274c --- /dev/null +++ b/test/integration/behaviour/single_broker_test.exs @@ -0,0 +1,111 @@ +defmodule Kayrock.Integration.Behaviour.SingleBrokerTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import 
Kayrock.TestSupport + import Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + test "for single broker lifecycle", %{kafka: kafka} do + # [WHEN] There is client connected to broker + {:ok, client_pid} = build_client(kafka) + + # [AND] There is a topic created + topic_name = create_topic(client_pid, 0) + + # [WHEN] Client can read from empty topic + fetch_request = fetch_messages_request([[topic: topic_name]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives empty response + [%{partition_responses: [%{record_set: record_set}]}] = resp.responses + assert is_nil(record_set) + + # [WHEN] Client can write to topic + record_set = record_set([{"1", "test-one"}]) + produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5) + {:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller) + + # [AND] Fetch message from that topic + fetch_request = fetch_messages_request([[topic: topic_name]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives message + [%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses + [record] = record_set.records + assert record.key == "1" + assert record.value == "test-one" + assert record.offset == 0 + + # [WHEN] Client can write multiple messages to topic + record_set = record_set([{"2", "test-two"}, {"3", "test-three"}]) + produce_request = produce_messages_request(topic_name, [[record_set: record_set]], 1, 5) + {:ok, _resp} = Kayrock.client_call(client_pid, produce_request, :controller) + + # [AND] Fetch messages from that topic + fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 1]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives messages + [%{partition_responses: [%{record_set: [record_set]}]}] = resp.responses + [record_one, record_two] = record_set.records + assert record_one.key == "2" + assert record_one.value == "test-two" + assert record_one.offset == 1 + + assert record_two.key == "3" + assert record_two.value == "test-three" + assert record_two.offset == 2 + + # [WHEN] Client is closed + :ok = Kayrock.Client.stop(client_pid) + + # [AND] New client is created + {:ok, client_pid} = build_client(kafka) + + # [AND] Fetch messages from that topic + fetch_request = fetch_messages_request([[topic: topic_name, fetch_offset: 0]], [], 5) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + # [THEN] It receives messages + [%{partition_responses: [%{record_set: [record_set_one, record_set_two]}]}] = resp.responses + [record] = record_set_one.records + assert record.key == "1" + assert record.value == "test-one" + assert record.offset == 0 + + [record_one, record_two] = record_set_two.records + assert record_one.key == "2" + assert record_one.value == "test-two" + assert record_one.offset == 1 + + assert record_two.key == "3" + assert record_two.value == "test-three" + assert record_two.offset == 2 + end + + defp build_client(kafka) do + uris = [{"localhost", Container.mapped_port(kafka, 9092)}] + Kayrock.Client.start_link(uris) + end + + defp create_topic(client_pid, api_version) do + topic_name = unique_string() + create_request = create_topic_request(topic_name, api_version) + {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) + topic_name + end + + defp record_set(key_values) do + %Kayrock.RecordBatch{ + records: + 
Enum.map(key_values, fn {key, value} -> + %Kayrock.RecordBatch.Record{ + key: key, + value: value + } + end) + } + end +end diff --git a/test/integration/compression_test.exs b/test/integration/compression_test.exs new file mode 100644 index 0000000..786c606 --- /dev/null +++ b/test/integration/compression_test.exs @@ -0,0 +1,334 @@ +defmodule Kayrock.Client.CompressionTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import Kayrock.TestSupport + import Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + describe "Produce API & Fetch API with compression" do + for {compression, compression_num} <- [{"gzip", 1}, {"snappy", 2}] do + for version <- [0, 1] do + test "v#{version} - produce and reads data using message set with compression: #{compression}", + %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + compression: unquote(compression) |> String.to_atom(), + key: "1", + value: "foo", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + compression: unquote(compression) |> String.to_atom(), + key: "1", + value: "bar", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + compression: unquote(compression) |> String.to_atom(), + key: "1", + value: "baz", + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + + assert message_two.value == "bar" + assert message_two.offset == 1 + + assert message_three.value == "baz" + assert message_three.offset == 2 + end + end + + for version <- [2, 3] do + test "v#{version} - produce and reads data with compression: #{compression}", %{ + kafka: kafka + } do + compression_num = unquote(compression_num) + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + records = [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + attributes: 0 + } + ] + + # [WHEN] Produce message with timestamp + record_set = %Kayrock.RecordBatch{attributes: compression_num, records: records} + + 
produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + + # [THEN] Fetch message from topic + partition_data = [ + [topic: topic_name, partition: 0, fetch_offset: 0, log_start_offset: 0] + ] + + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end + end + + for version <- [4, 5, 6, 7] do + test "v#{version} - produce and reads data using message set with compression: #{compression}", + %{ + kafka: kafka + } do + compression_num = unquote(compression_num) + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.RecordBatch{ + attributes: compression_num, + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + 
produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set + |> List.first() + |> Map.get(:records) + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + + assert message_three.headers == [ + %Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"} + ] + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = + List.first(response.partition_responses).record_set + |> List.first() + |> Map.get(:records) + + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end + end + end + end + + defp build_client(kafka) do + uris = [{"localhost", Container.mapped_port(kafka, 9092)}] + Kayrock.Client.start_link(uris) + end + + defp create_topic(client_pid, api_version) do + topic_name = unique_string() + create_request = create_topic_request(topic_name, api_version) + {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) + topic_name + end +end diff --git a/test/integration/producer_test.exs b/test/integration/producer_test.exs new file mode 100644 index 0000000..8d65558 --- /dev/null +++ b/test/integration/producer_test.exs @@ -0,0 +1,656 @@ +defmodule Kayrock.Integration.ProducerTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import Kayrock.TestSupport + import 
Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + describe "Produce API & Fetch API" do + for version <- [0, 1] do + test "v#{version} - produce and reads data using message set", %{kafka: kafka} do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + key: "1", + value: "test", + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "test" + assert message.offset == offset + end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.MessageSet{ + messages: [ + %Kayrock.MessageSet.Message{ + key: "1", + value: "foo", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + key: "1", + value: "bar", + attributes: 0 + }, + %Kayrock.MessageSet.Message{ + key: "1", + value: "baz", + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == offset + + assert message_two.value == "bar" + assert message_two.offset == offset + 1 + + assert message_three.value == "baz" + assert message_three.offset == offset + 2 + end + end + + for version <- [2, 3] do + test "v#{version} - produce and reads data using record batch", %{kafka: kafka} do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = 
create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "test" + assert message.timestamp == timestamp + end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + records = [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "baz", + timestamp: timestamp, + attributes: 0 + } + ] + + # [WHEN] Produce message with timestamp + record_set = %Kayrock.RecordBatch{records: records} + + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: 0, log_start_offset: 0]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set.messages + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = 
%Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = List.first(response.partition_responses).record_set.messages + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end + end + + for version <- [4, 5, 6, 7] do + test "v#{version} - produce and reads data using record batch", %{kafka: kafka} do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message.value == "test" + assert message.timestamp == timestamp + assert message.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + end + + test "v#{version} - produce and reads data using message set with multiple messages", %{ + kafka: kafka + } do + api_version = unquote(version) + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + # [GIVEN] MessageSet with timestamp + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "foo", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: "1", + value: "bar", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + }, + %Kayrock.RecordBatch.Record{ + key: 
"1", + value: "baz", + timestamp: timestamp, + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message_one, message_two, message_three] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message_one.value == "foo" + assert message_one.offset == 0 + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_two.value == "bar" + assert message_two.offset == 1 + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + assert message_three.value == "baz" + assert message_three.offset == 2 + assert message_three.timestamp == timestamp + assert message_three.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + # [THEN] Produce another message + record = %Kayrock.RecordBatch.Record{ + key: "1", + value: "zab", + timestamp: timestamp, + attributes: 0 + } + + record_set = %Kayrock.RecordBatch{records: [record]} + + # [WHEN] Produce message with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 0 + offset = partition_response.base_offset + + # [THEN] Fetch message from topic + partition_data = [[topic: topic_name, partition: 0, fetch_offset: offset]] + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [message] = + List.first(response.partition_responses).record_set |> List.first() |> Map.get(:records) + + assert message.value == "zab" + assert message.offset == 3 + assert message.timestamp == timestamp + end + end + end + + describe "with non existing topic" do + test "it will return error code", %{kafka: kafka} do + api_version = 5 + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = unique_string() + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message 
with timestamp + produce_message_request = + produce_messages_request(topic_name, [[record_set: record_set]], 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_response] = response.partition_responses + assert partition_response.error_code == 3 + end + end + + describe "with multiple topics and partitions" do + test "with multiple partitions for single topic", %{kafka: kafka} do + api_version = 5 + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> DateTime.to_unix(:millisecond) + + record_set_one = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "1", + value: "test-one", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + record_set_two = %Kayrock.RecordBatch{ + records: [ + %Kayrock.RecordBatch.Record{ + key: "2", + value: "test-two", + headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}], + timestamp: timestamp, + attributes: 0 + } + ] + } + + # [WHEN] Produce message with timestamp + produce_data = [ + [record_set: record_set_one, partition: 0], + [record_set: record_set_two, partition: 1] + ] + + produce_message_request = + produce_messages_request(topic_name, produce_data, 1, api_version) + + {:ok, resp} = Kayrock.client_call(client_pid, produce_message_request, :controller) + [response] = resp.responses + assert response.topic == topic_name + + [partition_one_resp, partition_two_resp] = + response.partition_responses |> Enum.sort_by(& &1.partition) + + assert partition_one_resp.error_code == 0 + assert partition_two_resp.error_code == 0 + + partition_one_offset = partition_one_resp.base_offset + partition_two_offset = partition_two_resp.base_offset + + # [THEN] Fetch message from topic + partition_data = [ + [topic: topic_name, partition: 0, fetch_offset: partition_one_offset], + [topic: topic_name, partition: 1, fetch_offset: partition_two_offset] + ] + + fetch_request = fetch_messages_request(partition_data, [], api_version) + {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller) + + [response] = resp.responses + assert response.topic == topic_name + + # [THEN] Verify message data + [record_batch_one, record_batch_two] = + Enum.sort_by(response.partition_responses, & &1.partition_header.partition) + + assert record_batch_one.partition_header.partition == 0 + assert record_batch_two.partition_header.partition == 1 + + [message_one] = record_batch_one.record_set |> List.first() |> Map.get(:records) + assert message_one.value == "test-one" + assert message_one.timestamp == timestamp + assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + + [message_two] = record_batch_two.record_set |> List.first() |> Map.get(:records) + assert message_two.value == "test-two" + assert message_two.timestamp == timestamp + assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}] + end + + test "with multiple topics for single partition", %{kafka: kafka} do + api_version = 5 + {:ok, client_pid} = build_client(kafka) + + # Create Topic + topic_name_one = create_topic(client_pid, api_version) + topic_name_two = create_topic(client_pid, api_version) + + # [GIVEN] MessageSet with timestamp + timestamp = DateTime.utc_now() |> 
+
+      record_set_one = %Kayrock.RecordBatch{
+        records: [
+          %Kayrock.RecordBatch.Record{
+            key: "1",
+            value: "test-one",
+            headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
+            timestamp: timestamp,
+            attributes: 0
+          }
+        ]
+      }
+
+      record_set_two = %Kayrock.RecordBatch{
+        records: [
+          %Kayrock.RecordBatch.Record{
+            key: "2",
+            value: "test-two",
+            headers: [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}],
+            timestamp: timestamp,
+            attributes: 0
+          }
+        ]
+      }
+
+      # [WHEN] Produce one message to each topic
+      produce_request_one =
+        produce_messages_request(topic_name_one, [[record_set: record_set_one]], 1, api_version)
+
+      produce_request_two =
+        produce_messages_request(topic_name_two, [[record_set: record_set_two]], 1, api_version)
+
+      {:ok, _resp} = Kayrock.client_call(client_pid, produce_request_one, :controller)
+      {:ok, _resp} = Kayrock.client_call(client_pid, produce_request_two, :controller)
+
+      # [WHEN] Fetch from both topics
+      partition_data = [
+        [topic: topic_name_one, partition: 0, fetch_offset: 0],
+        [topic: topic_name_two, partition: 0, fetch_offset: 0]
+      ]
+
+      fetch_request = fetch_messages_request(partition_data, [], api_version)
+      {:ok, resp} = Kayrock.client_call(client_pid, fetch_request, :controller)
+
+      response_one = Enum.find(resp.responses, &(&1.topic == topic_name_one))
+      response_two = Enum.find(resp.responses, &(&1.topic == topic_name_two))
+
+      # [THEN] Verify message data
+      [record_batch_one] = response_one.partition_responses
+      assert record_batch_one.partition_header.partition == 0
+
+      [message_one] = record_batch_one.record_set |> List.first() |> Map.get(:records)
+      assert message_one.value == "test-one"
+      assert message_one.timestamp == timestamp
+      assert message_one.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]
+
+      [record_batch_two] = response_two.partition_responses
+      assert record_batch_two.partition_header.partition == 0
+
+      [message_two] = record_batch_two.record_set |> List.first() |> Map.get(:records)
+      assert message_two.value == "test-two"
+      assert message_two.timestamp == timestamp
+      assert message_two.headers == [%Kayrock.RecordBatch.RecordHeader{key: "1", value: "1"}]
+    end
+  end
+
+  defp build_client(kafka) do
+    uris = [{"localhost", Container.mapped_port(kafka, 9092)}]
+    Kayrock.Client.start_link(uris)
+  end
+
+  defp create_topic(client_pid, api_version) do
+    topic_name = unique_string()
+    create_request = create_topic_request(topic_name, api_version)
+    {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
+    topic_name
+  end
+end
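The produce/fetch round trip that these tests exercise can also be driven by hand with the same `Kayrock.RequestFactory` helpers introduced later in this patch. A minimal sketch, assuming a broker reachable on localhost:9092; the topic name and API version are illustrative, not part of the patch:

```elixir
# Sketch: one produce/fetch round trip using the factory helpers below.
# Assumes a running broker on localhost:9092; "my-topic" and version 5
# are illustrative values.
import Kayrock.RequestFactory

{:ok, client} = Kayrock.Client.start_link([{"localhost", 9092}])

record_set = %Kayrock.RecordBatch{
  records: [
    %Kayrock.RecordBatch.Record{
      key: "1",
      value: "hello",
      timestamp: DateTime.utc_now() |> DateTime.to_unix(:millisecond),
      attributes: 0
    }
  ]
}

# acks = 1; the factory caps the requested API version at Produce.max_vsn()
produce = produce_messages_request("my-topic", [[record_set: record_set]], 1, 5)
{:ok, resp} = Kayrock.client_call(client, produce, :controller)
[%{partition_responses: [%{base_offset: offset}]}] = resp.responses

# Read the record back from the offset the broker reported
fetch = fetch_messages_request([[topic: "my-topic", partition: 0, fetch_offset: offset]], [], 5)
{:ok, _fetched} = Kayrock.client_call(client, fetch, :controller)
```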
diff --git a/test/integration/topic_management_test.exs b/test/integration/topic_management_test.exs
new file mode 100644
index 0000000..395aa33
--- /dev/null
+++ b/test/integration/topic_management_test.exs
@@ -0,0 +1,99 @@
+defmodule Kayrock.Integration.TopicManagementTest do
+  use Kayrock.IntegrationCase
+  use ExUnit.Case, async: true
+
+  import Kayrock.TestSupport
+  import Kayrock.Convenience
+  import Kayrock.RequestFactory
+
+  container(:kafka, KafkaContainer.new(), shared: true)
+
+  describe "topic management API" do
+    for version <- [0, 1, 2] do
+      test "v#{version} - allows managing topics", %{kafka: kafka} do
+        uris = [{"localhost", Container.mapped_port(kafka, 9092)}]
+        api_version = unquote(version)
+        {:ok, client_pid} = Kayrock.Client.start_link(uris)
+        topic_name = unique_string()
+
+        # Topic does not exist yet
+        refute topic_exists?(client_pid, topic_name)
+
+        # Create Topic
+        create_request = create_topic_request(topic_name, api_version)
+        {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
+
+        # Get Topic
+        topic = get_topic_metadata(client_pid, topic_name)
+        assert topic.topic == topic_name
+        assert length(topic.partition_metadata) == 3
+
+        # Create Partitions
+        partitions_request = create_partitions_request(topic_name, api_version)
+        {:ok, res} = Kayrock.client_call(client_pid, partitions_request, :controller)
+        assert List.first(res.topic_errors).error_code == 0
+
+        # Get Updated Topic
+        topic = get_topic_metadata(client_pid, topic_name)
+        assert length(topic.partition_metadata) == 5
+
+        # Update Topic Config
+        alter_config = alter_topic_config(topic_name, api_version)
+        {:ok, res} = Kayrock.client_call(client_pid, alter_config, :controller)
+        assert List.first(res.resources).error_code == 0
+
+        # Get Topic Config
+        describe_request = describe_config(topic_name, api_version)
+        {:ok, res} = Kayrock.client_call(client_pid, describe_request, :controller)
+        resource = List.first(res.resources)
+        assert resource.error_code == 0
+        config = List.first(resource.config_entries)
+        assert config.config_name == "cleanup.policy"
+        assert config.config_value == "compact"
+
+        # Delete Topic
+        max_version = min(Kayrock.DeleteTopics.max_vsn(), api_version)
+        {:ok, _} = Kayrock.delete_topics(client_pid, [topic_name], 1000, max_version)
+
+        # Topic no longer exists
+        refute topic_exists?(client_pid, topic_name)
+      end
+    end
+  end
+
+  defp create_partitions_request(topic_name, api_version) do
+    api_version = min(Kayrock.CreatePartitions.max_vsn(), api_version)
+    request = Kayrock.CreatePartitions.get_request_struct(api_version)
+    partition_config = %{topic: topic_name, new_partitions: %{count: 5, assignment: nil}}
+    %{request | topic_partitions: [partition_config], timeout: 1000, validate_only: false}
+  end
+
+  defp alter_topic_config(topic_name, api_version) do
+    api_version = min(Kayrock.AlterConfigs.max_vsn(), api_version)
+    request = Kayrock.AlterConfigs.get_request_struct(api_version)
+    config = %{config_name: "cleanup.policy", config_value: "compact"}
+
+    %{
+      request
+      | resources: [%{resource_type: 2, resource_name: topic_name, config_entries: [config]}],
+        validate_only: false
+    }
+  end
+
+  defp describe_config(topic_name, api_version) do
+    api_version = min(Kayrock.DescribeConfigs.max_vsn(), api_version)
+    request = Kayrock.DescribeConfigs.get_request_struct(api_version)
+
+    %{
+      request
+      | resources: [
+          %{resource_type: 2, resource_name: topic_name, config_names: ["cleanup.policy"]}
+        ]
+    }
+  end
+
+  def get_topic_metadata(pid, topic) when is_pid(pid) and is_binary(topic) do
+    {:ok, [topic]} = Kayrock.topics_metadata(pid, [topic])
+    topic
+  end
+end
diff --git a/test/kayrock/client/create_topic_test.exs b/test/kayrock/client/create_topic_test.exs
deleted file mode 100644
index 0fa6e6e..0000000
--- a/test/kayrock/client/create_topic_test.exs
+++ /dev/null
@@ -1,46 +0,0 @@
-defmodule Kayrock.Client.CreateTopicTest do
-  # CreateTopics Request (Version: 3) => [create_topic_requests] timeout validate_only
-  #   create_topic_requests => topic num_partitions replication_factor [replica_assignment] [config_entries]
-  #     topic => STRING
-  #     num_partitions => INT32
-  #     replication_factor => INT16
-  #     replica_assignment => partition [replicas]
-  #       partition => INT32
-  #       replicas => INT32
-  #     config_entries => config_name config_value
-  #       config_name => STRING
-  #       config_value => NULLABLE_STRING
-  #   timeout => INT32
-  #   validate_only => BOOLEAN
-
-  use Kayrock.ClientCase
-
-  alias Kayrock.ErrorCode
-
-  import
Kayrock.Convenience - - test "delete topic - topic doesn't exist", %{client: client} do - topic = unique_topic() - refute topic_exists?(client, topic) - - {:ok, resp} = Kayrock.delete_topics(client, [topic]) - [topic_error_code] = resp.topic_error_codes - assert topic_error_code[:error_code] == ErrorCode.unknown_topic() - end - - test "create a topic - specify num partitions", %{client: client} do - topic = unique_topic() - refute topic_exists?(client, topic) - - {:ok, _} = - Kayrock.create_topics( - client, - [%{topic: topic, num_partitions: 4, replication_factor: 2}], - 1000 - ) - - assert topic_exists?(client, topic) - - {:ok, _} = Kayrock.delete_topics(client, [topic], 1000) - end -end diff --git a/test/kayrock/client/produce_test.exs b/test/kayrock/client/produce_test.exs deleted file mode 100644 index 30c8708..0000000 --- a/test/kayrock/client/produce_test.exs +++ /dev/null @@ -1,443 +0,0 @@ -defmodule Kayrock.Client.ProduceTest do - use Kayrock.ClientCase - - alias Kayrock.RecordBatch - alias Kayrock.RecordBatch.Record - alias Kayrock.RecordBatch.RecordHeader - - test "Simple produce works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"]) - {:ok, _} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 0, - base_sequence: -1, - batch_length: 79, - batch_offset: first_offset, - crc: -784_342_914, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - - test "gzip produce works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :gzip) - {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - 
partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 1, - base_sequence: -1, - batch_length: 94, - batch_offset: first_offset, - crc: 1_821_682_799, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - - describe "with snappy compression" do - setup do - on_exit(fn -> - Application.put_env(:kayrock, :snappy_module, :snappy) - end) - - :ok - end - - test "using snappyer produce works", %{client: client} do - Application.put_env(:kayrock, :snappy_module, :snappyer) - - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy) - {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 2, - base_sequence: -1, - batch_length: 101, - batch_offset: first_offset, - crc: 468_182_773, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - - test "using snappy-erlang-nif produce works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "simple_produce") - - record_batch = RecordBatch.from_binary_list(["foo", "bar", "baz"], :snappy) - {:ok, _resp} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = 
partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 2, - base_sequence: -1, - batch_length: 101, - batch_offset: first_offset, - crc: 468_182_773, - first_timestamp: -1, - last_offset_delta: 2, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset, - timestamp: -1, - value: "foo" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 1, - timestamp: -1, - value: "bar" - }, - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [], - key: nil, - offset: first_offset + 2, - timestamp: -1, - value: "baz" - } - ] - } - ] - } - ], - topic: "simple_produce" - } - ], - throttle_time_ms: 0 - } - end - end - - test "Produce with key, value and headers works", %{client: client} do - {:ok, topic} = ensure_test_topic(client, "full_record_produce") - - headers = [ - %RecordHeader{key: "source", value: "System-X"}, - %RecordHeader{key: "type", value: "HeaderCreatedEvent"} - ] - - records = [%Record{headers: headers, key: "rd-k", value: "record-value-here"}] - - record_batch = %RecordBatch{ - attributes: 0, - records: records - } - - {:ok, _} = Kayrock.produce(client, record_batch, topic, 0) - - offset = Kayrock.Convenience.partition_last_offset(client, topic, 0) - - {:ok, resp} = Kayrock.fetch(client, topic, 0, offset - 1) - - [main_resp] = resp.responses - [partition_resp] = main_resp.partition_responses - - [ - %RecordBatch{ - partition_leader_epoch: partition_leader_epoch, - records: [%Record{offset: first_offset} | _] - } - | _ - ] = partition_resp.record_set - - assert resp == %Kayrock.Fetch.V4.Response{ - correlation_id: 4, - responses: [ - %{ - partition_responses: [ - %{ - partition_header: %{ - aborted_transactions: [], - error_code: 0, - high_watermark: offset, - last_stable_offset: offset, - partition: 0 - }, - record_set: [ - %Kayrock.RecordBatch{ - attributes: 0, - base_sequence: -1, - batch_length: 118, - batch_offset: first_offset, - crc: -1_972_253_040, - first_timestamp: -1, - last_offset_delta: 0, - max_timestamp: -1, - partition_leader_epoch: partition_leader_epoch, - producer_epoch: -1, - producer_id: -1, - records: [ - %Kayrock.RecordBatch.Record{ - attributes: 0, - headers: [ - %Kayrock.RecordBatch.RecordHeader{ - key: "source", - value: "System-X" - }, - %Kayrock.RecordBatch.RecordHeader{ - key: "type", - value: "HeaderCreatedEvent" - } - ], - key: "rd-k", - offset: first_offset, - timestamp: -1, - value: "record-value-here" - } - ] - } - ] - } - ], - topic: "full_record_produce" - } - ], - throttle_time_ms: 0 - } - end -end diff --git a/test/kayrock/message_serde_test.exs b/test/kayrock/message_serde_test.exs index 05ac249..229beaf 100644 --- a/test/kayrock/message_serde_test.exs +++ b/test/kayrock/message_serde_test.exs @@ -173,7 +173,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 1, timestamp: -1, value: "bar" }, @@ -181,7 +181,7 @@ defmodule Kayrock.MessageSerdeTest do attributes: 0, headers: [], key: nil, - offset: 0, + offset: 2, timestamp: -1, value: "baz" } @@ -189,14 +189,29 @@ defmodule Kayrock.MessageSerdeTest 
do
     }
 
     expect =
-      <<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168,
-        31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
+      <<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91,
+        168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
         255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0,
-        0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0,
-        0, 0, 1, 6, 98, 97, 122, 0>>
+        0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114,
+        0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>>
 
     got = IO.iodata_to_binary(RecordBatch.serialize(record_batch))
     assert got == expect, compare_binaries(got, expect)
+
+    <<size::32-signed, rest::binary>> = got
+    assert byte_size(rest) == size
+
+    [got_batch] = RecordBatch.deserialize(rest)
+
+    record_batch =
+      %{
+        record_batch
+        | batch_length: got_batch.batch_length,
+          crc: got_batch.crc,
+          last_offset_delta: 2
+      }
+
+    assert got_batch == record_batch
   end
 
   test "using snappyer dependency" do
@@ -227,7 +242,7 @@ defmodule Kayrock.MessageSerdeTest do
           attributes: 0,
           headers: [],
           key: nil,
-          offset: 0,
+          offset: 1,
           timestamp: -1,
           value: "bar"
         },
@@ -235,7 +250,7 @@ defmodule Kayrock.MessageSerdeTest do
           attributes: 0,
           headers: [],
           key: nil,
-          offset: 0,
+          offset: 2,
           timestamp: -1,
           value: "baz"
         }
@@ -243,14 +258,29 @@ defmodule Kayrock.MessageSerdeTest do
     }
 
     expect =
-      <<0, 0, 0, 90, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 78, 255, 255, 255, 255, 2, 240, 195, 168,
-        31, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
+      <<0, 0, 0, 93, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 81, 255, 255, 255, 255, 2, 240, 3, 91,
+        168, 0, 2, 0, 0, 0, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
         255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 0,
-        0, 0, 3, 30, 36, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 9, 10, 52, 98, 97, 114, 0, 18, 0,
-        0, 0, 1, 6, 98, 97, 122, 0>>
+        0, 0, 3, 30, 116, 18, 0, 0, 0, 1, 6, 102, 111, 111, 0, 18, 0, 0, 2, 1, 6, 98, 97, 114,
+        0, 18, 0, 0, 4, 1, 6, 98, 97, 122, 0>>
 
     got = IO.iodata_to_binary(RecordBatch.serialize(record_batch))
     assert got == expect, compare_binaries(got, expect)
+
+    <<size::32-signed, rest::binary>> = got
+    assert byte_size(rest) == size
+
+    [got_batch] = RecordBatch.deserialize(rest)
+
+    record_batch =
+      %{
+        record_batch
+        | batch_length: got_batch.batch_length,
+          crc: got_batch.crc,
+          last_offset_delta: 2
+      }
+
+    assert got_batch == record_batch
   end
 end
diff --git a/test/support/integration_case.ex b/test/support/integration_case.ex
new file mode 100644
index 0000000..b3b91d7
--- /dev/null
+++ b/test/support/integration_case.ex
@@ -0,0 +1,52 @@
+defmodule Kayrock.IntegrationCase do
+  @moduledoc """
+  Testcontainer integration case template
+  """
+  use ExUnit.CaseTemplate
+
+  if match?({:module, _}, Code.ensure_compiled(Testcontainers)) do
+    using do
+      quote do
+        @moduletag :integration_v2
+        import Testcontainers.ExUnit
+
+        alias Testcontainers.Container
+        alias Testcontainers.KafkaContainer
+      end
+    end
+
+    setup_all do
+      case Testcontainers.start_link() do
+        {:ok, _} ->
+          :ok
+
+        {:error, {:already_started, _}} ->
+          :ok
+      end
+    end
+  else
+    defmodule TestcontainersStub do
+      @moduledoc false
+
+      def container(_name, _config, _opts) do
+        :ok
+      end
+    end
+
+    defmodule KafkaContainerStub do
+      @moduledoc false
+
+      def new() do
+      end
+    end
+
+    using do
+      quote do
+        @moduletag :integration_v2
+        import Kayrock.IntegrationCase.TestcontainersStub
+
+        alias Kayrock.IntegrationCase.KafkaContainerStub, as: KafkaContainer
+      end
+    end
+  end
+end
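A test module opts into this case template the same way the integration tests in this patch do. Roughly (the module name and test body are illustrative):

```elixir
# Illustrative usage of Kayrock.IntegrationCase. container/3 and
# KafkaContainer resolve to the real testcontainers API when the
# dependency is present, and to the no-op stubs above otherwise.
defmodule Kayrock.Integration.SmokeTest do
  use Kayrock.IntegrationCase
  use ExUnit.Case, async: true

  container(:kafka, KafkaContainer.new(), shared: true)

  test "broker is reachable", %{kafka: kafka} do
    port = Container.mapped_port(kafka, 9092)
    {:ok, _client} = Kayrock.Client.start_link([{"localhost", port}])
  end
end
```

Because the template tags every test with `:integration_v2`, these modules are skipped unless that tag is explicitly included.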
diff --git a/test/support/request_factory.ex b/test/support/request_factory.ex
new file mode 100644
index 0000000..ff66e65
--- /dev/null
+++ b/test/support/request_factory.ex
@@ -0,0 +1,84 @@
+defmodule Kayrock.RequestFactory do
+  @moduledoc """
+  Factory helpers for building versioned Kayrock request structs in tests.
+  """
+
+  @doc """
+  Creates a request to create a topic.
+  Uses the minimum of `api_version` and the maximum supported version.
+  """
+  def create_topic_request(topic_name, api_version) do
+    api_version = min(Kayrock.CreateTopics.max_vsn(), api_version)
+    request = Kayrock.CreateTopics.get_request_struct(api_version)
+
+    topic_config = %{
+      topic: topic_name,
+      num_partitions: 3,
+      replication_factor: 1,
+      replica_assignment: [],
+      config_entries: []
+    }
+
+    %{request | create_topic_requests: [topic_config], timeout: 1000}
+  end
+
+  @doc """
+  Creates a request to produce messages to a topic.
+  Uses the minimum of `api_version` and the maximum supported version.
+  """
+  def produce_messages_request(topic_name, data, ack, api_version) do
+    api_version = min(Kayrock.Produce.max_vsn(), api_version)
+    request = Kayrock.Produce.get_request_struct(api_version)
+
+    topic_data = [
+      %{
+        topic: topic_name,
+        data:
+          Enum.map(data, fn datum ->
+            %{
+              partition: Keyword.get(datum, :partition, 0),
+              record_set: Keyword.get(datum, :record_set, [])
+            }
+          end)
+      }
+    ]
+
+    %{request | topic_data: topic_data, acks: ack, timeout: 1000}
+  end
+
+  @doc """
+  Creates a request to fetch messages from a topic.
+  Uses the minimum of `api_version` and the maximum supported version.
+  """
+  def fetch_messages_request(partition_data, opts, api_version) do
+    api_version = min(Kayrock.Fetch.max_vsn(), api_version)
+    request = Kayrock.Fetch.get_request_struct(api_version)
+    max_bytes = Keyword.get(opts, :max_bytes, 1_000_000)
+
+    request_data = %{
+      replica_id: -1,
+      max_wait_time: Keyword.get(opts, :max_wait_time, 1000),
+      min_bytes: Keyword.get(opts, :min_bytes, 0),
+      max_bytes: max_bytes,
+      isolation_level: Keyword.get(opts, :isolation_level, 1),
+      session_id: Keyword.get(opts, :session_id, 0),
+      epoch: Keyword.get(opts, :epoch, 0),
+      topics:
+        Enum.map(partition_data, fn partition ->
+          %{
+            topic: Keyword.fetch!(partition, :topic),
+            partitions: [
+              %{
+                partition: Keyword.get(partition, :partition, 0),
+                fetch_offset: Keyword.get(partition, :fetch_offset, 0),
+                max_bytes: Keyword.get(partition, :max_bytes, max_bytes),
+                log_start_offset: Keyword.get(partition, :log_start_offset, 0)
+              }
+            ]
+          }
+        end)
+    }
+
+    Map.merge(request, request_data)
+  end
+end
diff --git a/test/support/test_support.ex b/test/support/test_support.ex
index c842f20..7f19ecb 100644
--- a/test/support/test_support.ex
+++ b/test/support/test_support.ex
@@ -1,5 +1,13 @@
 defmodule Kayrock.TestSupport do
   @moduledoc "Support code for tests"
+
+  @doc """
+  Returns a unique string for use in tests.
+  """
+  def unique_string do
+    "test-topic-#{:erlang.unique_integer([:positive])}"
+  end
+
   def compare_binaries(lhs, rhs) do
     bytes_per_chunk = 16
     chunks_lhs = chunk_binary(lhs, bytes_per_chunk)
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 2a45934..b13773e 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -1,2 +1,2 @@
-ExUnit.configure(exclude: :integration)
+ExUnit.configure(exclude: [:integration, :integration_v2])
 ExUnit.start()
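With the test_helper.exs change above, both `:integration` and `:integration_v2` are excluded by default, so the new suite is opt-in. The standard ExUnit mechanisms apply: pass `--include integration_v2` to `mix test`, or opt back in from a custom helper, as in this sketch:

```elixir
# Sketch of a test helper that opts the new integration tests back in
# while still excluding the legacy docker-compose suite.
ExUnit.configure(exclude: [:integration], include: [:integration_v2])
ExUnit.start()
```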