From ec5c70f951bc754fdf14c04a9cb427a621672c61 Mon Sep 17 00:00:00 2001
From: guqing <38999863+guqing@users.noreply.github.com>
Date: Tue, 26 Nov 2024 11:26:28 +0800
Subject: [PATCH] fix: resolve concurrency issue causing duplicate thumbnail
generation (#7077)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
#### What type of PR is this?
/kind improvement
/area core
/milestone 2.20.x
#### What this PR does / why we need it:
修复可能为因为并发调用缩略图生成导致多次创建缩略图的问题
此 PR 为 #7031 的补充,并且会清理以前重复生成的缩略图记录和文件
#### Does this PR introduce a user-facing change?
```release-note
修复可能为因为并发调用缩略图生成导致多次重复缩略图记录的问题
```
---
.../core/attachment/ThumbnailMigration.java | 144 ++++++++++++++++++
.../attachment/impl/ThumbnailServiceImpl.java | 27 +++-
.../run/halo/app/infra/SchemeInitializer.java | 5 +-
.../impl/ThumbnailServiceImplTest.java | 73 +++++++--
4 files changed, 235 insertions(+), 14 deletions(-)
create mode 100644 application/src/main/java/run/halo/app/core/attachment/ThumbnailMigration.java
diff --git a/application/src/main/java/run/halo/app/core/attachment/ThumbnailMigration.java b/application/src/main/java/run/halo/app/core/attachment/ThumbnailMigration.java
new file mode 100644
index 0000000000..0bd62d0c79
--- /dev/null
+++ b/application/src/main/java/run/halo/app/core/attachment/ThumbnailMigration.java
@@ -0,0 +1,144 @@
+package run.halo.app.core.attachment;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.function.Function;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.dao.OptimisticLockingFailureException;
+import org.springframework.lang.NonNull;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.retry.Retry;
+import run.halo.app.core.attachment.extension.LocalThumbnail;
+import run.halo.app.core.attachment.extension.Thumbnail;
+import run.halo.app.extension.Extension;
+import run.halo.app.extension.ListOptions;
+import run.halo.app.extension.ReactiveExtensionClient;
+import run.halo.app.infra.ReactiveExtensionPaginatedOperator;
+
+/**
+ *
TODO Remove this class in the next major version.
+ * when this class is removed, the following code should be added:
+ *
+ *
+ * schemeManager.register(LocalThumbnail.class, indexSpec -> {
+ * indexSpec.add(new IndexSpec()
+ * // mark the index as unique
+ * .setUnique(true)
+ * .setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
+ * .setIndexFunc(simpleAttribute(LocalThumbnail.class,
+ * LocalThumbnail::uniqueImageAndSize)
+ * )
+ * );
+ * // ...
+ * });
+ * schemeManager.register(Thumbnail.class, indexSpec -> {
+ * indexSpec.add(new IndexSpec()
+ * // mark the index as unique
+ * .setUnique(true)
+ * .setName(Thumbnail.ID_INDEX)
+ * .setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
+ * );
+ * // ...
+ * });
+ *
+ *
+ *
+ * @see run.halo.app.infra.SchemeInitializer
+ * @since 2.20.9
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class ThumbnailMigration {
+ private final LocalThumbnailService localThumbnailService;
+ private final ReactiveExtensionClient client;
+ private final ReactiveExtensionPaginatedOperator extensionPaginatedOperator;
+
+ @Async
+ @EventListener(ApplicationStartedEvent.class)
+ public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {
+ cleanupThumbnail(Thumbnail.class,
+ thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
+ thumbnail.getSpec().getSize().name()))
+ .count()
+ .doOnNext(count -> log.info("Deleted {} duplicate thumbnail records", count))
+ .block();
+
+ cleanupThumbnail(LocalThumbnail.class,
+ thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
+ thumbnail.getSpec().getSize().name()))
+ .flatMap(thumb -> {
+ var filePath = localThumbnailService.toFilePath(thumb.getSpec().getFilePath());
+ return deleteFile(filePath).thenReturn(thumb.getMetadata().getName());
+ })
+ .count()
+ .doOnNext(count -> log.info("Deleted {} duplicate local thumbnail records.", count))
+ .block();
+ log.info("Duplicate thumbnails have been cleaned up.");
+ }
+
+ private Mono deleteFile(Path path) {
+ return Mono.fromRunnable(
+ () -> {
+ try {
+ Files.deleteIfExists(path);
+ } catch (Exception e) {
+ // Ignore
+ }
+ })
+ .subscribeOn(Schedulers.boundedElastic())
+ .then();
+ }
+
+ private Flux cleanupThumbnail(Class thumbClass,
+ Function keyFunction) {
+ var unique = new HashSet();
+ var duplicateThumbs = new ArrayList();
+
+ var collectDuplicateMono = extensionPaginatedOperator.list(thumbClass, new ListOptions())
+ .doOnNext(thumbnail -> {
+ var key = keyFunction.apply(thumbnail);
+ if (unique.contains(key)) {
+ duplicateThumbs.add(thumbnail);
+ } else {
+ unique.add(key);
+ }
+ })
+ .then();
+
+ return Mono.when(collectDuplicateMono)
+ .thenMany(Flux.fromIterable(duplicateThumbs)
+ .flatMap(this::deleteThumbnail)
+ );
+ }
+
+ @SuppressWarnings("unchecked")
+ private Mono deleteThumbnail(T thumbnail) {
+ return client.delete(thumbnail)
+ .onErrorResume(OptimisticLockingFailureException.class,
+ e -> deleteThumbnail((Class) thumbnail.getClass(),
+ thumbnail.getMetadata().getName())
+ );
+ }
+
+ private Mono deleteThumbnail(Class clazz, String name) {
+ return Mono.defer(() -> client.fetch(clazz, name)
+ .flatMap(client::delete)
+ )
+ .retryWhen(Retry.backoff(8, Duration.ofMillis(100))
+ .filter(OptimisticLockingFailureException.class::isInstance));
+ }
+
+ record UniqueKey(String imageUri, String size) {
+ }
+}
diff --git a/application/src/main/java/run/halo/app/core/attachment/impl/ThumbnailServiceImpl.java b/application/src/main/java/run/halo/app/core/attachment/impl/ThumbnailServiceImpl.java
index a499ebc006..9c34ce64cc 100644
--- a/application/src/main/java/run/halo/app/core/attachment/impl/ThumbnailServiceImpl.java
+++ b/application/src/main/java/run/halo/app/core/attachment/impl/ThumbnailServiceImpl.java
@@ -6,7 +6,9 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
@@ -38,9 +40,24 @@ public class ThumbnailServiceImpl implements ThumbnailService {
private final ExternalLinkProcessor externalLinkProcessor;
private final ThumbnailProvider thumbnailProvider;
private final LocalThumbnailService localThumbnailService;
+ private final Map> ongoingTasks = new ConcurrentHashMap<>();
@Override
public Mono generate(URI imageUri, ThumbnailSize size) {
+ var cacheKey = new CacheKey(imageUri, size);
+ // Combine caching to implement more elegant deduplication logic, ensure that only
+ // one thread executes the logic of create at the same time, and there is no global lock
+ // restriction
+ return ongoingTasks.computeIfAbsent(cacheKey, k -> doGenerate(imageUri, size)
+ // In the case of concurrency, doGenerate must return the same instance
+ .cache()
+ .doFinally(signalType -> ongoingTasks.remove(cacheKey)));
+ }
+
+ record CacheKey(URI imageUri, ThumbnailSize size) {
+ }
+
+ private Mono doGenerate(URI imageUri, ThumbnailSize size) {
var imageUrlOpt = toImageUrl(imageUri);
if (imageUrlOpt.isEmpty()) {
return Mono.empty();
@@ -91,7 +108,7 @@ Optional toImageUrl(URI imageUri) {
return Optional.empty();
}
- Mono create(URL imageUrl, ThumbnailSize size) {
+ protected Mono create(URL imageUrl, ThumbnailSize size) {
var context = ThumbnailContext.builder()
.imageUrl(imageUrl)
.size(size)
@@ -112,8 +129,12 @@ Mono create(URL imageUrl, ThumbnailSize size) {
.setImageUri(imageUri.toASCIIString())
.setImageSignature(signatureFor(imageUri))
);
- return client.create(thumb)
- .thenReturn(uri);
+ // double check
+ return fetchThumbnail(imageUri, size)
+ .map(thumbnail -> URI.create(thumbnail.getSpec().getThumbnailUri()))
+ .switchIfEmpty(Mono.defer(() -> client.create(thumb)
+ .thenReturn(uri))
+ );
});
}
diff --git a/application/src/main/java/run/halo/app/infra/SchemeInitializer.java b/application/src/main/java/run/halo/app/infra/SchemeInitializer.java
index c8dd622f05..fc22f4b47f 100644
--- a/application/src/main/java/run/halo/app/infra/SchemeInitializer.java
+++ b/application/src/main/java/run/halo/app/infra/SchemeInitializer.java
@@ -607,6 +607,8 @@ public void onApplicationEvent(@NonNull ApplicationContextInitializedEvent event
schemeManager.register(PolicyTemplate.class);
schemeManager.register(Thumbnail.class, indexSpec -> {
indexSpec.add(new IndexSpec()
+ // see run.halo.app.core.attachment.ThumbnailMigration
+ // .setUnique(true)
.setName(Thumbnail.ID_INDEX)
.setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
);
@@ -614,7 +616,8 @@ public void onApplicationEvent(@NonNull ApplicationContextInitializedEvent event
schemeManager.register(LocalThumbnail.class, indexSpec -> {
// make sure image and size are unique
indexSpec.add(new IndexSpec()
- .setUnique(true)
+ // see run.halo.app.core.attachment.ThumbnailMigration
+ // .setUnique(true)
.setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
.setIndexFunc(simpleAttribute(LocalThumbnail.class,
LocalThumbnail::uniqueImageAndSize)
diff --git a/application/src/test/java/run/halo/app/core/attachment/impl/ThumbnailServiceImplTest.java b/application/src/test/java/run/halo/app/core/attachment/impl/ThumbnailServiceImplTest.java
index 5461fc8742..6821b6be1e 100644
--- a/application/src/test/java/run/halo/app/core/attachment/impl/ThumbnailServiceImplTest.java
+++ b/application/src/test/java/run/halo/app/core/attachment/impl/ThumbnailServiceImplTest.java
@@ -7,6 +7,7 @@
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static run.halo.app.extension.index.query.QueryFactory.equal;
@@ -15,6 +16,11 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@@ -116,26 +122,28 @@ void createTest() throws MalformedURLException, URISyntaxException {
.thenReturn(insiteUri);
when(client.create(any())).thenReturn(Mono.empty());
+ when(client.listBy(eq(Thumbnail.class), any(), isA(PageRequest.class)))
+ .thenReturn(Mono.empty());
+
thumbnailService.create(url, ThumbnailSize.M)
.as(StepVerifier::create)
.expectNext(thumbUri)
.verifyComplete();
- when(client.listBy(eq(Thumbnail.class), any(), isA(PageRequest.class)))
- .thenReturn(Mono.empty());
thumbnailService.fetchThumbnail(url.toURI(), ThumbnailSize.M)
.as(StepVerifier::create)
.verifyComplete();
var hash = ThumbnailSigner.generateSignature(insiteUri.toString());
- verify(client).listBy(eq(Thumbnail.class), assertArg(options -> {
- var exceptOptions = ListOptions.builder()
- .fieldQuery(equal(Thumbnail.ID_INDEX,
- Thumbnail.idIndexFunc(hash, ThumbnailSize.M.name())
- ))
- .build();
- assertThat(options.toString()).isEqualTo(exceptOptions.toString());
- }), isA(PageRequest.class));
+ verify(client, times(2)).listBy(eq(Thumbnail.class),
+ assertArg(options -> {
+ var exceptOptions = ListOptions.builder()
+ .fieldQuery(equal(Thumbnail.ID_INDEX,
+ Thumbnail.idIndexFunc(hash, ThumbnailSize.M.name())
+ ))
+ .build();
+ assertThat(options.toString()).isEqualTo(exceptOptions.toString());
+ }), isA(PageRequest.class));
verify(localThumbnailProvider).generate(any());
@@ -169,4 +177,49 @@ void createTest2() throws MalformedURLException {
.as(StepVerifier::create)
.verifyComplete();
}
+
+ @Nested
+ class ThumbnailGenerateConcurrencyTest {
+
+ @Test
+ public void concurrentThumbnailGeneration() throws InterruptedException {
+ var spyThumbnailService = spy(thumbnailService);
+
+ URI imageUri = URI.create("http://localhost:8090/test.jpg");
+
+ doReturn(Mono.empty()).when(spyThumbnailService).fetchThumbnail(eq(imageUri), any());
+
+ var createdUri = URI.create("/test-thumb.jpg");
+ doReturn(Mono.just(createdUri)).when(spyThumbnailService).create(any(), any());
+
+ int threadCount = 10;
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+ var latch = new CountDownLatch(threadCount);
+
+ var results = new ConcurrentLinkedQueue>();
+
+ for (int i = 0; i < threadCount; i++) {
+ executor.submit(() -> {
+ try {
+ results.add(spyThumbnailService.generate(imageUri, ThumbnailSize.M));
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await();
+
+ results.forEach(result -> {
+ StepVerifier.create(result)
+ .expectNext(createdUri)
+ .verifyComplete();
+ });
+
+ verify(spyThumbnailService).fetchThumbnail(eq(imageUri), eq(ThumbnailSize.M));
+ verify(spyThumbnailService).create(any(), eq(ThumbnailSize.M));
+
+ executor.shutdown();
+ }
+ }
}
\ No newline at end of file