From 278ac2de1697cefa8e65c1c9ed0ca5730f423f5f Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 11 Dec 2023 09:13:43 +0800 Subject: [PATCH] [improve][io] Make connectors load sensitive fields from secrets (#21675) (cherry picked from commit c55df07d3c43df09130ddd2e1e8411b39467e46a) --- pulsar-io/canal/pom.xml | 5 ++ .../pulsar/io/canal/CanalAbstractSource.java | 2 +- .../pulsar/io/canal/CanalSourceConfig.java | 7 ++- .../pulsar/io/common/IOConfigUtils.java | 7 ++- .../pulsar/io/common/IOConfigUtilsTest.java | 11 ++++ pulsar-io/dynamodb/pom.xml | 6 ++ .../pulsar/io/dynamodb/DynamoDBSource.java | 2 +- .../io/dynamodb/DynamoDBSourceConfig.java | 8 ++- .../dynamodb/DynamoDBSourceConfigTests.java | 52 ++++++++++++++-- pulsar-io/influxdb/pom.xml | 5 ++ .../influxdb/InfluxDBGenericRecordSink.java | 4 +- .../io/influxdb/v1/InfluxDBAbstractSink.java | 2 +- .../io/influxdb/v1/InfluxDBSinkConfig.java | 11 ++-- .../pulsar/io/influxdb/v2/InfluxDBSink.java | 2 +- .../io/influxdb/v2/InfluxDBSinkConfig.java | 14 ++--- .../influxdb/v1/InfluxDBSinkConfigTest.java | 56 ++++++++++++++--- .../influxdb/v2/InfluxDBSinkConfigTest.java | 29 +++++++-- pulsar-io/jdbc/core/pom.xml | 6 ++ .../pulsar/io/jdbc/JdbcAbstractSink.java | 2 +- .../apache/pulsar/io/jdbc/JdbcSinkConfig.java | 7 ++- pulsar-io/kafka/pom.xml | 5 ++ .../pulsar/io/kafka/KafkaAbstractSink.java | 6 +- .../pulsar/io/kafka/KafkaAbstractSource.java | 2 +- .../pulsar/io/kafka/KafkaSinkConfig.java | 11 ++-- .../pulsar/io/kafka/KafkaSourceConfig.java | 9 ++- .../io/kafka/sink/KafkaAbstractSinkTest.java | 8 +-- .../kafka/source/KafkaAbstractSourceTest.java | 26 +++++++- pulsar-io/mongo/pom.xml | 5 ++ .../io/mongodb/MongoAbstractConfig.java | 3 +- .../apache/pulsar/io/mongodb/MongoSink.java | 2 +- .../pulsar/io/mongodb/MongoSinkConfig.java | 9 ++- .../apache/pulsar/io/mongodb/MongoSource.java | 2 +- .../pulsar/io/mongodb/MongoSourceConfig.java | 10 ++-- .../io/mongodb/MongoSinkConfigTest.java | 41 ++++++++++--- .../io/mongodb/MongoSourceConfigTest.java | 38 ++++++++++-- pulsar-io/rabbitmq/pom.xml | 5 ++ .../pulsar/io/rabbitmq/RabbitMQSink.java | 2 +- .../io/rabbitmq/RabbitMQSinkConfig.java | 9 ++- .../pulsar/io/rabbitmq/RabbitMQSource.java | 2 +- .../io/rabbitmq/RabbitMQSourceConfig.java | 7 ++- .../rabbitmq/sink/RabbitMQSinkConfigTest.java | 52 ++++++++++++++-- .../source/RabbitMQSourceConfigTest.java | 60 +++++++++++++++++-- pulsar-io/redis/pom.xml | 5 ++ .../pulsar/io/redis/RedisAbstractConfig.java | 5 +- .../pulsar/io/redis/sink/RedisSink.java | 2 +- .../pulsar/io/redis/sink/RedisSinkConfig.java | 11 ++-- .../io/redis/sink/RedisSinkConfigTest.java | 47 ++++++++++++--- .../pulsar/io/redis/sink/RedisSinkTest.java | 5 +- pulsar-io/solr/pom.xml | 5 ++ .../pulsar/io/solr/SolrAbstractSink.java | 2 +- .../apache/pulsar/io/solr/SolrSinkConfig.java | 7 ++- .../pulsar/io/solr/SolrSinkConfigTest.java | 47 ++++++++++++--- 52 files changed, 540 insertions(+), 148 deletions(-) diff --git a/pulsar-io/canal/pom.xml b/pulsar-io/canal/pom.xml index 34b96d1502b9c..5e65d582dccc6 100644 --- a/pulsar-io/canal/pom.xml +++ b/pulsar-io/canal/pom.xml @@ -37,6 +37,11 @@ + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-io-core diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java index 06c8788d5aea1..7d0cd0305a49e 100644 --- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java +++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java @@ -57,7 +57,7 @@ public abstract class CanalAbstractSource extends PushSource { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - canalSourceConfig = CanalSourceConfig.load(config); + canalSourceConfig = CanalSourceConfig.load(config, sourceContext); if (canalSourceConfig.getCluster()) { connector = CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(), canalSourceConfig.getDestination(), canalSourceConfig.getUsername(), diff --git a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java index a0408e60e5f76..5a754988ffdc1 100644 --- a/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java +++ b/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalSourceConfig.java @@ -26,6 +26,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @@ -86,8 +88,7 @@ public static CanalSourceConfig load(String yamlFile) throws IOException { } - public static CanalSourceConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), CanalSourceConfig.class); + public static CanalSourceConfig load(Map map, SourceContext sourceContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class, sourceContext); } } diff --git a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java index d15986a897caa..69d981bf68728 100644 --- a/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java +++ b/pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java @@ -77,13 +77,14 @@ private static T loadWithSecrets(Map map, Class clazz, } } configs.computeIfAbsent(field.getName(), key -> { - if (fieldDoc.required()) { - throw new IllegalArgumentException(field.getName() + " cannot be null"); - } + // Use default value if it is not null before checking required String value = fieldDoc.defaultValue(); if (value != null && !value.isEmpty()) { return value; } + if (fieldDoc.required()) { + throw new IllegalArgumentException(field.getName() + " cannot be null"); + } return null; }); } diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index 52afac1a5ac0c..fdcd2ea7fe88a 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -54,6 +54,14 @@ static class TestDefaultConfig { ) protected String testRequired; + @FieldDoc( + required = true, + defaultValue = "defaultRequired", + sensitive = true, + help = "testRequired" + ) + protected String testDefaultRequired; + @FieldDoc( required = false, defaultValue = "defaultStr", @@ -299,6 +307,9 @@ public void testDefaultValue() { configMap.put("testRequired", "test"); TestDefaultConfig testDefaultConfig = IOConfigUtils.loadWithSecrets(configMap, TestDefaultConfig.class, new TestSinkContext()); + // if there is default value for a required field and no value provided when load config, + // it should not throw exception but use the default value. + Assert.assertEquals(testDefaultConfig.getTestDefaultRequired(), "defaultRequired"); Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr"); Assert.assertEquals(testDefaultConfig.isDefaultBool(), true); Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100); diff --git a/pulsar-io/dynamodb/pom.xml b/pulsar-io/dynamodb/pom.xml index 7d6dc2ca6c125..372847e89afaf 100644 --- a/pulsar-io/dynamodb/pom.xml +++ b/pulsar-io/dynamodb/pom.xml @@ -32,6 +32,12 @@ + + ${project.groupId} + pulsar-io-common + ${project.version} + + ${project.groupId} pulsar-io-core diff --git a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java index d67c4e21154ee..2193cf39c17a5 100644 --- a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java +++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSource.java @@ -65,7 +65,7 @@ public void close() throws Exception { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config); + this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config, sourceContext); checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()), "empty dynamo-stream arn"); // Even if the endpoint is set, it seems to require a region to go with it diff --git a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java index b734dd5741155..0547ff8f863e0 100644 --- a/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java +++ b/pulsar-io/dynamodb/src/main/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfig.java @@ -35,6 +35,8 @@ import java.util.Map; import lombok.Data; import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; import software.amazon.awssdk.regions.Region; @@ -77,6 +79,7 @@ public class DynamoDBSourceConfig implements Serializable { @FieldDoc( required = false, defaultValue = "", + sensitive = true, help = "json-parameters to initialize `AwsCredentialsProviderPlugin`") private String awsCredentialPluginParam = ""; @@ -170,9 +173,8 @@ public static DynamoDBSourceConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), DynamoDBSourceConfig.class); } - public static DynamoDBSourceConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), DynamoDBSourceConfig.class); + public static DynamoDBSourceConfig load(Map map, SourceContext sourceContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, DynamoDBSourceConfig.class, sourceContext); } protected Region regionAsV2Region() { diff --git a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java index f84cb785896e6..bdccaa2e5846e 100644 --- a/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java +++ b/pulsar-io/dynamodb/src/test/java/org/apache/pulsar/io/dynamodb/DynamoDBSourceConfigTests.java @@ -31,6 +31,8 @@ import java.util.Map; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; +import org.apache.pulsar.io.core.SourceContext; +import org.mockito.Mockito; import org.testng.annotations.Test; @@ -90,7 +92,8 @@ public final void loadFromMapTest() throws IOException { map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON); map.put("startAtTime", DAY); - DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map, sourceContext); assertNotNull(config); assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); @@ -111,7 +114,46 @@ public final void loadFromMapTest() throws IOException { ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC); assertEquals(actual, expected); } - + + @Test + public final void loadFromMapCredentialFromSecretTest() throws IOException { + Map map = new HashMap (); + map.put("awsEndpoint", "https://some.endpoint.aws"); + map.put("awsRegion", "us-east-1"); + map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291"); + map.put("checkpointInterval", "30000"); + map.put("backoffTime", "4000"); + map.put("numRetries", "3"); + map.put("receiveQueueSize", 2000); + map.put("applicationName", "My test application"); + map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON); + map.put("startAtTime", DAY); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + Mockito.when(sourceContext.getSecret("awsCredentialPluginParam")) + .thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map, sourceContext); + + assertNotNull(config); + assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws"); + assertEquals(config.getAwsRegion(), "us-east-1"); + assertEquals(config.getAwsDynamodbStreamArn(), "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291"); + assertEquals(config.getAwsCredentialPluginParam(), + "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"); + assertEquals(config.getApplicationName(), "My test application"); + assertEquals(config.getCheckpointInterval(), 30000); + assertEquals(config.getBackoffTime(), 4000); + assertEquals(config.getNumRetries(), 3); + assertEquals(config.getReceiveQueueSize(), 2000); + assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + + Calendar cal = Calendar.getInstance(); + cal.setTime(config.getStartAtTime()); + ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC); + ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC); + assertEquals(actual, expected); + } + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "empty aws-credential param") public final void missingCredentialsTest() throws Exception { @@ -121,7 +163,8 @@ public final void missingCredentialsTest() throws Exception { map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291"); DynamoDBSource source = new DynamoDBSource(); - source.open(map, null); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + source.open(map, sourceContext); } @Test(expectedExceptions = IllegalArgumentException.class, @@ -136,7 +179,8 @@ public final void missingStartTimeTest() throws Exception { map.put("initialPositionInStream", InitialPositionInStream.AT_TIMESTAMP); DynamoDBSource source = new DynamoDBSource(); - source.open(map, null); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + source.open(map, sourceContext); } private File getFile(String name) { diff --git a/pulsar-io/influxdb/pom.xml b/pulsar-io/influxdb/pom.xml index 21306c4f90d92..04325c478a3ce 100644 --- a/pulsar-io/influxdb/pom.xml +++ b/pulsar-io/influxdb/pom.xml @@ -32,6 +32,11 @@ Pulsar IO :: InfluxDB + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-io-core diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java index 5b51461fc7b8e..0d431f84c52f2 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSink.java @@ -46,12 +46,12 @@ public class InfluxDBGenericRecordSink implements Sink { @Override public void open(Map map, SinkContext sinkContext) throws Exception { try { - val configV2 = InfluxDBSinkConfig.load(map); + val configV2 = InfluxDBSinkConfig.load(map, sinkContext); configV2.validate(); sink = new InfluxDBSink(); } catch (Exception e) { try { - val configV1 = org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map); + val configV1 = org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map, sinkContext); configV1.validate(); sink = new org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink(); } catch (Exception e1) { diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java index 06856bad80edc..217c5304b24f7 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBAbstractSink.java @@ -43,7 +43,7 @@ public abstract class InfluxDBAbstractSink extends BatchSink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config); + InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext); influxDBSinkConfig.validate(); super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize()); diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java index 9b7d8e1ce905d..4ae2cf1e4a3a1 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfig.java @@ -27,6 +27,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -94,7 +96,7 @@ public class InfluxDBSinkConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "1000L", + defaultValue = "1000", help = "The InfluxDB operation time in milliseconds") private long batchTimeMs = 1000L; @@ -110,14 +112,11 @@ public static InfluxDBSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class); } - public static InfluxDBSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class); + public static InfluxDBSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, sinkContext); } public void validate() { - Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set."); - Preconditions.checkNotNull(database, "database property not set."); Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer."); Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long."); } diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java index 08f1ab2339992..0aa43570596af 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSink.java @@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config); + InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext); influxDBSinkConfig.validate(); super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize()); diff --git a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java index 899b00c002155..ea87ee66b90a3 100644 --- a/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java +++ b/pulsar-io/influxdb/src/main/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfig.java @@ -27,6 +27,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -87,7 +89,7 @@ public class InfluxDBSinkConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "1000L", + defaultValue = "1000", help = "The InfluxDB operation time in milliseconds") private long batchTimeMs = 1000; @@ -103,17 +105,11 @@ public static InfluxDBSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class); } - public static InfluxDBSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class); + public static InfluxDBSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, sinkContext); } public void validate() { - Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set."); - Preconditions.checkNotNull(token, "token property not set."); - Preconditions.checkNotNull(organization, "organization property not set."); - Preconditions.checkNotNull(bucket, "bucket property not set."); - Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer."); Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long."); } diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java index 4493dcfb24854..10b1bfb624f49 100644 --- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java +++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.io.influxdb.v1; +import org.apache.pulsar.io.core.SinkContext; import org.influxdb.InfluxDB; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.File; @@ -60,8 +62,11 @@ public final void loadFromMapTest() throws IOException { map.put("gzipEnable", "false"); map.put("batchTimeMs", "1000"); map.put("batchSize", "100"); + map.put("username", "admin"); + map.put("password", "admin"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); assertNotNull(config); assertEquals("http://localhost:8086", config.getInfluxdbUrl()); assertEquals("test_db", config.getDatabase()); @@ -71,6 +76,39 @@ public final void loadFromMapTest() throws IOException { assertEquals(Boolean.parseBoolean("false"), config.isGzipEnable()); assertEquals(Long.parseLong("1000"), config.getBatchTimeMs()); assertEquals(Integer.parseInt("100"), config.getBatchSize()); + assertEquals("admin", config.getUsername()); + assertEquals("admin", config.getPassword()); + } + + @Test + public final void loadFromMapCredentialFromSecretTest() throws IOException { + Map map = new HashMap<>(); + map.put("influxdbUrl", "http://localhost:8086"); + map.put("database", "test_db"); + map.put("consistencyLevel", "ONE"); + map.put("logLevel", "NONE"); + map.put("retentionPolicy", "autogen"); + map.put("gzipEnable", "false"); + map.put("batchTimeMs", "1000"); + map.put("batchSize", "100"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("username")) + .thenReturn("admin"); + Mockito.when(sinkContext.getSecret("password")) + .thenReturn("admin"); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); + assertNotNull(config); + assertEquals("http://localhost:8086", config.getInfluxdbUrl()); + assertEquals("test_db", config.getDatabase()); + assertEquals("ONE", config.getConsistencyLevel()); + assertEquals("NONE", config.getLogLevel()); + assertEquals("autogen", config.getRetentionPolicy()); + assertEquals(Boolean.parseBoolean("false"), config.isGzipEnable()); + assertEquals(Long.parseLong("1000"), config.getBatchTimeMs()); + assertEquals(Integer.parseInt("100"), config.getBatchSize()); + assertEquals("admin", config.getUsername()); + assertEquals("admin", config.getPassword()); } @Test @@ -85,12 +123,13 @@ public final void validValidateTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("batchSize", "100"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); config.validate(); } - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "influxdbUrl property not set.") + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "influxdbUrl cannot be null") public final void missingInfluxdbUrlValidateTest() throws IOException { Map map = new HashMap<>(); map.put("database", "test_db"); @@ -101,7 +140,8 @@ public final void missingInfluxdbUrlValidateTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("batchSize", "100"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); config.validate(); } @@ -118,7 +158,8 @@ public final void invalidBatchSizeTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("batchSize", "-100"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); config.validate(); } @@ -135,7 +176,8 @@ public final void invalidConsistencyLevelTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("batchSize", "100"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); config.validate(); InfluxDB.ConsistencyLevel.valueOf(config.getConsistencyLevel().toUpperCase()); diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java index df1f7fd29a637..d6cee1e308d2b 100644 --- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java +++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkConfigTest.java @@ -24,6 +24,8 @@ import java.io.File; import java.util.HashMap; import java.util.Map; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mockito; import org.testng.annotations.Test; public class InfluxDBSinkConfigTest { @@ -58,18 +60,34 @@ private Map buildValidConfigMap() { public final void testLoadFromMap() throws Exception { Map map = buildValidConfigMap(); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); assertNotNull(config); config.validate(); verifyValues(config); } - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "influxdbUrl property not set.") + @Test + public final void testLoadFromMapCredentialFromSecret() throws Exception { + Map map = buildValidConfigMap(); + map.remove("token"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("token")) + .thenReturn("xxxx"); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); + assertNotNull(config); + config.validate(); + verifyValues(config); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "influxdbUrl cannot be null") public void testRequiredConfigMissing() throws Exception { Map map = buildValidConfigMap(); map.remove("influxdbUrl"); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); config.validate(); } @@ -78,7 +96,8 @@ public void testRequiredConfigMissing() throws Exception { public void testBatchConfig() throws Exception { Map map = buildValidConfigMap(); map.put("batchSize", -1); - InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext); config.validate(); } diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index e3014fa04cd2d..109be01c3670a 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -32,6 +32,12 @@ Pulsar IO :: Jdbc :: Core + + ${project.groupId} + pulsar-io-common + ${project.version} + + ${project.groupId} pulsar-io-core diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 4586fcebcf167..ca33b3cfdaba9 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -76,7 +76,7 @@ public abstract class JdbcAbstractSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - jdbcSinkConfig = JdbcSinkConfig.load(config); + jdbcSinkConfig = JdbcSinkConfig.load(config, sinkContext); jdbcSinkConfig.validate(); jdbcUrl = jdbcSinkConfig.getJdbcUrl(); diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java index f798d94f7c35e..854d68381312c 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -26,6 +26,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -145,9 +147,8 @@ public static JdbcSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class); } - public static JdbcSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class); + public static JdbcSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, JdbcSinkConfig.class, sinkContext); } public void validate() { diff --git a/pulsar-io/kafka/pom.xml b/pulsar-io/kafka/pom.xml index 6c6904da34647..3084dd076f732 100644 --- a/pulsar-io/kafka/pom.xml +++ b/pulsar-io/kafka/pom.xml @@ -46,6 +46,11 @@ + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java index 5ceea4dec8dca..2bedba928b756 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Map; -import java.util.Objects; import java.util.Properties; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -79,10 +78,7 @@ protected Properties beforeCreateProducer(Properties props) { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - kafkaSinkConfig = KafkaSinkConfig.load(config); - Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set"); - Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka bootstrapServers is not set"); - Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is not set"); + kafkaSinkConfig = KafkaSinkConfig.load(config, sinkContext); if (kafkaSinkConfig.getBatchSize() <= 0) { throw new IllegalArgumentException("Invalid Kafka Producer batchSize : " + kafkaSinkConfig.getBatchSize()); diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index f8539518851aa..782f9d5d57dbb 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -66,7 +66,7 @@ public abstract class KafkaAbstractSource extends PushSource { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - kafkaSourceConfig = KafkaSourceConfig.load(config); + kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set"); Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set"); Objects.requireNonNull(kafkaSourceConfig.getGroupId(), "Kafka consumer group id is not set"); diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java index 755b2c89c8f20..8f772a57d0c4d 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSinkConfig.java @@ -26,6 +26,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -84,12 +86,12 @@ public class KafkaSinkConfig implements Serializable { + " before considering a request complete. This controls the durability of records that are sent.") private String acks; @FieldDoc( - defaultValue = "16384L", + defaultValue = "16384", help = "The batch size that Kafka producer will attempt to batch records together" + " before sending them to brokers.") private long batchSize = 16384L; @FieldDoc( - defaultValue = "1048576L", + defaultValue = "1048576", help = "The maximum size of a Kafka request in bytes.") private long maxRequestSize = 1048576L; @@ -122,8 +124,7 @@ public static KafkaSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class); } - public static KafkaSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), KafkaSinkConfig.class); + public static KafkaSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, KafkaSinkConfig.class, sinkContext); } } \ No newline at end of file diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java index 5de60d2a028c8..3edfa27190339 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java @@ -27,6 +27,7 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -151,8 +152,14 @@ public static KafkaSourceConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class); } - public static KafkaSourceConfig load(Map map) throws IOException { + public static KafkaSourceConfig load(Map map, SourceContext sourceContext) throws IOException { ObjectMapper mapper = new ObjectMapper(); + // since the KafkaSourceConfig requires the ACCEPT_EMPTY_STRING_AS_NULL_OBJECT feature + // We manually set the sensitive fields here instead of calling `IOConfigUtils.loadWithSecrets` + String sslTruststorePassword = sourceContext.getSecret("sslTruststorePassword"); + if (sslTruststorePassword != null) { + map.put("sslTruststorePassword", sslTruststorePassword); + } mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT); return mapper.readValue(mapper.writeValueAsString(map), KafkaSourceConfig.class); } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java index d59cdb1d9b63d..3e31eefe0e409 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java @@ -188,12 +188,12 @@ public PulsarClient getPulsarClient() { sink.close(); } }; - expectThrows(NullPointerException.class, "Kafka topic is not set", openAndClose); - config.put("topic", "topic_2"); - expectThrows(NullPointerException.class, "Kafka bootstrapServers is not set", openAndClose); + expectThrows(IllegalArgumentException.class, "bootstrapServers cannot be null", openAndClose); config.put("bootstrapServers", "localhost:6667"); - expectThrows(NullPointerException.class, "Kafka acks mode is not set", openAndClose); + expectThrows(IllegalArgumentException.class, "acks cannot be null", openAndClose); config.put("acks", "1"); + expectThrows(IllegalArgumentException.class, "topic cannot be null", openAndClose); + config.put("topic", "topic_2"); config.put("batchSize", "-1"); expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer batchSize : -1", openAndClose); config.put("batchSize", "16384"); diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 9e0fef87a2592..7675de0636e8a 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -108,19 +108,39 @@ public void testInvalidConfigWillThrownException() throws Exception { public void loadConsumerConfigPropertiesFromMapTest() throws Exception { Map config = new HashMap<>(); config.put("consumerConfigProperties", ""); - KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config); + config.put("bootstrapServers", "localhost:8080"); + config.put("groupId", "test-group"); + config.put("topic", "test-topic"); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); assertNotNull(kafkaSourceConfig); assertNull(kafkaSourceConfig.getConsumerConfigProperties()); config.put("consumerConfigProperties", null); - kafkaSourceConfig = KafkaSourceConfig.load(config); + kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); assertNull(kafkaSourceConfig.getConsumerConfigProperties()); config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar")); - kafkaSourceConfig = KafkaSourceConfig.load(config); + kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); assertEquals(kafkaSourceConfig.getConsumerConfigProperties(), ImmutableMap.of("foo", "bar")); } + @Test + public void loadSensitiveFieldsFromSecretTest() throws Exception { + Map config = new HashMap<>(); + config.put("consumerConfigProperties", ""); + config.put("bootstrapServers", "localhost:8080"); + config.put("groupId", "test-group"); + config.put("topic", "test-topic"); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + Mockito.when(sourceContext.getSecret("sslTruststorePassword")) + .thenReturn("xxxx"); + KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); + assertNotNull(kafkaSourceConfig); + assertNull(kafkaSourceConfig.getConsumerConfigProperties()); + assertEquals("xxxx", kafkaSourceConfig.getSslTruststorePassword()); + } + @Test public final void loadFromYamlFileTest() throws IOException { File yamlFile = getFile("kafkaSourceConfig.yaml"); diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml index 1a65e61f035ce..bf4f3e0be572c 100644 --- a/pulsar-io/mongo/pom.xml +++ b/pulsar-io/mongo/pom.xml @@ -37,6 +37,11 @@ + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.parent.groupId} pulsar-io-core diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java index 35c327ed82b99..74f077da62036 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoAbstractConfig.java @@ -24,7 +24,6 @@ import java.io.Serializable; import lombok.Data; import lombok.experimental.Accessors; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -42,6 +41,7 @@ public abstract class MongoAbstractConfig implements Serializable { @FieldDoc( required = true, + sensitive = true, // it may contain password defaultValue = "", help = "The URI of MongoDB that the connector connects to " + "(see: https://docs.mongodb.com/manual/reference/connection-string/)" @@ -95,7 +95,6 @@ public MongoAbstractConfig( } public void validate() { - checkArgument(!StringUtils.isEmpty(getMongoUri()), "Required MongoDB URI is not set."); checkArgument(getBatchSize() > 0, "batchSize must be a positive integer."); checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long."); } diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java index 2206d232eaf97..61d5aeb697e01 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java @@ -86,7 +86,7 @@ public MongoSink(Supplier clientProvider) { public void open(Map config, SinkContext sinkContext) throws Exception { log.info("Open MongoDB Sink"); - mongoSinkConfig = MongoSinkConfig.load(config); + mongoSinkConfig = MongoSinkConfig.load(config, sinkContext); mongoSinkConfig.validate(); if (clientProvider != null) { diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java index 285f3c97bef1a..9431fe4910800 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSinkConfig.java @@ -30,6 +30,8 @@ import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; /** * Configuration class for the MongoDB Sink Connectors. @@ -59,11 +61,8 @@ public static MongoSinkConfig load(String yamlFile) throws IOException { return cfg; } - public static MongoSinkConfig load(Map map) throws IOException { - final ObjectMapper mapper = new ObjectMapper(); - final MongoSinkConfig cfg = mapper.readValue(mapper.writeValueAsString(map), MongoSinkConfig.class); - - return cfg; + public static MongoSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, MongoSinkConfig.class, sinkContext); } @Override diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java index 6ee95fc4cd4b5..68a31b461a51c 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java @@ -79,7 +79,7 @@ public MongoSource(Supplier clientProvider) { public void open(Map config, SourceContext sourceContext) throws Exception { log.info("Open MongoDB Source"); - mongoSourceConfig = MongoSourceConfig.load(config); + mongoSourceConfig = MongoSourceConfig.load(config, sourceContext); mongoSourceConfig.validate(); if (clientProvider != null) { diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java index cf887a93bf3c3..1c0c7f4b3657a 100644 --- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java +++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSourceConfig.java @@ -29,6 +29,8 @@ import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -75,12 +77,8 @@ public static MongoSourceConfig load(String yamlFile) throws IOException { return cfg; } - public static MongoSourceConfig load(Map map) throws IOException { - final ObjectMapper mapper = new ObjectMapper(); - final MongoSourceConfig cfg = - mapper.readValue(mapper.writeValueAsString(map), MongoSourceConfig.class); - - return cfg; + public static MongoSourceConfig load(Map map, SourceContext sourceContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, MongoSourceConfig.class, sourceContext); } /** diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java index b1166eac5722a..c86e45feb2348 100644 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.io.mongodb; import java.util.Map; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.File; @@ -34,7 +36,27 @@ public void testLoadMapConfig() throws IOException { commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE); commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME); - final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap, sinkContext); + + assertEquals(cfg.getMongoUri(), TestHelper.URI); + assertEquals(cfg.getDatabase(), TestHelper.DB); + assertEquals(cfg.getCollection(), TestHelper.COLL); + assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); + assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); + } + + @Test + public void testLoadMapConfigUrlFromSecret() throws IOException { + final Map commonConfigMap = TestHelper.createCommonConfigMap(); + commonConfigMap.put("batchSize", TestHelper.BATCH_SIZE); + commonConfigMap.put("batchTimeMs", TestHelper.BATCH_TIME); + commonConfigMap.remove("mongoUri"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("mongoUri")) + .thenReturn(TestHelper.URI); + final MongoSinkConfig cfg = MongoSinkConfig.load(commonConfigMap, sinkContext); assertEquals(cfg.getMongoUri(), TestHelper.URI); assertEquals(cfg.getDatabase(), TestHelper.DB); @@ -44,12 +66,13 @@ public void testLoadMapConfig() throws IOException { } @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Required MongoDB URI is not set.") + expectedExceptionsMessageRegExp = "mongoUri cannot be null") public void testBadMongoUri() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.removeMongoUri(configMap); - final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, sinkContext); cfg.validate(); } @@ -60,7 +83,8 @@ public void testBadDatabase() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.removeDatabase(configMap); - final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, sinkContext); cfg.validate(); } @@ -71,7 +95,8 @@ public void testBadCollection() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.removeCollection(configMap); - final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, sinkContext); cfg.validate(); } @@ -82,7 +107,8 @@ public void testBadBatchSize() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.putBatchSize(configMap, 0); - final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, sinkContext); cfg.validate(); } @@ -93,7 +119,8 @@ public void testBadBatchTime() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.putBatchTime(configMap, 0L); - final MongoSinkConfig cfg = MongoSinkConfig.load(configMap); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + final MongoSinkConfig cfg = MongoSinkConfig.load(configMap, sinkContext); cfg.validate(); } diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java index e7fd01549b033..528cd0237ef16 100644 --- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java +++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java @@ -23,6 +23,8 @@ import java.io.File; import java.io.IOException; import java.util.Map; +import org.apache.pulsar.io.core.SourceContext; +import org.mockito.Mockito; import org.testng.annotations.Test; public class MongoSourceConfigTest { @@ -32,7 +34,27 @@ public void testLoadMapConfig() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE); - final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, sourceContext); + + assertEquals(cfg.getMongoUri(), TestHelper.URI); + assertEquals(cfg.getDatabase(), TestHelper.DB); + assertEquals(cfg.getCollection(), TestHelper.COLL); + assertEquals(cfg.getSyncType(), TestHelper.SYNC_TYPE); + assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE); + assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME); + } + + @Test + public void testLoadMapConfigUriFromSecret() throws IOException { + final Map configMap = TestHelper.createCommonConfigMap(); + TestHelper.putSyncType(configMap, TestHelper.SYNC_TYPE); + configMap.remove("mongoUri"); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + Mockito.when(sourceContext.getSecret("mongoUri")) + .thenReturn(TestHelper.URI); + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, sourceContext); assertEquals(cfg.getMongoUri(), TestHelper.URI); assertEquals(cfg.getDatabase(), TestHelper.DB); @@ -43,12 +65,13 @@ public void testLoadMapConfig() throws IOException { } @Test(expectedExceptions = IllegalArgumentException.class, - expectedExceptionsMessageRegExp = "Required MongoDB URI is not set.") + expectedExceptionsMessageRegExp = "mongoUri cannot be null") public void testBadMongoUri() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.removeMongoUri(configMap); - final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, sourceContext); cfg.validate(); } @@ -61,7 +84,8 @@ public void testBadSyncType() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.putSyncType(configMap, "wrong_sync_type_str"); - final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, sourceContext); cfg.validate(); } @@ -72,7 +96,8 @@ public void testBadBatchSize() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.putBatchSize(configMap, 0); - final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, sourceContext); cfg.validate(); } @@ -83,7 +108,8 @@ public void testBadBatchTime() throws IOException { final Map configMap = TestHelper.createCommonConfigMap(); TestHelper.putBatchTime(configMap, 0L); - final MongoSourceConfig cfg = MongoSourceConfig.load(configMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + final MongoSourceConfig cfg = MongoSourceConfig.load(configMap, sourceContext); cfg.validate(); } diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml index 6c400a7521adb..d10204dfedce3 100644 --- a/pulsar-io/rabbitmq/pom.xml +++ b/pulsar-io/rabbitmq/pom.xml @@ -32,6 +32,11 @@ + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-io-core diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java index f317a35734e69..89192c42346e8 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java @@ -53,7 +53,7 @@ public class RabbitMQSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - rabbitMQSinkConfig = RabbitMQSinkConfig.load(config); + rabbitMQSinkConfig = RabbitMQSinkConfig.load(config, sinkContext); rabbitMQSinkConfig.validate(); ConnectionFactory connectionFactory = rabbitMQSinkConfig.createConnectionFactory(); diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java index c1f8d6b8ad3d3..39f97e5e460c8 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java @@ -20,7 +20,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -28,6 +27,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -60,14 +61,12 @@ public static RabbitMQSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class); } - public static RabbitMQSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), RabbitMQSinkConfig.class); + public static RabbitMQSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, RabbitMQSinkConfig.class, sinkContext); } @Override public void validate() { super.validate(); - Preconditions.checkNotNull(exchangeName, "exchangeName property not set."); } } diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java index d15108c4d8288..b0b7ef31b08de 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java @@ -54,7 +54,7 @@ public class RabbitMQSource extends PushSource { @Override public void open(Map config, SourceContext sourceContext) throws Exception { - rabbitMQSourceConfig = RabbitMQSourceConfig.load(config); + rabbitMQSourceConfig = RabbitMQSourceConfig.load(config, sourceContext); rabbitMQSourceConfig.validate(); ConnectionFactory connectionFactory = rabbitMQSourceConfig.createConnectionFactory(); diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java index f24018e70da13..01e23a7146080 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java @@ -28,6 +28,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.core.annotations.FieldDoc; @Data @@ -66,9 +68,8 @@ public static RabbitMQSourceConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), RabbitMQSourceConfig.class); } - public static RabbitMQSourceConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), RabbitMQSourceConfig.class); + public static RabbitMQSourceConfig load(Map map, SourceContext sourceContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, RabbitMQSourceConfig.class, sourceContext); } @Override diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java index 3d4fd6f46e16f..8706cb567524f 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.io.rabbitmq.sink; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.File; @@ -71,7 +73,45 @@ public final void loadFromMapTest() throws IOException { map.put("exchangeName", "test-exchange"); map.put("exchangeType", "test-exchange-type"); - RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext); + assertNotNull(config); + assertEquals(config.getHost(), "localhost"); + assertEquals(config.getPort(), Integer.parseInt("5673")); + assertEquals(config.getVirtualHost(), "/"); + assertEquals(config.getUsername(), "guest"); + assertEquals(config.getPassword(), "guest"); + assertEquals(config.getConnectionName(), "test-connection"); + assertEquals(config.getRequestedChannelMax(), Integer.parseInt("0")); + assertEquals(config.getRequestedFrameMax(), Integer.parseInt("0")); + assertEquals(config.getConnectionTimeout(), Integer.parseInt("60000")); + assertEquals(config.getHandshakeTimeout(), Integer.parseInt("10000")); + assertEquals(config.getRequestedHeartbeat(), Integer.parseInt("60")); + assertEquals(config.getExchangeName(), "test-exchange"); + assertEquals(config.getExchangeType(), "test-exchange-type"); + } + + @Test + public final void loadFromMapCredentialsFromSecretTest() throws IOException { + Map map = new HashMap<>(); + map.put("host", "localhost"); + map.put("port", "5673"); + map.put("virtualHost", "/"); + map.put("connectionName", "test-connection"); + map.put("requestedChannelMax", "0"); + map.put("requestedFrameMax", "0"); + map.put("connectionTimeout", "60000"); + map.put("handshakeTimeout", "10000"); + map.put("requestedHeartbeat", "60"); + map.put("exchangeName", "test-exchange"); + map.put("exchangeType", "test-exchange-type"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("username")) + .thenReturn("guest"); + Mockito.when(sinkContext.getSecret("password")) + .thenReturn("guest"); + RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext); assertNotNull(config); assertEquals(config.getHost(), "localhost"); assertEquals(config.getPort(), Integer.parseInt("5673")); @@ -105,12 +145,13 @@ public final void validValidateTest() throws IOException { map.put("exchangeName", "test-exchange"); map.put("exchangeType", "test-exchange-type"); - RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext); config.validate(); } - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "exchangeName property not set.") + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "exchangeName cannot be null") public final void missingExchangeValidateTest() throws IOException { Map map = new HashMap<>(); map.put("host", "localhost"); @@ -126,7 +167,8 @@ public final void missingExchangeValidateTest() throws IOException { map.put("requestedHeartbeat", "60"); map.put("exchangeType", "test-exchange-type"); - RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext); config.validate(); } diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java index c33e0070c6fd0..43a90062fa453 100644 --- a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java +++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.io.rabbitmq.source; +import org.apache.pulsar.io.core.SourceContext; import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.File; @@ -76,7 +78,50 @@ public final void loadFromMapTest() throws IOException { map.put("prefetchGlobal", "false"); map.put("passive", "true"); - RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, sourceContext); + assertNotNull(config); + assertEquals("localhost", config.getHost()); + assertEquals(Integer.parseInt("5672"), config.getPort()); + assertEquals("/", config.getVirtualHost()); + assertEquals("guest", config.getUsername()); + assertEquals("guest", config.getPassword()); + assertEquals("test-queue", config.getQueueName()); + assertEquals("test-connection", config.getConnectionName()); + assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax()); + assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax()); + assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout()); + assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout()); + assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat()); + assertEquals(Integer.parseInt("0"), config.getPrefetchCount()); + assertEquals(false, config.isPrefetchGlobal()); + assertEquals(false, config.isPrefetchGlobal()); + assertEquals(true, config.isPassive()); + } + + @Test + public final void loadFromMapCredentialsFromSecretTest() throws IOException { + Map map = new HashMap<>(); + map.put("host", "localhost"); + map.put("port", "5672"); + map.put("virtualHost", "/"); + map.put("queueName", "test-queue"); + map.put("connectionName", "test-connection"); + map.put("requestedChannelMax", "0"); + map.put("requestedFrameMax", "0"); + map.put("connectionTimeout", "60000"); + map.put("handshakeTimeout", "10000"); + map.put("requestedHeartbeat", "60"); + map.put("prefetchCount", "0"); + map.put("prefetchGlobal", "false"); + map.put("passive", "true"); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + Mockito.when(sourceContext.getSecret("username")) + .thenReturn("guest"); + Mockito.when(sourceContext.getSecret("password")) + .thenReturn("guest"); + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, sourceContext); assertNotNull(config); assertEquals("localhost", config.getHost()); assertEquals(Integer.parseInt("5672"), config.getPort()); @@ -115,12 +160,13 @@ public final void validValidateTest() throws IOException { map.put("prefetchGlobal", "false"); map.put("passive", "false"); - RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, sourceContext); config.validate(); } - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "host property not set.") + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "host cannot be null") public final void missingHostValidateTest() throws IOException { Map map = new HashMap<>(); map.put("port", "5672"); @@ -138,7 +184,8 @@ public final void missingHostValidateTest() throws IOException { map.put("prefetchGlobal", "false"); map.put("passive", "false"); - RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, sourceContext); config.validate(); } @@ -162,7 +209,8 @@ public final void invalidPrefetchCountTest() throws IOException { map.put("prefetchGlobal", "false"); map.put("passive", "false"); - RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map, sourceContext); config.validate(); } diff --git a/pulsar-io/redis/pom.xml b/pulsar-io/redis/pom.xml index 5926a70f6a2ec..e828b34d74066 100644 --- a/pulsar-io/redis/pom.xml +++ b/pulsar-io/redis/pom.xml @@ -32,6 +32,11 @@ Pulsar IO :: Redis + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.groupId} pulsar-io-core diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java index 978e7de31a51c..89ec684dded72 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/RedisAbstractConfig.java @@ -88,13 +88,11 @@ public class RedisAbstractConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "10000L", + defaultValue = "10000", help = "The amount of time in milliseconds to wait before timing out when connecting") private long connectTimeout = 10000L; public void validate() { - Preconditions.checkNotNull(redisHosts, "redisHosts property not set."); - Preconditions.checkNotNull(redisDatabase, "redisDatabase property not set."); Preconditions.checkNotNull(clientMode, "clientMode property not set."); } @@ -105,7 +103,6 @@ public enum ClientMode { public List getHostAndPorts() { List hostAndPorts = Lists.newArrayList(); - Preconditions.checkNotNull(redisHosts, "redisHosts property not set."); String[] hosts = StringUtils.split(redisHosts, ","); for (String host : hosts) { HostAndPort hostAndPort = HostAndPort.fromString(host); diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java index bff0a5c2da592..ebd6e9dbab272 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSink.java @@ -68,7 +68,7 @@ public class RedisSink implements Sink { public void open(Map config, SinkContext sinkContext) throws Exception { log.info("Open Redis Sink"); - redisSinkConfig = RedisSinkConfig.load(config); + redisSinkConfig = RedisSinkConfig.load(config, sinkContext); redisSinkConfig.validate(); redisSession = RedisSession.create(redisSinkConfig); diff --git a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java index a9db66812a475..f7a70cb65a826 100644 --- a/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java +++ b/pulsar-io/redis/src/main/java/org/apache/pulsar/io/redis/sink/RedisSinkConfig.java @@ -28,6 +28,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; import org.apache.pulsar.io.redis.RedisAbstractConfig; @@ -40,13 +42,13 @@ public class RedisSinkConfig extends RedisAbstractConfig implements Serializable @FieldDoc( required = false, - defaultValue = "10000L", + defaultValue = "10000", help = "The amount of time in milliseconds before an operation is marked as timed out") private long operationTimeout = 10000L; @FieldDoc( required = false, - defaultValue = "1000L", + defaultValue = "1000", help = "The Redis operation time in milliseconds") private long batchTimeMs = 1000L; @@ -62,9 +64,8 @@ public static RedisSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), RedisSinkConfig.class); } - public static RedisSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), RedisSinkConfig.class); + public static RedisSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, RedisSinkConfig.class, sinkContext); } @Override diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java index 1316d0994a1cd..39fc6e540c242 100644 --- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java +++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.io.redis.sink; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.redis.RedisAbstractConfig; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.File; @@ -62,7 +64,34 @@ public final void loadFromMapTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("connectTimeout", "3000"); - RedisSinkConfig config = RedisSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext); + assertNotNull(config); + assertEquals(config.getRedisHosts(), "localhost:6379"); + assertEquals(config.getRedisPassword(), "fake@123"); + assertEquals(config.getRedisDatabase(), Integer.parseInt("1")); + assertEquals(config.getClientMode(), "Standalone"); + assertEquals(config.getOperationTimeout(), Long.parseLong("2000")); + assertEquals(config.getBatchSize(), Integer.parseInt("100")); + assertEquals(config.getBatchTimeMs(), Long.parseLong("1000")); + assertEquals(config.getConnectTimeout(), Long.parseLong("3000")); + } + + @Test + public final void loadFromMapCredentialsFromSecretTest() throws IOException { + Map map = new HashMap(); + map.put("redisHosts", "localhost:6379"); + map.put("redisDatabase", "1"); + map.put("clientMode", "Standalone"); + map.put("operationTimeout", "2000"); + map.put("batchSize", "100"); + map.put("batchTimeMs", "1000"); + map.put("connectTimeout", "3000"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("redisPassword")) + .thenReturn("fake@123"); + RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext); assertNotNull(config); assertEquals(config.getRedisHosts(), "localhost:6379"); assertEquals(config.getRedisPassword(), "fake@123"); @@ -86,12 +115,13 @@ public final void validValidateTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("connectTimeout", "3000"); - RedisSinkConfig config = RedisSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext); config.validate(); } - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "redisHosts property not set.") + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "redisHosts cannot be null") public final void missingValidValidateTableNameTest() throws IOException { Map map = new HashMap(); map.put("redisPassword", "fake@123"); @@ -102,7 +132,8 @@ public final void missingValidValidateTableNameTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("connectTimeout", "3000"); - RedisSinkConfig config = RedisSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext); config.validate(); } @@ -119,7 +150,8 @@ public final void invalidBatchTimeMsTest() throws IOException { map.put("batchTimeMs", "-100"); map.put("connectTimeout", "3000"); - RedisSinkConfig config = RedisSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext); config.validate(); } @@ -136,7 +168,8 @@ public final void invalidClientModeTest() throws IOException { map.put("batchTimeMs", "1000"); map.put("connectTimeout", "3000"); - RedisSinkConfig config = RedisSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext); config.validate(); RedisAbstractConfig.ClientMode.valueOf(config.getClientMode().toUpperCase()); diff --git a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java index 214151345b42c..2b407fafa5e04 100644 --- a/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java +++ b/pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkTest.java @@ -21,7 +21,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.instance.SinkRecord; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.redis.EmbeddedRedisUtils; +import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -66,7 +68,8 @@ public void TestOpenAndWriteSink() throws Exception { Record record = build("fakeTopic", "fakeKey", "fakeValue"); // open should success - sink.open(configs, null); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + sink.open(configs, sinkContext); // write should success. sink.write(record); diff --git a/pulsar-io/solr/pom.xml b/pulsar-io/solr/pom.xml index ca3735121898f..2c3e213d8542a 100644 --- a/pulsar-io/solr/pom.xml +++ b/pulsar-io/solr/pom.xml @@ -36,6 +36,11 @@ Pulsar IO :: Solr + + ${project.groupId} + pulsar-io-common + ${project.version} + ${project.parent.groupId} pulsar-io-core diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java index de9cdb4a9d82a..202c782c14c49 100644 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java +++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrAbstractSink.java @@ -48,7 +48,7 @@ public abstract class SolrAbstractSink implements Sink { @Override public void open(Map config, SinkContext sinkContext) throws Exception { - solrSinkConfig = SolrSinkConfig.load(config); + solrSinkConfig = SolrSinkConfig.load(config, sinkContext); solrSinkConfig.validate(); enableBasicAuth = !Strings.isNullOrEmpty(solrSinkConfig.getUsername()); diff --git a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java index 02733d230bdcb..daa93a366b110 100644 --- a/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java +++ b/pulsar-io/solr/src/main/java/org/apache/pulsar/io/solr/SolrSinkConfig.java @@ -27,6 +27,8 @@ import java.util.Map; import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -84,9 +86,8 @@ public static SolrSinkConfig load(String yamlFile) throws IOException { return mapper.readValue(new File(yamlFile), SolrSinkConfig.class); } - public static SolrSinkConfig load(Map map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(mapper.writeValueAsString(map), SolrSinkConfig.class); + public static SolrSinkConfig load(Map map, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(map, SolrSinkConfig.class, sinkContext); } public void validate() { diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java index 42d2121dbfcbd..2c2447a637d35 100644 --- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java +++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrSinkConfigTest.java @@ -19,6 +19,8 @@ package org.apache.pulsar.io.solr; import com.google.common.collect.Lists; +import org.apache.pulsar.io.core.SinkContext; +import org.mockito.Mockito; import org.testng.annotations.Test; import java.io.File; @@ -61,7 +63,31 @@ public final void loadFromMapTest() throws IOException { map.put("username", "fakeuser"); map.put("password", "fake@123"); - SolrSinkConfig config = SolrSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); + assertNotNull(config); + assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot"); + assertEquals(config.getSolrMode(), "SolrCloud"); + assertEquals(config.getSolrCollection(), "techproducts"); + assertEquals(config.getSolrCommitWithinMs(), Integer.parseInt("100")); + assertEquals(config.getUsername(), "fakeuser"); + assertEquals(config.getPassword(), "fake@123"); + } + + @Test + public final void loadFromMapCredentialsFromSecretTest() throws IOException { + Map map = new HashMap<>(); + map.put("solrUrl", "localhost:2181,localhost:2182/chroot"); + map.put("solrMode", "SolrCloud"); + map.put("solrCollection", "techproducts"); + map.put("solrCommitWithinMs", "100"); + + SinkContext sinkContext = Mockito.mock(SinkContext.class); + Mockito.when(sinkContext.getSecret("username")) + .thenReturn("fakeuser"); + Mockito.when(sinkContext.getSecret("password")) + .thenReturn("fake@123"); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); assertNotNull(config); assertEquals(config.getSolrUrl(), "localhost:2181,localhost:2182/chroot"); assertEquals(config.getSolrMode(), "SolrCloud"); @@ -81,12 +107,13 @@ public final void validValidateTest() throws IOException { map.put("username", "fakeuser"); map.put("password", "fake@123"); - SolrSinkConfig config = SolrSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); config.validate(); } - @Test(expectedExceptions = NullPointerException.class, - expectedExceptionsMessageRegExp = "solrUrl property not set.") + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "solrUrl cannot be null") public final void missingValidValidateSolrModeTest() throws IOException { Map map = new HashMap<>(); map.put("solrMode", "SolrCloud"); @@ -95,7 +122,8 @@ public final void missingValidValidateSolrModeTest() throws IOException { map.put("username", "fakeuser"); map.put("password", "fake@123"); - SolrSinkConfig config = SolrSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); config.validate(); } @@ -110,7 +138,8 @@ public final void invalidBatchTimeMsTest() throws IOException { map.put("username", "fakeuser"); map.put("password", "fake@123"); - SolrSinkConfig config = SolrSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); config.validate(); } @@ -125,7 +154,8 @@ public final void invalidClientModeTest() throws IOException { map.put("username", "fakeuser"); map.put("password", "fake@123"); - SolrSinkConfig config = SolrSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); config.validate(); SolrAbstractSink.SolrMode.valueOf(config.getSolrMode().toUpperCase()); @@ -141,7 +171,8 @@ public final void validZkChrootTest() throws IOException { map.put("username", "fakeuser"); map.put("password", "fake@123"); - SolrSinkConfig config = SolrSinkConfig.load(map); + SinkContext sinkContext = Mockito.mock(SinkContext.class); + SolrSinkConfig config = SolrSinkConfig.load(map, sinkContext); config.validate(); String url = config.getSolrUrl();