Skip to content

Commit

Permalink
Really basic version, subscribe to kafka topic and insert into elasti…
Browse files Browse the repository at this point in the history
…csearch
  • Loading branch information
toszo committed Apr 25, 2019
1 parent 714476c commit 48a6845
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 0 deletions.
120 changes: 120 additions & 0 deletions kafka-elk-replicator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"context"
"fmt"
"log"
"time"

kafka "github.com/segmentio/kafka-go"
"github.com/olivere/elastic"
"encoding/json"
)

type IoTData struct {
ObjectId string `json:"objectid"`
TimeStamp time.Time `json:"timeStamp,omitempty"`
}

const mapping = `
{
"settings":{
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings":{
"iotdata":{
"properties":{
"objectid":{
"type":"keyword"
},
"timeStamp":{
"type":"date"
}
}
}
}
}`

func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaURL},
GroupID: groupID,
Topic: topic,
MinBytes: 10, // 10KB
MaxBytes: 10e6, // 10MB
})
}

func main() {
// Starting with elastic.v5, you must pass a context to execute each service
ctx := context.Background()

client, err := elastic.NewClient(elastic.SetURL("http://10.0.4.9:9200"))
if err != nil {
// Handle error
panic(err)
}

// Use the IndexExists service to check if a specified index exists.
exists, err := client.IndexExists("iot_data").Do(ctx)
if err != nil {
// Handle error
panic(err)
}
if !exists {
// Create a new index.
createIndex, err := client.CreateIndex("iot_data").BodyString(mapping).Do(ctx)
if err != nil {
// Handle error
panic(err)
}
if !createIndex.Acknowledged {
// Not acknowledged
}
}

// get kafka reader using environment variables.
//kafkaURL := os.Getenv("kafkaURL")
kafkaURL := "10.0.4.6:9092"
//topic := os.Getenv("topic")
topic := "test-topic"
//groupID := os.Getenv("groupID")
groupID := "my-customer-group"

reader := getKafkaReader(kafkaURL, topic, groupID)

defer reader.Close()

fmt.Println("start consuming ... !!")
for {
m, err := reader.ReadMessage(context.Background())
if err != nil {
log.Fatalln(err)
}

var dat IoTData
merr := json.Unmarshal(m.Value, &dat)
if merr != nil {
log.Fatalln(err)
}

fmt.Printf("Obj: %s date: %s\n", dat.ObjectId, dat.TimeStamp)
// Index a tweet (using JSON serialization)
//data := IoTData{ObjectId: "mySuperObjId", Time: time.Now()}

put1, err := client.Index().
Index("iot_data").
Type("iotdata").
Id(dat.ObjectId).
BodyJson(dat).
Do(ctx)
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Indexed iotdata %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)

fmt.Printf("message at topic:%v partition:%v offset:%v %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
}
59 changes: 59 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
kafka "github.com/segmentio/kafka-go"
"encoding/json"
)

type IoTData struct {
ObjectId string `json:"objectid"`
TimeStamp time.Time `json:"timeStamp,omitempty"`
}

func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{kafkaURL},
Topic: topic,
Balancer: &kafka.LeastBytes{},
})
}

func main() {
// get kafka writer using environment variables.
// kafkaURL := os.Getenv("kafkaURL")
// topic := os.Getenv("topic")
kafkaURL := "10.0.4.6:9092"
topic := "test-topic"

writer := newKafkaWriter(kafkaURL, topic)
defer writer.Close()
fmt.Println("start producing ... !!")
for i := 0; ; i++ {
t := time.Now()
fmt.Println(t.Format("20060102150405"))


data := IoTData{uuid.New().String(), t}
fmt.Println(data)
jsonData, merr := json.Marshal(data)
if merr != nil {
fmt.Println(merr)
}


msg := kafka.Message{
Key: []byte(t.Format("20060102150405")),
Value: jsonData,
}
err := writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Println(err)
}
//time.Sleep(1 * time.Second)
}
}

0 comments on commit 48a6845

Please sign in to comment.