From f04f86ad113ea85191d607fc5fb4ac8af02955aa Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Wed, 21 Aug 2024 17:23:27 +0800 Subject: [PATCH] [flink] FlinkCatalog should assert different path for creating table (#4023) --- docs/content/flink/sql-ddl.md | 5 +--- .../org/apache/paimon/flink/FlinkCatalog.java | 16 ++++++++++-- .../paimon/flink/CatalogTableITCase.java | 10 ++++---- .../apache/paimon/flink/FlinkCatalogTest.java | 25 ++++++++++++++++--- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/docs/content/flink/sql-ddl.md b/docs/content/flink/sql-ddl.md index 6fcc3da3a72b..363d7475761c 100644 --- a/docs/content/flink/sql-ddl.md +++ b/docs/content/flink/sql-ddl.md @@ -280,10 +280,7 @@ CREATE TABLE my_table ( PRIMARY KEY (dt, hh, user_id) NOT ENFORCED ); -CREATE TABLE my_table_like LIKE my_table; - --- Create Paimon Table like other connector table -CREATE TABLE my_table_like WITH ('connector' = 'paimon') LIKE my_table; +CREATE TABLE my_table_like LIKE my_table (EXCLUDING OPTIONS); ``` ## Work with Flink Temporary Tables diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 8d7b6883e16c..7765215353a3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -301,7 +302,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig "Only support CatalogTable, but is: " + table.getClass()); } - if (getDefaultDatabase().equals(tablePath.getDatabaseName()) + if (Objects.equals(getDefaultDatabase(), tablePath.getDatabaseName()) && disableCreateTableInDefaultDatabase) { throw new UnsupportedOperationException( "Creating table in default database is disabled, please specify a database name."); @@ -353,7 +354,18 @@ protected Schema buildPaimonSchema( } // remove table path - options.remove(PATH.key()); + String path = options.remove(PATH.key()); + if (path != null) { + Path expectedPath = catalog.getTableLocation(identifier); + if (!new Path(path).equals(expectedPath)) { + throw new CatalogException( + String.format( + "You specified the Path when creating the table, " + + "but the Path '%s' is different from where it should be '%s'. " + + "Please remove the Path.", + path, expectedPath)); + } + } return fromCatalogTable(catalogTable.copy(options)); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java index 199f26ad17a7..417cfcd78b5b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java @@ -179,7 +179,7 @@ public void testCreateSystemTable() { } @Test - public void testManifestsTable() throws Exception { + public void testManifestsTable() { sql("CREATE TABLE T (a INT, b INT)"); sql("INSERT INTO T VALUES (1, 2)"); @@ -205,7 +205,7 @@ public void testManifestsTableWithFileCount() { } @Test - public void testSchemasTable() throws Exception { + public void testSchemasTable() { sql( "CREATE TABLE T(a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')"); sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')"); @@ -240,7 +240,7 @@ public void testSchemasTable() throws Exception { } @Test - public void testSnapshotsSchemasTable() throws Exception { + public void testSnapshotsSchemasTable() { sql("CREATE TABLE T (a INT, b INT)"); sql("INSERT INTO T VALUES (1, 2)"); sql("INSERT INTO T VALUES (3, 4)"); @@ -261,9 +261,9 @@ public void testSnapshotsSchemasTable() throws Exception { } @Test - public void testCreateTableLike() throws Exception { + public void testCreateTableLike() { sql("CREATE TABLE T (a INT)"); - sql("CREATE TABLE T1 LIKE T"); + sql("CREATE TABLE T1 LIKE T (EXCLUDING OPTIONS)"); List result = sql( "SELECT schema_id, fields, partition_keys, " diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index f95ec3ae4a16..f7edfa9d644f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -76,6 +76,7 @@ import java.util.stream.Stream; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB; import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER; @@ -97,15 +98,17 @@ public class FlinkCatalogTest { private final ObjectPath tableInDefaultDb1 = new ObjectPath("default-db", "t1"); private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist"); private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist"); + + private String warehouse; private Catalog catalog; @TempDir public static java.nio.file.Path temporaryFolder; @BeforeEach public void beforeEach() throws IOException { - String path = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + warehouse = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); Options conf = new Options(); - conf.setString("warehouse", path); + conf.setString("warehouse", warehouse); conf.set(LOG_SYSTEM_AUTO_REGISTER, true); catalog = FlinkCatalogFactory.createCatalog( @@ -229,6 +232,22 @@ public void testCreateFlinkTable(Map options) { .hasMessageContaining("Paimon Catalog only supports paimon tables"); } + @Test + public void testCreateFlinkTableWithPath() throws Exception { + catalog.createDatabase(path1.getDatabaseName(), null, false); + Map options = new HashMap<>(); + options.put(PATH.key(), "/unknown/path"); + CatalogTable table1 = createTable(options); + assertThatThrownBy(() -> catalog.createTable(this.path1, table1, false)) + .hasMessageContaining( + "You specified the Path when creating the table, " + + "but the Path '/unknown/path' is different from where it should be"); + + options.put(PATH.key(), warehouse + "/db1.db/t1"); + CatalogTable table2 = createTable(options); + catalog.createTable(this.path1, table2, false); + } + @ParameterizedTest @MethodSource("streamingOptionProvider") public void testCreateTable_Streaming(Map options) throws Exception { @@ -654,7 +673,7 @@ private void checkEquals( .catalog() .getTable(FlinkCatalog.toIdentifier(path)) .options() - .get(CoreOptions.PATH.key())); + .get(PATH.key())); } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) { throw new RuntimeException(e); }