From 7471d0182d468d0e3535ccae9a066b8e34428702 Mon Sep 17 00:00:00 2001 From: Vangelis Katikaridis Date: Fri, 24 Jul 2020 10:09:23 +0200 Subject: [PATCH] Server side caching (#193) * #128 cache wip implementation Signed-off-by: Vangelis Katikaridis * #128 adjust to patron new version Signed-off-by: Vangelis Katikaridis * #128 refine requirements Signed-off-by: Vangelis Katikaridis * #128 first iteration of server cache functionality Signed-off-by: Vangelis Katikaridis * #128 server route cache implementation Signed-off-by: Vangelis Katikaridis * Add validation for brokers in kafka.NewBuilder (#191) Signed-off-by: Stanislav Afanasev * Add ActiveBrokers() method to Kafka AsyncProducer (#192) Signed-off-by: Giuseppe Mazzotta * #128 revert sixth example main Signed-off-by: Vangelis Katikaridis * #128 update readme Signed-off-by: Vangelis Katikaridis * #128 fix linting Signed-off-by: Vangelis Katikaridis * #128 fix more linting Signed-off-by: Vangelis Katikaridis * #128 fix tests Signed-off-by: Vangelis Katikaridis * #128 fix tests Signed-off-by: Vangelis Katikaridis * #128 add vendor file Signed-off-by: Vangelis Katikaridis * Add Route struct getters (#195) Signed-off-by: Alex Demin * Introduce dockertest to integration tests (#182) Signed-off-by: Vangelis Katikaridis * #128 wrap up implementation Signed-off-by: Vangelis Katikaridis * #128 fix tests Signed-off-by: Vangelis Katikaridis * #128 add readme details on cache metrics Signed-off-by: Vangelis Katikaridis * #128 readme adjustments Signed-off-by: Vangelis Katikaridis * #128 finalise implementation Signed-off-by: Vangelis Katikaridis * #128 increase test timeout Signed-off-by: Vangelis Katikaridis * #128 decouple tests Signed-off-by: Vangelis Katikaridis * #128 fix tests Signed-off-by: Vangelis Katikaridis * #193 fix tests Signed-off-by: Vangelis Katikaridis * Update tracing and metrics dependencies (#190) Signed-off-by: Paschalis Tsilias * #128 implement review comments Signed-off-by: Vangelis Katikaridis * #128 implement review comments Signed-off-by: Vangelis Katikaridis * #128 fix linting Signed-off-by: Vangelis Katikaridis * #128 update vendors Signed-off-by: Vangelis Katikaridis * #128 implement review comments Signed-off-by: Vangelis Katikaridis * #128 implement review comments Signed-off-by: Vangelis Katikaridis * #128 implement review comments Signed-off-by: Vangelis Katikaridis * #128 implement review comments Signed-off-by: Vangelis Katikaridis * Upgraded github actions v2 (#198) Signed-off-by: Sotirios Mantziaris * #128 make abstraction simpler and easier to use Signed-off-by: Vangelis Katikaridis * #128 add comment Signed-off-by: Vangelis Katikaridis * #128 fix linting Signed-off-by: Vangelis Katikaridis * #128 trigger the build Signed-off-by: Vangelis Katikaridis * #128 unexport response read writer Signed-off-by: Vangelis Katikaridis * #128 check for min < max Signed-off-by: Vangelis Katikaridis * #128 update vendor Signed-off-by: Vangelis Katikaridis * #128 remove timeinstant and make use of ttl cache only for route caching Signed-off-by: Vangelis Katikaridis * #128 update readme after latest changes Signed-off-by: Vangelis Katikaridis * #128 fix redis cache interaction and create cached route example Signed-off-by: Vangelis Katikaridis * #128 remove the processor specific logic for the cache Signed-off-by: Vangelis Katikaridis * #128 use the patron header alias instead of a raw map Signed-off-by: Vangelis Katikaridis * #128 fix linting Signed-off-by: Vangelis Katikaridis * #128 expose cache functionality as a middleware 
Signed-off-by: Vangelis Katikaridis
* #128 fix linting
Signed-off-by: Vangelis Katikaridis
* #128 refactor cache logic to separate package
Signed-off-by: Vangelis Katikaridis
* #128 remove timeinstant abstraction
Signed-off-by: Vangelis Katikaridis
* #128 avoid defining the prometheus registerer
Signed-off-by: Vangelis Katikaridis
* #128 fix linting and exported objects
Signed-off-by: Vangelis Katikaridis
* #128 make middleware cache abstraction more concrete
Signed-off-by: Vangelis Katikaridis
* #128 make the naming conventions and structure more go like
Signed-off-by: Vangelis Katikaridis
* #128 fix imports
Signed-off-by: Vangelis Katikaridis
* #128 move timing request example to user service
Signed-off-by: Vangelis Katikaridis
* #128 fix typo
Signed-off-by: Vangelis Katikaridis
* #128 add package doc
Signed-off-by: Vangelis Katikaridis
Co-authored-by: Stanislav Afanasev
Co-authored-by: Giuseppe
Co-authored-by: Alexander Demin
Co-authored-by: Paschalis Tsilias
Co-authored-by: Sotirios Mantziaris
---
 README.md                          |  127 +-
 component/http/cache/cache.go      |  350 ++
 component/http/cache/cache_test.go | 1855 ++++++++++++++++++++++++++++
 component/http/cache/metric.go     |   69 ++
 component/http/cache/model.go      |   68 +
 component/http/cache/model_test.go |   67 +
 component/http/cache/route.go      |  159 +++
 component/http/cache/route_test.go |   56 +
 component/http/component_test.go   |    4 +-
 component/http/handler.go          |   30 +-
 component/http/handler_test.go     |   12 +-
 component/http/http.go             |   19 +-
 component/http/middleware.go       |   26 +-
 component/http/middleware_test.go  |    2 +-
 component/http/route.go            |   34 +-
 component/http/route_cache_test.go |  547 ++++++++
 component/http/route_test.go       |   11 +
 examples/README.md                 |   12 +-
 examples/first/main.go             |   41 +-
 examples/seventh/main.go           |   83 ++
 20 files changed, 3532 insertions(+), 40 deletions(-)
 create mode 100644 component/http/cache/cache.go
 create mode 100644 component/http/cache/cache_test.go
 create mode 100644 component/http/cache/metric.go
 create mode 100644 component/http/cache/model.go
 create mode 100644 component/http/cache/model_test.go
 create mode 100644 component/http/cache/route.go
 create mode 100644 component/http/cache/route_test.go
 create mode 100644 component/http/route_cache_test.go
 create mode 100644 examples/seventh/main.go

diff --git a/README.md b/README.md
index f85429080..925348309 100644
--- a/README.md
+++ b/README.md
@@ -195,6 +195,131 @@ route := NewRoute("/index", "GET" ProcessorFunc, true, ...MiddlewareFunc)
 routeWithAuth := NewAuthRoute("/index", "GET" ProcessorFunc, true, Authendicator, ...MiddlewareFunc)
 ```
 
+### HTTP Caching
+
+The caching layer for HTTP routes is specified per Route.
+
+```go
+// RouteCache is the builder needed to build a cache for the corresponding route
+type RouteCache struct {
+    // cache is the ttl cache implementation to be used
+    cache cache.TTLCache
+    // age specifies the minimum and maximum amount for max-age and min-fresh header values respectively
+    // regarding the client cache-control requests in seconds
+    age age
+}
+
+func NewRouteCache(ttlCache cache.TTLCache, age Age) *RouteCache
+```
+
+#### server cache
+- The **cache key** is based on the route path and the url request parameters.
+- The server caches only **GET requests**.
+- The server implementation must specify an **Age** parameter upon construction.
+- An Age with **Min=0** and **Max=0** effectively disables caching.
+- The route should always return the freshest object instance.
+- An **ETag header** must always be present in responses that are part of the cache, representing the hash of the response.
+- Requests within the time-to-live threshold will be served from the cache.
+Otherwise the request will be handled as usual by the route processor function,
+and the resulting response will be cached for future requests.
+- Requests whose client cache-control requirements cannot be met, i.e. a **very low max-age** or a **very high min-fresh** parameter,
+will still be answered, but with a `Warning` header present in the response.
+
+```
+Note : When a cache is used, the handler execution might be skipped.
+That implies that all generic handler functionality MUST be delegated to a custom middleware,
+e.g. counting the number of server client requests etc.
+```
+
+### Usage
+
+- provide the cache in the route builder
+```go
+NewRouteBuilder("/", handler).
+    WithRouteCache(cache, http.Age{
+        Min: 30 * time.Minute,
+        Max: 1 * time.Hour,
+    }).
+    MethodGet()
+```
+
+- use the cache as a middleware
+```go
+NewRouteBuilder("/", handler).
+    WithMiddlewares(NewCachingMiddleware(NewRouteCache(cc, Age{Max: 10 * time.Second}))).
+    MethodGet()
+```
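+
+The `cache` argument can be any implementation of `cache.TTLCache`. As an illustration only (this sketch is not part of the patch),
+a minimal in-memory implementation could look as follows, assuming the `Get`, `Set`, `SetTTL`, `Remove` and `Purge` methods exercised
+by the tests in this change set, plus the `sync` and `time` imports:
+```go
+// memoryTTLCache is a hypothetical, minimal in-memory TTL cache meant for local experiments only.
+type memoryTTLCache struct {
+    mu      sync.Mutex
+    entries map[string]memoryEntry
+}
+
+type memoryEntry struct {
+    value   interface{}
+    expires time.Time
+}
+
+func newMemoryTTLCache() *memoryTTLCache {
+    return &memoryTTLCache{entries: make(map[string]memoryEntry)}
+}
+
+// Get returns the stored value only while its TTL has not elapsed.
+func (c *memoryTTLCache) Get(key string) (interface{}, bool, error) {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    e, ok := c.entries[key]
+    if !ok || time.Now().After(e.expires) {
+        return nil, false, nil
+    }
+    return e.value, true, nil
+}
+
+// SetTTL stores the value together with its expiry instant.
+func (c *memoryTTLCache) SetTTL(key string, value interface{}, ttl time.Duration) error {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    c.entries[key] = memoryEntry{value: value, expires: time.Now().Add(ttl)}
+    return nil
+}
+
+// Set falls back to an arbitrary default TTL; the route cache itself always calls SetTTL.
+func (c *memoryTTLCache) Set(key string, value interface{}) error {
+    return c.SetTTL(key, value, time.Hour)
+}
+
+func (c *memoryTTLCache) Remove(key string) error {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    delete(c.entries, key)
+    return nil
+}
+
+func (c *memoryTTLCache) Purge() error {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    c.entries = make(map[string]memoryEntry)
+    return nil
+}
+```
+For multi-instance deployments a shared store (for example the Redis-backed cache used in the cached route example of this patch) is the more natural fit.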
+
+#### client cache-control
+The client can control the cache with the appropriate headers:
+- `max-age=?`
+
+returns the cached instance only if the age of the instance does not exceed the max-age parameter.
+This parameter is bounded from below by the server option `minAge`.
+This prevents chatty clients with no cache control policy (or a very aggressive max-age policy) from effectively disabling the cache.
+- `min-fresh=?`
+
+returns the cached instance only if the time left until its expiration is at least the provided parameter.
+This parameter is bounded from above by the server option `maxFresh`.
+This prevents chatty clients with no cache control policy (or a very aggressive min-fresh policy) from effectively disabling the cache.
+
+- `no-cache` / `no-store`
+
+returns a new response to the client by executing the route processing function,
+except for cases where a `minAge` or `maxFresh` parameter has been specified on the server.
+This is again a safety mechanism to avoid 'aggressive' clients putting unexpected load on the server.
+The server is responsible for capping the refresh time, BUT it must respond with a `Warning` header in such a case.
+- `only-if-cached`
+
+expects any response that is found in the cache, otherwise an empty response is returned
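+
+As a hedged illustration (the host, port and route below are placeholders and not part of this patch), a client could combine these
+directives on a plain `net/http` request and inspect the cache-related response headers:
+```go
+req, err := http.NewRequest(http.MethodGet, "http://localhost:50000/", nil)
+if err != nil {
+    log.Fatal(err)
+}
+// serve from the cache only if the entry is at most 30s old
+// and will remain fresh for at least another 10s
+req.Header.Set("Cache-Control", "max-age=30,min-fresh=10")
+
+rsp, err := http.DefaultClient.Do(req)
+if err != nil {
+    log.Fatal(err)
+}
+defer rsp.Body.Close()
+
+fmt.Println(rsp.Header.Get("Cache-Control")) // e.g. "max-age=25", the time left until expiry
+fmt.Println(rsp.Header.Get("Etag"))          // hash identifying the cached payload
+fmt.Println(rsp.Header.Get("Warning"))       // present when the server adjusted the client directives
+```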
+
+#### metrics
+
+The http cache exposes several metrics, used to
+- assess the state of the cache
+- help tune the optimal time-to-live policy
+- identify client control interference
+
+By default prometheus is used as the pre-defined metrics framework: the cache publishes an operations counter
+(`http_cache_handler_operations`, labelled by route, operation and reason) and an eviction-age histogram
+(`http_cache_handler_expiration`, labelled by route).
+
+- `additions = misses + evictions`
+
+The cache addition operations (objects added to the cache) must always be equal to the misses (requests that were not cached)
+plus the evictions (expired objects).
+Otherwise we would expect to also notice an increased amount of errors, or the cache misbehaving in some other manner.
+
+- `additions ~ misses`
+
+If the additions and misses are comparable, e.g. misses are almost as many as the additions,
+it would point to constant eviction and refilling of the cache itself. In that case the cache does not seem able to support
+the request patterns and control headers.
+
+- `hits ~ additions`
+
+The cache hit count represents how well the cache performs for the access patterns of client requests.
+If this number is rather low, e.g. comparable to the additions,
+it would signify that a cache is probably not a good option for the access patterns at hand.
+
+- `eviction age`
+
+The age at which objects are evicted from the cache is a very useful indicator.
+If the vast majority of evictions happen close to the time-to-live setting, it indicates a nicely working cache.
+If many evictions happen before the time-to-live threshold, it indicates that clients are making use of cache-control headers.
+
+
+#### cache design reference
+- https://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
+- https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9
+
+#### improvement considerations
+- we could decouple the storing of the cached objects from their age counter. That way we would avoid loading the whole object in memory
+if the object is already expired. This approach might provide a considerable performance improvement (in terms of memory utilisation)
+for big response objects.
+- we could extend the metrics to also use the key of the object as a label, for more fine-grained tuning.
+But this has been left out for now, due to the potentially huge number of metric objects.
+We can review according to usage or make this optional in the future.
+- improve the serialization performance for the cache response objects
+
 ### Asynchronous
 
 The implementation of the async processor follows exactly the same principle as the sync processor.
@@ -370,4 +495,4 @@ GET /ready
 
 Both can return either a `200 OK` or a `503 Service Unavailable` status code (default: `200 OK`).
 
-It is possible to customize their behaviour by injecting an `http.AliveCheck` and/or an `http.ReadyCheck` `OptionFunc` to the HTTP component constructor.
\ No newline at end of file
+It is possible to customize their behaviour by injecting an `http.AliveCheck` and/or an `http.ReadyCheck` `OptionFunc` to the HTTP component constructor.
diff --git a/component/http/cache/cache.go b/component/http/cache/cache.go
new file mode 100644
index 000000000..79dc76a15
--- /dev/null
+++ b/component/http/cache/cache.go
@@ -0,0 +1,350 @@
+// Package cache provides cache control and implementation components for http routes.
+package cache + +import ( + "fmt" + "hash/crc32" + "net/http" + "strconv" + "strings" + "time" + + "github.com/beatlabs/patron/cache" + "github.com/beatlabs/patron/log" +) + +type validationContext int + +const ( + // validation due to normal expiry in terms of ttl setting + ttlValidation validationContext = iota + 1 + // maxAgeValidation represents a validation , happening due to max-age Header requirements + maxAgeValidation + // minFreshValidation represents a validation , happening due to min-fresh Header requirements + minFreshValidation + + // HeaderCacheControl is the Header key for cache related values + // note : it is case-sensitive + HeaderCacheControl = "Cache-Control" + // HeaderETagHeader is the constant representing the Etag http header + HeaderETagHeader = "Etag" + + controlMinFresh = "min-fresh" + controlNoCache = "no-cache" + controlNoStore = "no-store" + controlOnlyIfCached = "only-if-cached" + controlEmpty = "" + + headerCacheMaxAge = "max-age" + headerMustRevalidate = "must-revalidate" + headerWarning = "Warning" +) + +var monitor metrics + +func init() { + monitor = newPrometheusMetrics() +} + +// NowSeconds returns the current unix timestamp in seconds +var NowSeconds = func() int64 { + return time.Now().Unix() +} + +// validator is a conditional function on an objects age and the configured ttl +type validator func(age, ttl int64) (bool, validationContext) + +// expiryCheck is the main validator that checks that the entry has not expired e.g. is stale +var expiryCheck validator = func(age, ttl int64) (bool, validationContext) { + return age <= ttl, ttlValidation +} + +// control is the model of the request parameters regarding the cache control +type control struct { + noCache bool + forceCache bool + warning string + validators []validator + expiryValidator validator +} + +// executor is the function returning a cache Response object from the underlying implementation +type executor func(now int64, key string) *response + +// handler wraps the an execution logic with a cache layer +// exec is the processor func that the cache will wrap +// rc is the route cache implementation to be used +func handler(exec executor, rc *RouteCache) func(request *handlerRequest) (response *handlerResponse, e error) { + + return func(request *handlerRequest) (handlerResponse *handlerResponse, e error) { + + now := NowSeconds() + + key := request.getKey() + + var rsp *response + + if hasNoAgeConfig(rc.age.min, rc.age.max) { + rsp = exec(now, key) + return &rsp.Response, rsp.Err + } + + cfg := extractRequestHeaders(request.header, rc.age.min, rc.age.max-rc.age.min) + if cfg.expiryValidator == nil { + cfg.expiryValidator = expiryCheck + } + + rsp = getResponse(cfg, request.path, key, now, rc, exec) + e = rsp.Err + + if e == nil { + handlerResponse = &rsp.Response + addResponseHeaders(now, handlerResponse.Header, rsp, rc.age.max) + if !rsp.FromCache && !cfg.noCache { + save(request.path, key, rsp, rc.cache, time.Duration(rc.age.max)*time.Second) + } + } + + return + } +} + +// getResponse will get the appropriate Response either using the cache or the executor, +// depending on the +func getResponse(cfg *control, path, key string, now int64, rc *RouteCache, exec executor) *response { + + if cfg.noCache { + return exec(now, key) + } + + rsp := get(key, rc) + if rsp == nil { + monitor.miss(path) + response := exec(now, key) + return response + } + if rsp.Err != nil { + log.Errorf("error during cache interaction: %v", rsp.Err) + monitor.err(path) + return exec(now, key) + } + // if the 
object has expired + if isValid, cx := isValid(now-rsp.LastValid, rc.age.max, append(cfg.validators, cfg.expiryValidator)...); !isValid { + tmpRsp := exec(now, key) + // if we could not retrieve a fresh Response, + // serve the last cached value, with a Warning Header + if cfg.forceCache || tmpRsp.Err != nil { + rsp.Warning = "last-valid" + monitor.hit(path) + } else { + rsp = tmpRsp + monitor.evict(path, cx, now-rsp.LastValid) + } + } else { + // add any Warning generated while parsing the headers + rsp.Warning = cfg.warning + monitor.hit(path) + } + + return rsp +} + +func isValid(age, maxAge int64, validators ...validator) (bool, validationContext) { + if len(validators) == 0 { + return false, 0 + } + for _, validator := range validators { + if isValid, cx := validator(age, maxAge); !isValid { + return false, cx + } + } + return true, 0 +} + +// get is the implementation that will provide a response instance from the cache, +// if it exists +func get(key string, rc *RouteCache) *response { + if resp, ok, err := rc.cache.Get(key); ok && err == nil { + if b, ok := resp.([]byte); ok { + r := &response{} + err := r.decode(b) + if err != nil { + return &response{Err: fmt.Errorf("could not decode cached bytes as response %v for key %s", resp, key)} + } + r.FromCache = true + return r + } + // NOTE : we need to do this hack to bypass the redis go client implementation of returning result as string instead of bytes + if b, ok := resp.(string); ok { + r := &response{} + err := r.decode([]byte(b)) + if err != nil { + return &response{Err: fmt.Errorf("could not decode cached string as response %v for key %s", resp, key)} + } + r.FromCache = true + return r + } + + return &response{Err: fmt.Errorf("could not parse cached response %v for key %s", resp, key)} + } else if err != nil { + return &response{Err: fmt.Errorf("could not read cache value for [ key = %v , Err = %v ]", key, err)} + } + return nil +} + +// save caches the given Response if required with a ttl +// as we are putting the objects in the cache, if its a TTL one, we need to manage the expiration on our own +func save(path, key string, rsp *response, cache cache.TTLCache, maxAge time.Duration) { + if !rsp.FromCache && rsp.Err == nil { + // encode to a byte array on our side to avoid cache specific encoding / marshaling requirements + bytes, err := rsp.encode() + if err != nil { + log.Errorf("could not encode response for request key %s: %v", key, err) + monitor.err(path) + return + } + if err := cache.SetTTL(key, bytes, maxAge); err != nil { + log.Errorf("could not cache response for request key %s: %v", key, err) + monitor.err(path) + return + } + monitor.add(path) + } +} + +// addResponseHeaders adds the appropriate headers according to the response conditions +func addResponseHeaders(now int64, header http.Header, rsp *response, maxAge int64) { + header.Set(HeaderETagHeader, rsp.Etag) + header.Set(HeaderCacheControl, createCacheControlHeader(maxAge, now-rsp.LastValid)) + if rsp.Warning != "" && rsp.FromCache { + header.Set(headerWarning, rsp.Warning) + } else { + delete(header, headerWarning) + } +} + +// extractRequestHeaders extracts the client request headers allowing the client some control over the cache +func extractRequestHeaders(header string, minAge, maxFresh int64) *control { + + cfg := control{ + validators: make([]validator, 0), + } + + wrn := make([]string, 0) + + for _, header := range strings.Split(header, ",") { + keyValue := strings.Split(header, "=") + headerKey := strings.ToLower(keyValue[0]) + switch headerKey { 
+ case headerCacheMaxAge: + /** + Indicates that the client is willing to accept a Response whose + age is no greater than the specified time in seconds. Unless max- + stale directive is also included, the client is not willing to + accept a stale Response. + */ + value, ok := parseValue(keyValue) + if !ok || value < 0 { + log.Debugf("invalid value for Header '%s', defaulting to '0' ", keyValue) + value = 0 + } + value, adjusted := min(value, minAge) + if adjusted { + wrn = append(wrn, fmt.Sprintf("max-age=%d", minAge)) + } + cfg.validators = append(cfg.validators, func(age, maxAge int64) (bool, validationContext) { + return age <= value, maxAgeValidation + }) + case controlMinFresh: + /** + Indicates that the client is willing to accept a Response whose + freshness lifetime is no less than its current age plus the + specified time in seconds. That is, the client wants a Response + that will still be fresh for at least the specified number of + seconds. + */ + value, ok := parseValue(keyValue) + if !ok || value < 0 { + log.Debugf("invalid value for Header '%s', defaulting to '0' ", keyValue) + value = 0 + } + value, adjusted := max(value, maxFresh) + if adjusted { + wrn = append(wrn, fmt.Sprintf("min-fresh=%d", maxFresh)) + } + cfg.validators = append(cfg.validators, func(age, maxAge int64) (bool, validationContext) { + return maxAge-age >= value, minFreshValidation + }) + case controlNoCache: + /** + return Response if entity has changed + e.g. (304 Response if nothing has changed : 304 Not Modified) + it SHOULD NOT include min-fresh, max-stale, or max-age. + request should be accompanied by an ETag token + */ + fallthrough + case controlNoStore: + /** + no storage whatsoever + */ + wrn = append(wrn, fmt.Sprintf("max-age=%d", minAge)) + cfg.validators = append(cfg.validators, func(age, maxAge int64) (bool, validationContext) { + return age <= minAge, maxAgeValidation + }) + case controlOnlyIfCached: + /** + return only if is in cache , otherwise 504 + */ + cfg.forceCache = true + case controlEmpty: + // nothing to do here + default: + log.Warn("unrecognised cache Header: '%s'", header) + } + } + if len(wrn) > 0 { + cfg.warning = strings.Join(wrn, ",") + } + + return &cfg +} + +func hasNoAgeConfig(minAge, maxFresh int64) bool { + return minAge == 0 && maxFresh == 0 +} + +func generateETag(key []byte, t int) string { + return fmt.Sprintf("%d-%d", crc32.ChecksumIEEE(key), t) +} + +func createCacheControlHeader(ttl, lastValid int64) string { + mAge := ttl - lastValid + if mAge < 0 { + return headerMustRevalidate + } + return fmt.Sprintf("%s=%d", headerCacheMaxAge, ttl-lastValid) +} + +func min(value, threshold int64) (int64, bool) { + if value < threshold { + return threshold, true + } + return value, false +} + +func max(value, threshold int64) (int64, bool) { + if threshold > 0 && value > threshold { + return threshold, true + } + return value, false +} + +func parseValue(keyValue []string) (int64, bool) { + if len(keyValue) > 1 { + value, err := strconv.ParseInt(keyValue[1], 10, 64) + if err == nil { + return value, true + } + } + return 0, false +} diff --git a/component/http/cache/cache_test.go b/component/http/cache/cache_test.go new file mode 100644 index 000000000..f3646582c --- /dev/null +++ b/component/http/cache/cache_test.go @@ -0,0 +1,1855 @@ +package cache + +import ( + "errors" + "fmt" + "net/http" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/beatlabs/patron/cache" + "github.com/beatlabs/patron/log" + 
"github.com/beatlabs/patron/log/zerolog" +) + +func TestMain(m *testing.M) { + + err := log.Setup(zerolog.Create(log.DebugLevel), make(map[string]interface{})) + + if err != nil { + os.Exit(1) + } + + exitVal := m.Run() + + os.Exit(exitVal) + +} + +func TestExtractCacheHeaders(t *testing.T) { + + type caheRequestCondition struct { + noCache bool + forceCache bool + validators int + expiryValidator bool + } + + type args struct { + cfg caheRequestCondition + headers map[string]string + wrn string + } + + minAge := int64(5) + maxAge := int64(10) + + params := []args{ + { + headers: map[string]string{HeaderCacheControl: "max-age=10"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + validators: 1, + }, + wrn: "", + }, + // Header cannot be parsed + { + headers: map[string]string{HeaderCacheControl: "maxage=10"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + }, + wrn: "", + }, + // Header resets to minAge + { + headers: map[string]string{HeaderCacheControl: "max-age=twenty"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + validators: 1, + }, + wrn: "max-age=5", + }, + // Header resets to maxFresh e.g. maxAge - minAge + { + headers: map[string]string{HeaderCacheControl: "min-fresh=10"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + validators: 1, + }, + wrn: "min-fresh=5", + }, + // no Warning e.g. headers are within allowed values + { + headers: map[string]string{HeaderCacheControl: "min-fresh=5,max-age=5"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + validators: 2, + }, + wrn: "", + }, + // cache headers reset to min-age, note we still cache but send a Warning Header back + { + headers: map[string]string{HeaderCacheControl: "no-cache"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + validators: 1, + }, + wrn: "max-age=5", + }, + { + headers: map[string]string{HeaderCacheControl: "no-store"}, + cfg: caheRequestCondition{ + noCache: false, + forceCache: false, + validators: 1, + }, + wrn: "max-age=5", + }, + } + + for _, param := range params { + header := param.headers[HeaderCacheControl] + cfg := extractRequestHeaders(header, minAge, maxAge-minAge) + assert.Equal(t, param.wrn, cfg.warning) + assert.Equal(t, param.cfg.noCache, cfg.noCache) + assert.Equal(t, param.cfg.forceCache, cfg.forceCache) + assert.Equal(t, param.cfg.validators, len(cfg.validators)) + assert.Equal(t, param.cfg.expiryValidator, cfg.expiryValidator != nil) + } + +} + +type routeConfig struct { + path string + hnd executor + age Age +} + +type requestParams struct { + path string + header map[string]string + query string + timeInstance int64 +} + +// responseStruct emulates the patron http response, +// but this can be any struct in general +type responseStruct struct { + Payload interface{} + Header map[string]string +} + +func newRequestAt(timeInstant int64, ControlHeaders ...string) requestParams { + params := requestParams{ + query: "VALUE=1", + timeInstance: timeInstant, + header: make(map[string]string), + } + if len(ControlHeaders) > 0 { + params.header[HeaderCacheControl] = strings.Join(ControlHeaders, ",") + } + return params +} + +func maxAgeHeader(value string) string { + return fmt.Sprintf("%s=%s", headerCacheMaxAge, value) +} + +func minFreshHeader(value string) string { + return fmt.Sprintf("%s=%s", controlMinFresh, value) +} + +type testArgs struct { + routeConfig routeConfig + cache cache.TTLCache + requestParams requestParams + response *responseStruct + metrics 
testMetrics + err error +} + +func testHeader(maxAge int64) map[string]string { + header := make(map[string]string) + header[HeaderCacheControl] = createCacheControlHeader(maxAge, 0) + return header +} + +func testHeaderWithWarning(maxAge int64, warning string) map[string]string { + h := testHeader(maxAge) + h[headerWarning] = warning + return h +} + +func TestMinAgeCache_WithoutClientHeader(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 1 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(1), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cache Response + { + requestParams: newRequestAt(9), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(2)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // still cached Response + { + requestParams: newRequestAt(11), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(0)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 2, + }, + }, + }, + err: nil, + }, + // new Response , due to expiry validator 10 + 1 - 12 < 0 + { + requestParams: newRequestAt(12), + routeConfig: rc, + response: &responseStruct{Payload: 120, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 2, + hits: 2, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +// No cache age configuration +// this effectively disables the cache +func TestNoAgeCache_WithoutClientHeader(t *testing.T) { + + rc := routeConfig{ + path: "/", + // this means , without client control headers we will always return a non-cached Response + // without any proper age configuration + age: Age{}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(1), + routeConfig: rc, + response: &responseStruct{Payload: 10}, + metrics: testMetrics{ + map[string]*metricState{ + "/": {}, + }, + }, + err: nil, + }, + // no cached Response + { + requestParams: newRequestAt(2), + routeConfig: rc, + response: &responseStruct{Payload: 20}, + metrics: testMetrics{ + map[string]*metricState{ + "/": {}, + }, + }, + err: nil, + }, + // no cached Response + { + requestParams: newRequestAt(2, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 20}, + metrics: testMetrics{ + map[string]*metricState{ + "/": {}, + }, + }, + err: nil, + }, + // no cached Response + { + requestParams: newRequestAt(2, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 20}, + metrics: testMetrics{ + map[string]*metricState{ + "/": {}, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithConstantMaxAgeHeader(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 5 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(1, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(10)}, + metrics: testMetrics{ + 
map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cached Response + { + requestParams: newRequestAt(3, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(8)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // new Response, because max-age > 9 - 1 + { + requestParams: newRequestAt(9, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 90, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + // cached Response right before the age threshold max-age == 14 - 9 + { + requestParams: newRequestAt(14, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 90, Header: testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 2, + evictions: 1, + }, + }, + }, + err: nil, + }, + // new Response, because max-age > 15 - 9 + { + requestParams: newRequestAt(15, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 150, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 3, + misses: 1, + hits: 2, + evictions: 2, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithMaxAgeHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Max: 30 * time.Second}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(30)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cached Response + { + requestParams: newRequestAt(10, maxAgeHeader("10")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(20)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // cached Response + { + requestParams: newRequestAt(20, maxAgeHeader("20")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 2, + }, + }, + }, + err: nil, + }, + // new Response + { + requestParams: newRequestAt(20, maxAgeHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 200, Header: testHeader(30)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 2, + evictions: 1, + }, + }, + }, + err: nil, + }, + // cache Response + { + requestParams: newRequestAt(25, maxAgeHeader("25")), + routeConfig: rc, + response: &responseStruct{Payload: 200, Header: testHeader(25)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 3, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestMinAgeCache_WithHighMaxAgeHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Max: 5 * time.Second}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: 
testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // despite the max-age request, the cache will refresh because of it's ttl + { + requestParams: newRequestAt(6, maxAgeHeader("100")), + routeConfig: rc, + response: &responseStruct{Payload: 60, Header: testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 2, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestNoMinAgeCache_WithLowMaxAgeHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Max: 30 * time.Second}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(30)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // a max-age=0 request will always refresh the cache, + // if there is not minAge limit set + { + requestParams: newRequestAt(1, maxAgeHeader("0")), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(30)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestMinAgeCache_WithMaxAgeHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 5 * time.Second, Max: 30 * time.Second}, + } + + args := [][]testArgs{ + // cache expiration with max-age Header + { + // initial request, will fill up the cache + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(30)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cached Response still, because of minAge override + // note : max-age=2 gets ignored + { + requestParams: newRequestAt(4, maxAgeHeader("2")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeaderWithWarning(26, "max-age=5")}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // cached Response because of bigger max-age parameter + { + requestParams: newRequestAt(5, maxAgeHeader("20")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(25)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 2, + }, + }, + }, + err: nil, + }, + // new Response because of minAge floor + { + requestParams: newRequestAt(6, maxAgeHeader("3")), + routeConfig: rc, + // note : no Warning because it s a new Response + response: &responseStruct{Payload: 60, Header: testHeader(30)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 2, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithConstantMinFreshHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // 
expecting cache Response, as value is still fresh : 5 - 0 == 5 + { + requestParams: newRequestAt(5, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // expecting new Response, as value is not fresh enough + { + requestParams: newRequestAt(6, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 60, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + // cache Response, as value is expired : 11 - 6 <= 5 + { + requestParams: newRequestAt(11, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 60, Header: testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 2, + evictions: 1, + }, + }, + }, + err: nil, + }, + // expecting new Response + { + requestParams: newRequestAt(12, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 120, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 3, + misses: 1, + hits: 2, + evictions: 2, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestNoMaxFreshCache_WithLargeMinFreshHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + { + requestParams: newRequestAt(1, minFreshHeader("100")), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestMaxAgeCache_WithMinFreshHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + // Note this is a bad config + age: Age{Min: 5 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0, minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting cache Response, as min-fresh is bounded by maxFresh configuration parameter + { + requestParams: newRequestAt(5, minFreshHeader("100")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeaderWithWarning(5, "min-fresh=5")}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithMixedHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 5 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0, maxAgeHeader("5"), minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + 
additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting cache Response, as value is still fresh : 5 - 0 == min-fresh and still young : 5 - 0 < max-age + { + requestParams: newRequestAt(5, maxAgeHeader("10"), minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // new Response, as value is not fresh enough : 6 - 0 > min-fresh + { + requestParams: newRequestAt(6, maxAgeHeader("10"), minFreshHeader("5")), + routeConfig: rc, + response: &responseStruct{Payload: 60, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + // cached Response, as value is still fresh enough and still young + { + requestParams: newRequestAt(6, maxAgeHeader("8"), minFreshHeader("10")), + routeConfig: rc, + response: &responseStruct{Payload: 60, Header: testHeaderWithWarning(10, "min-fresh=5")}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 2, + evictions: 1, + }, + }, + }, + err: nil, + }, + // new Response, as value is still fresh enough but too old + { + requestParams: newRequestAt(15, maxAgeHeader("8"), minFreshHeader("10")), + routeConfig: rc, + response: &responseStruct{Payload: 150, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 3, + misses: 1, + hits: 2, + evictions: 2, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithHandlerErrorWithoutHeaders(t *testing.T) { + + hndErr := errors.New("error encountered on handler") + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + }, + { + requestParams: newRequestAt(11), + routeConfig: routeConfig{ + path: rc.path, + hnd: func(now int64, key string) *response { + return &response{ + Err: hndErr, + } + }, + age: rc.age, + }, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 2, + }, + }, + }, + err: hndErr, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithHandlerErr(t *testing.T) { + + hndErr := errors.New("error encountered on handler") + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + hnd: func(now int64, key string) *response { + return &response{ + Err: hndErr, + } + }, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + misses: 1, + }, + }, + }, + err: hndErr, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithCacheGetErr(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + cacheImpl := &testingCache{ + cache: make(map[string]testingCacheEntity), + getErr: errors.New("get error"), + instant: NowSeconds, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + cache: cacheImpl, + metrics: 
testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + errors: 1, + }, + }, + }, + }, + // new Response, because of cache get error + { + requestParams: newRequestAt(1), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(10)}, + cache: cacheImpl, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + errors: 2, + }, + }, + }, + }, + }} + assertCache(t, args) + + assert.Equal(t, 2, cacheImpl.getCount) + assert.Equal(t, 2, cacheImpl.setCount) +} + +func TestCache_WithCacheSetErr(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + cacheImpl := &testingCache{ + cache: make(map[string]testingCacheEntity), + setErr: errors.New("set error"), + instant: NowSeconds, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + cache: cacheImpl, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + }, + // new Response, because of cache get error + { + requestParams: newRequestAt(1), + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(10)}, + cache: cacheImpl, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 2, + }, + }, + }, + }, + }, + } + assertCache(t, args) + + assert.Equal(t, 2, cacheImpl.getCount) + assert.Equal(t, 2, cacheImpl.setCount) +} + +func TestCache_WithMixedPaths(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: requestParams{ + query: "VALUE=1", + timeInstance: 0, + path: "/1", + }, + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/1": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cached Response for the same path + { + requestParams: requestParams{ + query: "VALUE=1", + timeInstance: 1, + path: "/1", + }, + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(9)}, + metrics: testMetrics{ + map[string]*metricState{ + "/1": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // initial request for second path + { + requestParams: requestParams{ + query: "VALUE=1", + timeInstance: 1, + path: "/2", + }, + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/1": { + additions: 1, + misses: 1, + hits: 1, + }, + "/2": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cached Response for second path + { + requestParams: requestParams{ + query: "VALUE=1", + timeInstance: 2, + path: "/2", + }, + routeConfig: rc, + response: &responseStruct{Payload: 10, Header: testHeader(9)}, + metrics: testMetrics{ + map[string]*metricState{ + "/1": { + additions: 1, + misses: 1, + hits: 1, + }, + "/2": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithMixedRequestParameters(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + 
map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // cached Response for same request parameter + { + requestParams: requestParams{ + query: "VALUE=1", + timeInstance: 1, + }, + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(9)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // new Response for different request parameter + { + requestParams: requestParams{ + query: "VALUE=2", + timeInstance: 1, + }, + routeConfig: rc, + response: &responseStruct{Payload: 20, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 2, + hits: 1, + }, + }, + }, + err: nil, + }, + // cached Response for second request parameter + { + requestParams: requestParams{ + query: "VALUE=2", + timeInstance: 2, + }, + routeConfig: rc, + response: &responseStruct{Payload: 20, Header: testHeader(9)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 2, + hits: 2, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestZeroAgeCache_WithNoCacheHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting new Response, as we are using no-cache Header + { + requestParams: newRequestAt(5, "no-cache"), + routeConfig: rc, + response: &responseStruct{Payload: 50, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestMinAgeCache_WithNoCacheHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 2 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting cached Response, as we are using no-cache Header but are within the minAge limit + { + requestParams: newRequestAt(2, "no-cache"), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeaderWithWarning(8, "max-age=2")}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // expecting new Response, as we are using no-cache Header + { + requestParams: newRequestAt(5, "no-cache"), + routeConfig: rc, + response: &responseStruct{Payload: 50, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestZeroAgeCache_WithNoStoreHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + 
additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting new Response, as we are using no-store Header + { + requestParams: newRequestAt(5, "no-store"), + routeConfig: rc, + response: &responseStruct{Payload: 50, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestMinAgeCache_WithNoStoreHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 2 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting cached Response, as we are using no-store Header but are within the minAge limit + { + requestParams: newRequestAt(2, "no-store"), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeaderWithWarning(8, "max-age=2")}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + // expecting new Response, as we are using no-store Header + { + requestParams: newRequestAt(5, "no-store"), + routeConfig: rc, + response: &responseStruct{Payload: 50, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 2, + misses: 1, + hits: 1, + evictions: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func TestCache_WithForceCacheHeaders(t *testing.T) { + + rc := routeConfig{ + path: "/", + age: Age{Min: 10 * time.Second, Max: 10 * time.Second}, + } + + args := [][]testArgs{ + { + // initial request + { + requestParams: newRequestAt(0), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(10)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + }, + }, + }, + err: nil, + }, + // expecting cache Response, as min-fresh is bounded by maxFresh configuration parameter + { + requestParams: newRequestAt(5, "only-if-cached"), + routeConfig: rc, + response: &responseStruct{Payload: 0, Header: testHeader(5)}, + metrics: testMetrics{ + map[string]*metricState{ + "/": { + additions: 1, + misses: 1, + hits: 1, + }, + }, + }, + err: nil, + }, + }, + } + assertCache(t, args) +} + +func assertCache(t *testing.T, args [][]testArgs) { + + monitor = &testMetrics{} + + // create a test request handler + // that returns the current time instant times '10' multiplied by the VALUE parameter in the request + exec := func(request requestParams) func(now int64, key string) *response { + return func(now int64, key string) *response { + i, err := strconv.Atoi(strings.Split(request.query, "=")[1]) + if err != nil { + return &response{ + Err: err, + } + } + response := &response{ + Response: handlerResponse{ + Bytes: []byte(strconv.Itoa(i * 10 * int(request.timeInstance))), + Header: make(map[string][]string), + }, + Etag: generateETag([]byte{}, int(now)), + LastValid: request.timeInstance, + } + return response + } + } + + // test cache implementation + cacheIml := newTestingCache() + + for _, testArg := range args { + for _, arg := range testArg { + + path := arg.routeConfig.path + if arg.requestParams.path != "" { + path = arg.requestParams.path + } + + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s?%s", path, 
arg.requestParams.query), nil) + assert.NoError(t, err) + propagateHeaders(arg.requestParams.header, req.Header) + assert.NoError(t, err) + request := toCacheHandlerRequest(req) + + var hnd executor + if arg.routeConfig.hnd != nil { + hnd = arg.routeConfig.hnd + } else { + hnd = exec(arg.requestParams) + } + + var ch cache.TTLCache + if arg.cache != nil { + ch = arg.cache + } else { + ch = cacheIml + cacheIml.instant = func() int64 { + return arg.requestParams.timeInstance + } + } + + NowSeconds = func() int64 { + return arg.requestParams.timeInstance + } + + routeCache, errs := NewRouteCache(ch, arg.routeConfig.age) + assert.Empty(t, errs) + + response, err := handler(hnd, routeCache)(request) + + if arg.err != nil { + assert.Error(t, err) + assert.Nil(t, response) + assert.Equal(t, err, arg.err) + } else { + assert.NoError(t, err) + assert.NotNil(t, response) + payload, err := strconv.Atoi(string(response.Bytes)) + assert.NoError(t, err) + assert.Equal(t, arg.response.Payload, payload) + assertHeader(t, HeaderCacheControl, arg.response.Header, response.Header) + assertHeader(t, headerWarning, arg.response.Header, response.Header) + assert.NotNil(t, arg.response.Header[HeaderETagHeader]) + if !hasNoAgeConfig(int64(arg.routeConfig.age.Min), int64(arg.routeConfig.age.Max)) { + assert.NotEmpty(t, response.Header[HeaderETagHeader]) + } + } + assertMetrics(t, arg.metrics, *monitor.(*testMetrics)) + } + } +} + +func propagateHeaders(header map[string]string, wHeader http.Header) { + for k, h := range header { + wHeader.Set(k, h) + } +} + +func assertHeader(t *testing.T, key string, expected map[string]string, actual http.Header) { + if expected[key] == "" { + assert.Empty(t, actual[key]) + } else { + assert.Equal(t, expected[key], actual[key][0]) + } + +} + +func assertMetrics(t *testing.T, expected, actual testMetrics) { + for k, v := range expected.values { + if actual.values == nil { + assert.Equal(t, v, &metricState{}) + } else { + assert.Equal(t, v, actual.values[k]) + } + } +} + +type testingCacheEntity struct { + v interface{} + ttl int64 + t0 int64 +} + +type testingCache struct { + cache map[string]testingCacheEntity + getCount int + setCount int + getErr error + setErr error + instant func() int64 +} + +func newTestingCache() *testingCache { + return &testingCache{cache: make(map[string]testingCacheEntity)} +} + +func (t *testingCache) Get(key string) (interface{}, bool, error) { + t.getCount++ + if t.getErr != nil { + return nil, false, t.getErr + } + r, ok := t.cache[key] + if t.instant()-r.t0 > r.ttl { + return nil, false, nil + } + return r.v, ok, nil +} + +func (t *testingCache) Purge() error { + for k := range t.cache { + _ = t.Remove(k) + } + return nil +} + +func (t *testingCache) Remove(key string) error { + delete(t.cache, key) + return nil +} + +// Note : this method will effectively not cache anything +// e.g. 
testingCacheEntity.t is `0` +func (t *testingCache) Set(key string, value interface{}) error { + t.setCount++ + if t.setErr != nil { + return t.getErr + } + t.cache[key] = testingCacheEntity{ + v: value, + } + return nil +} + +func (t *testingCache) SetTTL(key string, value interface{}, ttl time.Duration) error { + t.setCount++ + if t.setErr != nil { + return t.getErr + } + t.cache[key] = testingCacheEntity{ + v: value, + ttl: int64(ttl / time.Second), + t0: t.instant(), + } + return nil +} + +type testMetrics struct { + values map[string]*metricState +} + +type metricState struct { + additions int + misses int + evictions int + hits int + errors int +} + +func (m *testMetrics) init(path string) { + if m.values == nil { + m.values = make(map[string]*metricState) + } + if _, exists := m.values[path]; !exists { + + m.values[path] = &metricState{} + } +} + +func (m *testMetrics) add(path string) { + m.init(path) + m.values[path].additions++ +} + +func (m *testMetrics) miss(path string) { + m.init(path) + m.values[path].misses++ +} + +func (m *testMetrics) hit(path string) { + m.init(path) + m.values[path].hits++ +} + +func (m *testMetrics) err(path string) { + m.init(path) + m.values[path].errors++ +} + +func (m *testMetrics) evict(path string, context validationContext, age int64) { + m.init(path) + m.values[path].evictions++ +} diff --git a/component/http/cache/metric.go b/component/http/cache/metric.go new file mode 100644 index 000000000..5bfb780ae --- /dev/null +++ b/component/http/cache/metric.go @@ -0,0 +1,69 @@ +package cache + +import "github.com/prometheus/client_golang/prometheus" + +var validationReason = map[validationContext]string{0: "nil", ttlValidation: "expired", maxAgeValidation: "max_age", minFreshValidation: "min_fresh"} + +type metrics interface { + add(path string) + miss(path string) + hit(path string) + err(path string) + evict(path string, context validationContext, age int64) +} + +// prometheusMetrics is the prometheus implementation for exposing cache metrics +type prometheusMetrics struct { + ageHistogram *prometheus.HistogramVec + operations *prometheus.CounterVec +} + +func (m *prometheusMetrics) add(path string) { + m.operations.WithLabelValues(path, "add", "").Inc() +} + +func (m *prometheusMetrics) miss(path string) { + m.operations.WithLabelValues(path, "miss", "").Inc() +} + +func (m *prometheusMetrics) hit(path string) { + m.operations.WithLabelValues(path, "hit", "").Inc() +} + +func (m *prometheusMetrics) err(path string) { + m.operations.WithLabelValues(path, "Err", "").Inc() +} + +func (m *prometheusMetrics) evict(path string, context validationContext, age int64) { + m.ageHistogram.WithLabelValues(path).Observe(float64(age)) + m.operations.WithLabelValues(path, "evict", validationReason[context]).Inc() +} + +// newPrometheusMetrics constructs a new prometheus metrics implementation instance +func newPrometheusMetrics() *prometheusMetrics { + + histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "expiration", + Help: "Expiry age for evicted objects.", + Buckets: []float64{1, 10, 30, 60, 60 * 5, 60 * 10, 60 * 30, 60 * 60}, + }, []string{"route"}) + + operations := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "operations", + Help: "Number of cache operations.", + }, []string{"route", "operation", "reason"}) + + m := &prometheusMetrics{ + ageHistogram: histogram, + operations: operations, + } + + 
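+	// NOTE: prometheus.MustRegister panics if the same collectors are registered twice,
+	// so newPrometheusMetrics is expected to be invoked only once per process.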
prometheus.MustRegister(m.ageHistogram, m.operations) + + return m + +} diff --git a/component/http/cache/model.go b/component/http/cache/model.go new file mode 100644 index 000000000..e2af4826f --- /dev/null +++ b/component/http/cache/model.go @@ -0,0 +1,68 @@ +package cache + +import ( + "encoding/json" + "fmt" + "net/http" +) + +// handlerRequest is the dedicated request object for the cache handler +type handlerRequest struct { + header string + path string + query string +} + +// toCacheHandlerRequest transforms the http Request object to the cache handler request +func toCacheHandlerRequest(req *http.Request) *handlerRequest { + var header string + if req.Header != nil { + header = req.Header.Get(HeaderCacheControl) + } + var path string + var query string + if req.URL != nil { + path = req.URL.Path + query = req.URL.RawQuery + } + return &handlerRequest{ + header: header, + path: path, + query: query, + } +} + +// getKey generates a unique cache key based on the route path and the query parameters +func (c *handlerRequest) getKey() string { + return fmt.Sprintf("%s:%s", c.path, c.query) +} + +// handlerResponse is the dedicated Response object for the cache handler +type handlerResponse struct { + Bytes []byte + Header http.Header +} + +// response is the struct representing an object retrieved or ready to be put into the route cache +type response struct { + Response handlerResponse + LastValid int64 + Etag string + Warning string + FromCache bool + Err error +} + +// encode encodes the generic response to bytes for external memory storage +func (c *response) encode() ([]byte, error) { + b, err := json.Marshal(c) + if err != nil { + return nil, fmt.Errorf("could not encode cache response object: %w", err) + } + return b, nil +} + +// decode decodes the cached object bytes +func (c *response) decode(data []byte) error { + return json.Unmarshal(data, c) +} diff --git a/component/http/cache/model_test.go b/component/http/cache/model_test.go new file mode 100644 index 000000000..98ce53833 --- /dev/null +++ b/component/http/cache/model_test.go @@ -0,0 +1,67 @@ +package cache + +import ( + "testing" + + "github.com/beatlabs/patron/encoding/json" + "github.com/stretchr/testify/assert" +) + +func TestResponse(t *testing.T) { + + type arg struct { + payload interface{} + } + + args := []arg{ + {"string"}, + {10.0}, + //{10}, + {struct { + a string + f float64 + i int + mi map[string]int + mf map[float64]string + }{ + a: "a string", + f: 12.2, + i: 22, + mi: map[string]int{"1": 1}, + mf: map[float64]string{1.1: "1.1"}, + }}, + } + + for _, argument := range args { + assertForHandlerResponse(t, argument.payload) + } + +} + +func assertForHandlerResponse(t *testing.T, payload interface{}) { + + bp, err := json.Encode(payload) + assert.NoError(t, err) + + r := response{ + Response: handlerResponse{ + Bytes: bp, + Header: map[string][]string{"header": {"header-value"}}, + }, + LastValid: 10, + Etag: "", + Warning: "", + FromCache: false, + Err: nil, + } + + b, err := r.encode() + assert.NoError(t, err) + + rsp := response{} + err = rsp.decode(b) + assert.NoError(t, err) + + assert.Equal(t, r, rsp) + +} diff --git a/component/http/cache/route.go b/component/http/cache/route.go new file mode 100644 index 000000000..c9a7c860c --- /dev/null +++ b/component/http/cache/route.go @@ -0,0 +1,159 @@ +package cache + +import ( + "bytes" + "errors" + "fmt" + "net/http" + "time" + + "github.com/beatlabs/patron/cache" + "github.com/beatlabs/patron/log" +) + +// RouteCache is the builder needed to build a 
cache for the corresponding route +type RouteCache struct { + // cache is the ttl cache implementation to be used + cache cache.TTLCache + // age specifies the minimum and maximum amount for max-age and min-fresh Header values respectively + // regarding the client cache-control requests in seconds + age age +} + +// NewRouteCache creates a new cache implementation for an http route +func NewRouteCache(ttlCache cache.TTLCache, age Age) (*RouteCache, []error) { + + errs := make([]error, 0) + + if ttlCache == nil { + errs = append(errs, errors.New("route cache is nil")) + } + + if age.Min > age.Max { + errs = append(errs, errors.New("max age must always be greater than min age")) + } + + if hasNoAgeConfig(age.Min.Milliseconds(), age.Max.Milliseconds()) { + log.Warnf("route cache for %s is disabled because of empty Age property %v ") + } + + return &RouteCache{ + cache: ttlCache, + age: age.toAgeInSeconds(), + }, errs +} + +// Age defines the route cache life-time boundaries for cached objects +type Age struct { + // Min adds a minimum age threshold for the client controlled cache responses. + // This will avoid cases where a single client with high request rate and no cache control headers might effectively disable the cache + // This means that if this parameter is missing (e.g. is equal to '0' , the cache can effectively be made obsolete in the above scenario) + Min time.Duration + // Max adds a maximum age for the cache responses. Which effectively works as a time-to-live wrapper on top of the cache + Max time.Duration + // The difference of maxAge-minAge sets automatically the max threshold for min-fresh requests + // This will avoid cases where a single client with high request rate and no cache control headers might effectively disable the cache + // This means that if this parameter is very high (e.g. greater than ttl , the cache can effectively be made obsolete in the above scenario) +} + +func (a Age) toAgeInSeconds() age { + return age{ + min: int64(a.Min / time.Second), + max: int64(a.Max / time.Second), + } +} + +type age struct { + min int64 + max int64 +} + +// responseReadWriter is a Response writer able to Read the Payload. +type responseReadWriter struct { + buffer *bytes.Buffer + len int + header http.Header + statusCode int +} + +// newResponseReadWriter creates a new responseReadWriter. +func newResponseReadWriter() *responseReadWriter { + return &responseReadWriter{ + buffer: new(bytes.Buffer), + header: make(http.Header), + } +} + +// Read reads the responsereadWriter Payload. +func (rw *responseReadWriter) Read(p []byte) (n int, err error) { + return rw.buffer.Read(p) +} + +// ReadAll returns the Response Payload Bytes. +func (rw *responseReadWriter) ReadAll() ([]byte, error) { + if rw.len == 0 { + // nothing has been written + return []byte{}, nil + } + b := make([]byte, rw.len) + _, err := rw.Read(b) + return b, err +} + +// Header returns the Header object. +func (rw *responseReadWriter) Header() http.Header { + return rw.header +} + +// Write writes the provied Bytes to the byte buffer. +func (rw *responseReadWriter) Write(p []byte) (int, error) { + rw.len = len(p) + return rw.buffer.Write(p) +} + +// WriteHeader writes the Header status code. 
+func (rw *responseReadWriter) WriteHeader(statusCode int) { + rw.statusCode = statusCode +} + +// Handler will wrap the handler func with the route cache abstraction +func Handler(w http.ResponseWriter, r *http.Request, rc *RouteCache, httpHandler http.Handler) error { + req := toCacheHandlerRequest(r) + response, err := handler(httpExecutor(w, r, func(writer http.ResponseWriter, request *http.Request) { + httpHandler.ServeHTTP(writer, request) + }), rc)(req) + if err != nil { + return fmt.Errorf("could not handle request with the cache processor: %v", err) + } + for k, h := range response.Header { + w.Header().Set(k, h[0]) + } + if i, err := w.Write(response.Bytes); err != nil { + return fmt.Errorf("could not Write cache processor result into Response %d: %w", i, err) + } + return nil +} + +// httpExecutor is the function that will create a new response based on a HandlerFunc implementation +// this wrapper adapts the http handler signature to the cache layer abstraction +func httpExecutor(_ http.ResponseWriter, request *http.Request, hnd http.HandlerFunc) executor { + return func(now int64, key string) *response { + var err error + responseReadWriter := newResponseReadWriter() + hnd(responseReadWriter, request) + payload, err := responseReadWriter.ReadAll() + rw := *responseReadWriter + if err == nil { + return &response{ + Response: handlerResponse{ + Bytes: payload, + // cache also the headers generated by the handler + Header: rw.Header(), + }, + LastValid: now, + Etag: generateETag([]byte(key), time.Now().Nanosecond()), + } + } + return &response{Err: err} + } +} diff --git a/component/http/cache/route_test.go b/component/http/cache/route_test.go new file mode 100644 index 000000000..5c4e98a8b --- /dev/null +++ b/component/http/cache/route_test.go @@ -0,0 +1,56 @@ +package cache + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestResponseReadWriter_Header(t *testing.T) { + rw := newResponseReadWriter() + rw.Header().Set("key", "value") + assert.Equal(t, "value", rw.Header().Get("key")) +} + +func TestResponseReadWriter_StatusCode(t *testing.T) { + rw := newResponseReadWriter() + rw.WriteHeader(100) + assert.Equal(t, 100, rw.statusCode) +} + +func TestResponseReadWriter_ReadWrite(t *testing.T) { + rw := newResponseReadWriter() + str := "body" + i, err := rw.Write([]byte(str)) + assert.NoError(t, err) + + r := make([]byte, i) + j, err := rw.Read(r) + assert.NoError(t, err) + + assert.Equal(t, i, j) + assert.Equal(t, str, string(r)) +} + +func TestResponseReadWriter_ReadWriteAll(t *testing.T) { + rw := newResponseReadWriter() + str := "body" + i, err := rw.Write([]byte(str)) + assert.NoError(t, err) + + b, err := rw.ReadAll() + assert.NoError(t, err) + + assert.Equal(t, i, len(b)) + assert.Equal(t, str, string(b)) +} + +func TestResponseReadWriter_ReadAllEmpty(t *testing.T) { + rw := newResponseReadWriter() + + b, err := rw.ReadAll() + assert.NoError(t, err) + + assert.Equal(t, 0, len(b)) + assert.Equal(t, "", string(b)) +} diff --git a/component/http/component_test.go b/component/http/component_test.go index e8ae3eea1..f52916aa1 100644 --- a/component/http/component_test.go +++ b/component/http/component_test.go @@ -20,7 +20,7 @@ func TestBuilderWithoutOptions(t *testing.T) { func TestComponent_ListenAndServe_DefaultRoutes_Shutdown(t *testing.T) { rb := NewRoutesBuilder(). 
Append(NewRawRouteBuilder("/", func(http.ResponseWriter, *http.Request) {}).MethodGet().WithTrace()) - s, err := NewBuilder().WithRoutesBuilder(rb).WithPort(50003).Create() + s, err := NewBuilder().WithRoutesBuilder(rb).WithPort(50013).Create() assert.NoError(t, err) done := make(chan bool) ctx, cnl := context.WithCancel(context.Background()) @@ -36,7 +36,7 @@ func TestComponent_ListenAndServe_DefaultRoutes_Shutdown(t *testing.T) { func TestComponent_ListenAndServeTLS_DefaultRoutes_Shutdown(t *testing.T) { rb := NewRoutesBuilder().Append(NewRawRouteBuilder("/", func(http.ResponseWriter, *http.Request) {}).MethodGet()) - s, err := NewBuilder().WithRoutesBuilder(rb).WithSSL("testdata/server.pem", "testdata/server.key").WithPort(50003).Create() + s, err := NewBuilder().WithRoutesBuilder(rb).WithSSL("testdata/server.pem", "testdata/server.key").WithPort(50014).Create() assert.NoError(t, err) done := make(chan bool) ctx, cnl := context.WithCancel(context.Background()) diff --git a/component/http/handler.go b/component/http/handler.go index 36ea04e4e..a6d62b95a 100644 --- a/component/http/handler.go +++ b/component/http/handler.go @@ -17,7 +17,7 @@ func handler(hnd ProcessorFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - ct, dec, enc, err := determineEncoding(r) + ct, dec, enc, err := determineEncoding(r.Header) if err != nil { http.Error(w, http.StatusText(http.StatusUnsupportedMediaType), http.StatusUnsupportedMediaType) return @@ -29,14 +29,18 @@ func handler(hnd ProcessorFunc) http.HandlerFunc { f[k] = v } + // TODO : for cached responses this becomes inconsistent, to be fixed in #160 + // the corID will be passed to all consecutive responses + // if it was missing from the initial request corID := getOrSetCorrelationID(r.Header) ctx := correlation.ContextWithID(r.Context(), corID) logger := log.Sub(map[string]interface{}{correlation.ID: corID}) ctx = log.WithContext(ctx, logger) - h := extractHeaders(r) + h := extractHeaders(r.Header) req := NewRequest(f, r.Body, h, dec) + rsp, err := hnd(ctx, req) if err != nil { handleError(logger, w, enc, err) @@ -50,9 +54,9 @@ func handler(hnd ProcessorFunc) http.HandlerFunc { } } -func determineEncoding(r *http.Request) (string, encoding.DecodeFunc, encoding.EncodeFunc, error) { - cth, cok := r.Header[encoding.ContentTypeHeader] - ach, aok := r.Header[encoding.AcceptHeader] +func determineEncoding(h http.Header) (string, encoding.DecodeFunc, encoding.EncodeFunc, error) { + cth, cok := h[encoding.ContentTypeHeader] + ach, aok := h[encoding.AcceptHeader] // No headers default to JSON if !cok && !aok { @@ -74,7 +78,7 @@ func determineEncoding(r *http.Request) (string, encoding.DecodeFunc, encoding.E dec = protobuf.Decode ct = protobuf.Type default: - return "", nil, nil, errors.New("content type header not supported") + return "", nil, nil, errors.New("content type Header not supported") } } @@ -93,7 +97,7 @@ func determineEncoding(r *http.Request) (string, encoding.DecodeFunc, encoding.E } ct = protobuf.Type default: - return "", nil, nil, errors.New("accept header not supported") + return "", nil, nil, errors.New("accept Header not supported") } } @@ -109,10 +113,10 @@ func extractFields(r *http.Request) map[string]string { return f } -func extractHeaders(r *http.Request) map[string]string { +func extractHeaders(header http.Header) Header { h := make(map[string]string) - for name, values := range r.Header { + for name, values := range header { for _, value := range values { if len(value) > 0 { h[strings.ToUpper(name)] = 
value @@ -137,12 +141,14 @@ func handleSuccess(w http.ResponseWriter, r *http.Request, rsp *Response, enc en w.WriteHeader(http.StatusCreated) } + propagateHeaders(rsp.Header, w.Header()) + _, err = w.Write(p) return err } func handleError(logger log.Logger, w http.ResponseWriter, enc encoding.EncodeFunc, err error) { - // Assert error to type Error in order to leverage the code and payload values that such errors contain. + // Assert error to type Error in order to leverage the code and Payload values that such errors contain. if err, ok := err.(*Error); ok { p, encErr := enc(err.payload) if encErr != nil { @@ -151,11 +157,11 @@ func handleError(logger log.Logger, w http.ResponseWriter, enc encoding.EncodeFu } w.WriteHeader(err.code) if _, err := w.Write(p); err != nil { - logger.Errorf("failed to write response: %v", err) + logger.Errorf("failed to write Response: %v", err) } return } - // Using http.Error helper hijacks the content type header of the response returning plain text payload. + // Using http.Error helper hijacks the content type Header of the Response returning plain text Payload. http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } diff --git a/component/http/handler_test.go b/component/http/handler_test.go index 728e1edf0..39f4bc7c6 100644 --- a/component/http/handler_test.go +++ b/component/http/handler_test.go @@ -30,11 +30,11 @@ func Test_extractHeaders(t *testing.T) { r, err := http.NewRequest("GET", "/test", nil) r.Header.Set("X-HEADER-1", "all capsssss") r.Header.Set("X-HEADER-1", "all caps") - r.Header.Set("x-header-2", "all lower") + r.Header.Set("x-Header-2", "all lower") r.Header.Set("X-hEadEr-3", "all mixed") r.Header.Set("X-ACME", "") assert.NoError(t, err) - h := extractHeaders(r) + h := extractHeaders(r.Header) assert.Len(t, h, 3) assert.Equal(t, "all caps", h["X-HEADER-1"]) assert.Equal(t, "all lower", h["X-HEADER-2"]) @@ -60,14 +60,14 @@ func Test_determineEncoding(t *testing.T) { {"success protobuf, missing accept", args{req: request(t, protobuf.Type, "")}, protobuf.Decode, protobuf.Encode, protobuf.Type, false}, {"success protobuf, missing content type", args{req: request(t, "", protobuf.Type)}, protobuf.Decode, protobuf.Encode, protobuf.Type, false}, {"wrong accept", args{req: request(t, json.Type, "xxx")}, nil, nil, json.TypeCharset, true}, - {"missing content header, defaults json", args{req: request(t, "", json.TypeCharset)}, json.Decode, json.Encode, json.TypeCharset, false}, + {"missing content Header, defaults json", args{req: request(t, "", json.TypeCharset)}, json.Decode, json.Encode, json.TypeCharset, false}, {"missing headers, defaults json", args{req: request(t, "", "")}, json.Decode, json.Encode, json.TypeCharset, false}, {"accept */*, defaults to json", args{req: request(t, json.TypeCharset, "*/*")}, json.Decode, json.Encode, json.TypeCharset, false}, {"wrong content", args{req: request(t, "application/xml", json.TypeCharset)}, nil, nil, json.TypeCharset, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ct, got, got1, err := determineEncoding(tt.args.req) + ct, got, got1, err := determineEncoding(tt.args.req.Header) if tt.wantErr { assert.Error(t, err) assert.Nil(t, got) @@ -109,7 +109,7 @@ func Test_getOrSetCorrelationID(t *testing.T) { "with id": {args: args{hdr: withID}}, "without id": {args: args{hdr: withoutID}}, "with empty id": {args: args{hdr: withEmptyID}}, - "missing header": {args: args{hdr: missingHeader}}, + "missing Header": {args: args{hdr: missingHeader}}, 
} for name, tt := range tests { t.Run(name, func(t *testing.T) { @@ -183,7 +183,7 @@ func Test_handleError(t *testing.T) { {"service unavailable error", args{err: NewServiceUnavailableError(), enc: json.Encode}, http.StatusServiceUnavailable}, {"internal server error", args{err: NewError(), enc: json.Encode}, http.StatusInternalServerError}, {"default error", args{err: errors.New("Test"), enc: json.Encode}, http.StatusInternalServerError}, - {"payload encoding error", args{err: NewErrorWithCodeAndPayload(http.StatusBadRequest, make(chan int)), enc: json.Encode}, http.StatusInternalServerError}, + {"Payload encoding error", args{err: NewErrorWithCodeAndPayload(http.StatusBadRequest, make(chan int)), enc: json.Encode}, http.StatusInternalServerError}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/component/http/http.go b/component/http/http.go index f9b6c3944..b3567874c 100644 --- a/component/http/http.go +++ b/component/http/http.go @@ -4,15 +4,19 @@ package http import ( "context" "io" + "net/http" "github.com/beatlabs/patron/encoding" ) +// Header is the http header representation as a map of strings +type Header map[string]string + // Request definition of the sync request model. type Request struct { Fields map[string]string Raw io.Reader - Headers map[string]string + Headers Header decode encoding.DecodeFunc } @@ -26,15 +30,22 @@ func (r *Request) Decode(v interface{}) error { return r.decode(r.Raw, v) } -// Response definition of the sync response model. +// Response definition of the sync Response model. type Response struct { Payload interface{} + Header Header } -// NewResponse creates a new response. +// NewResponse creates a new Response. func NewResponse(p interface{}) *Response { - return &Response{Payload: p} + return &Response{Payload: p, Header: make(map[string]string)} } // ProcessorFunc definition of a function type for processing sync requests. type ProcessorFunc func(context.Context, *Request) (*Response, error) + +func propagateHeaders(header Header, wHeader http.Header) { + for k, h := range header { + wHeader.Set(k, h) + } +} diff --git a/component/http/middleware.go b/component/http/middleware.go index 241ce96de..a05bf800b 100644 --- a/component/http/middleware.go +++ b/component/http/middleware.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/beatlabs/patron/component/http/auth" + "github.com/beatlabs/patron/component/http/cache" "github.com/beatlabs/patron/correlation" "github.com/beatlabs/patron/log" "github.com/beatlabs/patron/trace" @@ -33,12 +34,12 @@ func (w *responseWriter) Status() int { return w.status } -// Header returns the header. +// Header returns the Header. func (w *responseWriter) Header() http.Header { return w.writer.Header() } -// Write to the internal ResponseWriter and sets the status if not set already. +// Write to the internal responseWriter and sets the status if not set already. func (w *responseWriter) Write(d []byte) (int, error) { value, err := w.writer.Write(d) @@ -54,7 +55,7 @@ func (w *responseWriter) Write(d []byte) (int, error) { return value, err } -// WriteHeader writes the internal header and saves the status for retrieval. +// WriteHeader writes the internal Header and saves the status for retrieval. 
func (w *responseWriter) WriteHeader(code int) { w.status = code w.writer.WriteHeader(code) @@ -123,6 +124,25 @@ func NewLoggingTracingMiddleware(path string) MiddlewareFunc { } } +// NewCachingMiddleware creates a cache layer as a middleware +// when used as part of a middleware chain any middleware later in the chain, +// will not be executed, but the headers it appends will be part of the cache +func NewCachingMiddleware(rc *cache.RouteCache) MiddlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + next.ServeHTTP(w, r) + return + } + err := cache.Handler(w, r, rc, next) + if err != nil { + log.Errorf("error encountered in the caching middleware: %v", err) + return + } + }) + } +} + // MiddlewareChain chains middlewares to a handler func. func MiddlewareChain(f http.Handler, mm ...MiddlewareFunc) http.Handler { for i := len(mm) - 1; i >= 0; i-- { diff --git a/component/http/middleware_test.go b/component/http/middleware_test.go index 187fb5f8f..00626223b 100644 --- a/component/http/middleware_test.go +++ b/component/http/middleware_test.go @@ -114,7 +114,7 @@ func TestResponseWriter(t *testing.T) { rw.WriteHeader(202) assert.Equal(t, 202, rw.status, "status expected 202 but got %d", rw.status) - assert.Len(t, rw.Header(), 1, "header count expected to be 1") + assert.Len(t, rw.Header(), 1, "Header count expected to be 1") assert.True(t, rw.statusHeaderWritten, "expected to be true") assert.Equal(t, "test", rc.Body.String(), "body expected to be test but was %s", rc.Body.String()) } diff --git a/component/http/route.go b/component/http/route.go index 3844ef33c..78ebbfa01 100644 --- a/component/http/route.go +++ b/component/http/route.go @@ -6,7 +6,9 @@ import ( "net/http" "strings" + "github.com/beatlabs/patron/cache" "github.com/beatlabs/patron/component/http/auth" + httpcache "github.com/beatlabs/patron/component/http/cache" errs "github.com/beatlabs/patron/errors" ) @@ -46,6 +48,7 @@ type RouteBuilder struct { middlewares []MiddlewareFunc authenticator auth.Authenticator handler http.HandlerFunc + routeCache *httpcache.RouteCache errors []error } @@ -73,6 +76,16 @@ func (rb *RouteBuilder) WithAuth(auth auth.Authenticator) *RouteBuilder { return rb } +// WithRouteCache adds a cache to the corresponding route +func (rb *RouteBuilder) WithRouteCache(cache cache.TTLCache, ageBounds httpcache.Age) *RouteBuilder { + + rc, ee := httpcache.NewRouteCache(cache, ageBounds) + + rb.routeCache = rc + rb.errors = append(rb.errors, ee...) + return rb +} + func (rb *RouteBuilder) setMethod(method string) *RouteBuilder { if rb.method != "" { rb.errors = append(rb.errors, errors.New("method already set")) @@ -146,6 +159,13 @@ func (rb *RouteBuilder) Build() (Route, error) { if len(rb.middlewares) > 0 { middlewares = append(middlewares, rb.middlewares...) } + // cache middleware is always last, so that it caches only the headers of the handler + if rb.routeCache != nil { + if rb.method != http.MethodGet { + return Route{}, errors.New("cannot apply cache to a route with any method other than GET ") + } + middlewares = append(middlewares, NewCachingMiddleware(rb.routeCache)) + } return Route{ path: rb.path, @@ -173,17 +193,17 @@ func NewRawRouteBuilder(path string, handler http.HandlerFunc) *RouteBuilder { // NewRouteBuilder constructor. 
func NewRouteBuilder(path string, processor ProcessorFunc) *RouteBuilder { - var err error + var ee []error + + if path == "" { + ee = append(ee, errors.New("path is empty")) + } if processor == nil { - err = errors.New("processor is nil") + ee = append(ee, errors.New("processor is nil")) } - rb := NewRawRouteBuilder(path, handler(processor)) - if err != nil { - rb.errors = append(rb.errors, err) - } - return rb + return &RouteBuilder{path: path, errors: ee, handler: handler(processor)} } // RoutesBuilder creates a list of routes. diff --git a/component/http/route_cache_test.go b/component/http/route_cache_test.go new file mode 100644 index 000000000..c15426adc --- /dev/null +++ b/component/http/route_cache_test.go @@ -0,0 +1,547 @@ +package http + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/beatlabs/patron/cache" + httpclient "github.com/beatlabs/patron/client/http" + httpcache "github.com/beatlabs/patron/component/http/cache" + + "github.com/stretchr/testify/assert" +) + +type cacheState struct { + setOps int + getOps int + size int +} + +type builderOperation func(routeBuilder *RouteBuilder) *RouteBuilder + +type arg struct { + bop builderOperation + age httpcache.Age + err bool +} + +func TestCachingMiddleware(t *testing.T) { + + getRequest, err := http.NewRequest("GET", "/test", nil) + assert.NoError(t, err) + + postRequest, err := http.NewRequest("POST", "/test", nil) + assert.NoError(t, err) + + type args struct { + next http.Handler + mws []MiddlewareFunc + } + + testingCache := newTestingCache() + testingCache.instant = httpcache.NowSeconds + + cache, errs := httpcache.NewRouteCache(testingCache, httpcache.Age{Max: 1 * time.Second}) + assert.Empty(t, errs) + + tests := []struct { + name string + args args + r *http.Request + expectedCode int + expectedBody string + cacheState cacheState + }{ + {"caching middleware with POST request", args{next: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(202) + i, err := w.Write([]byte{1, 2, 3, 4}) + assert.NoError(t, err) + assert.Equal(t, 4, i) + }), mws: []MiddlewareFunc{NewCachingMiddleware(cache)}}, + postRequest, 202, "\x01\x02\x03\x04", cacheState{}}, + {"caching middleware with GET request", args{next: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + i, err := w.Write([]byte{1, 2, 3, 4}) + assert.NoError(t, err) + assert.Equal(t, 4, i) + }), mws: []MiddlewareFunc{NewCachingMiddleware(cache)}}, + getRequest, 200, "\x01\x02\x03\x04", cacheState{ + setOps: 1, + getOps: 1, + size: 1, + }}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rc := httptest.NewRecorder() + rw := newResponseWriter(rc) + tt.args.next = MiddlewareChain(tt.args.next, tt.args.mws...) 
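+			// serve the request through the chained handler, then verify the response and the resulting cache state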
+ tt.args.next.ServeHTTP(rw, tt.r) + assert.Equal(t, tt.expectedCode, rw.Status()) + assert.Equal(t, tt.expectedBody, rc.Body.String()) + assertCacheState(t, *testingCache, tt.cacheState) + }) + } + +} + +func TestNewRouteBuilder_WithCache(t *testing.T) { + + args := []arg{ + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + age: httpcache.Age{Max: 10}, + }, + // error with '0' ttl + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + age: httpcache.Age{Min: 10, Max: 1}, + err: true, + }, + // error for POST method + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodPost() + }, + age: httpcache.Age{Max: 10}, + err: true, + }, + } + + c := newTestingCache() + + processor := func(context context.Context, request *Request) (response *Response, e error) { + return nil, nil + } + + handler := func(writer http.ResponseWriter, i *http.Request) { + } + + for _, arg := range args { + + assertRouteBuilder(t, arg, NewRouteBuilder("/", processor), c) + + assertRouteBuilder(t, arg, NewRawRouteBuilder("/", handler), c) + + } +} + +func assertRouteBuilder(t *testing.T, arg arg, routeBuilder *RouteBuilder, cache cache.TTLCache) { + + routeBuilder.WithRouteCache(cache, arg.age) + + if arg.bop != nil { + routeBuilder = arg.bop(routeBuilder) + } + + route, err := routeBuilder.Build() + assert.NotNil(t, route) + + if arg.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } +} + +func TestRouteCacheImplementation_WithSingleRequest(t *testing.T) { + + cc := newTestingCache() + cc.instant = httpcache.NowSeconds + + var executions uint32 + + preWrapper := newMiddlewareWrapper(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("pre-middleware-header", "pre") + }) + + postWrapper := newMiddlewareWrapper(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("post-middleware-header", "post") + }) + + routeBuilder := NewRouteBuilder("/path", func(context context.Context, request *Request) (response *Response, e error) { + atomic.AddUint32(&executions, 1) + newResponse := NewResponse("body") + newResponse.Header["Custom-Header"] = "11" + return newResponse, nil + }). + WithRouteCache(cc, httpcache.Age{Max: 10 * time.Second}). + WithMiddlewares(preWrapper.middleware, postWrapper.middleware). 
+ MethodGet() + + ctx, cln := context.WithTimeout(context.Background(), 5*time.Second) + + port := 50023 + runRoute(ctx, t, routeBuilder, port) + + assertResponse(ctx, t, []http.Response{ + { + Header: map[string][]string{ + httpcache.HeaderCacheControl: {"max-age=10"}, + "Content-Type": {"application/json; charset=utf-8"}, + "Content-Length": {"6"}, + "Post-Middleware-Header": {"post"}, + "Pre-Middleware-Header": {"pre"}, + "Custom-Header": {"11"}, + }, + Body: &bodyReader{body: "\"body\""}, + }, + { + Header: map[string][]string{ + httpcache.HeaderCacheControl: {"max-age=10"}, + "Content-Type": {"application/json; charset=utf-8"}, + "Content-Length": {"6"}, + "Post-Middleware-Header": {"post"}, + "Pre-Middleware-Header": {"pre"}, + "Custom-Header": {"11"}, + }, + Body: &bodyReader{body: "\"body\""}, + }, + }, port) + + assertCacheState(t, *cc, cacheState{ + setOps: 1, + getOps: 2, + size: 1, + }) + + assert.Equal(t, 2, preWrapper.invocations) + assert.Equal(t, 2, postWrapper.invocations) + + assert.Equal(t, executions, uint32(1)) + + cln() +} + +func TestRouteCacheAsMiddleware_WithSingleRequest(t *testing.T) { + + cc := newTestingCache() + cc.instant = httpcache.NowSeconds + + var executions uint32 + + preWrapper := newMiddlewareWrapper(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("pre-middleware-header", "pre") + }) + + postWrapper := newMiddlewareWrapper(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("post-middleware-header", "post") + }) + + routeCache, errs := httpcache.NewRouteCache(cc, httpcache.Age{Max: 10 * time.Second}) + assert.Empty(t, errs) + routeBuilder := NewRouteBuilder("/path", func(context context.Context, request *Request) (response *Response, e error) { + atomic.AddUint32(&executions, 1) + newResponse := NewResponse("body") + newResponse.Header["internal-handler-header"] = "header" + return newResponse, nil + }). + WithMiddlewares( + preWrapper.middleware, + NewCachingMiddleware(routeCache), + postWrapper.middleware). 
+ MethodGet() + + ctx, cln := context.WithTimeout(context.Background(), 5*time.Second) + + port := 50023 + runRoute(ctx, t, routeBuilder, port) + + assertResponse(ctx, t, []http.Response{ + { + Header: map[string][]string{ + httpcache.HeaderCacheControl: {"max-age=10"}, + "Content-Type": {"application/json; charset=utf-8"}, + "Content-Length": {"6"}, + "Post-Middleware-Header": {"post"}, + "Pre-Middleware-Header": {"pre"}, + "Internal-Handler-Header": {"header"}, + }, + Body: &bodyReader{body: "\"body\""}, + }, + { + Header: map[string][]string{ + httpcache.HeaderCacheControl: {"max-age=10"}, + "Content-Type": {"application/json; charset=utf-8"}, + "Post-Middleware-Header": {"post"}, + "Pre-Middleware-Header": {"pre"}, + "Content-Length": {"6"}, + "Internal-Handler-Header": {"header"}, + }, + Body: &bodyReader{body: "\"body\""}, + }, + }, port) + + assertCacheState(t, *cc, cacheState{ + setOps: 1, + getOps: 2, + size: 1, + }) + + assert.Equal(t, 2, preWrapper.invocations) + // NOTE : the post middleware is not executed, as it s hidden behind the cache + assert.Equal(t, 1, postWrapper.invocations) + + assert.Equal(t, executions, uint32(1)) + + cln() + +} + +type middlewareWrapper struct { + middleware MiddlewareFunc + invocations int +} + +func newMiddlewareWrapper(middlewareFunc func(w http.ResponseWriter, r *http.Request)) *middlewareWrapper { + wrapper := &middlewareWrapper{} + wrapper.middleware = func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wrapper.invocations++ + middlewareFunc(w, r) + next.ServeHTTP(w, r) + }) + } + return wrapper +} + +func TestRawRouteCacheImplementation_WithSingleRequest(t *testing.T) { + + cc := newTestingCache() + cc.instant = httpcache.NowSeconds + + var executions uint32 + + preWrapper := newMiddlewareWrapper(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("pre-middleware-header", "pre") + }) + + postWrapper := newMiddlewareWrapper(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("post-middleware-header", "post") + }) + + routeBuilder := NewRawRouteBuilder("/path", func(writer http.ResponseWriter, request *http.Request) { + atomic.AddUint32(&executions, 1) + i, err := writer.Write([]byte("\"body\"")) + writer.Header().Set("internal-handler-header", "header") + assert.NoError(t, err) + assert.True(t, i > 0) + }). + WithRouteCache(cc, httpcache.Age{Max: 10 * time.Second}). + WithMiddlewares(preWrapper.middleware, postWrapper.middleware). 
+ MethodGet() + + ctx, cln := context.WithTimeout(context.Background(), 5*time.Second) + + port := 50024 + runRoute(ctx, t, routeBuilder, port) + + assertResponse(ctx, t, []http.Response{ + { + Header: map[string][]string{ + httpcache.HeaderCacheControl: {"max-age=10"}, + "Content-Type": {"text/plain; charset=utf-8"}, + "Content-Length": {"6"}, + "Post-Middleware-Header": {"post"}, + "Pre-Middleware-Header": {"pre"}, + "Internal-Handler-Header": {"header"}}, + Body: &bodyReader{body: "\"body\""}, + }, + { + Header: map[string][]string{ + httpcache.HeaderCacheControl: {"max-age=10"}, + "Content-Type": {"text/plain; charset=utf-8"}, + "Content-Length": {"6"}, + "Post-Middleware-Header": {"post"}, + "Pre-Middleware-Header": {"pre"}, + "Internal-Handler-Header": {"header"}}, + Body: &bodyReader{body: "\"body\""}, + }, + }, port) + + assertCacheState(t, *cc, cacheState{ + setOps: 1, + getOps: 2, + size: 1, + }) + + assert.Equal(t, 2, preWrapper.invocations) + assert.Equal(t, 2, postWrapper.invocations) + + assert.Equal(t, executions, uint32(1)) + + cln() + +} + +type bodyReader struct { + body string +} + +func (br *bodyReader) Read(p []byte) (n int, err error) { + var c int + for i, b := range []byte(br.body) { + p[i] = b + c = i + } + return c + 1, nil +} + +func (br *bodyReader) Close() error { + // nothing to do + return nil +} + +func runRoute(ctx context.Context, t *testing.T, routeBuilder *RouteBuilder, port int) { + cmp, err := NewBuilder().WithRoutesBuilder(NewRoutesBuilder().Append(routeBuilder)).WithPort(port).Create() + + assert.NoError(t, err) + assert.NotNil(t, cmp) + + go func() { + err = cmp.Run(ctx) + assert.NoError(t, err) + }() + + var lwg sync.WaitGroup + lwg.Add(1) + go func() { + cl, err := httpclient.New() + assert.NoError(t, err) + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/ready", port), nil) + assert.NoError(t, err) + for { + select { + case <-ctx.Done(): + return + default: + r, err := cl.Do(ctx, req) + if err == nil && r != nil { + lwg.Done() + return + } + } + } + }() + lwg.Wait() +} + +func assertResponse(ctx context.Context, t *testing.T, expected []http.Response, port int) { + + cl, err := httpclient.New() + assert.NoError(t, err) + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:%d/path", port), nil) + assert.NoError(t, err) + + for _, expectedResponse := range expected { + response, err := cl.Do(ctx, req) + + assert.NoError(t, err) + + for k, v := range expectedResponse.Header { + assert.Equal(t, v, response.Header[k]) + } + assert.Equal(t, expectedResponse.Header.Get(httpcache.HeaderCacheControl), response.Header.Get(httpcache.HeaderCacheControl)) + assert.True(t, response.Header.Get(httpcache.HeaderETagHeader) != "") + expectedPayload := make([]byte, 6) + i, err := expectedResponse.Body.Read(expectedPayload) + assert.NoError(t, err) + + responsePayload := make([]byte, 6) + j, err := response.Body.Read(responsePayload) + assert.Error(t, err) + + assert.Equal(t, i, j) + assert.Equal(t, expectedPayload, responsePayload) + } + +} + +func assertCacheState(t *testing.T, cache testingCache, cacheState cacheState) { + assert.Equal(t, cacheState.setOps, cache.setCount) + assert.Equal(t, cacheState.getOps, cache.getCount) + assert.Equal(t, cacheState.size, cache.size()) +} + +type testingCacheEntity struct { + v interface{} + ttl int64 + t0 int64 +} + +type testingCache struct { + cache map[string]testingCacheEntity + getCount int + setCount int + getErr error + setErr error + instant func() int64 +} + +func newTestingCache() 
*testingCache { + return &testingCache{cache: make(map[string]testingCacheEntity)} +} + +func (t *testingCache) Get(key string) (interface{}, bool, error) { + t.getCount++ + if t.getErr != nil { + return nil, false, t.getErr + } + r, ok := t.cache[key] + if t.instant()-r.t0 > r.ttl { + return nil, false, nil + } + return r.v, ok, nil +} + +func (t *testingCache) Purge() error { + for k := range t.cache { + _ = t.Remove(k) + } + return nil +} + +func (t *testingCache) Remove(key string) error { + delete(t.cache, key) + return nil +} + +// Note : this method will effectively not cache anything +// e.g. testingCacheEntity.t is `0` +func (t *testingCache) Set(key string, value interface{}) error { + t.setCount++ + if t.setErr != nil { + return t.getErr + } + t.cache[key] = testingCacheEntity{ + v: value, + } + return nil +} + +func (t *testingCache) SetTTL(key string, value interface{}, ttl time.Duration) error { + t.setCount++ + if t.setErr != nil { + return t.getErr + } + t.cache[key] = testingCacheEntity{ + v: value, + ttl: int64(ttl / time.Second), + t0: t.instant(), + } + return nil +} + +func (t *testingCache) size() int { + return len(t.cache) +} diff --git a/component/http/route_test.go b/component/http/route_test.go index c925dcfbf..25aa25d87 100644 --- a/component/http/route_test.go +++ b/component/http/route_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/beatlabs/patron/component/http/auth" + "github.com/beatlabs/patron/component/http/cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -161,6 +162,16 @@ func TestRouteBuilder_WithAuth(t *testing.T) { } } +func TestRouteBuilder_WithRouteCacheNil(t *testing.T) { + + rb := NewRawRouteBuilder("/", func(writer http.ResponseWriter, request *http.Request) {}). + WithRouteCache(nil, cache.Age{Max: 1}) + + assert.Len(t, rb.errors, 1) + assert.EqualError(t, rb.errors[0], "route cache is nil") + +} + func TestRouteBuilder_Build(t *testing.T) { mockAuth := &MockAuthenticator{} mockProcessor := func(context.Context, *Request) (*Response, error) { return nil, nil } diff --git a/examples/README.md b/examples/README.md index 65b7fef07..5bb616c65 100644 --- a/examples/README.md +++ b/examples/README.md @@ -36,6 +36,13 @@ The processing will be kicked of by sending a request to the HTTP component. The - Receives a request from service 5 - Responds to service 5 +## Service 7 + +- receives a raw http request +- returns the 7th minute unix interval for the current server time +- first makes a request to seventh +- seventh responds with the timing information + Since tracing instrumentation is in place we can observer the flow in Jaeger. ## Prerequisites @@ -59,10 +66,11 @@ docker-compose down ## Running the examples -When the services started with Docker Compose are ready, you will need to start each of the five +When the services started with Docker Compose are ready, you will need to start each of the seven examples in order: ```shell +go run examples/seventh/main.go go run examples/first/main.go go run examples/second/main.go go run examples/third/main.go @@ -77,4 +85,4 @@ and then send a sample request: ./start_processing.sh ``` -After that head over to [jaeger](http://localhost:16686/search) and [prometheus](http://localhost:9090/graph). \ No newline at end of file +After that head over to [jaeger](http://localhost:16686/search) and [prometheus](http://localhost:9090/graph). 
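As an aside to the builder-based wiring used in the seventh example below, the cache can also be attached as an explicit middleware via `NewCachingMiddleware`, mirroring the wiring exercised in `route_cache_test.go` above. The following is a minimal sketch, not part of the patch: it assumes any `cache.TTLCache` implementation is available (the Redis cache from the seventh example would do), and the route path and handler body are placeholders.

```go
package example

import (
	"context"
	"time"

	"github.com/beatlabs/patron/cache/redis"
	patronhttp "github.com/beatlabs/patron/component/http"
	httpcache "github.com/beatlabs/patron/component/http/cache"
)

// cachedRouteBuilder wires the route cache as an explicit middleware instead of using WithRouteCache.
// Per NewCachingMiddleware, any middleware appended after the caching one is skipped on cache hits.
func cachedRouteBuilder(ctx context.Context) (*patronhttp.RouteBuilder, error) {
	// any cache.TTLCache implementation can back the route cache; Redis is what the seventh example uses
	ttlCache, err := redis.New(ctx, redis.Options{})
	if err != nil {
		return nil, err
	}

	// build the route cache explicitly so it can be injected into the middleware chain
	rc, errs := httpcache.NewRouteCache(ttlCache, httpcache.Age{Min: 15 * time.Second, Max: 60 * time.Second})
	if len(errs) > 0 {
		return nil, errs[0]
	}

	rb := patronhttp.NewRouteBuilder("/cached", func(_ context.Context, _ *patronhttp.Request) (*patronhttp.Response, error) {
		// placeholder processor; the cached payload would normally be expensive to compute
		return patronhttp.NewResponse("ok"), nil
	}).
		WithMiddlewares(patronhttp.NewCachingMiddleware(rc)).
		MethodGet()

	return rb, nil
}
```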
diff --git a/examples/first/main.go b/examples/first/main.go index a9f4d1312..d30c0d5c5 100644 --- a/examples/first/main.go +++ b/examples/first/main.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "fmt" + "io/ioutil" "net/http" "os" + "regexp" "time" "github.com/beatlabs/patron" @@ -69,9 +71,16 @@ func main() { func first(ctx context.Context, req *patronhttp.Request) (*patronhttp.Response, error) { + timing, err := DoTimingRequest(ctx) + if err != nil { + log.FromContext(ctx).Infof("first: failed to get timing information %v: could it be that the seventh service is not running ?", err) + } else { + log.FromContext(ctx).Infof("first: pipeline initiated at: %s", timing) + } + var u examples.User - err := req.Decode(&u) + err = req.Decode(&u) if err != nil { return nil, fmt.Errorf("failed to decode request: %w", err) } @@ -96,7 +105,35 @@ func first(ctx context.Context, req *patronhttp.Request) (*patronhttp.Response, if err != nil { return nil, fmt.Errorf("failed to post to second service: %w", err) } - log.FromContext(ctx).Infof("request processed: %s %s", u.GetFirstname(), u.GetLastname()) return patronhttp.NewResponse(fmt.Sprintf("got %s from second HTTP route", rsp.Status)), nil } + +// DoTimingRequest is a helper method to make a request to the seventh example service from other examples +func DoTimingRequest(ctx context.Context) (string, error) { + request, err := http.NewRequest("GET", "http://localhost:50006/", nil) + if err != nil { + return "", fmt.Errorf("failed create route request: %w", err) + } + cl, err := clienthttp.New(clienthttp.Timeout(5 * time.Second)) + if err != nil { + return "", fmt.Errorf("could not create http client: %w", err) + } + + response, err := cl.Do(ctx, request) + if err != nil { + return "", fmt.Errorf("failed create get to seventh service: %w", err) + } + + tb, err := ioutil.ReadAll(response.Body) + if err != nil { + return "", fmt.Errorf("failed to decode timing response body: %w", err) + } + + var rgx = regexp.MustCompile(`\((.*?)\)`) + timeInstance := rgx.FindStringSubmatch(string(tb)) + if len(timeInstance) == 1 { + return "", fmt.Errorf("could not match timeinstance from response %s", string(tb)) + } + return timeInstance[1], nil +} diff --git a/examples/seventh/main.go b/examples/seventh/main.go new file mode 100644 index 000000000..4e9c2ee15 --- /dev/null +++ b/examples/seventh/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/beatlabs/patron" + "github.com/beatlabs/patron/cache/redis" + patronhttp "github.com/beatlabs/patron/component/http" + httpcache "github.com/beatlabs/patron/component/http/cache" +) + +func init() { + err := os.Setenv("PATRON_LOG_LEVEL", "debug") + if err != nil { + fmt.Printf("failed to set log level env var: %v", err) + os.Exit(1) + } + err = os.Setenv("PATRON_JAEGER_SAMPLER_PARAM", "1.0") + if err != nil { + fmt.Printf("failed to set sampler env vars: %v", err) + os.Exit(1) + } + err = os.Setenv("PATRON_HTTP_DEFAULT_PORT", "50006") + if err != nil { + fmt.Printf("failed to set default patron port env vars: %v", err) + os.Exit(1) + } +} + +func main() { + name := "seventh" + version := "1.0.0" + + err := patron.SetupLogging(name, version) + if err != nil { + fmt.Printf("failed to set up logging: %v", err) + os.Exit(1) + } + + ctx := context.Background() + + cache, err := redis.New(ctx, redis.Options{}) + if err != nil { + fmt.Printf("failed to set up redis cache: %v", err) + os.Exit(1) + } + + routesBuilder := patronhttp.NewRoutesBuilder(). 
+		Append(patronhttp.NewRouteBuilder("/", seventh).
+			WithRouteCache(cache, httpcache.Age{
+				// we won't allow the cache to be refreshed more than once per 15 seconds
+				Min: 15 * time.Second,
+				// by default we might serve a stale response for up to 1 minute
+				Max: 60 * time.Second,
+			}).
+			MethodGet())
+
+	sig := func() {
+		fmt.Println("exit gracefully...")
+		os.Exit(0)
+	}
+
+	err = patron.New(name, version).
+		WithRoutesBuilder(routesBuilder).
+		WithSIGHUP(sig).
+		Run(ctx)
+	if err != nil {
+		log.Fatalf("failed to create and run service: %v", err)
+	}
+}
+
+// seventh gives the 7-minute interval of the current unix timestamp.
+// Since the response will be the same for the next 7 minutes, it's a good use case for caching.
+func seventh(ctx context.Context, req *patronhttp.Request) (*patronhttp.Response, error) {
+	now := time.Now()
+	minutes := now.Unix() / 60
+	minuteInterval := minutes / 7
+	return patronhttp.NewResponse(fmt.Sprintf("current unix 7-minute interval is (%d) called at %v", minuteInterval, now.Unix())), nil
+}
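Once the seventh example and its Redis dependency are running, the caching behaviour can be observed from any client by issuing the same GET twice and inspecting the headers set by the caching layer. Below is a minimal sketch, not part of the patch, using the patron HTTP client against the port configured above (50006); the header names are taken from the exported constants of the cache package.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	clienthttp "github.com/beatlabs/patron/client/http"
	httpcache "github.com/beatlabs/patron/component/http/cache"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	cl, err := clienthttp.New(clienthttp.Timeout(5 * time.Second))
	if err != nil {
		fmt.Printf("could not create http client: %v\n", err)
		return
	}

	for i := 0; i < 2; i++ {
		req, err := http.NewRequest(http.MethodGet, "http://localhost:50006/", nil)
		if err != nil {
			fmt.Printf("could not create request: %v\n", err)
			return
		}
		rsp, err := cl.Do(ctx, req)
		if err != nil {
			fmt.Printf("request failed: %v\n", err)
			return
		}
		// the second response should be served from the cache, carrying the cache-control and ETag headers
		fmt.Println(rsp.Header.Get(httpcache.HeaderCacheControl), rsp.Header.Get(httpcache.HeaderETagHeader))
		_ = rsp.Body.Close()
	}
}
```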