Skip to content

Commit

Permalink
[TT-13036] A struct is needed to resemble the info section inside of …
Browse files Browse the repository at this point in the history
…x-tyk-streaming.
  • Loading branch information
buraksezer committed Sep 16, 2024
1 parent 56addf1 commit bc19b67
Showing 1 changed file with 60 additions and 44 deletions.
104 changes: 60 additions & 44 deletions gateway/mw_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ const (
// Used for testing
var globalStreamCounter atomic.Int64

type StreamsConfig struct {
Info struct {
Version string `json:"version"`
} `json:"info"`
Streams map[string]interface{} `json:"streams"`
}

// StreamingMiddleware is a middleware that handles streaming functionality
type StreamingMiddleware struct {
*BaseMiddleware
Expand All @@ -40,11 +47,11 @@ type StreamManager struct {
listenPaths []string
}

func (sm *StreamManager) initStreams(specStreams map[string]interface{}) {
func (sm *StreamManager) initStreams(config *StreamsConfig) {
// Clear existing routes for this consumer group
sm.muxer = mux.NewRouter()

for streamID, streamConfig := range specStreams {
for streamID, streamConfig := range config.Streams {
if streamMap, ok := streamConfig.(map[string]interface{}); ok {
hasHttp := HasHttp(streamMap)
if sm.dryRun {
Expand All @@ -61,7 +68,6 @@ func (sm *StreamManager) initStreams(specStreams map[string]interface{}) {
}
}
sm.listenPaths = GetHTTPPaths(streamMap)

}
}
}
Expand Down Expand Up @@ -103,12 +109,12 @@ func (s *StreamingMiddleware) EnabledForSpec() bool {
s.allowedUnsafe = streamingConfig.AllowUnsafe
s.Logger().Debugf("Allowed unsafe components: %v", s.allowedUnsafe)

specStreams := s.getStreamsConfig(nil)
globalStreamCounter.Add(int64(len(specStreams)))
streamsConfig := s.getStreamsConfig(nil)
globalStreamCounter.Add(int64(len(streamsConfig.Streams)))

s.Logger().Debug("Total streams count: ", len(specStreams))
s.Logger().Debug("Total streams count: ", len(streamsConfig.Streams))

if len(specStreams) == 0 {
if len(streamsConfig.Streams) == 0 {
return false
}

Expand Down Expand Up @@ -197,46 +203,56 @@ func GetHTTPPaths(streamConfig map[string]interface{}) []string {
}

// getStreamsConfig extracts streaming configurations from an API spec if available.
func (s *StreamingMiddleware) getStreamsConfig(r *http.Request) map[string]interface{} {
streamConfigs := make(map[string]interface{})
if s.Spec.IsOAS {
if ext, ok := s.Spec.OAS.T.Extensions[ExtensionTykStreaming]; ok {
if streamsMap, ok := ext.(map[string]interface{}); ok {
if streams, ok := streamsMap["streams"].(map[string]interface{}); ok {
for streamID, stream := range streams {
if r != nil {
s.Logger().Debugf("Stream config for %s: %v", streamID, stream)

marshaledStream, err := json.Marshal(stream)
if err != nil {
s.Logger().Errorf("Failed to marshal stream config: %v", err)
continue
}
replacedStream := s.Gw.replaceTykVariables(r, string(marshaledStream), true)

if replacedStream != string(marshaledStream) {
s.Logger().Debugf("Stream config changed for %s: %s", streamID, replacedStream)
} else {
s.Logger().Debugf("Stream config has not changed for %s: %s", streamID, replacedStream)
}

var unmarshaledStream map[string]interface{}
err = json.Unmarshal([]byte(replacedStream), &unmarshaledStream)
if err != nil {
s.Logger().Errorf("Failed to unmarshal replaced stream config: %v", err)
continue
}
stream = unmarshaledStream
} else {
s.Logger().Debugf("No request available to replace variables in stream config for %s", streamID)
}
streamConfigs[streamID] = stream
}
}
func (s *StreamingMiddleware) getStreamsConfig(r *http.Request) *StreamsConfig {
if !s.Spec.IsOAS {
return nil
}

tykStreamingConfig, ok := s.Spec.OAS.T.Extensions[ExtensionTykStreaming]
if !ok {
return nil
}

data, err := json.Marshal(tykStreamingConfig)
if err != nil {
return nil
}

streamsConfig := &StreamsConfig{}
err = json.Unmarshal(data, streamsConfig)
if err != nil {
return nil
}

for streamID, stream := range streamsConfig.Streams {
if r != nil {
s.Logger().Debugf("Stream config for %s: %v", streamID, stream)

marshaledStream, err := json.Marshal(stream)
if err != nil {
s.Logger().Errorf("Failed to marshal stream config: %v", err)
continue
}
replacedStream := s.Gw.replaceTykVariables(r, string(marshaledStream), true)

if replacedStream != string(marshaledStream) {
s.Logger().Debugf("Stream config changed for %s: %s", streamID, replacedStream)
} else {
s.Logger().Debugf("Stream config has not changed for %s: %s", streamID, replacedStream)
}

var unmarshaledStream map[string]interface{}
err = json.Unmarshal([]byte(replacedStream), &unmarshaledStream)
if err != nil {
s.Logger().Errorf("Failed to unmarshal replaced stream config: %v", err)
continue
}
stream = unmarshaledStream
} else {
s.Logger().Debugf("No request available to replace variables in stream config for %s", streamID)
}
}
return streamConfigs
return streamsConfig
}

// createStream creates a new stream
Expand Down

0 comments on commit bc19b67

Please sign in to comment.