Skip to content

Commit

Permalink
feat: gob json encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Nov 14, 2024
1 parent df03c76 commit 97d10f9
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 34 deletions.
17 changes: 14 additions & 3 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type Client struct {
IsCompress bool `json:"isCompress"`
UserID string `json:"userID"`
IsBackground bool `json:"isBackground"`
SDKType string `json:"sdkType"`
Encoder Encoder
ctx *UserConnContext
longConnServer LongConnServer
closed atomic.Bool
Expand All @@ -82,7 +84,7 @@ type Client struct {
}

// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer) {
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer, sdkType string) {
c.w = new(sync.Mutex)
c.conn = conn
c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
Expand All @@ -95,11 +97,20 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closed.Store(false)
c.closedErr = nil
c.token = ctx.GetToken()
c.SDKType = sdkType
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
c.subLock = new(sync.Mutex)
if c.subUserIDs != nil {
clear(c.subUserIDs)
}
if c.SDKType == "" {
c.SDKType = GoSDK
}
if c.SDKType == GoSDK {
c.Encoder = NewGobEncoder()
} else {
c.Encoder = NewJsonEncoder()
}
c.subUserIDs = make(map[string]struct{})
}

Expand Down Expand Up @@ -192,7 +203,7 @@ func (c *Client) handleMessage(message []byte) error {
var binaryReq = getReq()
defer freeReq(binaryReq)

err := c.longConnServer.Decode(message, binaryReq)
err := c.Encoder.Decode(message, binaryReq)
if err != nil {
return err
}
Expand Down Expand Up @@ -339,7 +350,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
return nil
}

encodedBuf, err := c.longConnServer.Encode(resp)
encodedBuf, err := c.Encoder.Encode(resp)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions internal/msggateway/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ const (
GzipCompressionProtocol = "gzip"
BackgroundStatus = "isBackground"
SendResponse = "isMsgResp"
SDKType = "sdkType"
)

const (
GoSDK = "go"
JsSDK = "js"
)

const (
Expand Down
6 changes: 5 additions & 1 deletion internal/msggateway/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (c *UserConnContext) ParseEssentialArgs() error {
_, err := strconv.Atoi(platformIDStr)
if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")

}
switch sdkType, _ := c.Query(SDKType); sdkType {
case "", GoSDK, JsSDK:
default:
return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js")
}
return nil
}
38 changes: 32 additions & 6 deletions internal/msggateway/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package msggateway

import (
"bytes"
"encoding/gob"
"encoding/json"

"github.com/openimsdk/tools/errs"
Expand All @@ -27,22 +29,46 @@ type Encoder interface {

type GobEncoder struct{}

func NewGobEncoder() *GobEncoder {
return &GobEncoder{}
func NewGobEncoder() Encoder {
return GobEncoder{}
}

func (g *GobEncoder) Encode(data any) ([]byte, error) {
func (g GobEncoder) Encode(data any) ([]byte, error) {
var buff bytes.Buffer
enc := gob.NewEncoder(&buff)
if err := enc.Encode(data); err != nil {
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
}
return buff.Bytes(), nil
}

func (g GobEncoder) Decode(encodeData []byte, decodeData any) error {
buff := bytes.NewBuffer(encodeData)
dec := gob.NewDecoder(buff)
if err := dec.Decode(decodeData); err != nil {
return errs.WrapMsg(err, "GobEncoder.Decode failed", "action", "decode")
}
return nil
}

type JsonEncoder struct{}

func NewJsonEncoder() Encoder {
return JsonEncoder{}
}

func (g JsonEncoder) Encode(data any) ([]byte, error) {
b, err := json.Marshal(data)
if err != nil {
return nil, errs.New("Encoder.Encode failed", "action", "encode")
return nil, errs.New("JsonEncoder.Encode failed", "action", "encode")
}
return b, nil
}

func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error {
err := json.Unmarshal(encodeData, decodeData)
if err != nil {
return errs.New("Encoder.Decode failed", "action", "decode")
return errs.New("JsonEncoder.Decode failed", "action", "decode")
}
return nil
}
15 changes: 3 additions & 12 deletions internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
return s
}

func (s *Server) OnlinePushMsg(
context context.Context,
req *msggateway.OnlinePushMsgReq,
) (*msggateway.OnlinePushMsgResp, error) {
func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
panic("implement me")
}

func (s *Server) GetUsersOnlineStatus(
ctx context.Context,
req *msggateway.GetUsersOnlineStatusReq,
) (*msggateway.GetUsersOnlineStatusResp, error) {
func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) {
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
return nil, errs.ErrNoPermission.WrapMsg("only app manager")
}
Expand Down Expand Up @@ -221,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga
}
}

func (s *Server) KickUserOffline(
ctx context.Context,
req *msggateway.KickUserOfflineReq,
) (*msggateway.KickUserOfflineResp, error) {
func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) {
for _, v := range req.KickUserIDList {
clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
if !ok {
Expand Down
16 changes: 4 additions & 12 deletions internal/msggateway/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type LongConnServer interface {
SetKickHandlerInfo(i *kickHandler)
SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)
Compressor
Encoder
MessageHandler
}

Expand All @@ -61,7 +60,7 @@ type WsServer struct {
authClient *rpcclient.Auth
disCov discovery.SvcDiscoveryRegistry
Compressor
Encoder
//Encoder
MessageHandler
webhookClient *webhook.Client
}
Expand Down Expand Up @@ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
clients: newUserMap(),
subscription: newSubscription(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
}
}
Expand Down Expand Up @@ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) {

wg.Wait()

log.ZDebug(
client.ctx,
"user online",
"online user Num",
ws.onlineUserNum.Load(),
"online user conn Num",
ws.onlineUserConnNum.Load(),
)
log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load())
}

func getRemoteAdders(client []*Client) string {
Expand Down Expand Up @@ -484,7 +475,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {

// Retrieve a client object from the client pool, reset its state, and associate it with the current WebSocket long connection
client := ws.clientPool.Get().(*Client)
client.ResetClient(connContext, wsLongConn, ws)
sdkType, _ := connContext.Query(SDKType)
client.ResetClient(connContext, wsLongConn, ws, sdkType)

// Register the client with the server and start message processing
ws.registerChan <- client
Expand Down

0 comments on commit 97d10f9

Please sign in to comment.