Skip to content

Commit

Permalink
Refactor plugin reconciliation to ensure only one update on plugin
Browse files Browse the repository at this point in the history
Signed-off-by: John Niang <[email protected]>
  • Loading branch information
JohnNiang committed Jan 9, 2024
1 parent 0b30c0d commit d454eca
Show file tree
Hide file tree
Showing 22 changed files with 928 additions and 1,385 deletions.
17 changes: 16 additions & 1 deletion api/src/main/java/run/halo/app/core/extension/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ public static class License {
@Data
public static class PluginStatus {

private PluginState phase;
private Phase phase;

private ConditionList conditions;

private Instant lastStartTime;

private PluginState lastProbeState;

private String entry;

private String stylesheet;
Expand All @@ -134,6 +136,19 @@ public static ConditionList nullSafeConditions(@NonNull PluginStatus status) {
}
}

public enum Phase {
PENDING,
STARTING,
CREATED,
DISABLED,
RESOLVED,
STARTED,
STOPPED,
FAILED,
UNKNOWN,
;
}

@Data
@ToString
public static class PluginAuthor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,17 @@ public void run() {
log.debug("{} >>> Reconciled request: {} with result: {}, usage: {}",
this.name, entry.getEntry(), result, watch.getTotalTimeMillis());
} catch (Throwable t) {
result = new Reconciler.Result(true, null);
if (t instanceof OptimisticLockingFailureException) {
log.warn("Optimistic locking failure when reconciling request: {}/{}",
this.name, entry.getEntry());
} else if (t instanceof RequeueException re) {
result = re.getResult();
} else {
log.error("Reconciler in " + this.name
+ " aborted with an error, re-enqueuing...",
t);
}
result = new Reconciler.Result(true, null);
} finally {
queue.done(entry.getEntry());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@ record Result(boolean reEnqueue, Duration retryAfter) {
public static Result doNotRetry() {
return new Result(false, null);
}

public static Result requeue(Duration retryAfter) {
return new Result(true, retryAfter);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package run.halo.app.extension.controller;

import run.halo.app.extension.controller.Reconciler.Result;


/**
* Requeue with result data after throwing this exception.
*
* @author johnniang
*/
public class RequeueException extends RuntimeException {

private final Result result;

public RequeueException(Result result) {
this(result, null);
}

public RequeueException(Result result, String reason) {
this(result, reason, null);
}

public RequeueException(Result result, String reason, Throwable t) {
super(reason, t);
this.result = result;
}

public Result getResult() {
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -139,6 +140,47 @@ void shouldReRunIfReconcilerThrowException() throws InterruptedException {
verify(reconciler, times(1)).reconcile(any(Request.class));
}

@Test
void canReRunIfReconcilerThrowRequeueException() throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), Duration.ofSeconds(1), () -> now
))
.thenThrow(InterruptedException.class);
when(queue.add(any())).thenReturn(true);
var expectException = new RequeueException(Result.requeue(Duration.ofSeconds(2)));
when(reconciler.reconcile(any(Request.class))).thenThrow(expectException);

controller.new Worker().run();

verify(synchronizer).start();
verify(queue, times(2)).take();
verify(queue).done(any());
verify(queue).add(argThat(de ->
de.getEntry().name().equals("fake-request")
&& de.getRetryAfter().equals(Duration.ofSeconds(2))));
verify(reconciler).reconcile(any(Request.class));
}

@Test
void doNotReRunIfReconcilerThrowsRequeueExceptionWithoutRequeue()
throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
new Request("fake-request"), Duration.ofSeconds(1), () -> now
))
.thenThrow(InterruptedException.class);
var expectException = new RequeueException(Result.doNotRetry());
when(reconciler.reconcile(any(Request.class))).thenThrow(expectException);

controller.new Worker().run();

verify(synchronizer).start();
verify(queue, times(2)).take();
verify(queue).done(any());

verify(queue, never()).add(any());
verify(reconciler).reconcile(any(Request.class));
}

@Test
void shouldSetMinRetryAfterWhenTakeZeroDelayedEntry() throws InterruptedException {
when(queue.take()).thenReturn(new DelayedEntry<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.pf4j.PluginState;
import org.reactivestreams.Publisher;
import org.springdoc.webflux.core.fn.SpringdocRouteBuilder;
import org.springframework.beans.factory.DisposableBean;
Expand Down Expand Up @@ -297,10 +296,10 @@ Mono<ServerResponse> changePluginRunningState(ServerRequest request) {
// when enabled = false,excepted phase = !started
var phase = p.statusNonNull().getPhase();
if (enable) {
return PluginState.STARTED.equals(phase)
|| PluginState.FAILED.equals(phase);
return Plugin.Phase.STARTED.equals(phase)
|| Plugin.Phase.FAILED.equals(phase);
}
return !PluginState.STARTED.equals(phase);
return !Plugin.Phase.STARTED.equals(phase);
});
});
})
Expand Down
Loading

0 comments on commit d454eca

Please sign in to comment.