From 419b02a836da34e2050a1d6c56a57e3ea32d7e99 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Mon, 2 Dec 2024 21:09:48 +0800 Subject: [PATCH] [iceberg] Introduce integration for AWS Glue (#4624) --- .../migration/iceberg-compatibility.md | 11 ++++++ .../AbstractIcebergCommitCallback.java | 35 +++++++++--------- .../apache/paimon/iceberg/IcebergOptions.java | 6 ++++ .../iceberg/IcebergHiveMetadataCommitter.java | 8 ++--- .../IcebergHive23MetadataCommitterITCase.java | 9 ++++- .../IcebergHive31MetadataCommitterITCase.java | 9 ++++- ...cebergHiveMetadataCommitterITCaseBase.java | 36 +++++++++++++++++++ 7 files changed, 92 insertions(+), 22 deletions(-) diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md index d74560714864..01a03a45264d 100644 --- a/docs/content/migration/iceberg-compatibility.md +++ b/docs/content/migration/iceberg-compatibility.md @@ -383,9 +383,20 @@ you also need to set some (or all) of the following table options when creating Boolean Should use the legacy manifest version to generate Iceberg's 1.4 manifest files. + +
metadata.iceberg.hive-client-class
+ org.apache.hadoop.hive.metastore.HiveMetaStoreClient + String + Hive client class name for Iceberg Hive Catalog. + +## AWS Glue Catalog + +You can use Hive Catalog to connect AWS Glue metastore, you can use set `'metadata.iceberg.hive-client-class'` to +`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`. + ## AWS Athena AWS Athena may use old manifest reader to read Iceberg manifest by names, we should let Paimon producing legacy Iceberg diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index 1b952c1716cf..7ea6cbe05777 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -112,22 +112,7 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { break; case HADOOP_CATALOG: case HIVE_CATALOG: - Path dbPath = table.location().getParent(); - final String dbSuffix = ".db"; - if (dbPath.getName().endsWith(dbSuffix)) { - String dbName = - dbPath.getName() - .substring(0, dbPath.getName().length() - dbSuffix.length()); - String tableName = table.location().getName(); - Path separatePath = - new Path( - dbPath.getParent(), - String.format("iceberg/%s/%s/metadata", dbName, tableName)); - this.pathFactory = new IcebergPathFactory(separatePath); - } else { - throw new UnsupportedOperationException( - "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse."); - } + this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table)); break; default: throw new UnsupportedOperationException( @@ -152,6 +137,24 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { this.manifestList = IcebergManifestList.create(table, pathFactory); } + public static Path catalogTableMetadataPath(FileStoreTable table) { + Path icebergDBPath = catalogDatabasePath(table); + return new Path(icebergDBPath, String.format("%s/metadata", table.location().getName())); + } + + public static Path catalogDatabasePath(FileStoreTable table) { + Path dbPath = table.location().getParent(); + final String dbSuffix = ".db"; + if (dbPath.getName().endsWith(dbSuffix)) { + String dbName = + dbPath.getName().substring(0, dbPath.getName().length() - dbSuffix.length()); + return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName)); + } else { + throw new UnsupportedOperationException( + "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse."); + } + } + @Override public void close() throws Exception {} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java index c0ceed97ba8c..4b59e29c8c33 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java @@ -84,6 +84,12 @@ public class IcebergOptions { .withDescription( "Should use the legacy manifest version to generate Iceberg's 1.4 manifest files."); + public static final ConfigOption HIVE_CLIENT_CLASS = + key("metadata.iceberg.hive-client-class") + .stringType() + .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient") + .withDescription("Hive client class name for Iceberg Hive Catalog."); + /** Where to store Iceberg metadata. */ public enum StorageType implements DescribedEnum { DISABLED("disabled", "Disable Iceberg compatibility support."), diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java index d913f729e351..ddd21384cbc8 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java @@ -22,7 +22,6 @@ import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.Path; import org.apache.paimon.hive.HiveCatalog; -import org.apache.paimon.hive.HiveCatalogFactory; import org.apache.paimon.hive.HiveTypeUtils; import org.apache.paimon.hive.pool.CachedClientPool; import org.apache.paimon.options.Options; @@ -49,6 +48,8 @@ import java.util.HashMap; import java.util.stream.Collectors; +import static org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath; + /** * {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive metastore, so the table can * be visited by Iceberg's Hive catalog. @@ -98,9 +99,7 @@ public IcebergHiveMetadataCommitter(FileStoreTable table) { this.clients = new CachedClientPool( - hiveConf, - options, - HiveCatalogFactory.METASTORE_CLIENT_CLASS.defaultValue()); + hiveConf, options, options.getString(IcebergOptions.HIVE_CLIENT_CLASS)); } @Override @@ -158,6 +157,7 @@ private boolean databaseExists(String databaseName) throws Exception { private void createDatabase(String databaseName) throws Exception { Database database = new Database(); database.setName(databaseName); + database.setLocationUri(catalogDatabasePath(table).toString()); clients.execute(client -> client.createDatabase(database)); } diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java index a9e4ba945440..7d726e75a17d 100644 --- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java +++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/iceberg/IcebergHive23MetadataCommitterITCase.java @@ -18,5 +18,12 @@ package org.apache.paimon.iceberg; +import org.apache.paimon.hive.CreateFailHiveMetaStoreClient; + /** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 2.3. */ -public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {} +public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase { + @Override + protected String createFailHiveMetaStoreClient() { + return CreateFailHiveMetaStoreClient.class.getName(); + } +} diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java index 6f4b0afd1ae1..0634adfad357 100644 --- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java +++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/iceberg/IcebergHive31MetadataCommitterITCase.java @@ -18,5 +18,12 @@ package org.apache.paimon.iceberg; +import org.apache.paimon.hive.CreateFailHiveMetaStoreClient; + /** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 3.1. */ -public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {} +public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase { + @Override + protected String createFailHiveMetaStoreClient() { + return CreateFailHiveMetaStoreClient.class.getName(); + } +} diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java index fab22775751b..d0c64c5d3b7f 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitterITCaseBase.java @@ -104,6 +104,12 @@ public void testPrimaryKeyTable() throws Exception { Row.of(2, 1, "cat"), Row.of(2, 2, "elephant")), collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t ORDER BY pt, id"))); + + Assert.assertTrue( + hiveShell + .executeQuery("DESC DATABASE EXTENDED test_db") + .toString() + .contains("iceberg/test_db")); } @Test @@ -150,6 +156,36 @@ public void testAppendOnlyTable() throws Exception { "SELECT data, id, pt FROM my_iceberg.test_db.t WHERE id > 1 ORDER BY pt, id"))); } + @Test + public void testCustomMetastoreClass() { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv.executeSql( + "CREATE CATALOG my_paimon WITH ( 'type' = 'paimon', 'warehouse' = '" + + path + + "' )"); + tEnv.executeSql("CREATE DATABASE my_paimon.test_db"); + tEnv.executeSql( + String.format( + "CREATE TABLE my_paimon.test_db.t ( pt INT, id INT, data STRING ) PARTITIONED BY (pt) WITH " + + "( " + + "'metadata.iceberg.storage' = 'hive-catalog', " + + "'metadata.iceberg.uri' = '', " + + "'file.format' = 'avro', " + + "'metadata.iceberg.hive-client-class' = '%s')", + createFailHiveMetaStoreClient())); + Assert.assertThrows( + Exception.class, + () -> + tEnv.executeSql( + "INSERT INTO my_paimon.test_db.t VALUES " + + "(1, 1, 'apple'), (1, 2, 'pear'), (2, 1, 'cat'), (2, 2, 'dog')") + .await()); + } + + protected abstract String createFailHiveMetaStoreClient(); + private List collect(TableResult result) throws Exception { List rows = new ArrayList<>(); try (CloseableIterator it = result.collect()) {