Skip to content

Commit

Permalink
[Chore] Add a statistics counter
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Aug 28, 2024
1 parent 479b32d commit 186ff41
Show file tree
Hide file tree
Showing 6 changed files with 449 additions and 0 deletions.
90 changes: 90 additions & 0 deletions internal/xsync/adder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
// Copyright (c) 2021 Andrey Pechkurov
//
// Copyright notice. This code is a fork of xsync.Adder from this file with some changes:
// https://github.com/puzpuzpuz/xsync/blob/main/counter.go
//
// Use of this source code is governed by a MIT license that can be found
// at https://github.com/puzpuzpuz/xsync/blob/main/LICENSE

package xsync

import (
"sync"
"sync/atomic"
"unsafe"

"github.com/maypok86/otter/v2/internal/xmath"
"github.com/maypok86/otter/v2/internal/xruntime"
)

// pool for P tokens.
var tokenPool sync.Pool

// a P token is used to point at the current OS thread (P)
// on which the goroutine is run; exact identity of the thread,
// as well as P migration tolerance, is not important since
// it's used to as a best effort mechanism for assigning
// concurrent operations (goroutines) to different stripes of
// the Adder.
type token struct {
idx uint32
padding [xruntime.CacheLineSize - 4]byte
}

// A Adder is a striped int64 Adder.
//
// Should be preferred over a single atomically updated uint64
// Adder in high contention scenarios.
//
// A Adder must not be copied after first use.
type Adder struct {
stripes []astripe
mask uint32
}

type astripe struct {
adder atomic.Uint64
padding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
}

// NewAdder creates a new Adder instance.
func NewAdder() *Adder {
nstripes := xmath.RoundUpPowerOf2(xruntime.Parallelism())
return &Adder{
stripes: make([]astripe, nstripes),
mask: nstripes - 1,
}
}

// Add adds the delta to the Adder.
func (a *Adder) Add(delta uint64) {
t, ok := tokenPool.Get().(*token)
if !ok {
t = &token{
idx: xruntime.Fastrand(),
}
}
for {
stripe := &a.stripes[t.idx&a.mask]
cnt := stripe.adder.Load()
if stripe.adder.CompareAndSwap(cnt, cnt+delta) {
break
}
// Give a try with another randomly selected stripe.
t.idx = xruntime.Fastrand()
}
tokenPool.Put(t)
}

// Value returns the current Adder value.
// The returned value may not include all of the latest operations in
// presence of concurrent modifications of the Adder.
func (a *Adder) Value() uint64 {
value := uint64(0)
for i := 0; i < len(a.stripes); i++ {
stripe := &a.stripes[i]
value += stripe.adder.Load()
}
return value
}
61 changes: 61 additions & 0 deletions internal/xsync/adder_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
// Copyright (c) 2021 Andrey Pechkurov
//
// Copyright notice. This code is a fork of benchmarks for xsync.Counter from this file with some changes:
// https://github.com/puzpuzpuz/xsync/blob/main/counter_test.go
//
// Use of this source code is governed by a MIT license that can be found
// at https://github.com/puzpuzpuz/xsync/blob/main/LICENSE

package xsync

import (
"sync/atomic"
"testing"
)

func runBenchAdder(b *testing.B, value func() uint64, increment func(), writeRatio int) {
b.Helper()
b.ResetTimer()
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
sink := 0
for pb.Next() {
sink++
if writeRatio > 0 && sink%writeRatio == 0 {
value()
} else {
increment()
}
}
_ = sink
})
}

func benchmarkAdder(b *testing.B, writeRatio int) {
b.Helper()
a := NewAdder()
runBenchAdder(b, func() uint64 {
return a.Value()
}, func() {
a.Add(1)
}, writeRatio)
}

func BenchmarkAdder(b *testing.B) {
benchmarkAdder(b, 10000)
}

func benchmarkAtomicUint64(b *testing.B, writeRatio int) {
b.Helper()
var c atomic.Uint64
runBenchAdder(b, func() uint64 {
return c.Load()
}, func() {
c.Add(1)
}, writeRatio)
}

func BenchmarkAtomicUint64(b *testing.B) {
benchmarkAtomicUint64(b, 10000)
}
81 changes: 81 additions & 0 deletions internal/xsync/adder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
// Copyright (c) 2021 Andrey Pechkurov
//
// Copyright notice. This code is a fork of tests for xsync.Counter from this file with some changes:
// https://github.com/puzpuzpuz/xsync/blob/main/counter_test.go
//
// Use of this source code is governed by a MIT license that can be found
// at https://github.com/puzpuzpuz/xsync/blob/main/LICENSE

package xsync

import (
"fmt"
"runtime"
"sync"
"testing"
)

func TestAdderAdd(t *testing.T) {
a := NewAdder()
for i := 0; i < 100; i++ {
if v := a.Value(); v != uint64(i*42) {
t.Fatalf("got %v, want %d", v, i*42)
}
a.Add(42)
}
}

func parallelIncrement(a *Adder, incs int, wg *sync.WaitGroup) {
for i := 0; i < incs; i++ {
a.Add(1)
}
wg.Done()
}

func testParallelIncrement(t *testing.T, modifiers, gomaxprocs int) {
t.Helper()
runtime.GOMAXPROCS(gomaxprocs)
a := NewAdder()
wg := &sync.WaitGroup{}
incs := 10_000
wg.Add(modifiers)
for i := 0; i < modifiers; i++ {
go parallelIncrement(a, incs, wg)
}
wg.Wait()
expected := uint64(modifiers * incs)
if v := a.Value(); v != expected {
t.Fatalf("got %d, want %d", v, expected)
}
}

func TestAdderParallelIncrementors(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))

tests := []struct {
modifiers int
gomaxprocs int
}{
{
modifiers: 4,
gomaxprocs: 2,
},
{
modifiers: 16,
gomaxprocs: 4,
},
{
modifiers: 64,
gomaxprocs: 8,
},
}

for i, tt := range tests {
t.Run(fmt.Sprintf("parallelIncrement-%d", i+1), func(t *testing.T) {
testParallelIncrement(t, tt.modifiers, tt.gomaxprocs)
testParallelIncrement(t, tt.modifiers, tt.gomaxprocs)
testParallelIncrement(t, tt.modifiers, tt.gomaxprocs)
})
}
}
106 changes: 106 additions & 0 deletions stats/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright (c) 2024 Alexey Mayshev. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stats

import (
"github.com/maypok86/otter/v2/internal/xsync"
"math"
"time"
)

// Counter is a goroutine-safe Collector implementation for use by otter.Cache.
type Counter struct {
hits *xsync.Adder
misses *xsync.Adder
evictions *xsync.Adder
evictionWeight *xsync.Adder
rejectedSets *xsync.Adder
loadSuccesses *xsync.Adder
loadFailures *xsync.Adder
totalLoadTime *xsync.Adder
}

// NewCounter constructs an instance with all counts initialized to zero.
func NewCounter() *Counter {
return &Counter{
hits: xsync.NewAdder(),
misses: xsync.NewAdder(),
evictions: xsync.NewAdder(),
evictionWeight: xsync.NewAdder(),
rejectedSets: xsync.NewAdder(),
loadSuccesses: xsync.NewAdder(),
loadFailures: xsync.NewAdder(),
totalLoadTime: xsync.NewAdder(),
}
}

// Snapshot returns a snapshot of this collector's values. Note that this may be an inconsistent view, as it
// may be interleaved with update operations.
//
// NOTE: the values of the metrics are undefined in case of overflow. If you require specific handling, we recommend
// implementing your own Collector.
func (c *Counter) Snapshot() Stats {
totalLoadTime := c.totalLoadTime.Value()
if totalLoadTime > uint64(math.MaxInt64) {
totalLoadTime = uint64(math.MaxInt64)
}
return Stats{
hits: c.hits.Value(),
misses: c.misses.Value(),
evictions: c.evictions.Value(),
evictionWeight: c.evictionWeight.Value(),
rejectedSets: c.rejectedSets.Value(),
loadSuccesses: c.loadSuccesses.Value(),
loadFailures: c.loadFailures.Value(),
totalLoadTime: time.Duration(totalLoadTime),
}
}

// CollectHits collects cache hits. This should be called when a cache request returns a cached value.
func (c *Counter) CollectHits(count int) {
c.hits.Add(uint64(count))
}

// CollectMisses collects cache misses. This should be called when a cache request returns a value that was not
// found in the cache.
func (c *Counter) CollectMisses(count int) {
c.misses.Add(uint64(count))
}

// CollectEviction collects the eviction of an entry from the cache. This should only been called when an entry is
// evicted due to the cache's eviction strategy, and not as a result of manual deletions.
func (c *Counter) CollectEviction(weight uint32) {
c.evictions.Add(1)
c.evictionWeight.Add(uint64(weight))
}

// CollectRejectedSets collects rejected sets due to too much weight of entries in them.
func (c *Counter) CollectRejectedSets(count int) {
c.rejectedSets.Add(uint64(count))
}

// CollectLoadSuccess collects the successful load of a new entry. This method should be called when a cache request
// causes an entry to be loaded and the loading completes successfully.
func (c *Counter) CollectLoadSuccess(loadTime time.Duration) {
c.loadSuccesses.Add(1)
c.totalLoadTime.Add(uint64(loadTime))
}

// CollectLoadFailure collects the failed load of a new entry. This method should be called when a cache request
// causes an entry to be loaded, but the loading function returns an error.
func (c *Counter) CollectLoadFailure(loadTime time.Duration) {
c.loadFailures.Add(1)
c.totalLoadTime.Add(uint64(loadTime))
}
Loading

0 comments on commit 186ff41

Please sign in to comment.