From c67505a6529d906a08d2993d75f7447b71cee057 Mon Sep 17 00:00:00 2001 From: Nguyen Thuy Linh Date: Fri, 25 Oct 2024 10:26:19 +0700 Subject: [PATCH] add contract address to websocket subscription filter --- README.md | 61 ++++++++++++++++++++++ internal/server/tradelogs/broadcaster.go | 66 ++++++++++++++---------- internal/server/tradelogs/server.go | 7 +-- 3 files changed, 102 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 2d720cd..aa3e63e 100644 --- a/README.md +++ b/README.md @@ -179,6 +179,67 @@ This server serve the endpoints to manage backfill task: list, create, cancel, r } ``` +## Broadcast Server API Documentation + +This server serve the endpoints to subscribe trade logs +### **WebSocket Connection Creation** + +- **URL**: `/eventlogws` +- **Method**: `GET` +- **Description**: Connect WebSocket. +- **Request Params**: + - `id` (string, optional): The ID of websocket connection, also the consumer group ID. + Empty if you need to create new connection and subscribe from the newest offset. + Else, pass the ID to continue from the most recently event. + - `maker` (string, optional): The maker you want to subscribe. + - `taker` (string, optional): The taker you want to subscribe. + - `maker_token` (string, optional): The maker token you want to subscribe. + - `taker_token` (string, optional): The taker token you want to subscribe. + - `event_hash` (string, optional): The event hash you want to subscribe. + + The filter will combine non-empty fields (AND). If no field are pass, all the trade logs will be sent. +- **Response**: + - **ID Message**: With new connection, the first message will be the ID of session. + ```json + { + "id": "" + } + ``` + - **Trade log Message**: with type `trade_log` and the data containing trade log information + ```json + { + "type": "trade_log", + "data": { + "order_hash": "0xcccf91f6cc3f636f0c7864ae8bcbc7cddbb54971997f90d81891a139c36c33e9", + "maker": "0x51C72848c68a965f66FA7a88855F9f7784502a7F", + "taker": "0x22F9dCF4647084d6C31b2765F6910cd85C178C18", + "maker_token": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", + "taker_token": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + "maker_token_amount": "20210895540598740", + "taker_token_amount": "50995428", + "contract_address": "0xDef1C0ded9bec7F1a1670819833240f027b25EfF", + "block_number": 21034739, + "tx_hash": "0x6926077a7cfdc78cd1649c43e85da918f10b4ee0c6e930d5107a50de0c9d0837", + "log_index": 489, + "timestamp": 1729764251000, + "event_hash": "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f", + "expiration_date": 1729764301, + "maker_token_price": 0, + "taker_token_price": 0, + "maker_usd_amount": 0, + "taker_usd_amount": 0, + "state": "" + } + } + ``` + - **Revert Message**: with type `revert` and data containing the block numbers of reverted blocks + ```json + { + "type": "revert", + "data": [11, 12, 13] + } + ``` + ## Re-generate mock file First, you need to install `mockery` diff --git a/internal/server/tradelogs/broadcaster.go b/internal/server/tradelogs/broadcaster.go index 0cd1dc9..1ab5aa9 100644 --- a/internal/server/tradelogs/broadcaster.go +++ b/internal/server/tradelogs/broadcaster.go @@ -1,7 +1,6 @@ package server import ( - "fmt" "strings" "sync" "time" @@ -13,23 +12,28 @@ import ( ) type Con struct { - id string - ws *websocket.Conn - eventHash string - maker string + id string + ws *websocket.Conn + param RegisterRequest } type Broadcaster struct { mu sync.Mutex l *zap.SugaredLogger - clients map[string]map[string]Con + clients map[RegisterRequest]map[string]Con tradeLogChan chan storage.TradeLog } +type RegisterRequest struct { + EventHash string `form:"event_hash"` + Maker string `form:"maker"` + ContractAddress string `form:"contract_address"` +} + func NewBroadcaster(tradeChan chan storage.TradeLog) *Broadcaster { return &Broadcaster{ l: zap.S(), - clients: make(map[string]map[string]Con), + clients: make(map[RegisterRequest]map[string]Con), tradeLogChan: tradeChan, } } @@ -40,21 +44,17 @@ func (b *Broadcaster) BroadcastLog() { } } -func (b *Broadcaster) newConn(event, maker string, conn *websocket.Conn) { +func (b *Broadcaster) newConn(param RegisterRequest, conn *websocket.Conn) { id := xid.New().String() b.l.Infow("connected socket", "id", id) go func() { msgType, msg, err := conn.ReadMessage() b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err) if err != nil { - b.removeConn(conn, id, event, maker) + b.removeConn(conn, id, param) } }() - b.addConn(conn, id, event, maker) -} - -func combine(event, maker string) string { - return fmt.Sprintf("%s-%s", event, maker) + b.addConn(conn, id, param) } func (b *Broadcaster) Test() { @@ -66,11 +66,12 @@ func (b *Broadcaster) Test() { } } -func (b *Broadcaster) removeConn(conn *websocket.Conn, id, event, maker string) { +func (b *Broadcaster) removeConn(conn *websocket.Conn, id string, param RegisterRequest) { b.mu.Lock() defer b.mu.Unlock() - maker = strings.ToLower(maker) - e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)] + param.Maker = strings.ToLower(param.Maker) + param.ContractAddress = strings.ToLower(param.ContractAddress) + e, ok := b.clients[param] if !ok { return } @@ -78,28 +79,41 @@ func (b *Broadcaster) removeConn(conn *websocket.Conn, id, event, maker string) delete(e, id) } -func (b *Broadcaster) addConn(conn *websocket.Conn, id, event, maker string) { +func (b *Broadcaster) addConn(conn *websocket.Conn, id string, param RegisterRequest) { b.mu.Lock() defer b.mu.Unlock() - maker = strings.ToLower(maker) - cons, ok := b.clients[combine(event, maker)] + param.Maker = strings.ToLower(param.Maker) + param.ContractAddress = strings.ToLower(param.ContractAddress) + cons, ok := b.clients[param] if !ok { cons = map[string]Con{} } cons[id] = Con{ - id: id, - ws: conn, - maker: maker, - eventHash: event, + id: id, + ws: conn, + param: param, } - b.clients[combine(event, maker)] = cons + b.clients[param] = cons } func (b *Broadcaster) writeEvent(log storage.TradeLog) { b.mu.Lock() defer b.mu.Unlock() maker := strings.ToLower(log.Maker) - cons := b.clients[combine(log.EventHash, maker)] + contractAddress := strings.ToLower(log.ContractAddress) + + cons := b.clients[RegisterRequest{EventHash: log.EventHash, Maker: maker}] + for _, c := range cons { + if err := c.ws.WriteJSON(log); err != nil { + b.l.Errorw("error when send msg", "err", err) + } + } + + if len(contractAddress) == 0 { + return + } + + cons = b.clients[RegisterRequest{EventHash: log.EventHash, Maker: maker, ContractAddress: contractAddress}] for _, c := range cons { if err := c.ws.WriteJSON(log); err != nil { b.l.Errorw("error when send msg", "err", err) diff --git a/internal/server/tradelogs/server.go b/internal/server/tradelogs/server.go index 4c1a668..6376285 100644 --- a/internal/server/tradelogs/server.go +++ b/internal/server/tradelogs/server.go @@ -25,11 +25,6 @@ var ( } ) -type RegisterRequest struct { - EventHash string `form:"event_hash"` - Maker string `form:"maker"` -} - // Server to serve the service. type Server struct { r *gin.Engine @@ -120,5 +115,5 @@ func (s *Server) registerEventLogWS(c *gin.Context) { responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws")) return } - s.bc.newConn(param.EventHash, param.Maker, conn) + s.bc.newConn(param, conn) }