diff --git a/src/main/java/com/github/nginate/kafka/protocol/messages/request/TopicMetadataRequest.java b/src/main/java/com/github/nginate/kafka/protocol/messages/request/TopicMetadataRequest.java new file mode 100644 index 0000000..191e260 --- /dev/null +++ b/src/main/java/com/github/nginate/kafka/protocol/messages/request/TopicMetadataRequest.java @@ -0,0 +1,18 @@ +package com.github.nginate.kafka.protocol.messages.request; + +import com.github.nginate.kafka.protocol.messages.Request; +import com.github.nginate.kafka.protocol.types.Type; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import static com.github.nginate.kafka.protocol.types.TypeName.STRING; + +@Data +@EqualsAndHashCode(callSuper = true) +public class TopicMetadataRequest extends Request { + /** + * The topics to produce metadata for. If empty the request will yield metadata for all topics. + */ + @Type(STRING) + private String[] topic; +} diff --git a/src/main/java/com/github/nginate/kafka/protocol/messages/response/MetadataResponse.java b/src/main/java/com/github/nginate/kafka/protocol/messages/response/MetadataResponse.java new file mode 100644 index 0000000..c52b73b --- /dev/null +++ b/src/main/java/com/github/nginate/kafka/protocol/messages/response/MetadataResponse.java @@ -0,0 +1,61 @@ +package com.github.nginate.kafka.protocol.messages.response; + +import com.github.nginate.kafka.protocol.messages.Response; +import com.github.nginate.kafka.protocol.types.Type; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import static com.github.nginate.kafka.protocol.types.TypeName.*; + +@Data +@EqualsAndHashCode(callSuper = true) +public class MetadataResponse extends Response { + @Type(WRAPPER) + private Broker[] brokers; + @Type(WRAPPER) + private TopicMetadata[] topicMetadata; + + @Data + public static class Broker { + @Type(INT32) + private int nodeId; + @Type(STRING) + private String host; + @Type(INT32) + private int port; + } + + @Data + public static class TopicMetadata { + @Type(INT16) + private int topicErrorCode; + @Type(STRING) + private String topicName; + @Type(WRAPPER) + private PartitionMetadata[] partitionMetadata; + + @Data + public static class PartitionMetadata { + @Type(INT16) + private int partitionErrorCode; + @Type(INT32) + private int partitionId; + /** + * The node id for the kafka broker currently acting as leader for this partition. If no leader exists + * because we are in the middle of a leader election this id will be -1. + */ + @Type(INT32) + private int leader; + /** + * The set of alive nodes that currently acts as slaves for the leader for this partition. + */ + @Type(INT32) + private int[] replicas; + /** + * The set subset of the replicas that are "caught up" to the leader + */ + @Type(INT32) + private int[] isr; + } + } +} diff --git a/src/main/java/com/github/nginate/kafka/protocol/types/Type.java b/src/main/java/com/github/nginate/kafka/protocol/types/Type.java index 0f41f3f..3ffcd7c 100644 --- a/src/main/java/com/github/nginate/kafka/protocol/types/Type.java +++ b/src/main/java/com/github/nginate/kafka/protocol/types/Type.java @@ -7,5 +7,4 @@ @Documented public @interface Type { TypeName value(); - boolean repeatable() default false; } diff --git a/src/main/java/com/github/nginate/kafka/protocol/types/TypeName.java b/src/main/java/com/github/nginate/kafka/protocol/types/TypeName.java index ee66535..676dc8b 100644 --- a/src/main/java/com/github/nginate/kafka/protocol/types/TypeName.java +++ b/src/main/java/com/github/nginate/kafka/protocol/types/TypeName.java @@ -24,5 +24,9 @@ public enum TypeName { /** * Consist of a signed int32 giving a length N followed by N bytes of content. A length of -1 indicates null. */ - BYTES + BYTES, + /** + * Used to mark utility objects, wrapping actual binary data for protocol + */ + WRAPPER }