diff --git a/README.md b/README.md deleted file mode 100644 index e184d24..0000000 --- a/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# RabbitMQ Work Queue example - -Based on [these tutorials](https://www.rabbitmq.com/tutorials/tutorial-two-go.html), -this repo contains a MWE of a master that sends messages to various modules: -a [builder](modules/builder), -an [analyzer](modules/analyzer), -and a [reporter](modules/reporter). - -Those modules then send a response back to the master. - -## Demo setup - -- A [RabbitMQ instance](https://kubernetes.io/docs/tasks/job/coarse-parallel-processing-work-queue/#starting-a-message-queue-service) -- One [master Job](master/master.yaml) issuing messages in all queues -- Three dummy [module ReplicaSets](modules) containing Pods waiting for different kind of messages - -## Run the demo - -1. Start RabbitMQ: - ```bash - kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-service.yaml - kubectl create -f https://raw.githubusercontent.com/kubernetes/kubernetes/release-1.3/examples/celery-rabbitmq/rabbitmq-controller.yaml - ``` -1. Start any subset of dummy modules: - ```bash - kubectl apply -k modules/builder - kubectl apply -k modules/analyzer - kubectl apply -k modules/reporter - ``` - or, to start them all: - ```bash - kubectl apply -k modules - ``` -1. Start the master: - ```bash - kubectl apply -k master - ``` - -Exactly one Pod of each [module ReplicaSet](modules) will consume its corresponding message.\ -All the modules will then reply after having performed some dummy computation. diff --git a/master/Dockerfile b/master/Dockerfile deleted file mode 100644 index 3c1805c..0000000 --- a/master/Dockerfile +++ /dev/null @@ -1,59 +0,0 @@ -ARG BUILDER_IMAGE=golang:alpine -############################ -# STEP 1 build executable binary -############################ -FROM ${BUILDER_IMAGE} as builder - -# Install git + SSL ca certificates.` -# Git is required for fetching the dependencies. -# Ca-certificates is required to call HTTPS endpoints. -RUN apk update && apk add --no-cache git ca-certificates tzdata && update-ca-certificates - -# Create appuser -ENV USER=appuser -ENV UID=10001 - -# See https://stackoverflow.com/a/55757473/12429735 -RUN adduser \ - --disabled-password \ - --gecos "" \ - --home "/nonexistent" \ - --shell "/sbin/nologin" \ - --no-create-home \ - --uid "${UID}" \ - "${USER}" -WORKDIR $GOPATH/src/mypackage/myapp/ - -# use modules -COPY go.mod . - -ENV GO111MODULE=on -RUN go mod download -RUN go mod verify - -COPY . . - -# Build the binary -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ - -ldflags='-w -s -extldflags "-static"' -a \ - -o /go/bin/master . - -############################ -# STEP 2 build a small image -############################ -FROM scratch - -# Import from builder. -COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo -COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ -COPY --from=builder /etc/passwd /etc/passwd -COPY --from=builder /etc/group /etc/group - -# Copy our static executable -COPY --from=builder /go/bin/master /go/bin/master - -# Use an unprivileged user. -USER appuser:appuser - -# Run the master binary. -ENTRYPOINT ["/go/bin/master"] diff --git a/master/go.mod b/master/go.mod deleted file mode 100644 index d7f01a7..0000000 --- a/master/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module master - -go 1.14 - -require github.com/streadway/amqp v1.0.0 // indirect diff --git a/master/go.sum b/master/go.sum deleted file mode 100644 index 75f2157..0000000 --- a/master/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/master/master.go b/master/master.go deleted file mode 100644 index 41c24c6..0000000 --- a/master/master.go +++ /dev/null @@ -1,135 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" - - "github.com/streadway/amqp" -) - -func failOnError(err error, msg string) { - if err != nil { - log.Fatalf("%s: %s", msg, err) - } -} - -func declareResponseQueue(ch *amqp.Channel, queue string) amqp.Queue { - return declareQueue(ch, fmt.Sprintf("%v-response", queue)) -} - -func declareQueue(ch *amqp.Channel, queue string) amqp.Queue { - q, err := ch.QueueDeclare( - queue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "Failed to declare a queue") - - return q -} - -func publishMessage(ch *amqp.Channel, q *amqp.Queue, body string) { - err := ch.Publish( - "", // exchange - q.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - //CorrelationId: q.Name, - //ReplyTo: fmt.Sprintf("%v-response", q.Name), - Body: []byte(body), - }) - failOnError(err, "Failed to publish a message") - log.Printf(" [x] Sent %s", body) -} - -func consumeModuleResponse(ch *amqp.Channel, q *amqp.Queue, finalModuleHasResponded chan <- bool, remainingModules *int) { - moduleResponse, err := ch.Consume( - q.Name, - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - failOnError(err, fmt.Sprintf("Failed to consume the %v response", q.Name)) - - // Printing to screen - go func() { - for d := range moduleResponse { - log.Printf("Received a message from a %v: %s", q.Name, d.Body) - } - *remainingModules -= 1 - if *remainingModules == 0 { - finalModuleHasResponded <- true - } - }() -} - -func main() { - - // Connecting to RabbitMQ - conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-service:5672/") - failOnError(err, "Failed to connect to RabbitMQ") - defer conn.Close() - ch, err := conn.Channel() - failOnError(err, "Failed to open a channel") - defer ch.Close() - - // Declaring queues - builderQueue := declareQueue(ch, "builder") - builderResponseQueue := declareResponseQueue(ch, "builder") - analyzerQueue := declareQueue(ch, "analyzer") - analyzerResponseQueue := declareResponseQueue(ch, "analyzer") - reporterQueue := declareQueue(ch, "reporter") - reporterResponseQueue := declareResponseQueue(ch, "reporter") - - type module struct { - name string - queue amqp.Queue - } - - // Modules who are going to be waked up, in order - modulesToWakeUp := []module{ - { - name: "builder", - queue: builderQueue, - }, - { - name: "analyzer", - queue: analyzerQueue, - }, - { - name: "reporter", - queue: reporterQueue, - }, - } - - // Waking up one Pod of each module at the time - for _, module := range modulesToWakeUp { - sleepTimeInSeconds := 5 - time.Sleep(time.Duration(sleepTimeInSeconds) * time.Second) - log.Printf("Waking up one %s...", module.name) - publishMessage(ch, &module.queue, fmt.Sprintf("Go, %s!", module.name)) - } - - // Waiting for modules' responses. - // Apparently, we cannot wait for multiple messages belonging to different queues. - // However, it makes sense for Quartermaster to wait only for messages belonging - // to the current phase (i.e., builder, analysis, report). - // The master consumes messages following that order. - allModulesHaveResponded := make(chan bool) - remainingModules := len(modulesToWakeUp) - consumeModuleResponse(ch, &builderResponseQueue, allModulesHaveResponded, &remainingModules) - consumeModuleResponse(ch, &analyzerResponseQueue, allModulesHaveResponded, &remainingModules) - consumeModuleResponse(ch, &reporterResponseQueue, allModulesHaveResponded, &remainingModules) - - <-allModulesHaveResponded -} diff --git a/master/master.yaml b/master/master.yaml deleted file mode 100644 index 722bd1d..0000000 --- a/master/master.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: master -spec: - template: - spec: - restartPolicy: Never - containers: - - name: master - image: marcomicera/broker-test-rabbitmq-master:responses - imagePullPolicy: Always diff --git a/modules/analyzer/kustomization.yaml b/modules/analyzer/kustomization.yaml deleted file mode 100644 index 5ce70b0..0000000 --- a/modules/analyzer/kustomization.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: kustomize.config.k8s.io/v1beta1 -kind: Kustomization - -bases: - - ../base - -nameSuffix: -analyzer - -patchesStrategicMerge: - - queue_name.yaml diff --git a/modules/analyzer/queue_name.yaml b/modules/analyzer/queue_name.yaml deleted file mode 100644 index 7ae8c25..0000000 --- a/modules/analyzer/queue_name.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: apps/v1 -kind: ReplicaSet -metadata: - name: module -spec: - template: - spec: - containers: - - name: module - env: - - name: QUEUE_NAME - value: "analyzer" diff --git a/modules/base/Dockerfile b/modules/base/Dockerfile deleted file mode 100644 index 31f90cd..0000000 --- a/modules/base/Dockerfile +++ /dev/null @@ -1,59 +0,0 @@ -ARG BUILDER_IMAGE=golang:alpine -############################ -# STEP 1 build executable binary -############################ -FROM ${BUILDER_IMAGE} as builder - -# Install git + SSL ca certificates.` -# Git is required for fetching the dependencies. -# Ca-certificates is required to call HTTPS endpoints. -RUN apk update && apk add --no-cache git ca-certificates tzdata && update-ca-certificates - -# Create appuser -ENV USER=appuser -ENV UID=10001 - -# See https://stackoverflow.com/a/55757473/12429735 -RUN adduser \ - --disabled-password \ - --gecos "" \ - --home "/nonexistent" \ - --shell "/sbin/nologin" \ - --no-create-home \ - --uid "${UID}" \ - "${USER}" -WORKDIR $GOPATH/src/mypackage/myapp/ - -# use modules -COPY go.mod . - -ENV GO111MODULE=on -RUN go mod download -RUN go mod verify - -COPY . . - -# Build the binary -RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ - -ldflags='-w -s -extldflags "-static"' -a \ - -o /go/bin/module . - -############################ -# STEP 2 build a small image -############################ -FROM scratch - -# Import from builder. -COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo -COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ -COPY --from=builder /etc/passwd /etc/passwd -COPY --from=builder /etc/group /etc/group - -# Copy our static executable -COPY --from=builder /go/bin/module /go/bin/module - -# Use an unprivileged user. -USER appuser:appuser - -# Run the module binary. -ENTRYPOINT ["/go/bin/module"] diff --git a/modules/base/go.mod b/modules/base/go.mod deleted file mode 100644 index a38db25..0000000 --- a/modules/base/go.mod +++ /dev/null @@ -1,5 +0,0 @@ -module module - -go 1.14 - -require github.com/streadway/amqp v1.0.0 // indirect diff --git a/modules/base/go.sum b/modules/base/go.sum deleted file mode 100644 index 75f2157..0000000 --- a/modules/base/go.sum +++ /dev/null @@ -1,2 +0,0 @@ -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/modules/base/kustomization.yaml b/modules/base/kustomization.yaml deleted file mode 100644 index b15907b..0000000 --- a/modules/base/kustomization.yaml +++ /dev/null @@ -1,8 +0,0 @@ -apiVersion: kustomize.config.k8s.io/v1beta1 -kind: Kustomization - -resources: - - module.yaml - -commonLabels: - app: broker-test-rabbitmq diff --git a/modules/base/module.go b/modules/base/module.go deleted file mode 100644 index 4c475b4..0000000 --- a/modules/base/module.go +++ /dev/null @@ -1,94 +0,0 @@ -package main - -import ( - "fmt" - "github.com/streadway/amqp" - "log" - "os" - "time" -) - -func failOnError(err error, msg string) { - if err != nil { - log.Fatalf("%s: %s", msg, err) - } -} - -func declareResponseQueue(ch *amqp.Channel, queue string) amqp.Queue { - return declareQueue(ch, fmt.Sprintf("%v-response", queue)) -} - -func declareQueue(ch *amqp.Channel, queue string) amqp.Queue { - q, err := ch.QueueDeclare( - queue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - failOnError(err, "Failed to declare a queue") - - return q -} - -func main() { - - // Connecting to RabbitMQ - conn, err := amqp.Dial("amqp://guest:guest@rabbitmq-service:5672/") - failOnError(err, "Failed to connect to RabbitMQ") - defer conn.Close() - ch, err := conn.Channel() - failOnError(err, "Failed to open a channel") - defer ch.Close() - - // Retrieving the correct queue - queueName := os.Getenv("QUEUE_NAME") - log.Printf("Listening to queue: %s", queueName) - queue := declareQueue(ch, queueName) - responseQueue := declareResponseQueue(ch, queueName) - failOnError(err, "Failed to declare a queue") - - // Consuming a message - msgs, err := ch.Consume( - queue.Name, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - failOnError(err, "Failed to register a consumer") - - // Sending back the module result - responded := make(chan bool) - go func() { - for d := range msgs { - log.Printf("Received a message: %s", d.Body) - - // Dummy computation - dummyComputationTimeInSeconds := 15 - log.Printf("Performing dummy computation for %d seconds...", dummyComputationTimeInSeconds) - time.Sleep(time.Duration(dummyComputationTimeInSeconds) * time.Second) - - // Sending a response - log.Printf("...dummy computation completed. Sending a response to the master...") - responseBody := fmt.Sprintf("Response from %v", queueName) - err = ch.Publish( - "", // exchange - responseQueue.Name, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "text/plain", - CorrelationId: queueName, - Body: []byte(responseBody), - }) - failOnError(err, "Failed to publish the response") - log.Printf("...response sent.") - responded <- true - } - }() - <-responded -} diff --git a/modules/base/module.yaml b/modules/base/module.yaml deleted file mode 100644 index 1b598f3..0000000 --- a/modules/base/module.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: apps/v1 -kind: ReplicaSet -metadata: - name: module -spec: - replicas: 2 - template: - spec: - containers: - - name: module - image: marcomicera/broker-test-rabbitmq-module:responses - imagePullPolicy: Always - env: - - name: QUEUE_NAME - value: "generic" diff --git a/modules/builder/kustomization.yaml b/modules/builder/kustomization.yaml deleted file mode 100644 index e8e3cc5..0000000 --- a/modules/builder/kustomization.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: kustomize.config.k8s.io/v1beta1 -kind: Kustomization - -bases: - - ../base - -nameSuffix: -builder - -patchesStrategicMerge: - - QUEUE_NAME.yaml diff --git a/modules/builder/queue_name.yaml b/modules/builder/queue_name.yaml deleted file mode 100644 index 15c8837..0000000 --- a/modules/builder/queue_name.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: apps/v1 -kind: ReplicaSet -metadata: - name: module -spec: - template: - spec: - containers: - - name: module - env: - - name: QUEUE_NAME - value: "builder" diff --git a/modules/kustomization.yaml b/modules/kustomization.yaml deleted file mode 100644 index 0ca8426..0000000 --- a/modules/kustomization.yaml +++ /dev/null @@ -1,7 +0,0 @@ -apiVersion: kustomize.config.k8s.io/v1beta1 -kind: Kustomization - -bases: - - builder - - analyzer - - reporter diff --git a/modules/reporter/kustomization.yaml b/modules/reporter/kustomization.yaml deleted file mode 100644 index ec24b86..0000000 --- a/modules/reporter/kustomization.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: kustomize.config.k8s.io/v1beta1 -kind: Kustomization - -bases: - - ../base - -nameSuffix: -reporter - -patchesStrategicMerge: - - QUEUE_NAME.yaml diff --git a/modules/reporter/queue_name.yaml b/modules/reporter/queue_name.yaml deleted file mode 100644 index a475962..0000000 --- a/modules/reporter/queue_name.yaml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: apps/v1 -kind: ReplicaSet -metadata: - name: module -spec: - template: - spec: - containers: - - name: module - env: - - name: QUEUE_NAME - value: "reporter" diff --git a/master/kustomization.yaml b/rabbitmq/kustomization.yaml similarity index 57% rename from master/kustomization.yaml rename to rabbitmq/kustomization.yaml index 515e28c..59475c0 100644 --- a/master/kustomization.yaml +++ b/rabbitmq/kustomization.yaml @@ -2,7 +2,6 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization resources: - - master.yaml + - rabbitmq_service.yaml + - rabbitmq_deploy.yaml -commonLabels: - app: broker-test-rabbitmq diff --git a/rabbitmq/rabbitmq_deploy.yaml b/rabbitmq/rabbitmq_deploy.yaml new file mode 100644 index 0000000..b66f5a5 --- /dev/null +++ b/rabbitmq/rabbitmq_deploy.yaml @@ -0,0 +1,23 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rabbitmq + labels: + app: rabbitmq +spec: + replicas: 1 + selector: + matchLabels: + app: rabbitmq + template: + metadata: + labels: + app: rabbitmq + spec: + containers: + - name: rabbitmq + image: rabbitmq:3-management + ports: + - containerPort: 5672 + - containerPort: 15672 diff --git a/rabbitmq/rabbitmq_service.yaml b/rabbitmq/rabbitmq_service.yaml new file mode 100644 index 0000000..d30d167 --- /dev/null +++ b/rabbitmq/rabbitmq_service.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: rabbitmq + name: rabbitmq-service +spec: + type: ClusterIP + selector: + app: rabbitmq + ports: + - port: 5672 + name: amqp + targetPort: 5672 + + - port: 15672 + name: html + targetPort: 15672