TRD-670 refactor broadcast flow
linhnt3400 committed Oct 23, 2024
1 parent 288a357 commit 6f7af20
Showing 5 changed files with 116 additions and 111 deletions.
26 changes: 2 additions & 24 deletions v2/cmd/broadcast/main.go
@@ -1,12 +1,10 @@
package main

import (
"context"
"fmt"
"log"
"os"

"github.com/IBM/sarama"
"github.com/KyberNetwork/tradelogs/v2/internal/server"
"github.com/KyberNetwork/tradelogs/v2/internal/worker"
libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
@@ -47,28 +45,8 @@ func run(c *cli.Context) error {
return fmt.Errorf("invalid kafka topic: %w", err)
}

// kafka consumer for broadcasting trade logs
consumer, err := kafka.NewConsumer(libapp.KafkaConfigFromFlags(c), c.String(libapp.KafkaConsumerGroup.Name))
if err != nil {
return fmt.Errorf("failed to create kafka consumer: %w", err)
}

tradeLogsChan := make(chan *sarama.ConsumerMessage, 100)
defer close(tradeLogsChan)

ctx := context.Background()
defer ctx.Done()

go func() {
err = consumer.Consume(ctx, l, broadcastTopic, tradeLogsChan)
if err != nil {
panic(fmt.Errorf("failed to consume trade logs: %w", err))
}
}()

broadcaster := worker.NewBroadcaster(l, tradeLogsChan)

go broadcaster.Run()
cfg := libapp.KafkaConfigFromFlags(c)
broadcaster := worker.NewBroadcaster(l, cfg, broadcastTopic)

s := server.New(l, c.String(libapp.HTTPBroadcastServerFlag.Name), broadcaster)
return s.Run()
8 changes: 7 additions & 1 deletion v2/internal/server/broadcast.go
@@ -83,5 +83,11 @@ func (s *Server) registerEventLogWS(c *gin.Context) {
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws"))
return
}
s.bc.NewConn(param, conn)

err = s.bc.NewConn(param, conn)
if err != nil {
s.l.Errorw("Failed to create websocket connection", "error", err)
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create new connection"))
return
}
}
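
For context, a minimal websocket client sketch against the refactored handler could look like the following. The endpoint path, host, and example filter values are assumptions (route registration is not part of this diff); the id handshake and the event_hash/maker filter fields come from NewConn and RegisterRequest in broadcaster.go below.

package main

import (
	"log"
	"net/url"

	"github.com/gorilla/websocket"
)

func main() {
	// Hypothetical endpoint and host; the actual route registration is outside this diff.
	u := url.URL{Scheme: "ws", Host: "localhost:8080", Path: "/ws/event-logs"}
	q := u.Query()
	q.Set("event_hash", "0x1234") // placeholder filter values
	q.Set("maker", "0xabcd")
	// Omitting "id" asks the server to mint a new connection id (xid) for this client.
	u.RawQuery = q.Encode()

	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// For a new connection the server first writes {"id": "<connection id>"}.
	var hello map[string]interface{}
	if err := conn.ReadJSON(&hello); err != nil {
		log.Fatalf("read id: %v", err)
	}
	log.Printf("connection id: %v", hello["id"])

	// After that it streams kafka.Message payloads matching the registered filters.
	for {
		var msg map[string]interface{}
		if err := conn.ReadJSON(&msg); err != nil {
			log.Printf("read: %v", err)
			return
		}
		log.Printf("received: %v", msg)
	}
}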
184 changes: 105 additions & 79 deletions v2/internal/worker/broadcaster.go
@@ -1,7 +1,9 @@
package worker

import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

@@ -16,17 +18,20 @@ import (
type Broadcaster struct {
l *zap.SugaredLogger
mu sync.Mutex
clients map[string]Conn
channel chan *sarama.ConsumerMessage
clients map[string]*Client
config *kafka.Config
topic string
}

type (
Conn struct {
Client struct {
l *zap.SugaredLogger
id string
ws *websocket.Conn
params RegisterRequest
}
RegisterRequest struct {
ID string `form:"id"`
EventHash string `form:"event_hash"`
Maker string `form:"maker"`
Taker string `form:"taker"`
@@ -35,103 +40,141 @@
}
)

func NewBroadcaster(logger *zap.SugaredLogger, ch chan *sarama.ConsumerMessage) *Broadcaster {
func NewBroadcaster(logger *zap.SugaredLogger, cfg *kafka.Config, topic string) *Broadcaster {
return &Broadcaster{
l: logger,
channel: ch,
clients: make(map[string]Conn),
config: cfg,
topic: topic,
clients: make(map[string]*Client),
}
}

func (b *Broadcaster) Run() {
for msg := range b.channel {
b.broadcast(msg)
}
}
func (b *Broadcaster) NewConn(req RegisterRequest, conn *websocket.Conn) error {
ctx, cancel := context.WithCancel(context.Background())

func (b *Broadcaster) broadcast(msg *sarama.ConsumerMessage) {
var newMsg kafka.Message
err := json.Unmarshal(msg.Value, &newMsg)
if err != nil {
b.l.Errorw("error when unmarshal message", "err", err, "msg", string(msg.Value))
return
}
dataBytes, err := json.Marshal(newMsg.Data)
if err != nil {
b.l.Errorw("error when marshal message data", "err", err, "data", newMsg.Data)
return
}
// new connection
if len(req.ID) == 0 {
// create id for the connection
req.ID = xid.New().String()

switch newMsg.Type {
case kafka.MessageTypeRevert:
var blocks []uint64
err = json.Unmarshal(dataBytes, &blocks)
if err != nil {
b.l.Errorw("error when unmarshal reverted blocks", "err", err, "data", string(dataBytes))
return
}
b.writeRevert(blocks)
case kafka.MessageTypeTradeLog:
var tradelog storage.TradeLog
err = json.Unmarshal(dataBytes, &tradelog)
// write the id
err := conn.WriteJSON(map[string]interface{}{"id": req.ID})
if err != nil {
b.l.Errorw("error when unmarshal trade log", "err", err, "data", string(dataBytes))
return
b.removeClient(cancel, conn, req.ID)
return fmt.Errorf("write conn id err: %v", err)
}
b.writeTradeLog(tradelog)
}
}
b.l.Infow("connected socket", "id", req.ID)

func (b *Broadcaster) NewConn(req RegisterRequest, conn *websocket.Conn) {
id := xid.New().String()
b.l.Infow("connected socket", "id", id)
go func() {
msgType, msg, err := conn.ReadMessage()
b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err)
b.l.Infow("read msg result", "id", req.ID, "msgType", msgType, "msg", msg, "err", err)
if err != nil {
b.removeConn(conn, id)
b.l.Errorw("error when read ws connection", "id", req.ID, "err", err)
b.removeClient(cancel, conn, req.ID)
}
}()
b.addConn(conn, id, req)

b.addClient(conn, req.ID, req)

err := b.clients[req.ID].run(ctx, b.config, b.topic)
if err != nil {
b.removeClient(cancel, conn, req.ID)
return fmt.Errorf("cannot run client: %w", err)
}
return nil
}

func (b *Broadcaster) removeConn(conn *websocket.Conn, id string) {
func (b *Broadcaster) removeClient(cancel context.CancelFunc, conn *websocket.Conn, id string) {
b.mu.Lock()
defer b.mu.Unlock()
cancel()
_ = conn.Close()
delete(b.clients, id)
}

func (b *Broadcaster) addConn(conn *websocket.Conn, id string, params RegisterRequest) {
func (b *Broadcaster) addClient(conn *websocket.Conn, id string, params RegisterRequest) {
b.mu.Lock()
defer b.mu.Unlock()
b.clients[id] = Conn{
b.clients[id] = &Client{
l: b.l.With("id", id),
id: id,
ws: conn,
params: params,
}
}

func (b *Broadcaster) writeTradeLog(log storage.TradeLog) {
b.mu.Lock()
defer b.mu.Unlock()
msg := kafka.Message{
Type: kafka.MessageTypeTradeLog,
Data: log,
func (c *Client) run(ctx context.Context, cfg *kafka.Config, topic string) error {
// kafka consumer for broadcasting trade logs
consumer, err := kafka.NewConsumer(cfg, c.id)
if err != nil {
return fmt.Errorf("failed to create kafka consumer: %w", err)
}
var failCount int
for _, conn := range b.clients {
if match(log, conn.params) {
if err := conn.ws.WriteJSON(msg); err != nil {
b.l.Errorw("error when send msg", "err", err, "id", conn.id)
failCount++

tradeLogsChan := make(chan *sarama.ConsumerMessage, 100)

go func() {
err = consumer.Consume(ctx, c.l, topic, tradeLogsChan)
if err != nil {
panic(fmt.Errorf("failed to consume trade logs: %w", err))
}
}()

go func() {
for msg := range tradeLogsChan {
c.broadcast(msg)
}
}()

return nil
}

func (c *Client) broadcast(msg *sarama.ConsumerMessage) {
var newMsg kafka.Message
err := json.Unmarshal(msg.Value, &newMsg)
if err != nil {
c.l.Errorw("error when unmarshal message", "err", err, "msg", string(msg.Value))
return
}
dataBytes, err := json.Marshal(newMsg.Data)
if err != nil {
c.l.Errorw("error when marshal message data", "err", err, "data", newMsg.Data)
return
}

switch newMsg.Type {
case kafka.MessageTypeRevert:
var blocks []uint64
err = json.Unmarshal(dataBytes, &blocks)
if err != nil {
c.l.Errorw("error when unmarshal reverted blocks", "err", err, "data", string(dataBytes))
return
}
newMsg.Data = blocks
if err = c.ws.WriteJSON(newMsg); err != nil {
c.l.Errorw("error when send msg", "err", err)
}
c.l.Infow("broadcast revert message", "message", newMsg)

case kafka.MessageTypeTradeLog:
var tradelog storage.TradeLog
err = json.Unmarshal(dataBytes, &tradelog)
if err != nil {
c.l.Errorw("error when unmarshal trade log", "err", err, "data", string(dataBytes))
return
}
newMsg.Data = tradelog
if c.match(tradelog) {
if err = c.ws.WriteJSON(newMsg); err != nil {
c.l.Errorw("error when send msg", "err", err)
}
}
c.l.Infow("broadcast trade log message", "message", newMsg)
}
b.l.Infow("broadcast trade log message", "total", len(b.clients), "fail", failCount, "message", msg)
}

func match(log storage.TradeLog, params RegisterRequest) bool {
func (c *Client) match(log storage.TradeLog) bool {
params := c.params
if len(params.EventHash) != 0 && !strings.EqualFold(params.EventHash, log.EventHash) {
return false
}
@@ -149,20 +192,3 @@ func match(log storage.TradeLog, params RegisterRequest) bool {
}
return true
}

func (b *Broadcaster) writeRevert(blocks []uint64) {
b.mu.Lock()
defer b.mu.Unlock()
msg := kafka.Message{
Type: kafka.MessageTypeRevert,
Data: blocks,
}
var failCount int
for _, conn := range b.clients {
if err := conn.ws.WriteJSON(msg); err != nil {
b.l.Errorw("error when send msg", "err", err, "id", conn.id)
failCount++
}
}
b.l.Infow("broadcast revert message", "total", len(b.clients), "fail", failCount, "message", msg)
}
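
Net effect of this file: the single shared Kafka consumer and fan-out loop from main.go are replaced by one consumer per websocket client (NewConsumer(cfg, c.id) uses the connection id as the consumer group), and teardown flows through removeClient: the per-client context is cancelled, Consume returns, the message channel is closed (see the defer added in consumer.go below), and the per-client broadcast goroutine's range loop exits. A minimal, Kafka-free sketch of that cancellation pattern, for illustration only:

package main

import (
	"context"
	"fmt"
	"time"
)

// consume stands in for SaramaConsumer.Consume: it pushes messages until the
// context is cancelled, then closes the channel so downstream range loops exit.
func consume(ctx context.Context, ch chan<- string) {
	defer close(ch)
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			ch <- fmt.Sprintf("msg-%d", i)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan string, 100)
	done := make(chan struct{})

	go consume(ctx, ch)

	go func() {
		defer close(done)
		for msg := range ch { // ends once consume closes ch
			fmt.Println("broadcast:", msg)
		}
	}()

	time.Sleep(200 * time.Millisecond)
	cancel() // analogous to removeClient: cancel -> Consume returns -> channel closes
	<-done
	fmt.Println("client torn down")
}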
8 changes: 1 addition & 7 deletions v2/pkg/app/kafka.go
@@ -36,17 +36,11 @@
EnvVar: "KAFKA_BROADCAST_TOPIC",
Value: "trade-logs",
}
KafkaConsumerGroup = cli.StringFlag{
Name: "kafka-consumer-group",
Usage: "Kafka consumer group",
EnvVar: "KAFKA_CONSUMER_GROUP",
Value: "trade-logs-broadcaster",
}
)

func KafkaFlag() []cli.Flag {
return []cli.Flag{
kafkaAddresses, kafkaAuthentication, kafkaUsername, kafkaPassword, KafkaBroadcastTopic, KafkaConsumerGroup,
kafkaAddresses, kafkaAuthentication, kafkaUsername, kafkaPassword, KafkaBroadcastTopic,
}
}

1 change: 1 addition & 0 deletions v2/pkg/kafka/consumer.go
@@ -34,6 +34,7 @@ func NewConsumer(config *Config, consumerGroup string) (*SaramaConsumer, error)

func (c *SaramaConsumer) Consume(ctx context.Context, l *zap.SugaredLogger, topic string, ch chan<- *sarama.ConsumerMessage) error {
messageHandler := newConsumerGroupHandler(ch)
defer close(ch)
for {
select {
case <-ctx.Done():
