Pré-requisitos
- Docker CE
- Docker Compose
- git
Para iniciar o Kafka utilizaremos um ficheiro docker-compose criado e mantido pela Confluent, disponível no DockerHub
O ficheiro docker-compose.yml tem declarado os serviços e ao executar irá instalar e iniciar os serviços Zookeeper e o Kafka.
Para executar o ficheiro .yml e correr os serviços em segundo plano:
$ docker-compose up -d
Para verificar se os serviços estão em execução corretamente (portas devem estar 'Up' e estado deve ser 'run'):
$ docker-compose ps
Resultado esperado:
Name Command State Ports
----------------------------------------------------------------
kafka-single-node_kafka_1 /etc/confluent/docker/run Up
kafka-single-node_zookeeper_1 /etc/confluent/docker/run Up
O comando abaixo cria um Topic chamado storego-events:
$ docker-compose exec kafka kafka-topics --create --topic storego-events --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181
O comando abaixo confirma se o tópico storego-events foi criado:
$ docker-compose exec kafka kafka-topics --describe --topic storego-events --zookeeper zookeeper:2181
O comando abaixo irá enviar 53 mensagens para o tópico:
$ docker-compose exec kafka bash -c "seq 53 | kafka-console-producer --request-required-acks 1 --broker-list localhost:9092 --topic storego-events && echo 'Produced 53 messages.'"
Se tudo funcionar corretamente, receberá uma mensagem como a abaixo:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>Produced 53 messages.
O comando abaixo irá ler as mensagens enviadas para o tópico:
$ docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic storego-events --from-beginning --max-messages 53
Se tudo funcionar corretamente, receberá uma mensagem como a abaixo:
1
....
....
53
Processed a total of 53 messages
Pré-requisitos:
- python
- pip
- venv
- Criar o virtual enviroment:
$ python3 -m venv venv
- Activar o virtual enviroment:
$ source venv/bin/activate
- Instalar o pacote kafka-python:
$ pip install kafka-python
Exemplo de código do Producer:
from kafka import KafkaProducer
import json
import random
from time import sleep
from datetime import datetime
# Create an instance of the Kafka producer
producer = KafkaProducer(bootstrap_servers='localhost:9092',
value_serializer=lambda v: str(v).encode('utf-8'))
# Call the producer.send method with a producer-record
print("Ctrl+c to Stop")
while True:
sleep(2)
producer.send('storego-events', "alert"+str(random.randint(1,999)))
O KafkaProducer é inicializado com dois parâmetros:
- bootstrap-servers: A lista dos brokers que serão enviadas as mensagens, o broker 'localhost:9092' que está configurado no docker-compose.yml.
- value_serializer: O método de serialização das mensagens. Para simplificar, foram transportando como string, mas é comum serializar as mensagens usando JSON. O método producer.send() é utilizado para enviar as mensagens, o primeiro parâmetro é o tópico e o segundo é a mensagem. Se o tópico ainda não existir, a configuração do Broker está para “auto-criar” caso não exista.
Exemplo de código do Consumer:
from kafka import KafkaConsumer
# Create an instance of the Kafka consumer
consumer = KafkaConsumer('storego-events')
for msg in consumer:
print("Topic name=%s, Message=%s"%(msg.topic,msg.value))
Utilizart o model class Person.
Criar a classe Config e as anotações @Configuration e @EnableKafka. Criar as beans ConsumerFactory e ConcurrentKafkaListenerContainerFactory com a classe Person.
@EnableKafka
@Configuration
public class Config {
// Function to establish a connection
// between Spring application
// and Kafka server
@Bean
public ConsumerFactory<String, Person>
personConsumer()
{
// HashMap to store the configurations
Map<String, Object> map
= new HashMap<>();
// put the host IP in the map
map.put(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG,
"127.0.0.1:9092");
// put the group ID of consumer in the map
map.put(ConsumerConfig
.GROUP_ID_CONFIG,
"id");
map.put(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
map.put(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
// return message in JSON formate
return new DefaultKafkaConsumerFactory<>(
map, new StringDeserializer(),
new JsonDeserializer<>(Person.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,
Person>
personListner()
{
ConcurrentKafkaListenerContainerFactory<String,
Person>
factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(personConsumer());
return factory;
}
}
Criar a classe KafkaService com a anotação @Service. Esta classe vai conter o método listener para publicar a mensagem no terminal.
@Service
public class StoreServices {
// Annotation required to listen
// the message from Kafka server
@KafkaListener(topics = "JsonTopic",
groupId = "id", containerFactory
= "personListner")
public void
publish(Person person)
{
System.out.println("New Entry: "
+ person);
}
}
Correr o Spring no terminal para receber as mensagens produzidas.
$ ./mvnw spring-boot:run
Para teste vamos produzir uma mensagem utilizando um ficheiro people.json. No terminal, entrar na bash do container kafka.
$ docker exec -it projkafkabroker_kafka_1 bash
Criar o ficheiro people.json.
$ cat > people.json
{ "firstName": "Giovane", "lastName": "Matos", "email": "[email protected]" }
{ "firstName": "Luisa", "lastName": "Martins", "email": "[email protected]" }
{ "firstName": "Andre", "lastName": "Ferreira", "email": "[email protected]" }
{ "firstName": "Paulo", "lastName": "Loredo", "email": "[email protected]" }
{ "firstName": "Joana", "lastName": "Paiva", "email": "[email protected]" }
{ "firstName": "Jesus", "lastName": "Carvalho", "email": "[email protected]" }
Produzir mensagem
$ cat people.json | kafka-console-producer --request-required-acks 1 --broker-list localhost:9092 --topic JsonTopic && echo 'Produced json message.'
Resultado esperado no terminal do Spring:
New Entry: Person{, firstName='Giovane', lastName='Matos', email='[email protected]'}
New Entry: Person{, firstName='Luisa', lastName='Martins', email='[email protected]'}
New Entry: Person{, firstName='Andre', lastName='Ferreira', email='[email protected]'}
New Entry: Person{, firstName='Paulo', lastName='Loredo', email='[email protected]'}
New Entry: Person{, firstName='Joana', lastName='Paiva', email='[email protected]'}
New Entry: Person{, firstName='Jesus', lastName='Carvalho', email='[email protected]'}
Referências