Skip to content

Commit

Permalink
TRD-670 Broadcast Flow for Tradelogs v2 (#86)
Browse files Browse the repository at this point in the history
* TRD-670 broadcast flow

* TRD-670 refactor remove old log flow

* TRD-670 add log

* TRD-670 refactor broadcast flow

* TRD-670 update dockerfile

* TRD-670 split file
  • Loading branch information
linhnt3400 authored Oct 23, 2024
1 parent d01f891 commit 7284612
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Dockerfile-v2
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ COPY . .

RUN go build -o parse_log ./v2/cmd/parse_log
RUN go build -o backfill ./v2/cmd/backfill
RUN go build -o broadcast ./v2/cmd/broadcast


## DEPLOY
Expand All @@ -21,6 +22,7 @@ WORKDIR /v2

COPY --from=builder /src/parse_log /v2/parse_log
COPY --from=builder /src/backfill /v2/backfill
COPY --from=builder /src/broadcast /v2/broadcast

COPY v2/cmd/migrations /v2/migrations

Expand Down
53 changes: 53 additions & 0 deletions v2/cmd/broadcast/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"fmt"
"log"
"os"

"github.com/KyberNetwork/tradelogs/v2/internal/server"
"github.com/KyberNetwork/tradelogs/v2/internal/worker"
libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app"
"github.com/KyberNetwork/tradelogs/v2/pkg/kafka"
"github.com/urfave/cli"
"go.uber.org/zap"
)

func main() {
app := libapp.NewApp()
app.Name = "trade log broadcast service"
app.Action = run

app.Flags = append(app.Flags, libapp.KafkaFlag()...)
app.Flags = append(app.Flags, libapp.HTTPServerFlags()...)

if err := app.Run(os.Args); err != nil {
log.Panic(err)
}
}

func run(c *cli.Context) error {
logger, _, flush, err := libapp.NewLogger(c)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}

defer flush()

zap.ReplaceGlobals(logger)
l := logger.Sugar()
l.Infow("Starting broadcast service")

// kafka broadcast topic
broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name)
err = kafka.ValidateTopicName(broadcastTopic)
if err != nil {
return fmt.Errorf("invalid kafka topic: %w", err)
}

cfg := libapp.KafkaConfigFromFlags(c)
broadcaster := worker.NewBroadcaster(l, cfg, broadcastTopic)

s := server.New(l, c.String(libapp.HTTPBroadcastServerFlag.Name), broadcaster)
return s.Run()
}
34 changes: 10 additions & 24 deletions v2/internal/server/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,29 +60,15 @@ func (s *BackfillServer) register() {
s.r.GET("/backfill/restart/:id", s.restartTask)
}

func responseErr(c *gin.Context, err error) {
c.JSON(http.StatusBadRequest, gin.H{
"success": false,
"error": err.Error(),
})
}

func internalServerError(c *gin.Context, err error) {
c.JSON(http.StatusInternalServerError, gin.H{
"success": false,
"error": err.Error(),
})
}

func (s *BackfillServer) backfill(c *gin.Context) {
var params Query
if err := c.BindJSON(&params); err != nil {
responseErr(c, err)
responseErr(c, http.StatusBadRequest, err)
return
}

if params.FromBlock > params.ToBlock {
responseErr(c, fmt.Errorf("from block is greater than to block"))
responseErr(c, http.StatusBadRequest, fmt.Errorf("from block is greater than to block"))
return
}

Expand All @@ -92,7 +78,7 @@ func (s *BackfillServer) backfill(c *gin.Context) {
id, message, err := s.service.NewBackfillTask(params.FromBlock, params.ToBlock, params.Exchange)
if err != nil {
l.Errorw("error when backfill", "error", err)
internalServerError(c, err)
responseErr(c, http.StatusInternalServerError, err)
return
}

Expand All @@ -106,7 +92,7 @@ func (s *BackfillServer) backfill(c *gin.Context) {
func (s *BackfillServer) getAllTask(c *gin.Context) {
tasks, err := s.service.ListTask()
if err != nil {
internalServerError(c, err)
responseErr(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, gin.H{
Expand All @@ -119,12 +105,12 @@ func (s *BackfillServer) getTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil || len(id) == 0 {
responseErr(c, fmt.Errorf("invalid task id: %s", id))
responseErr(c, http.StatusBadRequest, fmt.Errorf("invalid task id: %s", id))
return
}
task, err := s.service.GetTask(int(taskID))
if err != nil {
internalServerError(c, err)
responseErr(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, gin.H{
Expand All @@ -137,12 +123,12 @@ func (s *BackfillServer) cancelTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
responseErr(c, fmt.Errorf("invalid task id: %w", err))
responseErr(c, http.StatusBadRequest, fmt.Errorf("invalid task id: %w", err))
return
}
err = s.service.CancelBackfillTask(int(taskID))
if err != nil {
internalServerError(c, err)
responseErr(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, gin.H{
Expand All @@ -154,12 +140,12 @@ func (s *BackfillServer) restartTask(c *gin.Context) {
id := c.Param("id")
taskID, err := strconv.ParseInt(id, 10, 32)
if err != nil {
responseErr(c, fmt.Errorf("invalid task id: %w", err))
responseErr(c, http.StatusBadRequest, fmt.Errorf("invalid task id: %w", err))
return
}
err = s.service.RestartBackfillTask(int(taskID))
if err != nil {
internalServerError(c, err)
responseErr(c, http.StatusInternalServerError, err)
return
}
c.JSON(http.StatusOK, gin.H{
Expand Down
93 changes: 93 additions & 0 deletions v2/internal/server/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package server

import (
"fmt"
"net/http"

"github.com/KyberNetwork/tradelogs/v2/internal/worker"
"github.com/gin-contrib/pprof"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)

var (
wsupgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Allow connections from any Origin
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)

// Server to serve the service.
type Server struct {
r *gin.Engine
bindAddr string
l *zap.SugaredLogger
bc *worker.Broadcaster
}

// New returns a new server.
func New(l *zap.SugaredLogger, bindAddr string, bc *worker.Broadcaster) *Server {
engine := gin.New()
engine.Use(gin.Recovery())

server := &Server{
r: engine,
bindAddr: bindAddr,
l: l,
bc: bc,
}

gin.SetMode(gin.ReleaseMode)
server.register()

return server
}

// Run runs server.
func (s *Server) Run() error {
if err := s.r.Run(s.bindAddr); err != nil {
return fmt.Errorf("run server: %w", err)
}

return nil
}

func (s *Server) register() {
pprof.Register(s.r, "/debug")
s.r.GET("/eventlogws", s.registerEventLogWS)
}

func responseErr(c *gin.Context, status int, err error) {
c.JSON(status, gin.H{
"success": false,
"error": err.Error(),
})
}

func (s *Server) registerEventLogWS(c *gin.Context) {
var param worker.RegisterRequest
if err := c.BindQuery(&param); err != nil {
responseErr(c, http.StatusBadRequest, err)
return
}

s.l.Infow("receive ws", "param", param)
conn, err := wsupgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
s.l.Errorw("Failed to set websocket upgrade", "error", err)
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws"))
return
}

err = s.bc.NewConn(param, conn)
if err != nil {
s.l.Errorw("Failed to create websocket connection", "error", err)
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create new connection"))
return
}
}
110 changes: 110 additions & 0 deletions v2/internal/worker/broadcast_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package worker

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/IBM/sarama"
"github.com/KyberNetwork/tradelogs/pkg/storage"
"github.com/KyberNetwork/tradelogs/v2/pkg/kafka"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)

type Client struct {
l *zap.SugaredLogger
id string
ws *websocket.Conn
params RegisterRequest
}

func (c *Client) run(ctx context.Context, cfg *kafka.Config, topic string) error {
// kafka consumer for broadcasting trade logs
consumer, err := kafka.NewConsumer(cfg, c.id)
if err != nil {
return fmt.Errorf("failed to create kafka consumer: %w", err)
}

tradeLogsChan := make(chan *sarama.ConsumerMessage, 100)

go func() {
err = consumer.Consume(ctx, c.l, topic, tradeLogsChan)
if err != nil {
panic(fmt.Errorf("failed to consume trade logs: %w", err))
}
}()

go func() {
for msg := range tradeLogsChan {
c.broadcast(msg)
}
}()

return nil
}

func (c *Client) broadcast(msg *sarama.ConsumerMessage) {
var newMsg kafka.Message
err := json.Unmarshal(msg.Value, &newMsg)
if err != nil {
c.l.Errorw("error when unmarshal message", "err", err, "msg", string(msg.Value))
return
}
dataBytes, err := json.Marshal(newMsg.Data)
if err != nil {
c.l.Errorw("error when marshal message data", "err", err, "data", newMsg.Data)
return
}

switch newMsg.Type {
case kafka.MessageTypeRevert:
var blocks []uint64
err = json.Unmarshal(dataBytes, &blocks)
if err != nil {
c.l.Errorw("error when unmarshal reverted blocks", "err", err, "data", string(dataBytes))
return
}
newMsg.Data = blocks
if err = c.ws.WriteJSON(newMsg); err != nil {
c.l.Errorw("error when send msg", "err", err)
}
c.l.Infow("broadcast revert message", "message", newMsg)

case kafka.MessageTypeTradeLog:
var tradelog storage.TradeLog
err = json.Unmarshal(dataBytes, &tradelog)
if err != nil {
c.l.Errorw("error when unmarshal trade log", "err", err, "data", string(dataBytes))
return
}
newMsg.Data = tradelog
if c.match(tradelog) {
if err = c.ws.WriteJSON(newMsg); err != nil {
c.l.Errorw("error when send msg", "err", err)
}
}
c.l.Infow("broadcast trade log message", "message", newMsg)
}
}

func (c *Client) match(log storage.TradeLog) bool {
params := c.params
if len(params.EventHash) != 0 && !strings.EqualFold(params.EventHash, log.EventHash) {
return false
}
if len(params.Maker) != 0 && !strings.EqualFold(params.Maker, log.Maker) {
return false
}
if len(params.Taker) != 0 && !strings.EqualFold(params.Taker, log.Taker) {
return false
}
if len(params.MakerToken) != 0 && !strings.EqualFold(params.MakerToken, log.MakerToken) {
return false
}
if len(params.TakerToken) != 0 && !strings.EqualFold(params.TakerToken, log.TakerToken) {
return false
}
return true
}
Loading

0 comments on commit 7284612

Please sign in to comment.