Skip to content

Commit

Permalink
[region migration] Fix IoTConsensus data consistency during region me…
Browse files Browse the repository at this point in the history
…mber changes (apache#14084)
  • Loading branch information
HxpSerein authored Nov 15, 2024
1 parent f15ebf3 commit 44b348c
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,24 +303,24 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
logger.info("[IoTConsensus] inactivate new peer: {}", peer);
impl.inactivePeer(peer, false);

// step 2: take snapshot
// step 2: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);

// step 3: take snapshot
logger.info("[IoTConsensus] start to take snapshot...");
impl.checkAndLockSafeDeletedSearchIndex();

impl.takeSnapshot();

// step 3: transit snapshot
// step 4: transit snapshot
logger.info("[IoTConsensus] start to transmit snapshot...");
impl.transmitSnapshot(peer);

// step 4: let the new peer load snapshot
// step 5: let the new peer load snapshot
logger.info("[IoTConsensus] trigger new peer to load snapshot...");
impl.triggerSnapshotLoad(peer);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

// step 5: notify all the other Peers to build the sync connection to newPeer
logger.info("[IoTConsensus] notify current peers to build sync log...");
impl.notifyPeersToBuildSyncLogChannel(peer);

// step 6: active new Peer
logger.info("[IoTConsensus] activate new peer...");
impl.activePeer(peer);
Expand All @@ -340,7 +340,6 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
impl.notifyPeersToRemoveSyncLogChannel(peer);
throw new ConsensusException(e);
} finally {
impl.checkAndUnlockSafeDeletedSearchIndex();
logger.info("[IoTConsensus] clean up local snapshot...");
impl.cleanupLocalSnapshot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ public class IoTConsensusServerImpl {
private final ScheduledExecutorService backgroundTaskService;
private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
IoTConsensusRateLimiter.getInstance();
private volatile long lastPinnedSearchIndexForMigration = -1;
private volatile long lastPinnedSafeDeletedIndexForMigration = -1;

public IoTConsensusServerImpl(
String storageDir,
Expand Down Expand Up @@ -516,7 +514,7 @@ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer)
if (peer.equals(thisNode)) {
// use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the
// snapshot produced by thisNode
buildSyncLogChannel(targetPeer, lastPinnedSearchIndexForMigration);
buildSyncLogChannel(targetPeer);
} else {
// use RPC to tell other peers to build sync log channel to target peer
try (SyncIoTConsensusServiceClient client =
Expand Down Expand Up @@ -822,9 +820,7 @@ public long getMinSyncIndex() {
}

public long getMinFlushedSyncIndex() {
return lastPinnedSafeDeletedIndexForMigration == -1
? logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get)
: lastPinnedSafeDeletedIndexForMigration;
return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get);
}

public String getStorageDir() {
Expand Down Expand Up @@ -947,25 +943,6 @@ public void cleanupLocalSnapshot() {
}
}

/**
 * Pins the current log positions for a migration. This should be called before addPeer to avoid
 * potential data loss.
 *
 * <p>Stores the current value of {@code searchIndex} in {@code
 * lastPinnedSearchIndexForMigration}, stores the current result of {@link
 * #getMinFlushedSyncIndex()} in {@code lastPinnedSafeDeletedIndexForMigration}, and then passes
 * the result of {@link #getMinFlushedSyncIndex()} to {@code
 * consensusReqReader.setSafelyDeletedSearchIndex}.
 */
public void checkAndLockSafeDeletedSearchIndex() {
// Remember the search index as it is right now.
lastPinnedSearchIndexForMigration = searchIndex.get();
// Remember the min flushed sync index as it is right now.
lastPinnedSafeDeletedIndexForMigration = getMinFlushedSyncIndex();
// NOTE(review): getMinFlushedSyncIndex() is evaluated a second time here rather than reusing the
// value stored on the previous line; presumably it now returns that pinned value — confirm.
consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex());
}

/**
 * Releases the log positions pinned for a migration. This should be called after addPeer to
 * avoid potential data accumulation.
 *
 * <p>Resets {@code lastPinnedSearchIndexForMigration} and {@code
 * lastPinnedSafeDeletedIndexForMigration} to {@code -1} and then calls {@code
 * checkAndUpdateSafeDeletedSearchIndex()}.
 */
public void checkAndUnlockSafeDeletedSearchIndex() {
// -1 is the value both fields are initialized with; presumably it means "nothing pinned" —
// confirm.
lastPinnedSearchIndexForMigration = -1;
lastPinnedSafeDeletedIndexForMigration = -1;
// NOTE(review): the reset happens before this call, presumably so the refresh no longer
// observes the pinned values — confirm before reordering.
checkAndUpdateSafeDeletedSearchIndex();
}

/**
* If there is only one replica, set it to Long.MAX_VALUE. If there are multiple replicas, get
* the latest SafelyDeletedSearchIndex again. This enables wal to be deleted in a timely manner.
Expand Down

0 comments on commit 44b348c

Please sign in to comment.