Skip to content

Commit

Permalink
additional channel cache improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 23, 2023
1 parent 6c4c79a commit e5ada2f
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 23 deletions.
32 changes: 15 additions & 17 deletions internal/rule/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rule

import (
"hash/fnv"
"sync"
"sync/atomic"
"time"
)
Expand All @@ -14,26 +13,28 @@ type cacheItem struct {
}

type cacheShard struct {
mu sync.RWMutex
size int
buffer []cacheItem
index int32
size int32
buffer []atomic.Value
}

type rollingCache struct {
shards []*cacheShard
}

func newRollingCache(size int, shardCount int) *rollingCache {
shardSize := size / shardCount
func newRollingCache(shardSize int, shardCount int) *rollingCache {
rc := &rollingCache{
shards: make([]*cacheShard, shardCount),
}
for i := range rc.shards {
rc.shards[i] = &cacheShard{
size: shardSize,
buffer: make([]cacheItem, shardSize),
shard := &cacheShard{
size: int32(shardSize),
buffer: make([]atomic.Value, shardSize),
}
for j := 0; j < shardSize; j++ {
shard.buffer[j].Store(cacheItem{}) // Initialize with zero value.
}
rc.shards[i] = shard
}
return rc
}
Expand All @@ -47,10 +48,9 @@ func (c *rollingCache) shardForKey(key string) *cacheShard {

func (c *rollingCache) Get(channel string) (channelOptionsResult, bool) {
shard := c.shardForKey(channel)
shard.mu.RLock()
defer shard.mu.RUnlock()
for _, item := range shard.buffer {
if item.channel == channel && time.Now().Before(time.Unix(0, item.expires)) {
for i := 0; i < int(shard.size); i++ {
item := shard.buffer[i].Load().(cacheItem)
if item.channel == channel && time.Now().UnixNano() < item.expires {
return item.value, true
}
}
Expand All @@ -59,13 +59,11 @@ func (c *rollingCache) Get(channel string) (channelOptionsResult, bool) {

func (c *rollingCache) Set(channel string, value channelOptionsResult, ttl time.Duration) {
shard := c.shardForKey(channel)
shard.mu.Lock()
defer shard.mu.Unlock()
index := int(atomic.AddInt32(&shard.index, 1) % int32(shard.size))
index := int(atomic.AddInt32(&shard.index, 1) % shard.size)
item := cacheItem{
channel: channel,
value: value,
expires: time.Now().Add(ttl).UnixNano(),
}
shard.buffer[index] = item
shard.buffer[index].Store(item)
}
4 changes: 2 additions & 2 deletions internal/rule/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ type Container struct {
}

const (
channelOptionsCacheSize = 100
channelOptionsCacheShards = 16
channelOptionsCacheSize = 8
channelOptionsCacheShards = 128
)

// NewContainer ...
Expand Down
16 changes: 12 additions & 4 deletions internal/rule/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rule

import (
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -149,12 +150,16 @@ func TestIsUserLimited(t *testing.T) {
func BenchmarkContainer_ChannelOptions(b *testing.B) {
cfg := DefaultConfig

var namespaces []ChannelNamespace
const numNamespaces = 128

for i := 0; i < 100; i++ {
var channels []string

var namespaces []ChannelNamespace
for i := 0; i < numNamespaces; i++ {
namespaces = append(namespaces, ChannelNamespace{
Name: "test" + strconv.Itoa(i),
})
channels = append(channels, "test"+strconv.Itoa(i)+":123")
}
cfg.Namespaces = namespaces

Expand All @@ -163,12 +168,15 @@ func BenchmarkContainer_ChannelOptions(b *testing.B) {

b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
nsName, _, _, ok, _ := c.ChannelOptions("test99:123")
i++
ch := channels[i%numNamespaces]
nsName, _, _, ok, _ := c.ChannelOptions(ch)
if !ok {
b.Fatal("ns not found")
}
if nsName != "test99" {
if !strings.HasPrefix(ch, nsName) {
b.Fatal("wrong ns name: " + nsName)
}
}
Expand Down

0 comments on commit e5ada2f

Please sign in to comment.