From 40022f7e1517603a860faa57527c67c0c36def32 Mon Sep 17 00:00:00 2001 From: Mark Phelps <209477+markphelps@users.noreply.github.com> Date: Thu, 17 Oct 2024 11:20:52 -0400 Subject: [PATCH] refactor(ffi): refactor engine for streaming (#430) * refactor: decouple evaluator and parser to support streaming Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: use channels for message passing Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: handle errors from http Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: return response from fetcher Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * feat: impl streaming Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rm streaming file Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix async tests Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: handle json parsing error Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rm unneeded test Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rm tokio from main deps Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix sending async Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: minor cleanup Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: add some logging Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: try to fix thread lifecycle management Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: put polling in loop Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: add some debug logging Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: dont reset snapshot on error state Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: dont print error on shutdown Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: handle wait for async code to finish Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: use println for debug Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: more debug info Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: perform initial fetch on engine start Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: refactor initial fetch Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: set error in snapshot Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: test stream fetch Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: error message Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: clippyyyyyyyyy Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rm 303 handling Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: expose fetch mode Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix url for streaming Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix streaming test Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: expose fetch mode to other languages Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fix java build Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: rename file cause java Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: handle stream chunk parsing; rm timeout Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fmt java Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: ruby fmt Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: fmt python Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: retry requests, ensure consistency in error handling Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: assert expected retries for fetch error Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: add more tests and documentation Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> * chore: java fmt Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> --------- Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> --- .gitignore | 3 +- flipt-client-dart/lib/src/models.dart | 16 + flipt-client-dart/lib/src/models.g.dart | 8 + flipt-client-go/evaluation.go | 10 + flipt-client-go/models.go | 16 +- flipt-client-java/.idea/gradle.xml | 2 +- flipt-client-java/.idea/misc.xml | 2 +- .../flipt/client/FliptEvaluationClient.java | 89 ++- .../io/flipt/client/models/ClientOptions.java | 10 +- .../io/flipt/client/models/FetchMode.java | 10 + flipt-client-python/flipt_client/models.py | 7 + flipt-client-ruby/.rubocop.yml | 3 + flipt-client-ruby/Gemfile.lock | 2 +- flipt-client-ruby/lib/flipt_client.rb | 7 + flipt-engine-ffi/Cargo.toml | 7 +- flipt-engine-ffi/examples/evaluation.rs | 32 +- flipt-engine-ffi/src/evaluator.rs | 175 ++---- flipt-engine-ffi/src/http.rs | 527 ++++++++++++++---- flipt-engine-ffi/src/lib.rs | 107 +++- flipt-evaluation/src/lib.rs | 1 - flipt-evaluation/src/models/flipt.rs | 16 +- flipt-evaluation/src/models/source.rs | 22 +- flipt-evaluation/src/parser.rs | 46 -- flipt-evaluation/src/store.rs | 47 +- 24 files changed, 791 insertions(+), 374 deletions(-) create mode 100644 flipt-client-java/src/main/java/io/flipt/client/models/FetchMode.java delete mode 100644 flipt-evaluation/src/parser.rs diff --git a/.gitignore b/.gitignore index 4f0ee923..2efb1967 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ Cargo.lock tmp .envrc .vscode -staging \ No newline at end of file +staging +*.h \ No newline at end of file diff --git a/flipt-client-dart/lib/src/models.dart b/flipt-client-dart/lib/src/models.dart index bae4f07e..b9c3f821 100644 --- a/flipt-client-dart/lib/src/models.dart +++ b/flipt-client-dart/lib/src/models.dart @@ -2,6 +2,14 @@ import 'package:json_annotation/json_annotation.dart'; part 'models.g.dart'; +enum FetchMode { + @JsonValue('polling') + polling, + @JsonValue('streaming') + streaming, +} + +/// Options for the Flipt client @JsonSerializable() class Options { final String? url; @@ -9,11 +17,15 @@ class Options { final int? updateInterval; final Map? authentication; + /// Note: Streaming is currently only supported when using the SDK with Flipt Cloud (https://flipt.io/cloud). + final FetchMode? fetchMode; + Options({ this.url = 'http://localhost:8080', this.reference, this.updateInterval = 120, this.authentication, + this.fetchMode = FetchMode.polling, }); factory Options.fromJson(Map json) => @@ -25,6 +37,7 @@ class Options { String? url, String? reference, int? updateInterval, + FetchMode? fetchMode, }) { return Options( url: url, @@ -33,6 +46,7 @@ class Options { authentication: { 'client_token': token, }, + fetchMode: fetchMode, ); } @@ -41,6 +55,7 @@ class Options { String? url, String? reference, int? updateInterval, + FetchMode? fetchMode, }) { return Options( url: url, @@ -49,6 +64,7 @@ class Options { authentication: { 'jwt_token': token, }, + fetchMode: fetchMode, ); } } diff --git a/flipt-client-dart/lib/src/models.g.dart b/flipt-client-dart/lib/src/models.g.dart index d34f22e8..9145d6ce 100644 --- a/flipt-client-dart/lib/src/models.g.dart +++ b/flipt-client-dart/lib/src/models.g.dart @@ -11,6 +11,8 @@ Options _$OptionsFromJson(Map json) => Options( reference: json['reference'] as String?, updateInterval: (json['update_interval'] as num?)?.toInt() ?? 120, authentication: json['authentication'] as Map?, + fetchMode: $enumDecodeNullable(_$FetchModeEnumMap, json['fetch_mode']) ?? + FetchMode.polling, ); Map _$OptionsToJson(Options instance) => { @@ -18,8 +20,14 @@ Map _$OptionsToJson(Options instance) => { 'reference': instance.reference, 'update_interval': instance.updateInterval, 'authentication': instance.authentication, + 'fetch_mode': _$FetchModeEnumMap[instance.fetchMode], }; +const _$FetchModeEnumMap = { + FetchMode.polling: 'polling', + FetchMode.streaming: 'streaming', +}; + Flag _$FlagFromJson(Map json) => Flag( key: json['key'] as String, enabled: json['enabled'] as bool, diff --git a/flipt-client-go/evaluation.go b/flipt-client-go/evaluation.go index 39c76fb5..f9bc2711 100644 --- a/flipt-client-go/evaluation.go +++ b/flipt-client-go/evaluation.go @@ -29,6 +29,7 @@ type EvaluationClient struct { authentication any ref string updateInterval int + fetchMode FetchMode } // NewEvaluationClient constructs a Client. @@ -46,6 +47,7 @@ func NewEvaluationClient(opts ...clientOption) (*EvaluationClient, error) { UpdateInterval: client.updateInterval, Authentication: &client.authentication, Reference: client.ref, + FetchMode: client.fetchMode, } b, err := json.Marshal(clientOpts) @@ -106,6 +108,14 @@ func WithJWTAuthentication(token string) clientOption { } } +// WithFetchMode allows for specifying the fetch mode for the Flipt client (e.g. polling, streaming). +// Note: Streaming is currently only supported when using the SDK with Flipt Cloud (https://flipt.io/cloud). +func WithFetchMode(fetchMode FetchMode) clientOption { + return func(c *EvaluationClient) { + c.fetchMode = fetchMode + } +} + // EvaluateVariant performs evaluation for a variant flag. func (e *EvaluationClient) EvaluateVariant(_ context.Context, flagKey, entityID string, evalContext map[string]string) (*VariantEvaluationResponse, error) { ereq, err := json.Marshal(evaluationRequest{ diff --git a/flipt-client-go/models.go b/flipt-client-go/models.go index 3a9ed577..e47622aa 100644 --- a/flipt-client-go/models.go +++ b/flipt-client-go/models.go @@ -21,11 +21,19 @@ type jwtAuthentication struct { Token string `json:"jwt_token"` } +type FetchMode string + +const ( + FetchModeStreaming FetchMode = "streaming" + FetchModePolling FetchMode = "polling" +) + type clientOptions[T any] struct { - URL string `json:"url,omitempty"` - Authentication *T `json:"authentication,omitempty"` - UpdateInterval int `json:"update_interval,omitempty"` - Reference string `json:"reference,omitempty"` + URL string `json:"url,omitempty"` + Authentication *T `json:"authentication,omitempty"` + UpdateInterval int `json:"update_interval,omitempty"` + Reference string `json:"reference,omitempty"` + FetchMode FetchMode `json:"fetch_mode,omitempty"` } type Flag struct { diff --git a/flipt-client-java/.idea/gradle.xml b/flipt-client-java/.idea/gradle.xml index 2a65317e..1ace7cf4 100644 --- a/flipt-client-java/.idea/gradle.xml +++ b/flipt-client-java/.idea/gradle.xml @@ -5,7 +5,7 @@