Skip to content

Commit

Permalink
Merge pull request #73 from jamebal/share
Browse files Browse the repository at this point in the history
perf: 文件变动后的消息推送
  • Loading branch information
jamebal authored May 28, 2024
2 parents 7e7526a + a449485 commit 3cc7a29
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public ResponseResult<Object> checkUpload(UploadApiParamDTO upload) throws IOExc
@PostMapping("checkExist")
@LogOperatingFun(logType = LogOperation.Type.BROWSE)
@Permission("cloud:file:upload")
public ResponseResult<Object> checkFileExist(UploadApiParamDTO upload) throws IOException {
public ResponseResult<Object> checkFileExist(@RequestBody UploadApiParamDTO upload) throws IOException {
return fileService.checkFileExist(upload);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public int handle(Track body) {
fileDocument.setSize(size);
fileDocument.setUpdateDate(updateDate);
fileDocument.setMd5(md5);
commonFileService.pushMessage(userLoginHolder.getUsername(), fileDocument, "updateFile");
commonFileService.pushMessage(userLoginHolder.getUsername(), fileDocument, Constants.UPDATE_FILE);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void notifyCreateFile(String username, String objectName, String ossRootF
FileIntroVO fileIntroVO = new FileIntroVO();
fileIntroVO.setPath(getPathByObjectName(ossRootFolderName, objectName));
fileIntroVO.setName(Paths.get(objectName).getFileName().toString());
commonFileService.pushMessage(username, fileIntroVO, "createFile");
commonFileService.pushMessage(username, fileIntroVO, Constants.CREATE_FILE);
}

public void notifyUpdateFile(String ossPath, String objectName, long size) {
Expand All @@ -57,15 +57,15 @@ public void notifyUpdateFile(String ossPath, String objectName, long size) {
fileIntroVO.setId(id);
fileIntroVO.setSize(size);
fileIntroVO.setUpdateDate(LocalDateTime.now());
commonFileService.pushMessage(username, fileIntroVO, "updateFile");
commonFileService.pushMessage(username, fileIntroVO, Constants.UPDATE_FILE);
}

public void notifyDeleteFile(String ossPath, String objectName) {
FileIntroVO fileIntroVO = new FileIntroVO();
String username = getUsernameByOssPath(ossPath);
String id = getFileId(getOssRootFolderName(ossPath), objectName, username);
fileIntroVO.setId(id);
commonFileService.pushMessage(username, fileIntroVO, "deleteFile");
commonFileService.pushMessage(username, fileIntroVO, Constants.DELETE_FILE);
}

public static String getFileId(String rootName, String objectName, String username) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public ResponseResult<Object> copyOssToLocal(String ossPathFrom, String sourceOb
String operation = isMove ? "移动" : "复制";
// 复制成功
log.info(operation + "成功, from: {}, to: {}", objectNameFrom, destFileDocument.getName());
commonFileService.pushMessage(destFileDocument.getUsername(), destFileDocument, "createFile");
commonFileService.pushMessage(destFileDocument.getUsername(), destFileDocument, Constants.CREATE_FILE);
Path fromPath = Paths.get(getOssRootFolderName(ossPathFrom), objectNameFrom);
Path toPath = Paths.get(destFileDocument.getPath(), destFileDocument.getName(), Paths.get(objectNameFrom).getFileName().toString());
commonFileService.pushMessageOperationFileSuccess(fromPath.toString(), toPath.toString(), destFileDocument.getUsername(), operation);
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/com/jmal/clouddisk/service/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,12 @@ private Constants() { }

public static final String FILE_ID = "fileId";

public static final String DELETE_FILE = "deleteFile";

public static final String UPDATE_FILE = "updateFile";

public static final String CREATE_FILE = "createFile";

public static final String OPERATION_FILE = "operationFile";

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import cn.hutool.core.io.file.PathUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.BooleanUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.jmal.clouddisk.config.FileProperties;
import com.jmal.clouddisk.controller.sse.Message;
import com.jmal.clouddisk.controller.sse.SseController;
Expand Down Expand Up @@ -109,6 +111,8 @@ public class CommonFileService {
@Autowired
public LuceneService luceneService;

private final Cache<String, Map<String, ThrottleExecutor>> throttleExecutorCache = Caffeine.newBuilder().build();

/**
* 上传文件夹的写入锁缓存
*/
Expand Down Expand Up @@ -347,7 +351,7 @@ public String createFile(String username, File file, String userId, Boolean isPu
// 检查该文件的上级目录是否有已经分享的目录
checkShareBase(update, relativePath);
updateResult = mongoTemplate.upsert(query, update, COLLECTION_NAME);
pushMessage(username, update.getUpdateObject(), "createFile");
pushMessage(username, update.getUpdateObject(), Constants.CREATE_FILE);
// 添加文件索引
luceneService.pushCreateIndexQueue(fileId);
} finally {
Expand Down Expand Up @@ -544,6 +548,22 @@ private void generateThumbnail(File file, Update update) {
* @param url url
*/
public void pushMessage(String username, Object message, String url) {
if (Constants.CREATE_FILE.equals(url) || Constants.DELETE_FILE.equals(url)) {
Map<String, ThrottleExecutor> throttleExecutorMap = throttleExecutorCache.get(username, key -> new HashMap<>(8));
if (throttleExecutorMap != null) {
ThrottleExecutor throttleExecutor = throttleExecutorMap.get(url);
if (throttleExecutor == null) {
throttleExecutor = new ThrottleExecutor(300);
throttleExecutorMap.put(url, throttleExecutor);
}
throttleExecutor.schedule(() -> pushMsg(username, message, url));
}
} else {
pushMsg(username, message, url);
}
}

private void pushMsg(String username, Object message, String url) {
Message msg = new Message();
String userId = userLoginHolder.getUserId();
if (CharSequenceUtil.isBlank(userId)) {
Expand All @@ -559,6 +579,7 @@ public void pushMessage(String username, Object message, String url) {
msg.setUrl(url);
msg.setUsername(username);
msg.setBody(message);
log.info("pushMessage:{}", JSON.toJSONString(msg));
sseController.sendEvent(msg);
}

Expand All @@ -567,7 +588,7 @@ public void pushMessageOperationFileError(String username, String message, Strin
msg.put("code", -1);
msg.put("msg", message);
msg.put("operation", operation);
pushMessage(username, msg, "operationFile");
pushMessage(username, msg, Constants.OPERATION_FILE);
}

public void pushMessageOperationFileSuccess(String fromPath, String toPath, String username, String operation) {
Expand All @@ -576,7 +597,7 @@ public void pushMessageOperationFileSuccess(String fromPath, String toPath, Stri
msg.put("from", fromPath);
msg.put("to", toPath);
msg.put("operation", operation);
pushMessage(username, msg, "operationFile");
pushMessage(username, msg, Constants.OPERATION_FILE);
}

public long occupiedSpace(String userId) {
Expand Down Expand Up @@ -800,7 +821,7 @@ public void modifyFile(String username, File file) {
String markDownContent = FileUtil.readString(file, MyFileUtils.getFileCharset(file));
update.set("contentText", markDownContent);
}
pushMessage(username, fileDocument, "updateFile");
pushMessage(username, fileDocument, Constants.UPDATE_FILE);
if (updateResult.getModifiedCount() > 0) {
luceneService.pushCreateIndexQueue(fileDocument.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ private void renameFile(String newFileName, String username, String id, String o
return;
}
fileDocument.setName(newFileName);
pushMessage(operator, fileDocument, "createFile");
pushMessage(operator, fileDocument, Constants.CREATE_FILE);
} else {
pushMessageOperationFileError(operator, "重命名失败", "重命名");
return;
Expand Down Expand Up @@ -1015,7 +1015,7 @@ public void deleteFile(String username, File file) {
luceneService.deleteIndexDocuments(Collections.singletonList(fileDocument.getId()));
}
}
pushMessage(username, fileDocument, "deleteFile");
pushMessage(username, fileDocument, Constants.DELETE_FILE);
}

@Override
Expand Down Expand Up @@ -1830,7 +1830,7 @@ public ResponseResult<Object> delete(String username, String currentDirectory, L
deleteDependencies(username, delFileIds);
isDel = true;
}
pushMessage(username, fileDocument, "deleteFile");
pushMessage(username, fileDocument, Constants.DELETE_FILE);
}
if (isDel) {
mongoTemplate.remove(query, COLLECTION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ private void startConvert(String username, String relativePath, String fileName,
}
mongoTemplate.upsert(query, update, FileDocument.class);
fileDocument.setM3u8(m3u8);
commonFileService.pushMessage(username, fileDocument, "updateFile");
commonFileService.pushMessage(username, fileDocument, Constants.UPDATE_FILE);
}

private static void printErrorInfo(ProcessBuilder processBuilder, Process process) throws IOException {
Expand Down
45 changes: 45 additions & 0 deletions src/main/java/com/jmal/clouddisk/util/ThrottleExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.jmal.clouddisk.util;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j
public class ThrottleExecutor {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ScheduledFuture<?> scheduledFuture;
private final long delay;

public ThrottleExecutor(long delay) {
this.delay = delay;
}

public void schedule(Runnable command) {
if (scheduledFuture != null && !scheduledFuture.isDone()) {
scheduledFuture.cancel(false);
}
scheduledFuture = scheduler.schedule(command, delay, TimeUnit.MILLISECONDS);
}

public void shutdown() {
scheduler.shutdown();
}

public static void main(String[] args) {
ThrottleExecutor throttleExecutor = new ThrottleExecutor(500); // 节流时间500毫秒

for (int i = 0; i < 10; i++) {
final int param = i;
throttleExecutor.schedule(() -> {
System.out.println("Executing task with param: " + param);
});
try {
Thread.sleep(100); // 模拟频繁调用
} catch (InterruptedException e) {
log.error("Error occurred: ", e);
}
}

throttleExecutor.shutdown();
}
}

0 comments on commit 3cc7a29

Please sign in to comment.