diff --git a/docs/amqp-1-0-sink.md b/docs/amqp-1-0-sink.md index 775e6906..3d2ceeb4 100644 --- a/docs/amqp-1-0-sink.md +++ b/docs/amqp-1-0-sink.md @@ -1,294 +1,71 @@ --- dockerfile: "https://hub.docker.com/r/streamnative/pulsar-io-amqp-1-0" -alias: AMQP1_0 Sink Connector +alias: AMQP 1.0 Sink Connector --- -# AMQP1_0 sink connector +# AMQP 1.0 sink connector -The AMQP1_0 sink connector pulls messages from Pulsar topics and persists messages to [AMQP 1.0](https://www.amqp.org/). +The AMQP 1.0 sink connector pulls messages from Pulsar topics and persists messages to [AMQP 1.0](https://www.amqp.org/). ![](/docs/amqp-1-0-sink.png) -# How to get +## Quick start -You can get the AMQP1_0 sink connector using one of the following methods. +### 1. Start AMQP 1.0 service -## Use it with Function Worker - -- Download the NAR package from [here](https://github.com/streamnative/pulsar-io-amqp-1-0/releases/download/v{{connector:version}}/pulsar-io-amqp1_0-{{connector:version}}.nar). - -- Build it from the source code. - - 1. Clone the source code to your machine. - - ```bash - git clone https://github.com/streamnative/pulsar-io-amqp-1-0 - ``` - - 2. Assume that `PULSAR_IO_AMQP1_0_HOME` is the home directory for the `pulsar-io-amqp1_0` repo. Build the connector in the `${PULSAR_IO_AMQP1_0_HOME}` directory. - - ```bash - mvn clean install -DskipTests - ``` - - After the connector is successfully built, a `NAR` package is generated under the `target` directory. - - ```bash - ls pulsar-io-amqp1_0/target - pulsar-io-amqp1_0-{{connector:version}}.nar - ``` - -## Use it with Function Mesh - -Pull the AMQP1_0 connector Docker image from [here](https://hub.docker.com/r/streamnative/pulsar-io-amqp-1-0). - - -# How to configure - -Before using the AMQP1_0 sink connector, you need to configure it. - -You can create a configuration file (JSON or YAML) to set the following properties. - -| Name | Type|Required | Default | Description -|------|----------|----------|---------|-------------| -| `protocol` |String| true | "amqp" | The AMQP protocol. | -| `host` | String| true | " " (empty string) | The AMQP service host. | -| `port` | int |true | 5672 | The AMQP service port. | -| `username` | String|false | " " (empty string) | The username used to authenticate to ActiveMQ. | -| `password` | String|false | " " (empty string) | The password used to authenticate to ActiveMQ. | -| `queue` | String|false | " " (empty string) | The queue name that messages should be read from or written to. | -| `topic` | String|false | " " (empty string) | The topic name that messages should be read from or written to. | -| `activeMessageType` | String|false |0 | The ActiveMQ message simple class name. | -| `onlyTextMessage` | boolean | false | false | If it is set to `true`, the AMQP message type must be set to `TextMessage`. Pulsar consumers can consume the messages with schema ByteBuffer. | - -## Configure it with Function Worker - -You can create a configuration file (JSON or YAML) to set the properties as below. - -**Example** - -* JSON - - ```json - { - "tenant": "public", - "namespace": "default", - "name": "amqp1_0-sink", - "inputs": ["user-op-queue-topic"], - "archive": "connectors/pulsar-io-amqp1_0-{{connector:version}}.nar", - "parallelism": 1, - "configs": - { - "protocol": "amqp", - "host": "localhost", - "port": "5672", - "username": "guest", - "password": "guest", - "queue": "user-op-queue-pulsar" - } - } - -* YAML - - ```yaml - tenant: "public" - namespace: "default" - name: "amqp1_0-sink" - inputs: - - "user-op-queue-topic" - archive: "connectors/pulsar-io-amqp1_0-{{connector:version}}.nar" - parallelism: 1 - - configs: - protocol: "amqp" - host: "localhost" - port: "5672" - username: "guest" - password: "guest" - queue: "user-op-queue-pulsar" - ``` - -## Configure it with Function Mesh - -You can submit a [CustomResourceDefinitions (CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) to create an AMQP1_0 sink connector. Using CRD makes Function Mesh naturally integrate with the Kubernetes ecosystem. For more information about Pulsar source CRD configurations, see [here](https://functionmesh.io/docs/connectors/io-crd-config/source-crd-config). - -You can define a CRD file (YAML) to set the properties as below. - -```yaml -apiVersion: compute.functionmesh.io/v1alpha1 -kind: Sink -metadata: - name: amqp-sink-sample -spec: - image: streamnative/pulsar-io-amqp-1-0:{{connector:version}} - className: org.apache.pulsar.ecosystem.io.amqp.AmqpSink - replicas: 1 - input: - topics: - - persistent://public/default/user-op-queue-topic - typeClassName: “java.nio.ByteBuffer” - customSchemaSources: - “persistent://public/default/user-op-queue-topic”: “org.apache.pulsar.client.impl.schema.ByteBufferSchema” - sinkConfig: - protocol: "amqp" - host: "localhost" - port: "5672" - username: "guest" - password: "guest" - queue: "user-op-queue-pulsar" - pulsar: - pulsarConfig: "test-pulsar-sink-config" - resources: - limits: - cpu: "0.2" - memory: 1.1G - requests: - cpu: "0.1" - memory: 1G - java: - jar: connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - clusterName: test-pulsar - autoAck: true +Start a service that supports the AMQP 1.0 protocol, such as [Solace](https://docs.solace.com/index.html). +```bash +docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard ``` -# How to use - -You can use the AMQP1_0 sink connector with Function Worker or Function Mesh. - -## Use it with Function Worker +### 2. Create a connector -You can use the AMQP1_0 source connector as a non built-in connector or a built-in connector. +Depending on the environment, there are several ways to create an AMQP 1.0 sink connector: -### Use it as non built-in connector +- [Create Connector on StreamNative Cloud](https://docs.streamnative.io/docs/connector-create). +- [Create Connector with Function worker](https://pulsar.apache.org/docs/3.0.x/io-quickstart/). + Using this way requires you to download a **NAR** package to create a built-in or non-built-in connector. You can download the version you need from [here](https://github.com/streamnative/pulsar-io-amqp-1-0/releases). +- [Create Connector with Function mesh](https://functionmesh.io/docs/connectors/run-connector). + Using this way requires you to set the docker image. You can choose the version you want to launch from [here](https://hub.docker.com/r/streamnative/pulsar-io-amqp-1-0/tags) -If you already have a Pulsar cluster, you can use the AMQP1_0 sink connector as a non built-in connector directly. +No matter how you create an AMQP 1.0 sink connector, the minimum configuration contains the following parameters. -This example shows how to create an AMQP1_0 sink connector on a Pulsar cluster using the command [`pulsar-admin sinks create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--24). - -``` -PULSAR_HOME/bin/pulsar-admin sinks create \ ---name amqp1_0-sink \ ---archive pulsar-io-amqp1_0-{{connector:version}}.nar \ ---classname org.apache.pulsar.ecosystem.io.amqp.AmqpSink \ ---sink-config-file amqp-sink-config.yaml +```yaml + configs: + connection: + failover: + useFailover: true + uris: + - protocol: amqp + host: localhost + port: 5672 + urlOptions: + - transport.tcpKeepAlive=true + username: guest + password: guest + queue: user-op-queue-pulsar ``` -### Use it as built-in connector - -You can make the AMQP1_0 sink connector as a built-in connector and use it on a standalone cluster or on-premises cluster. - -#### Standalone cluster - -This example describes how to use the AMQP1_0 sink connector to pull data from Pulsar topics and persist data to AMQP in standalone mode. - -1. Prepare AMQP service using Solace. - - ``` - docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard - ``` - -2. Copy the NAR package of the AMQP1_0 sink connector to the Pulsar connectors directory. - - ``` - cp pulsar-io-amqp1_0-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - ``` - -3. Start Pulsar in standalone mode. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar standalone - ``` - - **Output** - - You can find the similar logs below. - - ``` - Searching for connectors in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors - Found connector ConnectorDefinition(name=amqp1_0, description=AMQP1_0 source and AMQP1_0 connector, sourceClass=org.apache.pulsar.ecosystem.io.amqp.AmqpSource, sinkClass=org.apache.pulsar.ecosystem.io.amqp.AmqpSink, sourceConfigClass=null, sinkConfigClass=null) from /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - Searching for functions in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./functions - ``` - -4. Create an AMQP1_0 sink. +> * The configuration structure varies depending on how you create the AMQP 1.0 sink connector. + > For example, some are **JSON**, some are **YAML**, and some are **Kubernetes YAML**. You need to adapt the configs to the corresponding format. +> +> * If you want to configure more parameters, see [Configuration Properties](#configuration-properties) for reference. - **Input** +### 2. Send messages to the topic - ``` - PULSAR_HOME/bin/pulsar-admin sinks create \ - --sink-config-file amqp-sink-config.yaml \ - --custom-schema-inputs '{"user-op-queue-topic": "org.apache.pulsar.client.impl.schema.ByteBufferSchema"}' - ``` +{% callout title="Note" type="note" %} +- If your connector is created on StreamNative Cloud, you need to authenticate your clients. See [Build applications using Pulsar clients](https://docs.streamnative.io/docs/qs-connect#jumpstart-for-beginners) for more information. +- The following sample code uses the **Apache qpid** library. +{% /callout %} - **Output** - - ``` - "Created successfully" - ``` - - Verify whether the sink is created successfully or not. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar-admin sinks list - ``` - - **Output** - - ``` - [ - "amqp1_0-sink" - ] - ``` - - Check the sink status. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink - ``` - - **Output** - - ``` - "numInstances" : 1, - "numRunning" : 1, - "instances" : [ { - "instanceId" : 0, - "status" : { - "running" : true, - "error" : "", - "numRestarts" : 0, - "numReadFromPulsar" : 0, - "numSystemExceptions" : 0, - "latestSystemExceptions" : [ ], - "numSinkExceptions" : 0, - "latestSinkExceptions" : [ ], - "numWrittenToSink" : 0, - "lastReceivedTime" : 0, - "workerId" : "c-standalone-fw-localhost-8080" - } - } ] - } - ``` - -5. Send messages to Pulsar topics. - - ``` - @Test - public void generateMessages() throws Exception { - @Cleanup - PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); - @Cleanup +``` java + public static void main(String[] args) { + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("{{Your Pulsar URL}}").build(); Producer producer = pulsarClient.newProducer(Schema.BYTEBUFFER) - .topic("user-op-queue-topic") + .topic("{{The topic name that you specified when you created the connector}}") .create(); JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(); - @Cleanup JMSContext jmsContext = jmsConnectionFactory.createContext(); for (int i = 0; i < 10; i++) { @@ -297,180 +74,71 @@ This example describes how to use the AMQP1_0 sink connector to pull data from P producer.send(byteBuf.nioBuffer()); } System.out.println("finish send messages."); + jmsContext.close(); + pulsarClient.close(); } - ``` - -6. Consume messages from AMQP service using the `receiveMessages` method. +``` - **Input** +### 3. Consume data from AMQP 1.0 service - ``` - @Test - public void receiveMessages() throws Exception { +``` java + public static void main(String[] args) { ConnectionFactory connectionFactory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672"); - @Cleanup Connection connection = connectionFactory.createConnection(); connection.start(); - @Cleanup Session session = connection.createSession(); - @Cleanup MessageConsumer consumer = session.createConsumer(new JmsQueue("user-op-queue-pulsar")); for (int i = 0; i < 10; i++) { JmsTextMessage textMessage = (JmsTextMessage) consumer.receive(); System.out.println("receive msg content: " + textMessage.getText()); textMessage.acknowledge(); } + consumer.close(); + session.close(); + connection.close(); } - ``` - -Check the sink status. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink - ``` - - **Output** - - ``` - { - "numInstances" : 1, - "numRunning" : 1, - "instances" : [ { - "instanceId" : 0, - "status" : { - "running" : true, - "error" : "", - "numRestarts" : 0, - "numReadFromPulsar" : 10, - "numSystemExceptions" : 0, - "latestSystemExceptions" : [ ], - "numSinkExceptions" : 0, - "latestSinkExceptions" : [ ], - "numWrittenToSink" : 10, - "lastReceivedTime" : 1615192471713, - "workerId" : "c-standalone-fw-localhost-8080" - } - } ] - } - ``` - -#### On-premise cluster - -This example explains how to create an AMQP1_0 sink connector on an on-premises cluster. - -1. Copy the NAR package of the AMQP1_0 connector to the Pulsar connectors directory. - - ``` - cp pulsar-io-amqp1_0-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - ``` - -2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/). - - ``` - PULSAR_HOME/bin/pulsar-admin sources reload - ``` - -3. Check whether the AMQP1_0 sink connector is available on the list or not. - - ``` - PULSAR_HOME/bin/pulsar-admin sinks available-sinks - ``` - -4. Create an AMQP1_0 sink connector on a Pulsar cluster using the [`pulsar-admin sinks create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--24) command. - - ``` - PULSAR_HOME/bin/pulsar-admin sinks create \ - --sink-type amqp1_0 \ - --sink-config-file amqp-sink-config.yaml \ - --name amqp1_0-sink - ``` - -## Use it with Function Mesh - -This example demonstrates how to create an AMQP1_0 sink connector through Function Mesh. - -### Prerequisites - -- Create and connect to a [Kubernetes cluster](https://kubernetes.io/). - -- Create a [Pulsar cluster](https://pulsar.apache.org/docs/en/kubernetes-helm/) in the Kubernetes cluster. - -- [Install the Function Mesh Operator and CRD](https://functionmesh.io/docs/install-function-mesh/) into the Kubernetes cluster. - -- Prepare AMQP service. - -### Step - -1. Define the AMQP1_0 sink connector with a YAML file and save it as `sink-sample.yaml`. - - This example shows how to publish the AMQP1_0 sink connector to Function Mesh with a Docker image. - - ```yaml - apiVersion: compute.functionmesh.io/v1alpha1 - kind: Sink - metadata: - name: amqp-sink-sample - spec: - image: streamnative/pulsar-io-amqp-1-0:{{connector:version}} - className: org.apache.pulsar.ecosystem.io.amqp.AmqpSink - replicas: 1 - input: - topics: - - persistent://public/default/user-op-queue-topic - typeClassName: “java.nio.ByteBuffer” - customSchemaSources: - “persistent://public/default/user-op-queue-topic”: “org.apache.pulsar.client.impl.schema.ByteBufferSchema” - sinkConfig: - protocol: "amqp" - host: "localhost" - port: "5672" - username: "guest" - password: "guest" - queue: "user-op-queue-pulsar" - pulsar: - pulsarConfig: "test-pulsar-sink-config" - resources: - limits: - cpu: "0.2" - memory: 1.1G - requests: - cpu: "0.1" - memory: 1G - java: - jar: connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - clusterName: test-pulsar - autoAck: true - ``` - -2. Apply the YAML file to create the AMQP1_0 sink connector. - - **Input** - - ``` - kubectl apply -f - ``` - - **Output** - - ``` - sink.compute.functionmesh.io/amqp-sink-sample created - ``` - -3. Check whether the AMQP1_0 sink connector is created successfully. +``` - **Input** +## Configuration Properties - ``` - kubectl get all - ``` +Before using the AMQP 1.0 sink connector, you need to configure it. - **Output** +You can create a configuration file (JSON or YAML) to set the following properties. - ``` - NAME READY STATUS RESTARTS AGE - pod/amqp-sink-sample-0 1/1 Running 0 77s - ``` +| Name | Type | Required | Default | Description | +|---------------------|------------|----------------------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------| +| `protocol` | String | required if connection is not used | "amqp" | [deprecated: use connection instead] The AMQP protocol. | +| `host` | String | required if connection is not used | " " (empty string) | [deprecated: use connection instead] The AMQP service host. | +| `port` | int | required if connection is not used | 5672 | [deprecated: use connection instead] The AMQP service port. | +| `connection` | Connection | required if protocol, host, port is not used | " " (empty string) | The connection details. | +| `username` | String | false | " " (empty string) | The username used to authenticate to ActiveMQ. | +| `password` | String | false | " " (empty string) | The password used to authenticate to ActiveMQ. | +| `queue` | String | false | " " (empty string) | The queue name that messages should be read from or written to. | +| `topic` | String | false | " " (empty string) | The topic name that messages should be read from or written to. | +| `activeMessageType` | String | false | 0 | The ActiveMQ message simple class name. | +| `onlyTextMessage` | boolean | false | false | If it is set to `true`, the AMQP message type must be set to `TextMessage`. Pulsar consumers can consume the messages with schema ByteBuffer. | + +A `Connection` object can be specified as follows: + +| Name | Type | Required | Default | Description | +|------------|-----------------------|----------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `failover` | Failover | false | " " (empty string) | The configuration for a failover connection. | +| `uris` | list of ConnectionUri | true | " " (empty string) | A list of ConnectionUri objects. When useFailover is set to true 1 or more should be provided. Currently only 1 uri is supported when useFailover is set to false | + +A `Failover` object can be specified as follows: + +| Name | Type | Required | Default | Description | +|--------------------------------|----------------|--------------------------------------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `useFailover` | boolean | true | false | If it is set to true, the connection will be created from the uris provided under uris, using qpid's failover connection factory. | +| `jmsClientId` | String | required if failoverConfigurationOptions is used | " " (empty string) | Identifying name for the jms Client | +| `failoverConfigurationOptions` | List of String | required if jmsClientId is used | " " (empty string) | A list of options (e.g. ). The options wil be joined using an '&', prefixed with a the jmsClientId and added to the end of the failoverUri. see also: https://qpid.apache.org/releases/qpid-jms-2.2.0/docs/index.html#failover-configuration-options | + +A `ConnectionUri` object can be specified as follows: + +| Name | Type | Required | Default | Description | +|--------------|----------------|----------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------| +| `protocol` | String | true | " " (empty string) | The AMQP protocol. | +| `host` | String | true | " " (empty string) | The AMQP service host. | +| `port` | int | true | 0 | The AMQP service port. | +| `urlOptions` | List of String | false | " " (empty string) | A list of url-options (e.g. ). The url options wil be joined using an '&', prefixed with a '?' and added to the end of the uri | - After that, you can produce and consume messages using the AMQP1_0 sink connector between Pulsar and AMQP1_0. diff --git a/docs/amqp-1-0-source.md b/docs/amqp-1-0-source.md index 27da9251..05b6916f 100644 --- a/docs/amqp-1-0-source.md +++ b/docs/amqp-1-0-source.md @@ -1,459 +1,130 @@ --- dockerfile: "https://hub.docker.com/r/streamnative/pulsar-io-amqp-1-0" -alias: AMQP1_0 Source Connector +alias: AMQP 1.0 Source Connector --- -# AMQP1_0 source connector +# AMQP 1.0 source connector -The AMQP1_0 source connector receives messages from [AMQP 1.0](https://www.amqp.org/) and writes messages to Pulsar topics. +The AMQP 1.0 source connector receives messages from [AMQP 1.0](https://www.amqp.org/) and writes messages to Pulsar topics. ![](/docs/amqp-1-0-source.png) -# How to get +## Quick start -You can get the AMQP1_0 source connector using one of the following methods. +### 1. Start AMQP 1.0 service -## Use it with Function Worker - -- Download the NAR package from [here](https://github.com/streamnative/pulsar-io-amqp-1-0/releases/download/v{{connector:version}}/pulsar-io-amqp1_0-{{connector:version}}.nar). - -- Build it from the source code. - - 1. Clone the source code to your machine. - - - ```bash - git clone https://github.com/streamnative/pulsar-io-amqp-1-0 - ``` - - 2. Assume that `PULSAR_IO_AMQP1_0_HOME` is the home directory for the `pulsar-io-amqp1_0` repo. Build the connector in the `${PULSAR_IO_AMQP1_0_HOME}` directory. - - ```bash - mvn clean install -DskipTests - ``` - - After the connector is successfully built, a `NAR` package is generated under the `target` directory. - - ```bash - ls pulsar-io-amqp1_0/target - pulsar-io-amqp1_0-{{connector:version}}.nar - ``` - -## Use it with Function Mesh - -Pull the AMQP1_0 connector Docker image from [here](https://hub.docker.com/r/streamnative/pulsar-io-amqp-1-0). - -# How to configure - -Before using the AMQP1_0 source connector, you need to configure it. - -You can create a configuration file (JSON or YAML) to set the following properties. - -| Name | Type|Required | Default | Description -|------|----------|----------|---------|-------------| -| `protocol` |String| true | "amqp" | The AMQP protocol. | -| `host` | String| true | " " (empty string) | The AMQP service host. | -| `port` | int |true | 5672 | The AMQP service port. | -| `username` | String|false | " " (empty string) | The username used to authenticate to AMQP1_0. | -| `password` | String|false | " " (empty string) | The password used to authenticate to AMQP1_0. | -| `queue` | String|false | " " (empty string) | The queue name that messages should be read from or written to. | -| `topic` | String|false | " " (empty string) | The topic name that messages should be read from or written to. | -| `onlyTextMessage` | boolean | false | false | If it is set to `true`, the AMQP message type must be set to `TextMessage`. Pulsar consumers can consume the messages with schema ByteBuffer. | - -## Configure it with Function Worker - -You can create a configuration file (JSON or YAML) to set the properties as below. - -**Example** - -* JSON - - ```json - { - "tenant": "public", - "namespace": "default", - "name": "amqp1_0-source", - "topicName": "user-op-queue-topic", - "archive": "connectors/pulsar-io-amqp1_0-{{connector:version}}.nar", - "parallelism": 1, - "configs": { - "protocol": "amqp", - "host": "localhost", - "port": "5672", - "username": "guest", - "password": "guest", - "queue": "user-op-queue" - } - } - ``` - -* YAML - - ```yaml - tenant: "public" - namespace: "default" - name: "amqp1_0-source" - topicName: "user-op-queue-topic" - archive: "connectors/pulsar-io-amqp1_0-{{connector:version}}.nar" - parallelism: 1 - - configs: - protocol: "amqp" - host: "localhost" - port: "5672" - username: "guest" - password: "guest" - queue: "user-op-queue" - ``` - -## Configure it with Function Mesh - -You can submit a [CustomResourceDefinitions (CRD)](https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/) to create an AMQP1_0 source connector. Using CRD makes Function Mesh naturally integrate with the Kubernetes ecosystem. For more information about Pulsar source CRD configurations, see [here](https://functionmesh.io/docs/connectors/io-crd-config/source-crd-config). - -You can define a CRD file (YAML) to set the properties as below. - -```yaml -apiVersion: compute.functionmesh.io/v1alpha1 -kind: Source -metadata: - name: amqp-source-sample -spec: - image: streamnative/pulsar-io-amqp-1-0:{{connector:version}} - className: org.apache.pulsar.ecosystem.io.amqp.AmqpSource - replicas: 1 - output: - topic: persistent://public/default/user-op-queue-topic - typeClassName: “java.nio.ByteBuffer” - sourceConfig: - protocol: "amqp" - host: "localhost" - port: "5672" - username: "guest" - password: "guest" - queue: "user-op-queue" - pulsar: - pulsarConfig: "test-pulsar-source-config" - resources: - limits: - cpu: "0.2" - memory: 1.1G - requests: - cpu: "0.1" - memory: 1G - java: - jar: connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - clusterName: test-pulsar +Start a service that supports the AMQP 1.0 protocol, such as [Solace](https://docs.solace.com/index.html). +```bash +docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard ``` -# How to use - -You can use the AMQP1_0 source connector with Function Worker or Function Mesh. - -## Use it with Function Worker - -You can use the AMQP1_0 source connector as a non built-in connector or a built-in connector. +### 2. Create a connector -### Use it as non built-in connector +Depending on the environment, there are several ways to create an AMQP 1.0 sink connector: -If you already have a Pulsar cluster, you can use the AMQP1_0 source connector as a non built-in connector directly. +- [Create Connector on StreamNative Cloud](https://docs.streamnative.io/docs/connector-create). +- [Create Connector with Function worker](https://pulsar.apache.org/docs/3.0.x/io-quickstart/). + Using this way requires you to download a **NAR** package to create a built-in or non-built-in connector. You can download the version you need from [here](https://github.com/streamnative/pulsar-io-amqp-1-0/releases). +- [Create Connector with Function mesh](https://functionmesh.io/docs/connectors/run-connector). + Using this way requires you to set the docker image. You can choose the version you want to launch from [here](https://hub.docker.com/r/streamnative/pulsar-io-amqp-1-0/tags) -This example shows how to create an AMQP1_0 source connector on a Pulsar cluster using the command [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--14). +No matter how you create an AMQP 1.0 sink connector, the minimum configuration contains the following parameters. +```yaml + configs: + connection: + failover: + useFailover: true + uris: + - protocol: amqp + host: localhost + port: 5672 + urlOptions: + - transport.tcpKeepAlive=true + username: guest + password: guest + queue: user-op-queue-pulsar ``` -PULSAR_HOME/bin/pulsar-admin sources create \ ---name amqp1_0-source \ ---archive pulsar-io-amqp1_0-{{connector:version}}.nar \ ---classname org.apache.pulsar.ecosystem.io.amqp.AmqpSource \ ---source-config-file amqp-source-config.yaml -``` - -### Use it as built-in connector - -You can make the AMQP1_0 source connector as a built-in connector and use it on a standalone cluster or on-premises cluster. - -#### Standalone cluster - -This example describes how to use the AMQP1_0 source connector to feed data from AMQP and write data to Pulsar topics in standalone mode. - -1. Prepare AMQP service using Solace. - - ``` - docker run -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard - ``` - -2. Copy the NAR package of the AMQP1_0 source connector to the Pulsar connectors directory. - - ``` - cp pulsar-io-amqp1_0-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - ``` - -3. Start Pulsar in standalone mode. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar standalone - ``` - - **Output** - - You can find the similar logs below. - - ``` - Searching for connectors in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors - Found connector ConnectorDefinition(name=amqp1_0, description=AMQP1_0 source and AMQP1_0 connector, sourceClass=org.apache.pulsar.ecosystem.io.amqp.AmqpSource, sinkClass=org.apache.pulsar.ecosystem.io.amqp.AmqpSink, sourceConfigClass=null, sinkConfigClass=null) from /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - Searching for functions in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./functions - ``` - -4. Create an AMQP1_0 source. - - **Input** - ``` - PULSAR_HOME/bin/pulsar-admin sources create \ - --source-type amqp1_0 \ - --source-config-file amqp-source-config.yaml - ``` +> * The configuration structure varies depending on how you create the AMQP 1.0 sink connector. + > For example, some are **JSON**, some are **YAML**, and some are **Kubernetes YAML**. You need to adapt the configs to the corresponding format. +> +> * If you want to configure more parameters, see [Configuration Properties](#configuration-properties) for reference. - **Output** - - ``` - "Created successfully" - ``` +### 2. Send messages to the AMQP 1.0 service - Verify whether the source is created successfully or not. +{% callout title="Note" type="note" %} +- The following sample code uses the **Apache qpid** library. +{% /callout %} - **Input** - ``` - PULSAR_HOME/bin/pulsar-admin sources list - ``` - - **Output** - - ``` - [ - "amqp1_0-source" - ] - ``` - - Check the source status. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar-admin sources status --name amqp1_0-source - ``` - - **Output** - - ``` - { - "numInstances" : 1, - "numRunning" : 1, - "instances" : [ { - "instanceId" : 0, - "status" : { - "running" : true, - "error" : "", - "numRestarts" : 0, - "numReceivedFromSource" : 0, - "numSystemExceptions" : 0, - "latestSystemExceptions" : [ ], - "numSourceExceptions" : 0, - "latestSourceExceptions" : [ ], - "numWritten" : 0, - "lastReceivedTime" : 0, - "workerId" : "c-standalone-fw-localhost-8080" - } - } ] - } - ``` - -5. Consume messages from Pulsar topics. - - ``` - PULSAR_HOME/bin/pulsar-client consume -s "test" public/default/user-op-queue-topic -n 10 - ``` - -6. Send messages to AMQP1_0 service using the method `sendMessage`. - - **Input** - - ``` - @Test - public void sendMessage() throws Exception { +``` java + public static void main(String[] args) { ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:5672"); - @Cleanup Connection connection = connectionFactory.createConnection(); connection.start(); JMSProducer producer = connectionFactory.createContext().createProducer(); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Destination destination = new JmsQueue("user-op-queue"); for (int i = 0; i < 10; i++) { - producer.send(destination, "Hello AMQP1_0 - " + i); + producer.send(destination, "Hello AMQP 1.0 - " + i); } + connection.close(); } - ``` - - Check the source status. - - **Input** - - ``` - PULSAR_HOME/bin/pulsar-admin sources status --name amqp1_0-source - ``` - - **Output** - - The values of `numWritten` and `lastReceivedTime` are changed. - - ``` - { - "numInstances" : 1, - "numRunning" : 1, - "instances" : [ { - "instanceId" : 0, - "status" : { - "running" : true, - "error" : "", - "numRestarts" : 0, - "numReceivedFromSource" : 10, - "numSystemExceptions" : 0, - "latestSystemExceptions" : [ ], - "numSourceExceptions" : 0, - "latestSourceExceptions" : [ ], - "numWritten" : 10, - "lastReceivedTime" : 1615194014874, - "workerId" : "c-standalone-fw-localhost-8080" - } - } ] - } - ``` - - Now you can see the Pulsar consumer receives 10 messages. The message contents are not in regular formats since the message contents contain some extra information about `TextMessage`. - - **Output** - - ``` - ----- got message ----- - key:[null], properties:[], content:Sp�ASr�)�x-opt-jms-msg-typeQ�x-opt-jms-destQSs�\ - �/ID:67e69637-bd24-4ee1-86b7-be89e5a49b7f:1:1:1-1@�queue://user-op-queue@@@@@@�x?|�)Sw� - text str - 0 - ... - ``` - -#### On-premise cluster - -This example explains how to create an AMQP1_0 source connector on an on-premises cluster. - -1. Copy the NAR package of the AMQP1_0 connector to the Pulsar connectors directory. - - ``` - cp pulsar-io-amqp1_0-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - ``` - -2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/). - - ``` - PULSAR_HOME/bin/pulsar-admin sources reload - ``` - -3. Check whether the AMQP1_0 source connector is available on the list or not. - - ``` - PULSAR_HOME/bin/pulsar-admin sources available-sources - ``` - -4. Create an AMQP1_0 source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--14) command. - -``` - PULSAR_HOME/bin/pulsar-admin sources create \ - --source-type amqp1_0 \ - --source-config-file amqp-source-config.yaml \ - --name amqp1_0-source ``` -## Use it with Function Mesh - -This example demonstrates how to create an AMQP1_0 source connector through Function Mesh. - -### Prerequisites - -- Create and connect to a [Kubernetes cluster](https://kubernetes.io/). - -- Create a [Pulsar cluster](https://pulsar.apache.org/docs/en/kubernetes-helm/) in the Kubernetes cluster. +### 3. Consume data from Pulsar -- [Install the Function Mesh Operator and CRD](https://functionmesh.io/docs/install-function-mesh/) into the Kubernetes cluster. - -- Prepare AMQP service. - -### Step - -1. Define the AMQP1_0 source connector with a YAML file and save it as `source-sample.yaml`. - - This example shows how to publish the AMQP1_0 source connector to Function Mesh with a Docker image. - - ```yaml - apiVersion: compute.functionmesh.io/v1alpha1 - kind: Source - metadata: - name: amqp-source-sample - spec: - image: streamnative/pulsar-io-amqp-1-0:{{connector:version}} - className: org.apache.pulsar.ecosystem.io.amqp.AmqpSource - replicas: 1 - output: - topic: persistent://public/default/user-op-queue-topic - typeClassName: “java.nio.ByteBuffer” - sourceConfig: - protocol: "amqp" - host: "localhost" - port: "5672" - username: "guest" - password: "guest" - queue: "user-op-queue" - pulsar: - pulsarConfig: "test-pulsar-source-config" - resources: - limits: - cpu: "0.2" - memory: 1.1G - requests: - cpu: "0.1" - memory: 1G - java: - jar: connectors/pulsar-io-amqp1_0-{{connector:version}}.nar - clusterName: test-pulsar - ``` - -2. Apply the YAML file to create the AMQP1_0 source connector. - - **Input** - - ``` - kubectl apply -f - ``` - - **Output** - - ``` - source.compute.functionmesh.io/amqp-source-sample created - ``` - -3. Check whether the AMQP1_0 source connector is created successfully. +{% callout title="Note" type="note" %} +- If your connector is created on StreamNative Cloud, you need to authenticate your clients. See [Build applications using Pulsar clients](https://docs.streamnative.io/docs/qs-connect#jumpstart-for-beginners) for more information. +{% /callout %} + +``` java +bin/pulsar-client \ +--url "Your Pulsar serviceUrl" \ +consume "The topic that you specified when you created the connector" -s "test-sub" -n 10 -p Earliest +``` - **Input** +## Configuration Properties - ``` - kubectl get all - ``` +Before using the AMQP 1.0 sink connector, you need to configure it. - **Output** +You can create a configuration file (JSON or YAML) to set the following properties. - ``` - NAME READY STATUS RESTARTS AGE - pod/amqp-source-sample-0 1/1 Running 0 77s - ``` +| Name | Type | Required | Default | Description | +|---------------------|------------|----------------------------------------------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------------| +| `protocol` | String | required if connection is not used | "amqp" | [deprecated: use connection instead] The AMQP protocol. | +| `host` | String | required if connection is not used | " " (empty string) | [deprecated: use connection instead] The AMQP service host. | +| `port` | int | required if connection is not used | 5672 | [deprecated: use connection instead] The AMQP service port. | +| `connection` | Connection | required if protocol, host, port is not used | " " (empty string) | The connection details. | +| `username` | String | false | " " (empty string) | The username used to authenticate to ActiveMQ. | +| `password` | String | false | " " (empty string) | The password used to authenticate to ActiveMQ. | +| `queue` | String | false | " " (empty string) | The queue name that messages should be read from or written to. | +| `topic` | String | false | " " (empty string) | The topic name that messages should be read from or written to. | +| `activeMessageType` | String | false | 0 | The ActiveMQ message simple class name. | +| `onlyTextMessage` | boolean | false | false | If it is set to `true`, the AMQP message type must be set to `TextMessage`. Pulsar consumers can consume the messages with schema ByteBuffer. | + +A `Connection` object can be specified as follows: + +| Name | Type | Required | Default | Description | +|------------|-----------------------|----------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `failover` | Failover | false | " " (empty string) | The configuration for a failover connection. | +| `uris` | list of ConnectionUri | true | " " (empty string) | A list of ConnectionUri objects. When useFailover is set to true 1 or more should be provided. Currently only 1 uri is supported when useFailover is set to false | + +A `Failover` object can be specified as follows: + +| Name | Type | Required | Default | Description | +|--------------------------------|----------------|--------------------------------------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `useFailover` | boolean | true | false | If it is set to true, the connection will be created from the uris provided under uris, using qpid's failover connection factory. | +| `jmsClientId` | String | required if failoverConfigurationOptions is used | " " (empty string) | Identifying name for the jms Client | +| `failoverConfigurationOptions` | List of String | required if jmsClientId is used | " " (empty string) | A list of options (e.g. ). The options wil be joined using an '&', prefixed with a the jmsClientId and added to the end of the failoverUri. see also: https://qpid.apache.org/releases/qpid-jms-2.2.0/docs/index.html#failover-configuration-options | + +A `ConnectionUri` object can be specified as follows: + +| Name | Type | Required | Default | Description | +|--------------|----------------|----------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------| +| `protocol` | String | true | " " (empty string) | The AMQP protocol. | +| `host` | String | true | " " (empty string) | The AMQP service host. | +| `port` | int | true | 0 | The AMQP service port. | +| `urlOptions` | List of String | false | " " (empty string) | A list of url-options (e.g. ). The url options wil be joined using an '&', prefixed with a '?' and added to the end of the uri | - After that, you can produce and consume messages using the AMQP1_0 source connector between Pulsar and AMQP1_0.