diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index b96938e28..1fee4f1e4 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -414,7 +414,8 @@ def _get_internal_consumer(self, partitions=None, start=True): auto_start=start, compacted_topic=self._is_compacted_topic, generation_id=self._generation_id, - consumer_id=self._consumer_id + consumer_id=self._consumer_id, + parent_consumer=self ) def _decide_partitions(self, participants, consumer_id=None): diff --git a/pykafka/managedbalancedconsumer.py b/pykafka/managedbalancedconsumer.py index bc4ce8a07..d8984cbff 100644 --- a/pykafka/managedbalancedconsumer.py +++ b/pykafka/managedbalancedconsumer.py @@ -26,7 +26,8 @@ from .balancedconsumer import BalancedConsumer from .common import OffsetType from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup, - GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress) + GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress, + UnknownMemberId) from .protocol import MemberAssignment from .utils.compat import iterkeys from .utils.error_handlers import valid_int @@ -199,6 +200,7 @@ def __init__(self, self._consumer_id = b'' self._worker_trace_logged = False self._worker_exception = None + self._should_rebalance = False self._default_error_handlers = self._build_default_error_handlers() if auto_start is True: @@ -213,18 +215,21 @@ def fetcher(): try: if not self._running: break - - log.info("Sending heartbeat from consumer '%s'", self._consumer_id) - res = self._group_coordinator.heartbeat(self._connection_id, - self._consumer_group, - self._generation_id, - self._consumer_id) - if res.error_code != 0: - log.info("Error code %d encountered on heartbeat.", - res.error_code) - self._handle_error(res.error_code) + if self._should_rebalance: self._rebalance() - + self._should_rebalance = False + else: + log.info("Sending heartbeat from consumer '%s'", + self._consumer_id) + res = self._group_coordinator.heartbeat(self._connection_id, + self._consumer_group, + self._generation_id, + self._consumer_id) + if res.error_code != 0: + log.info("Error code %d encountered on heartbeat.", + res.error_code) + self._handle_error(res.error_code) + self._rebalance() self._cluster.handler.sleep(self._heartbeat_interval_ms / 1000) except ReferenceError: break @@ -273,6 +278,7 @@ def _update_member_assignment(self): but uses the Kafka 0.9 Group Membership API instead of ZooKeeper to manage group state """ + log.info("Member assignment update triggered") for i in range(self._rebalance_max_retries): try: members = self._join_group() @@ -314,6 +320,7 @@ def _handle_NotCoordinatorForGroup(): return { GroupCoordinatorNotAvailable.ERROR_CODE: _handle_GroupCoordinatorNotAvailable, NotCoordinatorForGroup.ERROR_CODE: _handle_NotCoordinatorForGroup, + UnknownMemberId.ERROR_CODE: None, GroupLoadInProgress.ERROR_CODE: None, RebalanceInProgress.ERROR_CODE: None, IllegalGeneration.ERROR_CODE: None @@ -350,6 +357,7 @@ def _join_group(self): self._cluster.handler.sleep(i * 2) self._generation_id = join_result.generation_id self._consumer_id = join_result.member_id + log.info("Successfully joined consumer group '%s'", self._consumer_group) return join_result.members def _sync_group(self, group_assignments): diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index 0e3ee795d..b056530ee 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -70,7 +70,8 @@ def __init__(self, reset_offset_on_start=False, compacted_topic=False, generation_id=-1, - consumer_id=b''): + consumer_id=b'', + parent_consumer=None): """Create a SimpleConsumer. Settings and default values are taken from the Scala @@ -172,6 +173,7 @@ def __init__(self, self._generation_id = valid_int(generation_id, allow_zero=True, allow_negative=True) self._consumer_id = consumer_id + self._parent_consumer = parent_consumer # incremented for any message arrival from any partition # the initial value is 0 (no messages waiting) @@ -233,6 +235,8 @@ def _update(self): self._cluster.update() self._setup_partitions_by_leader() self._discover_group_coordinator() + if self._parent_consumer is not None: + self._parent_consumer._should_rebalance = True def start(self): """Begin communicating with Kafka, including setting up worker threads @@ -286,6 +290,7 @@ def _handle_GroupLoadInProgress(parts): def _handle_IllegalGeneration(parts): log.info("Continuing in response to IllegalGeneration") + self._update() def _handle_UnknownMemberId(parts): log.info("Continuing in response to UnknownMemberId")