Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-13269 - Refactor/streams #6593

Merged
merged 34 commits into from
Oct 19, 2024
Merged

TT-13269 - Refactor/streams #6593

merged 34 commits into from
Oct 19, 2024

Conversation

titpetric
Copy link
Contributor

@titpetric titpetric commented Sep 30, 2024

User description

Extracts streaming middleware into internal/middleware/stream;
Implements a number of interfaces to cover gateway/base middleware
Implements wrappedMiddleware to allow middlewares with less coupling.

I tried to update couplings along as I went, it's passing go build and go test -c.


PR Type

enhancement, tests


Description

  • Refactored streaming middleware by introducing a new Manager for handling streams and updated the StreamingMiddleware to use this new structure.
  • Replaced various apidef types with model types across multiple files to improve modularity and reduce coupling.
  • Introduced WrapMiddleware functionality to allow middleware wrapping with less coupling.
  • Updated test cases to align with the new streaming middleware implementation and the use of the model package.
  • Added new interfaces in the model package to define common behaviors for Gateway and Middleware components.
  • Updated method calls to use the new ReplaceTykVariables function for consistency.

Changes walkthrough 📝

Relevant files
Enhancement
22 files
api_loader.go
Integrate new streaming middleware and wrap functionality

gateway/api_loader.go

  • Added import for internal/middleware/stream.
  • Replaced StreamingMiddleware with a new streaming middleware
    implementation.
  • Introduced WrapMiddleware for middleware wrapping.
  • +9/-4     
    gateway.go
    Update Gateway interface implementation                                   

    gateway/gateway.go

    • Updated interface implementation to use model.Gateway.
    +2/-7     
    health_check.go
    Use model package for health check items                                 

    gateway/health_check.go

    • Replaced apidef.HealthCheckItem with model.HealthCheckItem.
    +31/-30 
    middleware.go
    Update TykMiddleware interface and constants                         

    gateway/middleware.go

  • Added GetSpec method to TykMiddleware interface.
  • Changed constant mwStatusRespond to use middleware.StatusRespond.
  • +10/-12 
    middleware_wrap.go
    Introduce middleware wrapping functionality                           

    gateway/middleware_wrap.go

  • Introduced wrapMiddleware struct for middleware wrapping.
  • Implemented WrapMiddleware function.
  • +51/-0   
    mw_auth_key.go
    Update variable replacement method call                                   

    gateway/mw_auth_key.go

    • Updated method call to ReplaceTykVariables.
    +1/-1     
    mw_graphql.go
    Update variable replacement method call                                   

    gateway/mw_graphql.go

    • Updated method call to ReplaceTykVariables.
    +2/-2     
    mw_modify_headers.go
    Update variable replacement method call                                   

    gateway/mw_modify_headers.go

    • Updated method call to ReplaceTykVariables.
    +2/-2     
    mw_persist_graphql_operation.go
    Update variable replacement method call                                   

    gateway/mw_persist_graphql_operation.go

    • Updated method call to ReplaceTykVariables.
    +1/-1     
    mw_rate_limiting.go
    Update variable replacement method call                                   

    gateway/mw_rate_limiting.go

    • Updated method call to ReplaceTykVariables.
    +1/-1     
    mw_transform.go
    Update variable replacement method call                                   

    gateway/mw_transform.go

    • Updated method call to ReplaceTykVariables.
    +1/-1     
    mw_url_rewrite.go
    Rename and document ReplaceTykVariables function                 

    gateway/mw_url_rewrite.go

  • Renamed replaceTykVariables to ReplaceTykVariables and added
    documentation.
  • +6/-2     
    res_handler_header_injector.go
    Update variable replacement method call                                   

    gateway/res_handler_header_injector.go

    • Updated method call to ReplaceTykVariables.
    +3/-3     
    rpc_storage_handler.go
    Use model package for RPC storage handler types                   

    gateway/rpc_storage_handler.go

  • Replaced apidef types with model types for RPC storage handling.
  • +24/-25 
    const.go
    Add StatusRespond constant for middleware                               

    internal/middleware/const.go

    • Added constant StatusRespond for middleware processing.
    +5/-0     
    stream_manager.go
    Introduce Manager for stream management                                   

    internal/middleware/stream/stream_manager.go

  • Introduced Manager struct for stream management.
  • Implemented stream creation and removal functionalities.
  • +133/-0 
    streaming_middleware.go
    Refactor StreamingMiddleware with new Manager                       

    internal/middleware/stream/streaming_middleware.go

  • Refactored StreamingMiddleware to use new Manager.
  • Updated logging and stream management logic.
  • +62/-133
    health_check.go
    Rename package to model                                                                   

    internal/model/health_check.go

    • Renamed package from apidef to model.
    +1/-1     
    interfaces.go
    Add interfaces for Gateway and Middleware                               

    internal/model/interfaces.go

    • Introduced interfaces for Gateway and Middleware.
    +66/-0   
    rpc.go
    Move RPC types to model package                                                   

    internal/model/rpc.go

    • Moved RPC-related types to model package.
    +2/-6     
    apply.go
    Use model.PolicyProvider in Service struct                             

    internal/policy/apply.go

    • Updated Service struct to use model.PolicyProvider.
    +3/-10   
    synchronization_forcer.go
    Use model package for GroupLoginRequest                                   

    rpc/synchronization_forcer.go

    • Replaced apidef.GroupLoginRequest with model.GroupLoginRequest.
    +2/-2     
    Tests
    3 files
    mw_streaming_test.go
    Update streaming test cases to use new package                     

    gateway/mw_streaming_test.go

  • Updated test cases to use stream package for streaming
    functionalities.
  • +8/-7     
    rpc_storage_handler_test.go
    Update RPC storage handler tests to use model package       

    gateway/rpc_storage_handler_test.go

    • Updated tests to use model package for RPC storage handler.
    +20/-23 
    rpc_test.go
    Update RPC tests to use model package                                       

    gateway/rpc_test.go

    • Updated RPC test cases to use model package for API definitions.
    +5/-5     

    💡 PR-Agent usage: Comment /help "your question" on any pull request to receive relevant information

    Copy link
    Contributor

    github-actions bot commented Sep 30, 2024

    API Changes

    --- prev.txt	2024-10-17 15:26:12.273044467 +0000
    +++ current.txt	2024-10-17 15:26:06.309080445 +0000
    @@ -897,11 +897,17 @@
     								},
     								"scopes":{
     									"type": ["array", "null"]
    +								},
    +								"endpoint_params": {
    +									"type": ["object", "null"]
     								}	
     							}
     						},
     						"header_name": {
     							"type": "string"		
    +						},
    +						"distributed_token": {
    +							"type": "boolean"
     						}
     					}
     				}
    @@ -1207,24 +1213,17 @@
     	DisableHalfOpenState bool    `bson:"disable_half_open_state" json:"disable_half_open_state"`
     }
     
    -type ClientAuthData struct {
    +type ClientCredentials struct {
     	// ClientID is the application's ID.
     	ClientID string `bson:"client_id" json:"client_id"`
     	// ClientSecret is the application's secret.
     	ClientSecret string `bson:"client_secret" json:"client_secret"`
    -}
    -    ClientAuthData holds the client ID and secret for upstream OAuth2
    -    authentication.
    -
    -type ClientCredentials struct {
    -	ClientAuthData
     	// TokenURL is the resource server's token endpoint
     	// URL. This is a constant specific to each server.
     	TokenURL string `bson:"token_url" json:"token_url"`
     	// Scopes specifies optional requested permissions.
     	Scopes []string `bson:"scopes" json:"scopes,omitempty"`
     
    -	// TokenProvider is the OAuth2 token provider for internal use.
     	TokenProvider oauth2.TokenSource `bson:"-" json:"-"`
     }
         ClientCredentials holds the client credentials for upstream OAuth2
    @@ -7664,6 +7663,136 @@
     
     func (ms *MockStorage) Set(key string, addrs []string)
     
    +# Package: ./ee/middleware/streams
    +
    +package streams // import "github.com/TykTechnologies/tyk/ee/middleware/streams"
    +
    +
    +CONSTANTS
    +
    +const (
    +	// ExtensionTykStreaming is the OAS extension for Tyk streaming.
    +	ExtensionTykStreaming = "x-tyk-streaming"
    +	StreamGCInterval      = 1 * time.Minute
    +)
    +
    +VARIABLES
    +
    +var GlobalStreamCounter atomic.Int64
    +    GlobalStreamCounter is used for testing.
    +
    +
    +FUNCTIONS
    +
    +func GetHTTPPaths(streamConfig map[string]interface{}) []string
    +    GetHTTPPaths is the main function to get HTTP paths from the stream
    +    configuration.
    +
    +
    +TYPES
    +
    +type APISpec struct {
    +	APIID string
    +	Name  string
    +	IsOAS bool
    +	OAS   oas.OAS
    +
    +	StripListenPath model.StripListenPathFunc
    +}
    +    APISpec is a subset of gateway.APISpec for the values the middleware
    +    consumes.
    +
    +func NewAPISpec(id string, name string, isOasDef bool, oasDef oas.OAS, stripListenPath model.StripListenPathFunc) *APISpec
    +    NewAPISpec creates a new APISpec object based on the required inputs.
    +    The resulting object is a subset of `*gateway.APISpec`.
    +
    +type BaseMiddleware interface {
    +	model.LoggerProvider
    +}
    +    BaseMiddleware is the subset of BaseMiddleware APIs that the middleware
    +    uses.
    +
    +type Gateway interface {
    +	model.ConfigProvider
    +	model.ReplaceTykVariables
    +}
    +    Gateway is the subset of Gateway APIs that the middleware uses.
    +
    +type Manager struct {
    +	// Has unexported fields.
    +}
    +    Manager is responsible for creating a single stream.
    +
    +type Middleware struct {
    +	Spec *APISpec
    +	Gw   Gateway
    +
    +	StreamManagerCache sync.Map // Map of payload hash to Manager
    +
    +	// Has unexported fields.
    +}
    +    Middleware implements a streaming middleware.
    +
    +func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec) *Middleware
    +    NewMiddleware returns a new instance of Middleware.
    +
    +func (s *Middleware) CreateStreamManager(r *http.Request) *Manager
    +    CreateStreamManager creates or retrieves a stream manager based on the
    +    request.
    +
    +func (s *Middleware) EnabledForSpec() bool
    +    EnabledForSpec checks if streaming is enabled on the config.
    +
    +func (s *Middleware) GC()
    +    GC removes inactive stream managers.
    +
    +func (s *Middleware) Init()
    +    Init initializes the middleware
    +
    +func (s *Middleware) Logger() *logrus.Entry
    +    Logger returns a logger with middleware filled out.
    +
    +func (s *Middleware) Name() string
    +    Name returns the name for the middleware.
    +
    +func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int)
    +    ProcessRequest will handle the streaming functionality.
    +
    +func (s *Middleware) Unload()
    +    Unload closes and remove active streams.
    +
    +type Stream struct {
    +	// Has unexported fields.
    +}
    +    Stream is a wrapper around stream
    +
    +func NewStream(allowUnsafe []string) *Stream
    +    NewStream creates a new stream without initializing it
    +
    +func (s *Stream) GetConfig() string
    +    GetConfig returns the configuration of the stream
    +
    +func (s *Stream) Reset() error
    +    Reset stops the stream
    +
    +func (s *Stream) SetLogger(logger *logrus.Logger)
    +    SetLogger to be used by the stream
    +
    +func (s *Stream) Start(config map[string]interface{}, mux service.HTTPMultiplexer) error
    +    Start loads up the configuration and starts the stream. Non blocking
    +
    +func (s *Stream) Stop() error
    +    Stop cleans up the stream
    +
    +type StreamsConfig struct {
    +	Info struct {
    +		Version string `json:"version"`
    +	} `json:"info"`
    +
    +	Streams map[string]any `json:"streams"`
    +}
    +    StreamsConfig represents a stream configuration.
    +
     # Package: ./gateway
     
     package gateway // import "github.com/TykTechnologies/tyk/gateway"
    @@ -7693,6 +7822,8 @@
     
     swagger:meta
     
    +Provides getStreamingMiddleware
    +
     CONSTANTS
     
     const (
    @@ -7809,11 +7940,6 @@
     	ErrOAuthClientDeleted               = "oauth.client_deleted"
     )
     const (
    -	// ExtensionTykStreaming is the oas extension for tyk streaming
    -	ExtensionTykStreaming = "x-tyk-streaming"
    -	StreamGCInterval      = 1 * time.Minute
    -)
    -const (
     	ResetQuota              string = "resetQuota"
     	CertificateRemoved      string = "CertificateRemoved"
     	CertificateAdded        string = "CertificateAdded"
    @@ -7965,10 +8091,6 @@
     
     func GenerateTestBinaryData() (buf *bytes.Buffer)
     func GetAccessDefinitionByAPIIDOrSession(session *user.SessionState, api *APISpec) (accessDef *user.AccessDefinition, allowanceScope string, err error)
    -func GetHTTPPaths(streamConfig map[string]interface{}) []string
    -    GetHTTPPaths is the ain function to get HTTP paths from the stream
    -    configuration
    -
     func GetTLSClient(cert *tls.Certificate, caCert []byte) *http.Client
     func GetTLSConfig(cert *tls.Certificate, caCert []byte) *tls.Config
     func InitTestMain(ctx context.Context, m *testing.M) int
    @@ -8406,8 +8528,6 @@
     	Skip           bool
     }
     
    -type ClientCredentialsOAuthProvider struct{}
    -
     type CoProcessEventHandler struct {
     	Spec     *APISpec
     	SpecJSON json.RawMessage
    @@ -8583,6 +8703,8 @@
     	resetTTLTo int64, hashed bool) error
         UpdateSession updates the session state in the storage engine
     
    +type DistributedCacheOAuthProvider struct{}
    +
     type DummyProxyHandler struct {
     	SH SuccessHandler
     	Gw *Gateway `json:"-"`
    @@ -8780,7 +8902,7 @@
     	HostCheckerClient    *http.Client
     	TracerProvider       otel.TracerProvider
     	// UpstreamOAuthCache is used to cache upstream OAuth tokens
    -	UpstreamOAuthCache UpstreamOAuthCache
    +	UpstreamOAuthCache *upstreamOAuthCache
     
     	SessionLimiter SessionLimiter
     	SessionMonitor Monitor
    @@ -8904,6 +9026,12 @@
     
     func (gw *Gateway) ProcessSingleOauthClientEvent(apiId, oauthClientId, orgID, event string)
     
    +func (gw *Gateway) ReplaceTykVariables(r *http.Request, in string, escape bool) string
    +    ReplaceTykVariables implements a variable replacement hook. It will replace
    +    the template `in`. If `escape` is true, the values get escaped as a query
    +    parameter for a HTTP request would. If no replacement has been made, `in` is
    +    returned without modification.
    +
     func (gw *Gateway) RevokeAllTokensHandler(w http.ResponseWriter, r *http.Request)
     
     func (gw *Gateway) RevokeTokenHandler(w http.ResponseWriter, r *http.Request)
    @@ -9775,7 +9903,7 @@
     
     func (k *OrganizationMonitor) SetOrgSentinel(orgChan chan bool, orgId string)
     
    -type PerAPIClientCredentialsOAuthProvider struct{}
    +type PerAPIOAuthProvider struct{}
     
     type PersistGraphQLOperationMiddleware struct {
     	*BaseMiddleware
    @@ -10204,11 +10332,6 @@
         TickOk triggers a reload and ensures a queue happened and a reload cycle
         happens. This will block until all the cases are met.
     
    -type Repository interface {
    -	policy.Repository
    -}
    -    Repository is a description of our Gateway API promises.
    -
     type RequestDefinition struct {
     	Method      string            `json:"method"`
     	Headers     map[string]string `json:"headers"`
    @@ -10571,41 +10694,6 @@
     
     type StatsDSinkSanitizationFunc func(*bytes.Buffer, string)
     
    -type StreamManager struct {
    -	// Has unexported fields.
    -}
    -    StreamManager is responsible for creating a single stream
    -
    -type StreamingMiddleware struct {
    -	*BaseMiddleware
    -
    -	// Has unexported fields.
    -}
    -    StreamingMiddleware is a middleware that handles streaming functionality
    -
    -func (s *StreamingMiddleware) EnabledForSpec() bool
    -    EnabledForSpec checks if streaming is enabled on the config
    -
    -func (s *StreamingMiddleware) Init()
    -    Init initializes the middleware
    -
    -func (s *StreamingMiddleware) Name() string
    -    Name is StreamingMiddleware
    -
    -func (s *StreamingMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int)
    -    ProcessRequest will handle the streaming functionality
    -
    -func (s *StreamingMiddleware) Unload()
    -    Unload closes and remove active streams
    -
    -type StreamsConfig struct {
    -	Info struct {
    -		Version string `json:"version"`
    -	} `json:"info"`
    -	Streams map[string]any `json:"streams"`
    -}
    -    StreamsConfig represents a stream configuration
    -
     type StripAuth struct {
     	*BaseMiddleware
     }
    @@ -10826,9 +10914,10 @@
     }
     
     type TykMiddleware interface {
    -	Init()
     	Base() *BaseMiddleware
    +	GetSpec() *APISpec
     
    +	Init()
     	SetName(string)
     	SetRequestLogger(*http.Request)
     	Logger() *logrus.Entry
    @@ -10837,11 +10926,14 @@
     	EnabledForSpec() bool
     	Name() string
     
    -	GetSpec() *APISpec
    -
     	Unload()
     }
     
    +func WrapMiddleware(base *BaseMiddleware, in model.Middleware) TykMiddleware
    +    WrapMiddleware returns a new TykMiddleware with the provided base
    +    middleware, and the smaller model.Middleware interface. It allows to
    +    implement model.Middleware, and use it as a TykMiddleware.
    +
     type TykOsinServer struct {
     	osin.Server
     	Config            *osin.ServerConfig
    @@ -11001,10 +11093,6 @@
         ProcessRequest will inject basic auth info into request context so that it
         can be used during reverse proxy.
     
    -type UpstreamOAuthCache interface {
    -	// Has unexported methods.
    -}
    -
     type UpstreamOAuthProvider struct {
     	// HeaderName is the header name to be used to fill upstream auth with.
     	HeaderName string

    Copy link
    Contributor

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 No relevant tests
    🔒 No security concerns identified
    ⚡ Key issues to review

    Possible Bug
    The method setUpOrDryRunStream in stream_manager.go seems to have a logical issue. It checks if sm.dryRun is true and if httpPaths is empty before creating a stream, which might not be the intended behavior. It should be reviewed to ensure that streams are created correctly in all scenarios.

    Code Smell
    The ProcessRequest method in streaming_middleware.go uses a new instance of Manager for each request, which might not be efficient. Consider reusing managers or using a pool of managers.

    Resource Leak
    The Manager class in stream_manager.go does not appear to properly manage the lifecycle of stream objects, potentially leading to resource leaks. Ensure that streams are correctly stopped and removed when no longer needed.

    Copy link
    Contributor

    PR Code Suggestions ✨

    No code suggestions found for the PR.

    @titpetric titpetric requested a review from a team as a code owner October 10, 2024 08:22
    @buger
    Copy link
    Member

    buger commented Oct 10, 2024

    @titpetric as discussed it should live in "ee" folder, e.g. "ee/internal/middleware"
    Also are there any reason to use "v1" in module name streamv1? It is non public module, not intended to be used outside, separately.

    @agata-wit agata-wit changed the title Refactor/streams TT-13269 - Refactor/streams Oct 17, 2024
    @buger buger requested a review from a team as a code owner October 17, 2024 11:58
    Copy link

    Quality Gate Failed Quality Gate failed

    Failed conditions
    0.0% Coverage on New Code (required ≥ 80%)
    C Reliability Rating on New Code (required ≥ A)

    See analysis details on SonarCloud

    Catch issues before they fail your Quality Gate with our IDE extension SonarLint

    @buger buger merged commit aee8137 into master Oct 19, 2024
    37 of 49 checks passed
    @buger buger deleted the refactor/streams branch October 19, 2024 14:08
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    4 participants