Skip to content

Commit

Permalink
add contract address to websocket subscription filter
Browse files Browse the repository at this point in the history
  • Loading branch information
linhnt3400 committed Oct 25, 2024
1 parent 7284612 commit c67505a
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 32 deletions.
61 changes: 61 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,67 @@ This server serve the endpoints to manage backfill task: list, create, cancel, r
}
```

## Broadcast Server API Documentation

This server serve the endpoints to subscribe trade logs
### **WebSocket Connection Creation**

- **URL**: `/eventlogws`
- **Method**: `GET`
- **Description**: Connect WebSocket.
- **Request Params**:
- `id` (string, optional): The ID of websocket connection, also the consumer group ID.
Empty if you need to create new connection and subscribe from the newest offset.
Else, pass the ID to continue from the most recently event.
- `maker` (string, optional): The maker you want to subscribe.
- `taker` (string, optional): The taker you want to subscribe.
- `maker_token` (string, optional): The maker token you want to subscribe.
- `taker_token` (string, optional): The taker token you want to subscribe.
- `event_hash` (string, optional): The event hash you want to subscribe.

The filter will combine non-empty fields (AND). If no field are pass, all the trade logs will be sent.
- **Response**:
- **ID Message**: With new connection, the first message will be the ID of session.
```json
{
"id": "<session_id>"
}
```
- **Trade log Message**: with type `trade_log` and the data containing trade log information
```json
{
"type": "trade_log",
"data": {
"order_hash": "0xcccf91f6cc3f636f0c7864ae8bcbc7cddbb54971997f90d81891a139c36c33e9",
"maker": "0x51C72848c68a965f66FA7a88855F9f7784502a7F",
"taker": "0x22F9dCF4647084d6C31b2765F6910cd85C178C18",
"maker_token": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
"taker_token": "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48",
"maker_token_amount": "20210895540598740",
"taker_token_amount": "50995428",
"contract_address": "0xDef1C0ded9bec7F1a1670819833240f027b25EfF",
"block_number": 21034739,
"tx_hash": "0x6926077a7cfdc78cd1649c43e85da918f10b4ee0c6e930d5107a50de0c9d0837",
"log_index": 489,
"timestamp": 1729764251000,
"event_hash": "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f",
"expiration_date": 1729764301,
"maker_token_price": 0,
"taker_token_price": 0,
"maker_usd_amount": 0,
"taker_usd_amount": 0,
"state": ""
}
}
```
- **Revert Message**: with type `revert` and data containing the block numbers of reverted blocks
```json
{
"type": "revert",
"data": [11, 12, 13]
}
```

## Re-generate mock file

First, you need to install `mockery`
Expand Down
66 changes: 40 additions & 26 deletions internal/server/tradelogs/broadcaster.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package server

import (
"fmt"
"strings"
"sync"
"time"
Expand All @@ -13,23 +12,28 @@ import (
)

type Con struct {
id string
ws *websocket.Conn
eventHash string
maker string
id string
ws *websocket.Conn
param RegisterRequest
}

type Broadcaster struct {
mu sync.Mutex
l *zap.SugaredLogger
clients map[string]map[string]Con
clients map[RegisterRequest]map[string]Con
tradeLogChan chan storage.TradeLog
}

type RegisterRequest struct {
EventHash string `form:"event_hash"`
Maker string `form:"maker"`
ContractAddress string `form:"contract_address"`
}

func NewBroadcaster(tradeChan chan storage.TradeLog) *Broadcaster {
return &Broadcaster{
l: zap.S(),
clients: make(map[string]map[string]Con),
clients: make(map[RegisterRequest]map[string]Con),
tradeLogChan: tradeChan,
}
}
Expand All @@ -40,21 +44,17 @@ func (b *Broadcaster) BroadcastLog() {
}
}

func (b *Broadcaster) newConn(event, maker string, conn *websocket.Conn) {
func (b *Broadcaster) newConn(param RegisterRequest, conn *websocket.Conn) {
id := xid.New().String()
b.l.Infow("connected socket", "id", id)
go func() {
msgType, msg, err := conn.ReadMessage()
b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err)
if err != nil {
b.removeConn(conn, id, event, maker)
b.removeConn(conn, id, param)
}
}()
b.addConn(conn, id, event, maker)
}

func combine(event, maker string) string {
return fmt.Sprintf("%s-%s", event, maker)
b.addConn(conn, id, param)
}

func (b *Broadcaster) Test() {
Expand All @@ -66,40 +66,54 @@ func (b *Broadcaster) Test() {
}
}

func (b *Broadcaster) removeConn(conn *websocket.Conn, id, event, maker string) {
func (b *Broadcaster) removeConn(conn *websocket.Conn, id string, param RegisterRequest) {
b.mu.Lock()
defer b.mu.Unlock()
maker = strings.ToLower(maker)
e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)]
param.Maker = strings.ToLower(param.Maker)
param.ContractAddress = strings.ToLower(param.ContractAddress)
e, ok := b.clients[param]
if !ok {
return
}
conn.Close()
delete(e, id)
}

func (b *Broadcaster) addConn(conn *websocket.Conn, id, event, maker string) {
func (b *Broadcaster) addConn(conn *websocket.Conn, id string, param RegisterRequest) {
b.mu.Lock()
defer b.mu.Unlock()
maker = strings.ToLower(maker)
cons, ok := b.clients[combine(event, maker)]
param.Maker = strings.ToLower(param.Maker)
param.ContractAddress = strings.ToLower(param.ContractAddress)
cons, ok := b.clients[param]
if !ok {
cons = map[string]Con{}
}
cons[id] = Con{
id: id,
ws: conn,
maker: maker,
eventHash: event,
id: id,
ws: conn,
param: param,
}
b.clients[combine(event, maker)] = cons
b.clients[param] = cons
}

func (b *Broadcaster) writeEvent(log storage.TradeLog) {
b.mu.Lock()
defer b.mu.Unlock()
maker := strings.ToLower(log.Maker)
cons := b.clients[combine(log.EventHash, maker)]
contractAddress := strings.ToLower(log.ContractAddress)

cons := b.clients[RegisterRequest{EventHash: log.EventHash, Maker: maker}]
for _, c := range cons {
if err := c.ws.WriteJSON(log); err != nil {
b.l.Errorw("error when send msg", "err", err)
}
}

if len(contractAddress) == 0 {
return
}

cons = b.clients[RegisterRequest{EventHash: log.EventHash, Maker: maker, ContractAddress: contractAddress}]
for _, c := range cons {
if err := c.ws.WriteJSON(log); err != nil {
b.l.Errorw("error when send msg", "err", err)
Expand Down
7 changes: 1 addition & 6 deletions internal/server/tradelogs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ var (
}
)

type RegisterRequest struct {
EventHash string `form:"event_hash"`
Maker string `form:"maker"`
}

// Server to serve the service.
type Server struct {
r *gin.Engine
Expand Down Expand Up @@ -120,5 +115,5 @@ func (s *Server) registerEventLogWS(c *gin.Context) {
responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws"))
return
}
s.bc.newConn(param.EventHash, param.Maker, conn)
s.bc.newConn(param, conn)
}

0 comments on commit c67505a

Please sign in to comment.