Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat [sc 106247]: Add a caching layer to prevent SDK from sending the same payloads too frequently upstream #193

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func Start(params APIServerParams) {
r := mux.NewRouter()
r.Use(handlers.CorsMiddleware)

const DefaultCacheTTL = 1 * time.Minute

// TODO: make all routes authenticated
authRouter := r.NewRoute().Subrouter()
authRouter.Use(handlers.RequireValidLicenseIDMiddleware)
Expand All @@ -66,9 +68,9 @@ func Start(params APIServerParams) {
r.HandleFunc("/api/v1/app/info", handlers.GetCurrentAppInfo).Methods("GET")
r.HandleFunc("/api/v1/app/updates", handlers.GetAppUpdates).Methods("GET")
r.HandleFunc("/api/v1/app/history", handlers.GetAppHistory).Methods("GET")
r.HandleFunc("/api/v1/app/custom-metrics", handlers.SendCustomAppMetrics).Methods("POST", "PATCH")
r.HandleFunc("/api/v1/app/custom-metrics/{key}", handlers.DeleteCustomAppMetricsKey).Methods("DELETE")
r.HandleFunc("/api/v1/app/instance-tags", handlers.SendAppInstanceTags).Methods("POST")
r.HandleFunc("/api/v1/app/custom-metrics", handlers.CacheMiddleware(handlers.SendCustomAppMetrics, DefaultCacheTTL)).Methods("POST", "PATCH")
divolgin marked this conversation as resolved.
Show resolved Hide resolved
r.HandleFunc("/api/v1/app/custom-metrics/{key}", handlers.CacheMiddleware(handlers.DeleteCustomAppMetricsKey, DefaultCacheTTL)).Methods("DELETE")
r.HandleFunc("/api/v1/app/instance-tags", handlers.CacheMiddleware(handlers.SendAppInstanceTags, DefaultCacheTTL)).Methods("POST")

// integration
r.HandleFunc("/api/v1/integration/mock-data", handlers.EnforceMockAccess(handlers.PostIntegrationMockData)).Methods("POST")
Expand Down
137 changes: 137 additions & 0 deletions pkg/handlers/middleware.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package handlers

import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"sync"
"time"

"github.com/pkg/errors"
"github.com/replicatedhq/replicated-sdk/pkg/handlers/types"
"github.com/replicatedhq/replicated-sdk/pkg/logger"
"github.com/replicatedhq/replicated-sdk/pkg/store"
)

Expand Down Expand Up @@ -44,3 +54,130 @@ func RequireValidLicenseIDMiddleware(next http.Handler) http.Handler {
next.ServeHTTP(w, r)
})
}

// Code for the cache middleware
type CacheEntry struct {
RequestBody []byte
ResponseBody []byte
StatusCode int
Expiry time.Time
}

type cache struct {
store map[string]CacheEntry
mu sync.RWMutex
}

func NewCache() *cache {
return &cache{
store: map[string]CacheEntry{},
}
}

func (c *cache) Get(key string) (CacheEntry, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

entry, found := c.store[key]
if !found || time.Now().After(entry.Expiry) {
return CacheEntry{}, false
}
return entry, true
}

func (c *cache) Set(key string, entry CacheEntry, duration time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

// Clean up expired entries
divolgin marked this conversation as resolved.
Show resolved Hide resolved
for k, v := range c.store {
if time.Now().After(v.Expiry) {
delete(c.store, k)
}
}

entry.Expiry = time.Now().Add(duration)
c.store[key] = entry
divolgin marked this conversation as resolved.
Show resolved Hide resolved
}

type responseRecorder struct {
http.ResponseWriter
Body *bytes.Buffer
StatusCode int
}

func (r *responseRecorder) WriteHeader(code int) {
r.StatusCode = code
r.ResponseWriter.WriteHeader(code)
}

func (r *responseRecorder) Write(b []byte) (int, error) {
r.Body.Write(b)
return r.ResponseWriter.Write(b)
}

func CacheMiddleware(next http.HandlerFunc, duration time.Duration) http.HandlerFunc {
// Each handler has its own cache to reduce contention for the in-memory store
cache := NewCache()

return func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
logger.Error(errors.Wrap(err, "cache middleware - failed to read request body"))
http.Error(w, "cache middleware: unable to read request body", http.StatusInternalServerError)
return
}

r.Body = io.NopCloser(bytes.NewBuffer(body))

hash := sha256.Sum256([]byte(r.Method + "::" + r.URL.Path + "::" + r.URL.Query().Encode()))

key := fmt.Sprintf("%x", hash)

if entry, found := cache.Get(key); found && IsSamePayload(entry.RequestBody, body) {
logger.Infof("cache middleware: serving cached payload for method: %s path: %s ttl: %s ", r.Method, r.URL.Path, time.Until(entry.Expiry).Round(time.Second).String())
JSONCached(w, entry.StatusCode, json.RawMessage(entry.ResponseBody))
return
}

recorder := &responseRecorder{ResponseWriter: w, Body: &bytes.Buffer{}}
next(recorder, r)

// Save only successful responses in the cache
if recorder.StatusCode < 200 || recorder.StatusCode >= 300 {
return
}

cache.Set(key, CacheEntry{
StatusCode: recorder.StatusCode,
RequestBody: body,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should consider making this a digest too because even without historical record this map can get big

ResponseBody: recorder.Body.Bytes(),
}, duration)

}
}

func IsSamePayload(a, b []byte) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use bytes.Equal?

Copy link
Contributor Author

@FourSigma FourSigma Jun 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sgalsaleh JSON key ordering differences when you marshal from map[string]interface{} results in different byte slices for the same payload. Also raw JSON payloads might have different white space with same underlying content.

if len(a) == 0 && len(b) == 0 {
return true
}

if len(a) == 0 {
a = []byte(`{}`)
}

if len(b) == 0 {
b = []byte(`{}`)
}

var aPayload, bPayload map[string]interface{}
if err := json.Unmarshal(a, &aPayload); err != nil {
logger.Error(errors.Wrap(err, "failed to unmarshal payload A"))
return false
}
if err := json.Unmarshal(b, &bPayload); err != nil {
logger.Error(errors.Wrap(err, "failed to unmarshal payload B"))
return false
}
return reflect.DeepEqual(aPayload, bPayload)
}
Loading
Loading