Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Use grpc intern reconnections for rpc event stream #1112

Merged
merged 4 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public final class Config {

static final int DEFAULT_DEADLINE = 500;
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
static final long DEFAULT_KEEP_ALIVE = 0;

Expand All @@ -35,6 +36,7 @@ public final class Config {
static final String KEEP_ALIVE_MS_ENV_VAR_NAME_OLD = "FLAGD_KEEP_ALIVE_TIME";
static final String KEEP_ALIVE_MS_ENV_VAR_NAME = "FLAGD_KEEP_ALIVE_TIME_MS";
static final String TARGET_URI_ENV_VAR_NAME = "FLAGD_TARGET_URI";
static final String STREAM_RETRY_GRACE_PERIOD = "FLAGD_RETRY_GRACE_PERIOD";

static final String RESOLVER_RPC = "rpc";
static final String RESOLVER_IN_PROCESS = "in-process";
Expand All @@ -52,7 +54,6 @@ public final class Config {
public static final String LRU_CACHE = CacheType.LRU.getValue();
static final String DEFAULT_CACHE = LRU_CACHE;

static final int DEFAULT_MAX_EVENT_STREAM_RETRIES = 5;
static final int BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000;

static String fallBackToEnvOrDefault(String key, String defaultValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,114 +13,147 @@
import lombok.Builder;
import lombok.Getter;

/** FlagdOptions is a builder to build flagd provider options. */
/**
* FlagdOptions is a builder to build flagd provider options.
*/
@Builder
@Getter
@SuppressWarnings("PMD.TooManyStaticImports")
public class FlagdOptions {

/** flagd resolving type. */
/**
warber marked this conversation as resolved.
Show resolved Hide resolved
* flagd resolving type.
*/
private Config.EvaluatorType resolverType;

/** flagd connection host. */
/**
* flagd connection host.
*/
@Builder.Default
private String host = fallBackToEnvOrDefault(Config.HOST_ENV_VAR_NAME, Config.DEFAULT_HOST);

/** flagd connection port. */
/**
* flagd connection port.
*/
private int port;

/** Use TLS connectivity. */
/**
* Use TLS connectivity.
*/
@Builder.Default
private boolean tls = Boolean.parseBoolean(fallBackToEnvOrDefault(Config.TLS_ENV_VAR_NAME, Config.DEFAULT_TLS));

/** TLS certificate overriding if TLS connectivity is used. */
/**
* TLS certificate overriding if TLS connectivity is used.
*/
@Builder.Default
private String certPath = fallBackToEnvOrDefault(Config.SERVER_CERT_PATH_ENV_VAR_NAME, null);

/** Unix socket path to flagd. */
/**
* Unix socket path to flagd.
*/
@Builder.Default
private String socketPath = fallBackToEnvOrDefault(Config.SOCKET_PATH_ENV_VAR_NAME, null);

/** Cache type to use. Supports - lru, disabled. */
/**
* Cache type to use. Supports - lru, disabled.
*/
@Builder.Default
private String cacheType = fallBackToEnvOrDefault(Config.CACHE_ENV_VAR_NAME, Config.DEFAULT_CACHE);

/** Max cache size. */
/**
* Max cache size.
*/
@Builder.Default
private int maxCacheSize =
fallBackToEnvOrDefault(Config.MAX_CACHE_SIZE_ENV_VAR_NAME, Config.DEFAULT_MAX_CACHE_SIZE);

/** Max event stream connection retries. */
@Builder.Default
private int maxEventStreamRetries = fallBackToEnvOrDefault(
Config.MAX_EVENT_STREAM_RETRIES_ENV_VAR_NAME, Config.DEFAULT_MAX_EVENT_STREAM_RETRIES);

/** Backoff interval in milliseconds. */
/**
* Backoff interval in milliseconds.
*/
@Builder.Default
private int retryBackoffMs = fallBackToEnvOrDefault(
Config.BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME, Config.BASE_EVENT_STREAM_RETRY_BACKOFF_MS);

/**
* Connection deadline in milliseconds. For RPC resolving, this is the deadline to connect to
* flagd for flag evaluation. For in-process resolving, this is the deadline for sync stream
* termination.
* Connection deadline in milliseconds.
* For RPC resolving, this is the deadline to connect to flagd for flag
* evaluation.
* For in-process resolving, this is the deadline for sync stream termination.
*/
@Builder.Default
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);

/**
* Streaming connection deadline in milliseconds. Set to 0 to disable the deadline. Defaults to
* 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
* Streaming connection deadline in milliseconds.
* Set to 0 to disable the deadline.
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
*/
@Builder.Default
private int streamDeadlineMs =
fallBackToEnvOrDefault(Config.STREAM_DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_STREAM_DEADLINE_MS);

/** Selector to be used with flag sync gRPC contract. */
/**
* Grace time period in seconds before provider moves from STALE to ERROR.
* Defaults to 5
*/
@Builder.Default
private int retryGracePeriod =
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
/**
* Selector to be used with flag sync gRPC contract.
**/
@Builder.Default
private String selector = fallBackToEnvOrDefault(Config.SOURCE_SELECTOR_ENV_VAR_NAME, null);

/** gRPC client KeepAlive in milliseconds. Disabled with 0. Defaults to 0 (disabled). */
/**
* gRPC client KeepAlive in milliseconds. Disabled with 0.
* Defaults to 0 (disabled).
**/
@Builder.Default
private long keepAlive = fallBackToEnvOrDefault(
Config.KEEP_ALIVE_MS_ENV_VAR_NAME,
fallBackToEnvOrDefault(Config.KEEP_ALIVE_MS_ENV_VAR_NAME_OLD, Config.DEFAULT_KEEP_ALIVE));

/**
* File source of flags to be used by offline mode. Setting this enables the offline mode of the
* in-process provider.
* File source of flags to be used by offline mode.
* Setting this enables the offline mode of the in-process provider.
*/
@Builder.Default
private String offlineFlagSourcePath = fallBackToEnvOrDefault(Config.OFFLINE_SOURCE_PATH, null);

/**
* gRPC custom target string.
*
* <p>Setting this will allow user to use custom gRPC name resolver at present we are supporting
* all core resolver along with a custom resolver for envoy proxy resolution. For more visit
* (https://grpc.io/docs/guides/custom-name-resolution/)
* <p>Setting this will allow user to use custom gRPC name resolver at present
* we are supporting all core resolver along with a custom resolver for envoy proxy
* resolution. For more visit (https://grpc.io/docs/guides/custom-name-resolution/)
*/
@Builder.Default
private String targetUri = fallBackToEnvOrDefault(Config.TARGET_URI_ENV_VAR_NAME, null);

/**
* Function providing an EvaluationContext to mix into every evaluations. The sync-metadata
* response
* Function providing an EvaluationContext to mix into every evaluations.
* The sync-metadata response
* (https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.GetMetadataResponse),
* represented as a {@link dev.openfeature.sdk.Structure}, is passed as an argument. This function
* runs every time the provider (re)connects, and its result is cached and used in every
* evaluation. By default, the entire sync response (converted to a Structure) is used.
* represented as a {@link dev.openfeature.sdk.Structure}, is passed as an
* argument.
* This function runs every time the provider (re)connects, and its result is cached and used in every evaluation.
* By default, the entire sync response (converted to a Structure) is used.
*/
@Builder.Default
private Function<Structure, EvaluationContext> contextEnricher =
(syncMetadata) -> new ImmutableContext(syncMetadata.asMap());

/** Inject a Custom Connector for fetching flags. */
/**
* Inject a Custom Connector for fetching flags.
*/
private Connector customConnector;

/**
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate distributed tracing
* for flagd grpc connectivity.
* Inject OpenTelemetry for the library runtime. Providing sdk will initiate
* distributed tracing for flagd grpc
* connectivity.
*/
private OpenTelemetry openTelemetry;

Expand All @@ -139,11 +172,14 @@ public FlagdOptions build() {
};
}

/** Overload default lombok builder. */
/**
* Overload default lombok builder.
*/
public static class FlagdOptionsBuilder {
/**
* Enable OpenTelemetry instance extraction from GlobalOpenTelemetry. Note that, this is only
* useful if global configurations are registered.
* Enable OpenTelemetry instance extraction from GlobalOpenTelemetry. Note that,
* this is only useful if global
* configurations are registered.
*/
public FlagdOptionsBuilder withGlobalTelemetry(final boolean b) {
if (b) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

/** OpenFeature provider for flagd. */
/**
* OpenFeature provider for flagd.
*/
@Slf4j
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
public class FlagdProvider extends EventProvider {
Expand All @@ -38,7 +40,9 @@ protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
}

/** Create a new FlagdProvider instance with default options. */
/**
* Create a new FlagdProvider instance with default options.
*/
public FlagdProvider() {
this(FlagdOptions.builder().build());
}
Expand All @@ -55,10 +59,7 @@ public FlagdProvider(final FlagdOptions options) {
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options,
new Cache(options.getCacheType(), options.getMaxCacheSize()),
this::isConnected,
this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
break;
default:
throw new IllegalStateException(
Expand All @@ -80,7 +81,7 @@ public synchronized void initialize(EvaluationContext evaluationContext) throws
}

this.flagResolver.init();
this.initialized = true;
this.initialized = this.connected = true;
Copy link
Contributor Author

@warber warber Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think here we have a potential race condition between event emiting stuff happening in init()and setting these fields and the logic which depends on them (onConnectionEvent(...))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i do think, that the state of initialization is not bound to the connection state. i can be initialized but not connected, or there could have been a connection issue in the middle. Maybe it would be better if the .init() method returns a boolean. The provider it self should/does not know about the status connected in theory. because that is something, which happens in this pr purely because we only have grpc connection. But on the other hand for inprocess, we do have a filewatcher, what does connected mean here? is the method onConnectionEvent correct, or is this more an abstraction of the underlying provider events, which we could directlu y use? All this discussion brings me more to the conclusio, that this connection logic etc should be part of the connector, and not of this outside class.

As this pr is already quiet big, and this is a potential race condition but with limited impact, i would suggest to move forward with this pr as is. Start the migration of the in-process to this really cool and neat reusable implementation of our connector. Afterwards we can tackle this in a better and controlled manner, as soon as both implementations are normalized

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed

}

@Override
Expand Down Expand Up @@ -129,8 +130,10 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
}

/**
* An unmodifiable view of a Structure representing the latest result of the SyncMetadata. Set on
* initial connection and updated with every reconnection. see:
* An unmodifiable view of a Structure representing the latest result of the
* SyncMetadata.
* Set on initial connection and updated with every reconnection.
* see:
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
*
* @return Object map representing sync metadata
Expand All @@ -153,38 +156,42 @@ private boolean isConnected() {
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ok with this method not being thread safe?

boolean previous = connected;
boolean current = connected = connectionEvent.isConnected();
final boolean wasConnected = connected;
warber marked this conversation as resolved.
Show resolved Hide resolved
final boolean isConnected = connected = connectionEvent.isConnected();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😎


syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());

// configuration changed
if (initialized && previous && current) {
log.debug("Configuration changed");
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
if (!initialized) {
return;
}
// there was an error
if (initialized && previous && !current) {
log.debug("There has been an error");

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.message("there has been an error")
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderError(details);
this.emitProviderReady(details);
return;
}
// we recovered from an error
if (initialized && !previous && current) {
log.debug("Recovered from error");

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.message("recovered from error")
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderReady(details);
this.emitProviderConfigurationChanged(details);
return;
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
}
}
}
Loading
Loading