From 07e5706b1c50be57616bc21580d646798680c35f Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Mon, 8 Mar 2021 17:05:46 +0800 Subject: [PATCH] update the connector docs --- README.md | 128 +------------------ docs/io-amqp1_0-sink.md | 230 +++++++++++++++++++++++++++++++++++ docs/io-amqp1_0-source.md | 250 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 485 insertions(+), 123 deletions(-) create mode 100644 docs/io-amqp1_0-sink.md create mode 100644 docs/io-amqp1_0-source.md diff --git a/README.md b/README.md index 3aabcfa3..d7fed050 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,10 @@ -## Pulsar IO :: Template +### AMQP1_0 Source Doc -This is a template project for developing an enterprise-grade -pulsar IO connector. +see the docs/io-amqp1_0-source.md -This template already sets up a project structure, including -necessary dependencies and plugins. The IO connector developers -can clone this project to develop their own connector. +### AMQP1_0 Source Doc + +see the docs/io-amqp1_0-sink.md ### Project Layout @@ -37,120 +36,3 @@ how this template project organize the files for a connector. - `src/spotbugs`: store the spotbugs configuration files - `src/main`: for keeping all the main source files - `src/test`: for keeping all the related tests - -### Develop a Connector - -Here are the instructions for developing a Pulsar connector `pulsar-io-foo` -use this project template. - -#### Clone the Template - -You can clone the template project with `--bare`. - -```bash -$ git clone --bare https://github.com/streamnative/pulsar-io-template.git pulsar-io-foo -``` - -#### Push to Github - -You can create a `pulsar-io-foo` project at your Github account and push the project to your -Github account. - -``` -$ cd pulsar-io-foo -$ git push https://github.com//pulsar-io-foo -``` - -Once the project is pushed to your Github account, you can then develop the connector as -a normal Github project. - -```bash -$ cd .. -$ rm -rf pulsar-io-foo -$ git clone https://github.com//pulsar-io-foo -``` - -#### Update Pom File - -The first thing to do is to update the [pom file](pom.xml) to customize your connector. - -1. Change `artifactId` to `pulsar-io-foo`. -2. Update `version` to a version you like. A good practice is to pick the pulsar version - as the same version for your connector so that it is easy to figure out what version of - this connector works for what version of Pulsar. -3. Update `name` to `Pulsar Ecosystem :: IO Connector :: `. -4. Update `description` to the description of your connector. - -Once the above steps are done, you will have a real `pulsar-io-foo` project to develop -your own connector. - -#### Implement Your Connector - -Before you start implementing your own connector, it would be good to remove the example -connector included in the project template. - -```bash -$ rm -rf src/main/java/org/apache/pulsar/ecosystem/io/random -``` - -Then you can create a package `org.apache.pulsar.ecosystem.io.foo` to develop your connector -logic under it. - -#### Test Your Connector - -It is strongly recommended to write tests for your connector. - -There are a few test examples under -[src/test/java/org/apache/pulsar/ecosystem/io/random](src/test/java/org/apache/pulsar/ecosystem/io/random) -showing how to test a connector. - -Before you start writing tests for your connector, you can remove those examples - -```bash -$ rm -rf src/test/java/org/apache/pulsar/ecosystem/io/random -``` - -Then you can create a package `org.apache.pulsar.ecosystem.io.foo` under `src/test` directory -to develop the tests for your connector. - -#### Checkstyle and Spotbugs - -The template project already sets up checkstyle plugin and spotbugs plugin for ensuring you -write a connector that has a consistent coding convention with other connectors and high code -quality. - -To run checkstyle: - -```bash -$ mvn checkstyle:check -``` - -To run spotbugs: - -```bash -$ mvn spotbugs:check -``` - -#### License - -Before you publish your connector for others to use, you might consider pick up a license -you like to use for your connector. - -Once you choose the license, you should do followings: - -- Replace the `LICENSE` file with your chosen license. -- Add your license header to `src/license/.txt`. -- Update the license-maven-plugin configuration in pom.xml to point to your license header - `src/license/.txt`. -- Run `license:format` to format the project with your license - - - - - - - - - - - diff --git a/docs/io-amqp1_0-sink.md b/docs/io-amqp1_0-sink.md new file mode 100644 index 00000000..13fb17b1 --- /dev/null +++ b/docs/io-amqp1_0-sink.md @@ -0,0 +1,230 @@ +--- +description: The AMQP1_0 sink connector pulls messages from Pulsar topics and persist messages to AMQP broker. +author: ["ASF"] +contributors: ["ASF"] +language: Java +document: +source: "https://github.com/streamnative/pulsar-io-activemq/tree/v2.5.1/src/main/java/org/apache/pulsar/ecosystem/io/activemq" +license: Apache License 2.0 +tags: ["Pulsar IO", "AMQP", "Qpid", "JMS", "Sink"] +alias: AMQP1_0 Sink +features: ["Use AMQP1_0 sink connector to sync data from Pulsar"] +license_link: "https://www.apache.org/licenses/LICENSE-2.0" +icon: "https://www.amqp.org/sites/amqp.org/themes/genesis_amqp/images/amqp-logo.png" +download: "https://github.com/streamnative/pulsar-io-activemq/releases/download/v2.5.1/pulsar-io-activemq-2.5.1.nar" +support: StreamNative +support_link: https://streamnative.io +support_img: "/images/connectors/streamnative.png" +dockerfile: +id: "io-amqp1_0-sink" +--- + +The AMQP1_0 sink connector pulls messages from Pulsar topics and persist messages to AMQP broker. + +# Installation + +``` +git clone https://github.com/streamnative/pulsar-io-amqp-1-0.git +cd pulsar-io-amqp-1-0/ +mvn clean install -DskipTests +cp pulsar-io-amqp1_0/target/pulsar-io-amqp1_0-0.0.1.nar $PULSAR_HOME/pulsar-io-amqp1_0-0.0.1.nar +``` + +# Configuration + +The configuration of the AMQP1_0 sink connector has the following properties. + +## AMQP1_0 sink connector configuration + +| Name | Type|Required | Default | Description +|------|----------|----------|---------|-------------| +| `protocol` |String| true | "amqp" | The AMQP protocol. | +| `host` | String| true | " " (empty string) | The AMQP service host. | +| `port` | int |true | 5672 | The AMQP serrvice port. | +| `username` | String|false | " " (empty string) | The username used to authenticate to ActiveMQ. | +| `password` | String|false | " " (empty string) | The password used to authenticate to ActiveMQ. | +| `queue` | String|false | " " (empty string) | The queue name that messages should be read from or written to. | +| `topic` | String|false | " " (empty string) | The topic name that messages should be read from or written to. | +| `activeMessageType` | String|false |0 | The ActiveMQ message simple class name. | + +## Configure AMQP1_0 sink connector + +Before using the AMQP1_0 sink connector, you need to create a configuration file through one of the following methods. + +* JSON + + ```json + { + "tenant": "public", + "namespace": "default", + "name": "amqp1_0-sink", + "inputs": ["user-op-queue-topic"], + "archive": "connectors/pulsar-io-amqp1_0-{version}.nar", + "parallelism": 1, + "configs": + { + "protocol": "amqp", + "host": "localhost", + "port": "5672", + "username": "guest", + "password": "guest", + "queue": "user-op-queue-pulsar" + } + } + ``` + +* YAML + + ```yaml + tenant: "public" + namespace: "default" + name: "amqp1_0-sink" + inputs: + - "user-op-queue-topic" + archive: "connectors/pulsar-io-amqp1_0-{version}.nar" + parallelism: 1 + + configs: + protocol: "amqp" + host: "localhost" + port: "5672" + username: "guest" + password: "guest" + queueName: "user-op-queue-pulsar" + ``` + +# Usage + +1. Prepare AMQP service, use the solace service. + + ``` + docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard + ``` + +2. Put the `pulsar-io-amqp1_0-{version}.nar` in the pulsar connectors directory. + + ``` + cp pulsar-io-amqp1_0-{version}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{version}.nar + ``` + +3. Start Pulsar in standalone mode. + + ``` + $PULSAR_HOME/bin/pulsar standalone + ``` + + found logs like this + ``` + Searching for connectors in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors + Found connector ConnectorDefinition(name=amqp1_0, description=AMQP1_0 source and sink connector, sourceClass=org.apache.pulsar.ecosystem.io.amqp.QpidJmsSource, sinkClass=org.apache.pulsar.ecosystem.io.amqp.QpidJmsSink, sourceConfigClass=null, sinkConfigClass=null) from /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors/pulsar-io-amqp1_0.nar + ``` + +4. Create the AMQP1_0 sink. + + ``` + $PULSAR_HOME/bin/pulsar-admin sources create --source-config-file qpid-source-config.yaml + ``` + + found logs like this + ``` + "Created successfully" + ``` + + get sinks list + ``` + $PULSAR_HOME/bin/pulsar-admin sinks list + ``` + + found logs like this + ``` + [ + "amqp1_0-sink" + ] + ``` + + check sink status + ``` + $PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink + ``` + + found logs like this + ``` + "numInstances" : 1, + "numRunning" : 1, + "instances" : [ { + "instanceId" : 0, + "status" : { + "running" : true, + "error" : "", + "numRestarts" : 0, + "numReadFromPulsar" : 0, + "numSystemExceptions" : 0, + "latestSystemExceptions" : [ ], + "numSinkExceptions" : 0, + "latestSinkExceptions" : [ ], + "numWrittenToSink" : 0, + "lastReceivedTime" : 0, + "workerId" : "c-standalone-fw-localhost-8080" + } + } ] + } + ``` + +5. Send Pulsar messages. + + ``` + $PULSAR_HOME/bin/pulsar-client produce public/default/user-op-queue-topic --messages hello -n 10 + ``` + +6. Consume AMQP messages. + + Use this test method `receiveMessages` to consume AMQP messages. + + ``` + @Test + public void receiveMessages() throws Exception { + ConnectionFactory connectionFactory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672"); + @Cleanup + Connection connection = connectionFactory.createConnection(); + connection.start(); + @Cleanup + Session session = connection.createSession(); + @Cleanup + MessageConsumer consumer = session.createConsumer(new JmsQueue("user-op-queue-pulsar")); + for (int i = 0; i < 10; i++) { + JmsTextMessage textMessage = (JmsTextMessage) consumer.receive(); + System.out.println("receive msg content: " + textMessage.getText()); + textMessage.acknowledge(); + } + } + ``` + +7. Check the sink status. + + ``` + $PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink + ``` + + found logs like this + ``` + { + "numInstances" : 1, + "numRunning" : 1, + "instances" : [ { + "instanceId" : 0, + "status" : { + "running" : true, + "error" : "", + "numRestarts" : 0, + "numReadFromPulsar" : 10, + "numSystemExceptions" : 0, + "latestSystemExceptions" : [ ], + "numSinkExceptions" : 0, + "latestSinkExceptions" : [ ], + "numWrittenToSink" : 10, + "lastReceivedTime" : 1615192471713, + "workerId" : "c-standalone-fw-localhost-8080" + } + } ] + } + ``` diff --git a/docs/io-amqp1_0-source.md b/docs/io-amqp1_0-source.md new file mode 100644 index 00000000..c3b692cd --- /dev/null +++ b/docs/io-amqp1_0-source.md @@ -0,0 +1,250 @@ +--- +description: The AMQP1_0 source connector receives messages from AMQP service and writes messages to Pulsar topics. +author: ["ASF"] +contributors: ["ASF"] +language: Java +document: +source: "https://github.com/streamnative/pulsar-io-amqp1_0/tree/v2.5.1/src/main/java/org/apache/pulsar/ecosystem/io/amqp1_0" +license: Apache License 2.0 +tags: ["Pulsar IO", "AMQP", "Qpid", "JMS", "Source"] +alias: AMQP1_0 source +features: ["Use AMQP1_0 source connector to sync data to Pulsar"] +license_link: "https://www.apache.org/licenses/LICENSE-2.0" +icon: "https://www.amqp.org/sites/amqp.org/themes/genesis_amqp/images/amqp-logo.png" +download: "https://github.com/streamnative/pulsar-io-amqp1_0/releases/download/v2.5.1/pulsar-io-amqp1_0-2.5.1.nar" +support: StreamNative +support_link: https://streamnative.io +support_img: "/images/connectors/streamnative.png" +dockerfile: +id: "io-amqp1_0-source" +--- + +The AMQP1_0 source connector receives messages from AMQP service and writes messages to Pulsar topics. + +# Installation + +``` +git clone https://github.com/streamnative/pulsar-io-amqp-1-0.git +cd pulsar-io-amqp-1-0/ +mvn clean install -DskipTests +cp pulsar-io-amqp1_0/target/pulsar-io-amqp1_0-0.0.1.nar $PULSAR_HOME/pulsar-io-amqp1_0-0.0.1.nar +``` + +# Configuration + +The configuration of the AMQP1_0 source connector has the following properties. + +## AMQP1_0 source connector configuration + +| Name | Type|Required | Default | Description +|------|----------|----------|---------|-------------| +| `protocol` |String| true | "amqp" | The AMQP protocol. | +| `host` | String| true | " " (empty string) | The AMQP service host. | +| `port` | int |true | 5672 | The AMQP service port. | +| `username` | String|false | " " (empty string) | The username used to authenticate to AMQP1_0. | +| `password` | String|false | " " (empty string) | The password used to authenticate to AMQP1_0. | +| `queue` | String|false | " " (empty string) | The queue name that messages should be read from or written to. | +| `topic` | String|false | " " (empty string) | The topic name that messages should be read from or written to. | + +## Configure AMQP1_0 source connector + +Before using the AMQP1_0 source connector, you need to create a configuration file through one of the following methods. + +* JSON + + ```json + { + "tenant": "public", + "namespace": "default", + "name": "amqp1_0-source", + "topicName": "user-op-queue-topic", + "archive": "connectors/pulsar-io-amqp1_0-{version}.nar", + "parallelism": 1, + "configs": { + "protocol": "amqp", + "host": "localhost", + "port": "5672", + "username": "guest", + "password": "guest", + "queue": "user-op-queue" + } + } + ``` + +* YAML + + ```yaml + tenant: "public" + namespace: "default" + name: "amqp1_0-source" + topicName: "user-op-queue-topic" + archive: "connectors/pulsar-io-amqp1_0-{version}.nar" + parallelism: 1 + + configs: + protocol: "amqp" + host: "localhost" + port: "5672" + username: "guest" + password: "guest" + queue: "user-op-queue" + ``` + +1. Prepare AMQP service, use the solace service. + + ``` + docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard + ``` + +2. Put the `pulsar-io-amqp1_0-{version}.nar` in the pulsar connectors directory. + + ``` + cp pulsar-io-amqp1_0-{version}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{version}.nar + ``` + +3. Start Pulsar in standalone mode. + + ``` + $PULSAR_HOME/bin/pulsar standalone + ``` + + found logs like this + ``` + Searching for connectors in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors + Found connector ConnectorDefinition(name=amqp1_0, description=AMQP1_0 source and sink connector, sourceClass=org.apache.pulsar.ecosystem.io.amqp.QpidJmsSource, sinkClass=org.apache.pulsar.ecosystem.io.amqp.QpidJmsSink, sourceConfigClass=null, sinkConfigClass=null) from /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors/pulsar-io-amqp1_0.nar + ``` + +4. Create the AMQP1_0 source. + + ``` + $PULSAR_HOME/bin/pulsar-admin sources create --source-config-file qpid-source-config.yaml + ``` + + found logs like this + ``` + "Created successfully" + ``` + + get sinks list + ``` + $PULSAR_HOME/bin/pulsar-admin sources list + ``` + + found logs like this + ``` + [ + "amqp1_0-source" + ] + ``` + + check sink status + ``` + $PULSAR_HOME/bin/pulsar-admin sources status --name amqp1_0-source + ``` + + found logs like this + ``` + { + "numInstances" : 1, + "numRunning" : 1, + "instances" : [ { + "instanceId" : 0, + "status" : { + "running" : true, + "error" : "", + "numRestarts" : 0, + "numReceivedFromSource" : 0, + "numSystemExceptions" : 0, + "latestSystemExceptions" : [ ], + "numSourceExceptions" : 0, + "latestSourceExceptions" : [ ], + "numWritten" : 0, + "lastReceivedTime" : 0, + "workerId" : "c-standalone-fw-localhost-8080" + } + } ] + } + ``` + +5. Consume Pulsar messages. + + ``` + $PULSAR_HOME/bin/pulsar-client consume -s "test" public/default/user-op-queue-topic -n 10 + ``` + +6. Send AMQP1_0 messages. + + Use the test method `sendMessage` to send AMQP1_0 messages. + + ``` + @Test + public void sendMessage() throws Exception { + ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672"); + + @Cleanup + Connection connection = connectionFactory.createConnection(); + connection.start(); + + JMSProducer producer = connectionFactory.createContext().createProducer(); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + Destination destination = new JmsQueue("user-op-queue"); + + for (int i = 0; i < 10; i++) { + producer.send(destination, "Hello AMQP1_0 - " + i); + } + } + ``` + +7. Check the sink status. + + ``` + $PULSAR_HOME/bin/pulsar-admin sources status --name amqp1_0-source + ``` + + found logs like this + ``` + { + "numInstances" : 1, + "numRunning" : 1, + "instances" : [ { + "instanceId" : 0, + "status" : { + "running" : true, + "error" : "", + "numRestarts" : 0, + "numReceivedFromSource" : 10, + "numSystemExceptions" : 0, + "latestSystemExceptions" : [ ], + "numSourceExceptions" : 0, + "latestSourceExceptions" : [ ], + "numWritten" : 10, + "lastReceivedTime" : 1615194014874, + "workerId" : "c-standalone-fw-localhost-8080" + } + } ] + } + ``` + + found the pulsar consumer receive the messages. + ``` + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 0 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 1 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 2 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 3 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 4 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 5 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 6 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 7 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 8 + ----- got message ----- + key:[null], properties:[], content:Hello AMQP1_0 - 9 + ``` \ No newline at end of file