Skip to content

Commit

Permalink
[iceberg] Introduce integration for AWS Glue (#4624)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 2, 2024
1 parent a5cb687 commit 419b02a
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 22 deletions.
11 changes: 11 additions & 0 deletions docs/content/migration/iceberg-compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,20 @@ you also need to set some (or all) of the following table options when creating
<td>Boolean</td>
<td>Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.</td>
</tr>
<tr>
<td><h5>metadata.iceberg.hive-client-class</h5></td>
<td style="word-wrap: break-word;">org.apache.hadoop.hive.metastore.HiveMetaStoreClient</td>
<td>String</td>
<td>Hive client class name for Iceberg Hive Catalog.</td>
</tr>
</tbody>
</table>

## AWS Glue Catalog

You can use the Hive Catalog to connect to the AWS Glue metastore by setting `'metadata.iceberg.hive-client-class'` to
`'com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient'`.

## AWS Athena

AWS Athena may use an old manifest reader to read Iceberg manifests by name, so you should let Paimon produce legacy Iceberg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,7 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
break;
case HADOOP_CATALOG:
case HIVE_CATALOG:
Path dbPath = table.location().getParent();
final String dbSuffix = ".db";
if (dbPath.getName().endsWith(dbSuffix)) {
String dbName =
dbPath.getName()
.substring(0, dbPath.getName().length() - dbSuffix.length());
String tableName = table.location().getName();
Path separatePath =
new Path(
dbPath.getParent(),
String.format("iceberg/%s/%s/metadata", dbName, tableName));
this.pathFactory = new IcebergPathFactory(separatePath);
} else {
throw new UnsupportedOperationException(
"Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
}
this.pathFactory = new IcebergPathFactory(catalogTableMetadataPath(table));
break;
default:
throw new UnsupportedOperationException(
Expand All @@ -152,6 +137,24 @@ public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
this.manifestList = IcebergManifestList.create(table, pathFactory);
}

/**
 * Returns the Iceberg metadata directory for the given Paimon table, laid out as
 * {@code <warehouse>/iceberg/<db>/<table>/metadata}.
 */
public static Path catalogTableMetadataPath(FileStoreTable table) {
    String tableName = table.location().getName();
    Path databasePath = catalogDatabasePath(table);
    return new Path(databasePath, String.format("%s/metadata", tableName));
}

/**
 * Derives the Iceberg database directory ({@code <warehouse>/iceberg/<db>/}) from the Paimon
 * table location ({@code <warehouse>/<db>.db/<table>}).
 *
 * @throws UnsupportedOperationException if the table's parent directory does not follow the
 *     standard Paimon warehouse layout (i.e. does not end with ".db")
 */
public static Path catalogDatabasePath(FileStoreTable table) {
    final String dbSuffix = ".db";
    Path dbPath = table.location().getParent();
    String dirName = dbPath.getName();
    // Guard clause: only tables under a "<db>.db" directory live in a Paimon warehouse.
    if (!dirName.endsWith(dbSuffix)) {
        throw new UnsupportedOperationException(
                "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
    }
    String dbName = dirName.substring(0, dirName.length() - dbSuffix.length());
    return new Path(dbPath.getParent(), String.format("iceberg/%s/", dbName));
}

// No-op: this callback holds no resources that require cleanup on close.
@Override
public void close() throws Exception {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public class IcebergOptions {
.withDescription(
"Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.");

/**
 * Fully-qualified Hive metastore client class used when committing Iceberg metadata to a Hive
 * catalog; override it (e.g. with AWS Glue's client) to target a different metastore
 * implementation. Defaults to Hive's standard {@code HiveMetaStoreClient}.
 */
public static final ConfigOption<String> HIVE_CLIENT_CLASS =
        key("metadata.iceberg.hive-client-class")
                .stringType()
                .defaultValue("org.apache.hadoop.hive.metastore.HiveMetaStoreClient")
                .withDescription("Hive client class name for Iceberg Hive Catalog.");

/** Where to store Iceberg metadata. */
public enum StorageType implements DescribedEnum {
DISABLED("disabled", "Disable Iceberg compatibility support."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.HiveCatalogFactory;
import org.apache.paimon.hive.HiveTypeUtils;
import org.apache.paimon.hive.pool.CachedClientPool;
import org.apache.paimon.options.Options;
Expand All @@ -49,6 +48,8 @@
import java.util.HashMap;
import java.util.stream.Collectors;

import static org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath;

/**
* {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive metastore, so the table can
* be visited by Iceberg's Hive catalog.
Expand Down Expand Up @@ -98,9 +99,7 @@ public IcebergHiveMetadataCommitter(FileStoreTable table) {

this.clients =
new CachedClientPool(
hiveConf,
options,
HiveCatalogFactory.METASTORE_CLIENT_CLASS.defaultValue());
hiveConf, options, options.getString(IcebergOptions.HIVE_CLIENT_CLASS));
}

@Override
Expand Down Expand Up @@ -158,6 +157,7 @@ private boolean databaseExists(String databaseName) throws Exception {
/**
 * Creates the Hive database, pointing its location at the Iceberg database directory so that
 * tables created under it resolve beneath {@code <warehouse>/iceberg/<db>/}.
 */
private void createDatabase(String databaseName) throws Exception {
    Database hiveDatabase = new Database();
    hiveDatabase.setName(databaseName);
    hiveDatabase.setLocationUri(catalogDatabasePath(table).toString());
    clients.execute(client -> client.createDatabase(hiveDatabase));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,12 @@

package org.apache.paimon.iceberg;

import org.apache.paimon.hive.CreateFailHiveMetaStoreClient;

/** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 2.3. */
public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {}
/** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 2.3. */
public class IcebergHive23MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {

    /** Supplies the always-failing metastore client used by the negative test case. */
    @Override
    protected String createFailHiveMetaStoreClient() {
        Class<?> clientClass = CreateFailHiveMetaStoreClient.class;
        return clientClass.getName();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,12 @@

package org.apache.paimon.iceberg;

import org.apache.paimon.hive.CreateFailHiveMetaStoreClient;

/** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 3.1. */
public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {}
/** IT cases for {@link IcebergHiveMetadataCommitter} in Hive 3.1. */
public class IcebergHive31MetadataCommitterITCase extends IcebergHiveMetadataCommitterITCaseBase {

    /** Supplies the always-failing metastore client used by the negative test case. */
    @Override
    protected String createFailHiveMetaStoreClient() {
        Class<?> clientClass = CreateFailHiveMetaStoreClient.class;
        return clientClass.getName();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public void testPrimaryKeyTable() throws Exception {
Row.of(2, 1, "cat"),
Row.of(2, 2, "elephant")),
collect(tEnv.executeSql("SELECT * FROM my_iceberg.test_db.t ORDER BY pt, id")));

Assert.assertTrue(
hiveShell
.executeQuery("DESC DATABASE EXTENDED test_db")
.toString()
.contains("iceberg/test_db"));
}

@Test
Expand Down Expand Up @@ -150,6 +156,36 @@ public void testAppendOnlyTable() throws Exception {
"SELECT data, id, pt FROM my_iceberg.test_db.t WHERE id > 1 ORDER BY pt, id")));
}

/**
 * Verifies that {@code metadata.iceberg.hive-client-class} is honored: the table is configured
 * with a metastore client implementation that fails on creation, so the first write must throw.
 */
@Test
public void testCustomMetastoreClass() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
    TableEnvironment env = TableEnvironmentImpl.create(settings);

    env.executeSql(
            "CREATE CATALOG my_paimon WITH ( 'type' = 'paimon', 'warehouse' = '"
                    + path
                    + "' )");
    env.executeSql("CREATE DATABASE my_paimon.test_db");

    String createTableSql =
            String.format(
                    "CREATE TABLE my_paimon.test_db.t ( pt INT, id INT, data STRING ) PARTITIONED BY (pt) WITH "
                            + "( "
                            + "'metadata.iceberg.storage' = 'hive-catalog', "
                            + "'metadata.iceberg.uri' = '', "
                            + "'file.format' = 'avro', "
                            + "'metadata.iceberg.hive-client-class' = '%s')",
                    createFailHiveMetaStoreClient());
    env.executeSql(createTableSql);

    // The failing client is instantiated during the Iceberg metadata commit triggered by the
    // write, so the INSERT is expected to fail.
    String insertSql =
            "INSERT INTO my_paimon.test_db.t VALUES "
                    + "(1, 1, 'apple'), (1, 2, 'pear'), (2, 1, 'cat'), (2, 2, 'dog')";
    Assert.assertThrows(Exception.class, () -> env.executeSql(insertSql).await());
}

protected abstract String createFailHiveMetaStoreClient();

private List<Row> collect(TableResult result) throws Exception {
List<Row> rows = new ArrayList<>();
try (CloseableIterator<Row> it = result.collect()) {
Expand Down

0 comments on commit 419b02a

Please sign in to comment.