How to handle "Could not connect to broker"? #298

Open
IanVaughan opened this issue Jul 18, 2018 · 21 comments

@IanVaughan
Contributor

If the Kafka broker is not present/found, then the KafkaEx.ConsumerGroup crashes, which brings down the whole app.

10:46:06.838 [error] Could not connect to broker "localhost":9092 because of error {:error, :econnrefused}

10:46:06.852 [error] Unable to fetch metadata from any brokers.  Timeout is 3000.
10:46:06.857 [error] CRASH REPORT Process <0.1003.0> with 0 neighbours exited with reason: #{'__exception__' => true,'__struct__' => 'Elixir.RuntimeError',message => <<"Unable to fetch metadata from any brokers.  Timeout is 3000.">>} in 'Elixir.KafkaEx.Server0P9P0':retrieve_metadata/6 line 393 in gen_server:init_it/6 line 352
10:46:06.857 [error] CRASH REPORT Process <0.1002.0> with 0 neighbours exited with reason: no match of right hand value {error,{#{'__exception__' => true,'__struct__' => 'Elixir.RuntimeError',message => <<"Unable to fetch metadata from any brokers.  Timeout is 3000.">>},[{'Elixir.KafkaEx.Server0P9P0',retrieve_metadata,6,[{file,"lib/kafka_ex/server.ex"},{line,393}]},{'Elixir.KafkaEx.Server0P9P0',kafka_server_init,1,[{file,"lib/kafka_ex/server_0_p_9_p_0.ex"},{line,67}]},{gen_server,init_it,6,[{file,"gen_server.erl"},{line,328}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}} in 'Elixir.KafkaEx.ConsumerGroup.Manager':init/1 line 97 in gen_server:init_it/6 line 352
10:46:06.858 [error] Supervisor 'Elixir.Main.ConsumerGroup.TrackingLocation' had child 'Elixir.KafkaEx.ConsumerGroup.Manager' started with 'Elixir.KafkaEx.ConsumerGroup.Manager':start_link('Elixir.Main.Consumers.TrackingLocation', <<"tracking_locations">>, [<<"com.main.tracking_locations">>], [{supervisor_pid,<0.1001.0>},{heartbeat_interval,1000},{commit_interval,1000},{gen_server_opts,[...]}]) at undefined exit with reason no match of right hand value {error,{#{'__exception__' => true,'__struct__' => 'Elixir.RuntimeError',message => <<"Unable to fetch metadata from any brokers.  Timeout is 3000.">>},[{'Elixir.KafkaEx.Server0P9P0',retrieve_metadata,6,[{file,"lib/kafka_ex/server.ex"},{line,393}]},{'Elixir.KafkaEx.Server0P9P0',kafka_server_init,1,[{file,"lib/kafka_ex/server_0_p_9_p_0.ex"},{line,67}]},{gen_server,init_it,6,[{file,"gen_server.erl"},{line,328}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,247}]}]}} in 'Elixir.KafkaEx.ConsumerGroup.Manager':init/1 line 97 in context start_error
** (EXIT from #PID<0.998.0>) shutdown: failed to start child: KafkaEx.ConsumerGroup.Manager
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: {:error, {%RuntimeError{message: "Unable to fetch metadata from any brokers.  Timeout is 3000."}, [{KafkaEx.Server0P9P0, :retrieve_metadata, 6, [file: 'lib/kafka_ex/server.ex', line: 393]}, {KafkaEx.Server0P9P0, :kafka_server_init, 1, [file: 'lib/kafka_ex/server_0_p_9_p_0.ex', line: 67]}, {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 328]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
            (kafka_ex) lib/kafka_ex/consumer_group/manager.ex:97: KafkaEx.ConsumerGroup.Manager.init/1
            (stdlib) gen_server.erl:328: :gen_server.init_it/6
            (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3

The error comes from the result of create_worker/2 in KafkaEx.ConsumerGroup.Manager.init/1 (manager.ex line 97):

{:ok, worker_name} = KafkaEx.create_worker(

That call returns:

{:error,
 {%RuntimeError{message: "Unable to fetch metadata from any brokers.  Timeout is 3000."},
  [{KafkaEx.Server0P9P0, :retrieve_metadata, 6,
    [file: 'lib/kafka_ex/server.ex', line: 393]},
   {KafkaEx.Server0P9P0, :kafka_server_init, 1,
    [file: 'lib/kafka_ex/server_0_p_9_p_0.ex', line: 67]},
   {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 328]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
  1. Should the library handle this better?
    1. It could add a case on the create_worker/2 line to check for the :error return. (If so, I am happy to have a go at submitting that.)
  2. Or do I need to find a way of handling this myself?
    1. I am thinking of adding a process that rescues the response.

I am using KafkaEx master, and I start KafkaEx.ConsumerGroup as part of my main application's Supervisor.start_link:

      supervisor(
        KafkaEx.ConsumerGroup,
        [Main.Consumers.TrackingLocation, "tracking_locations",
         ["com.main.tracking_locations"],
         [heartbeat_interval: 1000, commit_interval: 1000,
          name: Main.ConsumerGroup.TrackingLocation,
          gen_server_opts: [name: Main.ConsumerGroup.TrackingLocation.Manager]]]
      )

@IanVaughan
Contributor Author

The problem is also hard to handle when implementing my own custom consumer:

iex(3)> KafkaEx.create_worker(:tl, [consumer_group: "tracking_locations"])
12:49:28.579 [error] Could not connect to broker "localhost":9092 because of error {:error, :econnrefused}
12:49:28.603 [error] Unable to fetch metadata from any brokers.  Timeout is 3000.
12:49:28.611 [error] CRASH REPORT Process <0.838.0> with 0 neighbours exited with reason: #{'__exception__' => true,'__struct__' => 'Elixir.RuntimeError',message => <<"Unable to fetch metadata from any brokers.  Timeout is 3000.">>} in 'Elixir.KafkaEx.Server0P9P0':retrieve_metadata/6 line 394 in gen_server:init_it/6 line 352

I cannot try/catch create_worker, I guess because the worker is started in a separate process via:

Supervisor.start_child(KafkaEx.Supervisor, [worker_init, name])

It all stems from this raise. I guess it has to be a raise because it sits within the init call stack?

raise "Unable to fetch metadata from any brokers. Timeout is #{sync_timeout}."

@joshuawscott
Member

I've wrestled with how to solve this a few times.

  1. Should the library handle this better?

I believe the library handles this appropriately, because catching the error and not crashing would introduce problems for applications that rely on Kafka for their basic function. Put another way: how would kafka_ex tell the application that it lost the Kafka connection?

As for how to handle this to prevent crashes, are you seeing this on application startup, or if the brokers all go away while the application is already started?

@IanVaughan
Contributor Author

how would kafka_ex tell the application that it lost the kafka connection?

That is a good question. My original question is really only about application startup and handling errors when connecting initially.
I guess there could be callbacks, e.g. on_connection_lost.

are you seeing this on application startup

Initially yes, so I dug in and thought I'd wrap the KafkaEx.ConsumerGroup setup within my own supervisor so I can handle connection problems and reconnecting etc.

But as you can see above, KafkaEx.create_worker/2 blows up with a CRASH REPORT when the raise in the Server code is hit.

It could well be my lack of knowledge of how to handle this, but I could not find a way of either

  1. configuring my supervisor and/or the KafkaEx.ConsumerGroup supervisor so that it does not crash my main application, or
  2. handling the raise generated from KafkaEx.create_worker/2 in a way that lets me retry/handle connection issues gracefully.
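
Since create_worker/2 reports the failure as an {:error, reason} return value (as shown earlier in this issue) rather than raising in the caller, one way to get retry behaviour without library changes is to wrap the call in a small retry loop. This is a minimal sketch: `StartRetry.start_with_retry/3` is a hypothetical helper, not a KafkaEx function, and the fixed delay and attempt count are arbitrary choices.

```elixir
defmodule StartRetry do
  # Hypothetical helper (not part of KafkaEx): calls `start_fun` until it
  # returns {:ok, pid} or `attempts` runs out, sleeping `delay_ms` in between.
  require Logger

  def start_with_retry(start_fun, attempts, delay_ms)
      when is_function(start_fun, 0) and attempts > 0 do
    case start_fun.() do
      {:ok, pid} ->
        {:ok, pid}

      # A process registered under the same name is already running.
      {:error, {:already_started, pid}} ->
        {:ok, pid}

      {:error, reason} when attempts > 1 ->
        Logger.warning("start failed: #{inspect(reason)}; #{attempts - 1} attempt(s) left")
        Process.sleep(delay_ms)
        start_with_retry(start_fun, attempts - 1, delay_ms)

      # Out of attempts: hand the error back to the caller instead of crashing.
      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

A call might look like `StartRetry.start_with_retry(fn -> KafkaEx.create_worker(:tl, consumer_group: "tracking_locations") end, 5, 2_000)`. Because it sleeps between attempts it blocks the calling process, so it suits a dedicated starter process better than a supervisor child's init/1.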

are you seeing this on application startup, or if the brokers all go away while the application is already started?

This is currently a startup issue. I've not tested what happens if the brokers disappear, but I am less concerned about that case: if they go, we have bigger issues!

@joshuawscott
Member

I suspect we'd fix this by returning {:stop, reason} instead of raising. I would have to dig into it to figure out how to do that. PRs are of course welcome if you see a way to avoid the startup crash, or optimally to make it rescueable.
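
To illustrate the {:stop, reason} idea, here is a hypothetical worker (not KafkaEx's actual server module) whose init/1 returns {:stop, reason} when a stand-in metadata fetch fails, instead of raising. The caller of start_link then receives {:error, reason}. Under a static Supervisor this would still abort the whole tree at startup; starting the child with DynamicSupervisor.start_child/2, as below, returns the error to the caller and leaves the supervisor running.

```elixir
defmodule StopOnInitFailure do
  # Hypothetical worker; `fetch_metadata/1` is a stand-in for a broker
  # metadata request and is not a KafkaEx function.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    case fetch_metadata(Keyword.fetch!(opts, :brokers)) do
      {:ok, metadata} ->
        {:ok, %{metadata: metadata}}

      # Returning {:stop, reason} makes start_link/1 return {:error, reason}
      # rather than raising inside init/1.
      {:error, reason} ->
        {:stop, {:metadata_unavailable, reason}}
    end
  end

  # Stand-in: "fails" when no brokers are configured.
  defp fetch_metadata([]), do: {:error, :no_brokers_reachable}
  defp fetch_metadata(brokers), do: {:ok, %{brokers: brokers}}
end

# Starting the worker dynamically: the failure comes back as a value and the
# DynamicSupervisor keeps running.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

case DynamicSupervisor.start_child(sup, {StopOnInitFailure, brokers: []}) do
  {:ok, _pid} -> IO.puts("worker started")
  {:error, reason} -> IO.puts("worker not started: #{inspect(reason)}")
end
```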

@IanVaughan
Contributor Author

optimally to make it rescueable

I did look at this, but to my current knowledge I believe it's hard to do, as the raise is within the Supervisor call stack and not something we can rescue from, AFAICT.

maybe returning {:stop, reason} instead of raising

That sounds really good to me. I was already heading in that direction, so I will have a play and see whether what I come up with looks any good.

Thanks for your help and advice.

@IanVaughan
Contributor Author

Thinking out loud here: having had a quick play, it may be more involved than I first thought, because returning {:stop, reason} from the init of a supervisor's child will cause the whole supervision tree to bail.

If the start function of any of the child processes fails or returns an error tuple or an erroneous value, the supervisor first terminates with reason :shutdown all the child processes that have already been started, and then terminates itself and returns {:error, {:shutdown, reason}}.

https://hexdocs.pm/elixir/Supervisor.html#start_link/2

I think the only correct way to solve this is to separate the supervisor/server init phases from the Kafka connection phase: once the supervision tree(s) and then the GenServers are set up, they cast/2 a "connect" message, which can then fail and invoke the relevant user callbacks, if defined.
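
A minimal sketch of that idea, assuming a GenServer that sends itself a :connect message from init/1 (the same asynchronous hand-off as the cast/2 described above, handled here in handle_info/2) and retries with Process.send_after/3 on failure. The `:connect_fun` and `:callbacks` options and the callback names on_connected/1 and on_connect_failed/1 are hypothetical, chosen for illustration; they are not KafkaEx options or callbacks.

```elixir
defmodule DeferredConnect do
  # Sketch only. `:connect_fun` stands in for the broker connection and
  # `:callbacks` for a user module; neither is a KafkaEx option.
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def status(pid), do: GenServer.call(pid, :status)

  @impl true
  def init(opts) do
    state = %{
      connect_fun: Keyword.fetch!(opts, :connect_fun),
      callbacks: Keyword.get(opts, :callbacks),
      retry_ms: Keyword.get(opts, :retry_ms, 1_000),
      conn: nil
    }

    # Return from init/1 straight away so the supervision tree finishes
    # starting; the connection attempt is handled as an ordinary message.
    send(self(), :connect)
    {:ok, state}
  end

  @impl true
  def handle_info(:connect, %{conn: nil} = state) do
    case state.connect_fun.() do
      {:ok, conn} ->
        notify(state.callbacks, :on_connected, conn)
        {:noreply, %{state | conn: conn}}

      {:error, reason} ->
        notify(state.callbacks, :on_connect_failed, reason)
        Logger.warning("connect failed: #{inspect(reason)}, retrying in #{state.retry_ms} ms")
        Process.send_after(self(), :connect, state.retry_ms)
        {:noreply, state}
    end
  end

  # Already connected: ignore any stray :connect message.
  def handle_info(:connect, state), do: {:noreply, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, if(state.conn, do: :connected, else: :disconnected), state}
  end

  # Call a user callback only if the module defines it.
  defp notify(nil, _fun, _arg), do: :ok

  defp notify(module, fun, arg) do
    if Code.ensure_loaded?(module) and function_exported?(module, fun, 1) do
      apply(module, fun, [arg])
    end

    :ok
  end
end
```

Because init/1 returns immediately, a supervisor holding this process starts normally even when the broker is down. The trade-off is that callers must cope with a process that exists but is not yet connected (exposed here as status/1).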

I'm not sure what you think of that. It's more than a "quick" fix, but I am willing to have a play if you believe it might be a good avenue to explore.

@joshuawscott
Member

@IanVaughan We're always open to pull requests. I think being able to handle this sort of thing gracefully would be a good idea if you're open to doing the pull request.

@IanVaughan
Contributor Author

@joshrotenberg that's good news, as I am about halfway through a bit of a reorg.

A brief overview: I've split the ConsumerGroup.Manager boot sequence out of init/1 into 3 phases, which are called in sequence as each one succeeds, and retried if not.
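
One way to structure a boot sequence split into phases like this is a GenServer that runs each phase from handle_continue/2 and retries a failed phase after a delay. This is a sketch only: the phase list and the `:run_phase` function are hypothetical and do not reflect the actual phases in the reorganised ConsumerGroup.Manager. handle_continue/2 requires OTP 21 or later.

```elixir
defmodule PhasedBoot do
  # Hypothetical sketch of a boot sequence moved out of init/1. The phases
  # and the `:run_phase` function option are illustrative, not KafkaEx's.
  use GenServer
  require Logger

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def completed(pid), do: GenServer.call(pid, :completed)

  @impl true
  def init(opts) do
    state = %{
      pending: Keyword.fetch!(opts, :phases),
      done: [],
      run_phase: Keyword.fetch!(opts, :run_phase),
      retry_ms: Keyword.get(opts, :retry_ms, 1_000)
    }

    # init/1 returns at once; the phases run afterwards in handle_continue/2.
    {:ok, state, {:continue, :next_phase}}
  end

  @impl true
  def handle_continue(:next_phase, %{pending: []} = state), do: {:noreply, state}

  def handle_continue(:next_phase, %{pending: [phase | rest]} = state) do
    case state.run_phase.(phase) do
      :ok ->
        # Phase succeeded: record it and move straight on to the next one.
        {:noreply, %{state | pending: rest, done: [phase | state.done]},
         {:continue, :next_phase}}

      {:error, reason} ->
        # Phase failed: retry the same phase later instead of crashing.
        Logger.warning("phase #{inspect(phase)} failed: #{inspect(reason)}")
        Process.send_after(self(), :retry, state.retry_ms)
        {:noreply, state}
    end
  end

  @impl true
  def handle_info(:retry, state), do: {:noreply, state, {:continue, :next_phase}}

  @impl true
  def handle_call(:completed, _from, state), do: {:reply, Enum.reverse(state.done), state}
end
```

Keeping the remaining phases in the state means a retry resumes at the phase that failed rather than restarting the whole sequence.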

More details to follow. I will push a PR when it's looking ready, and would welcome any advice.

@amorphid
Contributor

@IanVaughan @joshuawscott I've just discovered the joy of my app blowing up because of this raise when no Kafka brokers are available. It'd be awesome to have an option to avoid that pain. To be specific, I'd love to see an error message logged instead of an exception being raised.

Here's an example of the behavior I'd love to see:

defmodule NeverConnecter do
  use GenServer
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, [])
  end

  def init(_) do
    data = %{
      hostname: 'not.a.real.kafka.cluster',
      portno: 80,
      connect_opts: [:binary, active: false],
      connect_timeout: 2000,
      socket: nil
    }

    # Try to connect straight away, then on every 1-second tick while disconnected.
    _ = send(self(), :connect_if_disconnected)
    _ = :timer.send_interval(1000, :connect_if_disconnected)
    {:ok, data}
  end

  def handle_info(:connect_if_disconnected, %{socket: nil} = data) do
    case :gen_tcp.connect(data.hostname, data.portno, data.connect_opts, data.connect_timeout) do
      {:ok, socket} ->
        {:noreply, %{data | socket: socket}}

      # Any connection error (e.g. :nxdomain, :econnrefused) is logged, not raised.
      {:error, _reason} ->
        :ok = Logger.warn("Unable to connect to #{data.hostname}:#{data.portno}.  Retrying in #{data.connect_timeout} milliseconds")
        {:noreply, data}
    end
  end

  # Already connected: ignore the periodic tick.
  def handle_info(:connect_if_disconnected, data), do: {:noreply, data}
end

{:ok, _} = NeverConnecter.start_link([])
15:16:06.467 [warn]  Unable to connect to not.a.real.kafka.cluster:80.  Retrying in 2000 milliseconds
15:16:07.436 [warn]  Unable to connect to not.a.real.kafka.cluster:80.  Retrying in 2000 milliseconds
15:16:08.436 [warn]  Unable to connect to not.a.real.kafka.cluster:80.  Retrying in 2000 milliseconds

@technomage

I have a somewhat related issue. I need to connect to brokers dynamically, not statically, so having the broker defined in config is a huge problem for me. I would love the app startup sequence to simply set up the server processes, and then have the connection made via an API rather than all the config values being global. There could be a startup option for the simple case, but currently I can see no way to redirect to a different broker at run time.

@joshuawscott
Member

@technomage KafkaEx supports runtime configuration:
#314 (comment)

@technomage

technomage commented Jan 18, 2019

Interesting, but that doesn't address my problem. In my app the user selects a cluster/set of brokers from the UI, and I need to point to those brokers at runtime, after the app is up and running. I also need to be able to add another set of brokers, or switch to another set, later on.

@technomage

I could live with kafka_ex not starting at app startup, and instead starting it manually and shutting it down between broker selections. But currently the brokers have to be identified and fixed at server startup, which does not fit our use case.

@joshuawscott
Member

Can you open a new issue for this? It doesn't seem related to the original crash reported here.

@technomage

ok

@bjhaid
Member

bjhaid commented Jan 18, 2019

@technomage do the ideas recommended here not work for you?

https://github.com/kafkaex/kafka_ex#use-kafkaex-with-a-pooling-library

Also see the "With custom options:" section here:

https://github.com/kafkaex/kafka_ex#create-a-kafkaex-worker

@bjhaid
Member

bjhaid commented Jan 18, 2019

and you probably want this too:

# Set this value to true if you do not want the default
# `KafkaEx.Server` worker to start during application start-up -
# i.e., if you want to start your own set of named workers
disable_default_worker: false,

@technomage

Thanks. I did not get from the docs how these fit together. I will try that.

@Yamilquery

Yamilquery commented Feb 27, 2019

I have the same issue. I have a server running Apache Kafka. When I open the URL in the browser I see a message in the Kafka log; however, when I start my Elixir application with kafka_ex I get the following error and never see anything in the Kafka log:

[error] Could not connect to broker "www-kafka":9092 because of error {:error, :nxdomain}

Could you explain what could be causing this error?

@bjhaid
Member

bjhaid commented Feb 28, 2019

@Yamilquery your issue is DNS-related. You should confirm you can resolve the hostname from the same place the application is running, e.g.:

dig +short www-kafka
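
The same check can also be made from inside the running BEAM node, which goes through the same :inet resolver that the connection attempt uses. A small sketch using Erlang's :inet.gethostbyname/1; `DnsCheck.resolve/1` is a hypothetical helper name, not part of kafka_ex.

```elixir
defmodule DnsCheck do
  # Hypothetical helper: resolves `host` with Erlang's :inet resolver (IPv4),
  # returning the addresses as strings or the resolver's error reason.
  def resolve(host) when is_binary(host) do
    case :inet.gethostbyname(String.to_charlist(host)) do
      # :hostent record: {:hostent, name, aliases, addrtype, length, addr_list}
      {:ok, {:hostent, _name, _aliases, _addrtype, _length, addrs}} ->
        {:ok, Enum.map(addrs, fn addr -> addr |> :inet.ntoa() |> to_string() end)}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Running `DnsCheck.resolve("www-kafka")` in a remote console would return {:ok, addresses} if the name resolves from that node, or an error tuple such as {:error, :nxdomain}, the same reason shown in the log above, if it does not.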

@Yamilquery

Thank you very much, I solved it!
