Attempt of better connection error handling / reconnect #347
Hello, @ukrbublik! This is your first Pull Request that will be reviewed by Ebert, an automatic Code Review service. It will leave comments on this diff with potential issues and style violations found in the code as you push new commits. You can also see all the issues found on this Pull Request on its review page. Please check our documentation for more information.
- When the Kafka connection is lost, handle related errors gracefully.
- Retry recreating the supervision tree with a timeout. Added config sleep_for_reconnect, default 400.
- Also fixed termination of ConsumerGroup.Manager to guarantee that the worker is stopped.
a342e3f to a746114
@@ -79,6 +79,11 @@ defmodule KafkaEx.ConsumerGroup.Heartbeat do
      %HeartbeatResponse{error_code: error_code} ->
        Logger.warn("Heartbeat failed, got error code #{error_code}")
        {:stop, {:shutdown, {:error, error_code}}, state}

      {:error, reason} ->
It's not possible for this value to be returned from KafkaEx.heartbeat/2
according to dialyzer.
I'm sure it's possible; I've checked this on a live application.
KafkaEx.heartbeat is called here:
    case KafkaEx.heartbeat(heartbeat_request, worker_name: worker_name) do
-> KafkaEx.heartbeat (Line 146 in a746114):
    Server.call(worker_name, {:heartbeat, request, timeout}, opts)
-> KafkaEx.Server.handle_call({:heartbeat... (kafka_ex/lib/kafka_ex/server.ex, Line 336 in a746114):
    kafka_server_heartbeat(request, network_timeout, state)
-> KafkaEx.Server0P9P0.kafka_server_heartbeat (kafka_ex/lib/kafka_ex/server_0_p_9_p_0.ex, Line 186 in a746114):
    consumer_group_sync_request(
-> KafkaEx.Server0P9P0.consumer_group_sync_request (kafka_ex/lib/kafka_ex/server_0_p_9_p_0.ex, Line 228 in a746114):
    {{:error, reason}, state_out}
heh, that's unfortunate. I suspect we need to fix the @spec
for KafkaEx.heartbeat then, so that dialyzer will pass.
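A minimal sketch of what widening the spec could look like, so that Dialyzer accepts an {:error, reason} clause in the caller. Everything here is a hypothetical stand-in: HeartbeatSpecSketch, its nested Response struct, and the :ok / :disconnected arguments are not kafka_ex names, and the stop tuple for the error case simply mirrors the existing error_code clause rather than the PR's actual handling.

```elixir
defmodule HeartbeatSpecSketch do
  # Hypothetical stand-in for the real heartbeat response struct.
  defmodule Response do
    defstruct error_code: nil
    @type t :: %__MODULE__{error_code: atom | integer}
  end

  # The return type is a union: the response struct OR an error tuple.
  # Without the `{:error, term}` alternative, Dialyzer reports that a
  # `{:error, reason}` clause in the caller can never match.
  @spec heartbeat(:ok | :disconnected) :: Response.t() | {:error, term}
  def heartbeat(:ok), do: %Response{error_code: :no_error}
  def heartbeat(:disconnected), do: {:error, :no_broker}

  # Caller-side pattern match covering all three outcomes.
  @spec handle(Response.t() | {:error, term}) ::
          :continue | {:stop, {:shutdown, {:error, term}}}
  def handle(result) do
    case result do
      %Response{error_code: :no_error} -> :continue
      %Response{error_code: code} -> {:stop, {:shutdown, {:error, code}}}
      {:error, reason} -> {:stop, {:shutdown, {:error, reason}}}
    end
  end
end
```

With the union in the spec, the third clause of the case is reachable as far as Dialyzer is concerned, which is the situation the live-application check above describes.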
      %{error_code: error_code} ->
        raise "Error joining consumer group #{group_name}: " <>
                "#{inspect(error_code)}"
      {:error, reason} ->
Same here - dialyzer says that join_response can never be {:error, _}
Also checked on live application - exception can be raised from line 268
            "Received error #{inspect(error_code)}, " <>
              "consumer group manager will exit regardless."
        end)
      {:error, reason} ->
Also here, Dialyzer catches this as not possible
@joshuawscott I've updated the specs; Dialyzer passes.
0cb335d to aa20483
aa20483 to ec9299b
Ebert has finished reviewing this Pull Request and has found:
But beware that this branch is 1 commit behind the You can see more details about this review at https://ebertapp.io/github/kafkaex/kafka_ex/pulls/347.
This looks good to me. Thanks again!
Attempt of better connection error handling / reconnect
Related to issues #298, #190
When the Kafka connection is lost, handle related connection errors gracefully.
Currently the supervision tree tries to recreate itself without any delay, which quickly exhausts max restarts and completely shuts down the :kafka_ex application.
I added a sleep (config sleep_for_reconnect, default 400 ms) in the server's init at the places that throw errors when there is no connection to Kafka.
Tested with docker-compose down && docker-compose up.
With max_restarts/max_seconds/sleep_for_reconnect configured accordingly (3/1/500 for example), the :kafka_ex application will not crash but constantly tries to reconnect.
Or, if configured with max_restarts/max_seconds/sleep_for_reconnect of 20/10/400 for example, it will try to reconnect to Kafka for ~10 s and then shut down.
Tests pass.
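The two example settings can be checked with rough arithmetic. The sketch below is only an approximation: ReconnectBudget is a hypothetical helper, not part of kafka_ex, and it assumes that every failed start takes at least sleep_for_reconnect milliseconds and ignores the time spent on the connection attempt itself. Under that assumption, consecutive restarts are at least sleep_ms apart, so the supervisor can only exceed max_restarts within max_seconds if max_restarts sleeps fit inside the window.

```elixir
defmodule ReconnectBudget do
  # Hypothetical helper for illustration only; not part of kafka_ex.

  # True when more than `max_restarts` restarts can fit into a
  # `max_seconds` window, given restarts spaced `sleep_ms` apart.
  # In that case the supervisor is expected to give up eventually.
  def gives_up?(max_restarts, max_seconds, sleep_ms) do
    max_restarts * sleep_ms < max_seconds * 1000
  end

  # Rough lower bound (in ms) on how long restarts continue before
  # the supervisor gives up; :never when the budget is not exhausted.
  def approx_ms_until_give_up(max_restarts, max_seconds, sleep_ms) do
    if gives_up?(max_restarts, max_seconds, sleep_ms) do
      max_restarts * sleep_ms
    else
      :never
    end
  end
end

# 3/1/500: restarts are too far apart to exceed 3 per second.
IO.inspect(ReconnectBudget.gives_up?(3, 1, 500))
# 20/10/400: 20 restarts fit into 8 s, inside the 10 s window.
IO.inspect(ReconnectBudget.approx_ms_until_give_up(20, 10, 400))
```

The 8000 ms figure for 20/10/400 is a lower bound; real connection attempts add time on top of each sleep, which is consistent with the roughly 10 s observed in the description.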