diff --git a/README.md b/README.md index 98d00e3a2a..72f4af2015 100644 --- a/README.md +++ b/README.md @@ -241,6 +241,12 @@ This is transparent to the server. As long as a key cannot be found in the cache the server will execute the route processor function and fill the corresponding cache entry ``` +``` +Note: When a cache is used, the handler execution might be skipped. +That implies that all generic handler functionalities MUST be delegated to a custom middleware, +e.g. counting the number of incoming client requests, etc. +``` + #### client cache-control The client can control the cache with the appropriate Headers - `max-age=?` @@ -271,6 +277,11 @@ expects any response that is found in the cache, otherwise returns an empty resp - https://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html - https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9 +#### improvement considerations +- we can split the storing of the cached objects and the age parameter, to avoid loading the whole object in memory, +if the object is already expired. This would provide considerable performance (in terms of memory utilisation) +improvement for big response objects. + ### Asynchronous The implementation of the async processor follows exactly the same principle as the sync processor. 
diff --git a/sync/http/cache.go b/component/http/cache.go similarity index 63% rename from sync/http/cache.go rename to component/http/cache.go index f598db19c8..533f9714ad 100644 --- a/sync/http/cache.go +++ b/component/http/cache.go @@ -1,22 +1,32 @@ package http import ( - "context" + "encoding/json" "fmt" "hash/crc32" + "net/http" "strconv" "strings" - "time" "github.com/beatlabs/patron/log" - "github.com/beatlabs/patron/sync" ) -// TODO : add comments where applicable +// TODO : wrap up implementation type CacheHeader int +type validationContext int + const ( + // validation due to normal expiry in terms of ttl setting + ttlValidation validationContext = iota + 1 + // maxAgeValidation represents a validation , happening due to max-age header requirements + maxAgeValidation + // minFreshValidation represents a validation , happening due to min-fresh header requirements + minFreshValidation + // maxStaleValidation represents a validation , happening due to max-stale header requirements + maxStaleValidation + // cacheControlHeader is the header key for cache related values // note : it is case-sensitive cacheControlHeader = "Cache-Control" @@ -61,11 +71,11 @@ const ( type TimeInstant func() int64 // validator is a conditional function on an objects age and the configured ttl -type validator func(age, ttl int64) bool +type validator func(age, ttl int64) (bool, validationContext) // expiryCheck is the main validator that checks that the entry has not expired e.g. 
is stale -var expiryCheck validator = func(age, ttl int64) bool { - return age <= ttl +var expiryCheck validator = func(age, ttl int64) (bool, validationContext) { + return age <= ttl, ttlValidation } // cacheControl is the model of the request parameters regarding the cache control @@ -76,79 +86,122 @@ type cacheControl struct { expiryValidator validator } -// cacheHandler wraps the handler func with a cache layer -// hnd is the processor func that the cache will wrap -// rc is the route cache implementation to be used -func cacheHandler(hnd sync.ProcessorFunc, rc *routeCache) sync.ProcessorFunc { +// cacheHandlerResponse is the dedicated response object for the cache handler +type cacheHandlerResponse struct { + payload interface{} + bytes []byte + header map[string]string +} + +// cacheHandlerRequest is the dedicated request object for the cache handler +type cacheHandlerRequest struct { + header string + path string + query string +} + +// fromRequest transforms the Request object to the cache handler request +func (r *cacheHandlerRequest) fromRequest(path string, req *Request) { + r.path = path + if req.Headers != nil { + r.header = req.Headers[cacheControlHeader] + } + if req.Fields != nil { + if query, err := json.Marshal(req.Fields); err == nil { + r.query = string(query) + } + } +} + +// fromRequest transforms the http Request object to the cache handler request +func (r *cacheHandlerRequest) fromHTTPRequest(req *http.Request) { + if req.Header != nil { + r.header = req.Header.Get(cacheControlHeader) + } + if req.URL != nil { + r.path = req.URL.Path + r.query = req.URL.RawQuery + } +} - return func(ctx context.Context, request *sync.Request) (response *sync.Response, e error) { +// executor is the function returning a cache response object from the underlying implementation +type executor func(now int64, key string) *cachedResponse - responseHeaders := make(map[string]string) +// cacheHandler wraps the an execution logic with a cache layer +// exec is the 
processor func that the cache will wrap +// rc is the route cache implementation to be used +func cacheHandler(exec executor, rc *routeCache) func(request *cacheHandlerRequest) (response *cacheHandlerResponse, e error) { + + return func(request *cacheHandlerRequest) (response *cacheHandlerResponse, e error) { now := rc.instant() - cfg, warning := extractCacheHeaders(request, rc.minAge, rc.maxFresh) + cfg, warning := extractCacheHeaders(request.header, rc.minAge, rc.maxFresh) if cfg.expiryValidator == nil { cfg.expiryValidator = expiryCheck } - key := extractRequestKey(rc.path, request) - - // TODO : add metrics - + key := extractRequestKey(request.path, request.query) var rsp *cachedResponse var fromCache bool // explore the cache if cfg.noCache && !rc.staleResponse { // need to execute the handler always - rsp = handlerExecutor(ctx, request, hnd, now, key) + rsp = exec(now, key) } else { // lets check the cache if we have anything for the given key if rsp = cacheRetriever(key, rc, now); rsp == nil { + rc.metrics.miss(key) // we have not encountered this key before - rsp = handlerExecutor(ctx, request, hnd, now, key) + rsp = exec(now, key) } else { - expiry := int64(rc.ttl / time.Second) - if !isValid(expiry, now, rsp.lastValid, append(cfg.validators, cfg.expiryValidator)...) 
{ - tmpRsp := handlerExecutor(ctx, request, hnd, now, key) + expiry := rc.ttl + age := now - rsp.lastValid + if isValid, cx := isValid(age, expiry, append(cfg.validators, cfg.expiryValidator)...); !isValid { + tmpRsp := exec(now, key) + // if we could not generate a fresh response, serve the last cached value, + // with a warning header if rc.staleResponse && tmpRsp.err != nil { warning = "last-valid" fromCache = true + rc.metrics.hit(key) } else { rsp = tmpRsp + rc.metrics.evict(key, cx, age) } } else { fromCache = true + rc.metrics.hit(key) } } } // TODO : use the forceCache parameter if cfg.forceCache { // return empty response if we have rc-only responseHeaders present - return sync.NewResponse([]byte{}), nil + return &cacheHandlerResponse{payload: []byte{}}, nil } response = rsp.response e = rsp.err - // TODO : abstract into method - if e == nil { - responseHeaders[eTagHeader] = rsp.etag - - responseHeaders[cacheControlHeader] = genCacheControlHeader(rc.ttl, now-rsp.lastValid) - - if warning != "" && fromCache { - responseHeaders[warningHeader] = warning - } - - response.Headers = responseHeaders - } - - // we cache response only if we did not retrieve it from the cache itself and error is nil + // we cache response only if we did not retrieve it from the cache itself + // and if there was no error if !fromCache && e == nil { if err := rc.cache.Set(key, rsp); err != nil { log.Errorf("could not cache response for request key %s %v", key, err) } + rc.metrics.add(key) + } + + // TODO : abstract into method + if e == nil { + response.header[eTagHeader] = rsp.etag + response.header[cacheControlHeader] = createCacheControlHeader(rc.ttl, now-rsp.lastValid) + if warning != "" && fromCache { + response.header[warningHeader] = warning + } else { + delete(response.header, warningHeader) + } } return @@ -170,27 +223,12 @@ var cacheRetriever = func(key string, rc *routeCache, now int64) *cachedResponse return nil } -// handlerExecutor is the function that will create a new 
cachedResponse from based on the handler implementation -var handlerExecutor = func(ctx context.Context, request *sync.Request, hnd sync.ProcessorFunc, now int64, key string) *cachedResponse { - response, err := hnd(ctx, request) - return &cachedResponse{ - response: response, - lastValid: now, - etag: genETag([]byte(key), time.Now().Nanosecond()), - err: err, - } -} - // extractCacheHeaders extracts the client request headers allowing the client some control over the cache -func extractCacheHeaders(request *sync.Request, minAge, maxFresh uint) (*cacheControl, string) { - if CacheControl, ok := request.Headers[cacheControlHeader]; ok { - return extractCacheHeader(CacheControl, minAge, maxFresh) +func extractCacheHeaders(header string, minAge, maxFresh int64) (*cacheControl, string) { + if header == "" { + return &cacheControl{noCache: minAge == 0 && maxFresh == 0}, "" } - // if we have no headers we assume we dont want to cache, - return &cacheControl{noCache: minAge == 0 && maxFresh == 0}, "" -} -func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, string) { cfg := cacheControl{ validators: make([]validator, 0), } @@ -198,7 +236,7 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s var warning string wrn := make([]string, 0) - for _, header := range strings.Split(headers, ",") { + for _, header := range strings.Split(header, ",") { keyValue := strings.Split(header, "=") headerKey := strings.ToLower(keyValue[0]) switch headerKey { @@ -216,8 +254,8 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue) value = 0 } - cfg.expiryValidator = func(age, ttl int64) bool { - return ttl-age+value >= 0 + cfg.expiryValidator = func(age, ttl int64) (bool, validationContext) { + return ttl-age+value >= 0, maxStaleValidation } case maxAge: /** @@ -231,12 +269,12 @@ func extractCacheHeader(headers string, minAge, maxFresh 
uint) (*cacheControl, s log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue) value = 0 } - value, adjusted := min(value, int64(minAge)) + value, adjusted := min(value, minAge) if adjusted { wrn = append(wrn, fmt.Sprintf("max-age=%d", minAge)) } - cfg.validators = append(cfg.validators, func(age, ttl int64) bool { - return age <= value + cfg.validators = append(cfg.validators, func(age, ttl int64) (bool, validationContext) { + return age <= value, maxAgeValidation }) case minFresh: /** @@ -251,12 +289,12 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue) value = 0 } - value, adjusted := max(value, int64(maxFresh)) + value, adjusted := max(value, maxFresh) if adjusted { wrn = append(wrn, fmt.Sprintf("min-fresh=%d", maxFresh)) } - cfg.validators = append(cfg.validators, func(age, ttl int64) bool { - return ttl-age >= value + cfg.validators = append(cfg.validators, func(age, ttl int64) (bool, validationContext) { + return ttl-age >= value, minFreshValidation }) case noCache: /** @@ -280,7 +318,6 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s log.Warnf("unrecognised cache header %s", header) } } - if len(wrn) > 0 { warning = strings.Join(wrn, ",") } @@ -313,37 +350,36 @@ func parseValue(keyValue []string) (int64, bool) { } type cachedResponse struct { - response *sync.Response + response *cacheHandlerResponse lastValid int64 etag string err error } -func extractRequestKey(path string, request *sync.Request) string { - return fmt.Sprintf("%s:%s", path, request.Fields) +func extractRequestKey(path, query string) string { + return fmt.Sprintf("%s:%s", path, query) } -func isValid(ttl, now, last int64, validators ...validator) bool { +func isValid(age, ttl int64, validators ...validator) (bool, validationContext) { if len(validators) == 0 { - return false + return false, 0 } - age := now - last for _, validator 
:= range validators { - if !validator(age, ttl) { - return false + if isValid, cx := validator(age, ttl); !isValid { + return false, cx } } - return true + return true, 0 } -func genETag(key []byte, t int) string { +func generateETag(key []byte, t int) string { return fmt.Sprintf("%d-%d", crc32.ChecksumIEEE(key), t) } -func genCacheControlHeader(ttl time.Duration, lastValid int64) string { - maxAge := int64(ttl/time.Second) - lastValid - if maxAge <= 0 { - return fmt.Sprintf("%s", mustRevalidate) +func createCacheControlHeader(ttl, lastValid int64) string { + mAge := ttl - lastValid + if mAge < 0 { + return mustRevalidate } - return fmt.Sprintf("%s=%d", maxAge, int64(ttl/time.Second)-lastValid) + return fmt.Sprintf("%s=%d", maxAge, ttl-lastValid) } diff --git a/component/http/cache_builder.go b/component/http/cache_builder.go new file mode 100644 index 0000000000..66d74a6c9b --- /dev/null +++ b/component/http/cache_builder.go @@ -0,0 +1,130 @@ +package http + +import ( + "errors" + "fmt" + "time" + + "github.com/beatlabs/patron/cache" + errs "github.com/beatlabs/patron/errors" +) + +// RouteCacheBuilder is the builder needed to build a cache for the corresponding route +type RouteCacheBuilder struct { + cache cache.Cache + instant TimeInstant + ttl time.Duration + minAge time.Duration + maxFresh time.Duration + staleResponse bool + metrics cacheMetrics + errors []error +} + +// NewRouteCacheBuilder creates a new builder for the route cache implementation. +func NewRouteCacheBuilder(cache cache.Cache, ttl time.Duration) *RouteCacheBuilder { + + var ee []error + + if ttl <= 0 { + ee = append(ee, errors.New("time to live must be greater than `0`")) + } + + return &RouteCacheBuilder{ + cache: cache, + ttl: ttl, + instant: func() int64 { + return time.Now().Unix() + }, + metrics: NewVoidMetrics(), + errors: ee, + } +} + +// WithTimeInstant specifies a time instant function for checking expiry. 
+func (cb *RouteCacheBuilder) WithTimeInstant(instant TimeInstant) *RouteCacheBuilder { + if instant == nil { + cb.errors = append(cb.errors, errors.New("time instant is nil")) + } + cb.instant = instant + return cb +} + +// WithMinAge adds a minimum age for the cache responses. +// This will avoid cases where a single client with high request rate and no cache control headers might effectively disable the cache +// This means that if this parameter is missing (e.g. is equal to '0' , the cache can effectively be made obsolete in the above scenario) +func (cb *RouteCacheBuilder) WithMinAge(minAge time.Duration) *RouteCacheBuilder { + if minAge <= 0 { + cb.errors = append(cb.errors, fmt.Errorf("min-age cannot be lower or equal to zero '0 < %v'", minAge)) + } + cb.minAge = minAge + return cb +} + +// WithMaxFresh adds a maximum age for the cache responses. +// This will avoid cases where a single client with high request rate and no cache control headers might effectively disable the cache +// This means that if this parameter is very high (e.g. greater than ttl , the cache can effectively be made obsolete in the above scenario) +func (cb *RouteCacheBuilder) WithMaxFresh(maxFresh time.Duration) *RouteCacheBuilder { + if maxFresh > cb.ttl { + cb.errors = append(cb.errors, fmt.Errorf("max-fresh cannot be greater than the time to live '%v <= %v'", maxFresh, cb.ttl)) + } + cb.maxFresh = maxFresh + return cb +} + +// WithStaleResponse allows the cache to return stale responses. +func (cb *RouteCacheBuilder) WithStaleResponse(staleResponse bool) *RouteCacheBuilder { + cb.staleResponse = staleResponse + return cb +} + +// WithMetrics sets a custom metrics implementation for the route cache. 
+func (cb *RouteCacheBuilder) WithMetrics(metrics cacheMetrics) *RouteCacheBuilder { + if metrics == nil { + cb.errors = append(cb.errors, errors.New("metrics implementation is nil")) + } + cb.metrics = metrics + return cb +} + +func (cb *RouteCacheBuilder) create(path string) (*routeCache, error) { + if len(cb.errors) > 0 { + return nil, errs.Aggregate(cb.errors...) + } + + if cb.maxFresh == 0 { + cb.maxFresh = cb.ttl + } + + if cb.minAge == 0 { + cb.minAge = cb.ttl + } + + return &routeCache{ + cache: cb.cache, + ttl: int64(cb.ttl / time.Second), + instant: cb.instant, + minAge: int64(cb.minAge / time.Second), + maxFresh: int64(cb.maxFresh / time.Second), + staleResponse: cb.staleResponse, + metrics: cb.metrics, + }, nil +} + +type routeCache struct { + // cache is the cache implementation to be used + cache cache.Cache + // ttl is the time to live for all cached objects + ttl int64 + // instant is the timing function for the cache expiry calculations + instant TimeInstant + // minAge specifies the minimum amount of max-age header value for client cache-control requests + minAge int64 + // max-fresh specifies the maximum amount of min-fresh header value for client cache-control requests + maxFresh int64 + // staleResponse specifies if the server is willing to send stale responses + // if a new response could not be generated for any reason + staleResponse bool + // metrics is the implementation for keeping track of the cache operations + metrics cacheMetrics +} diff --git a/component/http/cache_builder_test.go b/component/http/cache_builder_test.go new file mode 100644 index 0000000000..79b5a2db6d --- /dev/null +++ b/component/http/cache_builder_test.go @@ -0,0 +1,140 @@ +package http + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/beatlabs/patron/cache" + + "github.com/stretchr/testify/assert" +) + +type builderOperation func(routeBuilder *RouteBuilder) *RouteBuilder + +type cacheBuilderOperation func(routeBuilder *RouteCacheBuilder) 
*RouteCacheBuilder + +type arg struct { + bop builderOperation + cbop cacheBuilderOperation + ttl time.Duration + err bool +} + +func TestNewRouteCacheBuilder(t *testing.T) { + + args := []arg{ + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + ttl: 10, + }, + // error with '0' ttl + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + ttl: 0, + err: true, + }, + // error for POST method + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodPost() + }, + ttl: 10, + err: true, + }, + // error for maxFresh greater than ttl + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + cbop: func(routeBuilder *RouteCacheBuilder) *RouteCacheBuilder { + routeBuilder.WithMaxFresh(10 + 1) + return routeBuilder + }, + ttl: 10, + err: true, + }, + // error for minAge value of '0' + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + cbop: func(routeBuilder *RouteCacheBuilder) *RouteCacheBuilder { + routeBuilder.WithMinAge(0) + return routeBuilder + }, + ttl: 10, + err: true, + }, // error for instant function nil + + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + cbop: func(routeBuilder *RouteCacheBuilder) *RouteCacheBuilder { + routeBuilder.WithTimeInstant(nil) + return routeBuilder + }, + ttl: 10, + err: true, + }, + // error for metrics implementation nil + { + bop: func(routeBuilder *RouteBuilder) *RouteBuilder { + return routeBuilder.MethodGet() + }, + cbop: func(routeBuilder *RouteCacheBuilder) *RouteCacheBuilder { + routeBuilder.WithMetrics(nil) + return routeBuilder + }, + ttl: 10, + err: true, + }, + } + + c := newTestingCache() + + processor := func(context context.Context, request *Request) (response *Response, e error) { + return nil, nil + } + + handler := func(writer http.ResponseWriter, i 
*http.Request) { + } + + for _, arg := range args { + + assertCacheBuilderBuild(t, arg, NewRouteBuilder("/", processor), c) + + assertCacheBuilderBuild(t, arg, NewRawRouteBuilder("/", handler), c) + + } +} + +func assertCacheBuilderBuild(t *testing.T, arg arg, routeBuilder *RouteBuilder, cache cache.Cache) { + + routeCacheBuilder := NewRouteCacheBuilder(cache, arg.ttl) + + if arg.cbop != nil { + routeCacheBuilder = arg.cbop(routeCacheBuilder) + } + + routeBuilder.WithRouteCachedBuilder(routeCacheBuilder) + + if arg.bop != nil { + routeBuilder = arg.bop(routeBuilder) + } + + route, err := routeBuilder.Build() + assert.NotNil(t, route) + + if arg.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } +} diff --git a/component/http/cache_metrics.go b/component/http/cache_metrics.go new file mode 100644 index 0000000000..1d7f5dd758 --- /dev/null +++ b/component/http/cache_metrics.go @@ -0,0 +1,139 @@ +package http + +import "github.com/prometheus/client_golang/prometheus" + +var validationReason = map[validationContext]string{0: "nil", ttlValidation: "expired", maxAgeValidation: "max_age", minFreshValidation: "min_fresh", maxStaleValidation: "max_stale"} + +type cacheMetrics interface { + add(key string) + miss(key string) + hit(key string) + evict(key string, context validationContext, age int64) + reset() bool +} + +type prometheusMetrics struct { + path string + expiry *prometheus.GaugeVec + ageHistogram *prometheus.HistogramVec + misses *prometheus.CounterVec + additions *prometheus.CounterVec + hits *prometheus.CounterVec + evictions *prometheus.CounterVec +} + +func (m *prometheusMetrics) add(key string) { + m.additions.WithLabelValues(m.path).Inc() +} + +func (m *prometheusMetrics) miss(key string) { + m.misses.WithLabelValues(m.path).Inc() +} + +func (m *prometheusMetrics) hit(key string) { + m.hits.WithLabelValues(m.path).Inc() +} + +func (m *prometheusMetrics) evict(key string, context validationContext, age int64) { + 
m.ageHistogram.WithLabelValues(m.path).Observe(float64(age)) + m.evictions.WithLabelValues(m.path, validationReason[context]).Inc() +} + +func (m *prometheusMetrics) reset() bool { + exp := prometheus.DefaultRegisterer.Unregister(m.expiry) + hist := prometheus.DefaultRegisterer.Unregister(m.ageHistogram) + miss := prometheus.DefaultRegisterer.Unregister(m.misses) + hits := prometheus.DefaultRegisterer.Unregister(m.hits) + evict := prometheus.DefaultRegisterer.Unregister(m.evictions) + add := prometheus.DefaultRegisterer.Unregister(m.additions) + return exp && hist && miss && evict && hits && add +} + +func NewPrometheusMetrics(path string, expiry int64) *prometheusMetrics { + + histogram := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "expiration", + Help: "Expiry age for evicted objects.", + Buckets: []float64{1, 10, 30, 60, 60 * 5, 60 * 10, 60 * 30, 60 * 60}, + }, []string{"route"}) + + additions := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "adds", + Help: "Number of Added objects to the cache.", + }, []string{"route"}) + + misses := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "misses", + Help: "Number of cache missed.", + }, []string{"route"}) + + hits := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "hits", + Help: "Number of cache hits.", + }, []string{"route"}) + + evictions := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "evicts", + Help: "Number of cache evictions.", + }, []string{"route", "reason"}) + + expiration := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "http_cache", + Subsystem: "handler", + Name: "time_to_live", + Help: "Expiration parameter of the http cache.", + }, []string{"route"}) + + 
prometheus.MustRegister(histogram, additions, misses, hits, evictions, expiration) + + expiration.WithLabelValues(path).Set(float64(expiry)) + + return &prometheusMetrics{ + path: path, + expiry: expiration, + ageHistogram: histogram, + additions: additions, + misses: misses, + hits: hits, + evictions: evictions, + } + +} + +type voidMetrics struct { +} + +func NewVoidMetrics() *voidMetrics { + return &voidMetrics{} +} + +func (v *voidMetrics) add(key string) { + // do nothing +} + +func (v *voidMetrics) miss(key string) { + // do nothing +} + +func (v *voidMetrics) hit(key string) { + // do nothing +} + +func (v *voidMetrics) evict(key string, context validationContext, age int64) { + // do nothing +} + +func (v *voidMetrics) reset() bool { + // do nothing + return true +} diff --git a/sync/http/cache_test.go b/component/http/cache_test.go similarity index 65% rename from sync/http/cache_test.go rename to component/http/cache_test.go index dfbf9740e7..db9f881a19 100644 --- a/sync/http/cache_test.go +++ b/component/http/cache_test.go @@ -1,12 +1,14 @@ package http import ( - "context" "errors" "os" "strconv" "testing" - "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/beatlabs/patron/cache" @@ -14,8 +16,6 @@ import ( "github.com/beatlabs/patron/log/zerolog" "github.com/stretchr/testify/assert" - - "github.com/beatlabs/patron/sync" ) func TestMain(m *testing.M) { @@ -49,8 +49,8 @@ func TestExtractCacheHeaders(t *testing.T) { // TODO : cover the extract headers functionality from 'real' http header samples - minAge := uint(0) - minFresh := uint(0) + minAge := int64(0) + minFresh := int64(0) params := []args{ { @@ -127,8 +127,8 @@ func TestExtractCacheHeaders(t *testing.T) { } for _, param := range params { - req := sync.NewRequest(map[string]string{}, nil, param.headers, nil) - cfg, wrn := extractCacheHeaders(req, minAge, minFresh) + header := param.headers[cacheControlHeader] + cfg, 
wrn := extractCacheHeaders(header, minAge, minFresh) assert.Equal(t, param.wrn, wrn) assert.Equal(t, param.cfg.noCache, cfg.noCache) assert.Equal(t, param.cfg.forceCache, cfg.forceCache) @@ -140,34 +140,50 @@ func TestExtractCacheHeaders(t *testing.T) { type routeConfig struct { path string - ttl time.Duration - hnd sync.ProcessorFunc - minAge uint - maxFresh uint + ttl int64 + hnd executor + minAge int64 + maxFresh int64 staleResponse bool } type requestParams struct { + path string header map[string]string fields map[string]string timeInstance int64 } +type metricState struct { + additions int + misses int + evictions int + hits int +} + +func (m *metricState) add(n metricState) { + m.evictions += n.evictions + m.additions += n.additions + m.misses += n.misses + m.hits += n.hits +} + type testArgs struct { routeConfig routeConfig cache cache.Cache requestParams requestParams - response *sync.Response + response *Response + metrics metricState err error } -func testHeader(maxAge int) map[string]string { +func testHeader(maxAge int64) map[string]string { header := make(map[string]string) - header[cacheControlHeader] = genCacheControlHeader(time.Duration(maxAge)*time.Second, 0) + header[cacheControlHeader] = createCacheControlHeader(maxAge, 0) return header } -func testHeaderWithWarning(maxAge int, warning string) map[string]string { +func testHeaderWithWarning(maxAge int64, warning string) map[string]string { h := testHeader(maxAge) h[warningHeader] = warning return h @@ -177,7 +193,7 @@ func TestMinAgeCache_WithoutClientHeader(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 1, // to avoid no-cache staleResponse: false, } @@ -192,8 +208,12 @@ func TestMinAgeCache_WithoutClientHeader(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, 
+ }, + err: nil, }, // cache response { @@ -202,8 +222,11 @@ func TestMinAgeCache_WithoutClientHeader(t *testing.T) { timeInstance: 9, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(2)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(2)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // still cached response { @@ -212,8 +235,11 @@ func TestMinAgeCache_WithoutClientHeader(t *testing.T) { timeInstance: 11, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(0)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(0)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response , due to expiry validator 10 + 1 - 12 < 0 { @@ -222,19 +248,23 @@ func TestMinAgeCache_WithoutClientHeader(t *testing.T) { timeInstance: 12, }, routeConfig: rc, - response: &sync.Response{Payload: 120, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 120, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestNoMinAgeCache_WithoutClientHeader(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 0, // min age is set to '0', // this means , without client control headers we will always return a non-cached response // despite the ttl parameter @@ -251,8 +281,11 @@ func TestNoMinAgeCache_WithoutClientHeader(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + }, + err: nil, }, // no cached response { @@ -261,19 +294,22 @@ func TestNoMinAgeCache_WithoutClientHeader(t *testing.T) { timeInstance: 2, }, routeConfig: rc, - response: &sync.Response{Payload: 20, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 20, Headers: testHeader(10)}, 
+ metrics: metricState{ + additions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestNoMinAgeCache_WithMaxAgeHeader(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 0, staleResponse: false, } @@ -288,8 +324,11 @@ func TestNoMinAgeCache_WithMaxAgeHeader(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + }, + err: nil, }, // cached response, because of the max-age header { @@ -299,8 +338,11 @@ func TestNoMinAgeCache_WithMaxAgeHeader(t *testing.T) { timeInstance: 3, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(8)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(8)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response, because of missing header, and minAge == 0 { @@ -309,8 +351,11 @@ func TestNoMinAgeCache_WithMaxAgeHeader(t *testing.T) { timeInstance: 9, }, routeConfig: rc, - response: &sync.Response{Payload: 90, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 90, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + }, + err: nil, }, // new cached response , because max-age header again // note : because of the cache refresh triggered by the previous call we see the last cached value @@ -321,19 +366,22 @@ func TestNoMinAgeCache_WithMaxAgeHeader(t *testing.T) { timeInstance: 14, }, routeConfig: rc, - response: &sync.Response{Payload: 90, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 90, Headers: testHeader(5)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithConstantMaxAgeHeader(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 5, staleResponse: false, } @@ -349,8 +397,12 @@ func 
TestCache_WithConstantMaxAgeHeader(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response { @@ -360,8 +412,11 @@ func TestCache_WithConstantMaxAgeHeader(t *testing.T) { timeInstance: 3, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(8)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(8)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response, because max-age > 9 - 1 { @@ -371,8 +426,12 @@ func TestCache_WithConstantMaxAgeHeader(t *testing.T) { timeInstance: 9, }, routeConfig: rc, - response: &sync.Response{Payload: 90, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 90, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, // cached response right before the age threshold max-age == 14 - 9 { @@ -382,8 +441,11 @@ func TestCache_WithConstantMaxAgeHeader(t *testing.T) { timeInstance: 14, }, routeConfig: rc, - response: &sync.Response{Payload: 90, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 90, Headers: testHeader(5)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response, because max-age > 15 - 9 { @@ -393,19 +455,23 @@ func TestCache_WithConstantMaxAgeHeader(t *testing.T) { timeInstance: 15, }, routeConfig: rc, - response: &sync.Response{Payload: 150, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 150, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithMaxAgeHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 30 * time.Second, + ttl: 30, staleResponse: false, } @@ -419,8 +485,11 @@ func TestCache_WithMaxAgeHeaders(t 
*testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(30)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(30)}, + metrics: metricState{ + additions: 1, + }, + err: nil, }, // cached response { @@ -430,8 +499,11 @@ func TestCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 10, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(20)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(20)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // cached response { @@ -441,8 +513,11 @@ func TestCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 20, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response { @@ -452,8 +527,12 @@ func TestCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 20, }, routeConfig: rc, - response: &sync.Response{Payload: 200, Headers: testHeader(30)}, - err: nil, + response: &Response{Payload: 200, Headers: testHeader(30)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, // cache response { @@ -463,19 +542,22 @@ func TestCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 25, }, routeConfig: rc, - response: &sync.Response{Payload: 200, Headers: testHeader(25)}, - err: nil, + response: &Response{Payload: 200, Headers: testHeader(25)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestMinAgeCache_WithHighMaxAgeHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 5 * time.Second, + ttl: 5, staleResponse: false, } @@ -489,8 +571,11 @@ func TestMinAgeCache_WithHighMaxAgeHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 0, Headers: 
testHeader(5)}, + metrics: metricState{ + additions: 1, + }, + err: nil, }, // despite the max-age request, the cache will refresh because of it's ttl { @@ -500,19 +585,23 @@ func TestMinAgeCache_WithHighMaxAgeHeaders(t *testing.T) { timeInstance: 6, }, routeConfig: rc, - response: &sync.Response{Payload: 60, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 60, Headers: testHeader(5)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestNoMinAgeCache_WithLowMaxAgeHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 30 * time.Second, + ttl: 30, staleResponse: false, } @@ -526,8 +615,11 @@ func TestNoMinAgeCache_WithLowMaxAgeHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(30)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(30)}, + metrics: metricState{ + additions: 1, + }, + err: nil, }, // a max-age=0 request will always refresh the cache, // if there is not minAge limit set @@ -538,19 +630,23 @@ func TestNoMinAgeCache_WithLowMaxAgeHeaders(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(30)}, - err: nil, + response: &Response{Payload: 10, Headers: testHeader(30)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestMinAgeCache_WithMaxAgeHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 30 * time.Second, + ttl: 30, minAge: 5, staleResponse: false, } @@ -565,8 +661,12 @@ func TestMinAgeCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(30)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(30)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response still, because of minAge override // 
note : max-age=2 gets ignored @@ -577,8 +677,11 @@ func TestMinAgeCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 4, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeaderWithWarning(26, "max-age=5")}, - err: nil, + response: &Response{Payload: 0, Headers: testHeaderWithWarning(26, "max-age=5")}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // cached response because of bigger max-age parameter { @@ -588,8 +691,11 @@ func TestMinAgeCache_WithMaxAgeHeaders(t *testing.T) { timeInstance: 5, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(25)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(25)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response because of minAge floor { @@ -600,19 +706,23 @@ func TestMinAgeCache_WithMaxAgeHeaders(t *testing.T) { }, routeConfig: rc, // note : no warning because it s a new response - response: &sync.Response{Payload: 60, Headers: testHeader(30)}, - err: nil, + response: &Response{Payload: 60, Headers: testHeader(30)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithConstantMinFreshHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, staleResponse: false, } @@ -627,8 +737,12 @@ func TestCache_WithConstantMinFreshHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // expecting cache response, as value is still fresh : 5 - 0 == 5 { @@ -638,8 +752,11 @@ func TestCache_WithConstantMinFreshHeaders(t *testing.T) { timeInstance: 5, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 0, Headers: 
testHeader(5)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // expecting new response, as value is not fresh enough { @@ -649,8 +766,12 @@ func TestCache_WithConstantMinFreshHeaders(t *testing.T) { timeInstance: 6, }, routeConfig: rc, - response: &sync.Response{Payload: 60, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 60, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, // cache response, as value is expired : 11 - 6 <= 5 { @@ -660,8 +781,11 @@ func TestCache_WithConstantMinFreshHeaders(t *testing.T) { timeInstance: 11, }, routeConfig: rc, - response: &sync.Response{Payload: 60, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 60, Headers: testHeader(5)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // expecting new response { @@ -671,19 +795,23 @@ func TestCache_WithConstantMinFreshHeaders(t *testing.T) { timeInstance: 12, }, routeConfig: rc, - response: &sync.Response{Payload: 120, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 120, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestNoMaxFreshCache_WithExtremeMinFreshHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, staleResponse: false, } @@ -698,8 +826,12 @@ func TestNoMaxFreshCache_WithExtremeMinFreshHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, { requestParams: requestParams{ @@ -708,19 +840,23 @@ func TestNoMaxFreshCache_WithExtremeMinFreshHeaders(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, - err: nil, + response: 
&Response{Payload: 10, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestMaxFreshCache_WithMinFreshHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 5, staleResponse: false, @@ -736,8 +872,12 @@ func TestMaxFreshCache_WithMinFreshHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // expecting cache response, as min-fresh is bounded by maxFresh configuration parameter { @@ -747,19 +887,22 @@ func TestMaxFreshCache_WithMinFreshHeaders(t *testing.T) { timeInstance: 5, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeaderWithWarning(5, "min-fresh=5")}, - err: nil, + response: &Response{Payload: 0, Headers: testHeaderWithWarning(5, "min-fresh=5")}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithConstantMaxStaleHeader(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, staleResponse: false, } @@ -773,8 +916,12 @@ func TestCache_WithConstantMaxStaleHeader(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response { @@ -784,8 +931,11 @@ func TestCache_WithConstantMaxStaleHeader(t *testing.T) { timeInstance: 3, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(7)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(7)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // cached response { 
@@ -795,8 +945,11 @@ func TestCache_WithConstantMaxStaleHeader(t *testing.T) { timeInstance: 8, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(2)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(2)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // cached response , still stale threshold not breached , 12 - 0 <= 10 + 5 { @@ -807,8 +960,11 @@ func TestCache_WithConstantMaxStaleHeader(t *testing.T) { }, routeConfig: rc, // note : we are also getting a must-revalidate header - response: &sync.Response{Payload: 0, Headers: testHeader(-5)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(-5)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response { @@ -818,19 +974,23 @@ func TestCache_WithConstantMaxStaleHeader(t *testing.T) { timeInstance: 16, }, routeConfig: rc, - response: &sync.Response{Payload: 160, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 160, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithMixedHeaders(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 5, staleResponse: false, } @@ -845,8 +1005,12 @@ func TestCache_WithMixedHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // expecting cache response, as value is still fresh : 5 - 0 == min-fresh and still young : 5 - 0 < max-age { @@ -856,8 +1020,11 @@ func TestCache_WithMixedHeaders(t *testing.T) { timeInstance: 5, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(5)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(5)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, 
// new response, as value is not fresh enough : 6 - 0 > min-fresh { @@ -867,8 +1034,12 @@ func TestCache_WithMixedHeaders(t *testing.T) { timeInstance: 6, }, routeConfig: rc, - response: &sync.Response{Payload: 60, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 60, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, // cached response, as value is still fresh enough and still young { @@ -878,8 +1049,11 @@ func TestCache_WithMixedHeaders(t *testing.T) { timeInstance: 6, }, routeConfig: rc, - response: &sync.Response{Payload: 60, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 60, Headers: testHeader(10)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response, as value is still fresh enough but too old { @@ -889,21 +1063,25 @@ func TestCache_WithMixedHeaders(t *testing.T) { timeInstance: 15, }, routeConfig: rc, - response: &sync.Response{Payload: 150, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 150, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + evictions: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } -func TestStaleCache_WithoutHeaders(t *testing.T) { +func TestStaleCache_WithHandlerErrorWithoutHeaders(t *testing.T) { hndErr := errors.New("error encountered on handler") rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: true, @@ -918,7 +1096,11 @@ func TestStaleCache_WithoutHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, }, { requestParams: requestParams{ @@ -928,27 +1110,32 @@ func TestStaleCache_WithoutHeaders(t *testing.T) { routeConfig: routeConfig{ path: rc.path, ttl: rc.ttl, - hnd: func(i context.Context, i2 *sync.Request) (response 
*sync.Response, e error) { - return nil, hndErr + hnd: func(now int64, key string) *cachedResponse { + return &cachedResponse{ + err: hndErr, + } }, minAge: rc.minAge, maxFresh: rc.maxFresh, staleResponse: rc.staleResponse, }, - response: &sync.Response{Payload: 0, Headers: testHeaderWithWarning(-1, "last-valid")}, + metrics: metricState{ + hits: 1, + }, + response: &Response{Payload: 0, Headers: testHeaderWithWarning(-1, "last-valid")}, }, }, } - run(t, args) + assertCache(t, args) } -func TestNoStaleCache_WithoutHeaders(t *testing.T) { +func TestNoStaleCache_WithHandlerErrorWithoutHeaders(t *testing.T) { hndErr := errors.New("error encountered on handler") rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: false, @@ -963,7 +1150,11 @@ func TestNoStaleCache_WithoutHeaders(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, }, { requestParams: requestParams{ @@ -973,18 +1164,23 @@ func TestNoStaleCache_WithoutHeaders(t *testing.T) { routeConfig: routeConfig{ path: rc.path, ttl: rc.ttl, - hnd: func(i context.Context, i2 *sync.Request) (response *sync.Response, e error) { - return nil, hndErr + hnd: func(now int64, key string) *cachedResponse { + return &cachedResponse{ + err: hndErr, + } }, minAge: rc.minAge, maxFresh: rc.maxFresh, staleResponse: rc.staleResponse, }, + metrics: metricState{ + evictions: 1, + }, err: hndErr, }, }, } - run(t, args) + assertCache(t, args) } // TODO : test stale response for error (with Warning) @@ -995,12 +1191,14 @@ func TestCache_WithHandlerErr(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: false, - hnd: func(i context.Context, i2 *sync.Request) (response *sync.Response, e error) { - return nil, hndErr + hnd: func(now 
int64, key string) *cachedResponse { + return &cachedResponse{ + err: hndErr, + } }, } @@ -1013,18 +1211,21 @@ func TestCache_WithHandlerErr(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - err: hndErr, + metrics: metricState{ + misses: 1, + }, + err: hndErr, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithCacheGetErr(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: false, @@ -1044,8 +1245,12 @@ func TestCache_WithCacheGetErr(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, + response: &Response{Payload: 0, Headers: testHeader(10)}, cache: cacheImpl, + metrics: metricState{ + additions: 1, + misses: 1, + }, }, // new response, because of cache get error { @@ -1054,12 +1259,16 @@ func TestCache_WithCacheGetErr(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, + response: &Response{Payload: 10, Headers: testHeader(10)}, cache: cacheImpl, + metrics: metricState{ + additions: 1, + misses: 1, + }, }, }, } - run(t, args) + assertCache(t, args) assert.Equal(t, 2, cacheImpl.getCount) assert.Equal(t, 2, cacheImpl.setCount) @@ -1070,7 +1279,7 @@ func TestCache_WithCacheSetErr(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: false, @@ -1090,8 +1299,12 @@ func TestCache_WithCacheSetErr(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, + response: &Response{Payload: 0, Headers: testHeader(10)}, cache: cacheImpl, + metrics: metricState{ + additions: 1, + misses: 1, + }, }, // new response, because of cache get error { @@ -1100,12 +1313,16 @@ func TestCache_WithCacheSetErr(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, + response: 
&Response{Payload: 10, Headers: testHeader(10)}, cache: cacheImpl, + metrics: metricState{ + additions: 1, + misses: 1, + }, }, }, } - run(t, args) + assertCache(t, args) assert.Equal(t, 2, cacheImpl.getCount) assert.Equal(t, 2, cacheImpl.setCount) @@ -1114,17 +1331,9 @@ func TestCache_WithCacheSetErr(t *testing.T) { func TestCache_WithMixedPaths(t *testing.T) { - rc1 := routeConfig{ - path: "/1", - ttl: 10 * time.Second, - minAge: 10, - maxFresh: 10, - staleResponse: false, - } - - rc2 := routeConfig{ - path: "/2", - ttl: 10 * time.Second, + rc := routeConfig{ + path: "/", + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: false, @@ -1137,51 +1346,69 @@ func TestCache_WithMixedPaths(t *testing.T) { requestParams: requestParams{ fields: map[string]string{"VALUE": "1"}, timeInstance: 0, + path: "/1", }, - routeConfig: rc1, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + routeConfig: rc, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response for the same path { requestParams: requestParams{ fields: map[string]string{"VALUE": "1"}, timeInstance: 1, + path: "/1", + }, + routeConfig: rc, + response: &Response{Payload: 0, Headers: testHeader(9)}, + metrics: metricState{ + hits: 1, }, - routeConfig: rc1, - response: &sync.Response{Payload: 0, Headers: testHeader(9)}, - err: nil, + err: nil, }, // initial request for second path { requestParams: requestParams{ fields: map[string]string{"VALUE": "1"}, timeInstance: 1, + path: "/2", }, - routeConfig: rc2, - response: &sync.Response{Payload: 10, Headers: testHeader(10)}, - err: nil, + routeConfig: rc, + response: &Response{Payload: 10, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response for second path { requestParams: requestParams{ fields: map[string]string{"VALUE": "1"}, timeInstance: 2, + path: "/2", }, - routeConfig: rc2, - 
response: &sync.Response{Payload: 10, Headers: testHeader(9)}, - err: nil, + routeConfig: rc, + response: &Response{Payload: 10, Headers: testHeader(9)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } func TestCache_WithMixedRequestParameters(t *testing.T) { rc := routeConfig{ path: "/", - ttl: 10 * time.Second, + ttl: 10, minAge: 10, maxFresh: 10, staleResponse: false, @@ -1196,8 +1423,12 @@ func TestCache_WithMixedRequestParameters(t *testing.T) { timeInstance: 0, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response for same request parameter { @@ -1206,8 +1437,11 @@ func TestCache_WithMixedRequestParameters(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 0, Headers: testHeader(9)}, - err: nil, + response: &Response{Payload: 0, Headers: testHeader(9)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, // new response for different request parameter { @@ -1216,8 +1450,12 @@ func TestCache_WithMixedRequestParameters(t *testing.T) { timeInstance: 1, }, routeConfig: rc, - response: &sync.Response{Payload: 20, Headers: testHeader(10)}, - err: nil, + response: &Response{Payload: 20, Headers: testHeader(10)}, + metrics: metricState{ + additions: 1, + misses: 1, + }, + err: nil, }, // cached response for second request parameter { @@ -1226,44 +1464,68 @@ func TestCache_WithMixedRequestParameters(t *testing.T) { timeInstance: 2, }, routeConfig: rc, - response: &sync.Response{Payload: 20, Headers: testHeader(9)}, - err: nil, + response: &Response{Payload: 20, Headers: testHeader(9)}, + metrics: metricState{ + hits: 1, + }, + err: nil, }, }, } - run(t, args) + assertCache(t, args) } // TODO : test no-cache // TODO : test no-store // TODO : test only-if-cached -func run(t *testing.T, 
args [][]testArgs) { +func assertCache(t *testing.T, args [][]testArgs) { + + chMetrics := NewPrometheusMetrics("path", 0) + chMetrics.reset() // create a test request handler // that returns the current time instant times '10' multiplied by the VALUE parameter in the request - handler := func(timeInstance int64) func(ctx context.Context, request *sync.Request) (*sync.Response, error) { - return func(ctx context.Context, request *sync.Request) (*sync.Response, error) { - i, err := strconv.Atoi(request.Fields["VALUE"]) + exec := func(request requestParams) func(now int64, key string) *cachedResponse { + return func(now int64, key string) *cachedResponse { + i, err := strconv.Atoi(request.fields["VALUE"]) if err != nil { - return nil, err + return &cachedResponse{ + err: err, + } + } + return &cachedResponse{ + response: &cacheHandlerResponse{ + payload: i * 10 * int(request.timeInstance), + header: make(map[string]string), + }, + etag: generateETag([]byte{}, int(now)), + lastValid: request.timeInstance, } - return sync.NewResponse(i * 10 * int(timeInstance)), nil } } // test cache implementation - cacheIml := &testingCache{cache: make(map[string]interface{})} + cacheIml := newTestingCache() + + argMetrics := metricState{} for _, testArg := range args { for _, arg := range testArg { - request := sync.NewRequest(arg.requestParams.fields, nil, arg.requestParams.header, nil) - var hnd sync.ProcessorFunc + path := arg.routeConfig.path + if arg.requestParams.path != "" { + path = arg.requestParams.path + } + + request := &cacheHandlerRequest{} + request.fromRequest(path, NewRequest(arg.requestParams.fields, nil, arg.requestParams.header, nil)) + + var hnd executor if arg.routeConfig.hnd != nil { hnd = arg.routeConfig.hnd } else { - hnd = handler(arg.requestParams.timeInstance) + hnd = exec(arg.requestParams) } var ch cache.Cache @@ -1279,11 +1541,11 @@ func run(t *testing.T, args [][]testArgs) { return arg.requestParams.timeInstance }, ttl: arg.routeConfig.ttl, - path: 
arg.routeConfig.path, minAge: arg.routeConfig.minAge, maxFresh: arg.routeConfig.maxFresh, staleResponse: arg.routeConfig.staleResponse, - })(context.Background(), request) + metrics: chMetrics, + })(request) if arg.err != nil { assert.Error(t, err) @@ -1292,16 +1554,49 @@ func run(t *testing.T, args [][]testArgs) { } else { assert.NoError(t, err) assert.NotNil(t, response) - assert.Equal(t, arg.response.Payload, response.Payload) - assert.Equal(t, arg.response.Headers[cacheControlHeader], response.Headers[cacheControlHeader]) - assert.Equal(t, arg.response.Headers[warningHeader], response.Headers[warningHeader]) + assert.Equal(t, arg.response.Payload, response.payload) + assert.Equal(t, arg.response.Headers[cacheControlHeader], response.header[cacheControlHeader]) + assert.Equal(t, arg.response.Headers[warningHeader], response.header[warningHeader]) assert.NotNil(t, arg.response.Headers[eTagHeader]) - assert.False(t, response.Headers[eTagHeader] == "") + assert.False(t, response.header[eTagHeader] == "") } + // we provide the diff + argMetrics.add(arg.metrics) + // we assert the sum + assertPrometheusMetrics(t, argMetrics, chMetrics) } } } +func assertPrometheusMetrics(t *testing.T, mState metricState, metrics *prometheusMetrics) { + + assertMetric(t, mState.misses, metrics.misses) + assertMetric(t, mState.additions, metrics.additions) + assertMetric(t, mState.hits, metrics.hits) + assertMetric(t, mState.evictions, metrics.evictions) + +} + +func assertMetric(t *testing.T, value int, c prometheus.Collector) { + if value > 0 { + v := testutil.ToFloat64(c) + assert.Equal(t, float64(value), v) + } else { + assertPanic(t, func() { + testutil.ToFloat64(c) + }) + } +} + +func assertPanic(t *testing.T, exec func()) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The code did not panic") + } + }() + exec() +} + type testingCache struct { cache map[string]interface{} getCount int @@ -1310,6 +1605,10 @@ type testingCache struct { setErr error } +func 
newTestingCache() *testingCache { + return &testingCache{cache: make(map[string]interface{})} +} + func (t *testingCache) Get(key string) (interface{}, bool, error) { t.getCount++ if t.getErr != nil { @@ -1339,3 +1638,7 @@ func (t *testingCache) Set(key string, value interface{}) error { t.cache[key] = value return nil } + +func (t *testingCache) size() int { + return len(t.cache) +} diff --git a/component/http/component.go b/component/http/component.go index 5455c63c0a..c40be3fc45 100644 --- a/component/http/component.go +++ b/component/http/component.go @@ -56,6 +56,10 @@ func (c *Component) Run(ctx context.Context) error { log.Info("shutting down component") return srv.Shutdown(ctx) case err := <-chFail: + log.Errorf("shutting down component due to error: %v", err) + if shutDownErr := srv.Shutdown(ctx); shutDownErr != nil { + return patronErrors.Aggregate(shutDownErr, err) + } return err } } diff --git a/component/http/handler.go b/component/http/handler.go index af28eae53c..f69258ae11 100644 --- a/component/http/handler.go +++ b/component/http/handler.go @@ -36,9 +36,8 @@ func handler(hnd ProcessorFunc) http.HandlerFunc { h := extractHeaders(r) - // TODO : pass url to the Request req := NewRequest(f, r.Body, h, dec) - // TODO : manage warning error type by adding warning to headers + rsp, err := hnd(ctx, req) if err != nil { handleError(logger, w, enc, err) @@ -139,6 +138,8 @@ func handleSuccess(w http.ResponseWriter, r *http.Request, rsp *Response, enc en w.WriteHeader(http.StatusCreated) } + propagateHeaders(rsp.Headers, w.Header()) + _, err = w.Write(p) return err } diff --git a/component/http/http.go b/component/http/http.go index 7bd7885b6d..786a39dd0a 100644 --- a/component/http/http.go +++ b/component/http/http.go @@ -1,8 +1,10 @@ package http import ( + "bytes" "context" "io" + "net/http" "github.com/beatlabs/patron/encoding" ) @@ -38,3 +40,57 @@ func NewResponse(p interface{}) *Response { // ProcessorFunc definition of a function type for processing 
sync requests. type ProcessorFunc func(context.Context, *Request) (*Response, error) + +// ResponseReadWriter is a response writer able to read the payload. +type ResponseReadWriter struct { + buffer *bytes.Buffer + len int + header http.Header + statusCode int +} + +// NewResponseReadWriter creates a new ResponseReadWriter. +func NewResponseReadWriter() *ResponseReadWriter { + return &ResponseReadWriter{ + buffer: new(bytes.Buffer), + header: make(http.Header), + } +} + +// Read reads the ResponseReadWriter payload. +func (rw *ResponseReadWriter) Read(p []byte) (n int, err error) { + return rw.buffer.Read(p) +} + +// ReadAll returns the response payload bytes. +func (rw *ResponseReadWriter) ReadAll() ([]byte, error) { + if rw.len == 0 { + // nothing has been written + return []byte{}, nil + } + b := make([]byte, rw.len) + _, err := rw.Read(b) + return b, err +} + +// Header returns the Header object. +func (rw *ResponseReadWriter) Header() http.Header { + return rw.header +} + +// Write writes the provided bytes to the byte buffer. +func (rw *ResponseReadWriter) Write(p []byte) (int, error) { + rw.len = len(p) + return rw.buffer.Write(p) +} + +// WriteHeader writes the header status code. 
+func (rw *ResponseReadWriter) WriteHeader(statusCode int) { + rw.statusCode = statusCode +} + +func propagateHeaders(header map[string]string, wHeader http.Header) { + for k, h := range header { + wHeader.Set(k, h) + } +} diff --git a/component/http/http_test.go b/component/http/http_test.go index b2f155ac1f..88d95bb15c 100644 --- a/component/http/http_test.go +++ b/component/http/http_test.go @@ -30,3 +30,52 @@ func TestNewResponse(t *testing.T) { assert.NotNil(t, rsp) assert.IsType(t, "test", rsp.Payload) } + +func TestResponseReadWriter_Header(t *testing.T) { + rw := NewResponseReadWriter() + rw.Header().Set("key", "value") + assert.Equal(t, "value", rw.Header().Get("key")) +} + +func TestResponseReadWriter_StatusCode(t *testing.T) { + rw := NewResponseReadWriter() + rw.WriteHeader(100) + assert.Equal(t, 100, rw.statusCode) +} + +func TestResponseReadWriter_ReadWrite(t *testing.T) { + rw := NewResponseReadWriter() + str := "body" + i, err := rw.Write([]byte(str)) + assert.NoError(t, err) + + r := make([]byte, i) + j, err := rw.Read(r) + assert.NoError(t, err) + + assert.Equal(t, i, j) + assert.Equal(t, str, string(r)) +} + +func TestResponseReadWriter_ReadWriteAll(t *testing.T) { + rw := NewResponseReadWriter() + str := "body" + i, err := rw.Write([]byte(str)) + assert.NoError(t, err) + + b, err := rw.ReadAll() + assert.NoError(t, err) + + assert.Equal(t, i, len(b)) + assert.Equal(t, str, string(b)) +} + +func TestResponseReadWriter_ReadAllEmpty(t *testing.T) { + rw := NewResponseReadWriter() + + b, err := rw.ReadAll() + assert.NoError(t, err) + + assert.Equal(t, 0, len(b)) + assert.Equal(t, "", string(b)) +} diff --git a/component/http/route.go b/component/http/route.go index dfec72bdf5..78315c7553 100644 --- a/component/http/route.go +++ b/component/http/route.go @@ -5,9 +5,6 @@ import ( "fmt" "net/http" "strings" - "time" - - "github.com/beatlabs/patron/cache" "github.com/beatlabs/patron/component/http/auth" errs "github.com/beatlabs/patron/errors" @@ -23,13 
+20,15 @@ type Route struct { // RouteBuilder for building a route. type RouteBuilder struct { - method string - path string - trace bool - middlewares []MiddlewareFunc - authenticator auth.Authenticator - handler http.HandlerFunc - errors []error + method string + path string + trace bool + middlewares []MiddlewareFunc + authenticator auth.Authenticator + processor ProcessorFunc + handler http.HandlerFunc + routeCacheBuilder *RouteCacheBuilder + errors []error } // WithTrace enables route tracing. @@ -56,6 +55,14 @@ func (rb *RouteBuilder) WithAuth(auth auth.Authenticator) *RouteBuilder { return rb } +func (rb *RouteBuilder) WithRouteCachedBuilder(routeCacheBuilder *RouteCacheBuilder) *RouteBuilder { + if routeCacheBuilder == nil { + rb.errors = append(rb.errors, errors.New("cache route builder is nil")) + } + rb.routeCacheBuilder = routeCacheBuilder + return rb +} + func (rb *RouteBuilder) setMethod(method string) *RouteBuilder { if rb.method != "" { rb.errors = append(rb.errors, errors.New("method already set")) @@ -130,14 +137,56 @@ func (rb *RouteBuilder) Build() (Route, error) { middlewares = append(middlewares, rb.middlewares...) } + // TODO : refactor appropriately + var processor ProcessorFunc + var handler http.HandlerFunc + + if rb.routeCacheBuilder != nil { + + if rb.method != http.MethodGet { + return Route{}, errors.New("cannot apply cache to a route with any method other than GET ") + } + + rc, err := rb.routeCacheBuilder.create(rb.path) + if err != nil { + return Route{}, fmt.Errorf("could not build cache from builder %v: %w", rb.routeCacheBuilder, err) + } + + // TODO : we need to refactor the abstraction in issue #160 + if rb.processor != nil { + // builder was initialised from the NewRouteBuilder constructor + // e.g. 
the only place where the rb.processor is set + processor = wrapProcessorFunc(rb.path, rb.processor, rc) + } else { + // we could have handled the processor also at a middleware level, + // but this would not work uniformly for the above case as well. + handler = wrapHandlerFunc(rb.handler, rc) + } + } + + if processor == nil { + processor = rb.processor + } + + if handler == nil { + handler = rb.handler + } + return Route{ path: rb.path, method: rb.method, - handler: rb.handler, + handler: constructHTTPHandler(processor, handler), middlewares: middlewares, }, nil } +func constructHTTPHandler(processor ProcessorFunc, httpHandler http.HandlerFunc) http.HandlerFunc { + if processor == nil { + return httpHandler + } + return handler(processor) +} + // NewRawRouteBuilder constructor. func NewRawRouteBuilder(path string, handler http.HandlerFunc) *RouteBuilder { var ee []error @@ -156,17 +205,17 @@ func NewRawRouteBuilder(path string, handler http.HandlerFunc) *RouteBuilder { // NewRouteBuilder constructor. func NewRouteBuilder(path string, processor ProcessorFunc) *RouteBuilder { - var err error + var ee []error - if processor == nil { - err = errors.New("processor is nil") + if path == "" { + ee = append(ee, errors.New("path is empty")) } - rb := NewRawRouteBuilder(path, handler(processor)) - if err != nil { - rb.errors = append(rb.errors, err) + if processor == nil { + ee = append(ee, errors.New("processor is nil")) } - return rb + + return &RouteBuilder{path: path, errors: ee, processor: processor} } // RoutesBuilder creates a list of routes. @@ -212,109 +261,3 @@ func (rb *RoutesBuilder) Build() ([]Route, error) { func NewRoutesBuilder() *RoutesBuilder { return &RoutesBuilder{} } - -type CachedRouteBuilder struct { - path string - processor sync.ProcessorFunc - cache cache.Cache - instant TimeInstant - ttl time.Duration - minAge uint - maxFresh uint - staleResponse bool - errors []error -} - -// WithTimeInstant specifies a time instant function for checking expiry. 
-func (cb *CachedRouteBuilder) WithTimeInstant(instant TimeInstant) *CachedRouteBuilder { - if instant == nil { - cb.errors = append(cb.errors, errors.New("time instant is nil")) - } - cb.instant = instant - return cb -} - -// WithTimeInstant adds a time to live parameter to control the cache expiry policy. -func (cb *CachedRouteBuilder) WithTimeToLive(ttl time.Duration) *CachedRouteBuilder { - if ttl <= 0 { - cb.errors = append(cb.errors, errors.New("time to live must be greater than `0`")) - } - cb.ttl = ttl - return cb -} - -// WithMinAge adds a minimum age for the cache responses. -// This will avoid cases where a single client with high request rate and no cache control headers might effectively disable the cache -// This means that if this parameter is missing (e.g. is equal to '0' , the cache can effectively be made obsolete in the above scenario) -func (cb *CachedRouteBuilder) WithMinAge(minAge uint) *CachedRouteBuilder { - cb.minAge = minAge - return cb -} - -// WithMinFresh adds a minimum age for the cache responses. -// This will avoid cases where a single client with high request rate and no cache control headers might effectively disable the cache -// This means that if this parameter is missing (e.g. is equal to '0' , the cache can effectively be made obsolete in the above scenario) -func (cb *CachedRouteBuilder) WithMaxFresh(maxFresh uint) *CachedRouteBuilder { - cb.maxFresh = maxFresh - return cb -} - -// WithStaleResponse allows the cache to return stale responses. 
-func (cb *CachedRouteBuilder) WithStaleResponse(staleResponse bool) *CachedRouteBuilder { - cb.staleResponse = staleResponse - return cb -} - -func (cb *CachedRouteBuilder) Create() (*routeCache, error) { - //if len(cb.errors) > 0 { - //ttl > 0 - //maxfresh < ttl - return &routeCache{}, nil - //} -} - -func NewRouteCache(path string, processor sync.ProcessorFunc, cache cache.Cache) *routeCache { - if strings.ReplaceAll(path, " ", "") == "" { - - } - return &routeCache{ - path: path, - processor: processor, - cache: cache, - instant: func() int64 { - return time.Now().Unix() - }, - } -} - -// ToGetRouteBuilder transforms the cached builder to a GET endpoint builder -// while propagating any errors -func (cb *CachedRouteBuilder) ToGetRouteBuilder() *RouteBuilder { - routeCache, err := cb.Create() - if err == nil { - - } - rb := NewRouteBuilder(cb.path, cacheHandler(cb.processor, routeCache)).MethodGet() - rb.errors = append(rb.errors, cb.errors...) - return rb -} - -type routeCache struct { - // path is the route path, which the cache is enabled for - path string - // processor is the processor function for the route - processor sync.ProcessorFunc - // cache is the cache implementation to be used - cache cache.Cache - // ttl is the time to live for all cached objects - ttl time.Duration - // instant is the timing function for the cache expiry calculations - instant TimeInstant - // minAge specifies the minimum amount of max-age header value for client cache-control requests - minAge uint - // max-fresh specifies the maximum amount of min-fresh header value for client cache-control requests - maxFresh uint - // staleResponse specifies if the server is willing to send stale responses - // if a new response could not be generated for any reason - staleResponse bool -} diff --git a/component/http/route_cache.go b/component/http/route_cache.go new file mode 100644 index 0000000000..7ebbf61250 --- /dev/null +++ b/component/http/route_cache.go @@ -0,0 +1,79 @@ +package 
http + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/beatlabs/patron/log" +) + +func wrapProcessorFunc(path string, processor ProcessorFunc, rc *routeCache) ProcessorFunc { + return func(ctx context.Context, request *Request) (response *Response, e error) { + // we are doing the opposite work that we would do in the processor, + // but until we refactor this part this seems the only way + req := &cacheHandlerRequest{} + req.fromRequest(path, request) + resp, err := cacheHandler(processorExecutor(ctx, request, processor), rc)(req) + if err != nil { + return nil, err + } + return &Response{Payload: resp.payload, Headers: resp.header}, nil + } +} + +// processorExecutor is the function that will create a new cachedResponse based on a ProcessorFunc implementation +var processorExecutor = func(ctx context.Context, request *Request, hnd ProcessorFunc) executor { + return func(now int64, key string) *cachedResponse { + if response, err := hnd(ctx, request); err == nil { + return &cachedResponse{ + response: &cacheHandlerResponse{ + payload: response.Payload, + header: make(map[string]string), + }, + lastValid: now, + etag: generateETag([]byte(key), time.Now().Nanosecond()), + } + } else { + return &cachedResponse{err: err} + } + } +} + +func wrapHandlerFunc(handler http.HandlerFunc, rc *routeCache) http.HandlerFunc { + return func(response http.ResponseWriter, request *http.Request) { + req := &cacheHandlerRequest{} + req.fromHTTPRequest(request) + if resp, err := cacheHandler(handlerExecutor(response, request, handler), rc)(req); err != nil { + log.Errorf("could not handle request with the cache processor: %v", err) + } else { + println(fmt.Sprintf("resp = %v", resp)) + propagateHeaders(resp.header, response.Header()) + if i, err := response.Write(resp.bytes); err != nil { + log.Errorf("could not write cache processor result into response %d: %v", i, err) + } + } + } +} + +// handlerExecutor is the function that will create a new cachedResponse 
based on a HandlerFunc implementation +var handlerExecutor = func(response http.ResponseWriter, request *http.Request, hnd http.HandlerFunc) executor { + return func(now int64, key string) *cachedResponse { + responseReadWriter := NewResponseReadWriter() + hnd(responseReadWriter, request) + if payload, err := responseReadWriter.ReadAll(); err == nil { + return &cachedResponse{ + response: &cacheHandlerResponse{ + bytes: payload, + header: make(map[string]string), + }, + lastValid: now, + etag: generateETag([]byte(key), time.Now().Nanosecond()), + } + } else { + return &cachedResponse{err: err} + } + + } +} diff --git a/component/http/route_cache_test.go b/component/http/route_cache_test.go new file mode 100644 index 0000000000..02d87c9579 --- /dev/null +++ b/component/http/route_cache_test.go @@ -0,0 +1,268 @@ +package http + +import ( + "context" + "errors" + "net/http" + "sync" + "sync/atomic" + "testing" + "time" + + client "github.com/beatlabs/patron/client/http" + + "github.com/stretchr/testify/assert" +) + +type cacheState struct { + setOps int + getOps int + size int +} + +func TestProcessorWrapper(t *testing.T) { + + type arg struct { + processor ProcessorFunc + req *Request + err bool + } + + args := []arg{ + { + processor: func(ctx context.Context, request *Request) (response *Response, e error) { + return nil, errors.New("processor error") + }, + req: NewRequest(make(map[string]string), nil, make(map[string]string), nil), + err: true, + }, + { + processor: func(ctx context.Context, request *Request) (response *Response, e error) { + return NewResponse(request.Fields), nil + }, + req: NewRequest(make(map[string]string), nil, make(map[string]string), nil), + }, + } + + ctx := context.Background() + + for _, testArg := range args { + c := newTestingCache() + rc, err := NewRouteCacheBuilder(c, 10).create("/") + assert.NoError(t, err) + + wrappedProcessor := wrapProcessorFunc("/", testArg.processor, rc) + + response, err := wrappedProcessor(ctx, testArg.req) 
+ + if testArg.err { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, response) + } + + } + +} + +func TestHandlerWrapper(t *testing.T) { + + type arg struct { + handler http.HandlerFunc + req *http.Request + rsp *ResponseReadWriter + } + + args := []arg{ + { + handler: func(writer http.ResponseWriter, request *http.Request) { + i, err := writer.Write([]byte(request.RequestURI)) + assert.NoError(t, err) + assert.True(t, i > 0) + }, + rsp: NewResponseReadWriter(), + req: &http.Request{RequestURI: "http://www.localhost.com"}, + }, + } + + for _, testArg := range args { + c := newTestingCache() + rc, err := NewRouteCacheBuilder(c, 10).create("/") + assert.NoError(t, err) + + wrappedHandler := wrapHandlerFunc(testArg.handler, rc) + + wrappedHandler(testArg.rsp, testArg.req) + + assert.NoError(t, err) + b, err := testArg.rsp.ReadAll() + assert.NoError(t, err) + assert.NotNil(t, b) + assert.True(t, len(b) > 0) + } + +} + +func TestRouteCacheImplementation_WithSingleRequest(t *testing.T) { + + cache := newTestingCache() + + var executions uint32 + + routeBuilder := NewRouteBuilder("/path", func(context context.Context, request *Request) (response *Response, e error) { + atomic.AddUint32(&executions, 1) + return NewResponse("body"), nil + }).WithRouteCachedBuilder(NewRouteCacheBuilder(cache, 10*time.Second)).MethodGet() + + ctx, cln := context.WithTimeout(context.Background(), 3*time.Second) + + runRoute(t, ctx, routeBuilder) + + assertResponse(t, ctx, []http.Response{ + { + Header: map[string][]string{cacheControlHeader: {"max-age=10"}}, + Body: &bodyReader{body: "\"body\""}, + }, + { + Header: map[string][]string{cacheControlHeader: {"max-age=10"}}, + Body: &bodyReader{body: "\"body\""}, + }, + }) + + assertCacheState(t, *cache, cacheState{ + setOps: 1, + getOps: 2, + size: 1, + }) + + assert.Equal(t, executions, uint32(1)) + + cln() + +} + +func TestRawRouteCacheImplementation_WithSingleRequest(t *testing.T) { + + cache := newTestingCache() 
+ + var executions uint32 + + routeBuilder := NewRawRouteBuilder("/path", func(writer http.ResponseWriter, request *http.Request) { + atomic.AddUint32(&executions, 1) + i, err := writer.Write([]byte("\"body\"")) + assert.NoError(t, err) + assert.True(t, i > 0) + }).WithRouteCachedBuilder(NewRouteCacheBuilder(cache, 10*time.Second)).MethodGet() + + ctx, cln := context.WithTimeout(context.Background(), 3*time.Second) + + runRoute(t, ctx, routeBuilder) + + assertResponse(t, ctx, []http.Response{ + { + Header: map[string][]string{cacheControlHeader: {"max-age=10"}}, + Body: &bodyReader{body: "\"body\""}, + }, + { + Header: map[string][]string{cacheControlHeader: {"max-age=10"}}, + Body: &bodyReader{body: "\"body\""}, + }, + }) + + assertCacheState(t, *cache, cacheState{ + setOps: 1, + getOps: 2, + size: 1, + }) + + assert.Equal(t, executions, uint32(1)) + + cln() + +} + +type bodyReader struct { + body string +} + +func (br *bodyReader) Read(p []byte) (n int, err error) { + var c int + for i, b := range []byte(br.body) { + p[i] = b + c = i + } + return c + 1, nil +} + +func (br *bodyReader) Close() error { + // nothing to do + return nil +} + +func runRoute(t *testing.T, ctx context.Context, routeBuilder *RouteBuilder) { + cmp, err := NewBuilder().WithRoutesBuilder(NewRoutesBuilder().Append(routeBuilder)).Create() + + assert.NoError(t, err) + assert.NotNil(t, cmp) + + go func() { + err = cmp.Run(ctx) + assert.NoError(t, err) + }() + + var lwg sync.WaitGroup + lwg.Add(1) + go func() { + cl, err := client.New() + assert.NoError(t, err) + req, err := http.NewRequest("GET", "http://localhost:50000/ready", nil) + assert.NoError(t, err) + for { + select { + case <-ctx.Done(): + return + default: + r, err := cl.Do(ctx, req) + if err == nil && r != nil { + lwg.Done() + return + } + } + } + }() + lwg.Wait() +} + +func assertResponse(t *testing.T, ctx context.Context, expected []http.Response) { + + cl, err := client.New() + assert.NoError(t, err) + req, err := 
http.NewRequest("GET", "http://localhost:50000/path", nil) + assert.NoError(t, err) + + for _, expectedResponse := range expected { + response, err := cl.Do(ctx, req) + + assert.NoError(t, err) + assert.Equal(t, expectedResponse.Header.Get(cacheControlHeader), response.Header.Get(cacheControlHeader)) + assert.True(t, response.Header.Get(eTagHeader) != "") + expectedPayload := make([]byte, 6) + i, err := expectedResponse.Body.Read(expectedPayload) + assert.NoError(t, err) + + responsePayload := make([]byte, 6) + j, err := response.Body.Read(responsePayload) + assert.Error(t, err) + + assert.Equal(t, i, j) + assert.Equal(t, expectedPayload, responsePayload) + } + +} + +func assertCacheState(t *testing.T, cache testingCache, cacheState cacheState) { + assert.Equal(t, cacheState.setOps, cache.setCount) + assert.Equal(t, cacheState.getOps, cache.getCount) + assert.Equal(t, cacheState.size, cache.size()) +} diff --git a/component/http/route_test.go b/component/http/route_test.go index 35712e3f44..8c596edf81 100644 --- a/component/http/route_test.go +++ b/component/http/route_test.go @@ -157,6 +157,16 @@ func TestRouteBuilder_WithAuth(t *testing.T) { } } +func TestRouteBuilder_WithRouteCacheNil(t *testing.T) { + + rb := NewRawRouteBuilder("/", func(writer http.ResponseWriter, request *http.Request) {}). + WithRouteCachedBuilder(nil) + + assert.Len(t, rb.errors, 1) + assert.EqualError(t, rb.errors[0], "cache route builder is nil") + +} + func TestRouteBuilder_Build(t *testing.T) { mockAuth := &MockAuthenticator{} mockProcessor := func(context.Context, *Request) (*Response, error) { return nil, nil } diff --git a/sync/http/route_cache_test.go b/sync/http/route_cache_test.go deleted file mode 100644 index d02cfda642..0000000000 --- a/sync/http/route_cache_test.go +++ /dev/null @@ -1 +0,0 @@ -package http