Skip to content

Commit

Permalink
[Chore] Use Clock instead of unixtime
Browse files Browse the repository at this point in the history
This should make the expiration policy much more accurate.
  • Loading branch information
maypok86 committed Aug 30, 2024
1 parent b2966f0 commit 00095e5
Show file tree
Hide file tree
Showing 22 changed files with 220 additions and 266 deletions.
56 changes: 22 additions & 34 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"sync"
"time"

"github.com/maypok86/otter/v2/internal/clock"
"github.com/maypok86/otter/v2/internal/expiry"
"github.com/maypok86/otter/v2/internal/generated/node"
"github.com/maypok86/otter/v2/internal/hashtable"
"github.com/maypok86/otter/v2/internal/lossy"
"github.com/maypok86/otter/v2/internal/queue"
"github.com/maypok86/otter/v2/internal/s3fifo"
"github.com/maypok86/otter/v2/internal/unixtime"
"github.com/maypok86/otter/v2/internal/xmath"
"github.com/maypok86/otter/v2/internal/xruntime"
)
Expand Down Expand Up @@ -75,19 +75,10 @@ func init() {
maxStripedBufferSize = 4 * roundedParallelism
}

func getTTL(ttl time.Duration) uint32 {
//nolint:gosec // there will never be an overflow
return uint32((ttl + time.Second - 1) / time.Second)
}

func getExpiration(ttl time.Duration) uint32 {
return unixtime.Now() + getTTL(ttl)
}

type expiryPolicy[K comparable, V any] interface {
Add(n node.Node[K, V])
Delete(n node.Node[K, V])
DeleteExpired()
DeleteExpired(nowNanos int64)
Clear()
}

Expand All @@ -100,6 +91,7 @@ type Cache[K comparable, V any] struct {
expiryPolicy expiryPolicy[K, V]
stats statsCollector
logger Logger
clock *clock.Clock
stripedBuffer []*lossy.Buffer[K, V]
writeBuffer *queue.Growable[task[K, V]]
evictionMutex sync.Mutex
Expand All @@ -110,7 +102,7 @@ type Cache[K comparable, V any] struct {
deletionListener func(key K, value V, cause DeletionCause)
capacity int
mask uint32
ttl uint32
ttl time.Duration
withExpiration bool
}

Expand Down Expand Up @@ -138,6 +130,7 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
hashmap: hashmap,
stats: newStatsCollector(b.statsCollector),
logger: b.logger,
clock: clock.New(),
stripedBuffer: stripedBuffer,
writeBuffer: queue.NewGrowable[task[K, V]](minWriteBufferSize, maxWriteBufferSize),
doneClear: make(chan struct{}),
Expand All @@ -161,13 +154,12 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
}

if b.ttl != nil {
cache.ttl = getTTL(*b.ttl)
cache.ttl = *b.ttl
}

cache.withExpiration = b.ttl != nil || b.withVariableTTL

if cache.withExpiration {
unixtime.Start()
go cache.cleanup()
}

Expand All @@ -180,6 +172,10 @@ func (c *Cache[K, V]) getReadBufferIdx() int {
return int(xruntime.Fastrand() & c.mask)
}

func (c *Cache[K, V]) getExpiration(duration time.Duration) int64 {
return c.clock.Offset() + duration.Nanoseconds()
}

// Has checks if there is an item with the given key in the cache.
func (c *Cache[K, V]) Has(key K) bool {
_, ok := c.Get(key)
Expand All @@ -204,7 +200,7 @@ func (c *Cache[K, V]) GetNode(key K) (node.Node[K, V], bool) {
return nil, false
}

if n.HasExpired() {
if n.HasExpired(c.clock.Offset()) {
// avoid duplicate push
deleted := c.hashmap.DeleteNode(n)
if deleted != nil {
Expand All @@ -227,7 +223,7 @@ func (c *Cache[K, V]) GetNode(key K) (node.Node[K, V], bool) {
// such as updating statistics or the eviction policy.
func (c *Cache[K, V]) GetNodeQuietly(key K) (node.Node[K, V], bool) {
n, ok := c.hashmap.Get(key)
if !ok || !n.IsAlive() || n.HasExpired() {
if !ok || !n.IsAlive() || n.HasExpired(c.clock.Offset()) {
return nil, false
}

Expand All @@ -253,19 +249,19 @@ func (c *Cache[K, V]) Set(key K, value V) bool {
return c.set(key, value, c.defaultExpiration(), false)
}

func (c *Cache[K, V]) defaultExpiration() uint32 {
func (c *Cache[K, V]) defaultExpiration() int64 {
if c.ttl == 0 {
return 0
}

return unixtime.Now() + c.ttl
return c.getExpiration(c.ttl)
}

// SetWithTTL associates the value with the key in this cache and sets the custom ttl for this key-value item.
//
// If it returns false, then the key-value item had too much weight and the SetWithTTL was dropped.
func (c *Cache[K, V]) SetWithTTL(key K, value V, ttl time.Duration) bool {
return c.set(key, value, getExpiration(ttl), false)
return c.set(key, value, c.getExpiration(ttl), false)
}

// SetIfAbsent if the specified key is not already associated with a value associates it with the given value.
Expand All @@ -284,10 +280,10 @@ func (c *Cache[K, V]) SetIfAbsent(key K, value V) bool {
//
// Also, it returns false if the key-value item had too much weight and the SetIfAbsent was dropped.
func (c *Cache[K, V]) SetIfAbsentWithTTL(key K, value V, ttl time.Duration) bool {
return c.set(key, value, getExpiration(ttl), true)
return c.set(key, value, c.getExpiration(ttl), true)
}

func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) bool {
func (c *Cache[K, V]) set(key K, value V, expiration int64, onlyIfAbsent bool) bool {
weight := c.weigher(key, value)
if int(weight) > c.policy.MaxAvailableWeight() {
c.stats.CollectRejectedSets(1)
Expand Down Expand Up @@ -337,7 +333,7 @@ func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) {
// DeleteByFunc deletes the association for this key from the cache when the given function returns true.
func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) {
c.hashmap.Range(func(n node.Node[K, V]) bool {
if !n.IsAlive() || n.HasExpired() {
if !n.IsAlive() || n.HasExpired(c.clock.Offset()) {
return true
}

Expand Down Expand Up @@ -376,7 +372,7 @@ func (c *Cache[K, V]) cleanup() {
return
case <-ticker.C:
c.evictionMutex.Lock()
c.expiryPolicy.DeleteExpired()
c.expiryPolicy.DeleteExpired(c.clock.Offset())
c.evictionMutex.Unlock()
}
}
Expand Down Expand Up @@ -411,15 +407,15 @@ func (c *Cache[K, V]) onWrite(t task[K, V]) {
case t.isAdd():
if n.IsAlive() {
c.expiryPolicy.Add(n)
c.policy.Add(n)
c.policy.Add(n, c.clock.Offset())
}
case t.isUpdate():
oldNode := t.oldNode()
c.expiryPolicy.Delete(oldNode)
c.policy.Delete(oldNode)
if n.IsAlive() {
c.expiryPolicy.Add(n)
c.policy.Add(n)
c.policy.Add(n, c.clock.Offset())
}
c.notifyDeletion(oldNode.Key(), oldNode.Value(), Replaced)
case t.isDelete():
Expand Down Expand Up @@ -452,7 +448,7 @@ func (c *Cache[K, V]) process() {
// Iteration stops early when the given function returns false.
func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
c.hashmap.Range(func(n node.Node[K, V]) bool {
if !n.IsAlive() || n.HasExpired() {
if !n.IsAlive() || n.HasExpired(c.clock.Offset()) {
return true
}

Expand Down Expand Up @@ -483,9 +479,6 @@ func (c *Cache[K, V]) clear(t task[K, V]) {
func (c *Cache[K, V]) Close() {
c.closeOnce.Do(func() {
c.clear(newCloseTask[K, V]())
if c.withExpiration {
unixtime.Stop()
}
})
}

Expand All @@ -505,8 +498,3 @@ func (c *Cache[K, V]) Capacity() int {
func (c *Cache[K, V]) Extension() Extension[K, V] {
return newExtension(c)
}

// WithExpiration returns true if the cache was configured with the expiration policy enabled.
func (c *Cache[K, V]) WithExpiration() bool {
return c.withExpiration
}
36 changes: 23 additions & 13 deletions cmd/generator/main.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright (c) 2024 Alexey Mayshev. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
Expand Down Expand Up @@ -136,10 +150,6 @@ func (g *generator) printImports() {
g.in()
g.p("\"sync/atomic\"")
g.p("\"unsafe\"")
if g.features[expiration] {
g.p("")
g.p("\"github.com/maypok86/otter/v2/internal/unixtime\"")
}
g.out()
g.p(")")
g.p("")
Expand Down Expand Up @@ -175,7 +185,7 @@ func (g *generator) printStruct() {
if g.features[expiration] {
g.p("prevExp *%s[K, V]", g.structName)
g.p("nextExp *%s[K, V]", g.structName)
g.p("expiration uint32")
g.p("expiration int64")
}
if g.features[weight] {
g.p("weight uint32")
Expand All @@ -191,7 +201,7 @@ func (g *generator) printStruct() {

func (g *generator) printConstructors() {
g.p("// New%s creates a new %s.", g.structName, g.structName)
g.p("func New%s[K comparable, V any](key K, value V, expiration, weight uint32) Node[K, V] {", g.structName)
g.p("func New%s[K comparable, V any](key K, value V, expiration int64, weight uint32) Node[K, V] {", g.structName)
g.in()
g.p("return &%s[K, V]{", g.structName)
g.in()
Expand Down Expand Up @@ -337,18 +347,18 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) HasExpired() bool {", g.structName)
g.p("func (n *%s[K, V]) HasExpired(now int64) bool {", g.structName)
g.in()
if g.features[expiration] {
g.p("return n.expiration <= unixtime.Now()")
g.p("return n.expiration <= now")
} else {
g.p("return false")
}
g.out()
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) Expiration() uint32 {", g.structName)
g.p("func (n *%s[K, V]) Expiration() int64 {", g.structName)
g.in()
if g.features[expiration] {
g.p("return n.expiration")
Expand Down Expand Up @@ -503,9 +513,9 @@ type Node[K comparable, V any] interface {
// SetNextExp sets the next node in the expiration policy.
SetNextExp(v Node[K, V])
// HasExpired returns true if node has expired.
HasExpired() bool
HasExpired(now int64) bool
// Expiration returns the expiration time.
Expiration() uint32
Expiration() int64
// Weight returns the weight of the node.
Weight() uint32
// IsAlive returns true if the entry is available in the hash-table.
Expand Down Expand Up @@ -548,7 +558,7 @@ type Config struct {
}
type Manager[K comparable, V any] struct {
create func(key K, value V, expiration, weight uint32) Node[K, V]
create func(key K, value V, expiration int64, weight uint32) Node[K, V]
fromPointer func(ptr unsafe.Pointer) Node[K, V]
}
Expand All @@ -568,7 +578,7 @@ func NewManager[K comparable, V any](c Config) *Manager[K, V] {
const nodeFooter = `return m
}
func (m *Manager[K, V]) Create(key K, value V, expiration, weight uint32) Node[K, V] {
func (m *Manager[K, V]) Create(key K, value V, expiration int64, weight uint32) Node[K, V] {
return m.create(key, value, expiration, weight)
}
Expand Down
8 changes: 4 additions & 4 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (e Entry[K, V]) Value() V {
}

// Expiration returns the entry's expiration time as a unix time,
// the number of seconds elapsed since January 1, 1970 UTC.
// the number of nanoseconds elapsed since January 1, 1970 UTC.
//
// If the cache was not configured with an expiration policy then this value is always 0.
func (e Entry[K, V]) Expiration() int64 {
Expand All @@ -56,12 +56,12 @@ func (e Entry[K, V]) TTL() time.Duration {
return -1
}

now := time.Now().Unix()
now := time.Now().UnixNano()
if expiration <= now {
return 0
}

return time.Duration(expiration-now) * time.Second
return time.Duration(expiration - now)
}

// HasExpired returns true if the entry has expired.
Expand All @@ -71,7 +71,7 @@ func (e Entry[K, V]) HasExpired() bool {
return false
}

return expiration <= time.Now().Unix()
return expiration <= time.Now().UnixNano()
}

// Weight returns the entry's weight.
Expand Down
4 changes: 2 additions & 2 deletions entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func TestEntry(t *testing.T) {
}

newTTL := int64(10)
e.expiration = time.Now().Unix() + newTTL
e.expiration = time.Now().UnixNano() + (time.Duration(newTTL) * time.Second).Nanoseconds()
if ttl := e.TTL(); ttl <= 0 || ttl > time.Duration(newTTL)*time.Second {
t.Fatalf("ttl should be in the range (0, %d] seconds, but got %d seconds", newTTL, ttl/time.Second)
}
if e.HasExpired() {
t.Fatal("entry should not be expire")
}

e.expiration -= 2 * newTTL
e.expiration -= 2 * (time.Duration(newTTL) * time.Second).Nanoseconds()
if ttl := e.TTL(); ttl != 0 {
t.Fatalf("ttl should be 0 seconds, but got %d seconds", ttl/time.Second)
}
Expand Down
5 changes: 2 additions & 3 deletions extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package otter

import (
"github.com/maypok86/otter/v2/internal/generated/node"
"github.com/maypok86/otter/v2/internal/unixtime"
)

func zeroValue[V any]() V {
Expand All @@ -39,8 +38,8 @@ func newExtension[K comparable, V any](cache *Cache[K, V]) Extension[K, V] {

func (e Extension[K, V]) createEntry(n node.Node[K, V]) Entry[K, V] {
var expiration int64
if e.cache.WithExpiration() {
expiration = unixtime.StartTime() + int64(n.Expiration())
if e.cache.withExpiration {
expiration = e.cache.clock.Time(n.Expiration()).UnixNano()
}

return Entry[K, V]{
Expand Down
Loading

0 comments on commit 00095e5

Please sign in to comment.