Skip to content

Commit

Permalink
feat: refactor GrpcConnector to use grpc builtin reconnection
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Warmuth <[email protected]>
  • Loading branch information
warber committed Dec 19, 2024
1 parent d8daa07 commit 737bd9d
Show file tree
Hide file tree
Showing 18 changed files with 700 additions and 1,158 deletions.
7 changes: 6 additions & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@
<version>1.20.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.69.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public final class Config {
public static final String LRU_CACHE = CacheType.LRU.getValue();
static final String DEFAULT_CACHE = LRU_CACHE;

static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 7;
static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000;

static String fallBackToEnvOrDefault(String key, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ public class FlagdOptions {
private int maxCacheSize = fallBackToEnvOrDefault(Config.MAX_CACHE_SIZE_ENV_VAR_NAME,
Config.DEFAULT_MAX_CACHE_SIZE);

/**
* Max event stream connection retries.
*/
@Builder.Default
private int maxEventStreamRetries = fallBackToEnvOrDefault(Config.MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME,
Config.DEFAULT_MAX_EVENT_STREAM_RETRIES);

/**
* Backoff interval in milliseconds.
*/
Expand All @@ -102,11 +95,12 @@ public class FlagdOptions {
Config.DEFAULT_STREAM_DEADLINE_MS);

/**
* Amount of stream retry attempts before provider moves from STALE to ERROR
* Defaults to 5
* Grace time period in milliseconds before provider moves from STALE to ERROR.
* Defaults to 50_000
*/
@Builder.Default
private int streamRetryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
private int streamRetryGracePeriod = fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD,
Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
/**
* Selector to be used with flag sync gRPC contract.
**/
Expand All @@ -116,7 +110,6 @@ public class FlagdOptions {
/**
* gRPC client KeepAlive in milliseconds. Disabled with 0.
* Defaults to 0 (disabled).
*
**/
@Builder.Default
private long keepAlive = fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package dev.openfeature.contrib.providers.flagd;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
Expand All @@ -22,16 +17,21 @@
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
* OpenFeature provider for flagd.
*/
@Slf4j
@SuppressWarnings({ "PMD.TooManyStaticImports", "checkstyle:NoFinalizer" })
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider {
private Function<Structure, EvaluationContext> contextEnricher;
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean isInitialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
Expand Down Expand Up @@ -62,7 +62,6 @@ public FlagdProvider(final FlagdOptions options) {
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(options,
new Cache(options.getCacheType(), options.getMaxCacheSize()),
this::isConnected,
this::onConnectionEvent);
break;
default:
Expand All @@ -80,17 +79,17 @@ public List<Hook> getProviderHooks() {

@Override
public synchronized void initialize(EvaluationContext evaluationContext) throws Exception {
if (this.initialized) {
if (this.isInitialized) {
return;
}

this.flagResolver.init();
this.initialized = true;
this.isInitialized = true;
}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
if (!this.isInitialized) {
return;
}

Expand All @@ -99,7 +98,7 @@ public synchronized void shutdown() {
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
this.initialized = false;
this.isInitialized = false;
}
}

Expand Down Expand Up @@ -139,7 +138,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* Set on initial connection and updated with every reconnection.
* see:
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
*
*
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
Expand All @@ -148,6 +147,7 @@ protected Structure getSyncMetadata() {

/**
* The updated context mixed into all evaluations based on the sync-metadata.
*
* @return context
*/
EvaluationContext getEnrichedContext() {
Expand All @@ -159,33 +159,41 @@ private boolean isConnected() {
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
boolean previous = connected;
boolean current = connected = connectionEvent.isConnected();
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());

// configuration changed
if (initialized && previous && current) {
log.debug("Configuration changed");
if (!isInitialized) {
return;
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed").build();
this.emitProviderConfigurationChanged(details);
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
}
// there was an error
if (initialized && previous && !current) {
log.debug("There has been an error");
ProviderEventDetails details = ProviderEventDetails.builder().message("there has been an error").build();
this.emitProviderError(details);

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
}
// we recovered from an error
if (initialized && !previous && current) {
log.debug("Recovered from error");
ProviderEventDetails details = ProviderEventDetails.builder().message("recovered from error").build();
this.emitProviderReady(details);
this.emitProviderConfigurationChanged(details);

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder().message("there has been an error").build());
} else {
this.emitProviderError(ProviderEventDetails.builder().message("there has been an error").build());
}
}
}



Original file line number Diff line number Diff line change
@@ -1,69 +1,122 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import java.util.Collections;
import java.util.List;

import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.Collections;
import java.util.List;

/**
* Event payload for a
* {@link dev.openfeature.contrib.providers.flagd.resolver.Resolver} connection
* state change event.
* Represents an event payload for a connection state change in a
* {@link dev.openfeature.contrib.providers.flagd.resolver.Resolver}.
* The event includes information about the connection status, any flags that have changed,
* and metadata associated with the synchronization process.
*/
@AllArgsConstructor
public class ConnectionEvent {
@Getter
private final boolean connected;

/**
* The current state of the connection.
*/
private final ConnectionState connected;

/**
* A list of flags that have changed due to this connection event.
*/
private final List<String> flagsChanged;

/**
* Metadata associated with synchronization in this connection event.
*/
private final Structure syncMetadata;

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* Constructs a new {@code ConnectionEvent} with the connection status only.
*
* @param connected {@code true} if the connection is established, otherwise {@code false}.
*/
public ConnectionEvent(boolean connected) {
this(connected ? ConnectionState.CONNECTED : ConnectionState.DISCONNECTED, Collections.emptyList(),
new ImmutableStructure());
}

/**
* Constructs a new {@code ConnectionEvent} with the specified connection state.
*
* @param connected the connection state indicating if the connection is established or not.
*/
public ConnectionEvent(ConnectionState connected) {
this(connected, Collections.emptyList(), new ImmutableStructure());
}

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* @param flagsChanged list of flags changed
* Constructs a new {@code ConnectionEvent} with the specified connection state and changed flags.
*
* @param connected the connection state indicating if the connection is established or not.
* @param flagsChanged a list of flags that have changed due to this connection event.
*/
public ConnectionEvent(boolean connected, List<String> flagsChanged) {
public ConnectionEvent(ConnectionState connected, List<String> flagsChanged) {
this(connected, flagsChanged, new ImmutableStructure());
}

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* @param syncMetadata sync.getMetadata
* Constructs a new {@code ConnectionEvent} with the specified connection state and synchronization metadata.
*
* @param connected the connection state indicating if the connection is established or not.
* @param syncMetadata metadata related to the synchronization process of this event.
*/
public ConnectionEvent(boolean connected, Structure syncMetadata) {
public ConnectionEvent(ConnectionState connected, Structure syncMetadata) {
this(connected, Collections.emptyList(), new ImmutableStructure(syncMetadata.asMap()));
}

/**
* Get changed flags.
*
* @return an unmodifiable view of the changed flags
* Constructs a new {@code ConnectionEvent} with the specified connection state, changed flags, and
* synchronization metadata.
*
* @param connectionState the state of the connection.
* @param flagsChanged a list of flags that have changed due to this connection event.
* @param syncMetadata metadata related to the synchronization process of this event.
*/
public ConnectionEvent(ConnectionState connectionState, List<String> flagsChanged, Structure syncMetadata) {
this.connected = connectionState;
this.flagsChanged = flagsChanged != null ? flagsChanged : Collections.emptyList(); // Ensure non-null list
this.syncMetadata = syncMetadata != null ? new ImmutableStructure(syncMetadata.asMap()) :
new ImmutableStructure(); // Ensure valid syncMetadata
}

/**
* Retrieves an unmodifiable view of the list of changed flags.
*
* @return an unmodifiable list of changed flags.
*/
public List<String> getFlagsChanged() {
return Collections.unmodifiableList(flagsChanged);
}

/**
* Get changed sync metadata represented as SDK structure type.
*
* @return an unmodifiable view of the sync metadata
* Retrieves the synchronization metadata represented as an immutable SDK structure type.
*
* @return an immutable structure containing the synchronization metadata.
*/
public Structure getSyncMetadata() {
return new ImmutableStructure(syncMetadata.asMap());
}

/**
* Indicates whether the current connection state is connected.
*
* @return {@code true} if connected, otherwise {@code false}.
*/
public boolean isConnected() {
return this.connected == ConnectionState.CONNECTED;
}

/**
* Indicates
* whether the current connection state is stale.
*
* @return {@code true} if stale, otherwise {@code false}.
*/
public boolean isStale() {
return this.connected == ConnectionState.STALE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

/**
* Represents the possible states of a connection.
*/
public enum ConnectionState {

/**
* The connection is active and functioning as expected.
*/
CONNECTED,

/**
* The connection is not active and has been fully disconnected.
*/
DISCONNECTED,

/**
* The connection is inactive or degraded but may still recover.
*/
STALE,

/**
* The connection has encountered an error and cannot function correctly.
*/
ERROR,
}
Loading

0 comments on commit 737bd9d

Please sign in to comment.