Skip to content

Commit

Permalink
Add protocol dto classes #5
Browse files Browse the repository at this point in the history
- join group API
  • Loading branch information
Kindrat committed Nov 29, 2015
1 parent adc7a20 commit fe9fd79
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/main/java/com/github/nginate/kafka/protocol/ApiKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ public enum ApiKeys {
*/
OFFSET_FETCH(9),
CONSUMER_METADATA(10),
/**
* The purpose of the initial phase is to set the active members of the group. This protocol has similar semantics
* as in the initial consumer rewrite design. After finding the coordinator for the group, each member sends a
* JoinGroup request containing member-specific metadata. The join group request will park at the coordinator until
* all expected members have sent their own join group requests ("expected" in this case means all members that were
* part of the previous generation). Once they have done so, the coordinator randomly selects a leader from the
* group and sends JoinGroup responses to all the pending requests.
* The JoinGroup request contains an array with the group protocols that it supports along with member-specific
* metadata. This is basically used to ensure compatibility of group member metadata within the group. The
* coordinator chooses a protocol which is supported by all members of the group and returns it in the respective
* JoinGroup responses. If a member joins and doesn't support any of the protocols used by the rest of the group,
* then it will be rejected. This mechanism provides a way to update protocol metadata to a new format in a rolling
* upgrade scenario. The newer version will provide metadata for the new protocol and for the old protocol, and the
* coordinator will choose the old protocol until all members have been upgraded.
*/
JOIN_GROUP(11),
HEARTBEAT(12);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.github.nginate.kafka.protocol.messages.request;

import com.github.nginate.kafka.protocol.ApiKey;
import com.github.nginate.kafka.protocol.ApiKeys;
import com.github.nginate.kafka.protocol.messages.Request;
import com.github.nginate.kafka.protocol.types.Type;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

import static com.github.nginate.kafka.protocol.types.TypeName.*;

/**
* The purpose of the initial phase is to set the active members of the group. This protocol has similar semantics as in
* the initial consumer rewrite design. After finding the coordinator for the group, each member sends a JoinGroup
* request containing member-specific metadata. The join group request will park at the coordinator until all expected
* members have sent their own join group requests ("expected" in this case means all members that were part of the
* previous generation). Once they have done so, the coordinator randomly selects a leader from the group and sends
* JoinGroup responses to all the pending requests.
* The JoinGroup request contains an array with the group protocols that it supports along with member-specific
* metadata. This is basically used to ensure compatibility of group member metadata within the group. The coordinator
* chooses a protocol which is supported by all members of the group and returns it in the respective JoinGroup
* responses. If a member joins and doesn't support any of the protocols used by the rest of the group, then it will be
* rejected. This mechanism provides a way to update protocol metadata to a new format in a rolling upgrade scenario.
* The newer version will provide metadata for the new protocol and for the old protocol, and the coordinator will
* choose the old protocol until all members have been upgraded.
* The JoinGroup response includes an array for the members of the group along with their metadata. This is only
* populated for the leader to reduce the overall overhead of the protocol; for other members, it will be empty. The is
* used by the leader to prepare member state for phase 2. In the case of the consumer, this allows the leader to
* collect the subscriptions from all members and set the partition assignment. The member metadata returned in the join
* group response corresponds to the respective metadata provided in the join group request for the group protocol
* chosen by the coordinator.
*/
@Data
@Builder
@ApiKey(ApiKeys.JOIN_GROUP)
@EqualsAndHashCode(callSuper = true)
public class JoinGroupRequest extends Request {
@Type(STRING)
private String groupId;
@Type(INT32)
private Integer sessionTimeout;
@Type(STRING)
private String memberId;
@Type(STRING)
private String protocolType;
@Type(WRAPPER)
private GroupProtocols[] groupProtocols;

@Data
public static class GroupProtocols {
@Type(STRING)
private String protocolType;
@Type(BYTES)
private byte[] protocolMetadata;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.github.nginate.kafka.protocol.messages.response;

import com.github.nginate.kafka.protocol.ApiKey;
import com.github.nginate.kafka.protocol.ApiKeys;
import com.github.nginate.kafka.protocol.messages.Response;
import com.github.nginate.kafka.protocol.types.Type;
import lombok.Data;
import lombok.EqualsAndHashCode;

import static com.github.nginate.kafka.protocol.types.TypeName.*;

/**
* The JoinGroup response includes an array for the members of the group along with their metadata. This is only
* populated for the leader to reduce the overall overhead of the protocol; for other members, it will be empty. The is
* used by the leader to prepare member state for phase 2. In the case of the consumer, this allows the leader to
* collect the subscriptions from all members and set the partition assignment. The member metadata returned in the join
* group response corresponds to the respective metadata provided in the join group request for the group protocol
* chosen by the coordinator.
*/
@Data
@ApiKey(ApiKeys.JOIN_GROUP)
@EqualsAndHashCode(callSuper = true)
public class JoinGroupResponse extends Response {
@Type(INT16)
private Short errorCode;
@Type(INT32)
private Integer generationId;
@Type(STRING)
private String groupProtocol;
@Type(STRING)
private String leaderId;
@Type(STRING)
private String memberId;
@Type(WRAPPER)
private Member[] members;

@Data
public static class Member{
@Type(STRING)
private String memberId;
@Type(BYTES)
private byte[] memberMetadata;
}
}

0 comments on commit fe9fd79

Please sign in to comment.