Skip to content

Commit

Permalink
Load sensitive fields from secrets
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Dec 6, 2023
1 parent 72bfc8f commit cd06555
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsTopic;
Expand All @@ -36,7 +37,18 @@
@Accessors(chain = true)
public class AmqpBaseConfig {

@FieldDoc(
sensitive = true,
defaultValue = "",
help = "the username of the AMQP."
)
private String username;

@FieldDoc(
sensitive = true,
defaultValue = "",
help = "the password of the AMQP."
)
private String password;
@Deprecated
/* Use Connection with failover support instead
Expand All @@ -53,9 +65,29 @@ public class AmqpBaseConfig {
For single uri configuration without failover support provide a list with one ConnectionUri in Connection
*/
private int port;

@FieldDoc(
defaultValue = "",
help = "the AMQP queue."
)
private String queue;

@FieldDoc(
defaultValue = "",
help = "the AMQP topic."
)
private String topic;

@FieldDoc(
defaultValue = "",
help = "the connection url of AMQP."
)
public Connection connection;

@FieldDoc(
defaultValue = "false",
help = "whether message is test format only."
)
private boolean onlyTextMessage = false;

public static AmqpBaseConfig load(Map<String, Object> config) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class AmqpSink implements Sink<ByteBuffer> {

@Override
public void open(Map map, SinkContext sinkContext) throws Exception {
config = AmqpSinkConfig.load(map);
config = AmqpSinkConfig.load(map, sinkContext);
config.validate();

JmsConnectionFactory factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SinkContext;


/**
Expand All @@ -34,9 +36,8 @@
@Accessors(chain = true)
public class AmqpSinkConfig extends AmqpBaseConfig {

public static AmqpSinkConfig load(Map<String, Object> config) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(objectMapper.writeValueAsBytes(config), AmqpSinkConfig.class);
public static AmqpSinkConfig load(Map<String, Object> config, SinkContext sinkContext) throws IOException {
return IOConfigUtils.loadWithSecrets(config, AmqpSinkConfig.class, sinkContext);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class AmqpSource extends PushSource<ByteBuffer> {

@Override
public void open(Map map, SourceContext sourceContext) throws Exception {
config = AmqpSourceConfig.load(map);
config = AmqpSourceConfig.load(map, sourceContext);
config.validate();

JmsConnectionFactory factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.common.IOConfigUtils;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.FieldDoc;


/**
Expand All @@ -36,11 +39,14 @@
public class AmqpSourceConfig extends AmqpBaseConfig {

// Default session mode
@FieldDoc(
defaultValue = "1",
help = "the session mode."
)
private int sessionMode = JMSContext.AUTO_ACKNOWLEDGE;

public static AmqpSourceConfig load(Map<String, Object> config) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(objectMapper.writeValueAsBytes(config), AmqpSourceConfig.class);
public static AmqpSourceConfig load(Map<String, Object> config, SourceContext sourceContext) throws IOException {
return IOConfigUtils.loadWithSecrets(config, AmqpSourceConfig.class, sourceContext);
}

public int getSessionMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.List;
import java.util.Map;

import org.apache.pulsar.io.core.SourceContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;


/**
Expand Down Expand Up @@ -276,7 +278,8 @@ public void destinationConflictSetTest() throws Exception {
Map<String, Object> paramsMap = getBaseConfig();
paramsMap.put("queue", "test-queue");
paramsMap.put("topic", "test-topic");
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
try {
sourceConfig.validate();
Assert.fail("The test should fail because queue and topic all set at the same time.");
Expand All @@ -289,7 +292,8 @@ public void destinationConflictSetTest() throws Exception {
@Test
public void destinationNotSetTest() throws Exception {
Map<String, Object> paramsMap = getBaseConfig();
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
try {
sourceConfig.validate();
Assert.fail("The test should fail because queue and topic all not set.");
Expand All @@ -303,12 +307,13 @@ public void destinationNotSetTest() throws Exception {
public void destinationConfigTest() throws Exception {
Map<String, Object> paramsMap = getBaseConfig();
paramsMap.put("queue", "test-queue");
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
sourceConfig.validate();

paramsMap.remove("queue");
paramsMap.put("topic", "test-topic");
sourceConfig = AmqpSourceConfig.load(paramsMap);
sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
sourceConfig.validate();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSContext;
import org.apache.pulsar.io.core.SourceContext;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
* Amqp source config test.
Expand All @@ -32,7 +34,8 @@ public class AmqpSourceConfigTest {
@Test
public void testDefaultSessionMode() throws Exception {
Map<String, Object> paramsMap = getBaseConfig();
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
sourceConfig.validate();

Assert.assertEquals(JMSContext.AUTO_ACKNOWLEDGE, sourceConfig.getSessionMode());
Expand All @@ -42,12 +45,31 @@ public void testDefaultSessionMode() throws Exception {
public void testClientAcknowledgeSessionMode() throws Exception {
Map<String, Object> paramsMap = getBaseConfig();
paramsMap.put("sessionMode", 2);
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap);
SourceContext sourceContext = Mockito.mock(SourceContext.class);
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
sourceConfig.validate();

Assert.assertEquals(JMSContext.CLIENT_ACKNOWLEDGE, sourceConfig.getSessionMode());
}

@Test
public void testLoadCredentialFromSecret() throws Exception {
Map<String, Object> paramsMap = getBaseConfig();
paramsMap.put("sessionMode", 1);

SourceContext sourceContext = Mockito.mock(SourceContext.class);
Mockito.when(sourceContext.getSecret("username"))
.thenReturn("admin");
Mockito.when(sourceContext.getSecret("password"))
.thenReturn("admin");
AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext);
sourceConfig.validate();

Assert.assertEquals(JMSContext.AUTO_ACKNOWLEDGE, sourceConfig.getSessionMode());
Assert.assertEquals("admin", sourceConfig.getUsername());
Assert.assertEquals("admin", sourceConfig.getPassword());
}

private Map<String, Object> getBaseConfig() {
Map<String, Object> paramsMap = new HashMap<>();
paramsMap.put("protocol", "amqp");
Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<!-- keep all the dependencies used by all modules here -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>pulsar-io-common</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>pulsar-io-core</artifactId>
Expand Down Expand Up @@ -182,6 +187,11 @@
</dependency>

<!-- runtime dependencies -->
<dependency>
<groupId>io.streamnative</groupId>
<artifactId>pulsar-io-common</artifactId>
</dependency>

<dependency>
<groupId>io.streamnative</groupId>
<artifactId>pulsar-io-core</artifactId>
Expand Down

0 comments on commit cd06555

Please sign in to comment.