Skip to content

Commit

Permalink
[improve][io] Make connectors load sensitive fields from secrets (apache#21675)
Browse files Browse the repository at this point in the history

(cherry picked from commit c55df07)
  • Loading branch information
jiangpengcheng authored and nikhil-ctds committed Jan 4, 2024
1 parent 555a648 commit 278ac2d
Show file tree
Hide file tree
Showing 52 changed files with 540 additions and 148 deletions.
5 changes: 5 additions & 0 deletions pulsar-io/canal/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class CanalAbstractSource<V> extends PushSource<V> {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
canalSourceConfig = CanalSourceConfig.load(config);
canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
if (canalSourceConfig.getCluster()) {
connector = CanalConnectors.newClusterConnector(canalSourceConfig.getZkServers(),
canalSourceConfig.getDestination(), canalSourceConfig.getUsername(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;


Expand Down Expand Up @@ -86,8 +88,7 @@ public static CanalSourceConfig load(String yamlFile) throws IOException {
}


public static CanalSourceConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), CanalSourceConfig.class);
/**
 * Builds a {@code CanalSourceConfig} from the connector's configuration map.
 *
 * <p>Delegates to {@code IOConfigUtils.loadWithSecrets}, passing the map, the target class and
 * the source context.
 *
 * @param map connector configuration values
 * @param sourceContext context forwarded to {@code IOConfigUtils.loadWithSecrets}
 *                      (presumably used to resolve sensitive fields from secrets; confirm
 *                      against IOConfigUtils)
 * @return the populated {@code CanalSourceConfig}
 * @throws IOException declared by the signature
 */
public static CanalSourceConfig load(Map<String, Object> map, SourceContext sourceContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, CanalSourceConfig.class, sourceContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ private static <T> T loadWithSecrets(Map<String, Object> map, Class<T> clazz,
}
}
configs.computeIfAbsent(field.getName(), key -> {
if (fieldDoc.required()) {
throw new IllegalArgumentException(field.getName() + " cannot be null");
}
// Use default value if it is not null before checking required
String value = fieldDoc.defaultValue();
if (value != null && !value.isEmpty()) {
return value;
}
if (fieldDoc.required()) {
throw new IllegalArgumentException(field.getName() + " cannot be null");
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ static class TestDefaultConfig {
)
protected String testRequired;

@FieldDoc(
required = true,
defaultValue = "defaultRequired",
sensitive = true,
help = "testRequired"
)
protected String testDefaultRequired;

@FieldDoc(
required = false,
defaultValue = "defaultStr",
Expand Down Expand Up @@ -299,6 +307,9 @@ public void testDefaultValue() {
configMap.put("testRequired", "test");
TestDefaultConfig testDefaultConfig =
IOConfigUtils.loadWithSecrets(configMap, TestDefaultConfig.class, new TestSinkContext());
// if there is default value for a required field and no value provided when load config,
// it should not throw exception but use the default value.
Assert.assertEquals(testDefaultConfig.getTestDefaultRequired(), "defaultRequired");
Assert.assertEquals(testDefaultConfig.getDefaultStr(), "defaultStr");
Assert.assertEquals(testDefaultConfig.isDefaultBool(), true);
Assert.assertEquals(testDefaultConfig.getDefaultInt(), 100);
Expand Down
6 changes: 6 additions & 0 deletions pulsar-io/dynamodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

<dependencies>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void close() throws Exception {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config);
this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config, sourceContext);
checkArgument(isNotBlank(dynamodbSourceConfig.getAwsDynamodbStreamArn()),
"empty dynamo-stream arn");
// Even if the endpoint is set, it seems to require a region to go with it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.Map;
import lombok.Data;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import software.amazon.awssdk.regions.Region;

Expand Down Expand Up @@ -77,6 +79,7 @@ public class DynamoDBSourceConfig implements Serializable {
@FieldDoc(
required = false,
defaultValue = "",
sensitive = true,
help = "json-parameters to initialize `AwsCredentialsProviderPlugin`")
private String awsCredentialPluginParam = "";

Expand Down Expand Up @@ -170,9 +173,8 @@ public static DynamoDBSourceConfig load(String yamlFile) throws IOException {
return mapper.readValue(new File(yamlFile), DynamoDBSourceConfig.class);
}

public static DynamoDBSourceConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), DynamoDBSourceConfig.class);
/**
 * Builds a {@code DynamoDBSourceConfig} from the connector's configuration map.
 *
 * <p>Delegates to {@code IOConfigUtils.loadWithSecrets}, passing the map, the target class and
 * the source context.
 *
 * @param map connector configuration values
 * @param sourceContext context forwarded to {@code IOConfigUtils.loadWithSecrets}
 *                      (presumably consulted for fields marked {@code sensitive = true},
 *                      such as {@code awsCredentialPluginParam}; confirm against IOConfigUtils)
 * @return the populated {@code DynamoDBSourceConfig}
 * @throws IOException declared by the signature
 */
public static DynamoDBSourceConfig load(Map<String, Object> map, SourceContext sourceContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, DynamoDBSourceConfig.class, sourceContext);
}

protected Region regionAsV2Region() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.pulsar.io.core.SourceContext;
import org.mockito.Mockito;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -90,7 +92,8 @@ public final void loadFromMapTest() throws IOException {
map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON);
map.put("startAtTime", DAY);

DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map, sourceContext);

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
Expand All @@ -111,7 +114,46 @@ public final void loadFromMapTest() throws IOException {
ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
assertEquals(actual, expected);
}


/**
 * Verifies that {@code awsCredentialPluginParam} is populated from the source context's secret
 * when the config map itself does not contain that key, while all other fields are still
 * read from the map.
 */
@Test
public final void loadFromMapCredentialFromSecretTest() throws IOException {
// Config map deliberately omits "awsCredentialPluginParam"; it must come from the secret.
Map<String, Object> map = new HashMap<String, Object> ();
map.put("awsEndpoint", "https://some.endpoint.aws");
map.put("awsRegion", "us-east-1");
map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
map.put("checkpointInterval", "30000");
map.put("backoffTime", "4000");
map.put("numRetries", "3");
map.put("receiveQueueSize", 2000);
map.put("applicationName", "My test application");
map.put("initialPositionInStream", InitialPositionInStream.TRIM_HORIZON);
map.put("startAtTime", DAY);

// Mocked context returns the credential JSON when asked for the secret named after the field.
SourceContext sourceContext = Mockito.mock(SourceContext.class);
Mockito.when(sourceContext.getSecret("awsCredentialPluginParam"))
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
DynamoDBSourceConfig config = DynamoDBSourceConfig.load(map, sourceContext);

assertNotNull(config);
assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws");
assertEquals(config.getAwsRegion(), "us-east-1");
assertEquals(config.getAwsDynamodbStreamArn(), "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");
// The credential param must equal the value supplied through the mocked secret.
assertEquals(config.getAwsCredentialPluginParam(),
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
assertEquals(config.getApplicationName(), "My test application");
// Values supplied as strings in the map are expected back as numbers.
assertEquals(config.getCheckpointInterval(), 30000);
assertEquals(config.getBackoffTime(), 4000);
assertEquals(config.getNumRetries(), 3);
assertEquals(config.getReceiveQueueSize(), 2000);
assertEquals(config.getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON);

// Compare startAtTime as UTC ZonedDateTime values derived from the loaded date and from DAY.
Calendar cal = Calendar.getInstance();
cal.setTime(config.getStartAtTime());
ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), ZoneOffset.UTC);
ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), ZoneOffset.UTC);
assertEquals(actual, expected);
}

@Test(expectedExceptions = IllegalArgumentException.class,
expectedExceptionsMessageRegExp = "empty aws-credential param")
public final void missingCredentialsTest() throws Exception {
Expand All @@ -121,7 +163,8 @@ public final void missingCredentialsTest() throws Exception {
map.put("awsDynamodbStreamArn", "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291");

DynamoDBSource source = new DynamoDBSource();
source.open(map, null);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
source.open(map, sourceContext);
}

@Test(expectedExceptions = IllegalArgumentException.class,
Expand All @@ -136,7 +179,8 @@ public final void missingStartTimeTest() throws Exception {
map.put("initialPositionInStream", InitialPositionInStream.AT_TIMESTAMP);

DynamoDBSource source = new DynamoDBSource();
source.open(map, null);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
source.open(map, sourceContext);
}

private File getFile(String name) {
Expand Down
5 changes: 5 additions & 0 deletions pulsar-io/influxdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
<name>Pulsar IO :: InfluxDB</name>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public class InfluxDBGenericRecordSink implements Sink<GenericRecord> {
@Override
public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
try {
val configV2 = InfluxDBSinkConfig.load(map);
val configV2 = InfluxDBSinkConfig.load(map, sinkContext);
configV2.validate();
sink = new InfluxDBSink();
} catch (Exception e) {
try {
val configV1 = org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map);
val configV1 = org.apache.pulsar.io.influxdb.v1.InfluxDBSinkConfig.load(map, sinkContext);
configV1.validate();
sink = new org.apache.pulsar.io.influxdb.v1.InfluxDBGenericRecordSink();
} catch (Exception e1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class InfluxDBAbstractSink<T> extends BatchSink<Point, T> {

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config);
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext);
influxDBSinkConfig.validate();

super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
Expand Down Expand Up @@ -94,7 +96,7 @@ public class InfluxDBSinkConfig implements Serializable {

@FieldDoc(
required = false,
defaultValue = "1000L",
defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000L;

Expand All @@ -110,14 +112,11 @@ public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}

public static InfluxDBSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class);
/**
 * Builds an {@code InfluxDBSinkConfig} from the connector's configuration map.
 *
 * <p>Delegates to {@code IOConfigUtils.loadWithSecrets}, passing the map, the target class and
 * the sink context.
 *
 * @param map connector configuration values
 * @param sinkContext context forwarded to {@code IOConfigUtils.loadWithSecrets}
 *                    (presumably used to resolve sensitive fields from secrets; confirm
 *                    against IOConfigUtils)
 * @return the populated {@code InfluxDBSinkConfig}
 * @throws IOException declared by the signature
 */
public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, sinkContext);
}

public void validate() {
Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set.");
Preconditions.checkNotNull(database, "database property not set.");
Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class InfluxDBSink extends BatchSink<Point, GenericRecord> {

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config);
InfluxDBSinkConfig influxDBSinkConfig = InfluxDBSinkConfig.load(config, sinkContext);
influxDBSinkConfig.validate();
super.init(influxDBSinkConfig.getBatchTimeMs(), influxDBSinkConfig.getBatchSize());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;

/**
Expand Down Expand Up @@ -87,7 +89,7 @@ public class InfluxDBSinkConfig implements Serializable {

@FieldDoc(
required = false,
defaultValue = "1000L",
defaultValue = "1000",
help = "The InfluxDB operation time in milliseconds")
private long batchTimeMs = 1000;

Expand All @@ -103,17 +105,11 @@ public static InfluxDBSinkConfig load(String yamlFile) throws IOException {
return mapper.readValue(new File(yamlFile), InfluxDBSinkConfig.class);
}

public static InfluxDBSinkConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(mapper.writeValueAsString(map), InfluxDBSinkConfig.class);
/**
 * Builds an {@code InfluxDBSinkConfig} from the connector's configuration map.
 *
 * <p>Delegates to {@code IOConfigUtils.loadWithSecrets}, passing the map, the target class and
 * the sink context.
 *
 * @param map connector configuration values
 * @param sinkContext context forwarded to {@code IOConfigUtils.loadWithSecrets}
 *                    (presumably used to resolve sensitive fields from secrets; confirm
 *                    against IOConfigUtils)
 * @return the populated {@code InfluxDBSinkConfig}
 * @throws IOException declared by the signature
 */
public static InfluxDBSinkConfig load(Map<String, Object> map, SinkContext sinkContext) throws IOException {
return IOConfigUtils.loadWithSecrets(map, InfluxDBSinkConfig.class, sinkContext);
}

public void validate() {
Preconditions.checkNotNull(influxdbUrl, "influxdbUrl property not set.");
Preconditions.checkNotNull(token, "token property not set.");
Preconditions.checkNotNull(organization, "organization property not set.");
Preconditions.checkNotNull(bucket, "bucket property not set.");

Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
}
Expand Down
Loading

0 comments on commit 278ac2d

Please sign in to comment.