Skip to content

Commit

Permalink
[PAIMON-1401] [Feature] Implement Purge table for SparkCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
zhu3pang committed Mar 15, 2024
1 parent 994849c commit 89e3085
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
dropTable(identifier, ignoreIfNotExists, false);
}

@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists, boolean ifPurge)
throws TableNotExistException {
checkNotSystemTable(identifier, "dropTable");
if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
Expand All @@ -181,10 +187,10 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throw new TableNotExistException(identifier);
}

dropTableImpl(identifier);
dropTableImpl(identifier, ifPurge);
}

protected abstract void dropTableImpl(Identifier identifier);
protected abstract void dropTableImpl(Identifier identifier, boolean ifPurge);

@Override
public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
Expand Down Expand Up @@ -364,7 +370,7 @@ protected boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

private void checkNotSystemTable(Identifier identifier, String method) {
protected void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ default boolean tableExists(Identifier identifier) {
*/
void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException;

/**
* @param ifPurge completely purge the table (skipping trash) while removing data from warehouse
* @see #dropTable(Identifier, boolean)
*/
void dropTable(Identifier identifier, boolean ignoreIfNotExists, boolean ifPurge)
throws TableNotExistException;

/**
* Create a new table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,21 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, boolean ifPurge) {
Path path = getDataTableLocation(identifier);
uncheck(() -> fileIO.delete(path, true));
if (ifPurge) {
uncheck(() -> fileIO.delete(path, true));
} else {
uncheck(
() ->
fileIO.rename(
path,
new Path(
trash(),
identifier.getDatabaseName()
+ ".db/"
+ identifier.getObjectName())));
}
}

@Override
Expand Down Expand Up @@ -188,4 +200,8 @@ public String warehouse() {
public boolean caseSensitive() {
return catalogOptions.get(CASE_SENSITIVE);
}

public String trash() {
return warehouse + "/.Trash";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ protected List<String> listTablesImpl(String databaseName) {
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, boolean ifPurge) {
//fixme purge for jdbc is not implemented
try {
int deletedRecords =
execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
}

@Override
protected void dropTableImpl(Identifier identifier) {
protected void dropTableImpl(Identifier identifier, boolean ifPurge) {
try {
client.dropTable(
identifier.getDatabaseName(), identifier.getObjectName(), true, false, true);
identifier.getDatabaseName(), identifier.getObjectName(), true, false, ifPurge);

// When drop a Hive external table, only the hive metadata is deleted and the data files
// are not deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -105,6 +107,42 @@ public void testCsvTable() {
.containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
}

@Test
public void testDropTable() throws Exception {
spark.sql(
"CREATE TABLE CT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");
writeTable(
"CT",
GenericRow.of(1, 2, BinaryString.fromString("3")),
GenericRow.of(4, 5, BinaryString.fromString("6")));
spark.sql("DROP TABLE CT").collect();
assertThat(spark.sql("SHOW TABLES like 'CT'").collectAsList()).isEmpty();
assertThat(tableInTrash("CT")).isTrue();
}

@Test
public void testPurgeTable() throws Exception {
spark.sql(
"CREATE TABLE CT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES"
+ " ('file.format'='avro')");
writeTable(
"CT",
GenericRow.of(1, 2, BinaryString.fromString("3")),
GenericRow.of(4, 5, BinaryString.fromString("6")));
spark.sql("DROP TABLE CT PURGE").collect();
assertThat(spark.sql("SHOW TABLES like 'CT'").collectAsList()).isEmpty();
assertThat(tableInTrash("CT")).isFalse();
}

private static boolean tableInTrash(String table) {
return Files.exists(
Paths.get(
new Path(warehousePath, String.format(".Trash/default.db/%s", table))
.toUri()
.getPath()));
}

private static void writeTable(String tableName, GenericRow... rows) throws Exception {
FileStoreTable fileStoreTable =
FileStoreTableFactory.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,17 @@ public SparkTable createTable(
@Override
public boolean dropTable(Identifier ident) {
try {
catalog.dropTable(toIdentifier(ident), false);
catalog.dropTable(toIdentifier(ident), false, false);
return true;
} catch (Catalog.TableNotExistException | NoSuchTableException e) {
return false;
}
}

@Override
public boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
try {
catalog.dropTable(toIdentifier(ident), true, true);
return true;
} catch (Catalog.TableNotExistException | NoSuchTableException e) {
return false;
Expand Down

0 comments on commit 89e3085

Please sign in to comment.