Skip to content

Commit

Permalink
fix: resolve concurrency issue causing duplicate thumbnail generation (
Browse files Browse the repository at this point in the history
…#7077)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.20.x

#### What this PR does / why we need it:
修复可能为因为并发调用缩略图生成导致多次创建缩略图的问题

此 PR 为 #7031 的补充,并且会清理以前重复生成的缩略图记录和文件

#### Does this PR introduce a user-facing change?

```release-note
修复可能为因为并发调用缩略图生成导致多次重复缩略图记录的问题
```
  • Loading branch information
guqing authored Nov 26, 2024
1 parent 2ed3bb6 commit ec5c70f
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package run.halo.app.core.attachment;

import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import run.halo.app.core.attachment.extension.LocalThumbnail;
import run.halo.app.core.attachment.extension.Thumbnail;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.infra.ReactiveExtensionPaginatedOperator;

/**
* <p>TODO Remove this class in the next major version.</p>
* when this class is removed, the following code should be added:
* <pre>
* <code>
* schemeManager.register(LocalThumbnail.class, indexSpec -> {
* indexSpec.add(new IndexSpec()
* // mark the index as unique
* .setUnique(true)
* .setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
* .setIndexFunc(simpleAttribute(LocalThumbnail.class,
* LocalThumbnail::uniqueImageAndSize)
* )
* );
* // ...
* });
* schemeManager.register(Thumbnail.class, indexSpec -> {
* indexSpec.add(new IndexSpec()
* // mark the index as unique
* .setUnique(true)
* .setName(Thumbnail.ID_INDEX)
* .setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
* );
* // ...
* });
* </code>
* </pre>
*
* @see run.halo.app.infra.SchemeInitializer
* @since 2.20.9
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ThumbnailMigration {
private final LocalThumbnailService localThumbnailService;
private final ReactiveExtensionClient client;
private final ReactiveExtensionPaginatedOperator extensionPaginatedOperator;

@Async
@EventListener(ApplicationStartedEvent.class)
public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {
cleanupThumbnail(Thumbnail.class,
thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
thumbnail.getSpec().getSize().name()))
.count()
.doOnNext(count -> log.info("Deleted {} duplicate thumbnail records", count))
.block();

cleanupThumbnail(LocalThumbnail.class,
thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
thumbnail.getSpec().getSize().name()))
.flatMap(thumb -> {
var filePath = localThumbnailService.toFilePath(thumb.getSpec().getFilePath());
return deleteFile(filePath).thenReturn(thumb.getMetadata().getName());
})
.count()
.doOnNext(count -> log.info("Deleted {} duplicate local thumbnail records.", count))
.block();
log.info("Duplicate thumbnails have been cleaned up.");
}

private Mono<Void> deleteFile(Path path) {
return Mono.fromRunnable(
() -> {
try {
Files.deleteIfExists(path);
} catch (Exception e) {
// Ignore
}
})
.subscribeOn(Schedulers.boundedElastic())
.then();
}

private <T extends Extension> Flux<T> cleanupThumbnail(Class<T> thumbClass,
Function<T, UniqueKey> keyFunction) {
var unique = new HashSet<UniqueKey>();
var duplicateThumbs = new ArrayList<T>();

var collectDuplicateMono = extensionPaginatedOperator.list(thumbClass, new ListOptions())
.doOnNext(thumbnail -> {
var key = keyFunction.apply(thumbnail);
if (unique.contains(key)) {
duplicateThumbs.add(thumbnail);
} else {
unique.add(key);
}
})
.then();

return Mono.when(collectDuplicateMono)
.thenMany(Flux.fromIterable(duplicateThumbs)
.flatMap(this::deleteThumbnail)
);
}

@SuppressWarnings("unchecked")
private <T extends Extension> Mono<T> deleteThumbnail(T thumbnail) {
return client.delete(thumbnail)
.onErrorResume(OptimisticLockingFailureException.class,
e -> deleteThumbnail((Class<T>) thumbnail.getClass(),
thumbnail.getMetadata().getName())
);
}

private <T extends Extension> Mono<T> deleteThumbnail(Class<T> clazz, String name) {
return Mono.defer(() -> client.fetch(clazz, name)
.flatMap(client::delete)
)
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}

record UniqueKey(String imageUri, String size) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
Expand Down Expand Up @@ -38,9 +40,24 @@ public class ThumbnailServiceImpl implements ThumbnailService {
private final ExternalLinkProcessor externalLinkProcessor;
private final ThumbnailProvider thumbnailProvider;
private final LocalThumbnailService localThumbnailService;
private final Map<CacheKey, Mono<URI>> ongoingTasks = new ConcurrentHashMap<>();

@Override
public Mono<URI> generate(URI imageUri, ThumbnailSize size) {
var cacheKey = new CacheKey(imageUri, size);
// Combine caching to implement more elegant deduplication logic, ensure that only
// one thread executes the logic of create at the same time, and there is no global lock
// restriction
return ongoingTasks.computeIfAbsent(cacheKey, k -> doGenerate(imageUri, size)
// In the case of concurrency, doGenerate must return the same instance
.cache()
.doFinally(signalType -> ongoingTasks.remove(cacheKey)));
}

record CacheKey(URI imageUri, ThumbnailSize size) {
}

private Mono<URI> doGenerate(URI imageUri, ThumbnailSize size) {
var imageUrlOpt = toImageUrl(imageUri);
if (imageUrlOpt.isEmpty()) {
return Mono.empty();
Expand Down Expand Up @@ -91,7 +108,7 @@ Optional<URL> toImageUrl(URI imageUri) {
return Optional.empty();
}

Mono<URI> create(URL imageUrl, ThumbnailSize size) {
protected Mono<URI> create(URL imageUrl, ThumbnailSize size) {
var context = ThumbnailContext.builder()
.imageUrl(imageUrl)
.size(size)
Expand All @@ -112,8 +129,12 @@ Mono<URI> create(URL imageUrl, ThumbnailSize size) {
.setImageUri(imageUri.toASCIIString())
.setImageSignature(signatureFor(imageUri))
);
return client.create(thumb)
.thenReturn(uri);
// double check
return fetchThumbnail(imageUri, size)
.map(thumbnail -> URI.create(thumbnail.getSpec().getThumbnailUri()))
.switchIfEmpty(Mono.defer(() -> client.create(thumb)
.thenReturn(uri))
);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,14 +607,17 @@ public void onApplicationEvent(@NonNull ApplicationContextInitializedEvent event
schemeManager.register(PolicyTemplate.class);
schemeManager.register(Thumbnail.class, indexSpec -> {
indexSpec.add(new IndexSpec()
// see run.halo.app.core.attachment.ThumbnailMigration
// .setUnique(true)
.setName(Thumbnail.ID_INDEX)
.setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
);
});
schemeManager.register(LocalThumbnail.class, indexSpec -> {
// make sure image and size are unique
indexSpec.add(new IndexSpec()
.setUnique(true)
// see run.halo.app.core.attachment.ThumbnailMigration
// .setUnique(true)
.setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
.setIndexFunc(simpleAttribute(LocalThumbnail.class,
LocalThumbnail::uniqueImageAndSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static run.halo.app.extension.index.query.QueryFactory.equal;
Expand All @@ -15,6 +16,11 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
Expand Down Expand Up @@ -116,26 +122,28 @@ void createTest() throws MalformedURLException, URISyntaxException {
.thenReturn(insiteUri);
when(client.create(any())).thenReturn(Mono.empty());

when(client.listBy(eq(Thumbnail.class), any(), isA(PageRequest.class)))
.thenReturn(Mono.empty());

thumbnailService.create(url, ThumbnailSize.M)
.as(StepVerifier::create)
.expectNext(thumbUri)
.verifyComplete();

when(client.listBy(eq(Thumbnail.class), any(), isA(PageRequest.class)))
.thenReturn(Mono.empty());
thumbnailService.fetchThumbnail(url.toURI(), ThumbnailSize.M)
.as(StepVerifier::create)
.verifyComplete();
var hash = ThumbnailSigner.generateSignature(insiteUri.toString());

verify(client).listBy(eq(Thumbnail.class), assertArg(options -> {
var exceptOptions = ListOptions.builder()
.fieldQuery(equal(Thumbnail.ID_INDEX,
Thumbnail.idIndexFunc(hash, ThumbnailSize.M.name())
))
.build();
assertThat(options.toString()).isEqualTo(exceptOptions.toString());
}), isA(PageRequest.class));
verify(client, times(2)).listBy(eq(Thumbnail.class),
assertArg(options -> {
var exceptOptions = ListOptions.builder()
.fieldQuery(equal(Thumbnail.ID_INDEX,
Thumbnail.idIndexFunc(hash, ThumbnailSize.M.name())
))
.build();
assertThat(options.toString()).isEqualTo(exceptOptions.toString());
}), isA(PageRequest.class));

verify(localThumbnailProvider).generate(any());

Expand Down Expand Up @@ -169,4 +177,49 @@ void createTest2() throws MalformedURLException {
.as(StepVerifier::create)
.verifyComplete();
}

@Nested
class ThumbnailGenerateConcurrencyTest {

@Test
public void concurrentThumbnailGeneration() throws InterruptedException {
var spyThumbnailService = spy(thumbnailService);

URI imageUri = URI.create("http://localhost:8090/test.jpg");

doReturn(Mono.empty()).when(spyThumbnailService).fetchThumbnail(eq(imageUri), any());

var createdUri = URI.create("/test-thumb.jpg");
doReturn(Mono.just(createdUri)).when(spyThumbnailService).create(any(), any());

int threadCount = 10;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
var latch = new CountDownLatch(threadCount);

var results = new ConcurrentLinkedQueue<Mono<URI>>();

for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
results.add(spyThumbnailService.generate(imageUri, ThumbnailSize.M));
} finally {
latch.countDown();
}
});
}

latch.await();

results.forEach(result -> {
StepVerifier.create(result)
.expectNext(createdUri)
.verifyComplete();
});

verify(spyThumbnailService).fetchThumbnail(eq(imageUri), eq(ThumbnailSize.M));
verify(spyThumbnailService).create(any(), eq(ThumbnailSize.M));

executor.shutdown();
}
}
}

0 comments on commit ec5c70f

Please sign in to comment.