merge master
emmettbutler committed Jul 19, 2017
2 parents a60a775 + 26ce1c0 commit f273b6a
Showing 24 changed files with 394 additions and 146 deletions.
43 changes: 32 additions & 11 deletions .travis.yml
@@ -4,19 +4,40 @@ cache:
directories:
- $HOME/.cache/pip
- $HOME/.ccache
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
- "pypy"
matrix:
include:
- env: TOX_ENV=py27 KAFKA_VERSION=0.8.2.2
python: 2.7
- env: TOX_ENV=py27 KAFKA_VERSION=0.10.0.1
python: 2.7
- env: TOX_ENV=py34 KAFKA_VERSION=0.8.2.2
python: 3.4
- env: TOX_ENV=py34 KAFKA_VERSION=0.10.0.1
python: 3.4
- env: TOX_ENV=py35 KAFKA_VERSION=0.8.2.2
python: 3.5
- env: TOX_ENV=py35 KAFKA_VERSION=0.10.0.1
python: 3.5
- env: TOX_ENV=py36 KAFKA_VERSION=0.8.2.2
python: 3.6
- env: TOX_ENV=py36 KAFKA_VERSION=0.10.0.1
python: 3.6
- env: TOX_ENV=pypy KAFKA_VERSION=0.8.2.2
python: pypy
- env: TOX_ENV=pypy KAFKA_VERSION=0.10.0.1
python: pypy
- env: TOX_ENV=py27-gevent KAFKA_VERSION=0.8.2.2
python: 2.7
- env: TOX_ENV=py27-gevent KAFKA_VERSION=0.10.0.1
python: 2.7
- env: TOX_ENV=py36-gevent KAFKA_VERSION=0.8.2.2
python: 3.6
- env: TOX_ENV=py36-gevent KAFKA_VERSION=0.10.0.1
python: 3.6
env:
global:
- PATH="/usr/lib/ccache:$PATH"
- KAFKA_BIN="$HOME/kafka-bin"
matrix:
- KAFKA_VERSION=0.8.2.2
- KAFKA_VERSION=0.10.0.1

addons:
apt:
@@ -46,7 +67,7 @@ install:
source "$HOME/virtualenvs/pypy-$PYPY_VERSION/bin/activate"
fi
- pip install -U pip setuptools
- pip install codecov kazoo tox testinstances tox-travis "gevent==1.1"
- pip install codecov kazoo tox testinstances
- wget https://github.com/edenhill/librdkafka/archive/0.9.1.tar.gz
- tar -xzf 0.9.1.tar.gz
- cd librdkafka-0.9.1/ && ./configure --prefix=$HOME
@@ -65,7 +86,7 @@ before_script:
- export `grep ZOOKEEPER $TEMPFILE`

script:
- tox tests
- tox -e $TOX_ENV tests

# Calculate coverage on success
after_success:
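The build-matrix rewrite above replaces the implicit cross product of the `python` list and the `KAFKA_VERSION` env matrix with an explicit `matrix.include` list that pins one `TOX_ENV`/`KAFKA_VERSION` pair per job. The gevent builds get their own `py27-gevent` and `py36-gevent` entries, which is why the global `pip install` drops `gevent==1.1` and `tox-travis`, and why the script now invokes only the matching environment with `tox -e $TOX_ENV tests`.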
2 changes: 1 addition & 1 deletion pykafka/__init__.py
@@ -9,7 +9,7 @@
from .balancedconsumer import BalancedConsumer
from .managedbalancedconsumer import ManagedBalancedConsumer

__version__ = '2.6.0'
__version__ = '2.6.1.dev1'


__all__ = ["Broker", "SimpleConsumer", "Cluster", "Partition", "Producer",
16 changes: 11 additions & 5 deletions pykafka/balancedconsumer.py
@@ -27,17 +27,23 @@
import weakref

from kazoo.client import KazooClient
from kazoo.handlers.gevent import SequentialGeventHandler
from kazoo.exceptions import NoNodeException, NodeExistsError
from kazoo.recipe.watchers import ChildrenWatch
try:
from kazoo.handlers.gevent import SequentialGeventHandler
except ImportError:
SequentialGeventHandler = None
from six import reraise

from .common import OffsetType
from .exceptions import KafkaException, PartitionOwnedError, ConsumerStoppedException
from .handlers import GEventHandler
from .simpleconsumer import SimpleConsumer
from .utils.compat import range, get_bytes, itervalues, iteritems, get_string
from .utils.error_handlers import valid_int
try:
from .handlers import GEventHandler
except ImportError:
GEventHandler = None
try:
from . import rdkafka
except ImportError:
@@ -189,7 +195,7 @@ def __init__(self,
:type use_rdkafka: bool
:param compacted_topic: Set to read from a compacted topic. Forces
consumer to use less stringent message ordering logic because compacted
topics do not provide offsets in stict incrementing order.
topics do not provide offsets in strict incrementing order.
:type compacted_topic: bool
"""
self._cluster = cluster
@@ -225,7 +231,7 @@ def __init__(self,

if not rdkafka and use_rdkafka:
raise ImportError("use_rdkafka requires rdkafka to be installed")
if isinstance(self._cluster.handler, GEventHandler) and use_rdkafka:
if GEventHandler and isinstance(self._cluster.handler, GEventHandler) and use_rdkafka:
raise ImportError("use_rdkafka cannot be used with gevent")
self._use_rdkafka = rdkafka and use_rdkafka

@@ -344,7 +350,7 @@ def _setup_zookeeper(self, zookeeper_connect, timeout):
:type timeout: int
"""
kazoo_kwargs = {'timeout': timeout / 1000}
if isinstance(self._cluster.handler, GEventHandler):
if GEventHandler and isinstance(self._cluster.handler, GEventHandler):
kazoo_kwargs['handler'] = SequentialGeventHandler()
self._zookeeper = KazooClient(zookeeper_connect, **kazoo_kwargs)
self._zookeeper.start()
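The guarded imports above are the standard optional-dependency pattern: bind each gevent-backed name to None when the extra is missing, then short-circuit before any `isinstance` check, since `isinstance(obj, None)` raises TypeError. A minimal self-contained sketch of the same idea, using gevent directly rather than pykafka's own classes:

    try:
        from gevent.event import Event as GeventEvent  # gevent is optional
    except ImportError:
        GeventEvent = None  # sentinel: gevent is not installed

    def is_gevent_event(obj):
        # Short-circuit first: isinstance() with None as the class raises
        # TypeError, so the `is not None and` guard is what keeps the
        # non-gevent code path safe.
        return GeventEvent is not None and isinstance(obj, GeventEvent)

    print(is_gevent_event(object()))  # False whether or not gevent exists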
14 changes: 10 additions & 4 deletions pykafka/broker.py
@@ -59,7 +59,8 @@ def __init__(self,
buffer_size=1024 * 1024,
source_host='',
source_port=0,
ssl_config=None):
ssl_config=None,
broker_version="0.9.0"):
"""Create a Broker instance.
:param id_: The id number of this broker
@@ -104,6 +105,7 @@ def __init__(self,
self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
self._buffer_size = buffer_size
self._req_handlers = {}
self._broker_version = broker_version
try:
self.connect()
except SocketDisconnectedError:
@@ -129,7 +131,8 @@ def from_metadata(cls,
buffer_size=64 * 1024,
source_host='',
source_port=0,
ssl_config=None):
ssl_config=None,
broker_version="0.9.0"):
"""Create a Broker using BrokerMetadata
:param metadata: Metadata that describes the broker.
@@ -160,7 +163,8 @@ def from_metadata(cls,
buffer_size=buffer_size,
source_host=source_host,
source_port=source_port,
ssl_config=ssl_config)
ssl_config=ssl_config,
broker_version=broker_version)

@property
def connected(self):
@@ -289,13 +293,15 @@ def fetch_messages(self,
block for up to `timeout` milliseconds.
:type min_bytes: int
"""
response_class = FetchResponse.get_subclass(self._broker_version)
future = self._req_handler.request(FetchRequest(
partition_requests=partition_requests,
timeout=timeout,
min_bytes=min_bytes,
api_version=response_class.api_version,
))
# XXX - this call returns even with less than min_bytes of messages?
return future.get(FetchResponse)
return future.get(response_class)

@_check_handler
def produce_messages(self, produce_request):
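`fetch_messages` now resolves a version-appropriate response class through `FetchResponse.get_subclass(self._broker_version)` and threads that class's `api_version` into the request, so the request and its parser stay in lockstep. A hedged sketch of the dispatch shape; the class names and the 0.9 cutoff here are illustrative, not pykafka's actual protocol code:

    class FetchResponseV0:
        api_version = 0  # legacy fetch response layout

    class FetchResponseV1:
        api_version = 1  # layout newer brokers respond with

    def get_subclass(broker_version):
        # Choose the parser matching what the broker will send back.
        major, minor = (int(x) for x in broker_version.split(".")[:2])
        return FetchResponseV1 if (major, minor) >= (0, 9) else FetchResponseV0

    cls = get_subclass("0.9.0")
    print(cls.__name__, cls.api_version)  # FetchResponseV1 1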
3 changes: 3 additions & 0 deletions pykafka/cli/__main__.py
@@ -0,0 +1,3 @@

from .kafka_tools import main
main()
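Adding `pykafka/cli/__main__.py` makes the CLI launchable as `python -m pykafka.cli`: under `-m`, Python runs a package's `__main__` module, so this three-line shim simply forwards to `kafka_tools.main()`.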
7 changes: 6 additions & 1 deletion pykafka/client.py
@@ -19,11 +19,14 @@

__all__ = ["KafkaClient"]

from .handlers import ThreadingHandler, GEventHandler
import logging

from .cluster import Cluster
from .handlers import ThreadingHandler
try:
from .handlers import GEventHandler
except ImportError:
GEventHandler = None


log = logging.getLogger(__name__)
@@ -124,6 +127,8 @@ def __init__(self,
self._source_address = source_address
self._socket_timeout_ms = socket_timeout_ms
self._offsets_channel_socket_timeout_ms = offsets_channel_socket_timeout_ms
if use_greenlets and not GEventHandler:
raise ImportError('use_greenlets can only be used when gevent is installed.')
self._handler = GEventHandler() if use_greenlets else ThreadingHandler()
self.cluster = Cluster(
hosts,
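Previously `pykafka.client` imported `GEventHandler` unconditionally, so the whole module failed to import when gevent was absent; now the import degrades to None and `use_greenlets=True` fails fast with a clear message instead of a `TypeError` from calling None. A usage sketch; the broker address is illustrative, and with gevent installed this would go on to attempt a real connection:

    from pykafka import KafkaClient

    try:
        client = KafkaClient(hosts="127.0.0.1:9092", use_greenlets=True)
    except ImportError as exc:
        # Raised at construction time when gevent is not installed:
        # "use_greenlets can only be used when gevent is installed."
        print(exc)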
20 changes: 12 additions & 8 deletions pykafka/cluster.py
@@ -199,6 +199,7 @@ def __init__(self,
self._ssl_config = ssl_config
self._zookeeper_connect = zookeeper_hosts
self._max_connection_retries = 3
self._max_connection_retries_offset_mgr = 8
self._broker_version = broker_version
if ':' in self._source_address:
self._source_port = int(self._source_address.split(':')[1])
@@ -245,7 +246,8 @@ def _request_metadata(self, broker_connects, topics):
buffer_size=1024 * 1024,
source_host=self._source_host,
source_port=self._source_port,
ssl_config=self._ssl_config)
ssl_config=self._ssl_config,
broker_version=self._broker_version)
response = broker.request_metadata(topics)
if response is not None:
return response
@@ -354,7 +356,8 @@ def _update_brokers(self, broker_metadata):
buffer_size=1024 * 1024,
source_host=self._source_host,
source_port=self._source_port,
ssl_config=self._ssl_config)
ssl_config=self._ssl_config,
broker_version=self._broker_version)
elif not self._brokers[id_].connected:
log.info('Reconnecting to broker id %s: %s:%s', id_, meta.host, meta.port)
try:
Expand Down Expand Up @@ -402,23 +405,24 @@ def get_group_coordinator(self, consumer_group):
"""
log.info("Attempting to discover offset manager for consumer group '%s'",
consumer_group)
for i in range(self._max_connection_retries):
max_connection_retries = self._max_connection_retries_offset_mgr
for i in range(max_connection_retries):
if i > 0:
log.debug("Retrying offset manager discovery")
time.sleep(i * 2)
for broker in itervalues(self.brokers):
if i > 0:
log.debug("Retrying offset manager discovery")
time.sleep(i * 2)

req = GroupCoordinatorRequest(consumer_group)
future = broker.handler.request(req)
try:
res = future.get(GroupCoordinatorResponse)
except GroupCoordinatorNotAvailable:
log.error('Error discovering offset manager.')
if i == self._max_connection_retries - 1:
if i == max_connection_retries - 1:
raise
except SocketDisconnectedError:
log.warning("Socket disconnected during offset manager discovery")
if i == self._max_connection_retries - 1:
if i == max_connection_retries - 1:
raise
self.update()
else:
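Two retry changes land in `get_group_coordinator`: offset-manager discovery gets its own larger budget (`_max_connection_retries_offset_mgr = 8` versus the general 3), and the linear backoff now runs once per retry round instead of once per broker inside the loop. The new loop shape as a standalone sketch, with illustrative names and simplified error handling:

    import time

    def discover(brokers, try_broker, max_retries=8):
        for i in range(max_retries):
            if i > 0:
                time.sleep(i * 2)  # one backoff per round: 2s, 4s, 6s, ...
            for broker in brokers:
                result = try_broker(broker)
                if result is not None:
                    return result  # coordinator found
        raise RuntimeError("no coordinator after %d rounds" % max_retries)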
10 changes: 5 additions & 5 deletions pykafka/connection.py
@@ -171,7 +171,7 @@ def connect(self, timeout):
except (self._handler.SockErr, self._handler.GaiError) as e:
log.info("Failed to connect to %s:%s", self.host, self.port)
log.info(e)
raise SocketDisconnectedError
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
log.debug("Successfully connected to %s:%s", self.host, self.port)

def disconnect(self):
@@ -194,13 +194,13 @@ def request(self, request):
"""Send a request over the socket connection"""
bytes_ = request.get_bytes()
if not self._socket:
raise SocketDisconnectedError
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
try:
self._socket.sendall(bytes_)
except self._handler.SockErr as e:
log.error("Failed to send data, error: %s" % repr(e))
self.disconnect()
raise SocketDisconnectedError
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))

def response(self):
"""Wait for a response from the broker"""
@@ -214,13 +214,13 @@ def response(self):
if r is None or len(r) == 0:
# Happens when broker has shut down
self.disconnect()
raise SocketDisconnectedError
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
size += r
size = struct.unpack('!i', size)[0]
try:
recvall_into(self._socket, self._buff, size)
except SocketDisconnectedError:
self.disconnect()
raise
raise SocketDisconnectedError("<broker {}:{}>".format(self.host, self.port))
# Drop CorrelationId => int32
return buffer(self._buff[4:4 + size])
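Every `raise SocketDisconnectedError` in this file now carries a `"<broker host:port>"` payload, so when one connection in a multi-broker cluster drops, the traceback names the peer that went away instead of raising a message-less exception.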
59 changes: 32 additions & 27 deletions pykafka/handlers.py
@@ -19,13 +19,6 @@
__all__ = ["ResponseFuture", "Handler", "ThreadingHandler", "RequestHandler"]

from collections import namedtuple
import gevent
import gevent.event
import gevent.lock
import gevent.queue
import gevent.socket as gsocket
from gevent.socket import error as gsocket_error
from gevent.socket import gaierror as g_gaierror
import logging
import socket as pysocket
from socket import error as socket_error
@@ -34,6 +27,17 @@
import threading
import time

try:
import gevent
import gevent.event
import gevent.lock
import gevent.queue
import gevent.socket as gsocket
from gevent.socket import error as gsocket_error
from gevent.socket import gaierror as g_gaierror
except ImportError:
gevent = None

from .utils.compat import Queue, Empty, Semaphore

log = logging.getLogger(__name__)
@@ -114,26 +118,27 @@ def spawn(self, target, *args, **kwargs):
return t


class GEventHandler(Handler):
"""A handler that uses a greenlet to perform its work"""
Queue = gevent.queue.JoinableQueue
Event = gevent.event.Event
Lock = gevent.lock.RLock # fixme
RLock = gevent.lock.RLock
Semaphore = gevent.lock.Semaphore
Socket = gsocket
SockErr = gsocket_error
GaiError = g_gaierror

def sleep(self, seconds=0):
gevent.sleep(seconds)

def spawn(self, target, *args, **kwargs):
# Greenlets don't support naming
if 'name' in kwargs:
kwargs.pop('name')
t = gevent.spawn(target, *args, **kwargs)
return t
if gevent:
class GEventHandler(Handler):
"""A handler that uses a greenlet to perform its work"""
Queue = gevent.queue.JoinableQueue
Event = gevent.event.Event
Lock = gevent.lock.RLock # fixme
RLock = gevent.lock.RLock
Semaphore = gevent.lock.Semaphore
Socket = gsocket
SockErr = gsocket_error
GaiError = g_gaierror

def sleep(self, seconds=0):
gevent.sleep(seconds)

def spawn(self, target, *args, **kwargs):
# Greenlets don't support naming
if 'name' in kwargs:
kwargs.pop('name')
t = gevent.spawn(target, *args, **kwargs)
return t


class RequestHandler(object):
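Because `GEventHandler` is now defined only inside `if gevent:`, the name does not exist in `pykafka.handlers` when gevent is missing, and `from .handlers import GEventHandler` raises ImportError, which is exactly the signal the new try/except blocks in client.py and balancedconsumer.py convert into a `GEventHandler = None` fallback.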
(Diffs for the remaining 15 of the 24 changed files are not shown.)
