From 0b98c793b4ef098818d66e4fee9851c60000357f Mon Sep 17 00:00:00 2001 From: Sai Prasad Date: Fri, 26 Apr 2019 15:20:29 +0530 Subject: [PATCH 1/2] ManagedBalancedConsumer: Log the exception even if its the last retry attempt. --- pykafka/managedbalancedconsumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py index fb4613ec8..c2081ee95 100644 --- a/pykafka/managedbalancedconsumer.py +++ b/pykafka/managedbalancedconsumer.py @@ -324,10 +324,10 @@ def _update_member_assignment(self): log.debug("Successfully rebalanced consumer '%s'", self._consumer_id) break except Exception as ex: + log.exception(ex) if i == self._rebalance_max_retries - 1: log.warning('Failed to rebalance s after %d retries.', i) raise - log.exception(ex) log.info('Unable to complete rebalancing. Retrying') self._cluster.handler.sleep(i * (self._rebalance_backoff_ms / 1000)) self._raise_worker_exceptions() From 238bcc8d73c1aefd5cbaa8f1a2e4688392f32b27 Mon Sep 17 00:00:00 2001 From: Sai Prasad Date: Fri, 26 Apr 2019 15:24:22 +0530 Subject: [PATCH 2/2] Reconnect the connection if required in JoinGroup requests `Broker._get_unique_req_handler` is responsible to identify which `BrokerConnection` & which `RequestHandler` objects will be used for `JoinGroup`, `SyncGroup`, `LeaveGroup` etc. This change will ensure that this method doesn't reuse a disconnected connection without attempting to reconnect. --- pykafka/broker.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pykafka/broker.py b/pykafka/broker.py index bfcdfd15c..e06e01fcb 100644 --- a/pykafka/broker.py +++ b/pykafka/broker.py @@ -294,7 +294,16 @@ def _get_unique_req_handler(self, connection_id): handler = RequestHandler(self._handler, conn) handler.start() self._req_handlers[connection_id] = handler - return self._req_handlers[connection_id] + + handler = self._req_handlers[connection_id] + + # Ensure that we're returning a handler with a connected connection. + # If the connection is disconnected, it will raise SocketDisconnectedError + if not handler.shared.connection.connected: + log.warn('Attempting to reconnect for connection id %s..', connection_id) + handler.shared.connection.connect(self._socket_timeout_ms) + + return handler @_check_handler def fetch_messages(self,