From e38778af994fb1c8f3030594acc7dc06f974eaa1 Mon Sep 17 00:00:00 2001 From: guqing Date: Fri, 19 Jan 2024 14:19:49 +0800 Subject: [PATCH] refactor: client writable check for extension --- .../ReactiveExtensionClientImpl.java | 46 +++++++++++-------- .../ReactiveExtensionClientTest.java | 1 - 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java index 116dd7e142..2bbe08ad0b 100644 --- a/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java +++ b/application/src/main/java/run/halo/app/extension/ReactiveExtensionClientImpl.java @@ -11,6 +11,8 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import lombok.NonNull; @@ -55,7 +57,8 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient { private final IndexedQueryEngine indexedQueryEngine; - private final AtomicBoolean ready = new AtomicBoolean(false); + private final ConcurrentMap indexBuildingState = + new ConcurrentHashMap<>(); @Override public Flux list(Class type, Predicate predicate, @@ -150,7 +153,7 @@ private Mono get(GroupVersionKind gvk, String name) { @Override @Transactional public Mono create(E extension) { - checkClientReady(); + checkClientWritable(extension); return Mono.just(extension) .doOnNext(ext -> { var metadata = extension.getMetadata(); @@ -184,7 +187,7 @@ && hasText(extension.getMetadata().getGenerateName())) @Override @Transactional public Mono update(E extension) { - checkClientReady(); + checkClientWritable(extension); // Refactor the atomic reference if we have a better solution. return getLatest(extension).flatMap(old -> { var oldJsonExt = new JsonExtension(objectMapper, old); @@ -222,7 +225,7 @@ private Mono getLatest(Extension extension) { @Override @Transactional public Mono delete(E extension) { - checkClientReady(); + checkClientWritable(extension); // set deletionTimestamp extension.getMetadata().setDeletionTimestamp(Instant.now()); var extensionStore = converter.convertTo(extension); @@ -259,19 +262,21 @@ Mono doUpdate(E oldExtension, String name, Long version }); } - private void checkClientReady() { - if (!ready.get()) { - throw new IllegalStateException("Service is not ready yet"); + /** + * If the extension is being updated, we should the index is not building index for the + * extension, otherwise the {@link IllegalStateException} will be thrown. + */ + private void checkClientWritable(E extension) { + var buildingState = indexBuildingState.get(extension.groupVersionKind().groupKind()); + if (buildingState != null && buildingState.get()) { + throw new IllegalStateException("Index is building for " + extension.groupVersionKind() + + ", please wait for a moment and try again."); } } - /** - * Internal method for changing the ready state of the client. - * - * @param ready ready state - */ - void setReady(boolean ready) { - this.ready.set(ready); + void setIndexBuildingStateFor(GroupKind groupKind, boolean building) { + indexBuildingState.computeIfAbsent(groupKind, k -> new AtomicBoolean(building)) + .set(building); } @Override @@ -334,13 +339,11 @@ public void startBuildingIndex() { final long startTimeMs = System.currentTimeMillis(); log.info("Start building index for all extensions, please wait..."); schemeManager.schemes() - .forEach(scheme -> indexerFactory.createIndexerFor(scheme.type(), - createExtensionIterator(scheme))); + .forEach(this::createIndexerFor); schemeWatcherManager.register(event -> { if (event instanceof SchemeWatcherManager.SchemeRegistered schemeRegistered) { - var scheme = schemeRegistered.getNewScheme(); - indexerFactory.createIndexerFor(scheme.type(), createExtensionIterator(scheme)); + createIndexerFor(schemeRegistered.getNewScheme()); return; } if (event instanceof SchemeWatcherManager.SchemeUnregistered schemeUnregistered) { @@ -350,9 +353,12 @@ public void startBuildingIndex() { }); log.info("Successfully built index in {}ms, Preparing to lunch application...", System.currentTimeMillis() - startTimeMs); + } - // mark client as ready - setReady(true); + private void createIndexerFor(Scheme scheme) { + setIndexBuildingStateFor(scheme.groupVersionKind().groupKind(), true); + indexerFactory.createIndexerFor(scheme.type(), createExtensionIterator(scheme)); + setIndexBuildingStateFor(scheme.groupVersionKind().groupKind(), false); } } } diff --git a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java index e9a20eb951..dc7cd9528b 100644 --- a/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java +++ b/application/src/test/java/run/halo/app/extension/ReactiveExtensionClientTest.java @@ -73,7 +73,6 @@ class ReactiveExtensionClientTest { @BeforeEach void setUp() { - client.setReady(true); lenient().when(schemeManager.get(eq(FakeExtension.class))) .thenReturn(fakeScheme); lenient().when(schemeManager.get(eq(fakeScheme.groupVersionKind()))).thenReturn(fakeScheme);