diff --git a/internal/server/tradelogs/broadcaster.go b/internal/server/tradelogs/broadcaster.go index e0630b7..72b0a7e 100644 --- a/internal/server/tradelogs/broadcaster.go +++ b/internal/server/tradelogs/broadcaster.go @@ -35,35 +35,50 @@ func NewBroadcaster(tradeChan chan storage.TradeLog) *Broadcaster { func (b *Broadcaster) BroadcastLog() { for log := range b.tradeLogChan { - b.mu.Lock() - cons := b.clients[combine(log.EventHash, log.Maker)] - for _, c := range cons { - if err := c.ws.WriteJSON(log); err != nil { - b.l.Errorw("error when send msg", "err", err) - } - } - b.mu.Unlock() + b.writeEvent(log) } } -func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) { +func (b *Broadcaster) newConn(event, maker string, conn *websocket.Conn) { id := xid.New().String() b.l.Infow("connected socket", "id", id) go func() { msgType, msg, err := conn.ReadMessage() b.l.Infow("read msg result", "id", id, "msgType", msgType, "msg", msg, "err", err) if err != nil { - b.mu.Lock() - e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)] - if !ok { - return - } - conn.Close() - delete(e, id) - b.mu.Unlock() + b.removeConn(conn, id, event, maker) } }() + b.addConn(conn, id, event, maker) +} + +func combine(event, maker string) string { + return fmt.Sprintf("%s-%s", event, maker) +} + +func (b *Broadcaster) Test() { + for range time.NewTicker(time.Second * 5).C { + b.tradeLogChan <- storage.TradeLog{ + EventHash: "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f", + Maker: "0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38", + } + } +} + +func (b *Broadcaster) removeConn(conn *websocket.Conn, id, event, maker string) { b.mu.Lock() + defer b.mu.Unlock() + e, ok := b.clients[fmt.Sprintf("%s-%s", event, maker)] + if !ok { + return + } + conn.Close() + delete(e, id) +} + +func (b *Broadcaster) addConn(conn *websocket.Conn, id, event, maker string) { + b.mu.Lock() + defer b.mu.Unlock() cons, ok := b.clients[combine(event, maker)] if !ok { cons = map[string]Con{} @@ -75,18 +90,15 @@ func (b *Broadcaster) addConn(event, maker string, conn *websocket.Conn) { eventHash: event, } b.clients[combine(event, maker)] = cons - b.mu.Unlock() } -func combine(event, maker string) string { - return fmt.Sprintf("%s-%s", event, maker) -} - -func (b *Broadcaster) Test() { - for range time.NewTicker(time.Second * 5).C { - b.tradeLogChan <- storage.TradeLog{ - EventHash: "0xac75f773e3a92f1a02b12134d65e1f47f8a14eabe4eaf1e24624918e6a8b269f", - Maker: "0x807cF9A772d5a3f9CeFBc1192e939D62f0D9bD38", +func (b *Broadcaster) writeEvent(log storage.TradeLog) { + b.mu.Lock() + defer b.mu.Unlock() + cons := b.clients[combine(log.EventHash, log.Maker)] + for _, c := range cons { + if err := c.ws.WriteJSON(log); err != nil { + b.l.Errorw("error when send msg", "err", err) } } } diff --git a/internal/server/tradelogs/server.go b/internal/server/tradelogs/server.go index 6225f63..4c1a668 100644 --- a/internal/server/tradelogs/server.go +++ b/internal/server/tradelogs/server.go @@ -120,5 +120,5 @@ func (s *Server) registerEventLogWS(c *gin.Context) { responseErr(c, http.StatusInternalServerError, fmt.Errorf("can't create ws")) return } - s.bc.addConn(param.EventHash, param.Maker, conn) + s.bc.newConn(param.EventHash, param.Maker, conn) }