Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt of better connection error handling / reconnect #347

Merged
merged 4 commits into from
Jun 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ config :kafka_ex,
# Threshold number of messages consumed for GenConsumer to commit offsets
# to the broker.
commit_threshold: 100,
# Interval in milliseconds to wait before reconnect to kafka
sleep_for_reconnect: 400,
# This is the flag that enables use of ssl
use_ssl: true,
# see SSL OPTION DESCRIPTIONS - CLIENT SIDE at http://erlang.org/doc/man/ssl.html
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/consumer_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ defmodule KafkaEx.ConsumerGroup do
)
]

supervise(children, strategy: :one_for_all)
supervise(children, strategy: :one_for_all, max_restarts: 0, max_seconds: 1)
end

defp call_manager(supervisor_pid, call) do
Expand Down
5 changes: 5 additions & 0 deletions lib/kafka_ex/consumer_group/heartbeat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ defmodule KafkaEx.ConsumerGroup.Heartbeat do
%HeartbeatResponse{error_code: error_code} ->
Logger.warn("Heartbeat failed, got error code #{error_code}")
{:stop, {:shutdown, {:error, error_code}}, state}

{:error, reason} ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible for this value to be returned from KafkaEx.heartbeat/2 according to dialyzer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure it's possible, I've checked this on live application.

KafkaEx.heartbeat is called here -

case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do

->
KafkaEx.heartbeat
Server.call(worker_name, {:heartbeat, request, timeout}, opts)

->
KafkaEx.Server.handle_call({:heartbeat... -
kafka_server_heartbeat(request, network_timeout, state)

->
KafkaEx.Server0P9P0.kafka_server_heartbeat -
consumer_group_sync_request(

->
KafkaEx.Server0P9P0.consumer_group_sync_request -
{{:error, reason}, state_out}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heh, that's unfortunate. I suspect we need to fix the @spec for KafkaEx.heartbeat then, so that dialyzer will pass.

Logger.warn("Heartbeat failed, got error reason #{inspect reason}")
{:stop, {:shutdown, {:error, reason}}, state}

end
end
end
49 changes: 37 additions & 12 deletions lib/kafka_ex/consumer_group/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,30 @@ defmodule KafkaEx.ConsumerGroup.Manager do
{:noreply, state}
end

# If the heartbeat gets an unrecoverable error.
def handle_info(
{:EXIT, _heartbeat_timer, {:shutdown, {:error, reason}}},
%State{} = state
) do
{:stop, {:shutdown, {:error, reason}}, state}
end

# When terminating, inform the group coordinator that this member is leaving
# the group so that the group can rebalance without waiting for a session
# timeout.
def terminate(_reason, %State{generation_id: nil, member_id: nil}), do: :ok
def terminate(_reason, %State{generation_id: nil, member_id: nil} = state) do
Process.unlink(state.worker_name)
KafkaEx.stop_worker(state.worker_name)
end

def terminate(_reason, %State{} = state) do
{:ok, _state} = leave(state)
Process.unlink(state.worker_name)
KafkaEx.stop_worker(state.worker_name)

# should be at end because of race condition (stop heartbeat while it is shutting down)
# if race condition happens, worker will be abandoned
stop_heartbeat_timer(state)
end

### Helpers
Expand Down Expand Up @@ -244,9 +259,14 @@ defmodule KafkaEx.ConsumerGroup.Manager do

# crash the worker if we recieve an error, but do it with a meaningful
# error message
if join_response.error_code != :no_error do
raise "Error joining consumer group #{group_name}: " <>
"#{inspect(join_response.error_code)}"
case join_response do
%{error_code: :no_error} -> :ok
%{error_code: error_code} ->
raise "Error joining consumer group #{group_name}: " <>
"#{inspect(error_code)}"
{:error, reason} ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - dialyzer says that join_response can never be {:error, _}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also checked on live application - exception can be raised from line 268

raise "Error joining consumer group #{group_name}: " <>
"#{inspect(reason)}"
end

Logger.debug(fn -> "Joined consumer group #{group_name}" end)
Expand Down Expand Up @@ -331,7 +351,6 @@ defmodule KafkaEx.ConsumerGroup.Manager do
member_id: member_id
} = state
) do
stop_heartbeat_timer(state)

leave_request = %LeaveGroupRequest{
group_name: group_name,
Expand All @@ -341,13 +360,19 @@ defmodule KafkaEx.ConsumerGroup.Manager do
leave_group_response =
KafkaEx.leave_group(leave_request, worker_name: worker_name)

if leave_group_response.error_code == :no_error do
Logger.debug(fn -> "Left consumer group #{group_name}" end)
else
Logger.warn(fn ->
"Received error #{inspect(leave_group_response.error_code)}, " <>
"consumer group manager will exit regardless."
end)
case leave_group_response do
%{error_code: :no_error} ->
Logger.debug(fn -> "Left consumer group #{group_name}" end)
%{error_code: error_code} ->
Logger.warn(fn ->
"Received error #{inspect(error_code)}, " <>
"consumer group manager will exit regardless."
end)
{:error, reason} ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here, Dialyzer catches this as not possible

Logger.warn(fn ->
"Received error #{inspect(reason)}, " <>
"consumer group manager will exit regardless."
end)
end

{:ok, state}
Expand Down
42 changes: 26 additions & 16 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -627,9 +627,12 @@ defmodule KafkaEx.GenConsumer do
end

def handle_info(:timeout, %State{} = state) do
new_state = consume(state)

{:noreply, new_state, 0}
case consume(state) do
{:error, reason} ->
{:stop, reason, state}
new_state ->
{:noreply, new_state, 0}
end
end

def handle_info(
Expand Down Expand Up @@ -668,20 +671,23 @@ defmodule KafkaEx.GenConsumer do
fetch_options: fetch_options
} = state
) do
[
%FetchResponse{
topic: ^topic,
partitions: [
response = %{error_code: error_code, partition: ^partition}
]
}
] =
KafkaEx.fetch(
topic,
partition,
Keyword.merge(fetch_options, offset: offset)
)
response = KafkaEx.fetch(
topic,
partition,
Keyword.merge(fetch_options, offset: offset)
)
response
|> handle_fetch_response(state)
end

defp handle_fetch_response([
%FetchResponse{
topic: _topic,
partitions: [
response = %{error_code: error_code, partition: _partition}
]
}
], state) do
state =
case error_code do
:offset_out_of_range ->
Expand All @@ -700,6 +706,10 @@ defmodule KafkaEx.GenConsumer do
end
end

defp handle_fetch_response(error, _state) do
{:error, error}
end

defp handle_message_set(
message_set,
%State{
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka_ex/network_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ defmodule KafkaEx.NetworkClient do
response
end

def send_sync_request(nil, _, _) do
{:error, :no_broker}
end

@spec format_host(binary) :: [char] | :inet.ip_address()
def format_host(host) do
case Regex.scan(~r/^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/, host) do
Expand Down
4 changes: 4 additions & 0 deletions lib/kafka_ex/protocol/api_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ defmodule KafkaEx.Protocol.ApiVersions do
}
end

def parse_response(nil, _this_api_version) do
%Response{error_code: :no_response}
end

defp parse_rest_of_response(api_versions_count, data, this_api_version) do
{api_versions, remaining_data} =
Protocol.Common.read_array(
Expand Down
6 changes: 6 additions & 0 deletions lib/kafka_ex/protocol/consumer_metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,10 @@ defmodule KafkaEx.Protocol.ConsumerMetadata do
error_code: Protocol.error(error_code)
}
end

def parse_response(nil) do
%Response{
error_code: :no_response
}
end
end
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/heartbeat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule KafkaEx.Protocol.Heartbeat do
# We could just return the error code instead of having the struct, but this
# keeps the code normalized
defstruct error_code: nil
@type t :: %Response{error_code: atom | integer}
@type t :: %Response{error_code: atom | integer} | {:error, atom}
end

@spec create_request(integer, binary, Request.t()) :: binary
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/join_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule KafkaEx.Protocol.JoinGroup do
leader_id: binary,
member_id: binary,
members: [binary]
}
} | {:error, atom}

def leader?(%__MODULE__{member_id: member_id, leader_id: leader_id}) do
member_id == leader_id
Expand Down
2 changes: 1 addition & 1 deletion lib/kafka_ex/protocol/leave_group.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule KafkaEx.Protocol.LeaveGroup do

@type t :: %Response{
error_code: atom | integer
}
} | {:error, atom}
end

@spec create_request(integer, binary, Request.t()) :: binary
Expand Down
22 changes: 21 additions & 1 deletion lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -754,12 +754,18 @@ defmodule KafkaEx.Server do
connect_broker(host, port, ssl_options, use_ssl)
end

{correlation_id, metadata} =
check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(
brokers,
0,
config_sync_timeout()
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand All @@ -783,6 +789,20 @@ defmodule KafkaEx.Server do
state
end

defp sleep_for_reconnect() do
Process.sleep(Application.get_env(:kafka_ex, :sleep_for_reconnect, 400))
end

defp check_brokers_sockets!(brokers) do
any_socket_opened = brokers
|> Enum.map(fn %Broker{socket: socket} -> !is_nil(socket) end)
|> Enum.reduce(&(&1 || &2))
if !any_socket_opened do
sleep_for_reconnect()
raise "Brokers sockets are not opened"
end
end

defp connect_broker(host, port, ssl_opts, use_ssl) do
%Broker{
host: host,
Expand Down
21 changes: 16 additions & 5 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,33 @@ defmodule KafkaEx.Server0P10AndLater do
}
end)

check_brokers_sockets!(brokers)

{_,
%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: :no_error
}, state} = kafka_api_versions(%State{brokers: brokers})
%KafkaEx.Protocol.ApiVersions.Response{
api_versions: api_versions,
error_code: error_code
}, state} = kafka_api_versions(%State{brokers: brokers})
if error_code == :no_response do
sleep_for_reconnect()
raise "Brokers sockets are closed"
end
:no_error = error_code

api_versions = KafkaEx.ApiVersions.api_versions_map(api_versions)

{correlation_id, metadata} =
{correlation_id, metadata} = try do
retrieve_metadata(
brokers,
state.correlation_id,
config_sync_timeout(),
[],
api_versions
)
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
8 changes: 7 additions & 1 deletion lib/kafka_ex/server_0_p_8_p_2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,14 @@ defmodule KafkaEx.Server0P8P2 do
}
end)

{correlation_id, metadata} =
check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down
8 changes: 7 additions & 1 deletion lib/kafka_ex/server_0_p_9_p_0.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,14 @@ defmodule KafkaEx.Server0P9P0 do
}
end)

{correlation_id, metadata} =
check_brokers_sockets!(brokers)

{correlation_id, metadata} = try do
retrieve_metadata(brokers, 0, config_sync_timeout())
rescue e ->
sleep_for_reconnect()
Kernel.reraise(e, System.stacktrace())
end

state = %State{
metadata: metadata,
Expand Down