Skip to content

Commit

Permalink
Add protocol dto classes #5
Browse files Browse the repository at this point in the history
- partial sync group API
  • Loading branch information
Kindrat committed Nov 29, 2015
1 parent fe9fd79 commit 906ded9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.github.nginate.kafka.protocol.messages.request;

import com.github.nginate.kafka.protocol.messages.Request;
import com.github.nginate.kafka.protocol.types.Type;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;

import static com.github.nginate.kafka.protocol.types.TypeName.*;

/**
* The sync group request is used by the leader to assign state (e.g. partition assignments) to all members of the
* current generation. All members send SyncGroup immediately after joining the group, but only the leader provides the
* group's assignment.
* Once the group members have been stabilized by the completion of phase 1, the active leader must propagate state to
* the other members in the group. This is used in the new consumer protocol to set partition assignments. Similar to
* phase 1, all members send SyncGroup requests to the coordinator. Once group state has been provided by the leader,
* the coordinator forwards each member's state respectively in the SyncGroup response.
*/
@Data
@Builder
//@ApiKey(ApiKeys.) FIXME
@EqualsAndHashCode(callSuper = true)
public class SyncGroupRequest extends Request {
@Type(STRING)
private String groupId;
@Type(INT32)
private Integer generationId;
@Type(STRING)
private String memberId;
@Type(WRAPPER)
private GroupAssignment[] groupAssignments;

@Data
public static class GroupAssignment {
@Type(STRING)
private String memberId;
@Type(BYTES)
private byte[] memberAssignment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.github.nginate.kafka.protocol.messages.response;

import com.github.nginate.kafka.protocol.messages.Response;
import com.github.nginate.kafka.protocol.types.Type;
import lombok.Data;
import lombok.EqualsAndHashCode;

import static com.github.nginate.kafka.protocol.types.TypeName.BYTES;
import static com.github.nginate.kafka.protocol.types.TypeName.INT16;

/**
* Each member in the group will receive an assignment from the leader in the sync group response.
*/
@Data
//@ApiKey(ApiKeys.) FIXME
@EqualsAndHashCode(callSuper = true)
public class SyncGroupResponse extends Response {
@Type(INT16)
private Short errorCode;
@Type(BYTES)
private byte[] memberAssignment;
}

0 comments on commit 906ded9

Please sign in to comment.