diff --git a/lib/kayrock/broker_connection.ex b/lib/kayrock/broker_connection.ex index 77393cb..3e3f156 100644 --- a/lib/kayrock/broker_connection.ex +++ b/lib/kayrock/broker_connection.ex @@ -31,7 +31,7 @@ defmodule Kayrock.BrokerConnection do def send(conn, data), do: Connection.call(conn, {:send, data}) - def recv(conn, timeout \\ 3000) do + def recv(conn, timeout \\ 5000) do Connection.call(conn, {:recv, timeout}) end diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs new file mode 100644 index 0000000..3020f7f --- /dev/null +++ b/test/integration/consumer_group_test.exs @@ -0,0 +1,135 @@ +defmodule Kayrock.Integration.ConsumerGroupTest do + use Kayrock.IntegrationCase + use ExUnit.Case, async: true + + import Kayrock.TestSupport + import Kayrock.RequestFactory + + container(:kafka, KafkaContainer.new(), shared: true) + + describe "Consumer Group API" do + for api_version <- [0, 1, 2] do + test "v#{api_version} - allows to manage consumer groups", %{kafka: kafka} do + api_version = unquote(api_version) + group_name = unique_string() + + {:ok, client_pid} = build_client(kafka) + topic_name = create_topic(client_pid, api_version) + + # [WHEN] No consumer groups exist + # [THEN] List consumer groups returns empty list + list_request = list_consumer_groups_request(api_version) + {:ok, list_response} = Kayrock.client_call(client_pid, list_request, :controller) + + matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name)) + assert list_response.error_code == 0 + assert matching_groups == [] + + # [WHEN] We try to find a coordinator for a consumer group + coordinator_request = find_coordinator_request(group_name, api_version) + + {:ok, coordinator_response} = + with_retry(fn -> + Kayrock.client_call(client_pid, coordinator_request, 1) + end) + + # [THEN] We get a valid coordinator node + assert coordinator_response.error_code == 0 + assert coordinator_response.coordinator.node_id > 0 + node_id = coordinator_response.coordinator.node_id + + # [WHEN] We join a group + member_data = %{group_id: group_name, topics: [topic_name]} + join_request = join_group_request(member_data, api_version) + + {:ok, join_response} = + with_retry(fn -> + Kayrock.client_call(client_pid, join_request, node_id) + end) + + assert join_response.error_code == 0 + assert join_response.members != [] + member_ids = Enum.map(join_response.members, & &1.member_id) + assert Enum.member?(member_ids, join_response.member_id) + + # [THEN] We can list the consumer group + list_request = list_consumer_groups_request(api_version) + {:ok, list_response} = Kayrock.client_call(client_pid, list_request, node_id) + + matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name)) + assert list_response.error_code == 0 + assert matching_groups == [%{group_id: group_name, protocol_type: "consumer"}] + + # [WHEN] We sync the group + assignments = [ + %{ + member_id: join_response.member_id, + topic: topic_name, + partitions: [0, 1, 2] + } + ] + + sync_request = + sync_group_request(group_name, join_response.member_id, assignments, api_version) + + {:ok, sync_response} = Kayrock.client_call(client_pid, sync_request, node_id) + assert sync_response.error_code == 0 + + # [THEN] We can describe the consumer group + describe_request = describe_groups_request([group_name], api_version) + {:ok, describe_response} = Kayrock.client_call(client_pid, describe_request, node_id) + + [group_info] = describe_response.groups + assert group_info.error_code == 0 + assert group_info.group_id == group_name + assert group_info.protocol_type == "consumer" + [member] = group_info.members + assert member.member_id == join_response.member_id + + # [WHEN] We leave the group + leave_group_request = + leave_group_request(group_name, join_response.member_id, api_version) + + {:ok, leave_group_response} = + Kayrock.client_call(client_pid, leave_group_request, node_id) + + assert leave_group_response.error_code == 0 + + # [THEN] We can don't find member in the group + describe_request = describe_groups_request([group_name], api_version) + {:ok, describe_response} = Kayrock.client_call(client_pid, describe_request, node_id) + + [group_info] = describe_response.groups + assert group_info.error_code == 0 + assert group_info.group_id == group_name + assert group_info.protocol_type == "consumer" + assert group_info.members == [] + + # [WHEN] We delete consumer group + delete_request = delete_groups_request([group_name], api_version) + {:ok, delete_response} = Kayrock.client_call(client_pid, delete_request, node_id) + + assert delete_response.group_error_codes == [%{group_id: group_name, error_code: 0}] + + # [THEN] We can't find the group + list_request = list_consumer_groups_request(api_version) + {:ok, list_response} = Kayrock.client_call(client_pid, list_request, node_id) + + matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name)) + assert matching_groups == [] + end + end + end + + defp build_client(kafka) do + uris = [{"localhost", Container.mapped_port(kafka, 9092)}] + Kayrock.Client.start_link(uris) + end + + defp create_topic(client_pid, api_version) do + topic_name = unique_string() + create_request = create_topic_request(topic_name, api_version) + {:ok, _} = Kayrock.client_call(client_pid, create_request, :controller) + topic_name + end +end diff --git a/test/support/request_factory.ex b/test/support/request_factory.ex index ff66e65..ed9578b 100644 --- a/test/support/request_factory.ex +++ b/test/support/request_factory.ex @@ -81,4 +81,126 @@ defmodule Kayrock.RequestFactory do %{Map.merge(request, request_date) | replica_id: -1} end + + @doc """ + Creates a request to join a consumer group + Uses min of api_version and max supported version + """ + def list_consumer_groups_request(api_version) do + api_version = min(Kayrock.ListGroups.max_vsn(), api_version) + Kayrock.ListGroups.get_request_struct(api_version) + end + + @doc """ + Create a request to find coordinator for a consumer group + Uses min of api_version and max supported version + """ + def find_coordinator_request(group_id, api_version) do + api_version = min(Kayrock.FindCoordinator.max_vsn(), api_version) + request = Kayrock.FindCoordinator.get_request_struct(api_version) + coordinator_key(request, api_version, group_id) + end + + defp coordinator_key(request, 0, group_id), do: %{request | group_id: group_id} + + defp coordinator_key(request, _, group_id) do + %{request | coordinator_key: group_id, coordinator_type: 0} + end + + @doc """ + Creates a request to join a consumer group + Uses min of api_version and max supported version + """ + def join_group_request(member_data, api_version) do + api_version = min(Kayrock.JoinGroup.max_vsn(), api_version) + request = Kayrock.JoinGroup.get_request_struct(api_version) + topics = Map.fetch!(member_data, :topics) + + %{ + request + | group_id: Map.fetch!(member_data, :group_id), + session_timeout: Map.get(member_data, :session_timeout, 10000), + member_id: Map.get(member_data, :member_id, ""), + protocol_type: "consumer", + group_protocols: [ + %{ + protocol_metadata: %Kayrock.GroupProtocolMetadata{topics: topics}, + protocol_name: Map.get(member_data, :protocol_name, "assign") + } + ] + } + |> add_rebalance_timeout(api_version, member_data) + end + + defp add_rebalance_timeout(request, 0, _), do: request + + defp add_rebalance_timeout(request, _, member_data) do + %{ + request + | rebalance_timeout: Map.get(member_data, :rebalance_timeout, 30000) + } + end + + @doc """ + Creates a request to sync a consumer group + Uses min of api_version and max supported version + """ + def sync_group_request(group_id, member_id, assignments, api_version) do + api_version = min(Kayrock.SyncGroup.max_vsn(), api_version) + request = Kayrock.SyncGroup.get_request_struct(api_version) + + %{ + request + | group_id: group_id, + member_id: member_id, + generation_id: 1, + group_assignment: build_assignments(assignments) + } + end + + defp build_assignments(assignments) do + Enum.map(assignments, fn assignment -> + %{ + member_id: Map.fetch!(assignment, :member_id), + member_assignment: %Kayrock.MemberAssignment{ + partition_assignments: [ + %Kayrock.MemberAssignment.PartitionAssignment{ + topic: Map.fetch!(assignment, :topic), + partitions: Map.fetch!(assignment, :partitions) + } + ] + } + } + end) + end + + @doc """ + Creates a request to describe a consumer groups + Uses min of api_version and max supported version + """ + def describe_groups_request(group_ids, api_version) do + api_version = min(Kayrock.DescribeGroups.max_vsn(), api_version) + request = Kayrock.DescribeGroups.get_request_struct(api_version) + %{request | group_ids: group_ids} + end + + @doc """ + Creates a request to leave a consumer group + Uses min of api_version and max supported version + """ + def leave_group_request(group_id, member_id, api_version) do + api_version = min(Kayrock.LeaveGroup.max_vsn(), api_version) + request = Kayrock.LeaveGroup.get_request_struct(api_version) + %{request | group_id: group_id, member_id: member_id} + end + + @doc """ + Creates a request to delete a group + Uses min of api_version and max supported version + """ + def delete_groups_request(group_ids, api_version) do + api_version = min(Kayrock.DeleteGroups.max_vsn(), api_version) + request = Kayrock.DeleteGroups.get_request_struct(api_version) + %{request | groups: group_ids} + end end diff --git a/test/support/test_support.ex b/test/support/test_support.ex index 7f19ecb..e7d4fa6 100644 --- a/test/support/test_support.ex +++ b/test/support/test_support.ex @@ -65,4 +65,22 @@ defmodule Kayrock.TestSupport do defp pad_list(l, n, pad_with) do l ++ List.duplicate(pad_with, n - length(l)) end + + @doc """ + Calls the given function up to 3 times, sleeping 1 second between each call. + """ + def with_retry(fun), do: do_with_retry(3, fun, nil) + + defp do_with_retry(0, _fun, result), do: result + + defp do_with_retry(n, fun, _result) do + case fun.() do + {:ok, response = %{error_code: 0}} -> + {:ok, response} + + result -> + :timer.sleep(1000) + do_with_retry(n - 1, fun, result) + end + end end