diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/FlussPaths.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/FlussPaths.java
index a8c85915..51ef4dd6 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/utils/FlussPaths.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/FlussPaths.java
@@ -586,6 +586,28 @@ public static FsPath remoteKvTabletDir(
return new FsPath(remoteTableDir, String.valueOf(tableBucket.getBucket()));
}
+ /**
+ * Returns the remote directory path for storing kv snapshot files for a kv table.
+ *
+ *
The path contract:
+ *
+ *
+ * {remoteKvDir}/{databaseName}/{tableName}-{tableId}
+ *
+ *
+ * @param remoteKvDir the remote kv directory, usually should be "{$remote.data.dir}/kv"
+ */
+ public static FsPath remoteKvTableRootDir(
+ FsPath remoteKvDir, PhysicalTablePath physicalPath, TableBucket tableBucket) {
+ return new FsPath(
+ remoteKvDir,
+ String.format(
+ "%s/%s-%d",
+ physicalPath.getDatabaseName(),
+ physicalPath.getTableName(),
+ tableBucket.getTableId()));
+ }
+
/**
* Returns the remote directory path for storing kv snapshot exclusive files (manifest and
* CURRENT files).
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java
index d8441ed2..51fc4158 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java
@@ -19,6 +19,10 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.KvStorageException;
+import com.alibaba.fluss.exception.RemoteStorageException;
+import com.alibaba.fluss.fs.FileStatus;
+import com.alibaba.fluss.fs.FileSystem;
+import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.memory.LazyMemorySegmentPool;
import com.alibaba.fluss.memory.MemorySegmentPool;
import com.alibaba.fluss.metadata.KvFormat;
@@ -74,6 +78,10 @@ public final class KvManager extends TabletManagerBase {
/** The memory segment pool to allocate memorySegment. */
private final MemorySegmentPool memorySegmentPool;
+ private final FsPath remoteKvDir;
+
+ private FileSystem remoteFileSystem;
+
private KvManager(
File dataDir,
Configuration conf,
@@ -85,6 +93,12 @@ private KvManager(
this.arrowBufferAllocator = new RootAllocator(Long.MAX_VALUE);
this.memorySegmentPool = LazyMemorySegmentPool.create(conf);
this.zkClient = zkClient;
+ this.remoteKvDir = FlussPaths.remoteKvDir(conf);
+ try {
+ this.remoteFileSystem = remoteKvDir.getFileSystem();
+ } catch (Exception e) {
+ LOG.error("Can not find matched file system from dir {}", this.remoteKvDir);
+ }
}
public static KvManager create(
@@ -187,7 +201,8 @@ public Optional getKv(TableBucket tableBucket) {
return Optional.ofNullable(currentKvs.get(tableBucket));
}
- public void dropKv(TableBucket tableBucket) {
+ public void dropKv(
+ PhysicalTablePath physicalTablePath, TableBucket tableBucket, boolean deleteRemote) {
KvTablet dropKvTablet =
inLock(tabletCreationOrDeletionLock, () -> currentKvs.remove(tableBucket));
@@ -195,6 +210,9 @@ public void dropKv(TableBucket tableBucket) {
TablePath tablePath = dropKvTablet.getTablePath();
try {
dropKvTablet.drop();
+ if (deleteRemote) {
+ dropRemoteKvSnapshot(physicalTablePath, tableBucket);
+ }
if (dropKvTablet.getPartitionName() == null) {
LOG.info(
"Deleted kv bucket {} for table {} in file path {}.",
@@ -270,4 +288,41 @@ public KvTablet loadKv(File tabletDir) throws Exception {
this.currentKvs.put(tableBucket, kvTablet);
return kvTablet;
}
+
+ private void dropRemoteKvSnapshot(PhysicalTablePath physicalTablePath, TableBucket tableBucket)
+ throws RemoteStorageException {
+ FsPath remoteKvTabletDir =
+ FlussPaths.remoteKvTabletDir(remoteKvDir, physicalTablePath, tableBucket);
+ FsPath remoteKvTableRootDir =
+ FlussPaths.remoteKvTableRootDir(remoteKvDir, physicalTablePath, tableBucket);
+ try {
+ if (remoteFileSystem.exists(remoteKvTabletDir)) {
+ remoteFileSystem.delete(remoteKvTabletDir, true);
+ LOG.info("Delete remote table bucket snapshot of {} success.", tableBucket);
+ }
+ } catch (Exception e) {
+ throw new RemoteStorageException(
+ "Failed to delete remote kv tablet path:" + remoteKvTabletDir, e);
+ }
+ deleteEmptyParentRecursively(
+ remoteKvTabletDir.getParent(), remoteKvTableRootDir.getParent());
+ }
+
+ private void deleteEmptyParentRecursively(FsPath curDir, FsPath tableRootParentDir) {
+ if (!tableRootParentDir.getPath().equalsIgnoreCase(curDir.getPath())) {
+ try {
+ FileStatus[] fileStatuses = remoteFileSystem.listStatus(curDir);
+ if (fileStatuses != null && fileStatuses.length == 0) {
+ boolean deleted = remoteFileSystem.delete(curDir, false);
+ if (deleted) {
+ deleteEmptyParentRecursively(curDir.getParent(), tableRootParentDir);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to delete remote kv tablet path: {}, may this paths has been deleted",
+ curDir);
+ }
+ }
+ }
}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index 747a2b54..6658c64c 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -177,34 +177,36 @@ public void triggerSnapshot() {
// of using guardedExecutor
guardedExecutor.execute(
() -> {
- LOG.debug("TableBucket {} triggers snapshot.", tableBucket);
- long triggerTime = System.currentTimeMillis();
-
- Optional snapshotRunnableOptional;
- try {
- snapshotRunnableOptional = target.initSnapshot();
- } catch (Exception e) {
- LOG.error("Fail to init snapshot during triggering snapshot.", e);
- return;
- }
- if (snapshotRunnableOptional.isPresent()) {
- SnapshotRunnable runnable = snapshotRunnableOptional.get();
- asyncOperationsThreadPool.execute(
- () ->
- asyncSnapshotPhase(
- triggerTime,
- runnable.getSnapshotId(),
- runnable.getCoordinatorEpoch(),
- runnable.getBucketLeaderEpoch(),
- runnable.getSnapshotLocation(),
- runnable.getSnapshotRunnable()));
- } else {
- scheduleNextSnapshot();
- LOG.debug(
- "TableBucket {} has no data updates since last snapshot, "
- + "skip this one and schedule the next one in {} seconds",
- tableBucket,
- periodicSnapshotDelay / 1000);
+ if (started && !periodicExecutor.isShutdown()) {
+ LOG.debug("TableBucket {} triggers snapshot.", tableBucket);
+ long triggerTime = System.currentTimeMillis();
+
+ Optional snapshotRunnableOptional;
+ try {
+ snapshotRunnableOptional = target.initSnapshot();
+ } catch (Exception e) {
+ LOG.error("Fail to init snapshot during triggering snapshot.", e);
+ return;
+ }
+ if (snapshotRunnableOptional.isPresent()) {
+ SnapshotRunnable runnable = snapshotRunnableOptional.get();
+ asyncOperationsThreadPool.execute(
+ () ->
+ asyncSnapshotPhase(
+ triggerTime,
+ runnable.getSnapshotId(),
+ runnable.getCoordinatorEpoch(),
+ runnable.getBucketLeaderEpoch(),
+ runnable.getSnapshotLocation(),
+ runnable.getSnapshotRunnable()));
+ } else {
+ scheduleNextSnapshot();
+ LOG.debug(
+ "TableBucket {} has no data updates since last snapshot, "
+ + "skip this one and schedule the next one in {} seconds",
+ tableBucket,
+ periodicSnapshotDelay / 1000);
+ }
}
});
}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java
index 337b3f50..08fc9429 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java
@@ -445,7 +445,7 @@ public void delete() {
leaderIsrUpdateLock,
() -> {
if (isKvTable()) {
- dropKv();
+ dropKv(true);
}
// drop log then
logManager.dropLog(tableBucket);
@@ -490,7 +490,7 @@ private void onBecomeNewLeader() {
// if it's become new leader, we must
// first destroy the old kv tablet
// if exist. Otherwise, it'll use still the old kv tablet which will cause data loss
- dropKv();
+ dropKv(false);
// now, we can create a new kv tablet
createKv();
}
@@ -499,7 +499,7 @@ private void onBecomeNewLeader() {
private void onBecomeNewFollower() {
if (isKvTable()) {
// it should be from leader to follower, we need to destroy the kv tablet
- dropKv();
+ dropKv(false);
}
}
@@ -519,7 +519,7 @@ private void createKv() {
startPeriodicKvSnapshot(snapshotUsed.orElse(null));
}
- private void dropKv() {
+ private void dropKv(boolean deleteRemote) {
// close any closeable registry for kv
if (closeableRegistry.unregisterCloseable(closeableRegistryForKv)) {
IOUtils.closeQuietly(closeableRegistryForKv);
@@ -527,7 +527,7 @@ private void dropKv() {
if (kvTablet != null) {
// drop the kv tablet
checkNotNull(kvManager);
- kvManager.dropKv(tableBucket);
+ kvManager.dropKv(physicalPath, tableBucket, deleteRemote);
kvTablet = null;
}
}
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
index 6bbf7632..d6b91f66 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvManagerTest.java
@@ -217,7 +217,8 @@ void testSameTableNameInDifferentDb(String partitionName) throws Exception {
void testDropKv(String partitionName) throws Exception {
initTableBuckets(partitionName);
KvTablet kv1 = getOrCreateKv(tablePath1, partitionName, tableBucket1);
- kvManager.dropKv(kv1.getTableBucket());
+ kvManager.dropKv(
+ PhysicalTablePath.of(tablePath1, partitionName), kv1.getTableBucket(), true);
assertThat(kv1.getKvTabletDir()).doesNotExist();
assertThat(kvManager.getKv(tableBucket1)).isNotPresent();