Skip to content

Commit

Permalink
beatlabs#128 fix redis cache interaction and create cached route example
Browse files Browse the repository at this point in the history
Signed-off-by: Vangelis Katikaridis <[email protected]>
  • Loading branch information
drakos74 committed May 2, 2020
1 parent 88d8d2b commit a39ffaa
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 271 deletions.
202 changes: 79 additions & 123 deletions component/http/cache.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package http

import (
"encoding/json"
"fmt"
"hash/crc32"
"net/http"
"strconv"
"strings"
"time"
Expand All @@ -19,12 +17,12 @@ type validationContext int
const (
// validation due to normal expiry in terms of ttl setting
ttlValidation validationContext = iota + 1
// maxAgeValidation represents a validation , happening due to max-age header requirements
// maxAgeValidation represents a validation , happening due to max-age Header requirements
maxAgeValidation
// minFreshValidation represents a validation , happening due to min-fresh header requirements
// minFreshValidation represents a validation , happening due to min-fresh Header requirements
minFreshValidation

// cacheControlHeader is the header key for cache related values
// cacheControlHeader is the Header key for cache related values
// note : it is case-sensitive
cacheControlHeader = "Cache-Control"

Expand Down Expand Up @@ -73,87 +71,25 @@ type cacheControl struct {
expiryValidator validator
}

// cacheHandlerResponse is the dedicated response object for the cache handler
type cacheHandlerResponse struct {
payload interface{}
bytes []byte
header map[string]string
}

// cacheHandlerRequest is the dedicated request object for the cache handler
type cacheHandlerRequest struct {
header string
path string
query string
}

// fromRequest transforms the Request object to the cache handler request
func fromRequest(path string, req *Request) *cacheHandlerRequest {
var header string
if req.Headers != nil {
header = req.Headers[cacheControlHeader]
}
var query string
if req.Fields != nil {
if fields, err := json.Marshal(req.Fields); err == nil {
query = string(fields)
}
}
return &cacheHandlerRequest{
header: header,
path: path,
query: query,
}
}

// fromHTTPRequest transforms the http Request object to the cache handler request
func fromHTTPRequest(req *http.Request) *cacheHandlerRequest {
var header string
if req.Header != nil {
header = req.Header.Get(cacheControlHeader)
}
var path string
var query string
if req.URL != nil {
path = req.URL.Path
query = req.URL.RawQuery
}
return &cacheHandlerRequest{
header: header,
path: path,
query: query,
}
}

// cachedResponse is the struct representing an object retrieved or ready to be put into the route cache
type cachedResponse struct {
response *cacheHandlerResponse
lastValid int64
etag string
warning string
fromCache bool
err error
}

// executor is the function returning a cache response object from the underlying implementation
type executor func(now int64, key string) *cachedResponse
// executor is the function returning a cache Response object from the underlying implementation
type executor func(now int64, key string) *CachedResponse

// cacheHandler wraps the an execution logic with a cache layer
// exec is the processor func that the cache will wrap
// rc is the route cache implementation to be used
func cacheHandler(exec executor, rc *routeCache) func(request *cacheHandlerRequest) (response *cacheHandlerResponse, e error) {
func cacheHandler(exec executor, rc *routeCache) func(request *cacheHandlerRequest) (response *CacheHandlerResponse, e error) {

return func(request *cacheHandlerRequest) (response *cacheHandlerResponse, e error) {
return func(request *cacheHandlerRequest) (response *CacheHandlerResponse, e error) {

now := now()

key := extractRequestKey(request.path, request.query)
key := request.getKey()

var rsp *cachedResponse
var rsp *CachedResponse

if hasNoAgeConfig(rc.age.min, rc.age.max) {
rsp = exec(now, key)
return rsp.response, rsp.err
return &rsp.Response, rsp.Err
}

cfg := extractRequestHeaders(request.header, rc.age.min, rc.age.max-rc.age.min)
Expand All @@ -162,23 +98,23 @@ func cacheHandler(exec executor, rc *routeCache) func(request *cacheHandlerReque
}

rsp = getResponse(cfg, request.path, key, now, rc, exec)
response = rsp.response
e = rsp.err
response = &rsp.Response
e = rsp.Err

if e == nil {
addResponseHeaders(now, response.header, rsp, rc.age.max)
if !rsp.fromCache && !cfg.noCache {
saveResponseWithTTL(request.path, key, rsp, rc.cache, time.Duration(rc.age.max)*time.Second)
addResponseHeaders(now, response.Header, rsp, rc.age.max)
if !rsp.FromCache && !cfg.noCache {
saveToCache(request.path, key, rsp, rc.cache, time.Duration(rc.age.max)*time.Second)
}
}

return
}
}

// getResponse will get the appropriate response either using the cache or the executor,
// getResponse will get the appropriate Response either using the cache or the executor,
// depending on the
func getResponse(cfg *cacheControl, path, key string, now int64, rc *routeCache, exec executor) *cachedResponse {
func getResponse(cfg *cacheControl, path, key string, now int64, rc *routeCache, exec executor) *CachedResponse {

if cfg.noCache {
return exec(now, key)
Expand All @@ -189,25 +125,26 @@ func getResponse(cfg *cacheControl, path, key string, now int64, rc *routeCache,
metrics.miss(path)
return exec(now, key)
}
if rsp.err != nil {
if rsp.Err != nil {
log.Errorf("error during cache interaction: %v", rsp.Err)
metrics.err(path)
return exec(now, key)
}
// if the object has expired
if isValid, cx := isValid(now-rsp.lastValid, rc.age.max, append(cfg.validators, cfg.expiryValidator)...); !isValid {
if isValid, cx := isValid(now-rsp.LastValid, rc.age.max, append(cfg.validators, cfg.expiryValidator)...); !isValid {
tmpRsp := exec(now, key)
// if we could not retrieve a fresh response,
// serve the last cached value, with a warning header
if cfg.forceCache || tmpRsp.err != nil {
rsp.warning = "last-valid"
// if we could not retrieve a fresh Response,
// serve the last cached value, with a Warning Header
if cfg.forceCache || tmpRsp.Err != nil {
rsp.Warning = "last-valid"
metrics.hit(path)
} else {
rsp = tmpRsp
metrics.evict(path, cx, now-rsp.lastValid)
metrics.evict(path, cx, now-rsp.LastValid)
}
} else {
// add any warning generated while parsing the headers
rsp.warning = cfg.warning
// add any Warning generated while parsing the headers
rsp.Warning = cfg.warning
metrics.hit(path)
}

Expand All @@ -226,49 +163,68 @@ func isValid(age, maxAge int64, validators ...validator) (bool, validationContex
return true, 0
}

// getFromCache is the implementation that will provide a cachedResponse instance from the cache,
// getFromCache is the implementation that will provide a CachedResponse instance from the cache,
// if it exists
func getFromCache(key string, rc *routeCache) *cachedResponse {
func getFromCache(key string, rc *routeCache) *CachedResponse {
if resp, ok, err := rc.cache.Get(key); ok && err == nil {
if r, ok := resp.(*cachedResponse); ok {
r.fromCache = true
if b, ok := resp.([]byte); ok {
r := &CachedResponse{}
err := r.decode(b)
if err != nil {
return &CachedResponse{Err: fmt.Errorf("could not decode cached bytes as response %v for key %s", resp, key)}
}
r.FromCache = true
return r
}
return &cachedResponse{err: fmt.Errorf("could not parse cached response %v for key %s", resp, key)}
// NOTE : we need to do this hack to bypass the redis go client implementation of returning result as string instead of bytes
if b, ok := resp.(string); ok {
r := &CachedResponse{}
err := r.decode([]byte(b))
if err != nil {
return &CachedResponse{Err: fmt.Errorf("could not decode cached string as response %v for key %s", resp, key)}
}
r.FromCache = true
return r
}

return &CachedResponse{Err: fmt.Errorf("could not parse cached response %v for key %s", resp, key)}
} else if err != nil {
return &cachedResponse{err: fmt.Errorf("could not read cache value for [ key = %v , err = %v ]", key, err)}
return &CachedResponse{Err: fmt.Errorf("could not read cache value for [ key = %v , Err = %v ]", key, err)}
}
return nil
}

// saveResponseWithTTL caches the given response if required with a ttl
// saveToCache caches the given Response if required with a ttl
// as we are putting the objects in the cache, if its a TTL one, we need to manage the expiration on our own
func saveResponseWithTTL(path, key string, rsp *cachedResponse, cache cache.TTLCache, maxAge time.Duration) {
if !rsp.fromCache && rsp.err == nil {
if err := cache.SetTTL(key, rsp, maxAge); err != nil {
log.Errorf("could not cache response for request key %s %v", key, err)
} else {
metrics.add(path)
func saveToCache(path, key string, rsp *CachedResponse, cache cache.TTLCache, maxAge time.Duration) {
if !rsp.FromCache && rsp.Err == nil {
// encode to a byte array on our side to avoid cache specific encoding / marshaling requirements
bytes, err := rsp.encode()
if err != nil {
log.Errorf("could not encode response for request key %s: %v", key, err)
metrics.err(path)
return
}
if err := cache.SetTTL(key, bytes, maxAge); err != nil {
log.Errorf("could not cache response for request key %s: %v", key, err)
metrics.err(path)
return
}
metrics.add(path)
}
}

// addResponseHeaders adds the appropriate headers according to the cachedResponse conditions
func addResponseHeaders(now int64, header map[string]string, rsp *cachedResponse, maxAge int64) {
header[cacheHeaderETagHeader] = rsp.etag
header[cacheControlHeader] = createCacheControlHeader(maxAge, now-rsp.lastValid)
if rsp.warning != "" && rsp.fromCache {
header[cacheHeaderWarning] = rsp.warning
// addResponseHeaders adds the appropriate headers according to the CachedResponse conditions
func addResponseHeaders(now int64, header map[string]string, rsp *CachedResponse, maxAge int64) {
header[cacheHeaderETagHeader] = rsp.Etag
header[cacheControlHeader] = createCacheControlHeader(maxAge, now-rsp.LastValid)
if rsp.Warning != "" && rsp.FromCache {
header[cacheHeaderWarning] = rsp.Warning
} else {
delete(header, cacheHeaderWarning)
}
}

// extractRequestKey generates a unique cache key based on the route path and the query parameters
func extractRequestKey(path, query string) string {
return fmt.Sprintf("%s:%s", path, query)
}

// extractRequestHeaders extracts the client request headers allowing the client some control over the cache
func extractRequestHeaders(header string, minAge, maxFresh int64) *cacheControl {

Expand All @@ -284,14 +240,14 @@ func extractRequestHeaders(header string, minAge, maxFresh int64) *cacheControl
switch headerKey {
case cacheHeaderMaxAge:
/**
Indicates that the client is willing to accept a response whose
Indicates that the client is willing to accept a Response whose
age is no greater than the specified time in seconds. Unless max-
stale directive is also included, the client is not willing to
accept a stale response.
accept a stale Response.
*/
value, ok := parseValue(keyValue)
if !ok || value < 0 {
log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue)
log.Debugf("invalid value for Header '%s', defaulting to '0' ", keyValue)
value = 0
}
value, adjusted := min(value, minAge)
Expand All @@ -303,15 +259,15 @@ func extractRequestHeaders(header string, minAge, maxFresh int64) *cacheControl
})
case cacheControlMinFresh:
/**
Indicates that the client is willing to accept a response whose
Indicates that the client is willing to accept a Response whose
freshness lifetime is no less than its current age plus the
specified time in seconds. That is, the client wants a response
specified time in seconds. That is, the client wants a Response
that will still be fresh for at least the specified number of
seconds.
*/
value, ok := parseValue(keyValue)
if !ok || value < 0 {
log.Debugf("invalid value for header '%s', defaulting to '0' ", keyValue)
log.Debugf("invalid value for Header '%s', defaulting to '0' ", keyValue)
value = 0
}
value, adjusted := max(value, maxFresh)
Expand All @@ -323,8 +279,8 @@ func extractRequestHeaders(header string, minAge, maxFresh int64) *cacheControl
})
case cacheControlNoCache:
/**
return response if entity has changed
e.g. (304 response if nothing has changed : 304 Not Modified)
return Response if entity has changed
e.g. (304 Response if nothing has changed : 304 Not Modified)
it SHOULD NOT include min-fresh, max-stale, or max-age.
request should be accompanied by an ETag token
*/
Expand All @@ -345,7 +301,7 @@ func extractRequestHeaders(header string, minAge, maxFresh int64) *cacheControl
case cacheControlEmpty:
// nothing to do here
default:
log.Warn("unrecognised cache header: '%s'", header)
log.Warn("unrecognised cache Header: '%s'", header)
}
}
if len(wrn) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions component/http/cache_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type prometheusMetrics struct {
}

func (m *prometheusMetrics) add(path string) {
m.operations.WithLabelValues(path, "merge", "").Inc()
m.operations.WithLabelValues(path, "add", "").Inc()
}

func (m *prometheusMetrics) miss(path string) {
Expand All @@ -31,7 +31,7 @@ func (m *prometheusMetrics) hit(path string) {
}

func (m *prometheusMetrics) err(path string) {
m.operations.WithLabelValues(path, "err", "").Inc()
m.operations.WithLabelValues(path, "Err", "").Inc()
}

func (m *prometheusMetrics) evict(path string, context validationContext, age int64) {
Expand Down
Loading

0 comments on commit a39ffaa

Please sign in to comment.