diff --git a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfig.java b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfig.java index 6913ff27..255e093c 100644 --- a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfig.java +++ b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfig.java @@ -25,6 +25,7 @@ import lombok.Data; import lombok.experimental.Accessors; +import org.apache.pulsar.io.core.annotations.FieldDoc; import org.apache.qpid.jms.JmsDestination; import org.apache.qpid.jms.JmsQueue; import org.apache.qpid.jms.JmsTopic; @@ -36,7 +37,18 @@ @Accessors(chain = true) public class AmqpBaseConfig { + @FieldDoc( + sensitive = true, + defaultValue = "", + help = "the username of the AMQP." + ) private String username; + + @FieldDoc( + sensitive = true, + defaultValue = "", + help = "the password of the AMQP." + ) private String password; @Deprecated /* Use Connection with failover support instead @@ -53,9 +65,29 @@ public class AmqpBaseConfig { For single uri configuration without failover support provide a list with one ConnectionUri in Connection */ private int port; + + @FieldDoc( + defaultValue = "", + help = "the AMQP queue." + ) private String queue; + + @FieldDoc( + defaultValue = "", + help = "the AMQP topic." + ) private String topic; + + @FieldDoc( + defaultValue = "", + help = "the connection url of AMQP." + ) public Connection connection; + + @FieldDoc( + defaultValue = "false", + help = "whether message is test format only." + ) private boolean onlyTextMessage = false; public static AmqpBaseConfig load(Map config) throws IOException { diff --git a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSink.java b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSink.java index 26b64b59..d29bfd86 100644 --- a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSink.java +++ b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSink.java @@ -55,7 +55,7 @@ public class AmqpSink implements Sink { @Override public void open(Map map, SinkContext sinkContext) throws Exception { - config = AmqpSinkConfig.load(map); + config = AmqpSinkConfig.load(map, sinkContext); config.validate(); JmsConnectionFactory factory; diff --git a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSinkConfig.java b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSinkConfig.java index ac38b896..cf63041d 100644 --- a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSinkConfig.java +++ b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSinkConfig.java @@ -18,12 +18,13 @@ */ package org.apache.pulsar.ecosystem.io.amqp; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SinkContext; /** @@ -34,9 +35,8 @@ @Accessors(chain = true) public class AmqpSinkConfig extends AmqpBaseConfig { - public static AmqpSinkConfig load(Map config) throws IOException { - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(objectMapper.writeValueAsBytes(config), AmqpSinkConfig.class); + public static AmqpSinkConfig load(Map config, SinkContext sinkContext) throws IOException { + return IOConfigUtils.loadWithSecrets(config, AmqpSinkConfig.class, sinkContext); } } diff --git a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSource.java b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSource.java index 417416b8..23ea7ad4 100644 --- a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSource.java +++ b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSource.java @@ -55,7 +55,7 @@ public class AmqpSource extends PushSource { @Override public void open(Map map, SourceContext sourceContext) throws Exception { - config = AmqpSourceConfig.load(map); + config = AmqpSourceConfig.load(map, sourceContext); config.validate(); JmsConnectionFactory factory; diff --git a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfig.java b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfig.java index c3e9e861..03067c07 100644 --- a/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfig.java +++ b/io-amqp1_0-impl/src/main/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfig.java @@ -18,13 +18,15 @@ */ package org.apache.pulsar.ecosystem.io.amqp; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; import javax.jms.JMSContext; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; +import org.apache.pulsar.io.common.IOConfigUtils; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.core.annotations.FieldDoc; /** @@ -36,11 +38,14 @@ public class AmqpSourceConfig extends AmqpBaseConfig { // Default session mode + @FieldDoc( + defaultValue = "1", + help = "the session mode." + ) private int sessionMode = JMSContext.AUTO_ACKNOWLEDGE; - public static AmqpSourceConfig load(Map config) throws IOException { - ObjectMapper objectMapper = new ObjectMapper(); - return objectMapper.readValue(objectMapper.writeValueAsBytes(config), AmqpSourceConfig.class); + public static AmqpSourceConfig load(Map config, SourceContext sourceContext) throws IOException { + return IOConfigUtils.loadWithSecrets(config, AmqpSourceConfig.class, sourceContext); } public int getSessionMode() { diff --git a/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfigTest.java b/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfigTest.java index e1e65565..87f5d2f8 100644 --- a/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfigTest.java +++ b/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpBaseConfigTest.java @@ -25,8 +25,10 @@ import java.util.List; import java.util.Map; +import org.apache.pulsar.io.core.SourceContext; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; /** @@ -276,7 +278,8 @@ public void destinationConflictSetTest() throws Exception { Map paramsMap = getBaseConfig(); paramsMap.put("queue", "test-queue"); paramsMap.put("topic", "test-topic"); - AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); try { sourceConfig.validate(); Assert.fail("The test should fail because queue and topic all set at the same time."); @@ -289,7 +292,8 @@ public void destinationConflictSetTest() throws Exception { @Test public void destinationNotSetTest() throws Exception { Map paramsMap = getBaseConfig(); - AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); try { sourceConfig.validate(); Assert.fail("The test should fail because queue and topic all not set."); @@ -303,12 +307,13 @@ public void destinationNotSetTest() throws Exception { public void destinationConfigTest() throws Exception { Map paramsMap = getBaseConfig(); paramsMap.put("queue", "test-queue"); - AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); sourceConfig.validate(); paramsMap.remove("queue"); paramsMap.put("topic", "test-topic"); - sourceConfig = AmqpSourceConfig.load(paramsMap); + sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); sourceConfig.validate(); } diff --git a/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfigTest.java b/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfigTest.java index c5d1598a..c9b62f2d 100644 --- a/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfigTest.java +++ b/io-amqp1_0-impl/src/test/java/org/apache/pulsar/ecosystem/io/amqp/AmqpSourceConfigTest.java @@ -21,8 +21,10 @@ import java.util.HashMap; import java.util.Map; import javax.jms.JMSContext; +import org.apache.pulsar.io.core.SourceContext; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; /** * Amqp source config test. @@ -32,7 +34,8 @@ public class AmqpSourceConfigTest { @Test public void testDefaultSessionMode() throws Exception { Map paramsMap = getBaseConfig(); - AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); sourceConfig.validate(); Assert.assertEquals(JMSContext.AUTO_ACKNOWLEDGE, sourceConfig.getSessionMode()); @@ -42,12 +45,31 @@ public void testDefaultSessionMode() throws Exception { public void testClientAcknowledgeSessionMode() throws Exception { Map paramsMap = getBaseConfig(); paramsMap.put("sessionMode", 2); - AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap); + SourceContext sourceContext = Mockito.mock(SourceContext.class); + AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); sourceConfig.validate(); Assert.assertEquals(JMSContext.CLIENT_ACKNOWLEDGE, sourceConfig.getSessionMode()); } + @Test + public void testLoadCredentialFromSecret() throws Exception { + Map paramsMap = getBaseConfig(); + paramsMap.put("sessionMode", 1); + + SourceContext sourceContext = Mockito.mock(SourceContext.class); + Mockito.when(sourceContext.getSecret("username")) + .thenReturn("admin"); + Mockito.when(sourceContext.getSecret("password")) + .thenReturn("admin"); + AmqpSourceConfig sourceConfig = AmqpSourceConfig.load(paramsMap, sourceContext); + sourceConfig.validate(); + + Assert.assertEquals(JMSContext.AUTO_ACKNOWLEDGE, sourceConfig.getSessionMode()); + Assert.assertEquals("admin", sourceConfig.getUsername()); + Assert.assertEquals("admin", sourceConfig.getPassword()); + } + private Map getBaseConfig() { Map paramsMap = new HashMap<>(); paramsMap.put("protocol", "amqp"); diff --git a/pom.xml b/pom.xml index 863ca609..cf39e700 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,11 @@ + + io.streamnative + pulsar-io-common + ${pulsar.version} + io.streamnative pulsar-io-core @@ -182,6 +187,11 @@ + + io.streamnative + pulsar-io-common + + io.streamnative pulsar-io-core