Skip to content

Commit

Permalink
feat: refactor GrpcConnector to use grpc builtin reconnection
Browse files Browse the repository at this point in the history
Signed-off-by: Bernd Warmuth <[email protected]>
  • Loading branch information
Bernd Warmuth committed Dec 18, 2024
1 parent d8daa07 commit f11e0d5
Show file tree
Hide file tree
Showing 15 changed files with 974 additions and 1,226 deletions.
7 changes: 6 additions & 1 deletion providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,12 @@
<version>1.20.4</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>1.69.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package dev.openfeature.contrib.providers.flagd;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
Expand All @@ -22,16 +17,21 @@
import dev.openfeature.sdk.Value;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

/**
* OpenFeature provider for flagd.
*/
@Slf4j
@SuppressWarnings({ "PMD.TooManyStaticImports", "checkstyle:NoFinalizer" })
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider {
private Function<Structure, EvaluationContext> contextEnricher;
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean isInitialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
Expand Down Expand Up @@ -62,7 +62,6 @@ public FlagdProvider(final FlagdOptions options) {
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(options,
new Cache(options.getCacheType(), options.getMaxCacheSize()),
this::isConnected,
this::onConnectionEvent);
break;
default:
Expand All @@ -80,17 +79,17 @@ public List<Hook> getProviderHooks() {

@Override
public synchronized void initialize(EvaluationContext evaluationContext) throws Exception {
if (this.initialized) {
if (this.isInitialized) {
return;
}

this.flagResolver.init();
this.initialized = true;
this.isInitialized = true;
}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
if (!this.isInitialized) {
return;
}

Expand All @@ -99,7 +98,7 @@ public synchronized void shutdown() {
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
this.initialized = false;
this.isInitialized = false;
}
}

Expand Down Expand Up @@ -139,7 +138,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* Set on initial connection and updated with every reconnection.
* see:
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
*
*
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
Expand All @@ -148,6 +147,7 @@ protected Structure getSyncMetadata() {

/**
* The updated context mixed into all evaluations based on the sync-metadata.
*
* @return context
*/
EvaluationContext getEnrichedContext() {
Expand All @@ -159,33 +159,41 @@ private boolean isConnected() {
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
boolean previous = connected;
boolean current = connected = connectionEvent.isConnected();
boolean wasConnected = connected;
boolean isConnected = connected = connectionEvent.isConnected();

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());

// configuration changed
if (initialized && previous && current) {
log.debug("Configuration changed");
if (!isInitialized) {
return;
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed").build();
this.emitProviderConfigurationChanged(details);
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
}
// there was an error
if (initialized && previous && !current) {
log.debug("There has been an error");
ProviderEventDetails details = ProviderEventDetails.builder().message("there has been an error").build();
this.emitProviderError(details);

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
}
// we recovered from an error
if (initialized && !previous && current) {
log.debug("Recovered from error");
ProviderEventDetails details = ProviderEventDetails.builder().message("recovered from error").build();
this.emitProviderReady(details);
this.emitProviderConfigurationChanged(details);

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder().message("there has been an error").build());
} else {
this.emitProviderError(ProviderEventDetails.builder().message("there has been an error").build());
}
}
}



Original file line number Diff line number Diff line change
@@ -1,69 +1,119 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import java.util.Collections;
import java.util.List;

import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import lombok.AllArgsConstructor;
import lombok.Getter;

import java.util.Collections;
import java.util.List;

/**
* Event payload for a
* {@link dev.openfeature.contrib.providers.flagd.resolver.Resolver} connection
* state change event.
* Represents an event payload for a connection state change in a
* {@link dev.openfeature.contrib.providers.flagd.resolver.Resolver}.
* The event includes information about the connection status, any flags that have changed,
* and metadata associated with the synchronization process.
*/
@AllArgsConstructor
public class ConnectionEvent {
@Getter
private final boolean connected;

/**
* The current state of the connection.
*/
private final ConnectionState connected;

/**
* A list of flags that have changed due to this connection event.
*/
private final List<String> flagsChanged;

/**
* Metadata associated with synchronization in this connection event.
*/
private final Structure syncMetadata;

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* Constructs a new {@code ConnectionEvent} with the connection status only.
*
* @param connected {@code true} if the connection is established, otherwise {@code false}.
*/
public ConnectionEvent(boolean connected) {
this(connected ? ConnectionState.Connected() : ConnectionState.Disconnected(),
Collections.emptyList(), new ImmutableStructure());
}

/**
* Constructs a new {@code ConnectionEvent} with the specified connection state.
*
* @param connected the connection state indicating if the connection is established or not.
*/
public ConnectionEvent(ConnectionState connected) {
this(connected, Collections.emptyList(), new ImmutableStructure());
}

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* @param flagsChanged list of flags changed
* Constructs a new {@code ConnectionEvent} with the specified connection state and changed flags.
*
* @param connected the connection state indicating if the connection is established or not.
* @param flagsChanged a list of flags that have changed due to this connection event.
*/
public ConnectionEvent(boolean connected, List<String> flagsChanged) {
public ConnectionEvent(ConnectionState connected, List<String> flagsChanged) {
this(connected, flagsChanged, new ImmutableStructure());
}

/**
* Construct a new ConnectionEvent.
*
* @param connected status of the connection
* @param syncMetadata sync.getMetadata
* Constructs a new {@code ConnectionEvent} with the specified connection state and synchronization metadata.
*
* @param connected the connection state indicating if the connection is established or not.
* @param syncMetadata metadata related to the synchronization process of this event.
*/
public ConnectionEvent(boolean connected, Structure syncMetadata) {
public ConnectionEvent(ConnectionState connected, Structure syncMetadata) {
this(connected, Collections.emptyList(), new ImmutableStructure(syncMetadata.asMap()));
}

/**
* Get changed flags.
*
* @return an unmodifiable view of the changed flags
* Constructs a new {@code ConnectionEvent} with the specified connection state, changed flags, and synchronization metadata.
*
* @param connectionState the state of the connection.
* @param flagsChanged a list of flags that have changed due to this connection event.
* @param syncMetadata metadata related to the synchronization process of this event.
*/
public ConnectionEvent(ConnectionState connectionState, List<String> flagsChanged, Structure syncMetadata) {
this.connected = connectionState;
this.flagsChanged = flagsChanged != null ? flagsChanged : Collections.emptyList(); // Ensure non-null list
this.syncMetadata = syncMetadata != null ? new ImmutableStructure(syncMetadata.asMap()) : new ImmutableStructure(); // Ensure valid syncMetadata
}

/**
* Retrieves an unmodifiable view of the list of changed flags.
*
* @return an unmodifiable list of changed flags.
*/
public List<String> getFlagsChanged() {
return Collections.unmodifiableList(flagsChanged);
}

/**
* Get changed sync metadata represented as SDK structure type.
*
* @return an unmodifiable view of the sync metadata
* Retrieves the synchronization metadata represented as an immutable SDK structure type.
*
* @return an immutable structure containing the synchronization metadata.
*/
public Structure getSyncMetadata() {
return new ImmutableStructure(syncMetadata.asMap());
}

/**
* Indicates whether the current connection state is connected.
*
* @return {@code true} if connected, otherwise {@code false}.
*/
public boolean isConnected() {
return this.connected.isConnected();
}

/**
* Indicates whether the current connection state is stale.
*
* @return {@code true} if stale, otherwise {@code false}.
*/
public boolean isStale() {
return this.connected.isStale();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package dev.openfeature.contrib.providers.flagd.resolver.common;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
* Represents the state of a connection, indicating whether it is connected,
* disconnected, or stale.
*
* This class is immutable and uses the {@link lombok.AllArgsConstructor} annotation
* to generate a constructor with parameters for all fields.
* It also uses {@link lombok.Getter} to provide getter methods for the fields.
*/
@AllArgsConstructor
public class ConnectionState {

/**
* Indicates whether the connection is currently active.
*/
@Getter
private final boolean connected;

/**
* Indicates whether the connection is stale (e.g., no longer valid or in a degraded state).
*/
@Getter
private final boolean stale;

/**
* Returns a {@code ConnectionState} representing a connected state.
*
* @return a new {@code ConnectionState} instance where {@code connected} is {@code true}
* and {@code stale} is {@code false}.
*/
public static ConnectionState Connected() {
return new ConnectionState(true, false);
}

/**
* Returns a {@code ConnectionState} representing a disconnected state.
*
* @return a new {@code ConnectionState} instance where {@code connected} is {@code false}
* and {@code stale} is {@code false}.
*/
public static ConnectionState Disconnected() {
return new ConnectionState(false, false);
}

/**
* Returns a {@code ConnectionState} representing a stale state.
*
* @return a new {@code ConnectionState} instance where {@code connected} is {@code false}
* and {@code stale} is {@code true}.
*/
public static ConnectionState Stale() {
return new ConnectionState(false, true);
}
}
Loading

0 comments on commit f11e0d5

Please sign in to comment.