Skip to content

Commit

Permalink
merged mumrah into master
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Lim committed Apr 9, 2014
2 parents 88559a5 + 13d0d44 commit 87b979e
Show file tree
Hide file tree
Showing 26 changed files with 1,981 additions and 877 deletions.
9 changes: 5 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
language: python

python:
- 2.6
- 2.7

before_install:
- git submodule update --init --recursive
- cd kafka-src
- ./sbt update
- ./sbt package
- ./sbt clean update package assembly-package-dependency
- cd -

install:
pip install .
- pip install .
# Deal with issue on Travis builders re: multiprocessing.Queue :(
# See https://github.com/travis-ci/travis-cookbooks/issues/155
- sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm

script:
- python -m test.test_unit
Expand Down
2 changes: 2 additions & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Ordered by contribution
* David Arthur, [@mumrah](https://github.com/mumrah)
* Mahendra M, [@mahendra](https://github.com/mahendra)
* Ivan Pouzyrevsky, [@sandello](https://github.com/sandello)
* Jim Lim, [@jimjh](https://github.com/jimjh)
* StevenLeRoux, [@StevenLeRoux](https://github.com/StevenLeRoux)
* [@anentropic](https://github.com/anentropic)
* Ben Frederickson, [@benfred](https://github.com/benfred)

Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright 2012 David Arthur
Copyright 2013 David Arthur

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
6 changes: 6 additions & 0 deletions POWERED-BY.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Project/People/Companies using kafka-python

If you're using this library and care to give us a shout out, please fork the project,
add yourself here, and submit a pull request. Thanks!

* [@mumrah](https://github.com/mumrah), adding myself as an example
42 changes: 21 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Kafka Python client

![travis](https://travis-ci.org/mumrah/kafka-python.png)
[![Build Status](https://travis-ci.org/mumrah/kafka-python.png)](https://travis-ci.org/mumrah/kafka-python)

This module provides low-level protocol support for Apache Kafka as well as
high-level consumer and producer classes. Request batching is supported by the
Expand All @@ -17,9 +17,8 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`

# Status

I'm following the version numbers of Kafka, plus one number to indicate the
version of this project. The current version is 0.8.1-1. This version is under
development, APIs are subject to change.
The current version of this package is **0.9.0** and is compatible with
Kafka brokers running version **0.8.1**.

# Usage

Expand All @@ -30,27 +29,27 @@ from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# To send messages synchronously
producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
producer.send_messages("this method", "is variadic")
producer = SimpleProducer(kafka)
producer.send_messages("my-topic", "some message")
producer.send_messages("my-topic", "this method", "is variadic")

# To send messages asynchronously
producer = SimpleProducer(kafka, "my-topic", async=True)
producer.send_messages("async message")
producer = SimpleProducer(kafka, async=True)
producer.send_messages("my-topic", "async message")

# To wait for acknowledgements
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
# a local log before sending response
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
# by all in sync replicas before sending a response
producer = SimpleProducer(kafka, "my-topic", async=False,
producer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
acks_timeout=2000)
ack_timeout=2000)

response = producer.send_messages("async message")
response = producer.send_messages("my-topic", "async message")

if response:
print(response[0].error)
Expand All @@ -63,7 +62,7 @@ if response:
# Notes:
# * If the producer dies before the messages are sent, there will be losses
# * Call producer.stop() to send the messages and cleanup
producer = SimpleProducer(kafka, "my-topic", batch_send=True,
producer = SimpleProducer(kafka, batch_send=True,
batch_send_every_n=20,
batch_send_every_t=60)

Expand All @@ -81,22 +80,22 @@ from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# HashedPartitioner is default
producer = KeyedProducer(kafka, "my-topic")
producer.send("key1", "some message")
producer.send("key2", "this methode")
producer = KeyedProducer(kafka)
producer.send("my-topic", "key1", "some message")
producer.send("my-topic", "key2", "this methode")

producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
```

## Multiprocess consumer
```python
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer

kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
Expand All @@ -116,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):

```python
from kafka.client import KafkaClient
kafka = KafkaClient("localhost", 9092)
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProdocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
Expand Down Expand Up @@ -196,6 +195,7 @@ git submodule update
cd kafka-src
./sbt update
./sbt package
./sbt assembly-package-dependency
```

And then run the tests. This will actually start up real local Zookeeper
Expand Down
51 changes: 38 additions & 13 deletions example.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,23 +1,48 @@
import logging
#!/usr/bin/env python
import threading, logging, time

from kafka.client import KafkaClient, FetchRequest, ProduceRequest
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer

def produce_example(client):
producer = SimpleProducer(client, "my-topic")
producer.send_messages("test")
class Producer(threading.Thread):
daemon = True

def consume_example(client):
consumer = SimpleConsumer(client, "test-group", "my-topic")
for message in consumer:
print(message)
def run(self):
client = KafkaClient("localhost:9092")
producer = SimpleProducer(client)

while True:
producer.send_messages('my-topic', "test")
producer.send_messages('my-topic', "\xc2Hola, mundo!")

time.sleep(1)


class Consumer(threading.Thread):
daemon = True

def run(self):
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "my-topic")

for message in consumer:
print(message)

def main():
client = KafkaClient("localhost", 9092)
produce_example(client)
consume_example(client)
threads = [
Producer(),
Consumer()
]

for t in threads:
t.start()

time.sleep(5)

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.DEBUG
)
main()
2 changes: 1 addition & 1 deletion kafka-src
Submodule kafka-src updated 370 files
2 changes: 1 addition & 1 deletion kafka/NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ There are a few levels of abstraction:

# Possible API

client = KafkaClient("localhost", 9092)
client = KafkaClient("localhost:9092")

producer = KafkaProducer(client, "topic")
producer.send_string("hello")
Expand Down
2 changes: 1 addition & 1 deletion kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__title__ = 'kafka'
__version__ = '0.2-alpha'
__version__ = '0.9.0'
__author__ = 'David Arthur'
__license__ = 'Apache License 2.0'
__copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
Expand Down
Loading

0 comments on commit 87b979e

Please sign in to comment.