diff --git a/Dockerfile-v2 b/Dockerfile-v2 index 539a164..1fe1fa4 100644 --- a/Dockerfile-v2 +++ b/Dockerfile-v2 @@ -7,6 +7,7 @@ COPY . . RUN go build -o parse_log ./v2/cmd/parse_log RUN go build -o backfill ./v2/cmd/backfill +RUN go build -o broadcast ./v2/cmd/broadcast ## DEPLOY @@ -21,6 +22,7 @@ WORKDIR /v2 COPY --from=builder /src/parse_log /v2/parse_log COPY --from=builder /src/backfill /v2/backfill +COPY --from=builder /src/broadcast /v2/broadcast COPY v2/cmd/migrations /v2/migrations diff --git a/v2/cmd/broadcast/main.go b/v2/cmd/broadcast/main.go new file mode 100644 index 0000000..5eb2755 --- /dev/null +++ b/v2/cmd/broadcast/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + "log" + "os" + + "github.com/KyberNetwork/tradelogs/v2/internal/server" + "github.com/KyberNetwork/tradelogs/v2/internal/worker" + libapp "github.com/KyberNetwork/tradelogs/v2/pkg/app" + "github.com/KyberNetwork/tradelogs/v2/pkg/kafka" + "github.com/urfave/cli" + "go.uber.org/zap" +) + +func main() { + app := libapp.NewApp() + app.Name = "trade log broadcast service" + app.Action = run + + app.Flags = append(app.Flags, libapp.KafkaFlag()...) + app.Flags = append(app.Flags, libapp.HTTPServerFlags()...) 
+ + if err := app.Run(os.Args); err != nil { + log.Panic(err) + } +} + +func run(c *cli.Context) error { + logger, _, flush, err := libapp.NewLogger(c) + if err != nil { + return fmt.Errorf("new logger: %w", err) + } + + defer flush() + + zap.ReplaceGlobals(logger) + l := logger.Sugar() + l.Infow("Starting broadcast service") + + // kafka broadcast topic + broadcastTopic := c.String(libapp.KafkaBroadcastTopic.Name) + err = kafka.ValidateTopicName(broadcastTopic) + if err != nil { + return fmt.Errorf("invalid kafka topic: %w", err) + } + + cfg := libapp.KafkaConfigFromFlags(c) + broadcaster := worker.NewBroadcaster(l, cfg, broadcastTopic) + + s := server.New(l, c.String(libapp.HTTPBroadcastServerFlag.Name), broadcaster) + return s.Run() +} diff --git a/v2/internal/server/backfill.go b/v2/internal/server/backfill.go index 022f4e6..890ff9c 100644 --- a/v2/internal/server/backfill.go +++ b/v2/internal/server/backfill.go @@ -60,29 +60,15 @@ func (s *BackfillServer) register() { s.r.GET("/backfill/restart/:id", s.restartTask) } -func responseErr(c *gin.Context, err error) { - c.JSON(http.StatusBadRequest, gin.H{ - "success": false, - "error": err.Error(), - }) -} - -func internalServerError(c *gin.Context, err error) { - c.JSON(http.StatusInternalServerError, gin.H{ - "success": false, - "error": err.Error(), - }) -} - func (s *BackfillServer) backfill(c *gin.Context) { var params Query if err := c.BindJSON(¶ms); err != nil { - responseErr(c, err) + responseErr(c, http.StatusBadRequest, err) return } if params.FromBlock > params.ToBlock { - responseErr(c, fmt.Errorf("from block is greater than to block")) + responseErr(c, http.StatusBadRequest, fmt.Errorf("from block is greater than to block")) return } @@ -92,7 +78,7 @@ func (s *BackfillServer) backfill(c *gin.Context) { id, message, err := s.service.NewBackfillTask(params.FromBlock, params.ToBlock, params.Exchange) if err != nil { l.Errorw("error when backfill", "error", err) - internalServerError(c, err) + 
responseErr(c, http.StatusInternalServerError, err) return } @@ -106,7 +92,7 @@ func (s *BackfillServer) backfill(c *gin.Context) { func (s *BackfillServer) getAllTask(c *gin.Context) { tasks, err := s.service.ListTask() if err != nil { - internalServerError(c, err) + responseErr(c, http.StatusInternalServerError, err) return } c.JSON(http.StatusOK, gin.H{ @@ -119,12 +105,12 @@ func (s *BackfillServer) getTask(c *gin.Context) { id := c.Param("id") taskID, err := strconv.ParseInt(id, 10, 32) if err != nil || len(id) == 0 { - responseErr(c, fmt.Errorf("invalid task id: %s", id)) + responseErr(c, http.StatusBadRequest, fmt.Errorf("invalid task id: %s", id)) return } task, err := s.service.GetTask(int(taskID)) if err != nil { - internalServerError(c, err) + responseErr(c, http.StatusInternalServerError, err) return } c.JSON(http.StatusOK, gin.H{ @@ -137,12 +123,12 @@ func (s *BackfillServer) cancelTask(c *gin.Context) { id := c.Param("id") taskID, err := strconv.ParseInt(id, 10, 32) if err != nil { - responseErr(c, fmt.Errorf("invalid task id: %w", err)) + responseErr(c, http.StatusBadRequest, fmt.Errorf("invalid task id: %w", err)) return } err = s.service.CancelBackfillTask(int(taskID)) if err != nil { - internalServerError(c, err) + responseErr(c, http.StatusInternalServerError, err) return } c.JSON(http.StatusOK, gin.H{ @@ -154,12 +140,12 @@ func (s *BackfillServer) restartTask(c *gin.Context) { id := c.Param("id") taskID, err := strconv.ParseInt(id, 10, 32) if err != nil { - responseErr(c, fmt.Errorf("invalid task id: %w", err)) + responseErr(c, http.StatusBadRequest, fmt.Errorf("invalid task id: %w", err)) return } err = s.service.RestartBackfillTask(int(taskID)) if err != nil { - internalServerError(c, err) + responseErr(c, http.StatusInternalServerError, err) return } c.JSON(http.StatusOK, gin.H{ diff --git a/v2/internal/server/broadcast.go b/v2/internal/server/broadcast.go new file mode 100644 index 0000000..bfd94a7 --- /dev/null +++ 
b/v2/internal/server/broadcast.go @@ -0,0 +1,93 @@ +package server + +import ( + "fmt" + "net/http" + + "github.com/KyberNetwork/tradelogs/v2/internal/worker" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +var ( + wsupgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + // Allow connections from any Origin + CheckOrigin: func(r *http.Request) bool { + return true + }, + } +) + +// Server to serve the service. +type Server struct { + r *gin.Engine + bindAddr string + l *zap.SugaredLogger + bc *worker.Broadcaster +} + +// New returns a new server. +func New(l *zap.SugaredLogger, bindAddr string, bc *worker.Broadcaster) *Server { + engine := gin.New() + engine.Use(gin.Recovery()) + + server := &Server{ + r: engine, + bindAddr: bindAddr, + l: l, + bc: bc, + } + + gin.SetMode(gin.ReleaseMode) + server.register() + + return server +} + +// Run runs server. +func (s *Server) Run() error { + if err := s.r.Run(s.bindAddr); err != nil { + return fmt.Errorf("run server: %w", err) + } + + return nil +} + +func (s *Server) register() { + pprof.Register(s.r, "/debug") + s.r.GET("/eventlogws", s.registerEventLogWS) +} + +func responseErr(c *gin.Context, status int, err error) { + c.JSON(status, gin.H{ + "success": false, + "error": err.Error(), + }) +} + +func (s *Server) registerEventLogWS(c *gin.Context) { + var param worker.RegisterRequest + if err := c.BindQuery(¶m); err != nil { + responseErr(c, http.StatusBadRequest, err) + return + } + + s.l.Infow("receive ws", "param", param) + conn, err := wsupgrader.Upgrade(c.Writer, c.Request, nil) + if err != nil { + s.l.Errorw("Failed to set websocket upgrade", "error", err) + responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws")) + return + } + + err = s.bc.NewConn(param, conn) + if err != nil { + s.l.Errorw("Failed to create websocket connection", "error", err) + responseErr(c, http.StatusInternalServerError, 
fmt.Errorf("can't create new connection")) + return + } +} diff --git a/v2/internal/worker/broadcast_client.go b/v2/internal/worker/broadcast_client.go new file mode 100644 index 0000000..9768d66 --- /dev/null +++ b/v2/internal/worker/broadcast_client.go @@ -0,0 +1,110 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/IBM/sarama" + "github.com/KyberNetwork/tradelogs/pkg/storage" + "github.com/KyberNetwork/tradelogs/v2/pkg/kafka" + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +type Client struct { + l *zap.SugaredLogger + id string + ws *websocket.Conn + params RegisterRequest +} + +func (c *Client) run(ctx context.Context, cfg *kafka.Config, topic string) error { + // kafka consumer for broadcasting trade logs + consumer, err := kafka.NewConsumer(cfg, c.id) + if err != nil { + return fmt.Errorf("failed to create kafka consumer: %w", err) + } + + tradeLogsChan := make(chan *sarama.ConsumerMessage, 100) + + go func() { + err = consumer.Consume(ctx, c.l, topic, tradeLogsChan) + if err != nil { + panic(fmt.Errorf("failed to consume trade logs: %w", err)) + } + }() + + go func() { + for msg := range tradeLogsChan { + c.broadcast(msg) + } + }() + + return nil +} + +func (c *Client) broadcast(msg *sarama.ConsumerMessage) { + var newMsg kafka.Message + err := json.Unmarshal(msg.Value, &newMsg) + if err != nil { + c.l.Errorw("error when unmarshal message", "err", err, "msg", string(msg.Value)) + return + } + dataBytes, err := json.Marshal(newMsg.Data) + if err != nil { + c.l.Errorw("error when marshal message data", "err", err, "data", newMsg.Data) + return + } + + switch newMsg.Type { + case kafka.MessageTypeRevert: + var blocks []uint64 + err = json.Unmarshal(dataBytes, &blocks) + if err != nil { + c.l.Errorw("error when unmarshal reverted blocks", "err", err, "data", string(dataBytes)) + return + } + newMsg.Data = blocks + if err = c.ws.WriteJSON(newMsg); err != nil { + c.l.Errorw("error when send msg", "err", 
err) + } + c.l.Infow("broadcast revert message", "message", newMsg) + + case kafka.MessageTypeTradeLog: + var tradelog storage.TradeLog + err = json.Unmarshal(dataBytes, &tradelog) + if err != nil { + c.l.Errorw("error when unmarshal trade log", "err", err, "data", string(dataBytes)) + return + } + newMsg.Data = tradelog + if c.match(tradelog) { + if err = c.ws.WriteJSON(newMsg); err != nil { + c.l.Errorw("error when send msg", "err", err) + } + } + c.l.Infow("broadcast trade log message", "message", newMsg) + } +} + +func (c *Client) match(log storage.TradeLog) bool { + params := c.params + if len(params.EventHash) != 0 && !strings.EqualFold(params.EventHash, log.EventHash) { + return false + } + if len(params.Maker) != 0 && !strings.EqualFold(params.Maker, log.Maker) { + return false + } + if len(params.Taker) != 0 && !strings.EqualFold(params.Taker, log.Taker) { + return false + } + if len(params.MakerToken) != 0 && !strings.EqualFold(params.MakerToken, log.MakerToken) { + return false + } + if len(params.TakerToken) != 0 && !strings.EqualFold(params.TakerToken, log.TakerToken) { + return false + } + return true +} diff --git a/v2/internal/worker/broadcaster.go b/v2/internal/worker/broadcaster.go new file mode 100644 index 0000000..acade68 --- /dev/null +++ b/v2/internal/worker/broadcaster.go @@ -0,0 +1,93 @@ +package worker + +import ( + "context" + "fmt" + "sync" + + "github.com/KyberNetwork/tradelogs/v2/pkg/kafka" + "github.com/gorilla/websocket" + "github.com/rs/xid" + "go.uber.org/zap" +) + +type Broadcaster struct { + l *zap.SugaredLogger + mu sync.Mutex + clients map[string]*Client + config *kafka.Config + topic string +} + +type RegisterRequest struct { + ID string `form:"id"` + EventHash string `form:"event_hash"` + Maker string `form:"maker"` + Taker string `form:"taker"` + TakerToken string `form:"taker_token"` + MakerToken string `form:"maker_token"` +} + +func NewBroadcaster(logger *zap.SugaredLogger, cfg *kafka.Config, topic string) *Broadcaster { 
+ return &Broadcaster{ + l: logger, + config: cfg, + topic: topic, + clients: make(map[string]*Client), + } +} + +func (b *Broadcaster) NewConn(req RegisterRequest, conn *websocket.Conn) error { + ctx, cancel := context.WithCancel(context.Background()) + + // new connection + if len(req.ID) == 0 { + // create id for the connection + req.ID = xid.New().String() + + // write the id + err := conn.WriteJSON(map[string]interface{}{"id": req.ID}) + if err != nil { + b.removeClient(cancel, conn, req.ID) + return fmt.Errorf("write conn id err: %v", err) + } + } + b.l.Infow("connected socket", "id", req.ID) + + go func() { + msgType, msg, err := conn.ReadMessage() + b.l.Infow("read msg result", "id", req.ID, "msgType", msgType, "msg", msg, "err", err) + if err != nil { + b.l.Errorw("error when read ws connection", "id", req.ID, "err", err) + b.removeClient(cancel, conn, req.ID) + } + }() + + b.addClient(conn, req.ID, req) + + err := b.clients[req.ID].run(ctx, b.config, b.topic) + if err != nil { + b.removeClient(cancel, conn, req.ID) + return fmt.Errorf("cannot run client: %w", err) + } + return nil +} + +func (b *Broadcaster) removeClient(cancel context.CancelFunc, conn *websocket.Conn, id string) { + b.mu.Lock() + defer b.mu.Unlock() + cancel() + _ = conn.Close() + delete(b.clients, id) +} + +func (b *Broadcaster) addClient(conn *websocket.Conn, id string, params RegisterRequest) { + b.mu.Lock() + defer b.mu.Unlock() + b.clients[id] = &Client{ + l: b.l.With("id", id), + id: id, + ws: conn, + params: params, + } +} diff --git a/v2/pkg/app/kafka.go b/v2/pkg/app/kafka.go index 5caba57..60065bd 100644 --- a/v2/pkg/app/kafka.go +++ b/v2/pkg/app/kafka.go @@ -34,8 +34,7 @@ var ( Name: "kafka-broadcast-topic", Usage: "Kafka broadcast topic", EnvVar: "KAFKA_BROADCAST_TOPIC", - //Required: true, - Value: "trade-logs", + Value: "trade-logs", } ) diff --git a/v2/pkg/app/server.go b/v2/pkg/app/server.go index 0225d0f..8a537ab 100644 --- a/v2/pkg/app/server.go +++ 
b/v2/pkg/app/server.go @@ -2,15 +2,24 @@ package app import "github.com/urfave/cli" -var HTTPBackfillServerFlag = cli.StringFlag{ - Name: "backfill-server-address", - Usage: "Run the rest for backfill server", - EnvVar: "BACKFILL_SERVER_ADDRESS", - Value: "localhost:8081", -} +var ( + HTTPBackfillServerFlag = cli.StringFlag{ + Name: "backfill-server-address", + Usage: "Run the rest for backfill server", + EnvVar: "BACKFILL_SERVER_ADDRESS", + Value: "localhost:8081", + } + HTTPBroadcastServerFlag = cli.StringFlag{ + Name: "broadcast-server-address", + Usage: "Run the rest for broadcast server", + EnvVar: "BROADCAST_SERVER_ADDRESS", + Value: "localhost:8082", + } +) func HTTPServerFlags() []cli.Flag { return []cli.Flag{ HTTPBackfillServerFlag, + HTTPBroadcastServerFlag, } } diff --git a/v2/pkg/handler/trade_logs.go b/v2/pkg/handler/trade_logs.go index 6c958c3..814daf0 100644 --- a/v2/pkg/handler/trade_logs.go +++ b/v2/pkg/handler/trade_logs.go @@ -52,18 +52,18 @@ func (h *TradeLogHandler) ProcessBlock(blockHash string, blockNumber uint64, tim } func (h *TradeLogHandler) ProcessBlockWithExclusion(blockHash string, blockNumber uint64, timestamp uint64, exclusions sets.Set[string]) error { - // remove old trade log in db of processing block - err := h.storage.DeleteWithExclusions([]uint64{blockNumber}, exclusions) - if err != nil { - return fmt.Errorf("delete blocks error: %w", err) - } - // fetch trace call calls, err := h.rpcClient.FetchTraceCalls(context.Background(), blockHash) if err != nil { return fmt.Errorf("fetch calls error: %w", err) } + // remove old trade log in db of processing block + err = h.storage.DeleteWithExclusions([]uint64{blockNumber}, exclusions) + if err != nil { + return fmt.Errorf("delete blocks error: %w", err) + } + logIndexStart := 0 for i, call := range calls { logIndexStart = assignLogIndexes(&call.CallFrame, logIndexStart) diff --git a/v2/pkg/kafka/consumer.go b/v2/pkg/kafka/consumer.go new file mode 100644 index 0000000..b923249 --- 
/dev/null +++ b/v2/pkg/kafka/consumer.go @@ -0,0 +1,77 @@ +package kafka + +import ( + "context" + "github.com/IBM/sarama" + "go.uber.org/zap" +) + +type Consumer interface { + Consume(ctx context.Context, l *zap.SugaredLogger, topic string, ch chan<- *sarama.ConsumerMessage) error +} + +type SaramaConsumer struct { + group sarama.ConsumerGroup +} + +func NewConsumer(config *Config, consumerGroup string) (*SaramaConsumer, error) { + c := sarama.NewConfig() + + c.Consumer.Return.Errors = true + c.Net.SASL.Enable = config.UseAuthentication + c.Net.SASL.User = config.Username + c.Net.SASL.Password = config.Password + + group, err := sarama.NewConsumerGroup(config.Addresses, consumerGroup, c) + if err != nil { + return nil, err + } + + return &SaramaConsumer{ + group: group, + }, nil +} + +func (c *SaramaConsumer) Consume(ctx context.Context, l *zap.SugaredLogger, topic string, ch chan<- *sarama.ConsumerMessage) error { + messageHandler := newConsumerGroupHandler(ch) + defer close(ch) + for { + select { + case <-ctx.Done(): + l.Infow("receive stop signal", "topic", topic) + return nil + default: + err := c.group.Consume(ctx, []string{topic}, messageHandler) + if err != nil { + l.Errorw("error when consume messages", "topic", topic, "error", err) + return err + } + } + } +} + +type consumerGroupHandler struct { + msgChan chan<- *sarama.ConsumerMessage +} + +func newConsumerGroupHandler(msgChan chan<- *sarama.ConsumerMessage) *consumerGroupHandler { + return &consumerGroupHandler{ + msgChan: msgChan, + } +} + +func (h *consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (h *consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } + +// ConsumeClaim consumes messages from the claim and pushes them to the channel. +// It manually commits the offset once the message is processed. 
+func (h *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + // Push the message to the channel + h.msgChan <- msg + + // Mark the message as processed; the offset is committed later by sarama's + // auto-commit loop (MarkMessage itself does not commit immediately) + session.MarkMessage(msg, "") + } + return nil +}