diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java index 0dcf24f3a67..6c972b08390 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java @@ -32,25 +32,41 @@ public class PaimonPropertiesUtils { // will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will // change it to `catalogType` automatically and pass it to Paimon. public static final Map GRAVITINO_CONFIG_TO_PAIMON; + public static final Map PAIMON_CATALOG_CONFIG_TO_GRAVITINO; static { - Map map = new HashMap(); - map.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND); - map.put(PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER); - map.put(PaimonConstants.GRAVITINO_JDBC_USER, PaimonConstants.PAIMON_JDBC_USER); - map.put(PaimonConstants.GRAVITINO_JDBC_PASSWORD, PaimonConstants.PAIMON_JDBC_PASSWORD); - map.put(PaimonConstants.URI, PaimonConstants.URI); - map.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE); - map.put(PaimonConstants.CATALOG_BACKEND_NAME, PaimonConstants.CATALOG_BACKEND_NAME); + Map gravitinoConfigToPaimon = new HashMap<>(); + Map paimonCatalogConfigToGravitino = new HashMap<>(); + gravitinoConfigToPaimon.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.METASTORE); + gravitinoConfigToPaimon.put( + PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER); + gravitinoConfigToPaimon.put( + PaimonConstants.GRAVITINO_JDBC_USER, PaimonConstants.PAIMON_JDBC_USER); + gravitinoConfigToPaimon.put( + PaimonConstants.GRAVITINO_JDBC_PASSWORD, PaimonConstants.PAIMON_JDBC_PASSWORD); + gravitinoConfigToPaimon.put(PaimonConstants.URI, 
PaimonConstants.URI); + gravitinoConfigToPaimon.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE); + gravitinoConfigToPaimon.put( + PaimonConstants.CATALOG_BACKEND_NAME, PaimonConstants.CATALOG_BACKEND_NAME); // S3 - map.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT); - map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, PaimonConstants.S3_ACCESS_KEY); - map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, PaimonConstants.S3_SECRET_KEY); + gravitinoConfigToPaimon.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT); + gravitinoConfigToPaimon.put( + S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, PaimonConstants.S3_ACCESS_KEY); + gravitinoConfigToPaimon.put( + S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, PaimonConstants.S3_SECRET_KEY); // OSS - map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, PaimonConstants.OSS_ENDPOINT); - map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, PaimonConstants.OSS_ACCESS_KEY); - map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, PaimonConstants.OSS_SECRET_KEY); - GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(map); + gravitinoConfigToPaimon.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, PaimonConstants.OSS_ENDPOINT); + gravitinoConfigToPaimon.put( + OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, PaimonConstants.OSS_ACCESS_KEY); + gravitinoConfigToPaimon.put( + OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, PaimonConstants.OSS_SECRET_KEY); + GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(gravitinoConfigToPaimon); + gravitinoConfigToPaimon.forEach( + (key, value) -> { + paimonCatalogConfigToGravitino.put(value, key); + }); + PAIMON_CATALOG_CONFIG_TO_GRAVITINO = + Collections.unmodifiableMap(paimonCatalogConfigToGravitino); } /** diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 9e2a48c036c..14329dd1c07 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -26,6 +26,7 @@ repositories { 
mavenCentral() } +var paimonVersion: String = libs.versions.paimon.get() val flinkVersion: String = libs.versions.flink.get() val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") @@ -38,14 +39,16 @@ val scalaVersion: String = "2.12" val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion" dependencies { + implementation(project(":core")) implementation(project(":catalogs:catalog-common")) + implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) - compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") + compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion") compileOnly(libs.hive2.exec) { artifact { @@ -75,6 +78,7 @@ dependencies { testImplementation(project(":clients:client-java")) testImplementation(project(":core")) testImplementation(project(":common")) + testImplementation(project(":catalogs:catalog-lakehouse-paimon")) testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) @@ -90,6 +94,7 @@ dependencies { testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") testImplementation("org.apache.flink:flink-table-common:$flinkVersion") testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion") + testImplementation("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion") testImplementation(libs.hive2.exec) { artifact { @@ -170,6 +175,7 @@ tasks.test { } else { dependsOn(tasks.jar) dependsOn(":catalogs:catalog-hive:jar") + dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java new file mode 100644 index 00000000000..017ac6e7085 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; + +/** + * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to + * proxy the PaimonCatalog class. 
+ */ +public class GravitinoPaimonCatalog extends BaseCatalog { + + private final AbstractCatalog paimonCatalog; + + protected GravitinoPaimonCatalog( + String catalogName, + AbstractCatalog paimonCatalog, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { + super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter); + this.paimonCatalog = paimonCatalog; + } + + @Override + protected AbstractCatalog realCatalog() { + return paimonCatalog; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java new file mode 100644 index 00000000000..52489fc667f --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.flink.connector.paimon; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.paimon.flink.FlinkCatalog; +import org.apache.paimon.flink.FlinkCatalogFactory; + +/** + * Factory for creating instances of {@link GravitinoPaimonCatalog}. It will be created by SPI + * discovery in Flink. + */ +public class GravitinoPaimonCatalogFactory implements BaseCatalogFactory { + + @Override + public Catalog createCatalog(Context context) { + FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context); + return new GravitinoPaimonCatalog( + context.getName(), catalog, propertiesConverter(), partitionConverter()); + } + + @Override + public String factoryIdentifier() { + return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public String gravitinoCatalogProvider() { + return "lakehouse-paimon"; + } + + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + @Override + public PropertiesConverter propertiesConverter() { + return PaimonPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java new file mode 100644 index 00000000000..dd78f96d24b --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +public class GravitinoPaimonCatalogFactoryOptions { + + /** Identifier for the {@link GravitinoPaimonCatalog}. */ + public static final String IDENTIFIER = "gravitino-paimon"; +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java new file mode 100644 index 00000000000..36a9e0f9a1a --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.paimon.catalog.FileSystemCatalogFactory; + +public class PaimonPropertiesConverter implements PropertiesConverter { + + public static final PaimonPropertiesConverter INSTANCE = new PaimonPropertiesConverter(); + + private PaimonPropertiesConverter() {} + + @Override + public Map toGravitinoCatalogProperties(Configuration flinkConf) { + Map gravitinoProperties = Maps.newHashMap(); + Map flinkConfMap = flinkConf.toMap(); + for (Map.Entry entry : flinkConfMap.entrySet()) { + String gravitinoKey = + PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey()); + if (gravitinoKey != null) { + gravitinoProperties.put(gravitinoKey, entry.getValue()); + } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { + gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); + 
} else { + gravitinoProperties.put(entry.getKey(), entry.getValue()); + } + } + gravitinoProperties.put( + PaimonConstants.CATALOG_BACKEND, + flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER)); + return gravitinoProperties; + } + + @Override + public Map toFlinkCatalogProperties(Map gravitinoProperties) { + Map flinkCatalogProperties = Maps.newHashMap(); + flinkCatalogProperties.put( + CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); + gravitinoProperties.forEach( + (key, value) -> { + String flinkConfigKey = key; + if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { + flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); + } + flinkCatalogProperties.put( + PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.getOrDefault( + flinkConfigKey, flinkConfigKey), + value); + }); + + return flinkCatalogProperties; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java index 92e778ce297..4c29b7fde3b 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java @@ -54,7 +54,8 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) { public void storeCatalog(String catalogName, CatalogDescriptor descriptor) throws CatalogException { Configuration configuration = descriptor.getConfiguration(); - BaseCatalogFactory catalogFactory = getCatalogFactory(configuration.toMap()); + Map gravitino = configuration.toMap(); + BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino); Map gravitinoProperties = catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration); 
gravitinoCatalogManager.createCatalog( diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index c9d9c92b5ef..a535afb6dc2 100644 --- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -18,4 +18,5 @@ # org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory -org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory \ No newline at end of file +org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory +org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory \ No newline at end of file diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 2d022b4a8a4..bef1f920fb8 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -53,12 +54,30 @@ import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; public abstract class FlinkCommonIT extends FlinkEnvIT { protected abstract Catalog currentCatalog(); + protected boolean supportSchemaOperation() { + return true; + } + + protected boolean 
supportTableOperation() { + return true; + } + + protected boolean supportColumnOperation() { + return true; + } + + protected boolean supportSchemaOperationWithCommentAndOptions() { + return true; + } + @Test + @EnabledIf("supportSchemaOperation") public void testCreateSchema() { doWithCatalog( currentCatalog(), @@ -76,32 +95,20 @@ public void testCreateSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testGetSchema() { doWithCatalog( currentCatalog(), catalog -> { String schema = "test_get_schema"; - String comment = "test comment"; - String propertyKey = "key1"; - String propertyValue = "value1"; - String location = warehouse + "/" + schema; - try { TestUtils.assertTableResult( - sql( - "CREATE DATABASE IF NOT EXISTS %s COMMENT '%s' WITH ('%s'='%s', '%s'='%s')", - schema, comment, propertyKey, propertyValue, "location", location), - ResultKind.SUCCESS); + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); catalog.asSchemas().schemaExists(schema); Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); - Assertions.assertEquals(comment, loadedSchema.comment()); - Assertions.assertEquals(2, loadedSchema.properties().size()); - Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); - Assertions.assertEquals( - location, loadedSchema.properties().get(HiveConstants.LOCATION)); } finally { catalog.asSchemas().dropSchema(schema, true); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); @@ -110,11 +117,11 @@ public void testGetSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testListSchema() { doWithCatalog( currentCatalog(), catalog -> { - Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); String schema = "test_list_schema"; String schema2 = "test_list_schema2"; String schema3 = "test_list_schema3"; @@ 
-135,6 +142,7 @@ public void testListSchema() { Row.of(schema3)); String[] schemas = catalog.asSchemas().listSchemas(); + Arrays.sort(schemas); Assertions.assertEquals(4, schemas.length); Assertions.assertEquals("default", schemas[0]); Assertions.assertEquals(schema, schemas[1]); @@ -150,7 +158,8 @@ public void testListSchema() { } @Test - public void testAlterSchema() { + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testAlterSchemaWithCommentAndOptions() { doWithCatalog( currentCatalog(), catalog -> { @@ -188,6 +197,83 @@ public void testAlterSchema() { } @Test + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testGetSchemaWithCommentAndOptions() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_get_schema"; + String comment = "test comment"; + String propertyKey = "key1"; + String propertyValue = "value1"; + String location = warehouse + "/" + schema; + + try { + TestUtils.assertTableResult( + sql( + "CREATE DATABASE IF NOT EXISTS %s COMMENT '%s' WITH ('%s'='%s', '%s'='%s')", + schema, comment, propertyKey, propertyValue, "location", location), + ResultKind.SUCCESS); + TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); + + catalog.asSchemas().schemaExists(schema); + Schema loadedSchema = catalog.asSchemas().loadSchema(schema); + Assertions.assertEquals(schema, loadedSchema.name()); + Assertions.assertEquals(comment, loadedSchema.comment()); + Assertions.assertEquals(2, loadedSchema.properties().size()); + Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); + Assertions.assertEquals( + location, loadedSchema.properties().get(HiveConstants.LOCATION)); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testListSchemaWithCommentAndOptions() { + 
doWithCatalog( + currentCatalog(), + catalog -> { + Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); + String schema = "test_list_schema"; + String schema2 = "test_list_schema2"; + String schema3 = "test_list_schema3"; + + try { + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("SHOW DATABASES"), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of("default"), + Row.of(schema), + Row.of(schema2), + Row.of(schema3)); + + String[] schemas = catalog.asSchemas().listSchemas(); + Assertions.assertEquals(4, schemas.length); + Assertions.assertEquals("default", schemas[0]); + Assertions.assertEquals(schema, schemas[1]); + Assertions.assertEquals(schema2, schemas[2]); + Assertions.assertEquals(schema3, schemas[3]); + } finally { + catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema2, true); + catalog.asSchemas().dropSchema(schema3, true); + Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); + } + }); + } + + @Test + @EnabledIf("supportTableOperation") public void testCreateSimpleTable() { String databaseName = "test_create_no_partition_table_db"; String tableName = "test_create_no_partition_table"; @@ -236,6 +322,7 @@ public void testCreateSimpleTable() { } @Test + @EnabledIf("supportTableOperation") public void testListTables() { String newSchema = "test_list_table_catalog"; Column[] columns = new Column[] {Column.of("user_id", Types.IntegerType.get(), "USER_ID")}; @@ -268,6 +355,7 @@ public void testListTables() { } @Test + @EnabledIf("supportTableOperation") public void testDropTable() { String databaseName = "test_drop_table_db"; doWithSchema( @@ -289,6 +377,7 @@ public void testDropTable() { } @Test + 
@EnabledIf("supportTableOperation") public void testGetSimpleTable() { String databaseName = "test_get_simple_table"; Column[] columns = @@ -342,6 +431,7 @@ public void testGetSimpleTable() { } @Test + @EnabledIf("supportColumnOperation") public void testRenameColumn() { String databaseName = "test_rename_column_db"; String tableName = "test_rename_column"; @@ -377,6 +467,7 @@ public void testRenameColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableComment() { String databaseName = "test_alter_table_comment_database"; String tableName = "test_alter_table_comment"; @@ -436,6 +527,7 @@ public void testAlterTableComment() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableAddColumn() { String databaseName = "test_alter_table_add_column_db"; String tableName = "test_alter_table_add_column"; @@ -471,6 +563,7 @@ public void testAlterTableAddColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableDropColumn() { String databaseName = "test_alter_table_drop_column_db"; String tableName = "test_alter_table_drop_column"; @@ -501,6 +594,7 @@ public void testAlterTableDropColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterColumnTypeAndChangeOrder() { String databaseName = "test_alter_table_alter_column_db"; String tableName = "test_alter_table_rename_column"; @@ -542,6 +636,7 @@ public void testAlterColumnTypeAndChangeOrder() { } @Test + @EnabledIf("supportTableOperation") public void testRenameTable() { String databaseName = "test_rename_table_db"; String tableName = "test_rename_table"; @@ -569,6 +664,7 @@ public void testRenameTable() { } @Test + @EnabledIf("supportTableOperation") public void testAlterTableProperties() { String databaseName = "test_alter_table_properties_db"; String tableName = "test_alter_table_properties"; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java new file mode 100644 index 00000000000..c10f843f753 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.flink.connector.integration.test.paimon; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.nio.file.Path; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +@Tag("gravitino-docker-test") +public class FlinkPaimonCatalogIT extends FlinkCommonIT { + + @TempDir private static Path warehouseDir; + + private static final String DEFAULT_PAIMON_CATALOG = + "test_flink_paimon_filesystem_schema_catalog"; + + private static org.apache.gravitino.Catalog catalog; + + @Override + protected boolean supportColumnOperation() { + return false; + } + + @Override + protected boolean supportTableOperation() { + return false; + } + + @Override + protected boolean supportSchemaOperationWithCommentAndOptions() { + return false; + } + + protected Catalog currentCatalog() { + return catalog; + } + + @BeforeAll + static void setup() { + initPaimonCatalog(); + } + + @AfterAll + static void stop() { + Preconditions.checkNotNull(metalake); + metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true); + } + + private static void initPaimonCatalog() { + Preconditions.checkNotNull(metalake); + catalog = + metalake.createCatalog( + DEFAULT_PAIMON_CATALOG, + org.apache.gravitino.Catalog.Type.RELATIONAL, + "lakehouse-paimon", + null, + ImmutableMap.of( + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + "filesystem", + "warehouse", + warehouseDir.toString())); + } + + @Test + public void testCreateGravitinoPaimonCatalogUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = 
tableEnv.listCatalogs().length; + String catalogName = "gravitino_paimon_sql"; + String warehouse = warehouseDir.toString(); + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='gravitino-paimon', " + + "'warehouse'='%s'," + + "'catalog.backend'='filesystem'" + + ")", + catalogName, warehouse)); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(warehouse, properties.get("warehouse")); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java new file mode 100644 index 00000000000..47673bc2f18 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Test for {@link PaimonPropertiesConverter} */ +public class TestPaimonPropertiesConverter { + + private static final PaimonPropertiesConverter CONVERTER = PaimonPropertiesConverter.INSTANCE; + + private static final String localWarehouse = "file:///tmp/paimon_warehouse"; + + @Test + public void testToPaimonFileSystemCatalog() { + Map catalogProperties = ImmutableMap.of("warehouse", localWarehouse); + Map flinkCatalogProperties = + CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals( + GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals(localWarehouse, flinkCatalogProperties.get("warehouse")); + } + + @Test + public void testToPaimonJdbcCatalog() { + String testUser = "testUser"; + String testPassword = "testPassword"; + String testUri = "testUri"; + Map catalogProperties = + ImmutableMap.of( + PaimonConfig.CATALOG_WAREHOUSE.getKey(), + localWarehouse, + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + "jdbc", + PaimonConfig.CATALOG_JDBC_USER.getKey(), + testUser, + PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), + testPassword, + PaimonConfig.CATALOG_URI.getKey(), + testUri, + "flink.bypass.key", + "value"); + Map flinkCatalogProperties = + CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals( + GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals(localWarehouse, 
flinkCatalogProperties.get(PaimonConstants.WAREHOUSE)); + Assertions.assertEquals(testUser, flinkCatalogProperties.get(PaimonConstants.PAIMON_JDBC_USER)); + Assertions.assertEquals( + testPassword, flinkCatalogProperties.get(PaimonConstants.PAIMON_JDBC_PASSWORD)); + Assertions.assertEquals("jdbc", flinkCatalogProperties.get(PaimonConstants.METASTORE)); + Assertions.assertEquals(testUri, flinkCatalogProperties.get(PaimonConstants.URI)); + Assertions.assertEquals("value", flinkCatalogProperties.get("key")); + } + + @Test + public void testToGravitinoCatalogProperties() { + String testUser = "testUser"; + String testPassword = "testPassword"; + String testUri = "testUri"; + String testBackend = "jdbc"; + Configuration configuration = + Configuration.fromMap( + ImmutableMap.of( + PaimonConstants.WAREHOUSE, + localWarehouse, + PaimonConstants.METASTORE, + testBackend, + PaimonConstants.PAIMON_JDBC_USER, + testUser, + PaimonConstants.PAIMON_JDBC_PASSWORD, + testPassword, + PaimonConstants.URI, + testUri)); + Map properties = CONVERTER.toGravitinoCatalogProperties(configuration); + Assertions.assertEquals( + localWarehouse, properties.get(PaimonConfig.CATALOG_WAREHOUSE.getKey())); + Assertions.assertEquals(testUser, properties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); + Assertions.assertEquals( + testPassword, properties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); + Assertions.assertEquals(testUri, properties.get(PaimonConfig.CATALOG_URI.getKey())); + Assertions.assertEquals(testBackend, properties.get(PaimonConstants.CATALOG_BACKEND)); + } +} diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java index f713ca89ddd..1544dfeec8c 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java +++ 
b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; -import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.spark.connector.PropertiesConverter; @@ -42,14 +41,7 @@ public static PaimonPropertiesConverter getInstance() { @Override public Map toSparkCatalogProperties(Map properties) { Preconditions.checkArgument(properties != null, "Paimon Catalog properties should not be null"); - Map all = PaimonPropertiesUtils.toPaimonCatalogProperties(properties); - String catalogBackend = all.remove(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND); - Preconditions.checkArgument( - StringUtils.isNotBlank(catalogBackend), - String.format( - "%s should not be empty", PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND)); - all.put(PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, catalogBackend); - return all; + return PaimonPropertiesUtils.toPaimonCatalogProperties(properties); } @Override