Commit

[Chore] Use only one shard
maypok86 committed Dec 7, 2023
1 parent 96f7bd3 commit 347ea30
Showing 7 changed files with 27 additions and 115 deletions.
5 changes: 0 additions & 5 deletions README.md
@@ -75,11 +75,6 @@ func main() {
panic(err)
}

// ShardCount sets the number of cache shards to 256.
// The number of shards must always be a power of two.
// Default is 128.
builder.ShardCount(256)

// StatsEnabled determines whether statistics should be calculated when the cache is running.
// By default, statistics collection is disabled.
builder.StatsEnabled(true)
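For context, a minimal sketch of the builder flow once ShardCount is gone. The import path, the string key/value types, and the 10_000 capacity are assumptions for illustration; NewBuilder, StatsEnabled, Cost, and Build are the methods visible in this diff and its tests:

```go
package main

import "github.com/maypok86/otter"

func main() {
	// NewBuilder rejects a non-positive capacity with ErrIllegalCapacity.
	builder, err := otter.NewBuilder[string, string](10_000)
	if err != nil {
		panic(err)
	}

	// Statistics collection is disabled by default.
	builder.StatsEnabled(true)

	// Cost assigns a weight to each entry; the default cost is 1.
	builder.Cost(func(key string, value string) uint32 {
		return uint32(len(value))
	})

	cache, err := builder.Build()
	if err != nil {
		panic(err)
	}
	_ = cache
}
```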
19 changes: 1 addition & 18 deletions builder.go
@@ -3,35 +3,24 @@ package otter
import "errors"

const (
defaultShardCount = 128
defaultStatsEnabled = false
)

var (
ErrIllegalCapacity = errors.New("capacity should be positive")
ErrIllegalShardCount = errors.New("shard count should be a positive power of two")
)
var ErrIllegalCapacity = errors.New("capacity should be positive")

type options[K comparable, V any] struct {
capacity int
shardCount int
statsEnabled bool
costFunc func(key K, value V) uint32
}

func (o *options[K, V]) validate() error {
// shard count should be a power of two.
if o.shardCount <= 0 || (o.shardCount&(o.shardCount-1)) != 0 {
return ErrIllegalShardCount
}

return nil
}

func (o *options[K, V]) toConfig() Config[K, V] {
return Config[K, V]{
Capacity: o.capacity,
ShardCount: o.shardCount,
StatsEnabled: o.statsEnabled,
CostFunc: o.costFunc,
}
@@ -57,7 +46,6 @@ func NewBuilder[K comparable, V any](capacity int) (*Builder[K, V], error) {
return &Builder[K, V]{
options: options[K, V]{
capacity: capacity,
shardCount: defaultShardCount,
statsEnabled: defaultStatsEnabled,
costFunc: func(key K, value V) uint32 {
return 1
@@ -66,11 +54,6 @@ func NewBuilder[K comparable, V any](capacity int) (*Builder[K, V], error) {
}, nil
}

func (b *Builder[K, V]) ShardCount(shardCount int) *Builder[K, V] {
b.shardCount = shardCount
return b
}

func (b *Builder[K, V]) StatsEnabled(statsEnabled bool) *Builder[K, V] {
b.statsEnabled = statsEnabled
return b
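As an aside, the validate() check deleted above is the standard bit trick for powers of two: for positive x, x&(x-1) clears the lowest set bit, so the expression is zero exactly when a single bit is set. A self-contained sketch (the ShardCount(129) case in the test below failed precisely this check):

```go
package main

import "fmt"

// isPowerOfTwo mirrors the condition removed from validate():
// for positive x, x&(x-1) clears the lowest set bit, so the
// result is zero exactly when x has a single set bit.
func isPowerOfTwo(x int) bool {
	return x > 0 && x&(x-1) == 0
}

func main() {
	for _, x := range []int{1, 2, 128, 129, 256} {
		fmt.Printf("%d: %v\n", x, isPowerOfTwo(x))
	}
	// 1, 2, 128 and 256 pass; 129 does not, which is why
	// ShardCount(129) used to return ErrIllegalShardCount.
}
```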
10 changes: 0 additions & 10 deletions builder_test.go
@@ -24,20 +24,10 @@ func TestBuilder_NewFailed(t *testing.T) {
}
}

func TestBuilder_BuildFailed(t *testing.T) {
b := MustBuilder[int, int](10)

_, err := b.ShardCount(129).Build()
if err == nil || !errors.Is(err, ErrIllegalShardCount) {
t.Fatalf("should fail with an error %v, but got %v", ErrIllegalShardCount, err)
}
}

func TestBuilder_BuildSuccess(t *testing.T) {
b := MustBuilder[int, int](10)

c, err := b.
ShardCount(256).
StatsEnabled(true).
Cost(func(key int, value int) uint32 {
return 2
53 changes: 25 additions & 28 deletions cache.go
@@ -23,49 +23,48 @@ func zeroValue[V any]() V {

type Config[K comparable, V any] struct {
Capacity int
ShardCount int
StatsEnabled bool
CostFunc func(key K, value V) uint32
}

type Cache[K comparable, V any] struct {
shards []*hashtable.Map[K, V]
hashmap *hashtable.Map[K, V]
policy *s3fifo.Policy[K, V]
expirePolicy *expire.Policy[K, V]
stats *stats.Stats
readBuffers []*lossy.Buffer[node.Node[K, V]]
writeBuffer *queue.MPSC[node.WriteTask[K, V]]
closeOnce sync.Once
doneClear chan struct{}
hasher *hasher[K]
costFunc func(key K, value V) uint32
mask uint64
capacity int
mask uint32
}

func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
shards := make([]*hashtable.Map[K, V], 0, c.ShardCount)
readBuffers := make([]*lossy.Buffer[node.Node[K, V]], 0, c.ShardCount)
for i := 0; i < c.ShardCount; i++ {
shards = append(shards, hashtable.New[K, V]())
parallelism := xruntime.Parallelism()
roundedParallelism := int(xmath.RoundUpPowerOf2(parallelism))
writeBufferCapacity := 128 * roundedParallelism
readBuffersCount := 4 * roundedParallelism

readBuffers := make([]*lossy.Buffer[node.Node[K, V]], 0, readBuffersCount)
for i := 0; i < readBuffersCount; i++ {
readBuffers = append(readBuffers, lossy.New[node.Node[K, V]]())
}

writeBufferCapacity := 128 * int(xmath.RoundUpPowerOf2(xruntime.Parallelism()))
cache := &Cache[K, V]{
shards: shards,
hashmap: hashtable.New[K, V](),
policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)),
readBuffers: readBuffers,
writeBuffer: queue.NewMPSC[node.WriteTask[K, V]](writeBufferCapacity),
doneClear: make(chan struct{}),
hasher: newHasher[K](),
mask: uint64(c.ShardCount - 1),
mask: uint32(readBuffersCount - 1),
costFunc: c.CostFunc,
capacity: c.Capacity,
}

cache.expirePolicy = expire.NewPolicy[K, V](func(n *node.Node[K, V]) {
cache.shards[cache.getShardIdx(n.Key())].EvictNode(n)
cache.hashmap.EvictNode(n)
})
if c.StatsEnabled {
cache.stats = stats.New()
@@ -77,8 +76,8 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
return cache
}

func (c *Cache[K, V]) getShardIdx(key K) int {
return int(c.hasher.hash(key) & c.mask)
func (c *Cache[K, V]) getReadBufferIdx() int {
return int(xruntime.Fastrand() & c.mask)
}

func (c *Cache[K, V]) Has(key K) bool {
@@ -87,8 +86,7 @@ func (c *Cache[K, V]) Has(key K) bool {
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
idx := c.getShardIdx(key)
got, ok := c.shards[idx].Get(key)
got, ok := c.hashmap.Get(key)
if !ok {
c.stats.IncMisses()
return zeroValue[V](), false
@@ -100,18 +98,19 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
return zeroValue[V](), false
}

c.afterGet(idx, got)
c.afterGet(got)
c.stats.IncHits()

return got.Value(), ok
}

func (c *Cache[K, V]) afterGet(idx int, got *node.Node[K, V]) {
func (c *Cache[K, V]) afterGet(got *node.Node[K, V]) {
idx := c.getReadBufferIdx()
pb := c.readBuffers[idx].Add(got)
if pb != nil {
deleted := c.policy.Read(pb.Deleted, pb.Returned)
for _, n := range deleted {
c.shards[c.getShardIdx(n.Key())].EvictNode(n)
c.hashmap.EvictNode(n)
}
c.readBuffers[idx].Free()
}
@@ -133,9 +132,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint64) {
return
}

idx := c.getShardIdx(key)
s := c.shards[idx]
got, ok := s.Get(key)
got, ok := c.hashmap.Get(key)
if ok {
if !got.IsExpired() {
oldCost := got.SwapCost(cost)
@@ -152,7 +149,7 @@ }
}

n := node.New(key, value, expiration, cost)
evicted := s.Set(n)
evicted := c.hashmap.Set(n)
// TODO: try insert?
c.writeBuffer.Insert(node.NewAddTask(n))
if evicted != nil {
@@ -161,7 +158,7 @@ }
}

func (c *Cache[K, V]) Delete(key K) {
deleted := c.shards[c.getShardIdx(key)].Delete(key)
deleted := c.hashmap.Delete(key)
if deleted != nil {
c.writeBuffer.Insert(node.NewDeleteTask(deleted))
}
@@ -202,7 +199,7 @@ func (c *Cache[K, V]) process() {

d := c.policy.Write(deleted, e, buffer)
for _, n := range d {
c.shards[c.getShardIdx(n.Key())].EvictNode(n)
c.hashmap.EvictNode(n)
}

buffer = buffer[:0]
@@ -217,8 +214,8 @@ func (c *Cache[K, V]) Clear() {
}

func (c *Cache[K, V]) clear(task node.WriteTask[K, V]) {
for i := 0; i < len(c.shards); i++ {
c.shards[i].Clear()
c.hashmap.Clear()
for i := 0; i < len(c.readBuffers); i++ {
c.readBuffers[i].Clear()
}

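The power-of-two discipline that used to apply to the shard count now applies to the read buffers: readBuffersCount is 4 * RoundUpPowerOf2(parallelism) and therefore itself a power of two, so the stored mask of readBuffersCount-1 lets getReadBufferIdx reduce a random value to a valid index with a single AND instead of a modulo. A minimal sketch, with math/rand and a local round-up standing in for xruntime.Fastrand and xmath.RoundUpPowerOf2:

```go
package main

import (
	"fmt"
	"math/rand"
)

// roundUpPowerOf2 stands in for xmath.RoundUpPowerOf2: it smears the
// highest set bit downward, then adds one.
func roundUpPowerOf2(x uint32) uint32 {
	if x == 0 {
		return 1
	}
	x--
	x |= x >> 1
	x |= x >> 2
	x |= x >> 4
	x |= x >> 8
	x |= x >> 16
	return x + 1
}

func main() {
	parallelism := uint32(10)                            // e.g. a GOMAXPROCS-derived value
	readBuffersCount := 4 * roundUpPowerOf2(parallelism) // 64, a power of two
	mask := readBuffersCount - 1                         // 63 = 0b111111

	// Because readBuffersCount is a power of two, x&mask equals
	// x%readBuffersCount, so any random value maps to a valid index.
	for i := 0; i < 3; i++ {
		fmt.Println(rand.Uint32() & mask)
	}
}
```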
7 changes: 1 addition & 6 deletions go.mod
@@ -6,11 +6,6 @@ require (
github.com/dolthub/maphash v0.1.0
github.com/dolthub/swiss v0.2.1
github.com/gammazero/deque v0.2.1
github.com/zeebo/xxh3 v1.0.2
)

require (
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/stretchr/testify v1.8.2 // indirect
golang.org/x/sys v0.15.0 // indirect
)
require github.com/stretchr/testify v1.8.2 // indirect
8 changes: 0 additions & 8 deletions go.sum
@@ -7,8 +7,6 @@ github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -18,12 +16,6 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
40 changes: 0 additions & 40 deletions hasher.go

This file was deleted. With a single hashtable.Map there is no per-key shard selection left, so the key hasher is no longer needed (it was apparently the only user of the xxh3, cpuid, and x/sys dependencies dropped from go.mod above); read-buffer selection now uses xruntime.Fastrand instead.
