Skip to content

Commit

Permalink
fix ping/pong handler
Browse files Browse the repository at this point in the history
  • Loading branch information
totegamma committed Nov 6, 2023
1 parent f13de5a commit fe4f8b1
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion x/socket/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (

var ctx = context.Background()

var (
pingInterval = 10 * time.Second
disconnectTimeout = 30 * time.Second
)

type Manager interface {
Subscribe(conn *websocket.Conn, streams []string)
}
Expand Down Expand Up @@ -208,7 +213,7 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) {

// goroutine for relay messages to clients
go func(c *websocket.Conn, messageChan <-chan []byte) {
pingTicker := time.NewTicker(10 * time.Second)
pingTicker := time.NewTicker(pingInterval)
defer func() {
if c != nil {
c.Close()
Expand All @@ -217,6 +222,14 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) {
delete(m.remoteConns, domain)
log.Printf("##### remote connection closed: %s", domain)
}()

var lastPong time.Time = time.Now()
c.SetPongHandler(func(string) error {
log.Printf("pong received: %s", domain)
lastPong = time.Now()
return nil
})

for {
select {
case message := <-messageChan:
Expand Down Expand Up @@ -264,10 +277,15 @@ func (m *manager) RemoteSubRoutine(domain string, streams []string) {
}
}
case <-pingTicker.C:
log.Printf("ping sent: %s", domain)
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Printf("fail to send ping message: %v", err)
return
}
if lastPong.Before(time.Now().Add(-disconnectTimeout)) {
log.Printf("pong timeout: %s", domain)
return
}
}
}
}(c, messageChan)
Expand Down

0 comments on commit fe4f8b1

Please sign in to comment.