Skip to content

Commit

Permalink
[Chore] Refactor deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Sep 8, 2024
1 parent f888eca commit 7929499
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 128 deletions.
26 changes: 13 additions & 13 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import (

// Builder is a one-shot builder for creating a cache instance.
type Builder[K comparable, V any] struct {
maximumSize *int
maximumWeight *uint64
initialCapacity *int
statsRecorder StatsRecorder
ttl *time.Duration
withVariableTTL bool
weigher func(key K, value V) uint32
deletionListener func(key K, value V, cause DeletionCause)
logger Logger
maximumSize *int
maximumWeight *uint64
initialCapacity *int
statsRecorder StatsRecorder
ttl *time.Duration
withVariableTTL bool
weigher func(key K, value V) uint32
onDeletion func(e DeletionEvent[K, V])
logger Logger
}

// NewBuilder creates a builder and sets the future cache capacity.
Expand Down Expand Up @@ -99,11 +99,11 @@ func (b *Builder[K, V]) Weigher(weigher func(key K, value V) uint32) *Builder[K,
return b
}

// DeletionListener specifies a listener instance that caches should notify each time an entry is deleted for any
// DeletionCause cause. The cache will invoke this listener in the background goroutine
// OnDeletion specifies a handler that caches should notify each time an entry is deleted for any
// DeletionCause. The cache will invoke this handler in the background goroutine
// after the entry's deletion operation has completed.
func (b *Builder[K, V]) DeletionListener(deletionListener func(key K, value V, cause DeletionCause)) *Builder[K, V] {
b.deletionListener = deletionListener
func (b *Builder[K, V]) OnDeletion(onDeletion func(e DeletionEvent[K, V])) *Builder[K, V] {
b.onDeletion = onDeletion
return b
}

Expand Down
119 changes: 48 additions & 71 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,6 @@ import (
"github.com/maypok86/otter/v2/internal/xruntime"
)

// DeletionCause the cause why a cached entry was deleted.
type DeletionCause uint8

const (
// Explicit the entry was manually deleted by the user.
Explicit DeletionCause = iota
// Replaced the entry itself was not actually deleted, but its value was replaced by the user.
Replaced
// Size the entry was evicted due to size constraints.
Size
// Expired the entry's expiration timestamp has passed.
Expired
)

func (dc DeletionCause) String() string {
switch dc {
case Explicit:
return "Explicit"
case Replaced:
return "Replaced"
case Size:
return "Size"
case Expired:
return "Expired"
default:
panic("unknown deletion cause")
}
}

const (
minWriteBufferSize uint32 = 4
pinnedWeight uint32 = 0
Expand Down Expand Up @@ -95,26 +66,26 @@ type expiryPolicy[K comparable, V any] interface {
// Cache is a structure performs a best-effort bounding of a hash table using eviction algorithm
// to determine which entries to evict when the capacity is exceeded.
type Cache[K comparable, V any] struct {
nodeManager *node.Manager[K, V]
hashmap *hashtable.Map[K, V]
policy evictionPolicy[K, V]
expiryPolicy expiryPolicy[K, V]
stats statsRecorder
logger Logger
clock *clock.Clock
stripedBuffer []*lossy.Buffer[K, V]
writeBuffer *queue.Growable[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
doneClose chan struct{}
weigher func(key K, value V) uint32
deletionListener func(key K, value V, cause DeletionCause)
mask uint32
ttl time.Duration
withExpiration bool
withEviction bool
withProcess bool
nodeManager *node.Manager[K, V]
hashmap *hashtable.Map[K, V]
policy evictionPolicy[K, V]
expiryPolicy expiryPolicy[K, V]
stats statsRecorder
logger Logger
clock *clock.Clock
stripedBuffer []*lossy.Buffer[K, V]
writeBuffer *queue.Growable[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
doneClose chan struct{}
weigher func(key K, value V) uint32
onDeletion func(e DeletionEvent[K, V])
mask uint32
ttl time.Duration
withExpiration bool
withEviction bool
withProcess bool
}

// newCache returns a new cache instance based on the settings from Config.
Expand Down Expand Up @@ -152,9 +123,9 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
doneClear: make(chan struct{}),
doneClose: make(chan struct{}, 1),
//nolint:gosec // there will never be an overflow
mask: uint32(maxStripedBufferSize - 1),
weigher: b.getWeigher(),
deletionListener: b.deletionListener,
mask: uint32(maxStripedBufferSize - 1),
weigher: b.getWeigher(),
onDeletion: b.onDeletion,
}

cache.withEviction = withEviction
Expand Down Expand Up @@ -342,7 +313,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration int64, onlyIfAbsent bool) (
func (c *Cache[K, V]) afterWrite(n, evicted node.Node[K, V]) {
if !c.withProcess {
if evicted != nil {
c.notifyDeletion(n.Key(), n.Value(), Replaced)
c.notifyDeletion(n.Key(), n.Value(), CauseReplacement)
}
return
}
Expand Down Expand Up @@ -380,22 +351,23 @@ func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) {
}

if !c.withProcess {
c.notifyDeletion(deleted.Key(), deleted.Value(), Explicit)
c.notifyDeletion(deleted.Key(), deleted.Value(), CauseInvalidation)
return
}

deleted.Die()
c.writeBuffer.Push(newDeleteTask(deleted))
}

// DeleteByFunc deletes the association for this key from the cache when the given function returns true.
func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) {
// InvalidateByFunc deletes the association for this key from the cache when the given function returns true.
func (c *Cache[K, V]) InvalidateByFunc(fn func(key K, value V) bool) {
offset := c.clock.Offset()
c.hashmap.Range(func(n node.Node[K, V]) bool {
if !n.IsAlive() || n.HasExpired(c.clock.Offset()) {
if !n.IsAlive() || n.HasExpired(offset) {
return true
}

if f(n.Key(), n.Value()) {
if fn(n.Key(), n.Value()) {
c.deleteNode(n)
}

Expand All @@ -404,19 +376,23 @@ func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) {
}

func (c *Cache[K, V]) notifyDeletion(key K, value V, cause DeletionCause) {
if c.deletionListener == nil {
if c.onDeletion == nil {
return
}

c.deletionListener(key, value, cause)
c.onDeletion(DeletionEvent[K, V]{
Key: key,
Value: value,
Cause: cause,
})
}

func (c *Cache[K, V]) deleteExpiredNode(n node.Node[K, V]) {
c.policy.Delete(n)
deleted := c.hashmap.DeleteNode(n)
if deleted != nil {
n.Die()
c.notifyDeletion(n.Key(), n.Value(), Expired)
c.notifyDeletion(n.Key(), n.Value(), CauseExpiration)
c.stats.RecordEviction(n.Weight())
}
}
Expand All @@ -441,7 +417,7 @@ func (c *Cache[K, V]) evictNode(n node.Node[K, V]) {
deleted := c.hashmap.DeleteNode(n)
if deleted != nil {
n.Die()
c.notifyDeletion(n.Key(), n.Value(), Size)
c.notifyDeletion(n.Key(), n.Value(), CauseOverflow)
c.stats.RecordEviction(n.Weight())
}
}
Expand Down Expand Up @@ -491,12 +467,12 @@ func (c *Cache[K, V]) onWrite(t task[K, V]) {
case t.isAdd():
c.addToPolicies(n)
case t.isUpdate():
c.deleteFromPolicies(t.oldNode(), Replaced)
c.deleteFromPolicies(t.oldNode(), CauseReplacement)
c.addToPolicies(n)
case t.isDelete():
c.deleteFromPolicies(n, Explicit)
c.deleteFromPolicies(n, CauseInvalidation)
case t.isExpired():
c.deleteFromPolicies(n, Expired)
c.deleteFromPolicies(n, CauseExpiration)
default:
panic("invalid task type")
}
Expand All @@ -519,20 +495,21 @@ func (c *Cache[K, V]) process() {
// Range iterates over all items in the cache.
//
// Iteration stops early when the given function returns false.
func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
func (c *Cache[K, V]) Range(fn func(key K, value V) bool) {
offset := c.clock.Offset()
c.hashmap.Range(func(n node.Node[K, V]) bool {
if !n.IsAlive() || n.HasExpired(c.clock.Offset()) {
if !n.IsAlive() || n.HasExpired(offset) {
return true
}

return f(n.Key(), n.Value())
return fn(n.Key(), n.Value())
})
}

// Clear clears the hash table, all policies, buffers, etc.
// InvalidateAll discards all entries in the cache.
//
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Clear() {
func (c *Cache[K, V]) InvalidateAll() {
c.clear(newClearTask[K, V]())
}

Expand All @@ -553,7 +530,7 @@ func (c *Cache[K, V]) clear(t task[K, V]) {
<-c.doneClear
}

// Close clears the hash table, all policies, buffers, etc and stop all goroutines.
// Close discards all entries in the cache and stop all goroutines.
//
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Close() {
Expand Down
Loading

0 comments on commit 7929499

Please sign in to comment.