diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 08e80bbe2691..564a0264b1ca 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -303,24 +303,24 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE logger.info("[IoTConsensus] inactivate new peer: {}", peer); impl.inactivePeer(peer, false); - // step 2: take snapshot + // step 2: notify all the other Peers to build the sync connection to newPeer + logger.info("[IoTConsensus] notify current peers to build sync log..."); + impl.notifyPeersToBuildSyncLogChannel(peer); + + // step 3: take snapshot logger.info("[IoTConsensus] start to take snapshot..."); - impl.checkAndLockSafeDeletedSearchIndex(); + impl.takeSnapshot(); - // step 3: transit snapshot + // step 4: transit snapshot logger.info("[IoTConsensus] start to transmit snapshot..."); impl.transmitSnapshot(peer); - // step 4: let the new peer load snapshot + // step 5: let the new peer load snapshot logger.info("[IoTConsensus] trigger new peer to load snapshot..."); impl.triggerSnapshotLoad(peer); KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION); - // step 5: notify all the other Peers to build the sync connection to newPeer - logger.info("[IoTConsensus] notify current peers to build sync log..."); - impl.notifyPeersToBuildSyncLogChannel(peer); - // step 6: active new Peer logger.info("[IoTConsensus] activate new peer..."); impl.activePeer(peer); @@ -340,7 +340,6 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE impl.notifyPeersToRemoveSyncLogChannel(peer); throw new ConsensusException(e); } finally { - impl.checkAndUnlockSafeDeletedSearchIndex(); logger.info("[IoTConsensus] clean up local snapshot..."); impl.cleanupLocalSnapshot(); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index efa1a0306ef4..57256b39b6ab 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -132,8 +132,6 @@ public class IoTConsensusServerImpl { private final ScheduledExecutorService backgroundTaskService; private final IoTConsensusRateLimiter ioTConsensusRateLimiter = IoTConsensusRateLimiter.getInstance(); - private volatile long lastPinnedSearchIndexForMigration = -1; - private volatile long lastPinnedSafeDeletedIndexForMigration = -1; public IoTConsensusServerImpl( String storageDir, @@ -516,7 +514,7 @@ public void notifyPeersToBuildSyncLogChannel(Peer targetPeer) if (peer.equals(thisNode)) { // use searchIndex for thisNode as the initialSyncIndex because targetPeer will load the // snapshot produced by thisNode - buildSyncLogChannel(targetPeer, lastPinnedSearchIndexForMigration); + buildSyncLogChannel(targetPeer); } else { // use RPC to tell other peers to build sync log channel to target peer try (SyncIoTConsensusServiceClient client = @@ -822,9 +820,7 @@ public long getMinSyncIndex() { } public long getMinFlushedSyncIndex() { - return lastPinnedSafeDeletedIndexForMigration == -1 - ? logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get) - : lastPinnedSafeDeletedIndexForMigration; + return logDispatcher.getMinFlushedSyncIndex().orElseGet(searchIndex::get); } public String getStorageDir() { @@ -947,25 +943,6 @@ public void cleanupLocalSnapshot() { } } - /** - * We should set safelyDeletedSearchIndex to searchIndex before addPeer to avoid potential data - * lost. - */ - public void checkAndLockSafeDeletedSearchIndex() { - lastPinnedSearchIndexForMigration = searchIndex.get(); - lastPinnedSafeDeletedIndexForMigration = getMinFlushedSyncIndex(); - consensusReqReader.setSafelyDeletedSearchIndex(getMinFlushedSyncIndex()); - } - - /** - * We should unlock safelyDeletedSearchIndex after addPeer to avoid potential data accumulation. - */ - public void checkAndUnlockSafeDeletedSearchIndex() { - lastPinnedSearchIndexForMigration = -1; - lastPinnedSafeDeletedIndexForMigration = -1; - checkAndUpdateSafeDeletedSearchIndex(); - } - /** * If there is only one replica, set it to Long.MAX_VALUE.、 If there are multiple replicas, get * the latest SafelyDeletedSearchIndex again. This enables wal to be deleted in a timely manner.