Skip to content

Commit

Permalink
Dockerized kafka-elk-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
toszo committed Apr 26, 2019
1 parent 48a6845 commit 96fbc24
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 88 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"go.formatTool": "goimports"
}
30 changes: 30 additions & 0 deletions kafka-elk-connector/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#####################################
# STEP 1 build executable binary #
#####################################
FROM golang:alpine AS builder

# Install git.
# Git is required for fetching the dependencies.
RUN apk update && apk add --no-cache git

WORKDIR /app

COPY go.mod .

RUN go mod download

COPY . .

# Build the binary.
RUN CGO_ENABLED=0 GOOS=linux go build -o main

#####################################
# STEP 2 build a small image #
#####################################
FROM scratch

# Copy our static executable.
COPY --from=builder /app/main /app/main

# Run the hello binary.
ENTRYPOINT ["/app/main"]
35 changes: 35 additions & 0 deletions kafka-elk-connector/deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
name: epiphany-kafka-elk-connector
spec:
replicas: 1
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 1
minReadySeconds: 5
template:
metadata:
labels:
app: epiphany-kafka-elk-connector
spec:
containers:
- name: epiphany-kafka-elk-connector
image: toszo/kafka-elk-connector:0.1.1
imagePullPolicy: Always
env:
- name: KAFKA_URL
value: "10.0.4.6:9092"
- name: TOPIC
value: "iot-data"
- name: GROUP_ID
value: "my-consumer-group"
- name: ELASTIC_URL
value: "http://10.0.4.9:9200"
- name: INDEX_NAME
value: "timeseries"


7 changes: 7 additions & 0 deletions kafka-elk-connector/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/toszo/go-kafka-elk

require (
github.com/google/uuid v1.1.0
github.com/segmentio/kafka-go v0.2.2
github.com/olivere/elastic v6.2.17
)
68 changes: 39 additions & 29 deletions kafka-elk-replicator.go → kafka-elk-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@ import (
"fmt"
"log"
"time"

"os"
kafka "github.com/segmentio/kafka-go"
"github.com/olivere/elastic"
"encoding/json"
)

type IoTData struct {
ObjectId string `json:"objectid"`
TimeStamp time.Time `json:"timeStamp,omitempty"`
ObjectId string `json:"objectId"`
TimeStamp time.Time `json:"timestamp,omitempty"`
Variable string `json:"variable,omitempty"`
Model string `json:"model,omitempty"`
Value float64 `json:"value,omitempty"`
Quality int `json:"quality,omitempty"`
}


const mapping = `
{
"settings":{
Expand All @@ -25,11 +30,23 @@ const mapping = `
"mappings":{
"iotdata":{
"properties":{
"objectid":{
"objectId":{
"type":"keyword"
},
"timeStamp":{
"timestamp":{
"type":"date"
},
"variable":{
"type":"keyword"
},
"model":{
"type":"keyword"
},
"value":{
"type":"keyword"
},
"quality":{
"type":"keyword"
}
}
}
Expand All @@ -47,24 +64,32 @@ func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
}

func main() {
// Starting with elastic.v5, you must pass a context to execute each service
// get kafka reader using environment variables.
kafkaURL := os.Getenv("KAFKA_URL")
// kafkaURL := "10.0.4.6:9092"
topic := os.Getenv("TOPIC")
//topic := "test-topic"
groupID := os.Getenv("GROUP_ID")
//groupID := "my-customer-group"
elasticUrl := os.Getenv("ELASTIC_URL")

elkIndexName := os.Getenv("INDEX_NAME") // iot_data

ctx := context.Background()

client, err := elastic.NewClient(elastic.SetURL("http://10.0.4.9:9200"))
client, err := elastic.NewClient(elastic.SetURL(elasticUrl)) // should be in format "http://x.y.z.zz:PORT"
if err != nil {
// Handle error
panic(err)
}

// Use the IndexExists service to check if a specified index exists.
exists, err := client.IndexExists("iot_data").Do(ctx)
exists, err := client.IndexExists(elkIndexName).Do(ctx)
if err != nil {
// Handle error
panic(err)
}
if !exists {
// Create a new index.
createIndex, err := client.CreateIndex("iot_data").BodyString(mapping).Do(ctx)
createIndex, err := client.CreateIndex(elkIndexName).BodyString(mapping).Do(ctx)
if err != nil {
// Handle error
panic(err)
Expand All @@ -74,47 +99,32 @@ func main() {
}
}

// get kafka reader using environment variables.
//kafkaURL := os.Getenv("kafkaURL")
kafkaURL := "10.0.4.6:9092"
//topic := os.Getenv("topic")
topic := "test-topic"
//groupID := os.Getenv("groupID")
groupID := "my-customer-group"

reader := getKafkaReader(kafkaURL, topic, groupID)

defer reader.Close()

fmt.Println("start consuming ... !!")
fmt.Printf("Start consuming Kafka: %s topic: %s, consumer group: %s, and pass to to ELK: %s, index: %s \n", kafkaURL, topic, groupID, elasticUrl, elkIndexName)
for {
m, err := reader.ReadMessage(context.Background())
if err != nil {
log.Fatalln(err)
}

fmt.Printf("Value %s \n", m.Value)
var dat IoTData
merr := json.Unmarshal(m.Value, &dat)
if merr != nil {
log.Fatalln(err)
}

fmt.Printf("Obj: %s date: %s\n", dat.ObjectId, dat.TimeStamp)
// Index a tweet (using JSON serialization)
//data := IoTData{ObjectId: "mySuperObjId", Time: time.Now()}

put1, err := client.Index().
Index("iot_data").
Index(elkIndexName).
Type("iotdata").
Id(dat.ObjectId).
BodyJson(dat).
Do(ctx)
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Indexed iotdata %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)

fmt.Printf("message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
}
59 changes: 0 additions & 59 deletions producer.go

This file was deleted.

69 changes: 69 additions & 0 deletions producer/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"context"
"fmt"
"time"
"strconv"
//"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
"encoding/json"
"math/rand"
)

type IoTData struct {
ObjectId string `json:"objectId"`
TimeStamp time.Time `json:"timeStamp,omitempty"`
Variable string `json:"variable,omitempty"`
Model string `json:"model,omitempty"`
Value float64 `json:"value,omitempty"`
Quality int `json:"quality,omitempty"`
}

func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaURL},
Topic: topic,
Balancer: &kafka.LeastBytes{},
})
}

func main() {
// get kafka writer using environment variables.
// kafkaURL := os.Getenv("kafkaURL")
// topic := os.Getenv("topic")
kafkaURL := "10.0.4.6:9092"
topic := "iot-data"

writer := newKafkaWriter(kafkaURL, topic)
defer writer.Close()
fmt.Println("start producing ... !!")
for i := 0; ; i++ {
t := time.Now()
fmt.Println(t.Format("20060102150405"))

for j := 0; j < 20 ; j++ {
data := IoTData{strconv.Itoa(j), t, "variable", "my.model.co", rand.Float64(), 1}
fmt.Println(data)
jsonData, merr := json.Marshal(data)
if merr != nil {
fmt.Println(merr)
}

msg := kafka.Message{
Key: []byte(t.Format("20060102150405")),
Value: jsonData,
}

err := writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println(err)
}
}




//time.Sleep(1 * time.Second)
}
}

0 comments on commit 96fbc24

Please sign in to comment.