Skip to content

Commit

Permalink
[ISSUE openmessaging#187] DLedgerClient batch append entry Support
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jul 23, 2022
1 parent a34df8d commit 577ad97
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
Expand All @@ -37,6 +38,7 @@
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
import io.openmessaging.storage.dledger.protocol.VoteResponse;
import io.openmessaging.storage.dledger.remoting.header.AppendHeader;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
this(dLedgerServer, null, null, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this(dLedgerServer, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.dLedgerServer = dLedgerServer;
this.memberState = dLedgerServer.getMemberState();
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
Expand Down Expand Up @@ -254,7 +258,7 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throw

@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
LeadershipTransferRequest request) throws Exception {
LeadershipTransferRequest request) throws Exception {
CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
try {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
Expand Down Expand Up @@ -283,7 +287,7 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
}

private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request,
ChannelHandlerContext ctx) {
ChannelHandlerContext ctx) {
RemotingCommand response = null;
try {
if (t != null) {
Expand Down Expand Up @@ -325,7 +329,10 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
break;
}
case APPEND: {
AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class);
AppendHeader appendHeader = (AppendHeader) request.decodeCommandCustomHeader(AppendHeader.class);
AppendEntryRequest appendEntryRequest = ((appendHeader == null) | !appendHeader.isBatch()) ?
JSON.parseObject(request.getBody(), AppendEntryRequest.class) :
JSON.parseObject(request.getBody(), BatchAppendEntryRequest.class);
CompletableFuture<AppendEntryResponse> future = handleAppend(appendEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
Expand Down Expand Up @@ -379,7 +386,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms",
request, x, DLedgerUtils.elapsed(start));
request, x, DLedgerUtils.elapsed(start));
}, futureExecutor);
break;
}
Expand All @@ -392,7 +399,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
// record positions to return;
long[] positions = new long[batchRequest.getBatchMsgs().size()];
DLedgerEntry resEntry = null;
// split bodys to append
// split bodies to append
int index = 0;
Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
while (iterator.hasNext()) {
Expand All @@ -227,7 +227,7 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
return batchAppendFuture;
}
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
" with empty bodies");
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.openmessaging.storage.dledger.ShutdownAbleThread;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
Expand All @@ -28,6 +29,8 @@
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -55,17 +58,35 @@ public DLedgerClient(String group, String peers) {
}

public AppendEntryResponse append(byte[] body) {
return batchAppend(Collections.singletonList(body));
}

public AppendEntryResponse batchAppend(List<byte[]> bodies) {

if (null == bodies || bodies.size() == 0) {
logger.warn("Batch append data is empty");
AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
appendEntryResponse.setCode(DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode());
return appendEntryResponse;
}

try {
waitOnUpdatingMetadata(1500, false);
if (leaderId == null) {
AppendEntryResponse appendEntryResponse = new AppendEntryResponse();
appendEntryResponse.setCode(DLedgerResponseCode.METADATA_ERROR.getCode());
return appendEntryResponse;
}
AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
AppendEntryRequest appendEntryRequest;
if (bodies.size() == 1) {
appendEntryRequest = new AppendEntryRequest();
appendEntryRequest.setBody(bodies.get(0));
} else {
appendEntryRequest = new BatchAppendEntryRequest();
((BatchAppendEntryRequest) appendEntryRequest).setBatchMsgs(bodies);
}
appendEntryRequest.setGroup(group);
appendEntryRequest.setRemoteId(leaderId);
appendEntryRequest.setBody(body);
AppendEntryResponse response = dLedgerClientRpcService.append(appendEntryRequest).get();
if (response.getCode() == DLedgerResponseCode.NOT_LEADER.getCode()) {
waitOnUpdatingMetadata(1500, true);
Expand All @@ -82,6 +103,7 @@ public AppendEntryResponse append(byte[] body) {
appendEntryResponse.setCode(DLedgerResponseCode.INTERNAL_ERROR.getCode());
return appendEntryResponse;
}

}

public GetEntriesResponse get(long index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import com.alibaba.fastjson.JSON;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode;
import io.openmessaging.storage.dledger.protocol.GetEntriesRequest;
import io.openmessaging.storage.dledger.protocol.GetEntriesResponse;
import io.openmessaging.storage.dledger.protocol.MetadataRequest;
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse;
import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest;
import io.openmessaging.storage.dledger.remoting.header.AppendHeader;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand All @@ -41,7 +43,13 @@ public DLedgerClientRpcNettyService() {

@Override
public CompletableFuture<AppendEntryResponse> append(AppendEntryRequest request) throws Exception {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.APPEND.getCode(), null);

AppendHeader appendHeader = null;
if (request instanceof BatchAppendEntryRequest) {
appendHeader = new AppendHeader();
appendHeader.setBatch(true);
}
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.APPEND.getCode(), appendHeader);
wrapperRequest.setBody(JSON.toJSONBytes(request));
RemotingCommand wrapperResponse = this.remotingClient.invokeSync(getPeerAddr(request.getRemoteId()), wrapperRequest, 3000);
AppendEntryResponse response = JSON.parseObject(wrapperResponse.getBody(), AppendEntryResponse.class);
Expand All @@ -58,7 +66,8 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) thr
}

@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(LeadershipTransferRequest request) throws Exception {
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
LeadershipTransferRequest request) throws Exception {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
wrapperRequest.setBody(JSON.toJSONBytes(request));
RemotingCommand wrapperResponse = this.remotingClient.invokeSync(getPeerAddr(request.getRemoteId()), wrapperRequest, 10000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

import com.alibaba.fastjson.JSON;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Parameters(commandDescription = "append data to DLedger server")
public class AppendCommand extends BaseCommand {

private static Logger logger = LoggerFactory.getLogger(AppendCommand.class);
Expand All @@ -34,18 +39,32 @@ public class AppendCommand extends BaseCommand {
private String peers = "n0-localhost:20911";

@Parameter(names = {"--data", "-d"}, description = "the data to append")
private String data = "Hello";
private List<String> data = new ArrayList<>();

@Parameter(names = {"--count", "-c"}, description = "append several times")
private int count = 1;

@Override
public void doCommand() {

if (null == data || data.isEmpty()) {
logger.warn("Not data append to dledger server");
return;
}
DLedgerClient dLedgerClient = new DLedgerClient(group, peers);
dLedgerClient.startup();
for (int i = 0; i < count; i++) {
AppendEntryResponse response = dLedgerClient.append(data.getBytes());
logger.info("Append Result:{}", JSON.toJSONString(response));
if (data.size() == 1) {
byte[] dataBytes = data.get(0).getBytes();
for (int i = 0; i < count; i++) {
AppendEntryResponse response = dLedgerClient.append(dataBytes);
logger.info("Append Result:{}", JSON.toJSONString(response));
}
} else {
List<byte[]> dataList = data.stream().map(String::getBytes).collect(Collectors.toList());
for (int i = 0; i < count; i++) {
AppendEntryResponse response = dLedgerClient.batchAppend(dataList);
logger.info("Append Result:{}", JSON.toJSONString(response));
}
}
dLedgerClient.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2017-2022 The DLedger Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.openmessaging.storage.dledger.remoting.header;

import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;

public class AppendHeader implements CommandCustomHeader {

private boolean batch = false;

public boolean isBatch() {
return batch;
}

public void setBatch(boolean batch) {
this.batch = batch;
}

@Override
public void checkFields() throws RemotingCommandException {
//nothing to do
}
}
Loading

0 comments on commit 577ad97

Please sign in to comment.