Add protocol dto classes #5
- produce API
Kindrat committed Nov 28, 2015
1 parent a953e07 commit 662ebf3
Showing 6 changed files with 235 additions and 8 deletions.
20 changes: 20 additions & 0 deletions src/main/java/com/github/nginate/kafka/protocol/ApiKeys.java
@@ -7,9 +7,29 @@
@Getter
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
public enum ApiKeys {
/**
* The produce API is used to send message sets to the server. For efficiency it allows sending message sets
* intended for many topic partitions in a single request. The produce API uses the generic message set format,
* but since no offset has been assigned to the messages at the time of the send the producer is free to fill in
* that field in any way it likes.
*/
PRODUCE(0),
FETCH(1),
LIST_OFFSETS(2),
/**
* This API answers the following questions:
* What topics exist?
* How many partitions does each topic have?
* Which broker is currently the leader for each partition?
* What is the host and port for each of these brokers?
* This is the only request that can be addressed to any broker in the cluster. Since there may be many topics the
* client can give an optional list of topic names in order to only return metadata for a subset of topics. The
* metadata returned is at the partition level, but grouped together by topic for convenience and to avoid
* redundancy. For each partition the metadata contains the information for the leader as well as for all the
* replicas and the list of replicas that are currently in-sync. Note: If "auto.create.topics.enable" is set in the
* broker configuration, a topic metadata request will create the topic with the default replication factor and
* number of partitions.
*/
METADATA(3),
LEADER_AND_ISR(4),
STOP_REPLICA(5),
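Each constant above pairs a protocol operation with its numeric wire id, so dispatching an incoming request means mapping that id back to a constant. A minimal lookup sketch, assuming the int passed to the @RequiredArgsConstructor is held in a field surfaced by @Getter; the field name `key` and the helper class are hypothetical, since the field declaration itself is collapsed in this diff:

import java.util.Arrays;

// Hypothetical helper (not part of this commit): resolves the ApiKeys constant
// for a wire-format api key id, assuming a getKey() accessor generated by @Getter.
public final class ApiKeyResolver {
    private ApiKeyResolver() {
    }

    public static ApiKeys resolve(int key) {
        return Arrays.stream(ApiKeys.values())
                .filter(apiKey -> apiKey.getKey() == key)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown api key: " + key));
    }
}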
59 changes: 59 additions & 0 deletions src/main/java/com/github/nginate/kafka/protocol/messages/MessageSet.java
@@ -0,0 +1,59 @@
package com.github.nginate.kafka.protocol.messages;

import com.github.nginate.kafka.protocol.types.Type;
import lombok.Data;

import static com.github.nginate.kafka.protocol.types.TypeName.*;

@Data
public class MessageSet {
@Type(WRAPPER)
private MessageData[] messageData;

@Data
public static class MessageData {
/**
* This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't
* actually know the offset and can fill in any value here it likes.
*/
@Type(INT64)
private Long offset;
@Type(INT32)
private Integer messageSize;
@Type(WRAPPER)
private Message message;

@Data
public static class Message {
/**
* The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of
* the message on the broker and consumer.
*/
@Type(INT32)
private Integer crc;
/**
* This is a version id used to allow backwards compatible evolution of the message binary format.
* The current value is 0.
*/
@Type(INT8)
private Byte magicByte;
/**
* This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec
* used for the message. The other bits should be set to 0.
*/
@Type(INT8)
private Byte attributes;
/**
* The key is an optional message key that was used for partition assignment. The key can be null.
*/
@Type(BYTES)
private byte[] key;
/**
* The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in
* which case this may itself contain a message set. The message can be null.
*/
@Type(BYTES)
private byte[] value;
}
}
}
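Two details in the javadoc above are easy to get wrong when serializing: the crc covers only the bytes that follow the crc field (magic byte, attributes, key, value), and the compression codec occupies the lowest 2 bits of attributes. A minimal sketch using java.util.zip.CRC32; the concrete codec values (0 = none, 1 = gzip, 2 = snappy) are the conventional 0.8-era assignments and should be read as an assumption here:

import java.util.zip.CRC32;

public final class MessageBits {
    // Lowest 2 bits of the attributes byte select the codec (see javadoc above).
    private static final int COMPRESSION_CODEC_MASK = 0x03;

    public static int compressionCodec(byte attributes) {
        return attributes & COMPRESSION_CODEC_MASK; // assumed: 0 none, 1 gzip, 2 snappy
    }

    // CRC32 over the message bytes that follow the crc field.
    public static int crcOf(byte[] bytesAfterCrcField) {
        CRC32 crc32 = new CRC32();
        crc32.update(bytesAfterCrcField);
        return (int) crc32.getValue(); // wire format stores the low 32 bits
    }

    private MessageBits() {
    }
}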
81 changes: 81 additions & 0 deletions src/main/java/com/github/nginate/kafka/protocol/messages/request/ProduceRequest.java
@@ -0,0 +1,81 @@
package com.github.nginate.kafka.protocol.messages.request;

import com.github.nginate.kafka.protocol.ApiKey;
import com.github.nginate.kafka.protocol.ApiKeys;
import com.github.nginate.kafka.protocol.messages.MessageSet;
import com.github.nginate.kafka.protocol.messages.Request;
import com.github.nginate.kafka.protocol.types.Type;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

import static com.github.nginate.kafka.protocol.types.TypeName.*;

/**
* The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended
* for many topic partitions in a single request. The produce API uses the generic message set format, but since no
* offset has been assigned to the messages at the time of the send the producer is free to fill in that field in any
* way it likes.
*/
@Data
@Builder
@ApiKey(ApiKeys.PRODUCE)
@EqualsAndHashCode(callSuper = true)
public class ProduceRequest extends Request {
/**
* This field indicates how many acknowledgements the servers should receive before responding to the request.
* If it is 0 the server will not send any response (this is the only case where the server will not reply to a
request). If it is 1, the server will wait until the data is written to the local log before sending a response.
* If it is -1 the server will block until the message is committed by all in sync replicas before sending
* a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur
* (but the server will never wait for more acknowledgements than there are in-sync replicas).
*/
@Type(INT16)
private Short requiredAcks;
/**
* This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements
* in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include
* network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are
* queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if
* the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client
* should use the socket timeout.
*/
@Type(INT32)
private Integer timeout;

@Type(WRAPPER)
private TopicProduceData[] topicProduceData;

@Data
public static class TopicProduceData {
/**
* The topic that data is being published to.
*/
@Type(STRING)
private String topic;

@Type(WRAPPER)
private PartitionProduceData[] partitionProduceData;

@Data
public static class PartitionProduceData {
/**
* The partition that data is being published to.
*/
@Type(INT32)
private Integer partition;
/**
* The size, in bytes, of the message set that follows.
*/
@Type(INT32)
private Integer messageSetSize;
/**
* A set of messages in the standard format described above.
*/
@Type(WRAPPER)
private MessageSet messageSet;
}

}

}
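ProduceRequest itself carries @Builder, while the nested DTOs only get the setters generated by @Data, so assembling a request mixes both styles. A minimal sketch for one topic and one partition with acks = 1 and a one second timeout; the MessageSet and its serialized size are assumed to come from a serializer outside this commit:

// Illustrative only: field values are examples, and messageSetSize must match
// the actual serialized size of messageSet.
public static ProduceRequest singlePartitionRequest(String topic, int partition,
                                                    MessageSet messageSet, int messageSetSize) {
    ProduceRequest.TopicProduceData.PartitionProduceData partitionData =
            new ProduceRequest.TopicProduceData.PartitionProduceData();
    partitionData.setPartition(partition);
    partitionData.setMessageSetSize(messageSetSize);
    partitionData.setMessageSet(messageSet);

    ProduceRequest.TopicProduceData topicData = new ProduceRequest.TopicProduceData();
    topicData.setTopic(topic);
    topicData.setPartitionProduceData(
            new ProduceRequest.TopicProduceData.PartitionProduceData[]{partitionData});

    return ProduceRequest.builder()
            .requiredAcks((short) 1) // wait for the leader's local write only
            .timeout(1000)           // ms to await the acknowledgements above
            .topicProduceData(new ProduceRequest.TopicProduceData[]{topicData})
            .build();
}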
@@ -10,6 +10,19 @@

import static com.github.nginate.kafka.protocol.types.TypeName.STRING;

/**
* This API answers the following questions:
* What topics exist?
* How many partitions does each topic have?
* Which broker is currently the leader for each partition?
* What is the host and port for each of these brokers?
* This is the only request that can be addressed to any broker in the cluster. Since there may be many topics the
* client can give an optional list of topic names in order to only return metadata for a subset of topics. The metadata
* returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each
* partition the metadata contains the information for the leader as well as for all the replicas and the list of
* replicas that are currently in-sync. Note: If "auto.create.topics.enable" is set in the broker configuration, a topic
* metadata request will create the topic with the default replication factor and number of partitions.
*/
@Data
@Builder
@ApiKey(ApiKeys.METADATA)
Expand Down
20 changes: 12 additions & 8 deletions src/main/java/com/github/nginate/kafka/protocol/messages/response/MetadataResponse.java
@@ -9,6 +9,10 @@

import static com.github.nginate.kafka.protocol.types.TypeName.*;

/**
* The response contains metadata for each partition, with partitions grouped together by topic. This metadata refers to
* brokers by their broker id. The brokers each have a host and port.
*/
@Data
@ApiKey(ApiKeys.METADATA)
@EqualsAndHashCode(callSuper = true)
@@ -21,17 +25,17 @@ public class MetadataResponse extends Response {
@Data
public static class Broker {
@Type(INT32)
- private int nodeId;
+ private Integer nodeId;
@Type(STRING)
private String host;
@Type(INT32)
- private int port;
+ private Integer port;
}

@Data
public static class TopicMetadata {
@Type(INT16)
- private int topicErrorCode;
+ private Short topicErrorCode;
@Type(STRING)
private String topicName;
@Type(WRAPPER)
@@ -40,25 +44,25 @@ public static class TopicMetadata {
@Data
public static class PartitionMetadata {
@Type(INT16)
- private int partitionErrorCode;
+ private Short partitionErrorCode;
@Type(INT32)
- private int partitionId;
+ private Integer partitionId;
/**
* The node id for the kafka broker currently acting as leader for this partition. If no leader exists
* because we are in the middle of a leader election this id will be -1.
*/
@Type(INT32)
- private int leader;
+ private Integer leader;
/**
* The set of alive nodes that currently act as followers of the leader for this partition.
*/
@Type(INT32)
- private int[] replicas;
+ private Integer[] replicas;
/**
* The subset of the replicas that are "caught up" to the leader.
*/
@Type(INT32)
- private int[] isr;
+ private Integer[] isr;
}
}
}
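Tying the response together: a client locates the broker to produce to by joining a partition's leader id against the broker list. A minimal sketch; the getters for the brokers and topicMetadata arrays on MetadataResponse, and for the partition metadata array on TopicMetadata, are assumed to follow Lombok naming for fields collapsed in this diff:

import java.util.Optional;

// Hypothetical helper: resolves "host:port" of the current leader, empty when
// the topic or partition is unknown or a leader election is in progress (-1).
public static Optional<String> leaderAddress(MetadataResponse metadata,
                                             String topic, int partition) {
    for (MetadataResponse.TopicMetadata topicMetadata : metadata.getTopicMetadata()) {
        if (!topic.equals(topicMetadata.getTopicName())) {
            continue;
        }
        for (MetadataResponse.TopicMetadata.PartitionMetadata partitionMetadata
                : topicMetadata.getPartitionMetadata()) {
            Integer leader = partitionMetadata.getLeader();
            if (partitionMetadata.getPartitionId() == partition
                    && leader != null && leader != -1) {
                for (MetadataResponse.Broker broker : metadata.getBrokers()) {
                    if (leader.equals(broker.getNodeId())) {
                        return Optional.of(broker.getHost() + ":" + broker.getPort());
                    }
                }
            }
        }
    }
    return Optional.empty();
}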
50 changes: 50 additions & 0 deletions src/main/java/com/github/nginate/kafka/protocol/messages/response/ProduceResponse.java
@@ -0,0 +1,50 @@
package com.github.nginate.kafka.protocol.messages.response;

import com.github.nginate.kafka.protocol.ApiKey;
import com.github.nginate.kafka.protocol.ApiKeys;
import com.github.nginate.kafka.protocol.messages.Response;
import com.github.nginate.kafka.protocol.types.Type;
import lombok.Data;
import lombok.EqualsAndHashCode;

import static com.github.nginate.kafka.protocol.types.TypeName.*;

@Data
@ApiKey(ApiKeys.PRODUCE)
@EqualsAndHashCode(callSuper = true)
public class ProduceResponse extends Response {
@Type(WRAPPER)
private ProduceResponseData[] produceResponseData;

@Data
public static class ProduceResponseData {
/**
* The topic this response entry corresponds to.
*/
@Type(STRING)
private String topic;
@Type(WRAPPER)
private ProduceResponsePartitionData[] produceResponsePartitionData;

@Data
public static class ProduceResponsePartitionData {
/**
* The partition this response entry corresponds to.
*/
@Type(INT32)
private Integer partition;
/**
* The error from this partition, if any. Errors are given on a per-partition basis because a given
* partition may be unavailable or maintained on a different host, while others may have successfully
* accepted the produce request.
*/
@Type(INT16)
private Short errorCode;
/**
* The offset assigned to the first message in the message set appended to this partition.
*/
@Type(INT64)
private Long offset;
}
}
}
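Since errors come back per partition rather than per request, a caller has to walk the nested arrays. A minimal sketch using the Lombok getters derived from the fields above; treating error code 0 as "no error" follows the usual Kafka convention and is an assumption, not something this commit defines:

public static void logResults(ProduceResponse response) {
    for (ProduceResponse.ProduceResponseData topicData : response.getProduceResponseData()) {
        for (ProduceResponse.ProduceResponseData.ProduceResponsePartitionData partitionData
                : topicData.getProduceResponsePartitionData()) {
            if (partitionData.getErrorCode() != 0) { // assumed: 0 means no error
                System.err.printf("produce failed: topic=%s partition=%d error=%d%n",
                        topicData.getTopic(), partitionData.getPartition(),
                        partitionData.getErrorCode());
            } else {
                System.out.printf("appended at offset %d: topic=%s partition=%d%n",
                        partitionData.getOffset(), topicData.getTopic(),
                        partitionData.getPartition());
            }
        }
    }
}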
