Skip to content
This repository has been archived by the owner on Sep 29, 2024. It is now read-only.

Commit

Permalink
add errors and cast check (#468)
Browse files Browse the repository at this point in the history
* refactor conn.go method(serveRead)

* fixed issue#444 golangci-lint error

* try my best to avoid ignoring error(exclude test file)

* update README.md,change to check error

* update README.md,change to check error

* revert golangci-lint

* revert golangci-lint code

* revert golangci-lint code

* add error msg

* fixed by comments

* fixed by comments

Co-authored-by: ralphchen <Wwkkvikthh123>
  • Loading branch information
ralfbawg authored May 18, 2021
1 parent 0738184 commit 0e602dc
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 16 deletions.
2 changes: 1 addition & 1 deletion engineio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ func main() {

## License

The 3-clause BSD License - see LICENSE for more details
The 3-clause BSD License - see [LICENSE](https://opensource.org/licenses/BSD-3-Clause) for more details
12 changes: 10 additions & 2 deletions engineio/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package engineio

import (
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -66,10 +67,14 @@ func (c *client) NextReader() (session.FrameType, io.ReadCloser, error) {

switch pt {
case packet.PONG:
c.conn.SetReadDeadline(time.Now().Add(c.params.PingInterval + c.params.PingTimeout))
if err = c.conn.SetReadDeadline(time.Now().Add(c.params.PingInterval + c.params.PingTimeout)); err != nil {
return 0, nil, err
}

case packet.CLOSE:
c.Close()
return 0, nil, io.EOF

case packet.MESSAGE:
return session.FrameType(ft), r, nil
}
Expand Down Expand Up @@ -116,6 +121,9 @@ func (c *client) serve() {
if err := w.Close(); err != nil {
return
}
c.conn.SetWriteDeadline(time.Now().Add(c.params.PingInterval + c.params.PingTimeout))

if err = c.conn.SetWriteDeadline(time.Now().Add(c.params.PingInterval + c.params.PingTimeout)); err != nil {
fmt.Printf("set writer's deadline error,msg:%s\n", err.Error())
}
}
}
6 changes: 5 additions & 1 deletion engineio/transport/polling/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package polling

import (
"bytes"
"fmt"
"html/template"
"net"
"net/http"
Expand Down Expand Up @@ -64,7 +65,7 @@ func (c *serverConn) SetHeaders(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-XSS-Protection", "0")
}

//just in case the default behaviour gets changed and it has to handle an origin check
// just in case the default behaviour gets changed and it has to handle an origin check
checkOrigin := Default.CheckOrigin
if c.transport.CheckOrigin != nil {
checkOrigin = c.transport.CheckOrigin
Expand Down Expand Up @@ -136,6 +137,9 @@ func (c *serverConn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

_, err = w.Write([]byte("ok"))
if err != nil {
fmt.Printf("ack post err=%s\n", err.Error())
}

default:
http.Error(w, "invalid method", http.StatusBadRequest)
Expand Down
1 change: 0 additions & 1 deletion namespace_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,4 @@ func (nc *namespaceConn) dispatch(header parser.Header) {
return
}
}
return
}
37 changes: 26 additions & 11 deletions redis_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,13 @@ func newRedisBroadcast(nsp string, adapter *RedisAdapterOptions) (*redisBroadcas
bc.resChannel = bc.prefix + "-response#" + bc.nsp
bc.requests = make(map[string]interface{})

bc.sub.PSubscribe(bc.prefix + "#" + bc.nsp + "#*")
bc.sub.Subscribe(bc.reqChannel, bc.resChannel)
if err = bc.sub.PSubscribe(bc.prefix + "#" + bc.nsp + "#*"); err != nil {
return nil, err
}

if err = bc.sub.Subscribe(bc.reqChannel, bc.resChannel); err != nil {
return nil, err
}

go func() {
for {
Expand All @@ -144,7 +149,10 @@ func newRedisBroadcast(nsp string, adapter *RedisAdapterOptions) (*redisBroadcas
break
}

bc.onMessage(m.Channel, m.Data)
err = bc.onMessage(m.Channel, m.Data)
if err != nil {
return
}
case redis.Subscription:
if m.Count == 0 {
return
Expand Down Expand Up @@ -172,9 +180,13 @@ func (bc *redisBroadcast) AllRooms() []string {
req.done = make(chan bool, 1)

bc.requests[req.RequestID] = &req
bc.pub.Conn.Do("PUBLISH", bc.reqChannel, reqJSON)
_, err := bc.pub.Conn.Do("PUBLISH", bc.reqChannel, reqJSON)
if err != nil {
return []string{} // if error occurred,return empty
}

<-req.done

rooms := make([]string, 0, len(req.rooms))
for room := range req.rooms {
rooms = append(rooms, room)
Expand Down Expand Up @@ -338,6 +350,9 @@ func (bc *redisBroadcast) onMessage(channel string, msg []byte) error {
}

event, ok := opts[1].(string)
if !ok {
return errors.New("invalid event")
}

if room != "" {
bc.send(room, event, args...)
Expand All @@ -355,19 +370,20 @@ func (bc *redisBroadcast) getNumSub(channel string) (int, error) {
return 0, err
}

var numSub64 int64
numSub64 = rs.([]interface{})[1].(int64)
return int(numSub64), nil
numSub64, ok := rs.([]interface{})[1].(int)
if !ok {
return 0, errors.New("redis reply cast to int error")
}
return numSub64, nil
}

// Handle request from redis channel.
func (bc *redisBroadcast) onRequest(msg []byte) {
var req map[string]string
err := json.Unmarshal(msg, &req)
if err != nil {

if err := json.Unmarshal(msg, &req); err != nil {
return
}
// log.Println("on request:", req)

var res interface{}
switch req["RequestType"] {
Expand Down Expand Up @@ -417,7 +433,6 @@ func (bc *redisBroadcast) onResponse(msg []byte) {
return
}

// log.Println("on resp:", res)
switch res["RequestType"] {
case roomLenReqType:
roomLenReq := req.(*roomLenRequest)
Expand Down

0 comments on commit 0e602dc

Please sign in to comment.