Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

ManagedBalancedConsumer broker failure handling #568

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pykafka/balancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ def _get_internal_consumer(self, partitions=None, start=True):
auto_start=start,
compacted_topic=self._is_compacted_topic,
generation_id=self._generation_id,
consumer_id=self._consumer_id
consumer_id=self._consumer_id,
parent_consumer=self
)

def _decide_partitions(self, participants, consumer_id=None):
Expand Down
32 changes: 20 additions & 12 deletions pykafka/managedbalancedconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from .balancedconsumer import BalancedConsumer
from .common import OffsetType
from .exceptions import (IllegalGeneration, RebalanceInProgress, NotCoordinatorForGroup,
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress)
GroupCoordinatorNotAvailable, ERROR_CODES, GroupLoadInProgress,
UnknownMemberId)
from .protocol import MemberAssignment
from .utils.compat import iterkeys
from .utils.error_handlers import valid_int
Expand Down Expand Up @@ -199,6 +200,7 @@ def __init__(self,
self._consumer_id = b''
self._worker_trace_logged = False
self._worker_exception = None
self._should_rebalance = False
self._default_error_handlers = self._build_default_error_handlers()

if auto_start is True:
Expand All @@ -213,18 +215,21 @@ def fetcher():
try:
if not self._running:
break

log.info("Sending heartbeat from consumer '%s'", self._consumer_id)
res = self._group_coordinator.heartbeat(self._connection_id,
self._consumer_group,
self._generation_id,
self._consumer_id)
if res.error_code != 0:
log.info("Error code %d encountered on heartbeat.",
res.error_code)
self._handle_error(res.error_code)
if self._should_rebalance:
self._rebalance()

self._should_rebalance = False
else:
log.info("Sending heartbeat from consumer '%s'",
self._consumer_id)
res = self._group_coordinator.heartbeat(self._connection_id,
self._consumer_group,
self._generation_id,
self._consumer_id)
if res.error_code != 0:
log.info("Error code %d encountered on heartbeat.",
res.error_code)
self._handle_error(res.error_code)
self._rebalance()
self._cluster.handler.sleep(self._heartbeat_interval_ms / 1000)
except ReferenceError:
break
Expand Down Expand Up @@ -273,6 +278,7 @@ def _update_member_assignment(self):
but uses the Kafka 0.9 Group Membership API instead of ZooKeeper to manage
group state
"""
log.info("Member assignment update triggered")
for i in range(self._rebalance_max_retries):
try:
members = self._join_group()
Expand Down Expand Up @@ -314,6 +320,7 @@ def _handle_NotCoordinatorForGroup():
return {
GroupCoordinatorNotAvailable.ERROR_CODE: _handle_GroupCoordinatorNotAvailable,
NotCoordinatorForGroup.ERROR_CODE: _handle_NotCoordinatorForGroup,
UnknownMemberId.ERROR_CODE: None,
GroupLoadInProgress.ERROR_CODE: None,
RebalanceInProgress.ERROR_CODE: None,
IllegalGeneration.ERROR_CODE: None
Expand Down Expand Up @@ -350,6 +357,7 @@ def _join_group(self):
self._cluster.handler.sleep(i * 2)
self._generation_id = join_result.generation_id
self._consumer_id = join_result.member_id
log.info("Successfully joined consumer group '%s'", self._consumer_group)
return join_result.members

def _sync_group(self, group_assignments):
Expand Down
7 changes: 6 additions & 1 deletion pykafka/simpleconsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ def __init__(self,
reset_offset_on_start=False,
compacted_topic=False,
generation_id=-1,
consumer_id=b''):
consumer_id=b'',
parent_consumer=None):
"""Create a SimpleConsumer.

Settings and default values are taken from the Scala
Expand Down Expand Up @@ -172,6 +173,7 @@ def __init__(self,
self._generation_id = valid_int(generation_id, allow_zero=True,
allow_negative=True)
self._consumer_id = consumer_id
self._parent_consumer = parent_consumer

# incremented for any message arrival from any partition
# the initial value is 0 (no messages waiting)
Expand Down Expand Up @@ -233,6 +235,8 @@ def _update(self):
self._cluster.update()
self._setup_partitions_by_leader()
self._discover_group_coordinator()
if self._parent_consumer is not None:
self._parent_consumer._should_rebalance = True

def start(self):
"""Begin communicating with Kafka, including setting up worker threads
Expand Down Expand Up @@ -286,6 +290,7 @@ def _handle_GroupLoadInProgress(parts):

# Handler for the IllegalGeneration error code: logs that consumption
# continues, then calls self._update(). The portion of _update() visible in
# this diff refreshes cluster metadata, rebuilds the partitions-by-leader
# mapping, rediscovers the group coordinator and, when a parent consumer is
# set, flags it with _should_rebalance = True.
# `parts` is accepted but not used in this handler's body.
# NOTE(review): this reads `self` without taking it as a parameter, so it is
# presumably a closure nested inside a SimpleConsumer method — confirm against
# the full file, since indentation is not preserved in this view.
def _handle_IllegalGeneration(parts):
log.info("Continuing in response to IllegalGeneration")
self._update()

def _handle_UnknownMemberId(parts):
log.info("Continuing in response to UnknownMemberId")
Expand Down