From 12e44d9339d06395a92a5b266d93b646d0c08ca5 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Sat, 2 Sep 2023 08:17:17 -0700 Subject: [PATCH] Support HDFS config and custom Hadoop properties (#68) --- README.md | 1 + build.gradle | 2 +- .../iceberg/connect/IcebergSinkConfig.java | 7 ++ .../iceberg/connect/data/Utilities.java | 31 +++++++-- .../iceberg/connect/data/UtilitiesTest.java | 67 +++++++++++++++++++ .../src/test/resources/core-site.xml | 7 ++ versions.toml | 6 +- 7 files changed, 111 insertions(+), 10 deletions(-) create mode 100644 kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UtilitiesTest.java create mode 100644 kafka-connect/src/test/resources/core-site.xml diff --git a/README.md b/README.md index d1dbb9d7..ece602a4 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions | iceberg.control.commitThreads | Number of threads to use for commits, default is (cores * 2) | | iceberg.catalog | Name of the catalog, default is `iceberg` | | iceberg.catalog.* | Properties passed through to Iceberg catalog initialization | +| iceberg.hadoop.* | Properties passed through to Hadoop configuration | | iceberg.kafka.* | Properties passed through to control topic Kafka client initialization | If `iceberg.tables.dynamic.enabled` is `false` (the default) then you must specify `iceberg.tables`. If diff --git a/build.gradle b/build.gradle index e243f614..841ce457 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ subprojects { apply plugin: "maven-publish" group "io.tabular.connect" - version "0.4.10" + version "0.4.11" repositories { mavenCentral() diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java index 0b7f6378..c8918e7b 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java @@ -55,6 +55,7 @@ public class IcebergSinkConfig extends AbstractConfig { private static final String ROUTE_REGEX = "routeRegex"; private static final String CATALOG_PROP_PREFIX = "iceberg.catalog."; + private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop."; private static final String KAFKA_PROP_PREFIX = "iceberg.kafka."; private static final String TABLE_PROP_PREFIX = "iceberg.table."; @@ -167,6 +168,7 @@ private static ConfigDef newConfigDef() { private final Map originalProps; private final Map catalogProps; + private final Map hadoopProps; private final Map kafkaProps; private final Map tableRouteRegexMap = new HashMap<>(); private final JsonConverter jsonConverter; @@ -176,6 +178,7 @@ public IcebergSinkConfig(Map originalProps) { this.originalProps = originalProps; this.catalogProps = PropertyUtil.propertiesWithPrefix(originalProps, CATALOG_PROP_PREFIX); + this.hadoopProps = PropertyUtil.propertiesWithPrefix(originalProps, HADOOP_PROP_PREFIX); this.kafkaProps = new HashMap<>(loadWorkerProps()); kafkaProps.putAll(PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX)); @@ -226,6 +229,10 @@ public Map getCatalogProps() { return catalogProps; } + public Map getHadoopProps() { + return hadoopProps; + } + public Map getKafkaProps() { return kafkaProps; } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java index e9151e19..7988415b 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java @@ -32,6 +32,9 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.common.DynMethods; +import org.apache.iceberg.common.DynMethods.BoundMethod; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppenderFactory; @@ -52,15 +55,31 @@ public class Utilities { public static Catalog loadCatalog(IcebergSinkConfig config) { return CatalogUtil.buildIcebergCatalog( - config.getCatalogName(), config.getCatalogProps(), getHadoopConfig()); + config.getCatalogName(), + config.getCatalogProps(), + getHadoopConfig(config.getHadoopProps())); } - private static Object getHadoopConfig() { - try { - Class clazz = Class.forName("org.apache.hadoop.conf.Configuration"); - return clazz.getDeclaredConstructor().newInstance(); - } catch (ClassNotFoundException e) { + private static Object getHadoopConfig(Map hadoopProps) { + Class configClass = + DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build(); + if (configClass == null) { + configClass = + DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build(); + } + + if (configClass == null) { LOG.info("Hadoop not found on classpath, not creating Hadoop config"); + return null; + } + + try { + Object result = configClass.getDeclaredConstructor().newInstance(); + BoundMethod setMethod = + DynMethods.builder("set").impl(configClass, String.class, String.class).build(result); + hadoopProps.forEach(setMethod::invoke); + LOG.info("Hadoop config initialized: {}", configClass.getName()); + return result; } catch (InstantiationException | IllegalAccessException | NoSuchMethodException diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UtilitiesTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UtilitiesTest.java new file mode 100644 index 00000000..1fcfcc5f --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/UtilitiesTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.data; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.tabular.iceberg.connect.IcebergSinkConfig; +import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +public class UtilitiesTest { + + public static class TestCatalog extends InMemoryCatalog implements Configurable { + private Configuration conf; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + } + + @Test + public void testLoadCatalog() { + Map props = + ImmutableMap.of( + "topics", + "mytopic", + "iceberg.tables", + "mytable", + "iceberg.hadoop.prop", + "value", + "iceberg.catalog.catalog-impl", + TestCatalog.class.getName()); + IcebergSinkConfig config = new IcebergSinkConfig(props); + Catalog result = Utilities.loadCatalog(config); + + assertThat(result).isInstanceOf(TestCatalog.class); + + Configuration conf = ((TestCatalog) result).conf; + assertThat(conf).isNotNull(); + assertThat(conf.get("prop")).isEqualTo("value"); + + // check that core-site.xml was loaded + assertThat(conf.get("foo")).isEqualTo("bar"); + } +} diff --git a/kafka-connect/src/test/resources/core-site.xml b/kafka-connect/src/test/resources/core-site.xml new file mode 100644 index 00000000..1c2b47a5 --- /dev/null +++ b/kafka-connect/src/test/resources/core-site.xml @@ -0,0 +1,7 @@ + + + + foo + bar + + \ No newline at end of file diff --git a/versions.toml b/versions.toml index 4cad80a0..b65722a9 100644 --- a/versions.toml +++ b/versions.toml @@ -5,7 +5,7 @@ awaitility-ver = "4.2.0" hadoop-ver = "3.3.6" hive-ver = "2.3.9" http-client-ver = "5.2.1" -iceberg-ver = "1.3.1-tabular.17" +iceberg-ver = "1.3.1-tabular.29" jackson-ver = "2.14.2" junit-ver = "5.9.2" kafka-ver = "3.5.1" @@ -33,12 +33,12 @@ iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore", iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" } iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" } iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" } +jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-ver" } +jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" } kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka-ver" } kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" } kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" } kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" } -jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-ver" } -jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" } slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" } # test dependencies