Skip to content

Commit

Permalink
make changes to be more fault tolerant: clean up connections, brokers…
Browse files Browse the repository at this point in the history
…, failed_messages
  • Loading branch information
Tal Levy committed Sep 27, 2013
1 parent 9cf4236 commit 2aa8df0
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 7 deletions.
30 changes: 25 additions & 5 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from itertools import count, cycle
import logging
from operator import attrgetter
import socket
import struct
import time
import zlib
Expand Down Expand Up @@ -72,7 +73,7 @@ def _load_metadata_for_topics(self, *topics):
log.debug("Broker metadata: %s", brokers)
log.debug("Topic metadata: %s", topics)

self.brokers.update(brokers)
self.brokers = brokers
self.topics_to_brokers = {}

for topic, partitions in topics.items():
Expand Down Expand Up @@ -100,6 +101,12 @@ def _next_id(self):
"Generate a new correlation id"
return KafkaClient.ID_GEN.next()

def _safe_conn_reinit(self, conn):
try:
conn.reinit()
except socket.error, e:
log.error("unsuccessful reinit", e)

def _send_broker_unaware_request(self, requestId, request):
"""
Attempt to send a broker-agnostic request to one of the available
Expand All @@ -113,6 +120,7 @@ def _send_broker_unaware_request(self, requestId, request):
except Exception, e:
log.warning("Could not send request [%r] to server %s, "
"trying next server: %s" % (request, conn, e))
self._safe_conn_reinit(conn)
continue

return None
Expand Down Expand Up @@ -153,6 +161,9 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
# Accumulate the responses in a dictionary
acc = {}

# keep a list of payloads that were failed to be sent to brokers
failed_payloads = []

# For each broker, send the list of request payloads
for broker, payloads in payloads_by_broker.items():
conn = self._get_conn_for_broker(broker)
Expand All @@ -161,15 +172,24 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
correlation_id=requestId, payloads=payloads)

# Send the request, recv the response
conn.send(requestId, request)

if decoder_fn is None:
try:
conn.send(requestId, request)
if decoder_fn is None:
continue
response = conn.recv(requestId)
except Exception, e:
log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
failed_payloads += payloads
self._safe_conn_reinit(conn)
self.topics_to_brokers = {}
continue

response = conn.recv(requestId)
for response in decoder_fn(response):
acc[(response.topic, response.partition)] = response

if failed_payloads:
raise FailedPayloadsException(failed_payloads)

# Order the accumulated responses by the original key order
return (acc[k] for k in original_keys) if acc else ()

Expand Down
3 changes: 3 additions & 0 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ class ErrorMapping(object):
# Exceptions #
#################

class FailedPayloadsException(Exception):
pass

class BufferUnderflowError(Exception):
pass

Expand Down
10 changes: 8 additions & 2 deletions kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def _send_upstream(self, queue):
self.client.send_produce_request(reqs, acks=self.req_acks,
timeout=self.ack_timeout)
except Exception as exp:
self.client._load_metadata_for_topics
log.error("Error sending message", exc_info=sys.exc_info())

def send_messages(self, partition, *msg):
Expand All @@ -126,8 +127,13 @@ def send_messages(self, partition, *msg):
else:
messages = [create_message(m) for m in msg]
req = ProduceRequest(self.topic, partition, messages)
resp = self.client.send_produce_request([req], acks=self.req_acks,
timeout=self.ack_timeout)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
timeout=self.ack_timeout)
except Exception as exp:
self.client._load_metadata_for_topics(self.topic)
log.error("Error sending message", exc_info=sys.exc_info())
raise exp
return resp

def stop(self, timeout=1):
Expand Down
2 changes: 2 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import struct
from threading import Thread, Event

from common import BufferUnderflowError


def write_int_string(s):
if s is None:
Expand Down

0 comments on commit 2aa8df0

Please sign in to comment.