Skip to content

Commit

Permalink
beatlabs#128 server route cache implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vangelis Katikaridis <[email protected]>
  • Loading branch information
drakos74 committed Apr 1, 2020
1 parent 7e3c6b9 commit ee0460c
Show file tree
Hide file tree
Showing 15 changed files with 1,588 additions and 420 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,12 @@ This is transparent to the server. As long as a key cannot be found in the cache
the server will execute the route processor function and fill the corresponding cache entry
```

```
Note : When a cache is used, the handler execution might be skipped.
That implies that all generic handler functionalities MUST be delegated to a custom middleware.
i.e. counting number of server client requests etc ...
```

#### client cache-control
The client can control the cache with the appropriate Headers
- `max-age=?`
Expand Down Expand Up @@ -271,6 +277,11 @@ expects any response that is found in the cache, otherwise returns an empty resp
- https://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
- https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.9

#### improvement considerations
- we can split the storing of the cached objects and the age parameter, to avoid loading the whole object in memory,
if the object is already expired. This would provide considerable performance (in terms of memory utilisation)
improvement for big response objects.

### Asynchronous

The implementation of the async processor follows exactly the same principle as the sync processor.
Expand Down
198 changes: 117 additions & 81 deletions sync/http/cache.go → component/http/cache.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package http

import (
"context"
"encoding/json"
"fmt"
"hash/crc32"
"net/http"
"strconv"
"strings"
"time"

"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/sync"
)

// TODO : add comments where applicable
// TODO : wrap up implementation

type CacheHeader int

type validationContext int

const (
// validation due to normal expiry in terms of ttl setting
ttlValidation validationContext = iota + 1
// maxAgeValidation represents a validation , happening due to max-age header requirements
maxAgeValidation
// minFreshValidation represents a validation , happening due to min-fresh header requirements
minFreshValidation
// maxStaleValidation represents a validation , happening due to max-stale header requirements
maxStaleValidation

// cacheControlHeader is the header key for cache related values
// note : it is case-sensitive
cacheControlHeader = "Cache-Control"
Expand Down Expand Up @@ -61,11 +71,11 @@ const (
type TimeInstant func() int64

// validator is a conditional function on an objects age and the configured ttl
type validator func(age, ttl int64) bool
type validator func(age, ttl int64) (bool, validationContext)

// expiryCheck is the main validator that checks that the entry has not expired e.g. is stale
var expiryCheck validator = func(age, ttl int64) bool {
return age <= ttl
var expiryCheck validator = func(age, ttl int64) (bool, validationContext) {
return age <= ttl, ttlValidation
}

// cacheControl is the model of the request parameters regarding the cache control
Expand All @@ -76,79 +86,122 @@ type cacheControl struct {
expiryValidator validator
}

// cacheHandler wraps the handler func with a cache layer
// hnd is the processor func that the cache will wrap
// rc is the route cache implementation to be used
func cacheHandler(hnd sync.ProcessorFunc, rc *routeCache) sync.ProcessorFunc {
// cacheHandlerResponse is the dedicated response object for the cache handler
type cacheHandlerResponse struct {
payload interface{}
bytes []byte
header map[string]string
}

// cacheHandlerRequest is the dedicated request object for the cache handler
type cacheHandlerRequest struct {
header string
path string
query string
}

// fromRequest transforms the Request object to the cache handler request
func (r *cacheHandlerRequest) fromRequest(path string, req *Request) {
r.path = path
if req.Headers != nil {
r.header = req.Headers[cacheControlHeader]
}
if req.Fields != nil {
if query, err := json.Marshal(req.Fields); err == nil {
r.query = string(query)
}
}
}

// fromRequest transforms the http Request object to the cache handler request
func (r *cacheHandlerRequest) fromHTTPRequest(req *http.Request) {
if req.Header != nil {
r.header = req.Header.Get(cacheControlHeader)
}
if req.URL != nil {
r.path = req.URL.Path
r.query = req.URL.RawQuery
}
}

return func(ctx context.Context, request *sync.Request) (response *sync.Response, e error) {
// executor is the function returning a cache response object from the underlying implementation
type executor func(now int64, key string) *cachedResponse

responseHeaders := make(map[string]string)
// cacheHandler wraps the an execution logic with a cache layer
// exec is the processor func that the cache will wrap
// rc is the route cache implementation to be used
func cacheHandler(exec executor, rc *routeCache) func(request *cacheHandlerRequest) (response *cacheHandlerResponse, e error) {

return func(request *cacheHandlerRequest) (response *cacheHandlerResponse, e error) {

now := rc.instant()

cfg, warning := extractCacheHeaders(request, rc.minAge, rc.maxFresh)
cfg, warning := extractCacheHeaders(request.header, rc.minAge, rc.maxFresh)
if cfg.expiryValidator == nil {
cfg.expiryValidator = expiryCheck
}
key := extractRequestKey(rc.path, request)

// TODO : add metrics

key := extractRequestKey(request.path, request.query)
var rsp *cachedResponse
var fromCache bool

// explore the cache
if cfg.noCache && !rc.staleResponse {
// need to execute the handler always
rsp = handlerExecutor(ctx, request, hnd, now, key)
rsp = exec(now, key)
} else {
// lets check the cache if we have anything for the given key
if rsp = cacheRetriever(key, rc, now); rsp == nil {
rc.metrics.miss(key)
// we have not encountered this key before
rsp = handlerExecutor(ctx, request, hnd, now, key)
rsp = exec(now, key)
} else {
expiry := int64(rc.ttl / time.Second)
if !isValid(expiry, now, rsp.lastValid, append(cfg.validators, cfg.expiryValidator)...) {
tmpRsp := handlerExecutor(ctx, request, hnd, now, key)
expiry := rc.ttl
age := now - rsp.lastValid
if isValid, cx := isValid(age, expiry, append(cfg.validators, cfg.expiryValidator)...); !isValid {
tmpRsp := exec(now, key)
// if we could not generate a fresh response, serve the last cached value,
// with a warning header
if rc.staleResponse && tmpRsp.err != nil {
warning = "last-valid"
fromCache = true
rc.metrics.hit(key)
} else {
rsp = tmpRsp
rc.metrics.evict(key, cx, age)
}
} else {
fromCache = true
rc.metrics.hit(key)
}
}
}
// TODO : use the forceCache parameter
if cfg.forceCache {
// return empty response if we have rc-only responseHeaders present
return sync.NewResponse([]byte{}), nil
return &cacheHandlerResponse{payload: []byte{}}, nil
}

response = rsp.response
e = rsp.err

// TODO : abstract into method
if e == nil {
responseHeaders[eTagHeader] = rsp.etag

responseHeaders[cacheControlHeader] = genCacheControlHeader(rc.ttl, now-rsp.lastValid)

if warning != "" && fromCache {
responseHeaders[warningHeader] = warning
}

response.Headers = responseHeaders
}

// we cache response only if we did not retrieve it from the cache itself and error is nil
// we cache response only if we did not retrieve it from the cache itself
// and if there was no error
if !fromCache && e == nil {
if err := rc.cache.Set(key, rsp); err != nil {
log.Errorf("could not cache response for request key %s %v", key, err)
}
rc.metrics.add(key)
}

// TODO : abstract into method
if e == nil {
response.header[eTagHeader] = rsp.etag
response.header[cacheControlHeader] = createCacheControlHeader(rc.ttl, now-rsp.lastValid)
if warning != "" && fromCache {
response.header[warningHeader] = warning
} else {
delete(response.header, warningHeader)
}
}

return
Expand All @@ -170,35 +223,20 @@ var cacheRetriever = func(key string, rc *routeCache, now int64) *cachedResponse
return nil
}

// handlerExecutor is the function that will create a new cachedResponse from based on the handler implementation
var handlerExecutor = func(ctx context.Context, request *sync.Request, hnd sync.ProcessorFunc, now int64, key string) *cachedResponse {
response, err := hnd(ctx, request)
return &cachedResponse{
response: response,
lastValid: now,
etag: genETag([]byte(key), time.Now().Nanosecond()),
err: err,
}
}

// extractCacheHeaders extracts the client request headers allowing the client some control over the cache
func extractCacheHeaders(request *sync.Request, minAge, maxFresh uint) (*cacheControl, string) {
if CacheControl, ok := request.Headers[cacheControlHeader]; ok {
return extractCacheHeader(CacheControl, minAge, maxFresh)
func extractCacheHeaders(header string, minAge, maxFresh int64) (*cacheControl, string) {
if header == "" {
return &cacheControl{noCache: minAge == 0 && maxFresh == 0}, ""
}
// if we have no headers we assume we dont want to cache,
return &cacheControl{noCache: minAge == 0 && maxFresh == 0}, ""
}

func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, string) {
cfg := cacheControl{
validators: make([]validator, 0),
}

var warning string
wrn := make([]string, 0)

for _, header := range strings.Split(headers, ",") {
for _, header := range strings.Split(header, ",") {
keyValue := strings.Split(header, "=")
headerKey := strings.ToLower(keyValue[0])
switch headerKey {
Expand All @@ -216,8 +254,8 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s
log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue)
value = 0
}
cfg.expiryValidator = func(age, ttl int64) bool {
return ttl-age+value >= 0
cfg.expiryValidator = func(age, ttl int64) (bool, validationContext) {
return ttl-age+value >= 0, maxStaleValidation
}
case maxAge:
/**
Expand All @@ -231,12 +269,12 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s
log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue)
value = 0
}
value, adjusted := min(value, int64(minAge))
value, adjusted := min(value, minAge)
if adjusted {
wrn = append(wrn, fmt.Sprintf("max-age=%d", minAge))
}
cfg.validators = append(cfg.validators, func(age, ttl int64) bool {
return age <= value
cfg.validators = append(cfg.validators, func(age, ttl int64) (bool, validationContext) {
return age <= value, maxAgeValidation
})
case minFresh:
/**
Expand All @@ -251,12 +289,12 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s
log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue)
value = 0
}
value, adjusted := max(value, int64(maxFresh))
value, adjusted := max(value, maxFresh)
if adjusted {
wrn = append(wrn, fmt.Sprintf("min-fresh=%d", maxFresh))
}
cfg.validators = append(cfg.validators, func(age, ttl int64) bool {
return ttl-age >= value
cfg.validators = append(cfg.validators, func(age, ttl int64) (bool, validationContext) {
return ttl-age >= value, minFreshValidation
})
case noCache:
/**
Expand All @@ -280,7 +318,6 @@ func extractCacheHeader(headers string, minAge, maxFresh uint) (*cacheControl, s
log.Warnf("unrecognised cache header %s", header)
}
}

if len(wrn) > 0 {
warning = strings.Join(wrn, ",")
}
Expand Down Expand Up @@ -313,37 +350,36 @@ func parseValue(keyValue []string) (int64, bool) {
}

type cachedResponse struct {
response *sync.Response
response *cacheHandlerResponse
lastValid int64
etag string
err error
}

func extractRequestKey(path string, request *sync.Request) string {
return fmt.Sprintf("%s:%s", path, request.Fields)
func extractRequestKey(path, query string) string {
return fmt.Sprintf("%s:%s", path, query)
}

func isValid(ttl, now, last int64, validators ...validator) bool {
func isValid(age, ttl int64, validators ...validator) (bool, validationContext) {
if len(validators) == 0 {
return false
return false, 0
}
age := now - last
for _, validator := range validators {
if !validator(age, ttl) {
return false
if isValid, cx := validator(age, ttl); !isValid {
return false, cx
}
}
return true
return true, 0
}

func genETag(key []byte, t int) string {
func generateETag(key []byte, t int) string {
return fmt.Sprintf("%d-%d", crc32.ChecksumIEEE(key), t)
}

func genCacheControlHeader(ttl time.Duration, lastValid int64) string {
maxAge := int64(ttl/time.Second) - lastValid
if maxAge <= 0 {
return fmt.Sprintf("%s", mustRevalidate)
func createCacheControlHeader(ttl, lastValid int64) string {
mAge := ttl - lastValid
if mAge < 0 {
return mustRevalidate
}
return fmt.Sprintf("%s=%d", maxAge, int64(ttl/time.Second)-lastValid)
return fmt.Sprintf("%s=%d", maxAge, ttl-lastValid)
}
Loading

0 comments on commit ee0460c

Please sign in to comment.