Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Warmuth <[email protected]>
  • Loading branch information
warber committed Jan 7, 2025
1 parent 38f8e89 commit fa83483
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -98,78 +95,4 @@ private static void waitForDesiredState(
"Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout));
}
}

/**
* Polls the state of a gRPC channel at regular intervals and triggers callbacks upon state changes.
*
* @param executor the ScheduledExecutorService used for polling.
* @param channel the ManagedChannel to monitor.
* @param onConnectionReady callback invoked when the channel transitions to a READY state.
* @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state.
* @param pollIntervalMs the polling interval in milliseconds.
*/
public static void pollChannelState(
ScheduledExecutorService executor,
ManagedChannel channel,
Runnable onConnectionReady,
Runnable onConnectionLost,
long pollIntervalMs) {

AtomicReference<ConnectivityState> lastState = new AtomicReference<>(ConnectivityState.READY);

Runnable pollTask = () -> {
ConnectivityState currentState = channel.getState(true);
if (currentState != lastState.get()) {
if (currentState == ConnectivityState.READY) {
log.debug("gRPC connection became READY");
onConnectionReady.run();
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
|| currentState == ConnectivityState.SHUTDOWN) {
log.debug("gRPC connection became TRANSIENT_FAILURE");
onConnectionLost.run();
}
lastState.set(currentState);
}
};
executor.scheduleAtFixedRate(pollTask, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
}

/**
* Polls the channel state at fixed intervals and waits for the channel to reach a desired state within a timeout
* period.
*
* @param executor the ScheduledExecutorService used for polling.
* @param channel the ManagedChannel to monitor.
* @param desiredState the ConnectivityState to wait for.
* @param connectCallback callback invoked when the desired state is reached.
* @param timeout the maximum amount of time to wait.
* @param unit the time unit of the timeout.
* @return {@code true} if the desired state was reached within the timeout period, {@code false} otherwise.
* @throws InterruptedException if the current thread is interrupted while waiting.
*/
public static boolean pollForDesiredState(
ScheduledExecutorService executor,
ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
long timeout,
TimeUnit unit)
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);

Runnable waitForStateTask = () -> {
ConnectivityState currentState = channel.getState(true);
if (currentState == desiredState) {
connectCallback.run();
latch.countDown();
}
};

ScheduledFuture<?> scheduledFuture =
executor.scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS);

boolean success = latch.await(timeout, unit);
scheduledFuture.cancel(true);
return success;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) {
* Handles provider readiness events by clearing the cache (if enabled) and notifying listeners of readiness.
*/
private void handleProviderReadyEvent() {
this.onConnectionEvent.accept(true, Collections.emptyList()); // TODO: check if this is needed
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
this.cache.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,6 @@ public void change() {
verify(cache, atLeast(1)).clear();
}

@Test
public void ready() {
EventStreamResponse resp = mock(EventStreamResponse.class);
when(resp.getType()).thenReturn("provider_ready");
stream.onNext(resp);
// we notify that we are ready
assertEquals(1, states.size());
assertTrue(states.get(0));
// cache was cleaned
verify(cache, atLeast(1)).clear();
}

@Test
public void cacheBustingForKnownKeys() {
final String key1 = "myKey1";
Expand Down

0 comments on commit fa83483

Please sign in to comment.