Merge branch 'main' of github.com:aklivity/zilla-examples into live-and-local-quickstarts
vordimous committed Jul 9, 2024
2 parents b1b3515 + a1a3780 commit a936836
Showing 56 changed files with 1,535 additions and 33 deletions.
8 changes: 6 additions & 2 deletions README.md
@@ -62,7 +62,12 @@ Options:
## Examples

| Name | Description |
|--------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------|
| [asyncapi.mqtt.proxy](asyncapi.mqtt.proxy) | Forwards validated MQTT publish messages and proxies subscribes to an MQTT broker |
| [asyncapi.mqtt.kafka.proxy](asyncapi.mqtt.kafka.proxy) | Forwards MQTT publish messages to Kafka, broadcasting to all subscribed MQTT clients |
| [asyncapi.http.kafka.proxy](asyncapi.http.kafka.proxy) | Correlates HTTP requests and responses over separate Kafka topics |
| [asyncapi.sse.proxy](asyncapi.sse.proxy) | Proxies validated messages delivered by the SSE server |
| [asyncapi.sse.kafka.proxy](asyncapi.sse.kafka.proxy) | Streams messages published to a Kafka topic over SSE |
| [tcp.echo](tcp.echo) | Echoes bytes sent to the TCP server |
| [tcp.reflect](tcp.reflect) | Echoes bytes sent to the TCP server, broadcasting to all TCP clients |
| [tls.echo](tls.echo) | Echoes encrypted bytes sent to the TLS server |
@@ -90,7 +95,6 @@ Options:
| [amqp.reflect](amqp.reflect) | Echoes messages published to the AMQP server, broadcasting to all receiving AMQP clients |
| [mqtt.kafka.broker](mqtt.kafka.broker) | Forwards MQTT publish messages to Kafka, broadcasting to all subscribed MQTT clients |
| [mqtt.kafka.broker.jwt](mqtt.kafka.broker.jwt) | Forwards MQTT publish messages to Kafka, broadcasting to all subscribed JWT-authorized MQTT clients |
| [mqtt.proxy.asyncapi](mqtt.proxy.asyncapi) | Forwards validated MQTT publish messages and proxies subscribes to an MQTT broker |
| [quickstart](quickstart) | Starts endpoints for all protocols (HTTP, SSE, gRPC, MQTT) |
| [sse.kafka.fanout](sse.kafka.fanout) | Streams messages published to a Kafka topic, applying conflation based on log compaction |
| [sse.proxy.jwt](sse.proxy.jwt) | Proxies messages delivered by the SSE server, enforcing streaming security constraints |
176 changes: 176 additions & 0 deletions asyncapi.http.kafka.proxy/README.md
@@ -0,0 +1,176 @@
# asyncapi.http.kafka.proxy

In this guide, you create Kafka topics and use Zilla to implement the common Petstore example, where REST requests are proxied to Kafka. Zilla implements the REST endpoints defined in an AsyncAPI 3.x spec and proxies them onto the Kafka topics defined in a second AsyncAPI 3.x spec, based on the operations defined in each spec.

## Running locally

This example can be run using Docker Compose or Kubernetes. The setup scripts are in the [compose](./docker/compose) and [helm](./k8s/helm) folders respectively and work the same way.

You will need a running Kafka broker. To start one locally, follow the instructions in the [kafka.broker](../kafka.broker) folder. Alternatively, you can use the [redpanda.broker](../redpanda.broker) folder.

### Setup

Whether you choose [compose](./docker/compose) or [helm](./k8s/helm), the `setup.sh` script will:

- create the necessary Kafka topics
- start the Petstore API at `http://localhost:7114`

```bash
./setup.sh
```
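The setup script creates three topics: `petstore-pets`, `petstore-customers`, and `petstore-verified-customers`. As a quick sketch to verify they exist (assuming the same `bitnami/kafka:3.2` tooling image and `KAFKA_BOOTSTRAP_SERVER` default that the setup script uses):

```bash
# Sketch: list topics on the broker and keep only the three this example creates.
KAFKA_BOOTSTRAP_SERVER="${KAFKA_BOOTSTRAP_SERVER:-host.docker.internal:9092}"
docker run --rm bitnami/kafka:3.2 \
  /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" --list |
  grep -E '^petstore-(pets|customers|verified-customers)$'
```

If all three topic names print, setup completed successfully.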

### Using this example

#### Synchronous APIs

The `/pets` endpoint proxies to Kafka synchronously, meaning it behaves like a normal REST endpoint while each message is persisted on a Kafka topic.

Create a pet using the `/pets` endpoint in the implemented API.

```bash
curl -v --location 'http://localhost:7114/pets' \
--header 'Content-Type: application/json' \
--data '{
"name": "Rocky",
"id": 1
}'
```

output:

```
* Trying 127.0.0.1:7114...
* Connected to localhost (127.0.0.1) port 7114 (#0)
> POST /pets HTTP/1.1
> Host: localhost:7114
> User-Agent: curl/7.88.1
> Accept: */*
> Content-Type: application/json
> Content-Length: 32
>
< HTTP/1.1 204 No Content
< Access-Control-Allow-Origin: *
<
```

List all the pets using a `GET` request to the `/pets` endpoint:

```bash
curl -v --location 'http://localhost:7114/pets' \
--header 'Accept: application/json'
```

output:

```
* Trying 127.0.0.1:7114...
* Connected to localhost (127.0.0.1) port 7114 (#0)
> GET /pets HTTP/1.1
> Host: localhost:7114
> User-Agent: curl/7.88.1
> Accept: application/json
>
< HTTP/1.1 200 OK
< Content-Length: 32
< Content-Type: application/json
< Etag: AQIAAg==
< Access-Control-Allow-Origin: *
< Access-Control-Expose-Headers: *
<
{
"name": "Rocky",
"id": 1
}
```
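The response above includes an `Etag` header. As a sketch of a conditional `GET` (assuming the `Etag` value below is replaced with the one you actually received, and that Zilla honors `If-None-Match` for this endpoint, which may return `304 Not Modified` when the topic content is unchanged):

```bash
# Sketch: conditional GET reusing the Etag from a previous /pets response.
# ETAG is a stand-in value -- replace it with the Etag you actually received.
ETAG='AQIAAg=='
curl -v --location 'http://localhost:7114/pets' \
  --header 'Accept: application/json' \
  --header "If-None-Match: $ETAG"
```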

#### Asynchronous APIs

The `/customer` endpoint is asynchronous, meaning it responds immediately with `202 Accepted` and includes a `Location` header containing the correlation ID used by the `/customer;cid={correlationId}` endpoint.

- The [petstore-customers](http://localhost:8080/ui/clusters/localhost/all-topics/petstore-customers/messages) Kafka topic will have all the pending customer objects you posted, each with a `zilla:correlation-id` header on the Kafka message.
- The [petstore-verified-customers](http://localhost:8080/ui/clusters/localhost/all-topics/petstore-verified-customers/messages) Kafka topic will hold the verified customers; each message produced to it needs a matching `zilla:correlation-id` header to align with the message on the initial topic.
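The request-then-poll flow can be scripted. A minimal sketch, assuming the example is running on `localhost:7114`; the header-parsing pipeline here is a plain-POSIX illustration, not part of the example itself:

```bash
#!/bin/bash
# Sketch: POST the customer, then capture the Location header so a
# follow-up GET can use it as the request path.
RESPONSE_HEADERS=$(curl -s -D - -o /dev/null 'http://localhost:7114/customer' \
  --header 'Prefer: respond-async' \
  --header 'Content-Type: application/json' \
  --data '{"id": 200000, "username": "fehguy", "status": "pending"}')

# Pull out the Location path, e.g. /customer;cid=<uuid>-<hash>
LOCATION=$(printf '%s' "$RESPONSE_HEADERS" | awk 'tolower($1) == "location:" {print $2}' | tr -d '\r')
echo "Follow-up path: $LOCATION"
```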

Create a non-verified customer using a `POST` request to the `/customer` endpoint:

```bash
curl -v --location --globoff 'http://localhost:7114/customer' \
--header 'Prefer: respond-async' \
--header 'Content-Type: application/json' \
--data '{
"id": 200000,
"username": "fehguy",
"status": "pending",
"address": [
{
"street": "437 Lytton",
"city": "Palo Alto",
"state": "CA",
"zip": "94301"
}
]
}'
```

output:

```
* Trying 127.0.0.1:7114...
* Connected to localhost (127.0.0.1) port 7114 (#0)
> POST /customer HTTP/1.1
> Host: localhost:7114
> User-Agent: curl/7.88.1
> Accept: */*
> Prefer: respond-async
> Content-Type: application/json
> Content-Length: 238
>
< HTTP/1.1 202 Accepted
< Content-Length: 0
< Location: /customer;cid={correlationId}
< Access-Control-Allow-Origin: *
< Access-Control-Expose-Headers: *
<
```

Copy the `Location` value and make an async `GET` request using it as the path.
Note that the response will not return until you complete the following step to produce the response with `kcat`.

```bash
curl -v --location 'http://localhost:7114/customer;cid={correlationId}' \
--header 'Prefer: wait=1902418' \
--header 'Accept: application/json'
```

output:
```
* Trying 127.0.0.1:7114...
* Connected to localhost (127.0.0.1) port 7114 (#0)
> GET /customer;cid=d9037ed5-a073-4a21-932d-2392a707800f-badc2e5c2abc25956e9d23d897d5db85 HTTP/1.1
> Host: localhost:7114
> User-Agent: curl/7.88.1
> Prefer: wait=1902418
> Accept: application/json
>
< HTTP/1.1 200 OK
< Content-Length: 135
< Access-Control-Allow-Origin: *
<
{"id":200000,"username":"fehguy","status":"approved","address":[{"street":"437 Lytton","city":"Palo Alto","state":"CA","zip":"94301"}]}
```

Using `kcat` and the copied `correlation-id`, produce the correlated message:

```sh
echo '{"id":200000,"username":"fehguy","status":"approved","address":[{"street":"437 Lytton","city":"Palo Alto","state":"CA","zip":"94301"}]}' | \
kcat -P \
-b localhost:29092 \
-k "{correlationId}" \
-t petstore-verified-customers \
-H "zilla:correlation-id={correlationId}"
```
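To confirm the correlated message landed, a hedged sketch consuming the reply topic with `kcat` (same broker address as above; `%k`, `%h`, and `%s` print the message key, headers, and value):

```bash
# Sketch: read back petstore-verified-customers, showing key, headers, and
# value for each message, then exit once the end of the topic is reached.
kcat -C \
  -b localhost:29092 \
  -t petstore-verified-customers \
  -f 'key=%k, headers=%h\nvalue=%s\n' \
  -o beginning -e
```

You should see your produced message with its `zilla:correlation-id` header.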


### Teardown

The `teardown.sh` script will remove any resources created.

```bash
./teardown.sh
```
20 changes: 20 additions & 0 deletions asyncapi.http.kafka.proxy/docker/compose/docker-compose.yaml
@@ -0,0 +1,20 @@
version: '3'
services:
  zilla:
    image: ghcr.io/aklivity/zilla:${ZILLA_VERSION}
    container_name: zilla
    pull_policy: always
    restart: unless-stopped
    ports:
      - 7114:7114
    environment:
      ZILLA_INCUBATOR_ENABLED: "true"
    volumes:
      - ../../zilla.yaml:/etc/zilla/zilla.yaml
      - ../../specs/http-asyncapi.yaml:/etc/zilla/specs/http-asyncapi.yaml
      - ../../specs/kafka-asyncapi.yaml:/etc/zilla/specs/kafka-asyncapi.yaml
    command: start -v -e

networks:
  default:
    driver: bridge
28 changes: 28 additions & 0 deletions asyncapi.http.kafka.proxy/docker/compose/setup.sh
@@ -0,0 +1,28 @@
#!/bin/bash
set -e

NAMESPACE="${NAMESPACE:-zilla-http-kafka-asyncapi-proxy}"
export ZILLA_VERSION="${ZILLA_VERSION:-latest}"
export KAFKA_BROKER="${KAFKA_BROKER:-kafka}"
export KAFKA_BOOTSTRAP_SERVER="${KAFKA_BOOTSTRAP_SERVER:-host.docker.internal:9092}"
export KAFKA_PORT="${KAFKA_PORT:-9092}"
INIT_KAFKA="${INIT_KAFKA:-true}"

# Start or restart Zilla
if [[ -z $(docker-compose -p $NAMESPACE ps -q zilla) ]]; then
  echo "==== Running the $NAMESPACE example with $KAFKA_BROKER($KAFKA_BOOTSTRAP_SERVER) ===="
  docker-compose -p $NAMESPACE up -d

  # Create topics in Kafka
  if [[ $INIT_KAFKA == true ]]; then
    docker run --rm bitnami/kafka:3.2 bash -c "
    echo 'Creating topics for $KAFKA_BOOTSTRAP_SERVER'
    /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --create --if-not-exists --topic petstore-pets --config cleanup.policy=compact
    /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --create --if-not-exists --topic petstore-customers --config cleanup.policy=compact
    /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --create --if-not-exists --topic petstore-verified-customers --config cleanup.policy=compact
    "
  fi
else
  docker-compose -p $NAMESPACE restart --no-deps zilla
fi
@@ -1,5 +1,5 @@
#!/bin/bash
set -e

NAMESPACE="${NAMESPACE:-zilla-mqtt-kafka-asyncapi-proxy}"
NAMESPACE="${NAMESPACE:-zilla-asyncapi-http-kafka-proxy}"
docker-compose -p $NAMESPACE down --remove-orphans
38 changes: 38 additions & 0 deletions asyncapi.http.kafka.proxy/k8s/helm/setup.sh
@@ -0,0 +1,38 @@
#!/bin/bash
set -e

ZILLA_VERSION="${ZILLA_VERSION:-^0.9.0}"
NAMESPACE="${NAMESPACE:-zilla-asyncapi-http-kafka-proxy}"
export KAFKA_BROKER="${KAFKA_BROKER:-kafka}"
export KAFKA_BOOTSTRAP_SERVER="${KAFKA_BOOTSTRAP_SERVER:-host.docker.internal:9092}"
export KAFKA_PORT="${KAFKA_PORT:-9092}"
INIT_KAFKA="${INIT_KAFKA:-true}"
ZILLA_CHART="${ZILLA_CHART:-oci://ghcr.io/aklivity/charts/zilla}"

# Install Zilla to the Kubernetes cluster with helm and wait for the pod to start up
echo "==== Installing $ZILLA_CHART to $NAMESPACE with $KAFKA_BROKER($KAFKA_BOOTSTRAP_SERVER) ===="
helm upgrade --install zilla $ZILLA_CHART --version $ZILLA_VERSION --namespace $NAMESPACE --create-namespace --wait \
  --values values.yaml \
  --set extraEnv[1].value="\"$KAFKA_HOST\"",extraEnv[2].value="\"$KAFKA_PORT\"" \
  --set-file zilla\\.yaml=../../zilla.yaml \
  --set-file configMaps.specs.data.http-asyncapi\\.yaml=../../specs/http-asyncapi.yaml \
  --set-file configMaps.specs.data.kafka-asyncapi\\.yaml=../../specs/kafka-asyncapi.yaml

# Create topics in Kafka
if [[ $INIT_KAFKA == true ]]; then
  kubectl run kafka-init-pod --image=bitnami/kafka:3.2 --namespace $NAMESPACE --rm --restart=Never -i -t -- /bin/sh -c "
  echo 'Creating topics for $KAFKA_BOOTSTRAP_SERVER'
  /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --create --if-not-exists --topic petstore-pets --config cleanup.policy=compact
  /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --create --if-not-exists --topic petstore-customers --config cleanup.policy=compact
  /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVER --create --if-not-exists --topic petstore-verified-customers --config cleanup.policy=compact
  "
  kubectl wait --namespace $NAMESPACE --for=delete pod/kafka-init-pod
fi

# Start port forwarding
SERVICE_PORTS=$(kubectl get svc --namespace $NAMESPACE zilla --template "{{ range .spec.ports }}{{.port}} {{ end }}")
eval "kubectl port-forward --namespace $NAMESPACE service/zilla $SERVICE_PORTS" > /tmp/kubectl-zilla.log 2>&1 &

if [[ -x "$(command -v nc)" ]]; then
  until nc -z localhost 7114; do sleep 1; done
fi
@@ -5,6 +5,6 @@ set -x
pgrep kubectl && killall kubectl

# Uninstall Zilla and Kafka
NAMESPACE="${NAMESPACE:-zilla-mqtt-kafka-asyncapi-proxy}"
NAMESPACE="${NAMESPACE:-zilla-http-kafka-asyncapi-proxy}"
helm uninstall zilla --namespace $NAMESPACE
kubectl delete namespace $NAMESPACE
11 changes: 11 additions & 0 deletions asyncapi.http.kafka.proxy/k8s/helm/values.yaml
@@ -0,0 +1,11 @@
extraEnv:
  - name: ZILLA_INCUBATOR_ENABLED
    value: "true"

livenessProbePort: 7114
readinessProbePort: 7114

service:
  ports:
    - port: 7114
      name: http