Skip to content

Commit

Permalink
Support HDFS config and custom Hadoop properties (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Sep 2, 2023
1 parent b392508 commit 12e44d9
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions
| iceberg.control.commitThreads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.hadoop.* | Properties passed through to the Hadoop configuration, with the `iceberg.hadoop.` prefix removed |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |

If `iceberg.tables.dynamic.enabled` is `false` (the default) then you must specify `iceberg.tables`. If
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.4.10"
version "0.4.11"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String ROUTE_REGEX = "routeRegex";

private static final String CATALOG_PROP_PREFIX = "iceberg.catalog.";
private static final String HADOOP_PROP_PREFIX = "iceberg.hadoop.";
private static final String KAFKA_PROP_PREFIX = "iceberg.kafka.";
private static final String TABLE_PROP_PREFIX = "iceberg.table.";

Expand Down Expand Up @@ -167,6 +168,7 @@ private static ConfigDef newConfigDef() {

private final Map<String, String> originalProps;
private final Map<String, String> catalogProps;
private final Map<String, String> hadoopProps;
private final Map<String, String> kafkaProps;
private final Map<String, Pattern> tableRouteRegexMap = new HashMap<>();
private final JsonConverter jsonConverter;
Expand All @@ -176,6 +178,7 @@ public IcebergSinkConfig(Map<String, String> originalProps) {
this.originalProps = originalProps;

this.catalogProps = PropertyUtil.propertiesWithPrefix(originalProps, CATALOG_PROP_PREFIX);
this.hadoopProps = PropertyUtil.propertiesWithPrefix(originalProps, HADOOP_PROP_PREFIX);

this.kafkaProps = new HashMap<>(loadWorkerProps());
kafkaProps.putAll(PropertyUtil.propertiesWithPrefix(originalProps, KAFKA_PROP_PREFIX));
Expand Down Expand Up @@ -226,6 +229,10 @@ public Map<String, String> getCatalogProps() {
return catalogProps;
}

/**
 * Returns the Hadoop properties extracted from the connector configuration.
 *
 * <p>The map is populated in the constructor from the original properties whose keys start with
 * {@code iceberg.hadoop.}. The same map instance held by this config is returned; no copy is
 * made.
 *
 * @return the Hadoop properties held by this config
 */
public Map<String, String> getHadoopProps() {
  return this.hadoopProps;
}

/**
 * Returns the Kafka client properties held by this config.
 *
 * <p>The map is seeded in the constructor from the worker properties and then overlaid with the
 * original properties whose keys start with {@code iceberg.kafka.}. The same map instance held
 * by this config is returned; no copy is made.
 *
 * @return the Kafka properties held by this config
 */
public Map<String, String> getKafkaProps() {
  return this.kafkaProps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.common.DynMethods.BoundMethod;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppenderFactory;
Expand All @@ -52,15 +55,31 @@ public class Utilities {

public static Catalog loadCatalog(IcebergSinkConfig config) {
return CatalogUtil.buildIcebergCatalog(
config.getCatalogName(), config.getCatalogProps(), getHadoopConfig());
config.getCatalogName(),
config.getCatalogProps(),
getHadoopConfig(config.getHadoopProps()));
}

private static Object getHadoopConfig() {
try {
Class<?> clazz = Class.forName("org.apache.hadoop.conf.Configuration");
return clazz.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException e) {
private static Object getHadoopConfig(Map<String, String> hadoopProps) {
Class<?> configClass =
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build();
if (configClass == null) {
configClass =
DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build();
}

if (configClass == null) {
LOG.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}

try {
Object result = configClass.getDeclaredConstructor().newInstance();
BoundMethod setMethod =
DynMethods.builder("set").impl(configClass, String.class, String.class).build(result);
hadoopProps.forEach(setMethod::invoke);
LOG.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
| IllegalAccessException
| NoSuchMethodException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.data;

import static org.assertj.core.api.Assertions.assertThat;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;

/** Tests for catalog loading in {@link Utilities}. */
public class UtilitiesTest {

  /**
   * In-memory catalog that keeps a reference to the Hadoop {@link Configuration} passed to
   * {@link #setConf(Configuration)} so the test can inspect it afterwards.
   */
  public static class TestCatalog extends InMemoryCatalog implements Configurable<Configuration> {
    // Last configuration received through setConf; stays null if setConf was never called.
    private Configuration conf;

    @Override
    public void setConf(Configuration hadoopConf) {
      this.conf = hadoopConf;
    }
  }

  /**
   * Loads a catalog from a sink config that names {@link TestCatalog} as the implementation and
   * carries one {@code iceberg.hadoop.*} property, then checks the Hadoop configuration that the
   * catalog received.
   */
  @Test
  public void testLoadCatalog() {
    Map<String, String> connectorProps =
        ImmutableMap.<String, String>builder()
            .put("topics", "mytopic")
            .put("iceberg.tables", "mytable")
            .put("iceberg.hadoop.prop", "value")
            .put("iceberg.catalog.catalog-impl", TestCatalog.class.getName())
            .build();

    Catalog catalog = Utilities.loadCatalog(new IcebergSinkConfig(connectorProps));

    // Verify the type before casting so a wrong implementation fails with a clear assertion.
    assertThat(catalog).isInstanceOf(TestCatalog.class);
    TestCatalog testCatalog = (TestCatalog) catalog;

    Configuration hadoopConf = testCatalog.conf;
    assertThat(hadoopConf).isNotNull();

    // The property is visible under its key without the "iceberg.hadoop." prefix.
    assertThat(hadoopConf.get("prop")).isEqualTo("value");

    // check that core-site.xml was loaded
    assertThat(hadoopConf.get("foo")).isEqualTo("bar");
  }
}
7 changes: 7 additions & 0 deletions kafka-connect/src/test/resources/core-site.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0"?>
<configuration>
<property>
<name>foo</name>
<value>bar</value>
</property>
</configuration>
6 changes: 3 additions & 3 deletions versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ awaitility-ver = "4.2.0"
hadoop-ver = "3.3.6"
hive-ver = "2.3.9"
http-client-ver = "5.2.1"
iceberg-ver = "1.3.1-tabular.17"
iceberg-ver = "1.3.1-tabular.29"
jackson-ver = "2.14.2"
junit-ver = "5.9.2"
kafka-ver = "3.5.1"
Expand Down Expand Up @@ -33,12 +33,12 @@ iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore",
iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" }
iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" }
iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" }
jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-ver" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka-ver" }
kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" }
kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" }
kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" }
jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-ver" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" }

# test dependencies
Expand Down

0 comments on commit 12e44d9

Please sign in to comment.