From 91f3408d862e4f4dce4be4682f41f0f90e027f31 Mon Sep 17 00:00:00 2001 From: yantian Date: Tue, 17 Dec 2024 14:35:53 +0800 Subject: [PATCH] fix flink alter database change comment and add test, doc for location --- docs/content/flink/sql-alter.md | 8 ++++ docs/content/spark/sql-alter.md | 8 ++++ .../org/apache/paimon/flink/FlinkCatalog.java | 18 ++++++-- .../apache/paimon/flink/FlinkCatalogTest.java | 31 ++++++++++++++ .../sql/DDLWithHiveCatalogTestBase.scala | 42 ++++++++++++++++++- 5 files changed, 103 insertions(+), 4 deletions(-) diff --git a/docs/content/flink/sql-alter.md b/docs/content/flink/sql-alter.md index 645cb06ef365..6c3186b4af75 100644 --- a/docs/content/flink/sql-alter.md +++ b/docs/content/flink/sql-alter.md @@ -235,3 +235,11 @@ The following SQL sets one or more properties in the specified database. If a pa ```sql ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...) ``` + +## Altering Database Location + +The following SQL changes location of database `my_database` to `file:/temp/my_database`. + +```sql +ALTER DATABASE my_database SET ('location' = 'file:/temp/my_database') +``` diff --git a/docs/content/spark/sql-alter.md b/docs/content/spark/sql-alter.md index 839f0d2f8ce4..359b1187292d 100644 --- a/docs/content/spark/sql-alter.md +++ b/docs/content/spark/sql-alter.md @@ -249,4 +249,12 @@ The following SQL sets one or more properties in the specified database. If a pa ```sql ALTER { DATABASE | SCHEMA | NAMESPACE } my_database SET { DBPROPERTIES | PROPERTIES } ( property_name = property_value [ , ... ] ) +``` + +## Altering Database Location + +The following SQL sets the location of the specified database to `file:/temp/my_database.db`. + +```sql +ALTER DATABASE my_database SET LOCATION 'file:/temp/my_database.db' ``` \ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 510d7c59a8b7..ec3c4a47a69d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -136,6 +136,7 @@ import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_MODE; import static org.apache.paimon.CoreOptions.MATERIALIZED_TABLE_REFRESH_STATUS; import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.catalog.Catalog.COMMENT_PROP; import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP; import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP; import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP; @@ -244,7 +245,7 @@ public void createDatabase(String name, CatalogDatabase database, boolean ignore properties = new HashMap<>(database.getProperties()); if (database.getDescription().isPresent() && !database.getDescription().get().equals("")) { - properties.put(Catalog.COMMENT_PROP, database.getDescription().get()); + properties.put(COMMENT_PROP, database.getDescription().get()); } } else { properties = Collections.emptyMap(); @@ -623,7 +624,7 @@ private List toSchemaChange( SchemaManager.checkAlterTablePath(key); - if (Catalog.COMMENT_PROP.equals(key)) { + if (COMMENT_PROP.equals(key)) { schemaChanges.add(SchemaChange.updateComment(value)); } else { schemaChanges.add(SchemaChange.setOption(key, value)); @@ -632,7 +633,7 @@ private List toSchemaChange( } else if (change instanceof ResetOption) { ResetOption resetOption = (ResetOption) change; String key = resetOption.getKey(); - if (Catalog.COMMENT_PROP.equals(key)) { + if (COMMENT_PROP.equals(key)) { schemaChanges.add(SchemaChange.updateComment(null)); } else { schemaChanges.add(SchemaChange.removeOption(resetOption.getKey())); @@ -1220,6 +1221,8 @@ public final void alterDatabase( Database oldDatabase = catalog.getDatabase(name); List changes = getPropertyChanges(oldDatabase.options(), newDatabase.getProperties()); + getPropertyChangeFromComment(oldDatabase.comment(), newDatabase.getDescription()) + .ifPresent(changes::add); catalog.alterDatabase(name, changes, ignoreIfNotExists); } catch (Catalog.DatabaseNotExistException e) { if (!ignoreIfNotExists) { @@ -1295,6 +1298,15 @@ static List getPropertyChanges( return changes; } + @VisibleForTesting + static Optional getPropertyChangeFromComment( + Optional oldComment, Optional newComment) { + if (newComment.isPresent() && !oldComment.equals(newComment)) { + return Optional.of(PropertyChange.setProperty(COMMENT_PROP, newComment.get())); + } + return Optional.empty(); + } + private Table getPaimonTable(ObjectPath tablePath) throws TableNotExistException { try { Identifier identifier = toIdentifier(tablePath); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 0b68d9ecf6ff..4b8cf7912192 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -84,6 +84,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Stream; @@ -617,6 +618,20 @@ public void testAlterDb() throws DatabaseAlreadyExistException, DatabaseNotExist verify(mockCatalog, times(1)).getDatabase(path1.getDatabaseName()); } + @Test + public void testAlterDbComment() + throws DatabaseAlreadyExistException, DatabaseNotExistException { + CatalogDatabaseImpl database = new CatalogDatabaseImpl(Collections.emptyMap(), null); + catalog.createDatabase(path1.getDatabaseName(), database, false); + Catalog mockCatalog = spy(catalog); + when(mockCatalog.getDatabase(path1.getDatabaseName())).thenReturn(database); + CatalogDatabaseImpl newDatabase = new CatalogDatabaseImpl(Collections.emptyMap(), "aa"); + doNothing().when(mockCatalog).alterDatabase(path1.getDatabaseName(), newDatabase, false); + mockCatalog.alterDatabase(path1.getDatabaseName(), newDatabase, false); + verify(mockCatalog, times(1)).alterDatabase(path1.getDatabaseName(), newDatabase, false); + verify(mockCatalog, times(1)).getDatabase(path1.getDatabaseName()); + } + @Test public void testAlterDb_DatabaseNotExistException() { CatalogDatabaseImpl database = new CatalogDatabaseImpl(Collections.emptyMap(), null); @@ -640,6 +655,22 @@ public void testGetProperties() throws Exception { assertThat(propertyChanges.size()).isEqualTo(2); } + @Test + public void testGetPropertyChangeFromComment() { + Optional commentChange = + FlinkCatalog.getPropertyChangeFromComment(Optional.empty(), Optional.empty()); + assertThat(commentChange.isPresent()).isFalse(); + commentChange = + FlinkCatalog.getPropertyChangeFromComment(Optional.of("aa"), Optional.of("bb")); + assertThat(commentChange.isPresent()).isTrue(); + commentChange = + FlinkCatalog.getPropertyChangeFromComment(Optional.of("aa"), Optional.empty()); + assertThat(commentChange.isPresent()).isFalse(); + commentChange = + FlinkCatalog.getPropertyChangeFromComment(Optional.empty(), Optional.of("bb")); + assertThat(commentChange.isPresent()).isTrue(); + } + @Test public void testCreateTableWithColumnOptions() throws Exception { ResolvedExpression expression = diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala index 4ba079ea0bb2..526e24250751 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala @@ -22,7 +22,7 @@ import org.apache.paimon.hive.HiveMetastoreClient import org.apache.paimon.spark.PaimonHiveTestBase import org.apache.paimon.table.FileStoreTable -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.junit.jupiter.api.Assertions abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { @@ -194,6 +194,46 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase { } } + test("Paimon DDL with hive catalog: alter database's properties") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + spark.sql(s"USE $catalogName") + val databaseName = "paimon_db" + withDatabase(databaseName) { + spark.sql(s"CREATE DATABASE $databaseName WITH DBPROPERTIES ('k1' = 'v1', 'k2' = 'v2')") + var props = getDatabaseProps(databaseName) + Assertions.assertEquals(props("k1"), "v1") + Assertions.assertEquals(props("k2"), "v2") + spark.sql(s"ALTER DATABASE $databaseName SET DBPROPERTIES ('k1' = 'v11', 'k2' = 'v22')") + props = getDatabaseProps(databaseName) + Assertions.assertEquals(props("k1"), "v11") + Assertions.assertEquals(props("k2"), "v22") + } + } + } + + test("Paimon DDL with hive catalog: alter database location") { + Seq(sparkCatalogName, paimonHiveCatalogName).foreach { + catalogName => + spark.sql(s"USE $catalogName") + val databaseName = "paimon_db" + withDatabase(databaseName) { + spark.sql(s"CREATE DATABASE $databaseName WITH DBPROPERTIES ('k1' = 'v1', 'k2' = 'v2')") + withTempDir { + dBLocation => + try { + spark.sql( + s"ALTER DATABASE $databaseName SET LOCATION '${dBLocation.getCanonicalPath}'") + } catch { + case e: AnalysisException => + Assertions.assertTrue( + e.getMessage.contains("does not support altering database location")) + } + } + } + } + } + test("Paimon DDL with hive catalog: set default database") { var reusedSpark = spark