From ea3797def0bcae1d0ec314138b5fb0a3304d997e Mon Sep 17 00:00:00 2001 From: zhu3pang Date: Fri, 29 Mar 2024 19:28:00 +0800 Subject: [PATCH] [PAIMON-1401] [Feature] Implement Purge table for SparkCatalog, add some UTs --- .../paimon/catalog/AbstractCatalog.java | 12 +++++- .../paimon/catalog/FileSystemCatalog.java | 6 +-- .../org/apache/paimon/jdbc/JdbcCatalog.java | 13 ++++++- .../paimon/catalog/CatalogTestBase.java | 25 +++++++++++- .../apache/paimon/hive/HiveCatalogTest.java | 13 ++++++- .../paimon/spark/SparkGenericCatalogTest.java | 38 ------------------- .../paimon/spark/PaimonSparkTestBase.scala | 5 +++ .../apache/paimon/spark/sql/DDLTestBase.scala | 18 +++++++++ 8 files changed, 81 insertions(+), 49 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index 5be55102a5cd..207d37c4eca0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -205,7 +205,7 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep @Override public void dropTable(Identifier identifier, boolean ignoreIfNotExists) throws TableNotExistException { - dropTable(identifier, ignoreIfNotExists, false); + dropTable(identifier, ignoreIfNotExists, true); } @Override @@ -382,6 +382,16 @@ public Map> allTablePaths() { */ public abstract String warehouse(); + /** + * Get the trash path for the catalog if exists. The default trash path is `warehouse` + + * "/.Trash". + * + * @return The catalog trash path. + */ + public String trashPath() { + return warehouse() + "/.Trash"; + } + public Map options() { return catalogOptions.toMap(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index a6c018b490a8..8233bd65aa37 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -139,7 +139,7 @@ protected void dropTableImpl(Identifier identifier, boolean ifPurge) { fileIO.rename( path, new Path( - trash(), + trashPath(), identifier.getDatabaseName() + ".db/" + identifier.getObjectName()))); @@ -210,8 +210,4 @@ public String warehouse() { public boolean caseSensitive() { return catalogOptions.get(CASE_SENSITIVE); } - - public String trash() { - return warehouse + "/.Trash"; - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index c81af9b4c913..bb811335c245 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -218,7 +218,6 @@ protected List listTablesImpl(String databaseName) { @Override protected void dropTableImpl(Identifier identifier, boolean ifPurge) { - //fixme purge for jdbc is not implemented try { int deletedRecords = execute( @@ -235,7 +234,17 @@ protected void dropTableImpl(Identifier identifier, boolean ifPurge) { Path path = getDataTableLocation(identifier); try { if (fileIO.exists(path)) { - fileIO.deleteDirectoryQuietly(path); + if (ifPurge) { + fileIO.deleteDirectoryQuietly(path); + } else { + fileIO.rename( + path, + new Path( + trashPath(), + identifier.getDatabaseName() + + ".db/" + + identifier.getObjectName())); + } } } catch (Exception ex) { LOG.error("Delete directory[{}] fail for table {}", path, identifier, ex); diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 378ebf159079..d948b676230a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -51,7 +51,7 @@ /** Base test class of paimon catalog in {@link Catalog}. */ public abstract class CatalogTestBase { - @TempDir java.nio.file.Path tempFile; + @TempDir protected java.nio.file.Path tempFile; protected String warehouse; protected FileIO fileIO; protected Catalog catalog; @@ -359,12 +359,33 @@ public void testGetTable() throws Exception { @Test public void testDropTable() throws Exception { + testDropTableImpl(true); + } + + @Test + public void testDropTableNotPurge() throws Exception { + testDropTableImpl(false); + } + + protected Path trashPath(String database, String table) { + return new Path(warehouse, ".Trash/" + database + ".db/" + table); + } + + private void testDropTableImpl(boolean ifPurge) throws Exception { catalog.createDatabase("test_db", false); // Drop table deletes the table when it exists Identifier identifier = Identifier.create("test_db", "table_to_drop"); catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); - catalog.dropTable(identifier, false); + if (ifPurge) { + catalog.dropTable(identifier, true); + // Dropped table should not be in trash + assertThat(fileIO.exists(trashPath("test_db", "table_to_drop"))).isFalse(); + } else { + catalog.dropTable(identifier, true, false); + // Dropped table should be in trash + assertThat(fileIO.exists(trashPath("test_db", "table_to_drop"))).isTrue(); + } boolean exists = catalog.tableExists(identifier); assertThat(exists).isFalse(); diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index f88969de7425..78a5f1bdea91 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -61,7 +62,10 @@ public class HiveCatalogTest extends CatalogTestBase { @BeforeEach public void setUp() throws Exception { super.setUp(); - HiveConf hiveConf = new HiveConf(); + HiveConf hiveConf = + HiveCatalog.createHiveConf( + null, HADOOP_CONF_DIR, HadoopUtils.getHadoopConfiguration(new Options())); + hiveConf.set("fs.defaultFS", warehouse); String jdoConnectionURL = "jdbc:derby:memory:" + UUID.randomUUID(); hiveConf.setVar(METASTORECONNECTURLKEY, jdoConnectionURL + ";create=true"); HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(hiveConf); @@ -270,4 +274,11 @@ public void testAlterHiveTableParameters() { fail("Test failed due to exception: " + e.getMessage()); } } + + @Override + protected Path trashPath(String database, String table) { + return new Path( + System.getenv("HOME"), + ".Trash/Current" + tempFile.toString() + "/" + database + ".db/" + table); + } } diff --git a/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java b/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java index 9afd10ac70b1..bb3e81c9dd36 100644 --- a/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java +++ b/paimon-spark/paimon-spark-3.1/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java @@ -36,8 +36,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import java.nio.file.Files; -import java.nio.file.Paths; import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -107,42 +105,6 @@ public void testCsvTable() { .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]"); } - @Test - public void testDropTable() throws Exception { - spark.sql( - "CREATE TABLE CT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES" - + " ('file.format'='avro')"); - writeTable( - "CT", - GenericRow.of(1, 2, BinaryString.fromString("3")), - GenericRow.of(4, 5, BinaryString.fromString("6"))); - spark.sql("DROP TABLE CT").collect(); - assertThat(spark.sql("SHOW TABLES like 'CT'").collectAsList()).isEmpty(); - assertThat(tableInTrash("CT")).isTrue(); - } - - @Test - public void testPurgeTable() throws Exception { - spark.sql( - "CREATE TABLE CT (a INT, b INT, c STRING) USING paimon TBLPROPERTIES" - + " ('file.format'='avro')"); - writeTable( - "CT", - GenericRow.of(1, 2, BinaryString.fromString("3")), - GenericRow.of(4, 5, BinaryString.fromString("6"))); - spark.sql("DROP TABLE CT PURGE").collect(); - assertThat(spark.sql("SHOW TABLES like 'CT'").collectAsList()).isEmpty(); - assertThat(tableInTrash("CT")).isFalse(); - } - - private static boolean tableInTrash(String table) { - return Files.exists( - Paths.get( - new Path(warehousePath, String.format(".Trash/default.db/%s", table)) - .toUri() - .getPath())); - } - private static void writeTable(String tableName, GenericRow... rows) throws Exception { FileStoreTable fileStoreTable = FileStoreTableFactory.create( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala index 0e79d1080aa4..106b300b32e8 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala @@ -36,6 +36,7 @@ import org.scalactic.source.Position import org.scalatest.Tag import java.io.File +import java.nio.file.Files import scala.util.Random @@ -118,4 +119,8 @@ class PaimonSparkTestBase extends QueryTest with SharedSparkSession with WithTab Some(SparkIdentifier.of(Array(this.dbName0), tableName)) ) } + + protected def tableInTrash(tableName: String): Boolean = { + Files.exists(new File(tempDBDir, s".Trash/$dbName0.db/$tableName").toPath) + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index 9ddb83118955..45bb1ae131aa 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -71,4 +71,22 @@ abstract class DDLTestBase extends PaimonSparkTestBase { } } } + + test("Paimon DDL: drop table not purge") { + withTable(tableName0) { + spark.sql(s"CREATE TABLE $tableName0 (a INT) USING parquet") + spark.sql(s"INSERT INTO $tableName0 VALUES (1)") + spark.sql(s"DROP TABLE $tableName0") + assert(tableInTrash(tableName0)) + } + } + + test("Paimon DDL: drop table purge") { + withTable(tableName0) { + spark.sql(s"CREATE TABLE $tableName0 (a INT) USING parquet") + spark.sql(s"INSERT INTO $tableName0 VALUES (1)") + spark.sql(s"DROP TABLE $tableName0 PURGE") + assert(tableInTrash(tableName0) === false) + } + } }