Skip to content

Commit

Permalink
feat: refactor GrpcConnector to use grpc builtin reconnection
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Warmuth <[email protected]>
  • Loading branch information
warber committed Jan 3, 2025
1 parent bc06454 commit 6c2e058
Show file tree
Hide file tree
Showing 19 changed files with 959 additions and 1,145 deletions.
1 change: 0 additions & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -399,5 +399,4 @@
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public final class Config {
public static final String LRU_CACHE = CacheType.LRU.getValue();
static final String DEFAULT_CACHE = LRU_CACHE;

static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 7;
static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000;

static String fallBackToEnvOrDefault(String key, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ public class FlagdOptions {
private int maxCacheSize = fallBackToEnvOrDefault(Config.MAX_CACHE_SIZE_ENV_VAR_NAME,
Config.DEFAULT_MAX_CACHE_SIZE);

/**
* Max event stream connection retries.
*/
@Builder.Default
private int maxEventStreamRetries = fallBackToEnvOrDefault(Config.MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME,
Config.DEFAULT_MAX_EVENT_STREAM_RETRIES);

/**
* Backoff interval in milliseconds.
*/
Expand All @@ -102,11 +95,12 @@ public class FlagdOptions {
Config.DEFAULT_STREAM_DEADLINE_MS);

/**
* Amount of stream retry attempts before provider moves from STALE to ERROR
* Defaults to 5
* Grace time period in milliseconds before provider moves from STALE to ERROR.
* Defaults to 50_000
*/
@Builder.Default
private int streamRetryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
private int streamRetryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD,
Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
/**
* Selector to be used with flag sync gRPC contract.
**/
Expand All @@ -116,7 +110,6 @@ public class FlagdOptions {
/**
* gRPC client KeepAlive in milliseconds. Disabled with 0.
* Defaults to 0 (disabled).
*
**/
@Builder.Default
private long keepAlive = fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package dev.openfeature.contrib.providers.flagd;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
Expand All @@ -22,11 +17,16 @@
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
* OpenFeature provider for flagd.
*/
@Slf4j
@SuppressWarnings({ "PMD.TooManyStaticImports", "checkstyle:NoFinalizer" })
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider {
private Function<Structure, EvaluationContext> contextEnricher;
private static final String FLAGD_PROVIDER = "flagd";
Expand Down Expand Up @@ -62,7 +62,6 @@ public FlagdProvider(final FlagdOptions options) {
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(options,
new Cache(options.getCacheType(), options.getMaxCacheSize()),
this::isConnected,
this::onConnectionEvent);
break;
default:
Expand All @@ -85,7 +84,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
}

this.flagResolver.init();
this.initialized = true;
this.initialized = this.connected = true;
}

@Override
Expand Down Expand Up @@ -139,7 +138,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* Set on initial connection and updated with every reconnection.
* see:
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
*
*
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
Expand All @@ -148,6 +147,7 @@ protected Structure getSyncMetadata() {

/**
* The updated context mixed into all evaluations based on the sync-metadata.
*
* @return context
*/
EvaluationContext getEnrichedContext() {
Expand All @@ -159,33 +159,42 @@ private boolean isConnected() {
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
boolean previous = connected;
boolean current = connected = connectionEvent.isConnected();
final boolean wasConnected = connected;// WHY the F*** is this false? wasconnected is false ,hence no change
// event will be sent. why is was connected false? not updated via event?
final boolean isConnected = connected = connectionEvent.isConnected();

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());

// configuration changed
if (initialized && previous && current) {
log.debug("Configuration changed");
if (!initialized) {
return;
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed").build();
this.emitProviderConfigurationChanged(details);
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
}
// there was an error
if (initialized && previous && !current) {
log.debug("There has been an error");
ProviderEventDetails details = ProviderEventDetails.builder().message("there has been an error").build();
this.emitProviderError(details);

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
}
// we recovered from an error
if (initialized && !previous && current) {
log.debug("Recovered from error");
ProviderEventDetails details = ProviderEventDetails.builder().message("recovered from error").build();
this.emitProviderReady(details);
this.emitProviderConfigurationChanged(details);

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder().message("there has been an error").build());
} else {
this.emitProviderError(ProviderEventDetails.builder().message("there has been an error").build());
}
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import dev.openfeature.sdk.exceptions.GeneralError;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;


/**
* A utility class to monitor and manage the connectivity state of a gRPC ManagedChannel.
*/
@Slf4j
public class ChannelMonitor {


private ChannelMonitor() {

}

/**
* Monitors the state of a gRPC channel and triggers the specified callbacks based on state changes.
*
* @param expectedState the initial state to monitor.
* @param channel the ManagedChannel to monitor.
* @param onConnectionReady callback invoked when the channel transitions to a READY state.
* @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state.
*/
public static void monitorChannelState(ConnectivityState expectedState, ManagedChannel channel,
Runnable onConnectionReady, Runnable onConnectionLost) {
channel.notifyWhenStateChanged(expectedState, () -> {
ConnectivityState currentState = channel.getState(true);
log.info("Channel state changed to: {}", currentState);
if (currentState == ConnectivityState.READY) {
onConnectionReady.run();
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
|| currentState == ConnectivityState.SHUTDOWN) {
onConnectionLost.run();
}
// Re-register the state monitor to watch for the next state transition.
monitorChannelState(currentState, channel, onConnectionReady, onConnectionLost);
});
}


/**
* Waits for the channel to reach a desired state within a specified timeout period.
*
* @param channel the ManagedChannel to monitor.
* @param desiredState the ConnectivityState to wait for.
* @param connectCallback callback invoked when the desired state is reached.
* @param timeout the maximum amount of time to wait.
* @param unit the time unit of the timeout.
* @throws InterruptedException if the current thread is interrupted while waiting.
*/
public static void waitForDesiredState(ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
long timeout,
TimeUnit unit) throws InterruptedException {
waitForDesiredState(channel, desiredState, connectCallback, new CountDownLatch(1), timeout, unit);
}


private static void waitForDesiredState(ManagedChannel channel,
ConnectivityState desiredState,
Runnable connectCallback,
CountDownLatch latch,
long timeout,
TimeUnit unit) throws InterruptedException {
channel.notifyWhenStateChanged(ConnectivityState.SHUTDOWN, () -> {
try {
ConnectivityState state = channel.getState(true);
log.debug("Channel state changed to: {}", state);

if (state == desiredState) {
connectCallback.run();
latch.countDown();
return;
}
waitForDesiredState(channel, desiredState, connectCallback, latch, timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Thread interrupted while waiting for desired state", e);
} catch (Exception e) {
log.error("Error occurred while waiting for desired state", e);
}
});

// Await the latch or timeout for the state change
if (!latch.await(timeout, unit)) {
throw new GeneralError(String.format("Deadline exceeded. Condition did not complete within the %d "
+ "deadline", timeout));
}
}


/**
* Polls the state of a gRPC channel at regular intervals and triggers callbacks upon state changes.
*
* @param executor the ScheduledExecutorService used for polling.
* @param channel the ManagedChannel to monitor.
* @param onConnectionReady callback invoked when the channel transitions to a READY state.
* @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state.
* @param pollIntervalMs the polling interval in milliseconds.
*/
public static void pollChannelState(ScheduledExecutorService executor, ManagedChannel channel,
Runnable onConnectionReady,
Runnable onConnectionLost, long pollIntervalMs) {

AtomicReference<ConnectivityState> lastState = new AtomicReference<>(ConnectivityState.READY);

Runnable pollTask = () -> {
ConnectivityState currentState = channel.getState(true);
if (currentState != lastState.get()) {
if (currentState == ConnectivityState.READY) {
log.debug("gRPC connection became READY");
onConnectionReady.run();
} else if (currentState == ConnectivityState.TRANSIENT_FAILURE
|| currentState == ConnectivityState.SHUTDOWN) {
log.debug("gRPC connection became TRANSIENT_FAILURE");
onConnectionLost.run();
}
lastState.set(currentState);
}
};
executor.scheduleAtFixedRate(pollTask, 0, pollIntervalMs, TimeUnit.MILLISECONDS);
}


/**
* Polls the channel state at fixed intervals and waits for the channel to reach a desired state within a timeout
* period.
*
* @param executor the ScheduledExecutorService used for polling.
* @param channel the ManagedChannel to monitor.
* @param desiredState the ConnectivityState to wait for.
* @param connectCallback callback invoked when the desired state is reached.
* @param timeout the maximum amount of time to wait.
* @param unit the time unit of the timeout.
* @return {@code true} if the desired state was reached within the timeout period, {@code false} otherwise.
* @throws InterruptedException if the current thread is interrupted while waiting.
*/
public static boolean pollForDesiredState(ScheduledExecutorService executor, ManagedChannel channel,
ConnectivityState desiredState, Runnable connectCallback, long timeout,
TimeUnit unit) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);

Runnable waitForStateTask = () -> {
ConnectivityState currentState = channel.getState(true);
if (currentState == desiredState) {
connectCallback.run();
latch.countDown();
}
};

ScheduledFuture<?> scheduledFuture = executor.scheduleWithFixedDelay(waitForStateTask, 0, 100,
TimeUnit.MILLISECONDS);

boolean success = latch.await(timeout, unit);
scheduledFuture.cancel(true);
return success;
}
}
Loading

0 comments on commit 6c2e058

Please sign in to comment.