Skip to content

Commit

Permalink
Merge pull request #208 from jamebal/develop
Browse files Browse the repository at this point in the history
perf: 优化文件修改事件过于频繁的问题
  • Loading branch information
jamebal authored Dec 23, 2024
2 parents 6ecc009 + afabd6a commit 9ebc528
Showing 1 changed file with 58 additions and 12 deletions.
70 changes: 58 additions & 12 deletions src/main/java/com/jmal/clouddisk/listener/FileListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,94 @@
import com.jmal.clouddisk.service.IFileService;
import io.methvin.watcher.DirectoryChangeEvent;
import io.methvin.watcher.DirectoryChangeListener;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
@RequiredArgsConstructor
public class FileListener implements DirectoryChangeListener {

private final FileProperties fileProperties;

private final IFileService fileService;

/**
* 续要过滤掉的目录列表
* 需要过滤掉的目录列表
*/
private final Set<Path> FILTER_DIR_SET = new CopyOnWriteArraySet<>();
@Getter
private final Set<Path> filterDirSet = new CopyOnWriteArraySet<>();

private final FlowableProcessor<DirectoryChangeEvent> eventProcessor = PublishProcessor.<DirectoryChangeEvent>create().toSerialized();
private Disposable eventSubscription;


@PostConstruct
public void init() {
eventSubscription = eventProcessor
.buffer(100, TimeUnit.MILLISECONDS, 100)
.filter(list -> !list.isEmpty())
.map(this::mergeEvents)
.flatMap(event -> Flowable.just(event)
.subscribeOn(Schedulers.io()) // 在 IO 线程池中执行处理
)
.subscribe(this::processEvent, throwable -> log.error("Error processing file event", throwable));
}

public void addFilterDir(Path path) {
FILTER_DIR_SET.add(path);
private DirectoryChangeEvent mergeEvents(List<DirectoryChangeEvent> events) {
if (events.size() == 1) {
return events.get(0);
}
// 对于同一文件的多个事件,保留最新的事件类型
DirectoryChangeEvent latestEvent = events.get(events.size() - 1);
log.debug("Merged {} events for path: {}", events.size(), latestEvent.path());
return latestEvent;
}

public Set<Path> getFilterDirSet() {
return FILTER_DIR_SET;
public void addFilterDir(Path path) {
filterDirSet.add(path);
}

public void removeFilterDir(Path path) {
FILTER_DIR_SET.remove(path);
filterDirSet.remove(path);
}

public boolean containsFilterDir(Path path) {
return FILTER_DIR_SET.contains(path);
return filterDirSet.contains(path);
}

@Override
public void onEvent(DirectoryChangeEvent directoryChangeEvent) {
Path eventPath = directoryChangeEvent.path();
DirectoryChangeEvent.EventType eventType = directoryChangeEvent.eventType();
// 检查是否为忽略路径
if (FILTER_DIR_SET.stream().anyMatch(eventPath::startsWith)) {
if (filterDirSet.stream().anyMatch(eventPath::startsWith)) {
log.debug("Ignore Event: {}, Path: {}", eventType, eventPath);
return;
}
log.debug("DirectoryChangeEvent: {}, Path: {}", eventType, eventPath);
log.debug("Received Event: {}, Path: {}", eventType, eventPath);
eventProcessor.onNext(directoryChangeEvent);
}

private void processEvent(DirectoryChangeEvent directoryChangeEvent) {
Path eventPath = directoryChangeEvent.path();
DirectoryChangeEvent.EventType eventType = directoryChangeEvent.eventType();
switch (eventType) {
case CREATE:
onFileCreate(eventPath.toFile());
Expand All @@ -65,6 +103,8 @@ public void onEvent(DirectoryChangeEvent directoryChangeEvent) {
case DELETE:
onFileDelete(eventPath.toFile());
break;
default:
break;
}
}

Expand All @@ -74,7 +114,6 @@ public void onEvent(DirectoryChangeEvent directoryChangeEvent) {
*/
public void onFileCreate(File file) {
try {
// 判断文件名是否在monitorIgnoreFilePrefix中
if (fileProperties.getMonitorIgnoreFilePrefix().stream().anyMatch(file.getName()::startsWith)) {
log.info("忽略文件:{}", file.getAbsolutePath());
return;
Expand Down Expand Up @@ -156,4 +195,11 @@ public void onIdle(int i) {
public void onException(Exception e) {
DirectoryChangeListener.super.onException(e);
}

@PreDestroy
public void shutdown() {
if (eventSubscription != null && !eventSubscription.isDisposed()) {
eventSubscription.dispose();
}
}
}

0 comments on commit 9ebc528

Please sign in to comment.