Para criar o projeto Java, vamos usar o Maven para gerenciar dependências e automatizar a build. O projeto já está criado (ver).
Caso você não tenha experiência em Maven, é um dos gerenciadores de build mais famosos do Java. Recomendo a leitura de uma introdução que escrevi para projetos Quarkus Maven 101 ou de uma série completa de posts do Chandra Guntur Understanding Apache Maven – The Series (em inglês).
Para gerar um projeto Maven, use o comando abaixo.
mvn archetype:generate -DgroupId=io.vepo.kafka.imersao -DartifactId=meu-primeiro-produtor -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
Depois de criado o projeto, abra o arquivo Project Object Model pom.xml
e adicione a biblioteca na área de dependências. As dependências Maven são representadas pelas coordenadas (groupId
, artifactId
e version
) e podem ser encontrada nos repositórios sonartype ou mvnrepository.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
Sobre o arquétipo: O arquétipo do projeto Java é bem antigo e desatualizado. Pode remover a dependência do JUnit 4 e mudar a versão do Java de
1.7
para1.8
.Sobre a versão do Java:: As bibliotecas do Apache Kafka são compatíveis com as novas versões do Java, mas usaremos a 8. Se quiser pode testar depois como fica nas versões 11 (LTS), 17 (LTS) e 20 (mais recente).
O Maven segue a filosofia convenção-sobre-configuração, ou seja, para criar uma classe não precisamos informar em lugar nenhum que ela deve ser compilada, deve-se apenas colocar ela no diretório correto.
Nosso exemplo será um Produtor de informações climáticas, por isso vou criar a classe WeatherSensorCollector
, ela vai pertencer ao pacote io.vepo.sensor.weather
, logo deve ser colocada na estrutura abaixo:
.
├── src ## Todo o código deve ser armazenado nessa pasta
│ ├── main ## Código de produção
| | ├── java ## Código Java de produção
| | | └── io
| | | └── vepo
| | | └── sensor
| | | └── weather
| | | └── WeatherSensorCollector.java
| | └── resources ## Arquivos que não serão compilados, mas estarão disponíveis em tempo de execução
│ └── test ## Código usado para testes unitários
| ├── java ## Código Java para testes unitários
| └── resources ## Arquivos que não serão compilados, mas estarão disponíveis em tempo de execução
└── pom.xml ## Arquivo que define como será feita a build
Criada a classe, podemos verificar se está tudo certo executando ela usando o Maven:
mvn clean compile exec:java -Dexec.mainClass=io.vepo.sensor.weather.WeatherSensorCollector
Para facilitar nossa vida, vamos adicionar definir a classe principal direto no jar e como configuração padrão para execução.
Adicione o seguinte plugin (procure por build
→ plugins
ou build
→ pluginManagement
→ plugins
) para configurar a build:
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<mainClass>io.vepo.sensor.weather.WeatherSensorCollector</mainClass>
</configuration>
</plugin>
Agora para executar, basta usar:
mvn clean compile exec:java
Para definir que essa classe de execução do jar, altere a configuração do plugin maven-jar-plugin
:
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>io.vepo.sensor.weather.WeatherSensorCollector</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
Para executar é preciso primeiro criar o jar:
mvn clean package
java -jar target/meu-primeiro-produtor-1.0-SNAPSHOT.jar
Antes de implementar essa classe, vamos criar alguns pressupostos sobre como ela vai ser utilizada.
- Ela será utilizada em um loop de coleta dedados. Ela deve ser passada inicializada para o loop.
- Ela receberá um objeto com informações do clima.
- A chave será a geolocalização e esse valor está dentro das informações do clima.
- A classe já receberá o produtor inicializado.
WeatherSensorCollector collector = new WeatherSensorCollector(new KafkaProducer<>(/* inicialização */));
collector.send(new WeatherInfo(/* inicialização */));
O primeiro passo quando vamos pensa em um produtor é escolher nossos Serializador. Devemos escolher o serializador dos valores e das chaves. A biblioteca padrão do Kafka já vem com alguns serializadores, mas eles se resumem a tipos primitivos e objetos simples. O recomendado é que usemos um serializador open source, como o da Confluent ou da Red Hat, mas para estudo vamos implementar nosso próprio.
Todo serializador deve implementar a interface Serializer, código da biblioteca padrão é aberto e podemos ver que são disponibilizado serializadores para boolean
(BooleanSerializer), byte[]
(ByteArraySerializer), ByteBuffer
(ByteBufferSerializer), Bytes
(um tipo de byte[] imutável do Kafka, BytesSerializer), double
(DoubleSerializer), float
(FloatSerializer), int
(IntegerSerializer), long
(LongSerializer), short
(ShortSerializer), String
(StringSerializer), UUID
(UUIDSerializer) e null
(VoidSerializer).
As bibliotecas de serialização vão depender de Schema, que falaremos depois. Pela Confluent temos o KafkaAvroSerializer para AVRO, KafkaProtobufSerializer para ProtoBuf e KafkaJsonSchemaSerializer para JSON. Pela Red Hat, temos o AvroKafkaSerializer para AVRO e ProtobufKafkaSerializer para ProtoBuf. Esses formatos dependem da troca de Schema entre o produtor e o consumidor, o que implica no uso de um servidor de registro de schemas, que são o Schema Registry da Confluent e o Apicurio Registry da Red Hat.
Podemos ver abaixo o código da interface Serializer. Devemos implementar o método byte[] serialize(String topic, T data);
e apesar de existir o um método com o argumento Headers
ele não faz parte do corpo da mensagem e falaremos mais a frente.
/**
* An interface for converting objects to bytes.
*
* A class that implements this interface is expected to have a constructor with no parameter.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
*
* @param <T> Type to be serialized from.
*/
public interface Serializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param data typed data
* @return serialized bytes
*/
byte[] serialize(String topic, T data);
/**
* Convert {@code data} into a byte array.
*
* @param topic topic associated with data
* @param headers headers associated with the record
* @param data typed data
* @return serialized bytes
*/
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
/**
* Close this serializer.
* <p>
* This method must be idempotent as it may be called multiple times.
*/
@Override
default void close() {
// intentionally left blank
}
}
Vamos optar por uma solução simples usando ByteBuffer
para serializar nossos objetos de domínio. Segue abaixo como ficaria a implementação.
public class WeatherInfoSerializer implements Serializer<WeatherInfo> {
@Override
public byte[] serialize(String topic, WeatherInfo data) {
return ByteBuffer.allocate(Double.SIZE * 4 + Long.SIZE)
.putDouble(data.getLocation().getLat())
.putDouble(data.getLocation().getLon())
.putDouble(data.getTemperature())
.putDouble(data.getWind())
.putLong(data.getTimestamp())
.array();
}
}
public class GeolocationSerializer implements Serializer<Geolocation> {
@Override
public byte[] serialize(String topic, Geolocation data) {
return ByteBuffer.allocate(Double.SIZE * 2)
.putDouble(data.getLat())
.putDouble(data.getLon())
.array();
}
}
O próximo passo é definir as configurações do Produtor. Quando vamos inicializar qualquer cliente Kafka, ele pode receber como parâmetro um Properties ou um Map<String, Object> com os valores que serão usados para configurar o produtor e tudo relacionado a ele (serializers, interceptors, etc...). Os valores aceitos como padrão estão na documentação do site do Kafka, é uma leitura obrigatória.
Fonte: https://excalidraw.com/#json=vlNnnnJ2VLzXJ7f9swNxk,Qt0njPi8iUSA2MQdAmGV5w
Abaixo eu listo as configurações mais importantes, mas você não precisa decorar essas chaves, pode usar a classe ProducerConfig que além de ter toda as chaves, tem toda a documentação associada a ela.
key.serializer
value.serializer
bootstrap.servers
partitioner.class
acks
interceptor.classes
Assim para definir as propriedades que vamos usar basta:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, GeolocationSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, WeatherInfoSerializer.class);
Agora precisamos definir a classe que vai encapsular o produtor. As responsabilidades dela serão:
- Definir o Tópico
- Definir a chave
public class WeatherSensorCollector implements AutoCloseable {
private final Producer<Geolocation, WeatherInfo> producer;
public WeatherSensorCollector(Producer<Geolocation, WeatherInfo> producer) {
this.producer = producer;
}
public Future<RecordMetadata> send(WeatherInfo info) {
return producer.send(new ProducerRecord<Geolocation, WeatherInfo>("weather", info.getLocation(), info));
}
@Override
public void close() {
this.producer.close();
}
}
Porque estamos recebendo Producer como parâmetro? Para que possamos depois testar usando MockProducer!
Recomendo você conhecer um pouco da classe ProducerRecord e da própria classe Producer. Com a classe ProducerRecord podemos definir qual é a partição, o timestamp e os cabeçalhos da mensagem. Mas a partição pode ser definida Partitioner.
Mais um ponto a se observar é que o envio de mensagens é síncrono. Isso significa que o Kafka coloca as mensagens em um buffer e envia a posteriori, para mais detalhes veja as configurações buffer.memory
, batch.size
e linger.ms
em Producer Configs.
Se você olhou a documentação, também deve ter percebido que Existe a possibilidade de se implementar transações, o que garante que várias mensagens sejam enviadas atomicamente.
A nossa classe WeatherSensorCollector
é pequena e simples, mas mesmo assim ela pode ser testada através da classe MockProducer.