Skip to content

Commit

Permalink
[alibaba#121]fix bug for delete remote kv dir
Browse files Browse the repository at this point in the history
  • Loading branch information
Vipamp committed Dec 22, 2024
1 parent b2d0df9 commit 84d5270
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 35 deletions.
22 changes: 22 additions & 0 deletions fluss-common/src/main/java/com/alibaba/fluss/utils/FlussPaths.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,28 @@ public static FsPath remoteKvTabletDir(
return new FsPath(remoteTableDir, String.valueOf(tableBucket.getBucket()));
}

/**
 * Builds the remote root directory of a kv table, i.e. the directory under which the kv
 * snapshot files of that table are stored.
 *
 * <p>The path contract:
 *
 * <pre>
 * {remoteKvDir}/{databaseName}/{tableName}-{tableId}
 * </pre>
 *
 * <p>Only the database name and table name of {@code physicalPath} and the table id of
 * {@code tableBucket} are used to build the path.
 *
 * @param remoteKvDir the remote kv directory, usually should be "{$remote.data.dir}/kv"
 * @param physicalPath the physical table path providing the database name and table name
 * @param tableBucket the table bucket providing the table id
 * @return the remote root directory of the kv table
 */
public static FsPath remoteKvTableRootDir(
        FsPath remoteKvDir, PhysicalTablePath physicalPath, TableBucket tableBucket) {
    // relative location of the table root below the remote kv directory
    String tableRootRelativePath =
            String.format(
                    "%s/%s-%d",
                    physicalPath.getDatabaseName(),
                    physicalPath.getTableName(),
                    tableBucket.getTableId());
    return new FsPath(remoteKvDir, tableRootRelativePath);
}

/**
* Returns the remote directory path for storing kv snapshot exclusive files (manifest and
* CURRENT files).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.KvStorageException;
import com.alibaba.fluss.exception.RemoteStorageException;
import com.alibaba.fluss.fs.FileStatus;
import com.alibaba.fluss.fs.FileSystem;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.memory.LazyMemorySegmentPool;
import com.alibaba.fluss.memory.MemorySegmentPool;
import com.alibaba.fluss.metadata.KvFormat;
Expand Down Expand Up @@ -74,6 +78,10 @@ public final class KvManager extends TabletManagerBase {
/** The memory segment pool to allocate memorySegment. */
private final MemorySegmentPool memorySegmentPool;

private final FsPath remoteKvDir;

private FileSystem remoteFileSystem;

private KvManager(
File dataDir,
Configuration conf,
Expand All @@ -85,6 +93,12 @@ private KvManager(
this.arrowBufferAllocator = new RootAllocator(Long.MAX_VALUE);
this.memorySegmentPool = LazyMemorySegmentPool.create(conf);
this.zkClient = zkClient;
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
try {
this.remoteFileSystem = remoteKvDir.getFileSystem();
} catch (Exception e) {
LOG.error("Can not find matched file system from dir {}", this.remoteKvDir);
}
}

public static KvManager create(
Expand Down Expand Up @@ -187,14 +201,18 @@ public Optional<KvTablet> getKv(TableBucket tableBucket) {
return Optional.ofNullable(currentKvs.get(tableBucket));
}

public void dropKv(TableBucket tableBucket) {
public void dropKv(
PhysicalTablePath physicalTablePath, TableBucket tableBucket, boolean deleteRemote) {
KvTablet dropKvTablet =
inLock(tabletCreationOrDeletionLock, () -> currentKvs.remove(tableBucket));

if (dropKvTablet != null) {
TablePath tablePath = dropKvTablet.getTablePath();
try {
dropKvTablet.drop();
if (deleteRemote) {
dropRemoteKvSnapshot(physicalTablePath, tableBucket);
}
if (dropKvTablet.getPartitionName() == null) {
LOG.info(
"Deleted kv bucket {} for table {} in file path {}.",
Expand Down Expand Up @@ -270,4 +288,41 @@ public KvTablet loadKv(File tabletDir) throws Exception {
this.currentKvs.put(tableBucket, kvTablet);
return kvTablet;
}

/**
 * Deletes the remote kv tablet directory of the given table bucket and afterwards removes the
 * parent directories that became empty, walking up until the parent of the table root
 * directory is reached.
 *
 * @param physicalTablePath the physical table path used to resolve the remote directories
 * @param tableBucket the table bucket whose remote kv tablet directory should be deleted
 * @throws RemoteStorageException if no remote file system is available, or if checking or
 *     deleting the remote kv tablet directory fails
 */
private void dropRemoteKvSnapshot(PhysicalTablePath physicalTablePath, TableBucket tableBucket)
        throws RemoteStorageException {
    FsPath remoteKvTabletDir =
            FlussPaths.remoteKvTabletDir(remoteKvDir, physicalTablePath, tableBucket);
    FsPath remoteKvTableRootDir =
            FlussPaths.remoteKvTableRootDir(remoteKvDir, physicalTablePath, tableBucket);

    // The constructor only logs an error when it cannot resolve the file system, so the field
    // may still be null here. Fail with an explicit cause instead of a wrapped
    // NullPointerException.
    if (remoteFileSystem == null) {
        throw new RemoteStorageException(
                "Failed to delete remote kv tablet path:" + remoteKvTabletDir,
                new IllegalStateException(
                        "No file system is available for remote kv dir " + remoteKvDir));
    }

    try {
        if (remoteFileSystem.exists(remoteKvTabletDir)) {
            remoteFileSystem.delete(remoteKvTabletDir, true);
            LOG.info("Delete remote table bucket snapshot of {} success.", tableBucket);
        }
    } catch (Exception e) {
        throw new RemoteStorageException(
                "Failed to delete remote kv tablet path:" + remoteKvTabletDir, e);
    }

    // Best-effort cleanup of now-empty parents; never goes above the table root's parent.
    deleteEmptyParentRecursively(
            remoteKvTabletDir.getParent(), remoteKvTableRootDir.getParent());
}

/**
 * Deletes {@code curDir} and then its ancestors, one level at a time, as long as each
 * directory is empty and was deleted successfully. The walk stops as soon as it reaches
 * {@code tableRootParentDir} (which is never deleted), a directory that is not empty, a
 * directory that could not be deleted, or the end of the parent chain.
 *
 * <p>This is a best-effort cleanup: any exception is logged as a warning together with its
 * cause and ends the walk; it is not propagated to the caller.
 *
 * @param curDir the first directory to check and delete if empty
 * @param tableRootParentDir the boundary directory at which the walk stops
 */
private void deleteEmptyParentRecursively(FsPath curDir, FsPath tableRootParentDir) {
    FsPath dir = curDir;
    // NOTE(review): the boundary check compares the paths ignoring case.
    while (dir != null && !tableRootParentDir.getPath().equalsIgnoreCase(dir.getPath())) {
        try {
            FileStatus[] fileStatuses = remoteFileSystem.listStatus(dir);
            if (fileStatuses == null || fileStatuses.length != 0) {
                // unknown content or still in use: keep this directory and its ancestors
                return;
            }
            // non-recursive delete, so a directory that is no longer empty is not removed
            if (!remoteFileSystem.delete(dir, false)) {
                return;
            }
            dir = dir.getParent();
        } catch (Exception e) {
            LOG.warn(
                    "Failed to delete remote kv tablet path: {}, may this paths has been deleted",
                    dir,
                    e);
            return;
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,34 +177,36 @@ public void triggerSnapshot() {
// of using guardedExecutor
guardedExecutor.execute(
() -> {
LOG.debug("TableBucket {} triggers snapshot.", tableBucket);
long triggerTime = System.currentTimeMillis();

Optional<SnapshotRunnable> snapshotRunnableOptional;
try {
snapshotRunnableOptional = target.initSnapshot();
} catch (Exception e) {
LOG.error("Fail to init snapshot during triggering snapshot.", e);
return;
}
if (snapshotRunnableOptional.isPresent()) {
SnapshotRunnable runnable = snapshotRunnableOptional.get();
asyncOperationsThreadPool.execute(
() ->
asyncSnapshotPhase(
triggerTime,
runnable.getSnapshotId(),
runnable.getCoordinatorEpoch(),
runnable.getBucketLeaderEpoch(),
runnable.getSnapshotLocation(),
runnable.getSnapshotRunnable()));
} else {
scheduleNextSnapshot();
LOG.debug(
"TableBucket {} has no data updates since last snapshot, "
+ "skip this one and schedule the next one in {} seconds",
tableBucket,
periodicSnapshotDelay / 1000);
if (started && !periodicExecutor.isShutdown()) {
LOG.debug("TableBucket {} triggers snapshot.", tableBucket);
long triggerTime = System.currentTimeMillis();

Optional<SnapshotRunnable> snapshotRunnableOptional;
try {
snapshotRunnableOptional = target.initSnapshot();
} catch (Exception e) {
LOG.error("Fail to init snapshot during triggering snapshot.", e);
return;
}
if (snapshotRunnableOptional.isPresent()) {
SnapshotRunnable runnable = snapshotRunnableOptional.get();
asyncOperationsThreadPool.execute(
() ->
asyncSnapshotPhase(
triggerTime,
runnable.getSnapshotId(),
runnable.getCoordinatorEpoch(),
runnable.getBucketLeaderEpoch(),
runnable.getSnapshotLocation(),
runnable.getSnapshotRunnable()));
} else {
scheduleNextSnapshot();
LOG.debug(
"TableBucket {} has no data updates since last snapshot, "
+ "skip this one and schedule the next one in {} seconds",
tableBucket,
periodicSnapshotDelay / 1000);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public void delete() {
leaderIsrUpdateLock,
() -> {
if (isKvTable()) {
dropKv();
dropKv(true);
}
// drop log then
logManager.dropLog(tableBucket);
Expand Down Expand Up @@ -490,7 +490,7 @@ private void onBecomeNewLeader() {
// if it has become the new leader, we must
// first destroy the old kv tablet if it exists.
// Otherwise, it will still use the old kv tablet, which will cause data loss
dropKv();
dropKv(false);
// now, we can create a new kv tablet
createKv();
}
Expand All @@ -499,7 +499,7 @@ private void onBecomeNewLeader() {
private void onBecomeNewFollower() {
if (isKvTable()) {
// it should be from leader to follower, we need to destroy the kv tablet
dropKv();
dropKv(false);
}
}

Expand All @@ -519,15 +519,15 @@ private void createKv() {
startPeriodicKvSnapshot(snapshotUsed.orElse(null));
}

private void dropKv() {
private void dropKv(boolean deleteRemote) {
// close any closeable registry for kv
if (closeableRegistry.unregisterCloseable(closeableRegistryForKv)) {
IOUtils.closeQuietly(closeableRegistryForKv);
}
if (kvTablet != null) {
// drop the kv tablet
checkNotNull(kvManager);
kvManager.dropKv(tableBucket);
kvManager.dropKv(physicalPath, tableBucket, deleteRemote);
kvTablet = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ void testSameTableNameInDifferentDb(String partitionName) throws Exception {
void testDropKv(String partitionName) throws Exception {
initTableBuckets(partitionName);
KvTablet kv1 = getOrCreateKv(tablePath1, partitionName, tableBucket1);
kvManager.dropKv(kv1.getTableBucket());
kvManager.dropKv(
PhysicalTablePath.of(tablePath1, partitionName), kv1.getTableBucket(), true);

assertThat(kv1.getKvTabletDir()).doesNotExist();
assertThat(kvManager.getKv(tableBucket1)).isNotPresent();
Expand Down

0 comments on commit 84d5270

Please sign in to comment.