Skip to content

Commit

Permalink
增加离线消息推送机制
Browse files Browse the repository at this point in the history
  • Loading branch information
vaas1993 committed Apr 7, 2024
1 parent b145059 commit 5bea602
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 22 deletions.
11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
- 支持SQLite(默认)和 MYSQL
- 内建简易消息队列,可自行扩展至其它中间件
- 独立的配置文件

## 未来的计划
- 历史消息推送
- 离线消息推送

## 运行
构建项目后,你可以直接运行jar包来启动服务
Expand All @@ -20,9 +18,10 @@ java -jar xxx.jar
```
- 输出类似以下的信息则说明服务启动成功:
```
2023-02-23 10:40:22.0063 INFO - 消息队列处理器启动成功
2023-02-23 10:40:22.0140 INFO - WebSocket服务启动成功,端口号:20100
2023-02-23 10:40:22.0140 INFO - HTTP服务启动成功,端口号:20101
2024-04-07 13:23:11.0873 INFO - 离线消息过滤器启动成功
2024-04-07 13:23:11.0947 INFO - 消息队列处理器启动成功
2024-04-07 13:23:12.0018 INFO - HTTP服务启动成功,端口号:20101
2024-04-07 13:23:12.0018 INFO - WebSocket服务启动成功,端口号:20100
```

# 详细的文档见 [document](https://github.com/vaas1993/v-socket/tree/main/documents) 目录
29 changes: 29 additions & 0 deletions config-example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"socket": {
"port": 20100,
"timeout": 30
},
"http": {
"port": 20101
},
"message-queue": {
"interval": 1,
"offline-timeout": 604800
},
"db": {
"type": "MYSQL 或者 SQLite",
"file": "database.db",
"host": "127.0.0.1",
"port": 3306,
"username": "root",
"password": ""
},
"security": {
"timestamp": true,
"ssl": {
"enable": false,
"certificate": "certificate.pfx",
"password": ""
}
}
}
8 changes: 7 additions & 1 deletion documents/配置文件.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
>
> 设置过长可能会导致推送不及时
### message-queue.offline-timeout
> 离线消息保存的时长,单位秒,默认是 604800,即默认保存7天
>
> 客户端登录后会推送在这个时长内的离线消息,超过这个时长后离线消息会被自动丢弃
### db.type
> 数据库类型,允许的值有 `SQLite``MYSQL`,默认是 `SQLite`
Expand Down Expand Up @@ -75,7 +80,8 @@
"port": 20101
},
"message-queue": {
"interval": 1
"interval": 1,
"offline-timeout": 604800
},
"db": {
"type": "MYSQL 或者 SQLite",
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/Main.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import queue.ParserMessage;
import queue.ParserOfflineMessage;
import services.HTTPService;
import services.WebSocketService;

Expand All @@ -15,5 +16,9 @@ public static void main(String[] args) {
// 打开消息处理线程
ParserMessage parser = new ParserMessage();
parser.start();

// 打开离线消息过滤线程
ParserOfflineMessage offline = new ParserOfflineMessage();
offline.start();
}
}
7 changes: 7 additions & 0 deletions src/main/java/exceptions/UserOfflineException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package exceptions;

public class UserOfflineException extends BaseException {
public UserOfflineException(String message) {
super(message);
}
}
80 changes: 80 additions & 0 deletions src/main/java/queue/BrokerMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package queue;

import utils.Config;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BrokerMessage {
Expand All @@ -10,13 +13,24 @@ public class BrokerMessage {
*/
private final ConcurrentLinkedQueue<String[]> MESSAGE_LIST = new ConcurrentLinkedQueue<>();

/**
* 离线消息哈希表
* 当发送时客户端不在线,消息会转存到这个列表
* 哈希键是appid + 下划线 + 用户id
* 哈希值都是一个字符串数组,第一个是消息内容,第二个是字符串时间戳
* 这个列表的内容不会主动消费,用户登录时会检查一下有没有离线消息,有消息时会重新添加到 MESSAGE_LIST
* 可以通过配置文件来设定离线消息的保存时长,超过规定时间的消息将被自动丢弃
*/
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<String[]>> OFFLINE_MESSAGE_LIST = new ConcurrentHashMap<>();

private BrokerMessage() {
}

private final static BrokerMessage instance = new BrokerMessage();

/**
* 获取消息处理实例
*
* @return BrokerMessage
*/
public static BrokerMessage getInstance() {
Expand All @@ -25,14 +39,59 @@ public static BrokerMessage getInstance() {

/**
* 获取待处理消息的数量
*
* @return int
*/
public int getMessageListSize() {
return MESSAGE_LIST.size();
}

/**
* 检查用户是否有离线消息,有消息时转移到待处理列表
*
* @param appid String 应用ID
* @param id String 用户ID
*/
public void checkOfflineMessage(String appid, String id) {
String key = appid + "_" + id;
// 没有消息时直接忽略
if (OFFLINE_MESSAGE_LIST.isEmpty() || !OFFLINE_MESSAGE_LIST.containsKey(key)) {
return;
}
// 取出离线消息并在队列中删除
ConcurrentLinkedQueue<String[]> list = OFFLINE_MESSAGE_LIST.get(key);
OFFLINE_MESSAGE_LIST.remove(key);
for (String[] item : list) {
this.push(appid, id, item[0]);
}
}

/**
* 删除离线消息中超时的消息
*/
public void removeOfflineMessages() {
String[] fields = OFFLINE_MESSAGE_LIST.keySet().toArray(new String[0]);

long current = System.currentTimeMillis();
long timeout = Config.getInstance().getOfflineMessageTimeout() * 1000L;

for (String field : fields) {
ConcurrentLinkedQueue<String[]> messages = OFFLINE_MESSAGE_LIST.get(field);
for (int i = 0; i < messages.size(); i++) {
// 取出离线消息,没有超时则再放回去
String[] message = messages.poll();
if (Long.parseLong(message[1]) + timeout >= current) {
messages.add(message);
} else {
System.out.println("删除离线消息");
}
}
}
}

/**
* 添加一条新消息
*
* @param appid String 应用ID
* @param id String 用户ID
* @param message String 消息内容
Expand All @@ -46,8 +105,29 @@ public void push(String appid, String id, String message) {
MESSAGE_LIST.add(item);
}

/**
* 添加一条离线消息
*
* @param appid String 应用ID
* @param id String 用户ID
* @param message String 消息内容
* @param timestamp int 时间戳
*/
public void push(String appid, String id, String message, long timestamp) {
String[] item = {
message,
String.valueOf(timestamp)
};
String key = appid + "_" + id;
if (!OFFLINE_MESSAGE_LIST.containsKey(key)) {
OFFLINE_MESSAGE_LIST.put(key, new ConcurrentLinkedQueue<>());
}
OFFLINE_MESSAGE_LIST.get(key).add(item);
}

/**
* 拉取一条消息,当返回 null 时表示队列为空
*
* @return String[] | null
*/
public String[] pull() {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/queue/ConsumerMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import channel.ChannelManager;
import com.alibaba.fastjson.JSONObject;
import exceptions.UserOfflineException;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

Expand All @@ -25,12 +26,13 @@ public ConsumerMessage(String appid, String id, String message) {
/**
* 将消息发送到客户端
*/
public void send() {
public void send() throws UserOfflineException {
ChannelManager cm = ChannelManager.getInstance();
Channel channel = cm.get(appid, id);
if( channel == null ) {
// TODO 如果用户不在线则添加到离线消息,等下次上线再处理
return;
// 如果用户不在线则添加到离线消息,等下次上线再处理
// 由上游业务处理
throw new UserOfflineException("User Offline");
}
JSONObject response = new JSONObject();
response.put("message", message);
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/queue/ParserMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package queue;

import exceptions.UserOfflineException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import utils.Config;
Expand All @@ -11,7 +12,7 @@
public class ParserMessage extends Thread {
private final static Logger LOGGER = LogManager.getLogger(ParserMessage.class);
synchronized public void run() {
int interval = Config.getInstance().getQueueInterval();
int interval = Config.getInstance().getMessageQueueInterval() * 1000;
BrokerMessage broker = BrokerMessage.getInstance();

LOGGER.info("消息队列处理器启动成功");
Expand All @@ -34,7 +35,12 @@ synchronized public void run() {
continue;
}
ConsumerMessage c = new ConsumerMessage(item[0], item[1], item[2]);
c.send();
try {
c.send();
} catch (UserOfflineException e) {
LOGGER.debug("用户离线,写入离线消息");
broker.push(c.appid, c.id, c.message, System.currentTimeMillis());
}
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/queue/ParserOfflineMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package queue;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* 离线消息队列处理器
* 如果自行实现使用了其它队列中间件,需要在 Main 类中停用它
* 离线消息在用户登录时被动消费,这个处理器是根据配置文件的离线消息存储时长来移除超时的离线消息而已
*/
public class ParserOfflineMessage extends Thread {
private final static Logger LOGGER = LogManager.getLogger(ParserOfflineMessage.class);
synchronized public void run() {
BrokerMessage broker = BrokerMessage.getInstance();
LOGGER.info("离线消息过滤器启动成功");
while (true) {
// 删除超时消息
broker.removeOfflineMessages();
try {
wait(1000);
} catch (InterruptedException e) {
break;
}
}
}
}
1 change: 1 addition & 0 deletions src/main/java/services/HTTPService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
action.beforeRun();
// 接口返回的数据直接根 responseData 合并
responseData.putAll(action.run());
action.afterRun();
} catch (ProcessException e) {
responseData.put("code", e.getStatus());
responseData.put("message", e.getMessage());
Expand Down
1 change: 1 addition & 0 deletions src/main/java/services/WebSocketService.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ public JSONObject onSocket(ChannelHandlerContext ctx, WebSocketFrame frame) thro
action.setChannel(ctx.channel());
action.beforeRun();
JSONObject res = action.run();
action.afterRun();
String[] fields = res.keySet().toArray(new String[0]);
for (String field : fields) {
response.put(field, res.get(field));
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/services/actions/BaseAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ public void setParams(JSONObject params) {
}

/**
* 请求前的校验,抛出 ProcessException 异常后将不再执行 run 方法
* 请求前的钩子,抛出 ProcessException 异常后将不再执行 run 方法
*/
public void beforeRun() throws ProcessException {
}

/**
* 请求后的钩子,此时接口逻辑(run方法)已执行完毕
*/
public void afterRun() {
}
}
6 changes: 6 additions & 0 deletions src/main/java/services/actions/sockets/LoginAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import exceptions.ProcessException;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import queue.BrokerMessage;
import utils.Config;
import utils.RSA;

Expand Down Expand Up @@ -54,4 +55,9 @@ public void beforeRun() throws ProcessException {
}
}
}

public void afterRun() {
// 登录后,检查用户的离线消息
BrokerMessage.getInstance().checkOfflineMessage(params.getString("appid"), decipher.getString("id"));
}
}
18 changes: 9 additions & 9 deletions src/main/java/utils/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ private void load() {
data = JSONObject.parseObject(json.toString());
}

/**
* 获取配置文件
* @return String
*/
public String getFilename() {
return filename;
}

/**
* 给 data 设置各种默认值
*/
Expand Down Expand Up @@ -225,10 +217,18 @@ private JSONObject getMessageQueue() {
* 获取消息处理队列的消费间隔(秒)
* @return int
*/
public int getQueueInterval() {
public int getMessageQueueInterval() {
return this.getMessageQueue().getInteger("interval");
}

/**
* 获取离线消息存储的时长(秒)
* @return int
*/
public int getOfflineMessageTimeout() {
return this.getMessageQueue().getInteger("offline-timeout");
}

/**
* 获取服务安全配置
* @return JSONObject
Expand Down

0 comments on commit 5bea602

Please sign in to comment.