Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-12893]: Adding first implementation of streams API #6511

Merged
merged 58 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
2f82da2
Streaming POC
buger May 18, 2024
d1df93c
Directly manage streams instead of benthos rest api
buger May 19, 2024
b4f0b6a
Add TYk integration
buger May 20, 2024
4617972
Add fan-out mechanism
buger May 20, 2024
26ecca8
Add support for redis streams
buger May 21, 2024
0773abf
fixed panic where redis client wasn't ready when server came up
lonelycode May 23, 2024
dc53536
fixd panic where if messages were still in redis stream when client d…
lonelycode May 23, 2024
7974f95
Fix bugs
buger May 23, 2024
a32da89
Add portal support
buger May 23, 2024
293cf9a
Fix router issue
buger Jun 27, 2024
6043994
Refactor streaming functionality to become a standard middleware
buger Jun 29, 2024
8830ab6
Fix consumer group and some panics
buger Jun 29, 2024
66b4622
Add new Labs configuration option, and ensure that streaming runs onl…
buger Jun 29, 2024
aaa4038
Fix linter
buger Jun 30, 2024
bb0079f
Do not load unsafe components
buger Jun 30, 2024
f81a771
Fix gomod
buger Jun 30, 2024
efc2e5c
Add mutex to unload function
buger Jun 30, 2024
9fafb3c
refactor: Implement direct user-to-stream connections in streaming mi…
buger Aug 11, 2024
31aaaaf
feat: initialize default consumer group manager in StreamingMiddleware
buger Aug 12, 2024
5291850
refactor: optimize streaming middleware request processing
buger Aug 12, 2024
d4ac5d6
Rename a few methods
buger Aug 21, 2024
64fb1fd
refactor: update streaming implementation to use single Stream object
buger Aug 21, 2024
2c90c44
refactor: update Stream struct and methods in manager.go
buger Aug 21, 2024
2b624f1
refactor: optimize stream reset in StreamingMiddleware Unload method
buger Aug 21, 2024
66351d8
refactor: update stream counting in StreamingMiddleware Unload method
buger Aug 21, 2024
4d1610c
refactor: improve stream stopping mechanism
buger Aug 21, 2024
a20adcf
feat: add time import to streaming manager
buger Aug 21, 2024
0fc3ffd
feat: enhance AsyncAPI test robustness and logging
buger Aug 21, 2024
b56aa58
feat: enhance AsyncAPI test with increased timeouts and logging
buger Aug 21, 2024
0fc7dfa
test: increase timeouts and add more logging in AsyncAPI test
buger Aug 21, 2024
7480541
feat: optimize AsyncAPI HTTP test for faster execution
buger Aug 21, 2024
2ba5d8f
refactor: simplify streaming middleware configuration
buger Aug 21, 2024
508a347
refactor: remove GC job and simplify resource management in streaming…
buger Aug 21, 2024
3693138
Small refactor
buger Aug 21, 2024
4302956
More unused constants
buger Aug 21, 2024
0b3d7d0
modified manager test
kofoworola Sep 3, 2024
57f6dc6
nats stream
kofoworola Sep 4, 2024
74214cb
fixed tests
kofoworola Sep 10, 2024
3c81db7
fixed syntax error
kofoworola Sep 11, 2024
eadf84e
gofmt run
kofoworola Sep 11, 2024
0b1a766
fixing stream manage
kofoworola Sep 12, 2024
0f6d6ec
added dry run logic
kofoworola Sep 13, 2024
af4905a
fixed flaky tests
kofoworola Sep 13, 2024
3658e19
fix schema
kofoworola Sep 13, 2024
66733e6
reorg and add linting
kofoworola Sep 13, 2024
20a6670
expose labs
kofoworola Sep 13, 2024
9f874c1
disable context var by default
kofoworola Sep 16, 2024
16257ca
fix mod dep
kofoworola Sep 16, 2024
ef25b6c
fix tests
kofoworola Sep 16, 2024
f2c5b42
fix race condition
kofoworola Sep 16, 2024
aa0cac9
fix for broken tests
kofoworola Sep 18, 2024
b6c1ec1
[TT-13036] A struct is needed to resemble the info section inside of …
buraksezer Sep 18, 2024
2577d86
[TT-12879] Replace Benthos with Bento (#6545)
buraksezer Sep 18, 2024
4de28e4
Fix using broker (#6535)
buger Sep 20, 2024
57631f1
Feat/tt 13036/bring back changes (#6561)
buraksezer Sep 20, 2024
d3c7bf7
[TT-13036] Fix bad streams config (#6562)
buraksezer Sep 20, 2024
d0a387a
lint fixes
kofoworola Sep 23, 2024
fdc6104
lint fixes
kofoworola Sep 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@ tyk.test
tyk-gateway.pid

tyk_linux_*
.aider*
/dist/

.terraform**
.terraform.lock.hcl
.task/
*.test

main
main
2 changes: 1 addition & 1 deletion apidef/oas/linter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestXTykGateway_Lint(t *testing.T) {
settings.Server.EventHandlers[i].Webhook.CoolDownPeriod = ReadableDuration(time.Second * 20)
}

for idx, _ := range settings.Middleware.Operations {
for idx := range settings.Middleware.Operations {
settings.Middleware.Operations[idx].CircuitBreaker.Threshold = 0.5
}

Expand Down
11 changes: 11 additions & 0 deletions benthos.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
input:
stdin: {}

output:
http_server:
address: "0.0.0.0:4196"
path: /get
stream_path: /get/stream
ws_path: /get/ws
allowed_verbs:
- GET
19 changes: 19 additions & 0 deletions cli/linter/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,23 @@
}
}
}
},
"Streaming": {
"type": "object",
"properties": {
"enabled": {
"type": "boolean"
},
"allow_unsafe": {
"type": "array"
}
}
}
},
"properties": {
"streaming": {
"$ref": "#/definitions/Streaming"
},
"allow_insecure_configs": {
"type": "boolean"
},
Expand Down Expand Up @@ -1194,6 +1208,11 @@
"oas_config": {
"validate_examples": false,
"validate_schema_defaults": false
},
"labs": {
"type": ["object", "null"],
"additionalProperties": true,
"properties": {}
}
}
}
28 changes: 28 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ var (
LivenessCheck: LivenessCheckConfig{
CheckDuration: time.Second * 10,
},
Streaming: StreamingConfig{
Enabled: false,
AllowUnsafe: []string{},
},
}
)

Expand Down Expand Up @@ -653,6 +657,12 @@ func (pwl *PortsWhiteList) Decode(value string) error {
return nil
}

// StreamingConfig is for configuring tyk streaming
type StreamingConfig struct {
Enabled bool `json:"enabled"`
AllowUnsafe []string `json:"allow_unsafe"`
}

// Config is the configuration object used by Tyk to set up various parameters.
type Config struct {
// Force your Gateway to work only on a specific domain name. Can be overridden by API custom domain.
Expand Down Expand Up @@ -1117,6 +1127,24 @@ type Config struct {

// OAS holds the configuration for various OpenAPI-specific functionalities
OAS OASConfig `json:"oas_config"`

Streaming StreamingConfig `json:"streaming"`

Labs LabsConfig `json:"labs"`
}

// LabsConfig include config for streaming
type LabsConfig map[string]interface{}

// Decode unmarshals json config into the Labs config
func (lc *LabsConfig) Decode(value string) error {
var temp map[string]interface{}
if err := json.Unmarshal([]byte(value), &temp); err != nil {
log.Error("Error unmarshalling LabsConfig: ", err)
return err
}
*lc = temp
return nil
}

// OASConfig holds the configuration for various OpenAPI-specific functionalities
Expand Down
16 changes: 15 additions & 1 deletion gateway/api_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ type APISpec struct {
AnalyticsPluginConfig *GoAnalyticsPlugin

middlewareChain *ChainObject
unloadHooks []func()

network analytics.NetworkStats

Expand All @@ -237,8 +238,16 @@ func (a *APISpec) GetSessionLifetimeRespectsKeyExpiration() bool {
return a.SessionLifetimeRespectsKeyExpiration
}

// AddUnloadHook adds a function to be called when the API spec is unloaded
func (s *APISpec) AddUnloadHook(hook func()) {
s.unloadHooks = append(s.unloadHooks, hook)
}

// Release releases all resources associated with API spec
func (s *APISpec) Release() {
func (s *APISpec) Unload() {
s.Lock()
defer s.Unlock()

// release circuit breaker resources
for _, path := range s.RxPaths {
for _, urlSpec := range path {
Expand Down Expand Up @@ -267,6 +276,11 @@ func (s *APISpec) Release() {
s.HTTPTransport.transport.CloseIdleConnections()
s.HTTPTransport = nil
}

for _, hook := range s.unloadHooks {
hook()
}
s.unloadHooks = nil
}

// Validate returns nil if s is a valid spec and an error stating why the spec is not valid.
Expand Down
14 changes: 9 additions & 5 deletions gateway/api_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,8 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,

gw.mwAppendEnabled(&chainArray, &RateLimitForAPI{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLMiddleware{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &StreamingMiddleware{BaseMiddleware: baseMid})

if !spec.UseKeylessAccess {
gw.mwAppendEnabled(&chainArray, &GraphQLComplexityMiddleware{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLGranularAccessMiddleware{BaseMiddleware: baseMid})
Expand Down Expand Up @@ -946,7 +948,6 @@ func (gw *Gateway) loadApps(specs []*APISpec) {
gw.loadControlAPIEndpoints(router)

muxer.setRouter(port, "", router, gw.GetConfig())

gs := gw.prepareStorage()
shouldTrace := trace.IsEnabled()

Expand Down Expand Up @@ -996,13 +997,15 @@ func (gw *Gateway) loadApps(specs []*APISpec) {

gw.DefaultProxyMux.swap(muxer, gw)

var specsToRelease []*APISpec
var specsToUnload []*APISpec

gw.apisMu.Lock()

for _, spec := range specs {
curSpec, ok := gw.apisByID[spec.APIID]
if ok && curSpec != nil && shouldReloadSpec(curSpec, spec) {
specsToRelease = append(specsToRelease, curSpec)
mainLog.Debugf("Spec %s has changed and needs to be reloaded", curSpec.APIID)
specsToUnload = append(specsToUnload, curSpec)
}

// Bind versions to base APIs again
Expand All @@ -1018,8 +1021,9 @@ func (gw *Gateway) loadApps(specs []*APISpec) {

gw.apisMu.Unlock()

for _, spec := range specsToRelease {
spec.Release()
for _, spec := range specsToUnload {
mainLog.Debugf("Unloading spec %s", spec.APIID)
spec.Unload()
}

mainLog.Debug("Checker host list")
Expand Down
20 changes: 19 additions & 1 deletion gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type TykMiddleware interface {
ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
EnabledForSpec() bool
Name() string

GetSpec() *APISpec

Unload()
}

type TraceMiddleware struct {
Expand Down Expand Up @@ -121,6 +125,9 @@ func (gw *Gateway) createMiddleware(actualMW TykMiddleware) func(http.Handler) h
mw.SetName(mw.Name())
mw.Logger().Debug("Init")

spec := mw.GetSpec()
spec.AddUnloadHook(actualMW.Unload)

// Pull the configuration
mwConf, err := mw.Config()
if err != nil {
Expand Down Expand Up @@ -155,14 +162,15 @@ func (gw *Gateway) createMiddleware(actualMW TykMiddleware) func(http.Handler) h
}

startTime := time.Now()
mw.Logger().WithField("ts", startTime.UnixNano()).Debug("Started")
mw.Logger().WithField("ts", startTime.UnixNano()).WithField("mw", mw.Name()).Debug("Started")

if mw.Base().Spec.CORS.OptionsPassthrough && r.Method == "OPTIONS" {
h.ServeHTTP(w, r)
return
}

err, errCode := mw.ProcessRequest(w, r, mwConf)

if err != nil {
writeResponse := true
// Prevent double error write
Expand Down Expand Up @@ -268,6 +276,16 @@ func (t *BaseMiddleware) Config() (interface{}, error) {
return nil, nil
}

// Unload unloads the middleware and frees resources
func (t *BaseMiddleware) Unload() {
// methos created to satisfy middleware contract
}

// GetSpec returns the spec of the middleware
func (t *BaseMiddleware) GetSpec() *APISpec {
return t.Spec
}

func (t *BaseMiddleware) OrgSession(orgID string) (user.SessionState, bool) {

if rpc.IsEmergencyMode() {
Expand Down
7 changes: 7 additions & 0 deletions gateway/mw_context_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ func (m *MiddlewareContextVars) ProcessRequest(w http.ResponseWriter, r *http.Re
contextDataObject[name] = c.Value
}

for key, vals := range r.Form {
name := "request_data_" + strings.Replace(key, "-", "_", -1)
if len(vals) > 0 {
contextDataObject[name] = vals[0]
}
}

ctxSetData(r, contextDataObject)

return nil, http.StatusOK
Expand Down
17 changes: 16 additions & 1 deletion gateway/mw_context_vars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strings"
"testing"

"github.com/google/go-cmp/cmp"

"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/test"
)
Expand Down Expand Up @@ -88,6 +90,8 @@ func testPrepareTestContextVarsMiddleware() map[string]testContextVarsData {
"x": {"123"},
"y": {"test"},
},
"request_data_x": "123",
"request_data_y": "test",
"headers": map[string][]string{
"x-header-a": {"A"},
"x-header-b": {"B"},
Expand Down Expand Up @@ -120,6 +124,11 @@ func testPrepareTestContextVarsMiddleware() map[string]testContextVarsData {
"j": {"2"},
"str": {"abc"},
},
"request_data_x": "123",
"request_data_y": "test",
"request_data_i": "1",
"request_data_j": "2",
"request_data_str": "abc",
"headers": map[string][]string{
"x-header-a": {"A"},
"x-header-b": {"B"},
Expand Down Expand Up @@ -155,6 +164,11 @@ func testPrepareTestContextVarsMiddleware() map[string]testContextVarsData {
"j": {"2"},
"str": {"abc"},
},
"request_data_x": "123",
"request_data_y": "test",
"request_data_i": "1",
"request_data_j": "2",
"request_data_str": "abc",
"headers": map[string][]string{
"x-header-a": {"A"},
"x-header-b": {"B"},
Expand Down Expand Up @@ -209,7 +223,8 @@ func TestContextVarsMiddlewareProcessRequest(t *testing.T) {
delete(ctxDataObject, "request_id")

if !reflect.DeepEqual(ctxDataObject, test.ExpectedCtxDataObject) {
t.Errorf("Expected: %v\n Got: %v\n", test.ExpectedCtxDataObject, ctxDataObject)
//t.Errorf("Expected: %v\n Got: %v\n", test.ExpectedCtxDataObject, ctxDataObject)
t.Errorf("diff: %s", cmp.Diff(ctxDataObject, test.ExpectedCtxDataObject))
}
})
}
Expand Down
Loading
Loading