Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/v3.8-js-sdk-only'
Browse files Browse the repository at this point in the history
# Conflicts:
#	go.mod
#	go.sum
  • Loading branch information
withchao committed Nov 14, 2024
2 parents caafefd + 97d10f9 commit 789a215
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 43 deletions.
3 changes: 2 additions & 1 deletion config/openim-msggateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ longConnSvr:
# WebSocket connection handshake timeout in seconds
websocketTimeout: 10


# 1: For Android, iOS, Windows, Mac, and web platforms, only one instance can be online at a time
multiLoginPolicy: 1
2 changes: 1 addition & 1 deletion config/openim-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ prometheus:
# Enable or disable Prometheus monitoring
enable: true
# List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup
ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ]
ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12181, 12182, 12183, 12184, 12185 ]

maxConcurrentWorkers: 3
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.
Expand Down
2 changes: 1 addition & 1 deletion config/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ scrape_configs:
- job_name: openimserver-openim-push
static_configs:
- targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ]
# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ]
# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12181, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185 ]
labels:
namespace: default
- job_name: openimserver-openim-rpc-auth
Expand Down
50 changes: 44 additions & 6 deletions internal/msggateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msggateway

import (
"context"
"encoding/json"
"fmt"
"runtime/debug"
"sync"
Expand Down Expand Up @@ -69,6 +70,8 @@ type Client struct {
IsCompress bool `json:"isCompress"`
UserID string `json:"userID"`
IsBackground bool `json:"isBackground"`
SDKType string `json:"sdkType"`
Encoder Encoder
ctx *UserConnContext
longConnServer LongConnServer
closed atomic.Bool
Expand All @@ -81,7 +84,7 @@ type Client struct {
}

// ResetClient updates the client's state with new connection and context information.
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer) {
func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer LongConnServer, sdkType string) {
c.w = new(sync.Mutex)
c.conn = conn
c.PlatformID = stringutil.StringToInt(ctx.GetPlatformID())
Expand All @@ -94,11 +97,20 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer
c.closed.Store(false)
c.closedErr = nil
c.token = ctx.GetToken()
c.SDKType = sdkType
c.hbCtx, c.hbCancel = context.WithCancel(c.ctx)
c.subLock = new(sync.Mutex)
if c.subUserIDs != nil {
clear(c.subUserIDs)
}
if c.SDKType == "" {
c.SDKType = GoSDK
}
if c.SDKType == GoSDK {
c.Encoder = NewGobEncoder()
} else {
c.Encoder = NewJsonEncoder()
}
c.subUserIDs = make(map[string]struct{})
}

Expand Down Expand Up @@ -159,9 +171,12 @@ func (c *Client) readMessage() {
return
}
case MessageText:
c.closedErr = ErrNotSupportMessageProtocol
return

_ = c.conn.SetReadDeadline(pongWait)
parseDataErr := c.handlerTextMessage(message)
if parseDataErr != nil {
c.closedErr = parseDataErr
return
}
case PingMessage:
err := c.writePongMsg("")
log.ZError(c.ctx, "writePongMsg", err)
Expand All @@ -188,7 +203,7 @@ func (c *Client) handleMessage(message []byte) error {
var binaryReq = getReq()
defer freeReq(binaryReq)

err := c.longConnServer.Decode(message, binaryReq)
err := c.Encoder.Decode(message, binaryReq)
if err != nil {
return err
}
Expand Down Expand Up @@ -335,7 +350,7 @@ func (c *Client) writeBinaryMsg(resp Resp) error {
return nil
}

encodedBuf, err := c.longConnServer.Encode(resp)
encodedBuf, err := c.Encoder.Encode(resp)
if err != nil {
return err
}
Expand Down Expand Up @@ -419,3 +434,26 @@ func (c *Client) writePongMsg(appData string) error {

return errs.Wrap(err)
}

func (c *Client) handlerTextMessage(b []byte) error {
var msg TextMessage
if err := json.Unmarshal(b, &msg); err != nil {
return err
}
switch msg.Type {
case TextPong:
return nil
case TextPing:
msg.Type = TextPong
msgData, err := json.Marshal(msg)
if err != nil {
return err
}
if err := c.conn.SetWriteDeadline(writeWait); err != nil {
return err
}
return c.conn.WriteMessage(MessageText, msgData)
default:
return fmt.Errorf("not support message type %s", msg.Type)
}
}
6 changes: 6 additions & 0 deletions internal/msggateway/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ const (
GzipCompressionProtocol = "gzip"
BackgroundStatus = "isBackground"
SendResponse = "isMsgResp"
SDKType = "sdkType"
)

const (
GoSDK = "go"
JsSDK = "js"
)

const (
Expand Down
6 changes: 5 additions & 1 deletion internal/msggateway/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (c *UserConnContext) ParseEssentialArgs() error {
_, err := strconv.Atoi(platformIDStr)
if err != nil {
return servererrs.ErrConnArgsErr.WrapMsg("platformID is not int")

}
switch sdkType, _ := c.Query(SDKType); sdkType {
case "", GoSDK, JsSDK:
default:
return servererrs.ErrConnArgsErr.WrapMsg("sdkType is not go or js")
}
return nil
}
33 changes: 28 additions & 5 deletions internal/msggateway/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package msggateway
import (
"bytes"
"encoding/gob"
"encoding/json"

"github.com/openimsdk/tools/errs"
)
Expand All @@ -28,24 +29,46 @@ type Encoder interface {

type GobEncoder struct{}

func NewGobEncoder() *GobEncoder {
return &GobEncoder{}
func NewGobEncoder() Encoder {
return GobEncoder{}
}

func (g *GobEncoder) Encode(data any) ([]byte, error) {
buff := bytes.Buffer{}
func (g GobEncoder) Encode(data any) ([]byte, error) {
var buff bytes.Buffer
enc := gob.NewEncoder(&buff)
if err := enc.Encode(data); err != nil {
return nil, errs.WrapMsg(err, "GobEncoder.Encode failed", "action", "encode")
}
return buff.Bytes(), nil
}

func (g *GobEncoder) Decode(encodeData []byte, decodeData any) error {
func (g GobEncoder) Decode(encodeData []byte, decodeData any) error {
buff := bytes.NewBuffer(encodeData)
dec := gob.NewDecoder(buff)
if err := dec.Decode(decodeData); err != nil {
return errs.WrapMsg(err, "GobEncoder.Decode failed", "action", "decode")
}
return nil
}

type JsonEncoder struct{}

func NewJsonEncoder() Encoder {
return JsonEncoder{}
}

func (g JsonEncoder) Encode(data any) ([]byte, error) {
b, err := json.Marshal(data)
if err != nil {
return nil, errs.New("JsonEncoder.Encode failed", "action", "encode")
}
return b, nil
}

func (g JsonEncoder) Decode(encodeData []byte, decodeData any) error {
err := json.Unmarshal(encodeData, decodeData)
if err != nil {
return errs.New("JsonEncoder.Decode failed", "action", "decode")
}
return nil
}
48 changes: 48 additions & 0 deletions internal/msggateway/encoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package msggateway

import (
"testing"
)

func TestGobEncoder_Encode(t *testing.T) {
encoder := NewGobEncoder()

// 测试用例1: 编码 []byte 数据
inputData := []byte("test data")
encodedData, err := encoder.Encode(inputData)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if string(encodedData) != string(inputData) {
t.Fatalf("expected encoded data to be '%s', got '%s'", inputData, encodedData)
}

// 测试用例2: 编码非 []byte 数据
nonByteData := "string data"
_, err = encoder.Encode(nonByteData)
if err == nil {
t.Fatalf("expected an error when encoding non-byte data, got none")
}
}

func TestGobEncoder_Decode(t *testing.T) {
encoder := NewGobEncoder()

// 测试用例1: 解码到 []byte 数据
encodedData := []byte("test data")
var decodedData []byte
err := encoder.Decode(encodedData, &decodedData)
if err != nil {
t.Fatalf("expected no error, got %v", err)
}
if string(decodedData) != string(encodedData) {
t.Fatalf("expected decoded data to be '%s', got '%s'", encodedData, decodedData)
}

// 测试用例2: 解码到非 []byte 数据
var nonByteData string
err = encoder.Decode(encodedData, &nonByteData)
if err == nil {
t.Fatalf("expected an error when decoding to non-byte data, got none")
}
}
16 changes: 4 additions & 12 deletions internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,11 @@ func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready f
return s
}

func (s *Server) OnlinePushMsg(
context context.Context,
req *msggateway.OnlinePushMsgReq,
) (*msggateway.OnlinePushMsgResp, error) {
func (s *Server) OnlinePushMsg(context context.Context, req *msggateway.OnlinePushMsgReq) (*msggateway.OnlinePushMsgResp, error) {
panic("implement me")
}

func (s *Server) GetUsersOnlineStatus(
ctx context.Context,
req *msggateway.GetUsersOnlineStatusReq,
) (*msggateway.GetUsersOnlineStatusResp, error) {
func (s *Server) GetUsersOnlineStatus(ctx context.Context, req *msggateway.GetUsersOnlineStatusReq) (*msggateway.GetUsersOnlineStatusResp, error) {
if !authverify.IsAppManagerUid(ctx, s.config.Share.IMAdminUserID) {
return nil, errs.ErrNoPermission.WrapMsg("only app manager")
}
Expand Down Expand Up @@ -155,6 +149,7 @@ func (s *Server) pushToUser(ctx context.Context, userID string, msgData *sdkws.M
(client.IsBackground && client.PlatformID != constant.IOSPlatformID) {
err := client.PushMessage(ctx, msgData)
if err != nil {
log.ZWarn(ctx, "online push msg failed", err, "userID", userID, "platformID", client.PlatformID)
userPlatform.ResultCode = int64(servererrs.ErrPushMsgErr.Code())
} else {
if _, ok := s.pushTerminal[client.PlatformID]; ok {
Expand Down Expand Up @@ -220,10 +215,7 @@ func (s *Server) SuperGroupOnlineBatchPushOneMsg(ctx context.Context, req *msgga
}
}

func (s *Server) KickUserOffline(
ctx context.Context,
req *msggateway.KickUserOfflineReq,
) (*msggateway.KickUserOfflineResp, error) {
func (s *Server) KickUserOffline(ctx context.Context, req *msggateway.KickUserOfflineReq) (*msggateway.KickUserOfflineResp, error) {
for _, v := range req.KickUserIDList {
clients, _, ok := s.LongConnServer.GetUserPlatformCons(v, int(req.PlatformID))
if !ok {
Expand Down
11 changes: 11 additions & 0 deletions internal/msggateway/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msggateway

import (
"context"
"encoding/json"
"sync"

"github.com/go-playground/validator/v10"
Expand All @@ -31,6 +32,16 @@ import (
"github.com/openimsdk/tools/utils/jsonutil"
)

const (
TextPing = "ping"
TextPong = "pong"
)

type TextMessage struct {
Type string `json:"type"`
Body json.RawMessage `json:"body"`
}

type Req struct {
ReqIdentifier int32 `json:"reqIdentifier" validate:"required"`
Token string `json:"token"`
Expand Down
16 changes: 4 additions & 12 deletions internal/msggateway/ws_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type LongConnServer interface {
SetKickHandlerInfo(i *kickHandler)
SubUserOnlineStatus(ctx context.Context, client *Client, data *Req) ([]byte, error)
Compressor
Encoder
MessageHandler
}

Expand All @@ -61,7 +60,7 @@ type WsServer struct {
authClient *rpcclient.Auth
disCov discovery.SvcDiscoveryRegistry
Compressor
Encoder
//Encoder
MessageHandler
webhookClient *webhook.Client
}
Expand Down Expand Up @@ -135,7 +134,6 @@ func NewWsServer(msgGatewayConfig *Config, opts ...Option) *WsServer {
clients: newUserMap(),
subscription: newSubscription(),
Compressor: NewGzipCompressor(),
Encoder: NewGobEncoder(),
webhookClient: webhook.NewWebhookClient(msgGatewayConfig.WebhooksConfig.URL),
}
}
Expand Down Expand Up @@ -278,14 +276,7 @@ func (ws *WsServer) registerClient(client *Client) {

wg.Wait()

log.ZDebug(
client.ctx,
"user online",
"online user Num",
ws.onlineUserNum.Load(),
"online user conn Num",
ws.onlineUserConnNum.Load(),
)
log.ZDebug(client.ctx, "user online", "online user Num", ws.onlineUserNum.Load(), "online user conn Num", ws.onlineUserConnNum.Load())
}

func getRemoteAdders(client []*Client) string {
Expand Down Expand Up @@ -464,7 +455,8 @@ func (ws *WsServer) wsHandler(w http.ResponseWriter, r *http.Request) {

// Retrieve a client object from the client pool, reset its state, and associate it with the current WebSocket long connection
client := ws.clientPool.Get().(*Client)
client.ResetClient(connContext, wsLongConn, ws)
sdkType, _ := connContext.Query(SDKType)
client.ResetClient(connContext, wsLongConn, ws, sdkType)

// Register the client with the server and start message processing
ws.registerChan <- client
Expand Down
Loading

0 comments on commit 789a215

Please sign in to comment.