From f379e9748db426b236ac8228964cecbd0be768dc Mon Sep 17 00:00:00 2001 From: bryan newbold Date: Wed, 29 Nov 2023 19:53:22 -0800 Subject: [PATCH] automod: distinct counters (mem and redis) --- automod/countstore.go | 29 +++++++++++++++++++++++++-- automod/countstore_test.go | 40 ++++++++++++++++++++++++++++++++++++++ automod/redis_counters.go | 35 +++++++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 automod/countstore_test.go diff --git a/automod/countstore.go b/automod/countstore.go index 641594b41..056ce5bb1 100644 --- a/automod/countstore.go +++ b/automod/countstore.go @@ -17,16 +17,20 @@ type CountStore interface { GetCount(ctx context.Context, name, val, period string) (int, error) Increment(ctx context.Context, name, val string) error // TODO: batch increment method + GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) + IncrementDistinct(ctx context.Context, name, bucket, val string) error } // TODO: this implementation isn't race-safe (yet)! type MemCountStore struct { - Counts map[string]int + Counts map[string]int + DistinctCounts map[string]map[string]bool } func NewMemCountStore() MemCountStore { return MemCountStore{ - Counts: make(map[string]int), + Counts: make(map[string]int), + DistinctCounts: make(map[string]map[string]bool), } } @@ -66,3 +70,24 @@ func (s MemCountStore) Increment(ctx context.Context, name, val string) error { } return nil } + +func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) { + v, ok := s.DistinctCounts[PeriodBucket(name, bucket, period)] + if !ok { + return 0, nil + } + return len(v), nil +} + +func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { + for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { + k := PeriodBucket(name, bucket, p) + m, ok := s.DistinctCounts[k] + if !ok { + m = make(map[string]bool) + } + m[val] = true + s.DistinctCounts[k] = m + } + return nil +} diff --git a/automod/countstore_test.go b/automod/countstore_test.go new file mode 100644 index 000000000..8f080718a --- /dev/null +++ b/automod/countstore_test.go @@ -0,0 +1,40 @@ +package automod + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMemCountStoreBasics(t *testing.T) { + assert := assert.New(t) + ctx := context.Background() + + cs := NewMemCountStore() + + c, err := cs.GetCount(ctx, "test1", "val1", PeriodTotal) + assert.NoError(err) + assert.Equal(0, c) + assert.NoError(cs.Increment(ctx, "test1", "val1")) + assert.NoError(cs.Increment(ctx, "test1", "val1")) + c, err = cs.GetCount(ctx, "test1", "val1", PeriodTotal) + assert.NoError(err) + assert.Equal(2, c) + + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) + assert.NoError(err) + assert.Equal(0, c) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) + assert.NoError(err) + assert.Equal(1, c) + + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "two")) + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "three")) + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) + assert.NoError(err) + assert.Equal(3, c) +} diff --git a/automod/redis_counters.go b/automod/redis_counters.go index f95fdbd8c..8076d0d5d 100644 --- a/automod/redis_counters.go +++ b/automod/redis_counters.go @@ -8,6 +8,7 @@ import ( ) var redisCountPrefix string = "count/" +var redisDistinctPrefix string = "distinct/" type RedisCountStore struct { Client *redis.Client @@ -63,3 +64,37 @@ func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error _, err := multi.Exec(ctx) return err } + +func (s *RedisCountStore) GetCountDistinct(ctx context.Context, name, val, period string) (int, error) { + key := redisDistinctPrefix + PeriodBucket(name, val, period) + c, err := s.Client.PFCount(ctx, key).Result() + if err == redis.Nil { + return 0, nil + } else if err != nil { + return 0, err + } + return int(c), nil +} + +func (s *RedisCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { + + var key string + + // increment multiple counters in a single redis round-trip + multi := s.Client.Pipeline() + + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodHour) + multi.PFAdd(ctx, key, val) + multi.Expire(ctx, key, 2*time.Hour) + + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodDay) + multi.PFAdd(ctx, key, val) + multi.Expire(ctx, key, 48*time.Hour) + + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodTotal) + multi.PFAdd(ctx, key, val) + // no expiration for total + + _, err := multi.Exec(ctx) + return err +}