diff --git a/.github/workflows/plugin-compiler-build.yml b/.github/workflows/plugin-compiler-build.yml
index 7ff0b3d1d05..aa4939fd603 100644
--- a/.github/workflows/plugin-compiler-build.yml
+++ b/.github/workflows/plugin-compiler-build.yml
@@ -8,7 +8,7 @@ on:
- master
- release-**
tags:
- - 'v*'
+ - "v*"
env:
GOLANG_CROSS: 1.22-bullseye
@@ -80,3 +80,35 @@ jobs:
BASE-IMAGE=tykio/golang-cross:${{ env.GOLANG_CROSS }}
GITHUB_SHA=${{ github.sha }}
GITHUB_TAG=${{ github.ref_name }}
+
+ - name: Set docker metadata EE
+ id: set-metadata-ee
+ uses: docker/metadata-action@v4
+ with:
+ images: |
+ tykio/tyk-plugin-compiler-ee,enable=${{ startsWith(github.ref, 'refs/tags') }}
+ ${{ steps.login-ecr.outputs.registry }}/tyk-plugin-compiler-ee
+ labels: |
+ org.opencontainers.image.title=tyk-plugin-compiler-ee
+ org.opencontainers.image.description=Plugin compiler for the Tyk API Gateway Enterprise Edition
+ tags: |
+ type=ref,event=pr
+ type=semver,pattern=v{{version}}
+ type=semver,pattern=v{{major}}.{{minor}}
+ type=semver,pattern={{raw}}
+ type=sha,format=long
+
+ - name: Build and push to dockerhub/ECR EE
+ uses: docker/build-push-action@v4
+ with:
+ context: .
+ file: ci/images/plugin-compiler/Dockerfile
+ platforms: linux/amd64
+ push: true
+ labels: ${{ steps.set-metadata-ee.outputs.labels }}
+ tags: ${{ steps.set-metadata-ee.outputs.tags }}
+ build-args: |
+ BASE-IMAGE=tykio/golang-cross:${{ env.GOLANG_CROSS }}
+ GITHUB_SHA=${{ github.sha }}
+ GITHUB_TAG=${{ github.ref_name }}
+ BUILD_TAG=ee
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index cf7fec94016..f39edc6bd5c 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -131,7 +131,7 @@ jobs:
with:
mask-password: 'true'
- name: Docker metadata for CI
- id: ci_metadata
+ id: ci_metadata_
if: ${{ matrix.golang_cross == '1.22-bullseye' }}
uses: docker/metadata-action@v5
with:
@@ -157,10 +157,43 @@ jobs:
push: true
cache-from: type=gha
cache-to: type=gha,mode=max
- tags: ${{ steps.ci_metadata.outputs.tags }}
+ tags: ${{ steps.ci_metadata_.outputs.tags }}
labels: ${{ steps.tag_metadata.outputs.labels }}
+ build-args: |
+ EDITION=
+ - name: Docker metadata for CI ee
+ id: ci_metadata_ee
+ if: ${{ matrix.golang_cross == '1.22-bullseye' }}
+ uses: docker/metadata-action@v5
+ with:
+ images: ${{ steps.ecr.outputs.registry }}/tyk-ee
+ flavor: |
+ latest=false
+ tags: |
+ type=ref,event=branch
+ type=ref,event=pr
+ type=sha,format=long
+ type=semver,pattern={{major}},prefix=v
+ type=semver,pattern={{major}}.{{minor}},prefix=v
+ type=semver,pattern={{version}},prefix=v
+ - name: push image to CI ee
+ if: ${{ matrix.golang_cross == '1.22-bullseye' }}
+ uses: docker/build-push-action@v6
+ with:
+ context: "dist"
+ platforms: linux/amd64,linux/arm64
+ file: ci/Dockerfile.distroless
+ provenance: mode=max
+ sbom: true
+ push: true
+ cache-from: type=gha
+ cache-to: type=gha,mode=max
+ tags: ${{ steps.ci_metadata_ee.outputs.tags }}
+ labels: ${{ steps.tag_metadata.outputs.labels }}
+ build-args: |
+ EDITION=-ee
- name: Docker metadata for tag push
- id: tag_metadata
+ id: tag_metadata_
uses: docker/metadata-action@v5
with:
images: |
@@ -185,8 +218,39 @@ jobs:
cache-from: type=gha
cache-to: type=gha,mode=max
push: ${{ startsWith(github.ref, 'refs/tags') }}
- tags: ${{ steps.tag_metadata.outputs.tags }}
- labels: ${{ steps.tag_metadata.outputs.labels }}
+ tags: ${{ steps.tag_metadata_.outputs.tags }}
+ labels: ${{ steps.tag_metadata_.outputs.labels }}
+ build-args: |
+ EDITION=
+ - name: Docker metadata for tag push ee
+ id: tag_metadata_ee
+ uses: docker/metadata-action@v5
+ with:
+ images: |
+ tykio/tyk-gateway-ee
+ flavor: |
+ latest=false
+ prefix=v
+ tags: |
+ type=semver,pattern={{major}}.{{minor}}
+ type=semver,pattern={{version}}
+ labels: "org.opencontainers.image.title=tyk-gateway Enterprise Edition (distroless) \norg.opencontainers.image.description=Tyk Open Source API Gateway written in Go, supporting REST, GraphQL, TCP and gRPC protocols\norg.opencontainers.image.vendor=tyk.io\norg.opencontainers.image.version=${{ github.ref_name }}\n"
+ - name: push image to prod ee
+ if: ${{ matrix.golang_cross == '1.22-bullseye' }}
+ uses: docker/build-push-action@v6
+ with:
+ context: "dist"
+ platforms: linux/amd64,linux/arm64
+ file: ci/Dockerfile.distroless
+ provenance: mode=max
+ sbom: true
+ cache-from: type=gha
+ cache-to: type=gha,mode=max
+ push: ${{ startsWith(github.ref, 'refs/tags') }}
+ tags: ${{ steps.tag_metadata_ee.outputs.tags }}
+ labels: ${{ steps.tag_metadata_ee.outputs.labels }}
+ build-args: |
+ EDITION=-ee
- name: save deb
uses: actions/upload-artifact@v4
if: ${{ matrix.golang_cross == '1.22-bullseye' }}
@@ -310,37 +374,9 @@ jobs:
repository: TykTechnologies/tyk-analytics
path: tyk-analytics
token: ${{ secrets.ORG_GH_TOKEN }}
- fetch-depth: 0
+ fetch-depth: 1
+ ref: ${{ env.BASE_REF }}
sparse-checkout: tests/api
- - name: Choosing test code branch
- working-directory: tyk-analytics/tests/api
- run: |
- if [[ ${{ github.event_name }} == "release" ]]; then
- echo "Checking out release tag..."
- TAG_NAME=${{ github.event.release.tag_name }}
- git checkout "$TAG_NAME"
- fi
- if [[ ${{ github.event_name }} == "pull_request" ]]; then
- PR_BRANCH=${{ github.event.pull_request.head.ref }}
- TARGET_BRANCH=${{ github.event.pull_request.base.ref }}
- echo "Looking for PR_BRANCH:$PR_BRANCH or TARGET_BRANCH:$TARGET_BRANCH..."
- if git rev-parse --verify "origin/$PR_BRANCH" >/dev/null 2>&1; then
- echo "PR branch $PR_BRANCH exists. Checking out..."
- git checkout "$PR_BRANCH"
- elif git rev-parse --verify "origin/$TARGET_BRANCH" >/dev/null 2>&1; then
- echo "Target branch $TARGET_BRANCH exists. Checking out..."
- git checkout "$TARGET_BRANCH"
- fi
- fi
- if [[ ${{ github.event_name }} == "push" ]]; then
- PUSH_BRANCH=${{ github.ref_name }}
- echo "Looking for PUSH_BRANCH:$PUSH_BRANCH..."
- if git rev-parse --verify "origin/$PUSH_BRANCH" >/dev/null 2>&1; then
- echo "Push branch $PUSH_BRANCH exists. Checking out..."
- git checkout "$PUSH_BRANCH"
- fi
- fi
- echo "Current commit: $(git rev-parse HEAD)"
- uses: actions/setup-python@v5
with:
cache: 'pip'
@@ -369,6 +405,20 @@ jobs:
USER_API_SECRET=${{ steps.env_up.outputs.USER_API_SECRET }}
EOF
env $(cat pytest.env | xargs) $pytest -m "${{ matrix.envfiles.apimarkers }}"
+ - name: Upload Playwright Test Report to S3
+ if: failure() && steps.test_execution.outcome != 'success' && steps.env_up.outcome == 'success'
+ run: npm run upload_report_to_s3
+ env:
+ AWS_ACCESS_KEY_ID: ${{ secrets.UI_AWS_ACCESS_KEY_ID }}
+ AWS_SECRET_ACCESS_KEY: ${{ secrets.UI_AWS_SECRET_ACCESS_KEY }}
+ RUN_ID: 'tyk-analytics/${{ github.run_id }}'
+ working-directory: tyk-analytics/tests/ui
+ - name: Share S3 report link into summary
+ if: failure() && steps.test_execution.outcome != 'success' && steps.env_up.outcome == 'success'
+ run: |
+ echo "# :clipboard: S3 UI Test REPORT: ${{ matrix.envfiles.db }}-${{ matrix.envfiles.conf }}" >> $GITHUB_STEP_SUMMARY
+ echo "- Status: ${{ steps.test_execution.outcome == 'success' && ':white_check_mark:' || ':no_entry_sign:' }}" >> $GITHUB_STEP_SUMMARY
+ echo "- [Link to report](https://tyk-qa-reports.s3.eu-central-1.amazonaws.com/tyk-analytics/${{ github.run_id }}/index.html)" >> $GITHUB_STEP_SUMMARY
- name: Generate metadata and upload test reports
id: metadata_report
if: always() && (steps.test_execution.conclusion != 'skipped')
diff --git a/LICENSE.md b/LICENSE.md
index 771dfbcd592..810075a41b7 100644
--- a/LICENSE.md
+++ b/LICENSE.md
@@ -1,3 +1,12 @@
+The code in the root directory and all subdirectories, except for the 'ee' folder,
+is licensed under the Mozilla Public License Version 2.0 (the "MPL"), as detailed below.
+
+The code in the 'ee' folder is subject to a separate commercial license.
+See the [LICENSE-EE](ee/LICENSE-EE.md) file in the 'ee' folder for details on the Enterprise Edition license.
+
+For the open source components:
+-------------------------------
+
# Mozilla Public License Version 2.0
## 1. Definitions
@@ -181,4 +190,4 @@ You may add additional accurate notices of copyright ownership.
## Exhibit B - “Incompatible With Secondary Licenses” Notice
-> This Source Code Form is "Incompatible With Secondary Licenses", as defined by the Mozilla Public License, v. 2.0.
\ No newline at end of file
+> This Source Code Form is "Incompatible With Secondary Licenses", as defined by the Mozilla Public License, v. 2.0.
diff --git a/README.md b/README.md
index 1ef30ddbf62..bab78264f8d 100644
--- a/README.md
+++ b/README.md
@@ -45,7 +45,7 @@ Tyk runs natively on _Kubernetes_, if you prefer, thanks to the _[Tyk Kubernetes
- The Enterprise API Management platform SaaS: Management Control Plane, Dashboard GUI & Developer Portal.
+ The Enterprise API Management platform SaaS: Management Control Plane, Dashboard GUI & Developer Portal.
Deploy Tyk Cloud
@@ -82,7 +82,7 @@ Your Tyk Gateway is now configured and ready to use. Confirm this by checking ag
```console
curl localhost:8080/hello
```
-Output:
+Output:
```json
{"status": "pass", "version": "v3.2.1", "description": "Tyk GW"}
```
@@ -166,11 +166,13 @@ All the documentation for Tyk Gateway and other OSS-related topics can be found
* [Newsletters ](https://pages.tyk.io/newsletter)- Subscribe to our GraphQL & API newsletters
* If you are using Tyk give us a star ⭐️
-## Open Source License
+## Licensing
-Tyk is released under the MPL v2.0; please see [LICENSE.md](https://github.com/TykTechnologies/tyk/blob/master/LICENSE.md) for a full version of the license.
+Tyk is dual-licensed:
-![FOSSA Status](https://app.fossa.io/api/projects/git%2Bgithub.com%2FTykTechnologies%2Ftyk.svg?type=large)
+1. Open Source License: The code in the root directory and all subdirectories except the 'ee' folder is released under the MPL v2.0. Please see [LICENSE](https://github.com/TykTechnologies/tyk/blob/master/LICENSE) for the full version of the open source license.
+
+2. Commercial License: The code in the 'ee' folder is subject to a commercial license. For more information about obtaining a commercial license, please contact our sales team at sales@tyk.io.
## Compiling Tyk Gateway
diff --git a/ci/Dockerfile.distroless b/ci/Dockerfile.distroless
index 45570f602f9..d823708f0d1 100644
--- a/ci/Dockerfile.distroless
+++ b/ci/Dockerfile.distroless
@@ -2,11 +2,12 @@
FROM debian:bookworm-slim as DEB
ARG TARGETARCH
+ARG EDITION
ENV DEBIAN_FRONTEND=noninteractive
COPY *${TARGETARCH}.deb /
-RUN rm -f /*fips*.deb && dpkg -i /tyk-gateway*${TARGETARCH}.deb && rm /*.deb
+RUN rm -f /*fips*.deb && dpkg -i /tyk-gateway${EDITION}_*${TARGETARCH}.deb && rm /*.deb
FROM gcr.io/distroless/base-debian12:latest
diff --git a/ci/goreleaser/goreleaser.yml b/ci/goreleaser/goreleaser.yml
index 1a52115a178..8296607b48f 100644
--- a/ci/goreleaser/goreleaser.yml
+++ b/ci/goreleaser/goreleaser.yml
@@ -21,12 +21,45 @@ builds:
goarch:
- amd64
binary: tyk
- - id: fips
+ - id: std-arm64
flags:
- -tags=ignore
- -trimpath
- -tags=goplugin
- - -tags=fips,boringcrypto
+ ldflags:
+ - -X github.com/TykTechnologies/tyk/internal/build.Version={{.Version}}
+ - -X github.com/TykTechnologies/tyk/internal/build.Commit={{.FullCommit}}
+ - -X github.com/TykTechnologies/tyk/internal/build.BuildDate={{.Date}}
+ - -X github.com/TykTechnologies/tyk/internal/build.BuiltBy=goreleaser
+ env:
+ - CC=aarch64-linux-gnu-gcc
+ goos:
+ - linux
+ goarch:
+ - arm64
+ binary: tyk
+ - id: std-s390x
+ flags:
+ - -tags=ignore
+ - -trimpath
+ - -tags=goplugin
+ ldflags:
+ - -X github.com/TykTechnologies/tyk/internal/build.Version={{.Version}}
+ - -X github.com/TykTechnologies/tyk/internal/build.Commit={{.FullCommit}}
+ - -X github.com/TykTechnologies/tyk/internal/build.BuildDate={{.Date}}
+ - -X github.com/TykTechnologies/tyk/internal/build.BuiltBy=goreleaser
+ env:
+ - CC=s390x-linux-gnu-gcc
+ goos:
+ - linux
+ goarch:
+ - s390x
+ binary: tyk
+ - id: fips
+ flags:
+ - -tags=ignore
+ - -trimpath
+ - -tags=goplugin,fips,boringcrypto,ee
env:
- GOEXPERIMENT=boringcrypto
ldflags:
@@ -39,11 +72,26 @@ builds:
goarch:
- amd64
binary: tyk
- - id: std-arm64
+ - id: ee
flags:
- -tags=ignore
- -trimpath
- - -tags=goplugin
+ - -tags=goplugin,ee
+ ldflags:
+ - -X github.com/TykTechnologies/tyk/internal/build.Version={{.Version}}
+ - -X github.com/TykTechnologies/tyk/internal/build.Commit={{.FullCommit}}
+ - -X github.com/TykTechnologies/tyk/internal/build.BuildDate={{.Date}}
+ - -X github.com/TykTechnologies/tyk/internal/build.BuiltBy=goreleaser
+ goos:
+ - linux
+ goarch:
+ - amd64
+ binary: tyk
+ - id: ee-arm64
+ flags:
+ - -tags=ignore
+ - -trimpath
+ - -tags=goplugin,ee
ldflags:
- -X github.com/TykTechnologies/tyk/internal/build.Version={{.Version}}
- -X github.com/TykTechnologies/tyk/internal/build.Commit={{.FullCommit}}
@@ -56,11 +104,11 @@ builds:
goarch:
- arm64
binary: tyk
- - id: std-s390x
+ - id: ee-s390x
flags:
- -tags=ignore
- -trimpath
- - -tags=goplugin
+ - -tags=goplugin,ee
ldflags:
- -X github.com/TykTechnologies/tyk/internal/build.Version={{.Version}}
- -X github.com/TykTechnologies/tyk/internal/build.Commit={{.FullCommit}}
@@ -190,6 +238,65 @@ nfpms:
signature:
key_file: tyk.io.signing.key
type: origin
+ - id: ee
+ vendor: "Tyk Technologies Ltd"
+ homepage: "https://tyk.io"
+ maintainer: "Tyk "
+ description: Tyk Open Source API Gateway written in Go, supporting REST, GraphQL, TCP and gRPC protocols
+ package_name: tyk-gateway-ee
+ file_name_template: "{{ .ConventionalFileName }}"
+ builds:
+ - ee
+ - ee-arm64
+ - ee-s390x
+ formats:
+ - deb
+ - rpm
+ contents:
+ - src: "README.md"
+ dst: "/opt/share/docs/tyk-gateway/README.md"
+ - src: "ci/install/*"
+ dst: "/opt/tyk-gateway/install"
+ - src: ci/install/inits/systemd/system/tyk-gateway.service
+ dst: /lib/systemd/system/tyk-gateway.service
+ - src: ci/install/inits/sysv/init.d/tyk-gateway
+ dst: /etc/init.d/tyk-gateway
+ - src: /opt/tyk-gateway
+ dst: /opt/tyk
+ type: "symlink"
+ - src: "LICENSE.md"
+ dst: "/opt/share/docs/tyk-gateway/LICENSE.md"
+ - src: "apps/app_sample.*"
+ dst: "/opt/tyk-gateway/apps"
+ - src: "templates/*.json"
+ dst: "/opt/tyk-gateway/templates"
+ - src: "templates/playground/*"
+ dst: "/opt/tyk-gateway/templates/playground"
+ - src: "middleware/*.js"
+ dst: "/opt/tyk-gateway/middleware"
+ - src: "event_handlers/sample/*.js"
+ dst: "/opt/tyk-gateway/event_handlers/sample"
+ - src: "policies/*.json"
+ dst: "/opt/tyk-gateway/policies"
+ - src: "coprocess/*"
+ dst: "/opt/tyk-gateway/coprocess"
+ - src: tyk.conf.example
+ dst: /opt/tyk-gateway/tyk.conf
+ type: "config|noreplace"
+ scripts:
+ preinstall: "ci/install/before_install.sh"
+ postinstall: "ci/install/post_install.sh"
+ postremove: "ci/install/post_remove.sh"
+ bindir: "/opt/tyk-gateway"
+ rpm:
+ scripts:
+ posttrans: ci/install/post_trans.sh
+ signature:
+ key_file: tyk.io.signing.key
+ deb:
+ signature:
+ key_file: tyk.io.signing.key
+ type: origin
publishers:
- name: tyk-gateway-unstable
env:
diff --git a/ci/images/plugin-compiler/Dockerfile b/ci/images/plugin-compiler/Dockerfile
index b6c021f997d..f045ce5a784 100644
--- a/ci/images/plugin-compiler/Dockerfile
+++ b/ci/images/plugin-compiler/Dockerfile
@@ -36,6 +36,9 @@ ARG GITHUB_TAG
ENV GITHUB_SHA=${GITHUB_SHA}
ENV GITHUB_TAG=${GITHUB_TAG}
+ARG BUILD_TAG
+ENV BUILD_TAG=${BUILD_TAG}
+
COPY ci/images/plugin-compiler/data/build.sh /build.sh
RUN chmod +x /build.sh
diff --git a/ci/images/plugin-compiler/data/build.sh b/ci/images/plugin-compiler/data/build.sh
index 885af399802..c5006653e46 100755
--- a/ci/images/plugin-compiler/data/build.sh
+++ b/ci/images/plugin-compiler/data/build.sh
@@ -16,7 +16,7 @@ GATEWAY_VERSION=$(echo $GITHUB_TAG | perl -n -e'/v(\d+).(\d+).(\d+)/'' && print
#
# If GOOS and GOARCH are not set, it will build `{plugin_name}`.
#
-# Example command: ./build.sh
+# Example command: ./build.sh
# Example output: tyk-extras_5.0.0_linux_amd64.so
plugin_name=$1
@@ -145,7 +145,11 @@ if [[ "$DEBUG" == "1" ]] ; then
git diff --cached
fi
-CC=$CC CGO_ENABLED=1 GOOS=$GOOS GOARCH=$GOARCH go build -buildmode=plugin -trimpath -o $plugin_name
+if [ -n "$BUILD_TAG" ]; then
+ CC=$CC CGO_ENABLED=1 GOOS=$GOOS GOARCH=$GOARCH go build -buildmode=plugin -trimpath -tags=$BUILD_TAG -o $plugin_name
+else
+ CC=$CC CGO_ENABLED=1 GOOS=$GOOS GOARCH=$GOARCH go build -buildmode=plugin -trimpath -o $plugin_name
+fi
set +x
diff --git a/ee/EULA.pdf b/ee/EULA.pdf
new file mode 100644
index 00000000000..53c2c895c0a
Binary files /dev/null and b/ee/EULA.pdf differ
diff --git a/ee/LICENSE-EE.md b/ee/LICENSE-EE.md
new file mode 100644
index 00000000000..de7aeeab9d7
--- /dev/null
+++ b/ee/LICENSE-EE.md
@@ -0,0 +1,7 @@
+Commercial License
+
+The code in this 'ee' folder is subject to the commercial license terms outlined in the accompanying [EULA.pdf](ee/EULA.pdf) file.
+
+For the full End User License Agreement, please refer to the [EULA.pdf](EULA.pdf) file in this directory.
+
+Copyright 2024 Tyk Technologies. All rights reserved.
diff --git a/ee/middleware/streams/manager.go b/ee/middleware/streams/manager.go
new file mode 100644
index 00000000000..42b2ce78f36
--- /dev/null
+++ b/ee/middleware/streams/manager.go
@@ -0,0 +1,115 @@
+package streams
+
+import (
+ "fmt"
+ "net/http"
+ "strings"
+ "sync"
+ "sync/atomic"
+
+ "github.com/gorilla/mux"
+)
+
+// Manager is responsible for creating a single stream.
+type Manager struct {
+ streams sync.Map
+ routeLock sync.Mutex
+ muxer *mux.Router
+ mw *Middleware
+ dryRun bool
+ listenPaths []string
+ activityCounter atomic.Int32 // Counts active subscriptions, requests.
+}
+
+func (sm *Manager) initStreams(r *http.Request, config *StreamsConfig) {
+ // Clear existing routes for this consumer group
+ sm.muxer = mux.NewRouter()
+
+ for streamID, streamConfig := range config.Streams {
+ sm.setUpOrDryRunStream(streamConfig, streamID)
+ }
+
+ // If it is default stream manager, init muxer
+ if r == nil {
+ for _, path := range sm.listenPaths {
+ sm.muxer.HandleFunc(path, func(_ http.ResponseWriter, _ *http.Request) {
+ // Dummy handler
+ })
+ }
+ }
+}
+
+func (sm *Manager) setUpOrDryRunStream(streamConfig any, streamID string) {
+ if streamMap, ok := streamConfig.(map[string]interface{}); ok {
+ httpPaths := GetHTTPPaths(streamMap)
+
+ if sm.dryRun {
+ if len(httpPaths) == 0 {
+ err := sm.createStream(streamID, streamMap)
+ if err != nil {
+ sm.mw.Logger().WithError(err).Errorf("Error creating stream %s", streamID)
+ }
+ }
+ } else {
+ err := sm.createStream(streamID, streamMap)
+ if err != nil {
+ sm.mw.Logger().WithError(err).Errorf("Error creating stream %s", streamID)
+ }
+ }
+ sm.listenPaths = append(sm.listenPaths, httpPaths...)
+ }
+}
+
+// removeStream removes a stream
+func (sm *Manager) removeStream(streamID string) error {
+ streamFullID := fmt.Sprintf("%s_%s", sm.mw.Spec.APIID, streamID)
+
+ if streamValue, exists := sm.streams.Load(streamFullID); exists {
+ stream, ok := streamValue.(*Stream)
+ if !ok {
+ return fmt.Errorf("stream %s is not a valid stream", streamID)
+ }
+ err := stream.Stop()
+ if err != nil {
+ return err
+ }
+ sm.streams.Delete(streamFullID)
+ } else {
+ return fmt.Errorf("stream %s does not exist", streamID)
+ }
+ return nil
+}
+
+// createStream creates a new stream
+func (sm *Manager) createStream(streamID string, config map[string]interface{}) error {
+ streamFullID := fmt.Sprintf("%s_%s", sm.mw.Spec.APIID, streamID)
+ sm.mw.Logger().Debugf("Creating stream: %s", streamFullID)
+
+ stream := NewStream(sm.mw.allowedUnsafe)
+ err := stream.Start(config, &handleFuncAdapter{
+ mw: sm.mw,
+ streamID: streamFullID,
+ muxer: sm.muxer,
+ sm: sm,
+ // child logger is necessary to prevent race condition
+ logger: sm.mw.Logger().WithField("stream", streamFullID),
+ })
+ if err != nil {
+ sm.mw.Logger().Errorf("Failed to start stream %s: %v", streamFullID, err)
+ return err
+ }
+
+ sm.streams.Store(streamFullID, stream)
+ sm.mw.Logger().Infof("Successfully created stream: %s", streamFullID)
+
+ return nil
+}
+
+func (sm *Manager) hasPath(path string) bool {
+ for _, p := range sm.listenPaths {
+ if strings.TrimPrefix(path, "/") == strings.TrimPrefix(p, "/") {
+ return true
+ }
+ }
+ return false
+}
diff --git a/ee/middleware/streams/middleware.go b/ee/middleware/streams/middleware.go
new file mode 100644
index 00000000000..5f96a8a0e3f
--- /dev/null
+++ b/ee/middleware/streams/middleware.go
@@ -0,0 +1,281 @@
+package streams
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/http"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/gorilla/mux"
+ "github.com/sirupsen/logrus"
+
+ "github.com/TykTechnologies/tyk/internal/middleware"
+ "github.com/TykTechnologies/tyk/internal/model"
+)
+
+// Middleware implements a streaming middleware.
+type Middleware struct {
+ Spec *APISpec
+ Gw Gateway
+
+ base BaseMiddleware
+
+ createStreamManagerLock sync.Mutex
+ StreamManagerCache sync.Map // Map of payload hash to Manager
+
+ ctx context.Context
+ cancel context.CancelFunc
+ allowedUnsafe []string
+ defaultManager *Manager
+}
+
+// Middleware implements model.Middleware.
+var _ model.Middleware = &Middleware{}
+
+// NewMiddleware returns a new instance of Middleware.
+func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec) *Middleware {
+ return &Middleware{
+ base: mw,
+ Gw: gw,
+ Spec: spec,
+ }
+}
+
+// Logger returns a logger with middleware filled out.
+func (s *Middleware) Logger() *logrus.Entry {
+ return s.base.Logger().WithField("mw", s.Name())
+}
+
+// Name returns the name for the middleware.
+func (s *Middleware) Name() string {
+ return "StreamingMiddleware"
+}
+
+// EnabledForSpec checks if streaming is enabled on the config.
+func (s *Middleware) EnabledForSpec() bool {
+ s.Logger().Debug("Checking if streaming is enabled")
+
+ streamingConfig := s.Gw.GetConfig().Streaming
+ s.Logger().Debugf("Streaming config: %+v", streamingConfig)
+
+ if streamingConfig.Enabled {
+ s.Logger().Debug("Streaming is enabled in the config")
+ s.allowedUnsafe = streamingConfig.AllowUnsafe
+ s.Logger().Debugf("Allowed unsafe components: %v", s.allowedUnsafe)
+
+ config := s.getStreamsConfig(nil)
+ GlobalStreamCounter.Add(int64(len(config.Streams)))
+
+ s.Logger().Debug("Total streams count: ", len(config.Streams))
+
+ return len(config.Streams) != 0
+ }
+
+ s.Logger().Debug("Streaming is not enabled in the config")
+ return false
+}
+
+// Init initializes the middleware
+func (s *Middleware) Init() {
+ s.Logger().Debug("Initializing Middleware")
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+
+ s.Logger().Debug("Initializing default stream manager")
+ s.defaultManager = s.CreateStreamManager(nil)
+
+ // Start garbage collection routine
+ go func() {
+ ticker := time.NewTicker(StreamGCInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ s.GC()
+ case <-s.ctx.Done():
+ return
+ }
+ }
+ }()
+}
+
+// CreateStreamManager creates or retrieves a stream manager based on the request.
+func (s *Middleware) CreateStreamManager(r *http.Request) *Manager {
+ streamsConfig := s.getStreamsConfig(r)
+ configJSON, _ := json.Marshal(streamsConfig)
+ cacheKey := fmt.Sprintf("%x", sha256.Sum256(configJSON))
+
+ s.createStreamManagerLock.Lock()
+ defer s.createStreamManagerLock.Unlock()
+
+ s.Logger().Debug("Attempting to load stream manager from cache")
+ s.Logger().Debugf("Cache key: %s", cacheKey)
+ if cachedManager, found := s.StreamManagerCache.Load(cacheKey); found {
+ s.Logger().Debug("Found cached stream manager")
+ return cachedManager.(*Manager)
+ }
+
+ newManager := &Manager{
+ muxer: mux.NewRouter(),
+ mw: s,
+ dryRun: r == nil,
+ activityCounter: atomic.Int32{},
+ }
+ newManager.initStreams(r, streamsConfig)
+
+ if r != nil {
+ s.StreamManagerCache.Store(cacheKey, newManager)
+ }
+ return newManager
+}
+
+// GC removes inactive stream managers.
+func (s *Middleware) GC() {
+ s.Logger().Debug("Starting garbage collection for inactive stream managers")
+
+ s.StreamManagerCache.Range(func(key, value interface{}) bool {
+ manager := value.(*Manager)
+ if manager == s.defaultManager {
+ return true
+ }
+
+ if manager.activityCounter.Load() <= 0 {
+ s.Logger().Infof("Removing inactive stream manager: %v", key)
+ manager.streams.Range(func(streamKey, streamValue interface{}) bool {
+ streamID := streamKey.(string)
+ err := manager.removeStream(streamID)
+ if err != nil {
+ s.Logger().WithError(err).Errorf("Error removing stream %s", streamID)
+ }
+ return true
+ })
+ s.StreamManagerCache.Delete(key)
+ }
+
+ return true
+ })
+}
+
+func (s *Middleware) getStreamsConfig(r *http.Request) *StreamsConfig {
+ config := &StreamsConfig{Streams: make(map[string]any)}
+ if !s.Spec.IsOAS {
+ return config
+ }
+
+ extension, ok := s.Spec.OAS.T.Extensions[ExtensionTykStreaming]
+ if !ok {
+ return config
+ }
+
+ if streamsMap, ok := extension.(map[string]any); ok {
+ if streams, ok := streamsMap["streams"].(map[string]any); ok {
+ s.processStreamsConfig(r, streams, config)
+ }
+ }
+
+ return config
+}
+
+func (s *Middleware) processStreamsConfig(r *http.Request, streams map[string]any, config *StreamsConfig) {
+ for streamID, stream := range streams {
+ if r == nil {
+ s.Logger().Debugf("No request available to replace variables in stream config for %s", streamID)
+ } else {
+ s.Logger().Debugf("Stream config for %s: %v", streamID, stream)
+ marshaledStream, err := json.Marshal(stream)
+ if err != nil {
+ s.Logger().Errorf("Failed to marshal stream config: %v", err)
+ continue
+ }
+ replacedStream := s.Gw.ReplaceTykVariables(r, string(marshaledStream), true)
+
+ if replacedStream != string(marshaledStream) {
+ s.Logger().Debugf("Stream config changed for %s: %s", streamID, replacedStream)
+ } else {
+ s.Logger().Debugf("Stream config has not changed for %s: %s", streamID, replacedStream)
+ }
+
+ var unmarshaledStream map[string]interface{}
+ err = json.Unmarshal([]byte(replacedStream), &unmarshaledStream)
+ if err != nil {
+ s.Logger().Errorf("Failed to unmarshal replaced stream config: %v", err)
+ continue
+ }
+ stream = unmarshaledStream
+ }
+ config.Streams[streamID] = stream
+ }
+}
+
+// ProcessRequest will handle the streaming functionality.
+func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {
+ strippedPath := s.Spec.StripListenPath(r.URL.Path)
+ if !s.defaultManager.hasPath(strippedPath) {
+ return nil, http.StatusOK
+ }
+
+ s.Logger().Debugf("Processing request: %s, %s", r.URL.Path, strippedPath)
+
+ newRequest := &http.Request{
+ Method: r.Method,
+ URL: &url.URL{Scheme: r.URL.Scheme, Host: r.URL.Host, Path: strippedPath},
+ }
+
+ if !s.defaultManager.muxer.Match(newRequest, &mux.RouteMatch{}) {
+ return nil, http.StatusOK
+ }
+
+ var match mux.RouteMatch
+ streamManager := s.CreateStreamManager(r)
+ streamManager.routeLock.Lock()
+ streamManager.muxer.Match(newRequest, &match)
+ streamManager.routeLock.Unlock()
+
+ // direct Bento handler
+ handler, ok := match.Handler.(http.HandlerFunc)
+ if !ok {
+ return errors.New("invalid route handler"), http.StatusInternalServerError
+ }
+
+ streamManager.activityCounter.Add(1)
+ defer streamManager.activityCounter.Add(-1)
+
+ handler.ServeHTTP(w, r)
+
+ return nil, middleware.StatusRespond
+}
+
+// Unload closes and remove active streams.
+func (s *Middleware) Unload() {
+ s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)
+
+ totalStreams := 0
+ s.cancel()
+
+ s.StreamManagerCache.Range(func(_, value interface{}) bool {
+ manager, ok := value.(*Manager)
+ if !ok {
+ return true
+ }
+ manager.streams.Range(func(_, streamValue interface{}) bool {
+ totalStreams++
+ if stream, ok := streamValue.(*Stream); ok {
+ if err := stream.Reset(); err != nil {
+ s.Logger().WithError(err).Error("Failed to reset stream")
+ }
+ }
+ return true
+ })
+ return true
+ })
+
+ GlobalStreamCounter.Add(-int64(totalStreams))
+ s.StreamManagerCache = sync.Map{}
+ s.Logger().Info("All streams successfully removed")
+}
diff --git a/ee/middleware/streams/model.go b/ee/middleware/streams/model.go
new file mode 100644
index 00000000000..784cfe215e7
--- /dev/null
+++ b/ee/middleware/streams/model.go
@@ -0,0 +1,60 @@
+package streams
+
+import (
+ "sync/atomic"
+ "time"
+
+ "github.com/TykTechnologies/tyk/apidef/oas"
+ "github.com/TykTechnologies/tyk/internal/model"
+)
+
+const (
+ // ExtensionTykStreaming is the OAS extension for Tyk streaming.
+ ExtensionTykStreaming = "x-tyk-streaming"
+ StreamGCInterval = 1 * time.Minute
+)
+
+// BaseMiddleware is the subset of BaseMiddleware APIs that the middleware uses.
+type BaseMiddleware interface {
+ model.LoggerProvider
+}
+
+// Gateway is the subset of Gateway APIs that the middleware uses.
+type Gateway interface {
+ model.ConfigProvider
+ model.ReplaceTykVariables
+}
+
+// APISpec is a subset of gateway.APISpec for the values the middleware consumes.
+type APISpec struct {
+ APIID string
+ Name string
+ IsOAS bool
+ OAS oas.OAS
+
+ StripListenPath model.StripListenPathFunc
+}
+
+// NewAPISpec creates a new APISpec object based on the required inputs.
+// The resulting object is a subset of `*gateway.APISpec`.
+func NewAPISpec(id string, name string, isOasDef bool, oasDef oas.OAS, stripListenPath model.StripListenPathFunc) *APISpec {
+ return &APISpec{
+ APIID: id,
+ Name: name,
+ IsOAS: isOasDef,
+ OAS: oasDef,
+ StripListenPath: stripListenPath,
+ }
+}
+
+// StreamsConfig represents a stream configuration.
+type StreamsConfig struct {
+ Info struct {
+ Version string `json:"version"`
+ } `json:"info"`
+
+ Streams map[string]any `json:"streams"`
+}
+
+// GlobalStreamCounter is used for testing.
+var GlobalStreamCounter atomic.Int64
diff --git a/internal/streaming/manager.go b/ee/middleware/streams/stream.go
similarity index 94%
rename from internal/streaming/manager.go
rename to ee/middleware/streams/stream.go
index 65c95542ec4..cc5554e2361 100644
--- a/internal/streaming/manager.go
+++ b/ee/middleware/streams/stream.go
@@ -1,4 +1,4 @@
-package streaming
+package streams
import (
"context"
@@ -17,7 +17,7 @@ import (
_ "github.com/TykTechnologies/tyk/internal/portal"
)
-// Stream is a wrapper around benthos stream
+// Stream is a wrapper around stream
type Stream struct {
allowedUnsafe []string
streamConfig string
@@ -51,7 +51,7 @@ func (s *Stream) SetLogger(logger *logrus.Logger) {
}
}
-// Start loads up the configuration and starts the benthos stream. Non blocking
+// Start loads up the configuration and starts the stream. Non blocking
func (s *Stream) Start(config map[string]interface{}, mux service.HTTPMultiplexer) error {
s.log.Debugf("Starting stream")
@@ -107,7 +107,7 @@ func (s *Stream) Start(config map[string]interface{}, mux service.HTTPMultiplexe
return nil
}
-// Stop cleans up the benthos stream
+// Stop cleans up the stream
func (s *Stream) Stop() error {
s.log.Printf("Stopping stream")
@@ -141,7 +141,7 @@ func (s *Stream) Stop() error {
return nil
}
-// GetConfig returns the benthos configuration of the stream
+// GetConfig returns the configuration of the stream
func (s *Stream) GetConfig() string {
return s.streamConfig
}
diff --git a/internal/streaming/manager_test.go b/ee/middleware/streams/stream_test.go
similarity index 99%
rename from internal/streaming/manager_test.go
rename to ee/middleware/streams/stream_test.go
index 5f565130a39..7c2a0b8ee19 100644
--- a/internal/streaming/manager_test.go
+++ b/ee/middleware/streams/stream_test.go
@@ -1,4 +1,4 @@
-package streaming
+package streams
import (
"strings"
diff --git a/ee/middleware/streams/util.go b/ee/middleware/streams/util.go
new file mode 100644
index 00000000000..a3218a12b1f
--- /dev/null
+++ b/ee/middleware/streams/util.go
@@ -0,0 +1,98 @@
+package streams
+
+import (
+ "net/http"
+
+ "github.com/gorilla/mux"
+ "github.com/sirupsen/logrus"
+)
+
+type handleFuncAdapter struct {
+ streamID string
+ sm *Manager
+ mw *Middleware
+ muxer *mux.Router
+ logger *logrus.Entry
+}
+
+func (h *handleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter, *http.Request)) {
+ h.logger.Debugf("Registering streaming handleFunc for path: %s", path)
+
+ if h.mw == nil || h.muxer == nil {
+ h.logger.Error("Middleware or muxer is nil")
+ return
+ }
+
+ h.sm.routeLock.Lock()
+ h.muxer.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
+ h.sm.activityCounter.Add(1)
+ defer h.sm.activityCounter.Add(-1)
+ f(w, r)
+ })
+ h.sm.routeLock.Unlock()
+ h.logger.Debugf("Registered handler for path: %s", path)
+}
+
+// Helper function to extract paths from an http_server configuration
+func extractPaths(httpConfig map[string]interface{}) []string {
+ var paths []string
+ defaultPaths := map[string]string{
+ "path": "/post",
+ "ws_path": "/post/ws",
+ "stream_path": "/get/stream",
+ }
+ for key, defaultValue := range defaultPaths {
+ if val, ok := httpConfig[key].(string); ok {
+ paths = append(paths, val)
+ } else {
+ paths = append(paths, defaultValue)
+ }
+ }
+ return paths
+}
+
+// extractHTTPServerPaths is a helper function to extract HTTP server paths from a given configuration.
+func extractHTTPServerPaths(config map[string]interface{}) []string {
+ if httpServerConfig, ok := config["http_server"].(map[string]interface{}); ok {
+ return extractPaths(httpServerConfig)
+ }
+ return nil
+}
+
+// handleBroker is a helper function to handle broker configurations.
+func handleBroker(brokerConfig map[string]interface{}) []string {
+ var paths []string
+ for _, ioKey := range []string{"inputs", "outputs"} {
+ if ioList, ok := brokerConfig[ioKey].([]interface{}); ok {
+ for _, ioItem := range ioList {
+ if ioItemMap, ok := ioItem.(map[string]interface{}); ok {
+ paths = append(paths, extractHTTPServerPaths(ioItemMap)...)
+ }
+ }
+ }
+ }
+ return paths
+}
+
+// GetHTTPPaths is the main function to get HTTP paths from the stream configuration.
+func GetHTTPPaths(streamConfig map[string]interface{}) []string {
+ var paths []string
+ for _, component := range []string{"input", "output"} {
+ if componentMap, ok := streamConfig[component].(map[string]interface{}); ok {
+ paths = append(paths, extractHTTPServerPaths(componentMap)...)
+ if brokerConfig, ok := componentMap["broker"].(map[string]interface{}); ok {
+ paths = append(paths, handleBroker(brokerConfig)...)
+ }
+ }
+ }
+ // remove duplicates
+ var deduplicated []string
+ exists := map[string]struct{}{}
+ for _, item := range paths {
+ if _, ok := exists[item]; !ok {
+ deduplicated = append(deduplicated, item)
+ exists[item] = struct{}{}
+ }
+ }
+ return deduplicated
+}
diff --git a/gateway/api_loader.go b/gateway/api_loader.go
index e8a77d54a8c..2d972b19b81 100644
--- a/gateway/api_loader.go
+++ b/gateway/api_loader.go
@@ -15,8 +15,6 @@ import (
"sync"
texttemplate "text/template"
- "github.com/TykTechnologies/tyk/rpc"
-
"github.com/gorilla/mux"
"github.com/justinas/alice"
"github.com/rs/cors"
@@ -24,9 +22,11 @@ import (
"github.com/TykTechnologies/tyk/apidef"
"github.com/TykTechnologies/tyk/coprocess"
- "github.com/TykTechnologies/tyk/internal/otel"
+ "github.com/TykTechnologies/tyk/rpc"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/trace"
+
+ "github.com/TykTechnologies/tyk/internal/otel"
)
const (
@@ -426,7 +426,10 @@ func (gw *Gateway) processSpec(spec *APISpec, apisByListen map[string]int,
gw.mwAppendEnabled(&chainArray, &RateLimitForAPI{BaseMiddleware: baseMid})
gw.mwAppendEnabled(&chainArray, &GraphQLMiddleware{BaseMiddleware: baseMid})
- gw.mwAppendEnabled(&chainArray, &StreamingMiddleware{BaseMiddleware: baseMid})
+
+ if streamMw := getStreamingMiddleware(baseMid); streamMw != nil {
+ gw.mwAppendEnabled(&chainArray, streamMw)
+ }
if !spec.UseKeylessAccess {
gw.mwAppendEnabled(&chainArray, &GraphQLComplexityMiddleware{BaseMiddleware: baseMid})
diff --git a/gateway/gateway.go b/gateway/gateway.go
index da5d538ae43..cef70e752e3 100644
--- a/gateway/gateway.go
+++ b/gateway/gateway.go
@@ -1,17 +1,12 @@
package gateway
import (
- "github.com/TykTechnologies/tyk/internal/policy"
+ "github.com/TykTechnologies/tyk/internal/model"
"github.com/TykTechnologies/tyk/user"
)
-// Repository is a description of our Gateway API promises.
-type Repository interface {
- policy.Repository
-}
-
// Gateway implements the Repository interface.
-var _ Repository = &Gateway{}
+var _ model.Gateway = &Gateway{}
// PolicyIDs returns a list of IDs for each policy loaded in the gateway.
func (gw *Gateway) PolicyIDs() []string {
diff --git a/gateway/middleware.go b/gateway/middleware.go
index 1be31e4a414..55a68ff67fb 100644
--- a/gateway/middleware.go
+++ b/gateway/middleware.go
@@ -12,14 +12,6 @@ import (
"strconv"
"time"
- "github.com/TykTechnologies/tyk/internal/cache"
- "github.com/TykTechnologies/tyk/internal/event"
- "github.com/TykTechnologies/tyk/internal/otel"
- "github.com/TykTechnologies/tyk/internal/policy"
- "github.com/TykTechnologies/tyk/rpc"
-
- "github.com/TykTechnologies/tyk/header"
-
"github.com/gocraft/health"
"github.com/justinas/alice"
newrelic "github.com/newrelic/go-agent"
@@ -28,14 +20,21 @@ import (
"golang.org/x/sync/singleflight"
"github.com/TykTechnologies/tyk/apidef"
+ "github.com/TykTechnologies/tyk/header"
+ "github.com/TykTechnologies/tyk/internal/cache"
+ "github.com/TykTechnologies/tyk/internal/event"
+ "github.com/TykTechnologies/tyk/internal/middleware"
+ "github.com/TykTechnologies/tyk/internal/otel"
+ "github.com/TykTechnologies/tyk/internal/policy"
"github.com/TykTechnologies/tyk/request"
+ "github.com/TykTechnologies/tyk/rpc"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/trace"
"github.com/TykTechnologies/tyk/user"
)
const (
- mwStatusRespond = 666
+ mwStatusRespond = middleware.StatusRespond
DEFAULT_ORG_SESSION_EXPIRATION = int64(604800)
)
@@ -45,9 +44,10 @@ var (
)
type TykMiddleware interface {
- Init()
Base() *BaseMiddleware
+ GetSpec() *APISpec
+ Init()
SetName(string)
SetRequestLogger(*http.Request)
Logger() *logrus.Entry
@@ -56,8 +56,6 @@ type TykMiddleware interface {
EnabledForSpec() bool
Name() string
- GetSpec() *APISpec
-
Unload()
}
diff --git a/gateway/middleware_wrap.go b/gateway/middleware_wrap.go
new file mode 100644
index 00000000000..0cd985d3ef1
--- /dev/null
+++ b/gateway/middleware_wrap.go
@@ -0,0 +1,54 @@
+package gateway
+
+import (
+ "net/http"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/TykTechnologies/tyk/internal/model"
+)
+
+type wrapMiddleware struct {
+ *BaseMiddleware
+ mw model.Middleware
+}
+
+var _ TykMiddleware = &wrapMiddleware{}
+
+// WrapMiddleware returns a new TykMiddleware with the provided base middleware,
+// and the smaller model.Middleware interface. It allows to implement model.Middleware,
+// and use it as a TykMiddleware.
+func WrapMiddleware(base *BaseMiddleware, in model.Middleware) TykMiddleware {
+ return &wrapMiddleware{
+ BaseMiddleware: base,
+ mw: in,
+ }
+}
+
+func (w *wrapMiddleware) Base() *BaseMiddleware {
+ return w.BaseMiddleware
+}
+
+func (w *wrapMiddleware) Config() (interface{}, error) {
+ return w.BaseMiddleware.Config()
+}
+
+func (w *wrapMiddleware) Init() {
+ w.mw.Init()
+}
+
+func (w *wrapMiddleware) Name() string {
+ return w.mw.Name()
+}
+
+func (s *wrapMiddleware) Logger() *logrus.Entry {
+ return s.mw.Logger()
+}
+
+func (w *wrapMiddleware) EnabledForSpec() bool {
+ return w.mw.EnabledForSpec()
+}
+
+func (w *wrapMiddleware) ProcessRequest(rw http.ResponseWriter, r *http.Request, data interface{}) (error, int) {
+ return w.mw.ProcessRequest(rw, r, data)
+}
diff --git a/gateway/mw_auth_key.go b/gateway/mw_auth_key.go
index aaf136ef778..25dd659468c 100644
--- a/gateway/mw_auth_key.go
+++ b/gateway/mw_auth_key.go
@@ -242,7 +242,7 @@ func (k *AuthKey) validateSignature(r *http.Request, key string) (error, int) {
return errors.New(errorMessage), errorCode
}
- secret := k.Gw.replaceTykVariables(r, authConfig.Signature.Secret, false)
+ secret := k.Gw.ReplaceTykVariables(r, authConfig.Signature.Secret, false)
if secret == "" {
logger.Info("Request signature secret not found or empty")
diff --git a/gateway/mw_graphql.go b/gateway/mw_graphql.go
index f2b83b8a17c..5f3e017ba3c 100644
--- a/gateway/mw_graphql.go
+++ b/gateway/mw_graphql.go
@@ -116,7 +116,7 @@ func (m *GraphQLMiddleware) Init() {
}
return body, nil
},
- TykVariableReplacer: m.Gw.replaceTykVariables,
+ TykVariableReplacer: m.Gw.ReplaceTykVariables,
},
})
} else if m.Spec.GraphQL.Version == apidef.GraphQLConfigVersion3Preview {
@@ -153,7 +153,7 @@ func (m *GraphQLMiddleware) Init() {
}
return body, nil
},
- TykVariableReplacer: m.Gw.replaceTykVariables,
+ TykVariableReplacer: m.Gw.ReplaceTykVariables,
},
})
if err != nil {
diff --git a/gateway/mw_modify_headers.go b/gateway/mw_modify_headers.go
index cc18f93f459..1d8a178f9e9 100644
--- a/gateway/mw_modify_headers.go
+++ b/gateway/mw_modify_headers.go
@@ -44,7 +44,7 @@ func (t *TransformHeaders) ProcessRequest(w http.ResponseWriter, r *http.Request
// Add
for nKey, nVal := range vInfo.GlobalHeaders {
t.Logger().Debug("Adding: ", nKey)
- setCustomHeader(r.Header, nKey, t.Gw.replaceTykVariables(r, nVal, false), ignoreCanonical)
+ setCustomHeader(r.Header, nKey, t.Gw.ReplaceTykVariables(r, nVal, false), ignoreCanonical)
}
}
@@ -56,7 +56,7 @@ func (t *TransformHeaders) ProcessRequest(w http.ResponseWriter, r *http.Request
r.Header.Del(dKey)
}
for nKey, nVal := range hmeta.AddHeaders {
- setCustomHeader(r.Header, nKey, t.Gw.replaceTykVariables(r, nVal, false), ignoreCanonical)
+ setCustomHeader(r.Header, nKey, t.Gw.ReplaceTykVariables(r, nVal, false), ignoreCanonical)
}
}
diff --git a/gateway/mw_persist_graphql_operation.go b/gateway/mw_persist_graphql_operation.go
index 79f73ac86cf..6cd2aa03c28 100644
--- a/gateway/mw_persist_graphql_operation.go
+++ b/gateway/mw_persist_graphql_operation.go
@@ -74,7 +74,7 @@ func (i *PersistGraphQLOperationMiddleware) ProcessRequest(w http.ResponseWriter
return ProxyingRequestFailedErr, http.StatusInternalServerError
}
- variablesStr := i.Gw.replaceTykVariables(r, string(varBytes), false)
+ variablesStr := i.Gw.ReplaceTykVariables(r, string(varBytes), false)
requestPathParts := strings.Split(r.RequestURI, "/")
for replacer, pathIndex := range replacers {
diff --git a/gateway/mw_rate_limiting.go b/gateway/mw_rate_limiting.go
index dd9ab045540..7b38971d05a 100644
--- a/gateway/mw_rate_limiting.go
+++ b/gateway/mw_rate_limiting.go
@@ -59,7 +59,7 @@ func (k *RateLimitAndQuotaCheck) ProcessRequest(w http.ResponseWriter, r *http.R
if pattern, found := session.MetaData["rate_limit_pattern"]; found {
if patternString, ok := pattern.(string); ok && patternString != "" {
- if customKeyValue := k.Gw.replaceTykVariables(r, patternString, false); customKeyValue != "" {
+ if customKeyValue := k.Gw.ReplaceTykVariables(r, patternString, false); customKeyValue != "" {
rateLimitKey = customKeyValue
quotaKey = customKeyValue
}
diff --git a/gateway/mw_streaming.go b/gateway/mw_streaming.go
index d24ddbe24b9..3131ce1b582 100644
--- a/gateway/mw_streaming.go
+++ b/gateway/mw_streaming.go
@@ -1,473 +1,34 @@
+//go:build !ee && !dev
+
+// Provides getStreamingMiddleware
package gateway
import (
- "context"
- "crypto/sha256"
- "encoding/json"
- "errors"
- "fmt"
"net/http"
- "net/url"
- "strings"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/sirupsen/logrus"
-
- "github.com/gorilla/mux"
-
- "github.com/TykTechnologies/tyk/internal/streaming"
)
-const (
- // ExtensionTykStreaming is the oas extension for tyk streaming
- ExtensionTykStreaming = "x-tyk-streaming"
- StreamGCInterval = 1 * time.Minute
-)
-
-// StreamsConfig represents a stream configuration
-type StreamsConfig struct {
- Info struct {
- Version string `json:"version"`
- } `json:"info"`
- Streams map[string]any `json:"streams"`
+func getStreamingMiddleware(base *BaseMiddleware) TykMiddleware {
+ return &dummyStreamingMiddleware{base}
}
-// Used for testing
-var globalStreamCounter atomic.Int64
-
-// StreamingMiddleware is a middleware that handles streaming functionality
-type StreamingMiddleware struct {
+type dummyStreamingMiddleware struct {
*BaseMiddleware
-
- createStreamManagerLock sync.Mutex
- streamManagerCache sync.Map // Map of payload hash to StreamManager
- ctx context.Context
- cancel context.CancelFunc
- allowedUnsafe []string
- defaultStreamManager *StreamManager
-}
-
-// StreamManager is responsible for creating a single stream
-type StreamManager struct {
- streams sync.Map
- routeLock sync.Mutex
- muxer *mux.Router
- mw *StreamingMiddleware
- dryRun bool
- listenPaths []string
-
- activityCounter atomic.Int32 // Counts active subscriptions, requests.
-}
-
-func (sm *StreamManager) initStreams(r *http.Request, config *StreamsConfig) {
- // Clear existing routes for this consumer group
- sm.muxer = mux.NewRouter()
-
- for streamID, streamConfig := range config.Streams {
- sm.setUpOrDryRunStream(streamConfig, streamID)
- }
-
- // If it is default stream manager, init muxer
- if r == nil {
- for _, path := range sm.listenPaths {
- sm.muxer.HandleFunc(path, func(_ http.ResponseWriter, _ *http.Request) {
- // Dummy handler
- })
- }
- }
-}
-
-func (sm *StreamManager) setUpOrDryRunStream(streamConfig any, streamID string) {
- if streamMap, ok := streamConfig.(map[string]interface{}); ok {
- httpPaths := GetHTTPPaths(streamMap)
-
- if sm.dryRun {
- if len(httpPaths) == 0 {
- err := sm.createStream(streamID, streamMap)
- if err != nil {
- sm.mw.Logger().WithError(err).Errorf("Error creating stream %s", streamID)
- }
- }
- } else {
- err := sm.createStream(streamID, streamMap)
- if err != nil {
- sm.mw.Logger().WithError(err).Errorf("Error creating stream %s", streamID)
- }
- }
- sm.listenPaths = append(sm.listenPaths, httpPaths...)
- }
-}
-
-// removeStream removes a stream
-func (sm *StreamManager) removeStream(streamID string) error {
- streamFullID := fmt.Sprintf("%s_%s", sm.mw.Spec.APIID, streamID)
-
- if streamValue, exists := sm.streams.Load(streamFullID); exists {
- stream, ok := streamValue.(*streaming.Stream)
- if !ok {
- return fmt.Errorf("stream %s is not a valid stream", streamID)
- }
- err := stream.Stop()
- if err != nil {
- return err
- }
- sm.streams.Delete(streamFullID)
- } else {
- return fmt.Errorf("stream %s does not exist", streamID)
- }
- return nil
-}
-
-func (s *StreamingMiddleware) garbageCollect() {
- s.Logger().Debug("Starting garbage collection for inactive stream managers")
-
- s.streamManagerCache.Range(func(key, value interface{}) bool {
- manager := value.(*StreamManager)
- if manager == s.defaultStreamManager {
- return true
- }
-
- if manager.activityCounter.Load() <= 0 {
- s.Logger().Infof("Removing inactive stream manager: %v", key)
- manager.streams.Range(func(streamKey, streamValue interface{}) bool {
- streamID := streamKey.(string)
- err := manager.removeStream(streamID)
- if err != nil {
- s.Logger().Errorf("Error removing stream %s: %v", streamID, err)
- }
- return true
- })
- s.streamManagerCache.Delete(key)
- }
-
- return true
- })
}
-// Name is StreamingMiddleware
-func (s *StreamingMiddleware) Name() string {
- return "StreamingMiddleware"
+func (d *dummyStreamingMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {
+ return nil, http.StatusOK
}
-// EnabledForSpec checks if streaming is enabled on the config
-func (s *StreamingMiddleware) EnabledForSpec() bool {
- s.Logger().Debug("Checking if streaming is enabled")
-
- streamingConfig := s.Gw.GetConfig().Streaming
- s.Logger().Debugf("Streaming config: %+v", streamingConfig)
+func (d *dummyStreamingMiddleware) EnabledForSpec() bool {
+ streamingConfig := d.Gw.GetConfig().Streaming
if streamingConfig.Enabled {
- s.Logger().Debug("Streaming is enabled in the config")
- s.allowedUnsafe = streamingConfig.AllowUnsafe
- s.Logger().Debugf("Allowed unsafe components: %v", s.allowedUnsafe)
-
- config := s.getStreamsConfig(nil)
- globalStreamCounter.Add(int64(len(config.Streams)))
-
- s.Logger().Debug("Total streams count: ", len(config.Streams))
-
- return len(config.Streams) != 0
+ d.Logger().Error("Error: Streaming is supported only in Tyk Enterprise Edition")
}
- s.Logger().Debug("Streaming is not enabled in the config")
return false
}
-// Init initializes the middleware
-func (s *StreamingMiddleware) Init() {
- s.Logger().Debug("Initializing StreamingMiddleware")
- s.ctx, s.cancel = context.WithCancel(context.Background())
-
- s.Logger().Debug("Initializing default stream manager")
- s.defaultStreamManager = s.createStreamManager(nil)
-
- // Start garbage collection routine
- go func() {
- ticker := time.NewTicker(StreamGCInterval)
- defer ticker.Stop()
-
- for {
- select {
- case <-ticker.C:
- s.garbageCollect()
- case <-s.ctx.Done():
- return
- }
- }
- }()
-}
-
-func (s *StreamingMiddleware) createStreamManager(r *http.Request) *StreamManager {
- streamsConfig := s.getStreamsConfig(r)
- configJSON, _ := json.Marshal(streamsConfig)
- cacheKey := fmt.Sprintf("%x", sha256.Sum256(configJSON))
-
- // Critical section starts here
- // This section is called by ProcessRequest method of the middleware implementation
- // Concurrent requests can call this method at the same time and those requests
- // creates new StreamManagers and store them concurrently, as a result
- // the returned stream manager has overwritten by a different one by leaking
- // the previously stored StreamManager.
- s.createStreamManagerLock.Lock()
- defer s.createStreamManagerLock.Unlock()
-
- s.Logger().Debug("Attempting to load stream manager from cache")
- s.Logger().Debugf("Cache key: %s", cacheKey)
- if cachedManager, found := s.streamManagerCache.Load(cacheKey); found {
- s.Logger().Debug("Found cached stream manager")
- return cachedManager.(*StreamManager)
- }
-
- newStreamManager := &StreamManager{
- muxer: mux.NewRouter(),
- mw: s,
- dryRun: r == nil,
- activityCounter: atomic.Int32{},
- }
- newStreamManager.initStreams(r, streamsConfig)
-
- if r != nil {
- s.streamManagerCache.Store(cacheKey, newStreamManager)
- }
- return newStreamManager
-}
-
-// Helper function to extract paths from an http_server configuration
-func extractPaths(httpConfig map[string]interface{}) []string {
- var paths []string
- defaultPaths := map[string]string{
- "path": "/post",
- "ws_path": "/post/ws",
- "stream_path": "/get/stream",
- }
- for key, defaultValue := range defaultPaths {
- if val, ok := httpConfig[key].(string); ok {
- paths = append(paths, val)
- } else {
- paths = append(paths, defaultValue)
- }
- }
- return paths
-}
-
-// Helper function to extract HTTP server paths from a given configuration
-func extractHTTPServerPaths(config map[string]interface{}) []string {
- if httpServerConfig, ok := config["http_server"].(map[string]interface{}); ok {
- return extractPaths(httpServerConfig)
- }
- return nil
-}
-
-// Helper function to handle broker configurations
-func handleBroker(brokerConfig map[string]interface{}) []string {
- var paths []string
- for _, ioKey := range []string{"inputs", "outputs"} {
- if ioList, ok := brokerConfig[ioKey].([]interface{}); ok {
- for _, ioItem := range ioList {
- if ioItemMap, ok := ioItem.(map[string]interface{}); ok {
- paths = append(paths, extractHTTPServerPaths(ioItemMap)...)
- }
- }
- }
- }
- return paths
-}
-
-// GetHTTPPaths is the ain function to get HTTP paths from the stream configuration
-func GetHTTPPaths(streamConfig map[string]interface{}) []string {
- var paths []string
- for _, component := range []string{"input", "output"} {
- if componentMap, ok := streamConfig[component].(map[string]interface{}); ok {
- paths = append(paths, extractHTTPServerPaths(componentMap)...)
- if brokerConfig, ok := componentMap["broker"].(map[string]interface{}); ok {
- paths = append(paths, handleBroker(brokerConfig)...)
- }
- }
- }
- // remove duplicates
- var deduplicated []string
- exists := map[string]struct{}{}
- for _, item := range paths {
- if _, ok := exists[item]; !ok {
- deduplicated = append(deduplicated, item)
- exists[item] = struct{}{}
- }
- }
- return deduplicated
-}
-
-func (s *StreamingMiddleware) getStreamsConfig(r *http.Request) *StreamsConfig {
- config := &StreamsConfig{Streams: make(map[string]any)}
- if !s.Spec.IsOAS {
- return config
- }
-
- extension, ok := s.Spec.OAS.T.Extensions[ExtensionTykStreaming]
- if !ok {
- return config
- }
-
- if streamsMap, ok := extension.(map[string]any); ok {
- if streams, ok := streamsMap["streams"].(map[string]any); ok {
- s.processStreamsConfig(r, streams, config)
- }
- }
-
- return config
-}
-
-func (s *StreamingMiddleware) processStreamsConfig(r *http.Request, streams map[string]any, config *StreamsConfig) {
- for streamID, stream := range streams {
- if r == nil {
- s.Logger().Debugf("No request available to replace variables in stream config for %s", streamID)
- } else {
- s.Logger().Debugf("Stream config for %s: %v", streamID, stream)
- marshaledStream, err := json.Marshal(stream)
- if err != nil {
- s.Logger().Errorf("Failed to marshal stream config: %v", err)
- continue
- }
- replacedStream := s.Gw.replaceTykVariables(r, string(marshaledStream), true)
-
- if replacedStream != string(marshaledStream) {
- s.Logger().Debugf("Stream config changed for %s: %s", streamID, replacedStream)
- } else {
- s.Logger().Debugf("Stream config has not changed for %s: %s", streamID, replacedStream)
- }
-
- var unmarshaledStream map[string]interface{}
- err = json.Unmarshal([]byte(replacedStream), &unmarshaledStream)
- if err != nil {
- s.Logger().Errorf("Failed to unmarshal replaced stream config: %v", err)
- continue
- }
- stream = unmarshaledStream
- }
- config.Streams[streamID] = stream
- }
-}
-
-// createStream creates a new stream
-func (sm *StreamManager) createStream(streamID string, config map[string]interface{}) error {
- streamFullID := fmt.Sprintf("%s_%s", sm.mw.Spec.APIID, streamID)
- sm.mw.Logger().Debugf("Creating stream: %s", streamFullID)
-
- stream := streaming.NewStream(sm.mw.allowedUnsafe)
- err := stream.Start(config, &handleFuncAdapter{
- mw: sm.mw,
- streamID: streamFullID,
- muxer: sm.muxer,
- sm: sm,
- // child logger is necessary to prevent race condition
- logger: sm.mw.Logger().WithField("stream", streamFullID),
- })
- if err != nil {
- sm.mw.Logger().Errorf("Failed to start stream %s: %v", streamFullID, err)
- return err
- }
-
- sm.streams.Store(streamFullID, stream)
- sm.mw.Logger().Infof("Successfully created stream: %s", streamFullID)
-
- return nil
-}
-
-func (sm *StreamManager) hasPath(path string) bool {
- for _, p := range sm.listenPaths {
- if strings.TrimPrefix(path, "/") == strings.TrimPrefix(p, "/") {
- return true
- }
- }
- return false
-}
-
-// ProcessRequest will handle the streaming functionality
-func (s *StreamingMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int) {
- strippedPath := s.Spec.StripListenPath(r.URL.Path)
- if !s.defaultStreamManager.hasPath(strippedPath) {
- return nil, http.StatusOK
- }
-
- s.Logger().Debugf("Processing request: %s, %s", r.URL.Path, strippedPath)
-
- newRequest := &http.Request{
- Method: r.Method,
- URL: &url.URL{Scheme: r.URL.Scheme, Host: r.URL.Host, Path: strippedPath},
- }
-
- if !s.defaultStreamManager.muxer.Match(newRequest, &mux.RouteMatch{}) {
- return nil, http.StatusOK
- }
-
- var match mux.RouteMatch
- streamManager := s.createStreamManager(r)
- streamManager.routeLock.Lock()
- streamManager.muxer.Match(newRequest, &match)
- streamManager.routeLock.Unlock()
-
- // direct Bento handler
- handler, ok := match.Handler.(http.HandlerFunc)
- if !ok {
- return errors.New("invalid route handler"), http.StatusInternalServerError
- }
-
- handler.ServeHTTP(w, r)
-
- return nil, mwStatusRespond
-}
-
-// Unload closes and remove active streams
-func (s *StreamingMiddleware) Unload() {
- s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)
-
- totalStreams := 0
- s.cancel()
-
- s.Logger().Debug("Closing active streams")
- s.streamManagerCache.Range(func(_, value interface{}) bool {
- manager := value.(*StreamManager)
- manager.streams.Range(func(_, streamValue interface{}) bool {
- totalStreams++
- if stream, ok := streamValue.(*streaming.Stream); ok {
- if err := stream.Reset(); err != nil {
- return true
- }
- }
- return true
- })
- return true
- })
-
- globalStreamCounter.Add(-int64(totalStreams))
- s.streamManagerCache = sync.Map{}
-
- s.Logger().Info("All streams successfully removed")
-}
-
-type handleFuncAdapter struct {
- streamID string
- sm *StreamManager
- mw *StreamingMiddleware
- muxer *mux.Router
- logger *logrus.Entry
-}
-
-func (h *handleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter, *http.Request)) {
- h.logger.Debugf("Registering streaming handleFunc for path: %s", path)
-
- if h.mw == nil || h.muxer == nil {
- h.logger.Error("StreamingMiddleware or muxer is nil")
- return
- }
-
- h.sm.routeLock.Lock()
- h.muxer.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
- h.sm.activityCounter.Add(1)
- f(w, r)
- h.sm.activityCounter.Add(-1)
- })
- h.sm.routeLock.Unlock()
- h.logger.Debugf("Registered handler for path: %s", path)
+func (d *dummyStreamingMiddleware) Name() string {
+ return "StreamingMiddleware"
}
diff --git a/gateway/mw_streaming_ee.go b/gateway/mw_streaming_ee.go
new file mode 100644
index 00000000000..64ee4ffedb7
--- /dev/null
+++ b/gateway/mw_streaming_ee.go
@@ -0,0 +1,15 @@
+//go:build ee || dev
+
+// Provides StreamingMiddleware
+package gateway
+
+import (
+ "github.com/TykTechnologies/tyk/ee/middleware/streams"
+)
+
+func getStreamingMiddleware(baseMid *BaseMiddleware) TykMiddleware {
+ spec := baseMid.Spec
+ streamSpec := streams.NewAPISpec(spec.APIID, spec.Name, spec.IsOAS, spec.OAS, spec.StripListenPath)
+ streamMw := streams.NewMiddleware(baseMid.Gw, baseMid, streamSpec)
+ return WrapMiddleware(baseMid, streamMw)
+}
diff --git a/gateway/mw_streaming_test.go b/gateway/mw_streaming_test.go
index 8870d565d21..02377eaf9a2 100644
--- a/gateway/mw_streaming_test.go
+++ b/gateway/mw_streaming_test.go
@@ -1,3 +1,5 @@
+//go:build ee || dev
+
package gateway
import (
@@ -13,6 +15,7 @@ import (
"time"
"github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
@@ -28,6 +31,8 @@ import (
"github.com/TykTechnologies/tyk/apidef/oas"
"github.com/TykTechnologies/tyk/config"
+ "github.com/TykTechnologies/tyk/ee/middleware/streams"
+ "github.com/TykTechnologies/tyk/internal/model"
"github.com/TykTechnologies/tyk/test"
)
@@ -88,7 +93,7 @@ output:
t.Run(tc.name, func(t *testing.T) {
config, err := yamlConfigToMap(tc.configYaml)
require.NoError(t, err)
- httpPaths := GetHTTPPaths(config)
+ httpPaths := streams.GetHTTPPaths(config)
assert.ElementsMatch(t, tc.expected, httpPaths)
})
}
@@ -145,8 +150,6 @@ streams:
level: DEBUG
format: logfmt
add_timestamp: false
- static_fields:
- '@service': benthos
`
const bentoHTTPServerTemplate = `
streams:
@@ -200,6 +203,9 @@ func TestStreamingAPISingleClient(t *testing.T) {
}
wsURL := strings.Replace(ts.URL, "http", "ws", 1) + fmt.Sprintf("/%s/get/ws", apiName)
+
+ println("wsURL:", wsURL)
+
wsConn, _, err := dialer.Dial(wsURL, nil)
require.NoError(t, err, "failed to connect to ws server")
t.Cleanup(func() {
@@ -354,7 +360,7 @@ func setupOASForStreamAPI(streamingConfig string) (oas.OAS, error) {
}
oasAPI.Extensions = map[string]interface{}{
- ExtensionTykStreaming: parsedStreamingConfig,
+ streams.ExtensionTykStreaming: parsedStreamingConfig,
}
return oasAPI, nil
@@ -377,11 +383,7 @@ func TestAsyncAPI(t *testing.T) {
t.SkipNow()
ts := StartTest(func(globalConf *config.Config) {
- globalConf.Labs = map[string]interface{}{
- "streaming": map[string]interface{}{
- "enabled": true,
- },
- }
+ globalConf.Streaming.Enabled = true
})
ts.Gw.BuildAndLoadAPI(func(spec *APISpec) {
@@ -446,7 +448,7 @@ streams:
}
oasAPI.Extensions = map[string]interface{}{
- ExtensionTykStreaming: parsedStreamingConfig,
+ streams.ExtensionTykStreaming: parsedStreamingConfig,
// oas.ExtensionTykAPIGateway: tykExtension,
}
@@ -468,8 +470,8 @@ streams:
// Check that standard API still works
_, _ = ts.Run(t, test.TestCase{Code: http.StatusOK, Method: http.MethodGet, Path: "/test"})
- if globalStreamCounter.Load() != 1 {
- t.Fatalf("Expected 1 stream, got %d", globalStreamCounter.Load())
+ if streams.GlobalStreamCounter.Load() != 1 {
+ t.Fatalf("Expected 1 stream, got %d", streams.GlobalStreamCounter.Load())
}
time.Sleep(500 * time.Millisecond)
@@ -592,7 +594,7 @@ streams:
}
oasAPI.Extensions = map[string]interface{}{
- ExtensionTykStreaming: parsedStreamingConfig,
+ streams.ExtensionTykStreaming: parsedStreamingConfig,
}
return oasAPI
@@ -604,7 +606,7 @@ func testAsyncAPIHttp(t *testing.T, ts *Test, isDynamic bool, tenantID string, a
const numMessages = 2
const numClients = 2
- streamCount := globalStreamCounter.Load()
+ streamCount := streams.GlobalStreamCounter.Load()
t.Logf("Stream count for tenant %s: %d", tenantID, streamCount)
// Create WebSocket clients
@@ -755,11 +757,7 @@ func TestWebSocketConnectionClosedOnAPIReload(t *testing.T) {
}
ts := StartTest(func(globalConf *config.Config) {
- globalConf.Labs = map[string]interface{}{
- "streaming": map[string]interface{}{
- "enabled": true,
- },
- }
+ globalConf.Streaming.Enabled = true
})
defer ts.Close()
@@ -851,6 +849,7 @@ func TestStreamingAPISingleClient_Input_HTTPServer(t *testing.T) {
require.NoError(t, err, "error setting read deadline")
for i := 0; i < totalMessages; i++ {
+ println("reading message", i)
_, p, err := wsConn.ReadMessage()
require.NoError(t, err, "error reading message")
assert.Equal(t, fmt.Sprintf("{\"test\": \"message %d\"}", i), string(p), "message not equal")
@@ -935,6 +934,14 @@ func TestStreamingAPIMultipleClients_Input_HTTPServer(t *testing.T) {
require.Empty(t, messages)
}
+type DummyBase struct {
+ model.LoggerProvider
+}
+
+func (d *DummyBase) Logger() *logrus.Entry {
+ return logrus.NewEntry(logrus.New())
+}
+
func TestStreamingAPIGarbageCollection(t *testing.T) {
ts := StartTest(func(globalConf *config.Config) {
globalConf.Streaming.Enabled = true
@@ -956,8 +963,9 @@ func TestStreamingAPIGarbageCollection(t *testing.T) {
spec.OAS.Fill(*spec.APIDefinition)
})
- baseMiddleware := &BaseMiddleware{Gw: ts.Gw, Spec: specs[0]}
- s := StreamingMiddleware{BaseMiddleware: baseMiddleware}
+ apiSpec := streams.NewAPISpec(specs[0].APIID, specs[0].Name, specs[0].IsOAS, specs[0].OAS, specs[0].StripListenPath)
+
+ s := streams.NewMiddleware(ts.Gw, &DummyBase{}, apiSpec)
if err := setUpStreamAPI(ts, apiName, bentoHTTPServerTemplate); err != nil {
t.Fatal(err)
@@ -968,21 +976,21 @@ func TestStreamingAPIGarbageCollection(t *testing.T) {
r, err := http.NewRequest("POST", publishURL, nil)
require.NoError(t, err)
- s.createStreamManager(r)
+ s.CreateStreamManager(r)
// We should have a Stream manager in the cache.
var streamManagersBeforeGC int
- s.streamManagerCache.Range(func(k, v interface{}) bool {
+ s.StreamManagerCache.Range(func(k, v interface{}) bool {
streamManagersBeforeGC++
return true
})
require.Equal(t, 1, streamManagersBeforeGC)
- s.garbageCollect()
+ s.GC()
// Garbage collection removed the stream manager because the activity counter is zero.
var streamManagersAfterGC int
- s.streamManagerCache.Range(func(k, v interface{}) bool {
+ s.StreamManagerCache.Range(func(k, v interface{}) bool {
streamManagersAfterGC++
return true
})
diff --git a/gateway/mw_transform.go b/gateway/mw_transform.go
index 341401746ed..e803f8421dc 100644
--- a/gateway/mw_transform.go
+++ b/gateway/mw_transform.go
@@ -107,7 +107,7 @@ func transformBody(r *http.Request, tmeta *TransformSpec, t *TransformMiddleware
return fmt.Errorf("failed to apply template to request: %w", err)
}
- s := t.Gw.replaceTykVariables(r, bodyBuffer.String(), true)
+ s := t.Gw.ReplaceTykVariables(r, bodyBuffer.String(), true)
newBuf := bytes.NewBufferString(s)
diff --git a/gateway/mw_url_rewrite.go b/gateway/mw_url_rewrite.go
index b3e75f37505..019ce7a1346 100644
--- a/gateway/mw_url_rewrite.go
+++ b/gateway/mw_url_rewrite.go
@@ -205,12 +205,16 @@ func (gw *Gateway) urlRewrite(meta *apidef.URLRewriteMeta, r *http.Request) (str
ctxSetUrlRewritePath(r, meta.Path)
}
- newpath = gw.replaceTykVariables(r, newpath, true)
+ newpath = gw.ReplaceTykVariables(r, newpath, true)
return newpath, nil
}
-func (gw *Gateway) replaceTykVariables(r *http.Request, in string, escape bool) string {
+// ReplaceTykVariables implements a variable replacement hook. It will replace
+// the template `in`. If `escape` is true, the values get escaped as a query
+// parameter for a HTTP request would. If no replacement has been made, `in`
+// is returned without modification.
+func (gw *Gateway) ReplaceTykVariables(r *http.Request, in string, escape bool) string {
if strings.Contains(in, secretsConfLabel) {
contextData := ctxGetData(r)
diff --git a/gateway/res_handler_header_injector.go b/gateway/res_handler_header_injector.go
index 4587e941f56..9b27089c74f 100644
--- a/gateway/res_handler_header_injector.go
+++ b/gateway/res_handler_header_injector.go
@@ -62,7 +62,7 @@ func (h *HeaderInjector) HandleResponse(rw http.ResponseWriter, res *http.Respon
res.Header.Del(dKey)
}
for nKey, nVal := range hmeta.AddHeaders {
- setCustomHeader(res.Header, nKey, h.Gw.replaceTykVariables(req, nVal, false), ignoreCanonical)
+ setCustomHeader(res.Header, nKey, h.Gw.ReplaceTykVariables(req, nVal, false), ignoreCanonical)
}
}
@@ -75,7 +75,7 @@ func (h *HeaderInjector) HandleResponse(rw http.ResponseWriter, res *http.Respon
for key, val := range vInfo.GlobalResponseHeaders {
log.Debug("Adding: ", key)
- setCustomHeader(res.Header, key, h.Gw.replaceTykVariables(req, val, false), ignoreCanonical)
+ setCustomHeader(res.Header, key, h.Gw.ReplaceTykVariables(req, val, false), ignoreCanonical)
}
// Manage global response header options with response_processors
@@ -83,7 +83,7 @@ func (h *HeaderInjector) HandleResponse(rw http.ResponseWriter, res *http.Respon
res.Header.Del(n)
}
for header, v := range h.config.AddHeaders {
- setCustomHeader(res.Header, header, h.Gw.replaceTykVariables(req, v, false), ignoreCanonical)
+ setCustomHeader(res.Header, header, h.Gw.ReplaceTykVariables(req, v, false), ignoreCanonical)
}
}
diff --git a/gateway/rpc_storage_handler.go b/gateway/rpc_storage_handler.go
index dcaefc212fd..1829c9e189c 100644
--- a/gateway/rpc_storage_handler.go
+++ b/gateway/rpc_storage_handler.go
@@ -1,3 +1,4 @@
+//nolint:revive
package gateway
import (
diff --git a/gateway/rpc_storage_handler_test.go b/gateway/rpc_storage_handler_test.go
index b063d9c08a6..23f4aebf2cf 100644
--- a/gateway/rpc_storage_handler_test.go
+++ b/gateway/rpc_storage_handler_test.go
@@ -8,15 +8,13 @@ import (
"net/http"
"testing"
- "github.com/TykTechnologies/tyk/internal/model"
- "github.com/TykTechnologies/tyk/rpc"
-
- "github.com/TykTechnologies/tyk/config"
-
"github.com/lonelycode/osin"
"github.com/stretchr/testify/assert"
+ "github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/header"
+ "github.com/TykTechnologies/tyk/internal/model"
+ "github.com/TykTechnologies/tyk/rpc"
"github.com/TykTechnologies/tyk/storage"
"github.com/TykTechnologies/tyk/test"
"github.com/TykTechnologies/tyk/user"
diff --git a/gateway/rpc_test.go b/gateway/rpc_test.go
index 19372710d22..ff5b7959507 100644
--- a/gateway/rpc_test.go
+++ b/gateway/rpc_test.go
@@ -1,3 +1,4 @@
+//nolint:revive
package gateway
import (
diff --git a/go.mod b/go.mod
index 270ef0bd192..d4758961558 100644
--- a/go.mod
+++ b/go.mod
@@ -506,9 +506,3 @@ require (
modernc.org/token v1.1.0 // indirect
nhooyr.io/websocket v1.8.10 // indirect
)
-
-//replace github.com/TykTechnologies/graphql-go-tools => ../graphql-go-tools
-
-//replace github.com/TykTechnologies/graphql-go-tools/v2 => ../graphql-go-tools/v2
-//
-//replace github.com/warpstreamlabs/bento => ../benthos
diff --git a/internal/middleware/const.go b/internal/middleware/const.go
new file mode 100644
index 00000000000..0b26e05ad56
--- /dev/null
+++ b/internal/middleware/const.go
@@ -0,0 +1,5 @@
+package middleware
+
+// StatusRespond should be returned by a middleware to stop processing
+// further middleware from the middleware chain.
+const StatusRespond = 666
diff --git a/internal/model/interfaces.go b/internal/model/interfaces.go
new file mode 100644
index 00000000000..db761b30255
--- /dev/null
+++ b/internal/model/interfaces.go
@@ -0,0 +1,66 @@
+package model
+
+import (
+ "net/http"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/TykTechnologies/tyk/config"
+ "github.com/TykTechnologies/tyk/user"
+)
+
+// Gateway is a collection of well defined gateway interfaces. It should only
+// be implemented in full by gateway.Gateway, and is used for a built-time
+// type assertion. Do not use the symbol elsewhere, use the smaller interfaces.
+type Gateway interface {
+ ConfigProvider
+ PolicyProvider
+
+ ReplaceTykVariables
+}
+
+// Middleware is a subset of the gateway.Middleware interface, that can be
+// implemented outside of gateway scope.
+type Middleware interface {
+ Init()
+ Name() string
+ Logger() *logrus.Entry
+ ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
+ EnabledForSpec() bool
+}
+
+// LoggerProvider returns a new *logrus.Entry for the request.
+// It's implemented by gateway and middleware. Middleware typically
+// adds the `mw` field with the middleware name.
+type LoggerProvider interface {
+ Logger() *logrus.Entry
+}
+
+// ConfigProvider provides a typical config getter signature.
+type ConfigProvider interface {
+ GetConfig() config.Config
+}
+
+// PolicyProvider is a storage interface encapsulating policy retrieval.
+type PolicyProvider interface {
+ PolicyCount() int
+ PolicyIDs() []string
+ PolicyByID(string) (user.Policy, bool)
+}
+
+// These are utility methods without any real data model design around them.
+type (
+ // ReplaceTykVariables is a request-based template replacement hook.
+ // Implemented by gateway.Gateway.
+ ReplaceTykVariables interface {
+ ReplaceTykVariables(r *http.Request, in string, escape bool) string
+ }
+
+ // StripListenPath is the interface implemented by APISpec.StripListenPath.
+ StripListenPath interface {
+ StripListenPath(string) string
+ }
+
+ // StripListenPathFunc is the function signature for StripListenPath.
+ StripListenPathFunc func(string) string
+)
diff --git a/internal/policy/apply.go b/internal/policy/apply.go
index aa811f05ba8..380a34922e0 100644
--- a/internal/policy/apply.go
+++ b/internal/policy/apply.go
@@ -6,6 +6,7 @@ import (
"github.com/sirupsen/logrus"
+ "github.com/TykTechnologies/tyk/internal/model"
"github.com/TykTechnologies/tyk/user"
)
@@ -14,25 +15,16 @@ var (
ErrMixedPartitionAndPerAPIPolicies = errors.New("cannot apply multiple policies when some have per_api set and some are partitioned")
)
-// Repository is a storage encapsulating policy retrieval.
-// Gateway implements this object to decouple this package.
-type Repository interface {
- PolicyCount() int
- PolicyIDs() []string
- PolicyByID(string) (user.Policy, bool)
-}
-
-// Service represents the policy service for gateway.
+// Service represents the implementation for apply policies logic.
type Service struct {
- storage Repository
+ storage model.PolicyProvider
logger *logrus.Logger
// used for validation if not empty
orgID *string
}
-// New creates a new policy.Service object.
-func New(orgID *string, storage Repository, logger *logrus.Logger) *Service {
+func New(orgID *string, storage model.PolicyProvider, logger *logrus.Logger) *Service {
return &Service{
orgID: orgID,
storage: storage,