diff --git a/actions/scripts/publish.sh b/actions/scripts/publish.sh index 675206638ff..200ca6ce970 100755 --- a/actions/scripts/publish.sh +++ b/actions/scripts/publish.sh @@ -56,4 +56,5 @@ else deployProject "com.yahoo.athenz:athenz-server-aws-common" deployProject "com.yahoo.athenz:athenz-syncer-common" deployProject "com.yahoo.athenz:athenz-instance-provider" + deployProject "com.yahoo.athenz:athenz-server-msg-pulsar" fi diff --git a/libs/java/server_common/pom.xml b/libs/java/server_common/pom.xml index 3af3ca99e5f..f26320dc373 100644 --- a/libs/java/server_common/pom.xml +++ b/libs/java/server_common/pom.xml @@ -27,7 +27,7 @@ jar - 0.9038 + 0.9063 @@ -87,45 +87,6 @@ ${logback.server.version} test - - org.apache.pulsar - pulsar-client - ${pulsar.client.version} - - - org.bouncycastle - bcprov-ext-jdk18on - - - javax.ws.rs - javax.ws.rs-api - - - javax.validation - validation-api - - - com.sun.activation - javax.activation - - - org.bouncycastle - bcpkix-jdk15on - - - org.bouncycastle - bcprov-jdk15on - - - org.bouncycastle - bcutil-jdk15on - - - org.bouncycastle - bcprov-ext-jdk15on - - - com.fasterxml.uuid java-uuid-generator diff --git a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisher.java b/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisher.java deleted file mode 100644 index cf067895f20..00000000000 --- a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisher.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Copyright The Athenz Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.yahoo.athenz.common.messaging.pulsar; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.yahoo.athenz.common.messaging.ChangePublisher; -import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.invoke.MethodHandles; - -public class PulsarChangePublisher implements ChangePublisher { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private final Producer producer; - private final PulsarClientImpl pulsarClient; - - public PulsarChangePublisher(String serviceUrl, String topicName, AthenzPulsarClient.TlsConfig tlsConfig) { - ProducerConfigurationData producerConfig = AthenzPulsarClient.defaultProducerConfig(topicName); - pulsarClient = AthenzPulsarClient.createPulsarClient(serviceUrl, tlsConfig); - producer = AthenzPulsarClient.createProducer(pulsarClient, producerConfig); - LOG.debug("created publisher: {}, producer: {}", this.getClass(), producer); - } - - @Override - public void publish(T message) { - if (LOG.isDebugEnabled()) { - LOG.debug("producer: {}, publishing message: {}", producer, message); - } - try { - producer.send(OBJECT_MAPPER.writeValueAsBytes(message)); - } catch (PulsarClientException | JsonProcessingException e) { - LOG.error("Pulsar client was not able to publish message. error: {}", e.getMessage(), e); - } - } - - @Override - public void close() { - try { - producer.close(); - pulsarClient.shutdown(); - } catch (PulsarClientException e) { - LOG.error("Got exception while closing pulsar producer: {}", e.getMessage(), e); - } - } -} diff --git a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.java b/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.java deleted file mode 100644 index fa293d17fc9..00000000000 --- a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Copyright The Athenz Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.yahoo.athenz.common.messaging.pulsar; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.yahoo.athenz.common.messaging.ChangeSubscriber; -import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.invoke.MethodHandles; -import java.util.Collections; -import java.util.concurrent.TimeUnit; - -import static com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient.defaultConsumerConfig; - -public class PulsarChangeSubscriber implements ChangeSubscriber { - - public static final String PROP_MESSAGING_CLI_CONSUMER_TO_SEC = "athenz.messaging_cli.consumer.timeout_sec"; - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - private final PulsarClientImpl pulsarClient; - private final Consumer consumer; - protected java.util.function.Consumer processor; - protected Class valueType; - private boolean closed = false; - private Thread subscriberThread; - private final int rcvMsgTimeout; - - public PulsarChangeSubscriber(String serviceUrl, - String topicName, - String subscriptionName, - SubscriptionType subscriptionType, - AthenzPulsarClient.TlsConfig tlsConfig) { - - ConsumerConfigurationData consumerConfiguration = defaultConsumerConfig(Collections.singleton(topicName), subscriptionName, subscriptionType); - pulsarClient = AthenzPulsarClient.createPulsarClient(serviceUrl, tlsConfig); - consumer = AthenzPulsarClient.createConsumer(pulsarClient, consumerConfiguration); - rcvMsgTimeout = Integer.parseInt(System.getProperty(PROP_MESSAGING_CLI_CONSUMER_TO_SEC, "1")); - - LOG.debug("created publisher: {}, pulsarConsumer: {}", this.getClass(), consumer); - } - - @Override - public void init(java.util.function.Consumer processor, Class valueType) { - this.processor = processor; - this.valueType = valueType; - } - - @Override - public void run() { - subscriberThread = Thread.currentThread(); - while (!closed) { - if (LOG.isDebugEnabled()) { - LOG.debug("looping over the consumer receive method"); - } - try { - Message msg = consumer.receive(rcvMsgTimeout, TimeUnit.SECONDS); - if (msg != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("received message: {}", new String(msg.getData())); - } - - T message = OBJECT_MAPPER.readValue(msg.getData(), valueType); - processor.accept(message); - consumer.acknowledge(msg); - } - } catch (Exception e) { - LOG.error("exception in receiving the message: {}", e.getMessage(), e); - } - } - } - - @Override - public void close() { - closed = true; - subscriberThread.interrupt(); - try { - consumer.close(); - pulsarClient.shutdown(); - } catch (PulsarClientException e) { - LOG.error("Got exception while closing pulsar consumer: {}", e.getMessage(), e); - } - } -} diff --git a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactory.java b/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactory.java deleted file mode 100644 index 817657da272..00000000000 --- a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactory.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Copyright The Athenz Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.yahoo.athenz.common.messaging.pulsar; - -import com.yahoo.athenz.auth.PrivateKeyStore; -import com.yahoo.athenz.common.messaging.ChangePublisher; -import com.yahoo.athenz.common.messaging.ChangePublisherFactory; -import com.yahoo.athenz.common.messaging.ChangeSubscriber; -import com.yahoo.athenz.common.messaging.ChangeSubscriberFactory; -import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; -import org.apache.pulsar.client.api.SubscriptionType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.invoke.MethodHandles; - -public class PulsarFactory implements ChangePublisherFactory, ChangeSubscriberFactory { - public static final String PROP_MESSAGING_CLI_SERVICE_URL = "athenz.messaging_cli.service_url"; - public static final String PROP_MESSAGING_CLI_KEY_PATH = "athenz.messaging_cli.key_path"; - public static final String PROP_MESSAGING_CLI_CERT_PATH = "athenz.messaging_cli.cert_path"; - public static final String PROP_MESSAGING_CLI_TRUST_STORE_PATH = "athenz.messaging_cli.truststore_path"; - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - protected static AthenzPulsarClient.TlsConfig tlsConfig() { - String tlsCertPath = System.getProperty(PROP_MESSAGING_CLI_CERT_PATH); - String tlsKeyPath = System.getProperty(PROP_MESSAGING_CLI_KEY_PATH); - String tlsCaPath = System.getProperty(PROP_MESSAGING_CLI_TRUST_STORE_PATH); - - if (tlsCertPath == null || tlsKeyPath == null || tlsCaPath == null) { - LOG.error("Pulsar client configuration invalid. tlsCertPath :[{}]. tlsKeyPath : [{}], tlsCaPath: [{}]", tlsCertPath, tlsKeyPath, tlsCaPath); - throw new IllegalArgumentException("invalid settings configured"); - } - - return new AthenzPulsarClient.TlsConfig(tlsCertPath, tlsKeyPath, tlsCaPath); - } - - protected static String serviceUrl() { - String serviceUrl = System.getProperty(PROP_MESSAGING_CLI_SERVICE_URL); - - if (serviceUrl == null) { - LOG.error("Pulsar client null service url"); - throw new IllegalArgumentException("invalid pulsar service url"); - } - - return serviceUrl; - } - - @Override - public ChangePublisher create(PrivateKeyStore keyStore, String topicName) { - LOG.error("creating a pulsar change publisher"); - return new PulsarChangePublisher<>(serviceUrl(), topicName, tlsConfig()); - } - - @Override - public ChangeSubscriber create(PrivateKeyStore keyStore, String topicName, String subscriptionName, String subscriptionTypeAsString) { - return new PulsarChangeSubscriber<>(serviceUrl(), topicName, subscriptionName, toSubscriptionType(subscriptionTypeAsString), tlsConfig()); - } - - private SubscriptionType toSubscriptionType(String subscriptionType) { - return SubscriptionType.valueOf(subscriptionType); - } -} diff --git a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.java b/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.java deleted file mode 100644 index 2243ff97764..00000000000 --- a/libs/java/server_common/src/main/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * - * Copyright The Athenz Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package com.yahoo.athenz.common.messaging.pulsar.client; - -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.client.impl.auth.AuthenticationTls; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.invoke.MethodHandles; -import java.util.Set; -import java.util.concurrent.ExecutionException; - -public class AthenzPulsarClient { - - private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public static final String PROP_PULSAR_MAX_PENDING_MSGS = "athenz.pulsar.max_pending_msgs"; - public static final String PROP_ATHENZ_PULSAR_CLIENT_CLASS = "athenz.pulsar.pulsar_client_class"; - public static final String PROP_ATHENZ_PULSAR_CLIENT_CLASS_DEFAULT = "com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient"; - - public static PulsarClientImpl createPulsarClient(String serviceUrl, TlsConfig tlsConfig) { - if (tlsConfig == null || tlsConfig.tlsCertFilePath == null || tlsConfig.tlsKeyFilePath == null || tlsConfig.tlsTrustCertsFilePath == null) { - throw new IllegalArgumentException("invalid tls configured"); - } - if (serviceUrl == null || serviceUrl.isEmpty()) { - throw new IllegalArgumentException("invalid service configured"); - } - try { - ClientConfigurationData config = getClientConfiguration(tlsConfig); - AthenzPulsarClient athenzPulsarClient = createAthenzPulsarClientInstance(); - return athenzPulsarClient.getPulsarClient(serviceUrl, config); - } catch (PulsarClientException e) { - LOG.error("Failed to create pulsar client: {}", e.getMessage(), e); - } - throw new IllegalStateException("failed to create pulsar client"); - } - - public static ProducerConfigurationData defaultProducerConfig(String topicName) { - int maxPendingMessages = Integer.parseInt(System.getProperty(PROP_PULSAR_MAX_PENDING_MSGS, "10000")); - ProducerConfigurationData producerConfiguration = new ProducerConfigurationData(); - producerConfiguration.setBlockIfQueueFull(true); - producerConfiguration.setMaxPendingMessages(maxPendingMessages); - producerConfiguration.setTopicName(topicName); - return producerConfiguration; - } - - public static Producer createProducer(String serviceUrl, String topicName, TlsConfig tlsConfig) { - ProducerConfigurationData producerConfiguration = defaultProducerConfig(topicName); - PulsarClientImpl pulsarClient = createPulsarClient(serviceUrl, tlsConfig); - return createProducer(pulsarClient, producerConfiguration); - } - - public static Producer createProducer(PulsarClientImpl pulsarClient, ProducerConfigurationData producerConfiguration) { - return createProducer(pulsarClient, producerConfiguration, Schema.BYTES); - } - - public static Producer createProducer(PulsarClientImpl pulsarClient, ProducerConfigurationData producerConfiguration, Schema schema) { - if (producerConfiguration.getTopicName() == null || producerConfiguration.getTopicName().isEmpty()) { - throw new IllegalArgumentException("invalid topic configured"); - } - try { - return pulsarClient.createProducerAsync(producerConfiguration, schema).get(); - } catch (ExecutionException e) { - LOG.error("Failed to create pulsar producer: {}", e.getMessage(), e); - } catch (InterruptedException e) { - LOG.error("Failed to create pulsar producer, thread was interrupt: {}", e.getMessage(), e); - Thread.currentThread().interrupt(); - } - throw new IllegalStateException("failed to create pulsar producer"); - } - - private static ClientConfigurationData getClientConfiguration(TlsConfig tlsConfig) { - ClientConfigurationData config = new ClientConfigurationData(); - AuthenticationTls authenticationTls = new AuthenticationTls(tlsConfig.tlsCertFilePath, tlsConfig.tlsKeyFilePath); - config.setAuthentication(authenticationTls); - config.setTlsAllowInsecureConnection(false); - config.setTlsHostnameVerificationEnable(true); - config.setTlsTrustCertsFilePath(tlsConfig.tlsTrustCertsFilePath); - config.setUseTls(true); - return config; - } - - private static AthenzPulsarClient createAthenzPulsarClientInstance() { - AthenzPulsarClient instance; - String pulsarClientClassName = System.getProperty(PROP_ATHENZ_PULSAR_CLIENT_CLASS, PROP_ATHENZ_PULSAR_CLIENT_CLASS_DEFAULT); - try { - instance = (AthenzPulsarClient) Class.forName(pulsarClientClassName).getDeclaredConstructor().newInstance(); - } catch (Exception ex) { - throw new ExceptionInInitializerError(ex); - } - return instance; - } - - protected PulsarClientImpl getPulsarClient(String serviceUrl, ClientConfigurationData config) throws PulsarClientException { - config.setServiceUrl(serviceUrl); - return new PulsarClientImpl(config); - } - - public static ConsumerConfigurationData defaultConsumerConfig(Set topicNames, String subscriptionName, SubscriptionType subscriptionType) { - if (subscriptionType == null) { - throw new IllegalArgumentException("invalid subscription type configured"); - } - ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); - conf.setSubscriptionType(subscriptionType); - conf.setSubscriptionName(subscriptionName); - conf.setTopicNames(topicNames); - conf.setPoolMessages(true); - return conf; - } - - public static Consumer createConsumer(String serviceUrl, Set topicNames, String subscriptionName, SubscriptionType subscriptionType, TlsConfig tlsConfig) { - ConsumerConfigurationData consumerConfiguration = defaultConsumerConfig(topicNames, subscriptionName, subscriptionType); - PulsarClientImpl pulsarClient = createPulsarClient(serviceUrl, tlsConfig); - return createConsumer(pulsarClient, consumerConfiguration); - } - - public static Consumer createConsumer(PulsarClientImpl pulsarClient, ConsumerConfigurationData consumerConfiguration) { - return createConsumer(pulsarClient, consumerConfiguration, Schema.BYTES); - } - - public static Consumer createConsumer(PulsarClientImpl pulsarClient, ConsumerConfigurationData consumerConfiguration, Schema schema) { - validateConsumerConfiguration(consumerConfiguration); - try { - return pulsarClient.subscribeAsync(consumerConfiguration, schema, null).get(); - } catch (ExecutionException e) { - LOG.error("Failed to create pulsar consumer: {}", e.getMessage(), e); - } catch (InterruptedException e) { - LOG.error("Failed to create pulsar consumer, thread was interrupt: {}", e.getMessage(), e); - Thread.currentThread().interrupt(); - } - throw new IllegalStateException("failed to create pulsar consumer"); - } - - private static void validateConsumerConfiguration(ConsumerConfigurationData consumerConfiguration) { - if (consumerConfiguration.getSubscriptionName() == null) { - throw new IllegalArgumentException("invalid subscription name configured"); - } - if (consumerConfiguration.getTopicNames() == null || consumerConfiguration.getTopicNames().isEmpty()) { - throw new IllegalArgumentException("invalid topic configured"); - } - for (String topic : consumerConfiguration.getTopicNames()) { - if (topic == null) { - throw new IllegalArgumentException("invalid topic configured"); - } - } - } - - public static class TlsConfig { - String tlsCertFilePath; - String tlsKeyFilePath; - String tlsTrustCertsFilePath; - - public TlsConfig(String tlsCertFilePath, String tlsKeyFilePath, String tlsTrustCertsFilePath) { - this.tlsCertFilePath = tlsCertFilePath; - this.tlsKeyFilePath = tlsKeyFilePath; - this.tlsTrustCertsFilePath = tlsTrustCertsFilePath; - } - } - -} diff --git a/libs/java/server_msg_pulsar/README.md b/libs/java/server_msg_pulsar/README.md new file mode 100644 index 00000000000..9528c84a69a --- /dev/null +++ b/libs/java/server_msg_pulsar/README.md @@ -0,0 +1,8 @@ +Athenz Server Messaging/ChangeLog Implementation using Pulsar +============================================================= + +## License + +Copyright The Athenz Authors + +Licensed under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) diff --git a/libs/java/server_msg_pulsar/pom.xml b/libs/java/server_msg_pulsar/pom.xml new file mode 100644 index 00000000000..11c803d1ffa --- /dev/null +++ b/libs/java/server_msg_pulsar/pom.xml @@ -0,0 +1,126 @@ + + + + 4.0.0 + + + com.yahoo.athenz + athenz + 1.12.6-SNAPSHOT + ../../../pom.xml + + + athenz-server-msg-pulsar + athenz-server-msg-pulsar + Athenz Server Messaging/ChangeLog Implementation using Pulsar + jar + + + 0.9139 + + + + + com.yahoo.athenz + athenz-server-common + ${project.parent.version} + + + com.yahoo.athenz + athenz-auth-core + ${project.parent.version} + + + org.apache.pulsar + pulsar-client + ${pulsar.client.version} + + + org.bouncycastle + bcprov-ext-jdk18on + + + javax.ws.rs + javax.ws.rs-api + + + javax.validation + validation-api + + + com.sun.activation + javax.activation + + + org.bouncycastle + bcpkix-jdk15on + + + org.bouncycastle + bcprov-jdk15on + + + org.bouncycastle + bcutil-jdk15on + + + org.bouncycastle + bcprov-ext-jdk15on + + + + + org.bouncycastle + bcprov-jdk18on + ${bouncycastle.version} + + + org.bouncycastle + bcpkix-jdk18on + ${bouncycastle.version} + + + org.bouncycastle + bcutil-jdk18on + ${bouncycastle.version} + + + com.google.protobuf + protobuf-java + ${protobuf.java.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson-core.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson-databind.version} + + + org.slf4j + slf4j-api + ${slf4j.server.version} + + + ch.qos.logback + logback-classic + ${logback.server.version} + test + + + + diff --git a/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisher.java b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisher.java new file mode 100644 index 00000000000..5126a08881f --- /dev/null +++ b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisher.java @@ -0,0 +1,70 @@ +/* + * + * Copyright The Athenz Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.yahoo.athenz.common.messaging.pulsar; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yahoo.athenz.common.messaging.ChangePublisher; +import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +public class PulsarChangePublisher implements ChangePublisher { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final Producer producer; + private final PulsarClientImpl pulsarClient; + + public PulsarChangePublisher(String serviceUrl, String topicName, AthenzPulsarClient.TlsConfig tlsConfig) { + ProducerConfigurationData producerConfig = AthenzPulsarClient.defaultProducerConfig(topicName); + pulsarClient = AthenzPulsarClient.createPulsarClient(serviceUrl, tlsConfig); + producer = AthenzPulsarClient.createProducer(pulsarClient, producerConfig); + LOG.debug("created publisher: {}, producer: {}", this.getClass(), producer); + } + + @Override + public void publish(T message) { + if (LOG.isDebugEnabled()) { + LOG.debug("producer: {}, publishing message: {}", producer, message); + } + try { + producer.send(OBJECT_MAPPER.writeValueAsBytes(message)); + } catch (PulsarClientException | JsonProcessingException e) { + LOG.error("Pulsar client was not able to publish message. error: {}", e.getMessage(), e); + } + } + + @Override + public void close() { + try { + producer.close(); + pulsarClient.shutdown(); + } catch (PulsarClientException e) { + LOG.error("Got exception while closing pulsar producer: {}", e.getMessage(), e); + } + } +} diff --git a/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.java b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.java new file mode 100644 index 00000000000..bedd623a122 --- /dev/null +++ b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriber.java @@ -0,0 +1,107 @@ +/* + * + * Copyright The Athenz Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.yahoo.athenz.common.messaging.pulsar; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.yahoo.athenz.common.messaging.ChangeSubscriber; +import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient.defaultConsumerConfig; + +public class PulsarChangeSubscriber implements ChangeSubscriber { + + public static final String PROP_MESSAGING_CLI_CONSUMER_TO_SEC = "athenz.messaging_cli.consumer.timeout_sec"; + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private final PulsarClientImpl pulsarClient; + private final Consumer consumer; + protected java.util.function.Consumer processor; + protected Class valueType; + private boolean closed = false; + private Thread subscriberThread; + private final int rcvMsgTimeout; + + public PulsarChangeSubscriber(String serviceUrl, String topicName, String subscriptionName, + SubscriptionType subscriptionType, AthenzPulsarClient.TlsConfig tlsConfig) { + + ConsumerConfigurationData consumerConfiguration = defaultConsumerConfig(Collections.singleton(topicName), + subscriptionName, subscriptionType); + pulsarClient = AthenzPulsarClient.createPulsarClient(serviceUrl, tlsConfig); + consumer = AthenzPulsarClient.createConsumer(pulsarClient, consumerConfiguration); + rcvMsgTimeout = Integer.parseInt(System.getProperty(PROP_MESSAGING_CLI_CONSUMER_TO_SEC, "1")); + + LOG.debug("created publisher: {}, pulsarConsumer: {}", this.getClass(), consumer); + } + + @Override + public void init(java.util.function.Consumer processor, Class valueType) { + this.processor = processor; + this.valueType = valueType; + } + + @Override + public void run() { + subscriberThread = Thread.currentThread(); + while (!closed) { + if (LOG.isDebugEnabled()) { + LOG.debug("looping over the consumer receive method"); + } + try { + Message msg = consumer.receive(rcvMsgTimeout, TimeUnit.SECONDS); + if (msg != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("received message: {}", new String(msg.getData())); + } + + T message = OBJECT_MAPPER.readValue(msg.getData(), valueType); + processor.accept(message); + consumer.acknowledge(msg); + } + } catch (Exception e) { + LOG.error("exception in receiving the message: {}", e.getMessage(), e); + } + } + } + + @Override + public void close() { + closed = true; + subscriberThread.interrupt(); + try { + consumer.close(); + pulsarClient.shutdown(); + } catch (PulsarClientException e) { + LOG.error("Got exception while closing pulsar consumer: {}", e.getMessage(), e); + } + } +} diff --git a/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactory.java b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactory.java new file mode 100644 index 00000000000..c83b9f5455d --- /dev/null +++ b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactory.java @@ -0,0 +1,83 @@ +/* + * + * Copyright The Athenz Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.yahoo.athenz.common.messaging.pulsar; + +import com.yahoo.athenz.auth.PrivateKeyStore; +import com.yahoo.athenz.common.messaging.ChangePublisher; +import com.yahoo.athenz.common.messaging.ChangePublisherFactory; +import com.yahoo.athenz.common.messaging.ChangeSubscriber; +import com.yahoo.athenz.common.messaging.ChangeSubscriberFactory; +import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +public class PulsarFactory implements ChangePublisherFactory, ChangeSubscriberFactory { + + public static final String PROP_MESSAGING_CLI_SERVICE_URL = "athenz.messaging_cli.service_url"; + public static final String PROP_MESSAGING_CLI_KEY_PATH = "athenz.messaging_cli.key_path"; + public static final String PROP_MESSAGING_CLI_CERT_PATH = "athenz.messaging_cli.cert_path"; + public static final String PROP_MESSAGING_CLI_TRUST_STORE_PATH = "athenz.messaging_cli.truststore_path"; + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + protected static AthenzPulsarClient.TlsConfig tlsConfig() { + String tlsCertPath = System.getProperty(PROP_MESSAGING_CLI_CERT_PATH); + String tlsKeyPath = System.getProperty(PROP_MESSAGING_CLI_KEY_PATH); + String tlsCaPath = System.getProperty(PROP_MESSAGING_CLI_TRUST_STORE_PATH); + + if (tlsCertPath == null || tlsKeyPath == null || tlsCaPath == null) { + LOG.error("Pulsar client configuration invalid. tlsCertPath :[{}]. tlsKeyPath : [{}], tlsCaPath: [{}]", + tlsCertPath, tlsKeyPath, tlsCaPath); + throw new IllegalArgumentException("invalid settings configured"); + } + + return new AthenzPulsarClient.TlsConfig(tlsCertPath, tlsKeyPath, tlsCaPath); + } + + protected static String serviceUrl() { + String serviceUrl = System.getProperty(PROP_MESSAGING_CLI_SERVICE_URL); + + if (serviceUrl == null) { + LOG.error("Pulsar client null service url"); + throw new IllegalArgumentException("invalid pulsar service url"); + } + + return serviceUrl; + } + + @Override + public ChangePublisher create(PrivateKeyStore keyStore, String topicName) { + LOG.error("creating a pulsar change publisher"); + return new PulsarChangePublisher<>(serviceUrl(), topicName, tlsConfig()); + } + + @Override + public ChangeSubscriber create(PrivateKeyStore keyStore, String topicName, String subscriptionName, + String subscriptionTypeAsString) { + return new PulsarChangeSubscriber<>(serviceUrl(), topicName, subscriptionName, + toSubscriptionType(subscriptionTypeAsString), tlsConfig()); + } + + private SubscriptionType toSubscriptionType(String subscriptionType) { + return SubscriptionType.valueOf(subscriptionType); + } +} diff --git a/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.java b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.java new file mode 100644 index 00000000000..a752c67469a --- /dev/null +++ b/libs/java/server_msg_pulsar/src/main/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClient.java @@ -0,0 +1,180 @@ +/* + * + * Copyright The Athenz Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package com.yahoo.athenz.common.messaging.pulsar.client; + +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; +import java.util.Set; + +public class AthenzPulsarClient { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String PROP_PULSAR_MAX_PENDING_MSGS = "athenz.pulsar.max_pending_msgs"; + public static final String PROP_ATHENZ_PULSAR_CLIENT_CLASS = "athenz.pulsar.pulsar_client_class"; + public static final String PROP_ATHENZ_PULSAR_CLIENT_CLASS_DEFAULT = "com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient"; + + public static PulsarClientImpl createPulsarClient(String serviceUrl, TlsConfig tlsConfig) { + if (tlsConfig == null || tlsConfig.tlsCertFilePath == null || tlsConfig.tlsKeyFilePath == null || + tlsConfig.tlsTrustCertsFilePath == null) { + throw new IllegalArgumentException("invalid tls configured"); + } + if (serviceUrl == null || serviceUrl.isEmpty()) { + throw new IllegalArgumentException("invalid service configured"); + } + try { + ClientConfigurationData config = getClientConfiguration(tlsConfig); + AthenzPulsarClient athenzPulsarClient = createAthenzPulsarClientInstance(); + return athenzPulsarClient.getPulsarClient(serviceUrl, config); + } catch (PulsarClientException e) { + LOG.error("Failed to create pulsar client: {}", e.getMessage(), e); + } + throw new IllegalStateException("failed to create pulsar client"); + } + + public static ProducerConfigurationData defaultProducerConfig(String topicName) { + int maxPendingMessages = Integer.parseInt(System.getProperty(PROP_PULSAR_MAX_PENDING_MSGS, "10000")); + ProducerConfigurationData producerConfiguration = new ProducerConfigurationData(); + producerConfiguration.setBlockIfQueueFull(true); + producerConfiguration.setMaxPendingMessages(maxPendingMessages); + producerConfiguration.setTopicName(topicName); + return producerConfiguration; + } + + public static Producer createProducer(String serviceUrl, String topicName, TlsConfig tlsConfig) { + ProducerConfigurationData producerConfiguration = defaultProducerConfig(topicName); + PulsarClientImpl pulsarClient = createPulsarClient(serviceUrl, tlsConfig); + return createProducer(pulsarClient, producerConfiguration); + } + + public static Producer createProducer(PulsarClientImpl pulsarClient, ProducerConfigurationData producerConfiguration) { + return createProducer(pulsarClient, producerConfiguration, Schema.BYTES); + } + + public static Producer createProducer(PulsarClientImpl pulsarClient, ProducerConfigurationData producerConfiguration, + Schema schema) { + + if (producerConfiguration.getTopicName() == null || producerConfiguration.getTopicName().isEmpty()) { + throw new IllegalArgumentException("invalid topic configured"); + } + try { + return pulsarClient.createProducerAsync(producerConfiguration, schema).get(); + } catch (Exception ex) { + LOG.error("Failed to create pulsar producer: {}", ex.getMessage(), ex); + throw new IllegalStateException("failed to create pulsar producer"); + } + } + + private static ClientConfigurationData getClientConfiguration(TlsConfig tlsConfig) { + ClientConfigurationData config = new ClientConfigurationData(); + AuthenticationTls authenticationTls = new AuthenticationTls(tlsConfig.tlsCertFilePath, tlsConfig.tlsKeyFilePath); + config.setAuthentication(authenticationTls); + config.setTlsAllowInsecureConnection(false); + config.setTlsHostnameVerificationEnable(true); + config.setTlsTrustCertsFilePath(tlsConfig.tlsTrustCertsFilePath); + config.setUseTls(true); + return config; + } + + private static AthenzPulsarClient createAthenzPulsarClientInstance() { + AthenzPulsarClient instance; + String pulsarClientClassName = System.getProperty(PROP_ATHENZ_PULSAR_CLIENT_CLASS, PROP_ATHENZ_PULSAR_CLIENT_CLASS_DEFAULT); + try { + instance = (AthenzPulsarClient) Class.forName(pulsarClientClassName).getDeclaredConstructor().newInstance(); + } catch (Exception ex) { + throw new ExceptionInInitializerError(ex); + } + return instance; + } + + protected PulsarClientImpl getPulsarClient(String serviceUrl, ClientConfigurationData config) throws PulsarClientException { + config.setServiceUrl(serviceUrl); + return new PulsarClientImpl(config); + } + + public static ConsumerConfigurationData defaultConsumerConfig(Set topicNames, String subscriptionName, + SubscriptionType subscriptionType) { + + if (subscriptionType == null) { + throw new IllegalArgumentException("invalid subscription type configured"); + } + ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + conf.setSubscriptionType(subscriptionType); + conf.setSubscriptionName(subscriptionName); + conf.setTopicNames(topicNames); + conf.setPoolMessages(true); + return conf; + } + + public static Consumer createConsumer(String serviceUrl, Set topicNames, String subscriptionName, + SubscriptionType subscriptionType, TlsConfig tlsConfig) { + ConsumerConfigurationData consumerConfiguration = defaultConsumerConfig(topicNames, + subscriptionName, subscriptionType); + PulsarClientImpl pulsarClient = createPulsarClient(serviceUrl, tlsConfig); + return createConsumer(pulsarClient, consumerConfiguration); + } + + public static Consumer createConsumer(PulsarClientImpl pulsarClient, ConsumerConfigurationData consumerConfiguration) { + return createConsumer(pulsarClient, consumerConfiguration, Schema.BYTES); + } + + public static Consumer createConsumer(PulsarClientImpl pulsarClient, ConsumerConfigurationData consumerConfiguration, + Schema schema) { + validateConsumerConfiguration(consumerConfiguration); + try { + return pulsarClient.subscribeAsync(consumerConfiguration, schema, null).get(); + } catch (Exception ex) { + LOG.error("Failed to create pulsar consumer: {}", ex.getMessage(), ex); + throw new IllegalStateException("failed to create pulsar consumer"); + } + } + + private static void validateConsumerConfiguration(ConsumerConfigurationData consumerConfiguration) { + if (consumerConfiguration.getSubscriptionName() == null) { + throw new IllegalArgumentException("invalid subscription name configured"); + } + if (consumerConfiguration.getTopicNames() == null || consumerConfiguration.getTopicNames().isEmpty()) { + throw new IllegalArgumentException("invalid topic configured"); + } + for (String topic : consumerConfiguration.getTopicNames()) { + if (topic == null) { + throw new IllegalArgumentException("invalid topic configured"); + } + } + } + + public static class TlsConfig { + String tlsCertFilePath; + String tlsKeyFilePath; + String tlsTrustCertsFilePath; + + public TlsConfig(String tlsCertFilePath, String tlsKeyFilePath, String tlsTrustCertsFilePath) { + this.tlsCertFilePath = tlsCertFilePath; + this.tlsKeyFilePath = tlsKeyFilePath; + this.tlsTrustCertsFilePath = tlsTrustCertsFilePath; + } + } +} diff --git a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisherTest.java b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisherTest.java similarity index 95% rename from libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisherTest.java rename to libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisherTest.java index ca76944a91d..d0bd450cad3 100644 --- a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisherTest.java +++ b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangePublisherTest.java @@ -45,7 +45,7 @@ public void tearDown() { } @Test - public void test_validate_publisher() { + public void testValidatePublisher() { try { new PulsarChangePublisher<>(null, "topic", @@ -81,9 +81,11 @@ public void test_validate_publisher() { } @Test - public void test_publisher_creation() { + public void testPublisherCreation() { + System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); - PulsarChangePublisher publisher = new PulsarChangePublisher<>(serviceUrl(), "some-topic", new TlsConfig("cert", "key", "trust")); + PulsarChangePublisher publisher = new PulsarChangePublisher<>(serviceUrl(), + "some-topic", new TlsConfig("cert", "key", "trust")); publisher.publish(new DomainChangeMessage()); publisher.close(); assertNotNull(getPulsarProducer(publisher)); diff --git a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriberTest.java b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriberTest.java similarity index 97% rename from libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriberTest.java rename to libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriberTest.java index d1aa4ac0e67..80d74aeb4cf 100644 --- a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriberTest.java +++ b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarChangeSubscriberTest.java @@ -53,7 +53,7 @@ public void tearDown() { } @Test - public void test_validate_subscriber() { + public void testValidateSubscriber() { try { new PulsarChangeSubscriber<>("service-url", null, @@ -117,7 +117,7 @@ public void test_validate_subscriber() { } @Test - public void test_subscriber_creation() { + public void testSubscriberCreation() { System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); PulsarChangeSubscriber subscriber = new PulsarChangeSubscriber<>("service-url", "topic", @@ -128,7 +128,7 @@ public void test_subscriber_creation() { } @Test - public void test_subscribe_to_mock_msg() throws IOException, InterruptedException { + public void testSubscribeToMockMsg() throws IOException, InterruptedException { System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); PulsarChangeSubscriber subscriber = new PulsarChangeSubscriber<>("service-url", "topic", diff --git a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactoryTest.java b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactoryTest.java similarity index 90% rename from libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactoryTest.java rename to libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactoryTest.java index ff502eaa1c3..9d7b130aaf2 100644 --- a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactoryTest.java +++ b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/PulsarFactoryTest.java @@ -19,6 +19,7 @@ package com.yahoo.athenz.common.messaging.pulsar; import com.yahoo.athenz.common.messaging.DomainChangeMessage; +import com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -42,7 +43,7 @@ public void tearDown() { } @Test - public void test_publisher_creation_no_service() { + public void testPublisherCreationNoService() { System.setProperty(PROP_MESSAGING_CLI_CERT_PATH, "cert"); System.setProperty(PROP_MESSAGING_CLI_KEY_PATH, "key"); System.setProperty(PROP_MESSAGING_CLI_TRUST_STORE_PATH, "trust"); @@ -61,7 +62,7 @@ public void test_publisher_creation_no_service() { } @Test - public void test_publisher_creation_no_topic() { + public void testPublisherCreationNoTopic() { System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); System.setProperty(PROP_MESSAGING_CLI_CERT_PATH, "cert"); System.setProperty(PROP_MESSAGING_CLI_KEY_PATH, "key"); @@ -82,7 +83,7 @@ public void test_publisher_creation_no_topic() { } @Test - public void test_publisher_creation() { + public void testPublisherCreation() { System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); System.setProperty(PROP_MESSAGING_CLI_CERT_PATH, "cert"); System.setProperty(PROP_MESSAGING_CLI_KEY_PATH, "key"); @@ -100,7 +101,7 @@ public void test_publisher_creation() { } @Test - public void test_subscriber_creation_invalid_subscription_type() { + public void testSubscriberCreationInvalidSubscriptionType() { System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); System.setProperty(PROP_MESSAGING_CLI_CERT_PATH, "cert"); System.setProperty(PROP_MESSAGING_CLI_KEY_PATH, "key"); @@ -121,7 +122,7 @@ public void test_subscriber_creation_invalid_subscription_type() { } @Test - public void test_subscriber_creation() { + public void testSubscriberCreation() { System.setProperty(PROP_MESSAGING_CLI_SERVICE_URL, "some-service"); System.setProperty(PROP_MESSAGING_CLI_CERT_PATH, "cert"); System.setProperty(PROP_MESSAGING_CLI_KEY_PATH, "key"); @@ -136,4 +137,14 @@ public void test_subscriber_creation() { System.clearProperty(PROP_MESSAGING_CLI_KEY_PATH); System.clearProperty(PROP_MESSAGING_CLI_TRUST_STORE_PATH); } + + @Test + public void testCreateConfigFailure() { + try { + tlsConfig(); + fail(); + } catch (IllegalArgumentException e) { + assertEquals(e.getMessage(), "invalid settings configured"); + } + } } diff --git a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClientTest.java b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClientTest.java similarity index 50% rename from libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClientTest.java rename to libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClientTest.java index a5ccad4d6f7..c62071e1ad4 100644 --- a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClientTest.java +++ b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/AthenzPulsarClientTest.java @@ -20,8 +20,11 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -29,7 +32,7 @@ import java.util.Collections; import static com.yahoo.athenz.common.messaging.pulsar.client.AthenzPulsarClient.*; -import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.*; public class AthenzPulsarClientTest { @@ -44,7 +47,7 @@ public void tearDown() { } @Test - public void test_producer_creation() { + public void testCreateProducer() { Producer producer = AthenzPulsarClient.createProducer("service", "topic", tlsConfig()); assertNotNull(producer); PulsarClientImpl pulsarClient = AthenzPulsarClient.createPulsarClient("service", tlsConfig()); @@ -53,15 +56,59 @@ public void test_producer_creation() { } @Test - public void test_consumer_creation() { - Consumer consumer = AthenzPulsarClient.createConsumer("service", Collections.singleton("topic"), "subs", SubscriptionType.Exclusive, tlsConfig()); + public void testCreateProducerFailure() { + PulsarClientImpl client = Mockito.mock(PulsarClientImpl.class); + + Mockito.when(client.createProducerAsync(Mockito.any(), Mockito.any())) + .thenThrow(new IllegalArgumentException("invalid configuration")); + try { + AthenzPulsarClient.createProducer(client, defaultProducerConfig("topic")); + fail(); + } catch (IllegalStateException ex) { + assertTrue(ex.getMessage().contains("failed to create pulsar producer")); + } + } + + @Test + public void testCreateConsumer() { + Consumer consumer = AthenzPulsarClient.createConsumer("service", Collections.singleton("topic"), + "subs", SubscriptionType.Exclusive, tlsConfig()); assertNotNull(consumer); PulsarClientImpl pulsarClient = AthenzPulsarClient.createPulsarClient("service", tlsConfig()); - consumer = AthenzPulsarClient.createConsumer(pulsarClient, defaultConsumerConfig(Collections.singleton("topic"), "subs", SubscriptionType.Exclusive)); + consumer = AthenzPulsarClient.createConsumer(pulsarClient, defaultConsumerConfig(Collections.singleton("topic"), + "subs", SubscriptionType.Exclusive)); assertNotNull(consumer); } + @Test + public void testCreateConsumerFailure() { + PulsarClientImpl client = Mockito.mock(PulsarClientImpl.class); + try { + AthenzPulsarClient.createConsumer(client, defaultConsumerConfig(null, + "subs", SubscriptionType.Exclusive)); + fail(); + } catch (IllegalArgumentException ex) { + assertTrue(ex.getMessage().contains("invalid topic configured")); + } + + Mockito.when(client.subscribeAsync(Mockito.any(), Mockito.any(), Mockito.any())) + .thenThrow(new IllegalArgumentException("invalid configuration")); + try { + AthenzPulsarClient.createConsumer(client, defaultConsumerConfig(Collections.singleton("topic"), + "subs", SubscriptionType.Exclusive)); + fail(); + } catch (IllegalStateException ex) { + assertTrue(ex.getMessage().contains("failed to create pulsar consumer")); + } + } + private AthenzPulsarClient.TlsConfig tlsConfig() { return new AthenzPulsarClient.TlsConfig("cert", "key", "truststore"); } + + @Test + public void testGetPulsarClient() throws PulsarClientException { + AthenzPulsarClient client = new AthenzPulsarClient(); + assertNotNull(client.getPulsarClient("https://athenz.io", new ClientConfigurationData())); + } } diff --git a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/MockAthenzPulsarClient.java b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/MockAthenzPulsarClient.java similarity index 73% rename from libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/MockAthenzPulsarClient.java rename to libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/MockAthenzPulsarClient.java index 64c887dd543..25fc6b2fdf6 100644 --- a/libs/java/server_common/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/MockAthenzPulsarClient.java +++ b/libs/java/server_msg_pulsar/src/test/java/com/yahoo/athenz/common/messaging/pulsar/client/MockAthenzPulsarClient.java @@ -46,7 +46,8 @@ protected PulsarClientImpl getPulsarClient(String serviceUrl, ClientConfiguratio try { CompletableFuture asyncProducerResult = null; if (serviceUrl == null || serviceUrl.isEmpty()) { - asyncProducerResult = FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException("Producer configuration undefined")); + asyncProducerResult = FutureUtil.failedFuture( + new PulsarClientException.InvalidConfigurationException("Producer configuration undefined")); } PulsarClientImpl pulsarClient = Mockito.mock(PulsarClientImpl.class); Producer producer = Mockito.mock(Producer.class); @@ -54,25 +55,27 @@ protected PulsarClientImpl getPulsarClient(String serviceUrl, ClientConfiguratio MessageMetadata meta = new MessageMetadata(); DomainChangeMessage roleChange = new DomainChangeMessage() - .setDomainName("domain") - .setObjectType(DomainChangeMessage.ObjectType.ROLE) - .setApiName("putRole") - .setObjectName("role1"); - MessageImpl msg = MessageImpl.create(meta, ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(roleChange)), Schema.BYTES, null); + .setDomainName("domain") + .setObjectType(DomainChangeMessage.ObjectType.ROLE) + .setApiName("putRole") + .setObjectName("role1"); + MessageImpl msg = MessageImpl.create(meta, ByteBuffer.wrap(new ObjectMapper() + .writeValueAsBytes(roleChange)), Schema.BYTES, null); Mockito.when(consumer.receive(1, TimeUnit.SECONDS)) - .thenReturn(msg); + .thenReturn(msg); CompletableFuture finalAsyncProducerResult = asyncProducerResult; Mockito.when(pulsarClient.createProducerAsync(any(ProducerConfigurationData.class), any(Schema.class))) - .thenAnswer(invocation -> { - if (serviceUrl != null) { - return ((ProducerConfigurationData) invocation.getArgument(0)).getTopicName() == null ? finalAsyncProducerResult : CompletableFuture.completedFuture(producer); - } - return finalAsyncProducerResult; - }); + .thenAnswer(invocation -> { + if (serviceUrl != null) { + return ((ProducerConfigurationData) invocation.getArgument(0)).getTopicName() == null ? + finalAsyncProducerResult : CompletableFuture.completedFuture(producer); + } + return finalAsyncProducerResult; + }); Mockito.when(pulsarClient.subscribeAsync(any(ConsumerConfigurationData.class), any(Schema.class), any())) - .thenReturn(CompletableFuture.completedFuture(consumer)); + .thenReturn(CompletableFuture.completedFuture(consumer)); return pulsarClient; } catch (Exception e) { @@ -80,5 +83,4 @@ protected PulsarClientImpl getPulsarClient(String serviceUrl, ClientConfiguratio } return null; } - } \ No newline at end of file diff --git a/libs/java/server_msg_pulsar/src/test/resources/sd_logback.xml b/libs/java/server_msg_pulsar/src/test/resources/sd_logback.xml new file mode 100644 index 00000000000..a11d86de9f7 --- /dev/null +++ b/libs/java/server_msg_pulsar/src/test/resources/sd_logback.xml @@ -0,0 +1,15 @@ + + + + + System.out + + %-4relative [%thread] %-5level %class - %msg%n + + + + + + + + diff --git a/pom.xml b/pom.xml index 8ec3fa88020..bde4cc579a6 100644 --- a/pom.xml +++ b/pom.xml @@ -175,6 +175,7 @@ libs/java/server_aws_common libs/java/syncer_common libs/java/instance_provider + libs/java/server_msg_pulsar clients/java/zpe clients/java/msd libs/java/gcp_zts_creds @@ -486,6 +487,7 @@ libs/java/syncer_common libs/java/instance_provider libs/java/dynamodb_client_factory + libs/java/server_msg_pulsar clients/java/zms clients/java/zts clients/java/zpe