Skip to content

Commit

Permalink
beatlabs#128 cache wip implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vangelis Katikaridis <[email protected]>
  • Loading branch information
drakos74 committed Mar 14, 2020
1 parent 69990e1 commit a0470c7
Show file tree
Hide file tree
Showing 3 changed files with 358 additions and 8 deletions.
29 changes: 21 additions & 8 deletions examples/sixth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,33 @@ func main() {

err := patron.SetupLogging(name, version)
if err != nil {
fmt.Printf("failed to set up logging: %v", err)
os.Exit(1)
log.Fatalf("failed to init the cache %v", err)
}
cachedRoute := patronhttp.NewCachedRouteBuilder("/", sixth).WithCache(cache).MethodGet()

cmp, err := grpc.New(50006).Create()
ctx := context.Background()
err = patron.New(name, version).WithRoutesBuilder(patronhttp.NewRoutesBuilder().Append(cachedRoute)).Run(ctx)
if err != nil {
log.Fatalf("failed to create gRPC component: %v", err)
log.Fatalf("failed to run patron service %v", err)
}

greeter.RegisterGreeterServer(cmp.Server(), &greeterServer{})
}

ctx := context.Background()
err = patron.New(name, version).WithComponents(cmp).Run(ctx)
func sixth(ctx context.Context, req *sync.Request) (*sync.Response, error) {

var u examples.User
println(fmt.Sprintf("u = %v", u))
err := req.Decode(&u)
println(fmt.Sprintf("err = %v", err))
if err != nil {
log.Fatalf("failed to create and run service: %v", err)
return nil, fmt.Errorf("failed to decode request: %w", err)
}

b, err := json.Encode(&u)
if err != nil {
return nil, fmt.Errorf("failed create request: %w", err)
}

log.FromContext(ctx).Infof("request processed: %s %s", u.GetFirstname(), u.GetLastname())
return sync.NewResponse(string(b)), nil
}
168 changes: 168 additions & 0 deletions sync/http/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package http

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/beatlabs/patron/cache"
"github.com/beatlabs/patron/log"
"github.com/beatlabs/patron/sync"
)

type CacheHeader int

const (
max_age CacheHeader = iota + 1
max_stale
min_fresh
no_cache
no_store
no_transform
only_if_cached

CacheControlHeader = "CACHE-CONTROL"
)

var cacheHeaders = map[string]CacheHeader{"max-age": max_age, "max-stale": max_stale, "min-fresh": min_fresh, "no-cache": no_cache, "no-store": no_store, "no-transform": no_transform, "only-if-cached": only_if_cached}

type TimeInstant func() int64

func cacheHandler(hnd sync.ProcessorFunc, cache cache.Cache, instant TimeInstant) sync.ProcessorFunc {
return func(ctx context.Context, request *sync.Request) (response *sync.Response, e error) {

now := instant()

skipCache, onlyIfCached, ttl := extractCacheHeaders(request)

if skipCache {
return hnd(ctx, request)
}

// TODO : add metrics

key := createRequestKey(request)
if resp, ok, err := cache.Get(key); ok && err == nil {
// TODO : cache also errors ???
if r, ok := resp.(cachedResponse); ok && notExpired(now, r.lastValid, ttl) {
println(fmt.Sprintf("cache = %v", cache))
return r.response, r.err
} else {
log.Errorf("could not parse cached response from %v", resp)
}
} else if err != nil {
log.Debugf("could not read cache value for [ key = %v , err = %v ]", key, err)
}

if onlyIfCached {
// return empty response if we have cache-only header present
return sync.NewResponse([]byte{}), nil
}

// we have not encountered this key before
response, e = hnd(ctx, request)
resp := cachedResponse{
response: response,
lastValid: now,
err: e,
}
err := cache.Set(key, resp)
log.Errorf("could not cache response for request key %s %w", key, err)
return
}
}

func extractCacheHeaders(request *sync.Request) (bool, bool, int64) {
var noCache bool
var forceCache bool
var ttl int64
println(fmt.Sprintf("request = %v", request))
if CacheControl, ok := request.Headers[CacheControlHeader]; ok {
// time to live threshold
for _, header := range strings.Split(CacheControl, ",") {
println(fmt.Sprintf("header = %v", header))
keyValue := strings.Split(header, "=")
println(fmt.Sprintf("keyValue = %v", keyValue))
if cacheHeader, ok := cacheHeaders[keyValue[0]]; ok {
switch cacheHeader {
case max_stale:
/**
Indicates that the client is willing to accept a response that has
exceeded its expiration time. If max-stale is assigned a value,
then the client is willing to accept a response that has exceeded
its expiration time by no more than the specified number of
seconds. If no value is assigned to max-stale, then the client is
willing to accept a stale response of any age.
*/
fallthrough
case max_age:
/**
Indicates that the client is willing to accept a response whose
age is no greater than the specified time in seconds. Unless max-
stale directive is also included, the client is not willing to
accept a stale response.
*/
expiration, err := strconv.Atoi(keyValue[1])
if err == nil {
ttl -= int64(expiration)
}
case min_fresh:
/**
Indicates that the client is willing to accept a response whose
freshness lifetime is no less than its current age plus the
specified time in seconds. That is, the client wants a response
that will still be fresh for at least the specified number of
seconds.
*/
freshness, err := strconv.Atoi(keyValue[1])
if err == nil {
ttl += int64(freshness)
}
case no_cache:
/**
retrieve from the store
it SHOULD NOT include min-fresh, max-stale, or max-age.
*/
fallthrough
case no_store:
/**
no storage whatsoever
*/
noCache = true
case no_transform:
/**
response should be kept intact
*/
// we always use no-transform
case only_if_cached:
/**
return only if is in cache , otherwise 504
*/
forceCache = true
default:
log.Warnf("unrecognised cache header %s", header)
}
}
}
} else {
// we dont have any cache-control headers, so no intention of caching anything
noCache = true
}
return noCache, forceCache, ttl
}

type cachedResponse struct {
response *sync.Response
lastValid int64
err error
}

func createRequestKey(request *sync.Request) string {
return fmt.Sprintf("%s:%s", request.Headers, request.Fields)
}

func notExpired(now, last, ttl int64) bool {
nilExp := now+ttl <= last
return nilExp
}
169 changes: 169 additions & 0 deletions sync/http/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package http

import (
"context"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"

"github.com/beatlabs/patron/sync"
)

func TestExtractCacheHeaders(t *testing.T) {

type args struct {
headers map[string]string
noCache bool
forceCache bool
ttl int64
}

// TODO : cover the extract headers functionality from 'real' http header samples

params := []args{
{
headers: map[string]string{CacheControlHeader: "max-age=10"},
noCache: false,
forceCache: false,
ttl: -10,
},
}

for _, param := range params {
req := sync.NewRequest(map[string]string{}, nil, param.headers, nil)
noCache, forceCache, ttl := extractCacheHeaders(req)
assert.Equal(t, param.noCache, noCache)
assert.Equal(t, param.forceCache, forceCache)
assert.Equal(t, param.ttl, ttl)
}

}

func TestCacheHandler(t *testing.T) {

type args struct {
header map[string]string
fields map[string]string
response *sync.Response
timeInstance int64
err error
}

params := [][]args{
// cache expiration with max-age header
{
// initial request
{
fields: map[string]string{"VALUE": "1"},
header: map[string]string{CacheControlHeader: "max-age=10"},
response: sync.NewResponse(10),
timeInstance: 1,
err: nil,
},
// cache response
{
fields: map[string]string{"VALUE": "1"},
header: map[string]string{CacheControlHeader: "max-age=10"},
response: sync.NewResponse(10),
timeInstance: 9,
err: nil,
},
// still cached response because we are at the edge of the expiry e.g. 11 - 1 = 10
{
fields: map[string]string{"VALUE": "1"},
header: map[string]string{CacheControlHeader: "max-age=10"},
response: sync.NewResponse(10),
timeInstance: 11,
err: nil,
},
// new response because cache has expired
{
fields: map[string]string{"VALUE": "1"},
header: map[string]string{CacheControlHeader: "max-age=10"},
response: sync.NewResponse(120),
timeInstance: 12,
err: nil,
},
// make an extra request with the new cache value
{
fields: map[string]string{"VALUE": "1"},
header: map[string]string{CacheControlHeader: "max-age=10"},
response: sync.NewResponse(120),
timeInstance: 15,
err: nil,
},
// and another when the previous has expired 12 + 10 = 22
{
fields: map[string]string{"VALUE": "1"},
header: map[string]string{CacheControlHeader: "max-age=10"},
response: sync.NewResponse(230),
timeInstance: 23,
err: nil,
},
},
}

handler := func(timeInstance int64) func(ctx context.Context, request *sync.Request) (*sync.Response, error) {
return func(ctx context.Context, request *sync.Request) (*sync.Response, error) {
i, err := strconv.Atoi(request.Fields["VALUE"])
if err != nil {
return nil, err
}
return sync.NewResponse(i * 10 * int(timeInstance)), nil
}
}

cache := &testingCache{cache: make(map[string]interface{})}

for _, param := range params {
for _, arg := range param {
request := sync.NewRequest(arg.fields, nil, arg.header, nil)
// initial request
response, err := cacheHandler(handler(arg.timeInstance), cache, func() int64 {
return arg.timeInstance
})(context.Background(), request)
if arg.err != nil {
assert.Error(t, err)
assert.NotNil(t, response)
// TODO : assert type of error
} else {
assert.NoError(t, err)
assert.NotNil(t, response)
assert.Equal(t, arg.response, response)
}
}
}

}

type testingCache struct {
cache map[string]interface{}
}

func (t *testingCache) Get(key string) (interface{}, bool, error) {
r, ok := t.cache[key]
println(fmt.Sprintf("key = %v", key))
println(fmt.Sprintf("r = %v", r))
return r, ok, nil
}

func (t *testingCache) Purge() error {
for k := range t.cache {
_ = t.Remove(k)
}
return nil
}

func (t *testingCache) Remove(key string) error {
delete(t.cache, key)
return nil
}

func (t *testingCache) Set(key string, value interface{}) error {
t.cache[key] = value
println(fmt.Sprintf("key = %v", key))
println(fmt.Sprintf("value = %v", value))
return nil
}

0 comments on commit a0470c7

Please sign in to comment.