Skip to content

Commit

Permalink
Optimize code in QuorumAckChecker (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
rushsky518 authored Jul 17, 2020
1 parent 99990f8 commit a35f0d8
Showing 1 changed file with 23 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public class DLedgerEntryPusher {
private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();
private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();

private EntryHandler entryHandler = new EntryHandler(logger);
private EntryHandler entryHandler;

private QuorumAckChecker quorumAckChecker = new QuorumAckChecker(logger);
private QuorumAckChecker quorumAckChecker;

private Map<String, EntryDispatcher> dispatcherMap = new HashMap<>();

Expand All @@ -77,6 +77,8 @@ public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState,
dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
}
}
this.entryHandler = new EntryHandler(logger);
this.quorumAckChecker = new QuorumAckChecker(logger);
}

public void startup() {
Expand Down Expand Up @@ -176,7 +178,7 @@ private class QuorumAckChecker extends ShutdownAbleThread {
private long lastQuorumIndex = -1;

public QuorumAckChecker(Logger logger) {
super("QuorumAckChecker", logger);
super("QuorumAckChecker-" + memberState.getSelfId(), logger);
}

@Override
Expand Down Expand Up @@ -231,26 +233,24 @@ public void doWork() {
ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
boolean needCheck = false;
int ackNum = 0;
if (quorumIndex >= 0) {
for (Long i = quorumIndex; i >= 0; i--) {
try {
CompletableFuture<AppendEntryResponse> future = responses.remove(i);
if (future == null) {
needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;
break;
} else if (!future.isDone()) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setTerm(currTerm);
response.setIndex(i);
response.setLeaderId(memberState.getSelfId());
response.setPos(((AppendFuture) future).getPos());
future.complete(response);
}
ackNum++;
} catch (Throwable t) {
logger.error("Error in ack to index={} term={}", i, currTerm, t);
for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
try {
CompletableFuture<AppendEntryResponse> future = responses.remove(i);
if (future == null) {
needCheck = true;
break;
} else if (!future.isDone()) {
AppendEntryResponse response = new AppendEntryResponse();
response.setGroup(memberState.getGroup());
response.setTerm(currTerm);
response.setIndex(i);
response.setLeaderId(memberState.getSelfId());
response.setPos(((AppendFuture) future).getPos());
future.complete(response);
}
ackNum++;
} catch (Throwable t) {
logger.error("Error in ack to index={} term={}", i, currTerm, t);
}
}

Expand Down Expand Up @@ -753,7 +753,7 @@ private class EntryHandler extends ShutdownAbleThread {
BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests = new ArrayBlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>(100);

public EntryHandler(Logger logger) {
super("EntryHandler", logger);
super("EntryHandler-" + memberState.getSelfId(), logger);
}

public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {
Expand Down

0 comments on commit a35f0d8

Please sign in to comment.