Skip to content

Commit

Permalink
[flink] FlinkCatalog should assert different path for creating table (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Aug 21, 2024
1 parent adb0279 commit f04f86a
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
5 changes: 1 addition & 4 deletions docs/content/flink/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,7 @@ CREATE TABLE my_table (
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

CREATE TABLE my_table_like LIKE my_table;

-- Create Paimon Table like other connector table
CREATE TABLE my_table_like WITH ('connector' = 'paimon') LIKE my_table;
CREATE TABLE my_table_like LIKE my_table (EXCLUDING OPTIONS);
```

## Work with Flink Temporary Tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -301,7 +302,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
"Only support CatalogTable, but is: " + table.getClass());
}

if (getDefaultDatabase().equals(tablePath.getDatabaseName())
if (Objects.equals(getDefaultDatabase(), tablePath.getDatabaseName())
&& disableCreateTableInDefaultDatabase) {
throw new UnsupportedOperationException(
"Creating table in default database is disabled, please specify a database name.");
Expand Down Expand Up @@ -353,7 +354,18 @@ protected Schema buildPaimonSchema(
}

// remove table path
options.remove(PATH.key());
String path = options.remove(PATH.key());
if (path != null) {
Path expectedPath = catalog.getTableLocation(identifier);
if (!new Path(path).equals(expectedPath)) {
throw new CatalogException(
String.format(
"You specified the Path when creating the table, "
+ "but the Path '%s' is different from where it should be '%s'. "
+ "Please remove the Path.",
path, expectedPath));
}
}

return fromCatalogTable(catalogTable.copy(options));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void testCreateSystemTable() {
}

@Test
public void testManifestsTable() throws Exception {
public void testManifestsTable() {
sql("CREATE TABLE T (a INT, b INT)");
sql("INSERT INTO T VALUES (1, 2)");

Expand All @@ -205,7 +205,7 @@ public void testManifestsTableWithFileCount() {
}

@Test
public void testSchemasTable() throws Exception {
public void testSchemasTable() {
sql(
"CREATE TABLE T(a INT, b INT, c STRING, PRIMARY KEY (a) NOT ENFORCED) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')");
sql("ALTER TABLE T SET ('snapshot.time-retained' = '5 h')");
Expand Down Expand Up @@ -240,7 +240,7 @@ public void testSchemasTable() throws Exception {
}

@Test
public void testSnapshotsSchemasTable() throws Exception {
public void testSnapshotsSchemasTable() {
sql("CREATE TABLE T (a INT, b INT)");
sql("INSERT INTO T VALUES (1, 2)");
sql("INSERT INTO T VALUES (3, 4)");
Expand All @@ -261,9 +261,9 @@ public void testSnapshotsSchemasTable() throws Exception {
}

@Test
public void testCreateTableLike() throws Exception {
public void testCreateTableLike() {
sql("CREATE TABLE T (a INT)");
sql("CREATE TABLE T1 LIKE T");
sql("CREATE TABLE T1 LIKE T (EXCLUDING OPTIONS)");
List<Row> result =
sql(
"SELECT schema_id, fields, partition_keys, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.stream.Stream;

import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
import static org.apache.paimon.flink.FlinkCatalogOptions.DISABLE_CREATE_TABLE_IN_DEFAULT_DB;
import static org.apache.paimon.flink.FlinkCatalogOptions.LOG_SYSTEM_AUTO_REGISTER;
Expand All @@ -97,15 +98,17 @@ public class FlinkCatalogTest {
private final ObjectPath tableInDefaultDb1 = new ObjectPath("default-db", "t1");
private final ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
private final ObjectPath nonExistObjectPath = ObjectPath.fromString("db1.nonexist");

private String warehouse;
private Catalog catalog;

@TempDir public static java.nio.file.Path temporaryFolder;

@BeforeEach
public void beforeEach() throws IOException {
String path = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
warehouse = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
Options conf = new Options();
conf.setString("warehouse", path);
conf.setString("warehouse", warehouse);
conf.set(LOG_SYSTEM_AUTO_REGISTER, true);
catalog =
FlinkCatalogFactory.createCatalog(
Expand Down Expand Up @@ -229,6 +232,22 @@ public void testCreateFlinkTable(Map<String, String> options) {
.hasMessageContaining("Paimon Catalog only supports paimon tables");
}

@Test
public void testCreateFlinkTableWithPath() throws Exception {
catalog.createDatabase(path1.getDatabaseName(), null, false);
Map<String, String> options = new HashMap<>();
options.put(PATH.key(), "/unknown/path");
CatalogTable table1 = createTable(options);
assertThatThrownBy(() -> catalog.createTable(this.path1, table1, false))
.hasMessageContaining(
"You specified the Path when creating the table, "
+ "but the Path '/unknown/path' is different from where it should be");

options.put(PATH.key(), warehouse + "/db1.db/t1");
CatalogTable table2 = createTable(options);
catalog.createTable(this.path1, table2, false);
}

@ParameterizedTest
@MethodSource("streamingOptionProvider")
public void testCreateTable_Streaming(Map<String, String> options) throws Exception {
Expand Down Expand Up @@ -654,7 +673,7 @@ private void checkEquals(
.catalog()
.getTable(FlinkCatalog.toIdentifier(path))
.options()
.get(CoreOptions.PATH.key()));
.get(PATH.key()));
} catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit f04f86a

Please sign in to comment.