Skip to content

Commit

Permalink
AKCORE-1: ShareGroupHeartbeat support in group coordinator (1/N) (apa…
Browse files Browse the repository at this point in the history
…che#1006)

* AKCORE-1: ShareGroupHeartbeat support in group coordinator (1/N)

* Merged upstream changes and fixed core adapter
  • Loading branch information
apoorvmittal10 authored Feb 6, 2024
1 parent 01f4106 commit 9fc3996
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Implicits.MapExtensionMethods
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.message.{ConsumerGroupDescribeResponseData, ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchResponseData, ShareGroupHeartbeatRequestData, ShareGroupHeartbeatResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData}
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch
Expand Down Expand Up @@ -74,6 +74,15 @@ private[group] class GroupCoordinatorAdapter(
))
}

override def shareGroupHeartbeat(
context: RequestContext,
request: ShareGroupHeartbeatRequestData
): CompletableFuture[ShareGroupHeartbeatResponseData] = {
FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
s"The old group coordinator does not support ${ApiKeys.SHARE_GROUP_HEARTBEAT.name} API."
))
}

override def joinGroup(
context: RequestContext,
request: JoinGroupRequestData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public interface Group {
enum GroupType {
CONSUMER("consumer"),
CLASSIC("classic"),
UNKNOWN("unknown");
UNKNOWN("unknown"),
SHARE("share");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
Expand Down Expand Up @@ -73,6 +75,20 @@ CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartbeat(
ConsumerGroupHeartbeatRequestData request
);

/**
* Heartbeat to a Share Group.
*
* @param context The request context.
* @param request The ShareGroupHeartbeatResponse data.
*
* @return A future yielding the response.
* The error code(s) of the response are set to indicate the error(s) occurred during the execution.
*/
CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
);

/**
* Join a Classic Group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.ConsumerGroupDescribeRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
Expand All @@ -58,11 +60,11 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.metrics.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.image.MetadataDelta;
Expand Down Expand Up @@ -304,6 +306,35 @@ public CompletableFuture<ConsumerGroupHeartbeatResponseData> consumerGroupHeartb
));
}

/**
* See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}.
*/
@Override
public CompletableFuture<ShareGroupHeartbeatResponseData> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) {
if (!isActive.get()) {
return CompletableFuture.completedFuture(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
);
}

return runtime.scheduleWriteOperation(
"share-group-heartbeat",
topicPartitionFor(request.groupId()),
Duration.ofMillis(config.offsetCommitTimeoutMs),
coordinator -> coordinator.shareGroupHeartbeat(context, request)
).exceptionally(exception -> handleOperationException(
"share-group-heartbeat",
request,
exception,
(error, message) -> new ShareGroupHeartbeatResponseData()
.setErrorCode(error.code())
.setErrorMessage(message)
));
}

/**
* See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.errors.ApiException;
Expand Down Expand Up @@ -306,6 +308,22 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGro
return groupMetadataManager.consumerGroupHeartbeat(context, request);
}

/**
* Handles a ShareGroupHeartbeat request.
*
* @param context The request context.
* @param request The actual ShareGroupHeartbeat request.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
*/
public CoordinatorResult<ShareGroupHeartbeatResponseData, Record> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) {
return groupMetadataManager.shareGroupHeartbeat(context, request);
}

/**
* Handles a JoinGroup request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
Expand Down Expand Up @@ -823,6 +826,34 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
}
}

/**
* Validates the ShareGroupHeartbeat request.
*
* @param request The request to validate.
*
* @throws InvalidRequestException if the request is not valid.
* @throws UnsupportedAssignorException if the assignor is not supported.
*/
private void throwIfShareGroupHeartbeatRequestIsInvalid(
ShareGroupHeartbeatRequestData request
) throws InvalidRequestException, UnsupportedAssignorException {
throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
throwIfEmptyString(request.rackId(), "RackId can't be empty.");

if (request.memberEpoch() > 0 || request.memberEpoch() == ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
} else if (request.memberEpoch() == 0) {
if (request.rebalanceTimeoutMs() == -1) {
throw new InvalidRequestException("RebalanceTimeoutMs must be provided in first request.");
}
if (request.subscribedTopicNames() == null || request.subscribedTopicNames().isEmpty()) {
throw new InvalidRequestException("SubscribedTopicNames must be set in first request.");
}
} else {
throw new InvalidRequestException("MemberEpoch is invalid.");
}
}

/**
* Verifies that the partitions currently owned by the member (the ones set in the
* request) matches the ones that the member should own. It matches if the consumer
Expand Down Expand Up @@ -882,7 +913,7 @@ private void throwIfConsumerGroupIsFull(
* @throws FencedMemberEpochException if the provided epoch is ahead or behind the epoch known
* by this coordinator.
*/
private void throwIfMemberEpochIsInvalid(
private void throwIfConsumerGroupMemberEpochIsInvalid(
ConsumerGroupMember member,
int receivedMemberEpoch,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
Expand Down Expand Up @@ -955,14 +986,14 @@ private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, Stri
}
}

private ConsumerGroupHeartbeatResponseData.Assignment createResponseAssignment(
private ConsumerGroupHeartbeatResponseData.Assignment createConsumerGroupResponseAssignment(
ConsumerGroupMember member
) {
return new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(fromAssignmentMap(member.assignedPartitions()));
.setTopicPartitions(fromConsumerGroupAssignmentMap(member.assignedPartitions()));
}

private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> fromAssignmentMap(
private List<ConsumerGroupHeartbeatResponseData.TopicPartitions> fromConsumerGroupAssignmentMap(
Map<Uuid, Set<Integer>> assignment
) {
return assignment.entrySet().stream()
Expand Down Expand Up @@ -1025,7 +1056,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
boolean staticMemberReplaced = false;
if (instanceId == null) {
member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
throwIfConsumerGroupMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
if (createIfNotExists) {
log.info("[GroupId {}] Member {} joins the consumer group.", groupId, memberId);
}
Expand Down Expand Up @@ -1053,7 +1084,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
} else {
throwIfStaticMemberIsUnknown(member, instanceId);
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
throwIfConsumerGroupMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
updatedMemberBuilder = new ConsumerGroupMember.Builder(member);
}
}
Expand Down Expand Up @@ -1212,12 +1243,41 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
// 2. The member just joined or rejoined to group (epoch equals to zero);
// 3. The member's assignment has been updated.
if (ownedTopicPartitions != null || memberEpoch == 0 || assignmentUpdated) {
response.setAssignment(createResponseAssignment(updatedMember));
response.setAssignment(createConsumerGroupResponseAssignment(updatedMember));
}

return new CoordinatorResult<>(records, response);
}

/**
* Handles a ShareGroupHeartbeat request.
*
* @param groupId The group id from the request.
* @param memberId The member id from the request.
* @param memberEpoch The member epoch from the request.
* @param rackId The rack id from the request or null.
* @param rebalanceTimeoutMs The rebalance timeout from the request or -1.
* @param clientId The client id.
* @param clientHost The client host.
* @param subscribedTopicNames The list of subscribed topic names from the request or null.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
*/
private CoordinatorResult<ShareGroupHeartbeatResponseData, Record> shareGroupHeartbeat(
String groupId,
String memberId,
int memberEpoch,
String rackId,
int rebalanceTimeoutMs,
String clientId,
String clientHost,
List<String> subscribedTopicNames
) throws ApiException {
// TODO: Implement ShareGroupHeartbeat
throw new UnsupportedOperationException("ShareGroupHeartbeat is not supported yet.");
}

private void removeMemberAndCancelTimers(
List<Record> records,
String groupId,
Expand Down Expand Up @@ -1269,6 +1329,24 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
.setMemberEpoch(memberEpoch));
}

/**
* Handles leave request from a share group member.
* @param groupId The group id from the request.
* @param memberId The member id from the request.
* @param memberEpoch The member epoch from the request.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
*/
private CoordinatorResult<ShareGroupHeartbeatResponseData, Record> shareGroupLeave(
String groupId,
String memberId,
int memberEpoch
) throws ApiException {
// TODO: Implement this method.
throw new UnsupportedOperationException("Share group leave is not yet implemented.");
}

/**
* Handles the case when a static member decides to leave the group.
* The member is not actually fenced from the group, and instead it's
Expand Down Expand Up @@ -1496,6 +1574,39 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGro
}
}

/**
* Handles a ShareGroupHeartbeat request.
*
* @param context The request context.
* @param request The actual ShareGroupHeartbeat request.
*
* @return A Result containing the ShareGroupHeartbeat response and
* a list of records to update the state machine.
*/
public CoordinatorResult<ShareGroupHeartbeatResponseData, Record> shareGroupHeartbeat(
RequestContext context,
ShareGroupHeartbeatRequestData request
) throws ApiException {
throwIfShareGroupHeartbeatRequestIsInvalid(request);
if (request.memberEpoch() == ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH) {
// -1 means that the member wants to leave the group.
return shareGroupLeave(
request.groupId(),
request.memberId(),
request.memberEpoch());
}
// Otherwise, it is a regular heartbeat.
return shareGroupHeartbeat(
request.groupId(),
request.memberId(),
request.memberEpoch(),
request.rackId(),
request.rebalanceTimeoutMs(),
context.clientId(),
context.clientAddress.toString(),
request.subscribedTopicNames());
}

/**
* Replays ConsumerGroupMemberMetadataKey/Value to update the hard state of
* the consumer group. It updates the subscription part of the member or
Expand Down
Loading

0 comments on commit 9fc3996

Please sign in to comment.