Skip to content

Commit

Permalink
[ISSUE #176] Realize the ability to read from the replica node,Optimi…
Browse files Browse the repository at this point in the history
…ze read performance
  • Loading branch information
mxsm committed Jul 13, 2022
1 parent 7d10d2f commit d70d6e4
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PullIndexRequest;
import io.openmessaging.storage.dledger.protocol.PullIndexResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
Expand Down Expand Up @@ -77,11 +79,13 @@ public DLedgerRpcNettyService(DLedgerServer dLedgerServer) {
this(dLedgerServer, null, null, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this(dLedgerServer, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig, NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
public DLedgerRpcNettyService(DLedgerServer dLedgerServer, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
this.dLedgerServer = dLedgerServer;
this.memberState = dLedgerServer.getMemberState();
NettyRequestProcessor protocolProcessor = new NettyRequestProcessor() {
Expand Down Expand Up @@ -109,6 +113,7 @@ public boolean rejectRequest() {
this.remotingServer.registerProcessor(DLedgerRequestCode.VOTE.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null);
this.remotingServer.registerProcessor(DLedgerRequestCode.PULL_INDEX.getCode(), protocolProcessor, null);

//start the remoting client
if (nettyClientConfig == null) {
Expand Down Expand Up @@ -252,9 +257,29 @@ public CompletableFuture<PushEntryResponse> push(PushEntryRequest request) throw
return future;
}

@Override
public CompletableFuture<PullIndexResponse> pullIndex(PullIndexRequest request) throws Exception {
CompletableFuture<PullIndexResponse> future = new CompletableFuture<>();
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.PULL_INDEX.getCode(), null);
wrapperRequest.setBody(JSON.toJSONBytes(request));
remotingClient.invokeAsync(getPeerAddr(request), wrapperRequest, 3000, responseFuture -> {
RemotingCommand responseCommand = responseFuture.getResponseCommand();
PullIndexResponse response;
if (null != responseCommand) {
response = JSON.parseObject(responseCommand.getBody(), PullIndexResponse.class);
} else {
response = new PullIndexResponse();
response.copyBaseInfo(request);
response.setCode(DLedgerResponseCode.NETWORK_ERROR.getCode());
}
future.complete(response);
});
return future;
}

@Override
public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
LeadershipTransferRequest request) throws Exception {
LeadershipTransferRequest request) throws Exception {
CompletableFuture<LeadershipTransferResponse> future = new CompletableFuture<>();
try {
RemotingCommand wrapperRequest = RemotingCommand.createRequestCommand(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), null);
Expand Down Expand Up @@ -283,7 +308,7 @@ public CompletableFuture<LeadershipTransferResponse> leadershipTransfer(
}

private void writeResponse(RequestOrResponse storeResp, Throwable t, RemotingCommand request,
ChannelHandlerContext ctx) {
ChannelHandlerContext ctx) {
RemotingCommand response = null;
try {
if (t != null) {
Expand Down Expand Up @@ -319,57 +344,43 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
case METADATA: {
MetadataRequest metadataRequest = JSON.parseObject(request.getBody(), MetadataRequest.class);
CompletableFuture<MetadataResponse> future = handleMetadata(metadataRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case APPEND: {
AppendEntryRequest appendEntryRequest = JSON.parseObject(request.getBody(), AppendEntryRequest.class);
CompletableFuture<AppendEntryResponse> future = handleAppend(appendEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case GET: {
GetEntriesRequest getEntriesRequest = JSON.parseObject(request.getBody(), GetEntriesRequest.class);
CompletableFuture<GetEntriesResponse> future = handleGet(getEntriesRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case PULL: {
PullEntriesRequest pullEntriesRequest = JSON.parseObject(request.getBody(), PullEntriesRequest.class);
CompletableFuture<PullEntriesResponse> future = handlePull(pullEntriesRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case PUSH: {
PushEntryRequest pushEntryRequest = JSON.parseObject(request.getBody(), PushEntryRequest.class);
CompletableFuture<PushEntryResponse> future = handlePush(pushEntryRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case VOTE: {
VoteRequest voteRequest = JSON.parseObject(request.getBody(), VoteRequest.class);
CompletableFuture<VoteResponse> future = handleVote(voteRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case HEART_BEAT: {
HeartBeatRequest heartBeatRequest = JSON.parseObject(request.getBody(), HeartBeatRequest.class);
CompletableFuture<HeartBeatResponse> future = handleHeartBeat(heartBeatRequest);
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
}, futureExecutor);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
case LEADERSHIP_TRANSFER: {
Expand All @@ -379,10 +390,16 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
future.whenCompleteAsync((x, y) -> {
writeResponse(x, y, request, ctx);
logger.info("LEADERSHIP_TRANSFER FINISHED. Request={}, response={}, cost={}ms",
request, x, DLedgerUtils.elapsed(start));
request, x, DLedgerUtils.elapsed(start));
}, futureExecutor);
break;
}
case PULL_INDEX: {
PullIndexRequest pullIndexRequest = JSON.parseObject(request.getBody(), PullIndexRequest.class);
CompletableFuture<PullIndexResponse> future = handlePullIndex(pullIndexRequest);
future.whenCompleteAsync((x, y) -> writeResponse(x, y, request, ctx), futureExecutor);
break;
}
default:
logger.error("Unknown request code {} from {}", request.getCode(), request);
break;
Expand All @@ -392,7 +409,7 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
LeadershipTransferRequest leadershipTransferRequest) throws Exception {
return dLedgerServer.handleLeadershipTransfer(leadershipTransferRequest);
}

Expand Down Expand Up @@ -432,6 +449,11 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
return dLedgerServer.handlePush(request);
}

@Override
public CompletableFuture<PullIndexResponse> handlePullIndex(PullIndexRequest request) throws Exception {
return dLedgerServer.handlePullIndex(request);
}

public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) {
RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null);
remotingCommand.setBody(JSON.toJSONBytes(response));
Expand Down
110 changes: 99 additions & 11 deletions src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.openmessaging.storage.dledger.protocol.MetadataResponse;
import io.openmessaging.storage.dledger.protocol.PullEntriesRequest;
import io.openmessaging.storage.dledger.protocol.PullEntriesResponse;
import io.openmessaging.storage.dledger.protocol.PullIndexRequest;
import io.openmessaging.storage.dledger.protocol.PullIndexResponse;
import io.openmessaging.storage.dledger.protocol.PushEntryRequest;
import io.openmessaging.storage.dledger.protocol.PushEntryResponse;
import io.openmessaging.storage.dledger.protocol.VoteRequest;
Expand All @@ -44,19 +46,17 @@
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.PreConditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand Down Expand Up @@ -211,7 +211,7 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
// record positions to return;
long[] positions = new long[batchRequest.getBatchMsgs().size()];
DLedgerEntry resEntry = null;
// split bodys to append
// split bodies to append
int index = 0;
Iterator<byte[]> iterator = batchRequest.getBatchMsgs().iterator();
while (iterator.hasNext()) {
Expand All @@ -226,8 +226,8 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
batchAppendFuture.setPositions(positions);
return batchAppendFuture;
}
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODYS, "BatchAppendEntryRequest" +
" with empty bodys");
throw new DLedgerException(DLedgerResponseCode.REQUEST_WITH_EMPTY_BODIES, "BatchAppendEntryRequest" +
" with empty bodies");
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
dLedgerEntry.setBody(request.getBody());
Expand All @@ -246,16 +246,59 @@ public CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest re
}

@Override
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws IOException {
public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);
DLedgerEntry entry = dLedgerStore.get(request.getBeginIndex());
PreConditions.check(!memberState.isCandidate(), DLedgerResponseCode.IS_CANDIDATE);
GetEntriesResponse response = new GetEntriesResponse();
response.setGroup(memberState.getGroup());
if (entry != null) {
response.setEntries(Collections.singletonList(entry));
Long requestIndex = request.getBeginIndex();
if (memberState.isFollower()) {
//Get from follower
if (requestIndex <= memberState.getLedgerEndIndex()) {
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
}

// when requestIndex greater than ledgerEndIndex then send pull ledgerEndIndex request to leader
PullIndexRequest indexRequest = new PullIndexRequest();
indexRequest.setGroup(request.getGroup());
indexRequest.setRemoteId(memberState.getLeaderId());
CompletableFuture<PullIndexResponse> future = dLedgerRpcService.pullIndex(indexRequest);
PullIndexResponse pullIndexResponse = future.get();
if (pullIndexResponse.getCode() != DLedgerResponseCode.SUCCESS.getCode()) {
response.copyBaseInfo(request);
response.setLeaderId(memberState.getLeaderId());
response.setCode(pullIndexResponse.getCode());
return CompletableFuture.completedFuture(response);
}

long leaderEndIndex = pullIndexResponse.getEndIndex();
if (requestIndex > leaderEndIndex) {
response.copyBaseInfo(request);
response.setLeaderId(memberState.getLeaderId());
response.setCode(DLedgerResponseCode.INDEX_OUT_OF_RANGE.getCode());
return CompletableFuture.completedFuture(response);
}

if (leaderEndIndex <= memberState.getLedgerEndIndex()) {
getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
}

//wait for follower ledgerEndIndex to update
if (!waitEndIndex2Update(2, TimeUnit.SECONDS, requestIndex)) {
logger.warn("update follower[{}] ledgerEndIndex time out", memberState.getSelfId());
response.setCode(DLedgerResponseCode.FOLLOWER_UPDATE_END_INDEX_TIMEOUT.getCode());
return CompletableFuture.completedFuture(response);
}

getEntry(response, requestIndex);
return CompletableFuture.completedFuture(response);
} else {
//get from leader
getEntry(response, requestIndex);
}
return CompletableFuture.completedFuture(response);
} catch (DLedgerException e) {
Expand All @@ -268,6 +311,13 @@ public CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request
}
}

private void getEntry(GetEntriesResponse response, Long requestIndex) {
DLedgerEntry entry = dLedgerStore.get(requestIndex);
if (entry != null) {
response.setEntries(Collections.singletonList(entry));
}
}

@Override
public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {
try {
Expand Down Expand Up @@ -311,6 +361,29 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)

}

@Override
public CompletableFuture<PullIndexResponse> handlePullIndex(PullIndexRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);

PullIndexResponse response = new PullIndexResponse();
response.setGroup(memberState.getGroup());
response.setLeaderId(memberState.getLeaderId());
response.setEndIndex(memberState.getLedgerEndIndex());

return CompletableFuture.completedFuture(response);
} catch (DLedgerException e) {
logger.error("[{}][HandlePullIndex] failed", memberState.getSelfId(), e);
PullIndexResponse response = new PullIndexResponse();
response.copyBaseInfo(request);
response.setCode(e.getCode().getCode());
response.setLeaderId(memberState.getLeaderId());
return CompletableFuture.completedFuture(response);
}
}

@Override
public CompletableFuture<LeadershipTransferResponse> handleLeadershipTransfer(
LeadershipTransferRequest request) throws Exception {
Expand Down Expand Up @@ -486,4 +559,19 @@ public NettyRemotingClient getRemotingClient() {
return null;
}

private boolean waitEndIndex2Update(long maxWaitTime, TimeUnit unit, long index) {
long maxWaitMs = unit.toMillis(maxWaitTime);
long start = System.currentTimeMillis();
while (DLedgerUtils.elapsed(start) < maxWaitMs) {
try {
DLedgerUtils.sleep(1);
if (index <= memberState.getLedgerEndIndex()) {
return true;
}
} catch (Exception e) {
break;
}
}
return false;
}
}
Loading

0 comments on commit d70d6e4

Please sign in to comment.