Skip to content

Commit

Permalink
chore(hyperloglog): add go test cases and minor optimizations/bugfixes (
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU authored Aug 4, 2024
1 parent 0f5f18e commit f9d7297
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 32 deletions.
27 changes: 13 additions & 14 deletions src/commands/cmd_hll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ class CommandPfAdd final : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
std::vector<uint64_t> hashes(args_.size() - 1);
for (size_t i = 1; i < args_.size(); i++) {
hashes[i - 1] = redis::HyperLogLog::HllHash(args_[i]);
DCHECK_GE(args_.size(), 2);
std::vector<uint64_t> hashes(args_.size() - 2);
for (size_t i = 2; i < args_.size(); i++) {
hashes[i - 2] = redis::HyperLogLog::HllHash(args_[i]);
}
uint64_t ret{};
auto s = hll.Add(args_[0], hashes, &ret);
auto s = hll.Add(args_[1], hashes, &ret);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand All @@ -58,11 +59,13 @@ class CommandPfCount final : public Commander {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
uint64_t ret{};
rocksdb::Status s;
if (args_.size() > 1) {
std::vector<Slice> keys(args_.begin(), args_.end());
// The first argument is the command name, so we need to skip it.
DCHECK_GE(args_.size(), 2);
if (args_.size() > 2) {
std::vector<Slice> keys(args_.begin() + 1, args_.end());
s = hll.CountMultiple(keys, &ret);
} else {
s = hll.Count(args_[0], &ret);
s = hll.Count(args_[1], &ret);
}
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
Expand All @@ -81,13 +84,9 @@ class CommandPfCount final : public Commander {
class CommandPfMerge final : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::HyperLogLog hll(srv->storage, conn->GetNamespace());
std::vector<std::string> keys(args_.begin() + 1, args_.end());
std::vector<Slice> src_user_keys;
src_user_keys.reserve(args_.size() - 1);
for (size_t i = 1; i < args_.size(); i++) {
src_user_keys.emplace_back(args_[i]);
}
auto s = hll.Merge(/*dest_user_key=*/args_[0], src_user_keys);
DCHECK_GT(args_.size(), 1);
std::vector<Slice> src_user_keys(args_.begin() + 2, args_.end());
auto s = hll.Merge(/*dest_user_key=*/args_[1], src_user_keys);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down
6 changes: 3 additions & 3 deletions src/types/hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@ void HllMerge(std::vector<std::string> *dest_registers, const std::vector<nonstd
continue;
}
if (dest_segment->empty()) {
dest_segment->resize(src_segment.size());
memcpy(dest_segment->data(), src_segment.data(), src_segment.size());
DCHECK_EQ(kHyperLogLogSegmentBytes, src_segment.size());
*dest_segment = std::string(src_segment.begin(), src_segment.end());
continue;
}
// Do physical merge
// Do physical merge for this segment.
// NOLINTNEXTLINE
uint8_t *dest_segment_data = reinterpret_cast<uint8_t *>(dest_segment->data());
for (size_t register_idx = 0; register_idx < kHyperLogLogSegmentRegisters; register_idx++) {
Expand Down
39 changes: 24 additions & 15 deletions src/types/redis_hyperloglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector<uint64
LockGuard guard(storage_->GetLockManager(), ns_key);
HyperLogLogMetadata metadata{};
rocksdb::Status s = GetMetadata(GetOptions(), ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
if (!s.ok() && !s.IsNotFound()) {
return s;
}

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisHyperLogLog);
Expand Down Expand Up @@ -148,7 +150,9 @@ rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector<uint64
}
}
// Nothing changed, no need to flush the segments
if (*ret == 0) return rocksdb::Status::OK();
if (*ret == 0) {
return rocksdb::Status::OK();
}

// Flush dirty segments
// Release memory after batch is written
Expand Down Expand Up @@ -179,7 +183,9 @@ rocksdb::Status HyperLogLog::Count(const Slice &user_key, uint64_t *ret) {
LatestSnapShot ss(storage_);
Database::GetOptions get_options(ss.GetSnapShot());
auto s = getRegisters(get_options, ns_key, &registers);
if (!s.ok()) return s;
if (!s.ok()) {
return s;
}
}
DCHECK_EQ(kHyperLogLogSegmentCount, registers.size());
std::vector<nonstd::span<const uint8_t>> register_segments = TransformToSpan(registers);
Expand Down Expand Up @@ -236,21 +242,24 @@ rocksdb::Status HyperLogLog::Merge(const Slice &dest_user_key, const std::vector

std::string dest_key = AppendNamespacePrefix(dest_user_key);
LockGuard guard(storage_->GetLockManager(), dest_key);
// Using same snapshot for all get operations
LatestSnapShot ss(storage_);
Database::GetOptions get_options(ss.GetSnapShot());
HyperLogLogMetadata metadata;
rocksdb::Status s = GetMetadata(get_options, dest_user_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
std::vector<std::string> registers;
HyperLogLogMetadata metadata;
{
std::vector<Slice> all_user_keys;
all_user_keys.reserve(source_user_keys.size() + 1);
all_user_keys.push_back(dest_user_key);
for (const auto &source_user_key : source_user_keys) {
all_user_keys.push_back(source_user_key);
// Using same snapshot for all get operations and release it after
// finishing the merge operation
LatestSnapShot ss(storage_);
Database::GetOptions get_options(ss.GetSnapShot());
rocksdb::Status s = GetMetadata(get_options, dest_user_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
{
std::vector<Slice> all_user_keys;
all_user_keys.reserve(source_user_keys.size() + 1);
all_user_keys.push_back(dest_user_key);
for (const auto &source_user_key : source_user_keys) {
all_user_keys.push_back(source_user_key);
}
s = mergeUserKeys(get_options, all_user_keys, &registers);
}
s = mergeUserKeys(get_options, all_user_keys, &registers);
}

auto batch = storage_->GetWriteBatchBase();
Expand Down
216 changes: 216 additions & 0 deletions tests/gocase/unit/hyperloglog/hyperloglog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package hyperloglog

import (
"context"
"fmt"
"testing"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
)

func TestHyperLogLog(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("basic add", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "hll").Err())

card, err := rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 0, card)
addCnt, err := rdb.PFAdd(ctx, "hll", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)
card, err = rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 1, card)
})

t.Run("duplicate add", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "hll").Err())

card, err := rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 0, card)
addCnt, err := rdb.PFAdd(ctx, "hll", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)
addCnt, err = rdb.PFAdd(ctx, "hll", "foo").Result()
require.NoError(t, err)
require.EqualValues(t, 0, addCnt)
})

t.Run("empty add", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "hll").Err())

addCnt, err := rdb.PFAdd(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 0, addCnt)

card, err := rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 0, card)
})

t.Run("multiple add", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "hll").Err())

addCnt, err := rdb.PFAdd(ctx, "hll", "a", "b", "c", "d").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

card, err := rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 4, card)

addCnt, err = rdb.PFAdd(ctx, "hll", "a", "b", "c").Result()
require.NoError(t, err)
require.EqualValues(t, 0, addCnt)

addCnt, err = rdb.PFAdd(ctx, "hll", "a", "f", "c").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

card, err = rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 5, card)
})

t.Run("multiple count", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "hll").Err())
// Delete hll1, hll2, hll3
for i := 0; i < 3; i++ {
require.NoError(t, rdb.Do(ctx, "DEL", fmt.Sprintf("hll%d", i)).Err())
}

addCnt, err := rdb.PFAdd(ctx, "hll", "a", "b", "c", "d").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

addCnt, err = rdb.PFAdd(ctx, "hll1", "a", "b", "c", "d").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

card, err := rdb.PFCount(ctx, "hll", "hll1").Result()
require.NoError(t, err)
require.EqualValues(t, 4, card)

// Order doesn't matter
card, err = rdb.PFCount(ctx, "hll1", "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 4, card)

// Count non-exist key
card, err = rdb.PFCount(ctx, "hll1", "hll2", "hll3").Result()
require.NoError(t, err)
require.EqualValues(t, 4, card)

addCnt, err = rdb.PFAdd(ctx, "hll2", "1", "2", "3").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

card, err = rdb.PFCount(ctx, "hll", "hll1", "hll2").Result()
require.NoError(t, err)
require.EqualValues(t, 7, card)

// add with overlap
addCnt, err = rdb.PFAdd(ctx, "hll3", "a", "3", "5").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

card, err = rdb.PFCount(ctx, "hll", "hll1", "hll2", "hll3").Result()
require.NoError(t, err)
require.EqualValues(t, 8, card)
})

t.Run("basic merge", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "hll").Err())
// Delete hll1, hll2, hll3, hll4
for i := 0; i < 4; i++ {
require.NoError(t, rdb.Do(ctx, "DEL", fmt.Sprintf("hll%d", i)).Err())
}

addCnt, err := rdb.PFAdd(ctx, "hll", "a", "b", "c", "d").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

// Empty merge
mergeCmd, err := rdb.PFMerge(ctx, "hll").Result()
require.NoError(t, err)
// mergeCmd result is always "OK"
require.EqualValues(t, "OK", mergeCmd)

// Count the merged key
card, err := rdb.PFCount(ctx, "hll").Result()
require.NoError(t, err)
require.EqualValues(t, 4, card)

// Merge to hll1
mergeCmd, err = rdb.PFMerge(ctx, "hll1", "hll").Result()
require.NoError(t, err)
require.EqualValues(t, "OK", mergeCmd)

// Count the merged key
card, err = rdb.PFCount(ctx, "hll1").Result()
require.NoError(t, err)
require.EqualValues(t, 4, card)

// Add more elements to hll2
addCnt, err = rdb.PFAdd(ctx, "hll2", "e", "f", "g").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

card, err = rdb.PFCount(ctx, "hll2").Result()
require.NoError(t, err)
require.EqualValues(t, 3, card)

// merge to hll3
mergeCmd, err = rdb.PFMerge(ctx, "hll3", "hll", "hll1", "hll2").Result()
require.NoError(t, err)
require.EqualValues(t, "OK", mergeCmd)

// Count the merged key
card, err = rdb.PFCount(ctx, "hll3").Result()
require.NoError(t, err)
require.EqualValues(t, 7, card)

// Add more elements to hll4
addCnt, err = rdb.PFAdd(ctx, "hll4", "h", "i", "j").Result()
require.NoError(t, err)
require.EqualValues(t, 1, addCnt)

// Merge all to existing hll4
mergeCmd, err = rdb.PFMerge(ctx, "hll4", "hll", "hll1", "hll2", "hll3").Result()
require.NoError(t, err)
require.EqualValues(t, "OK", mergeCmd)

// Count the merged key
card, err = rdb.PFCount(ctx, "hll4").Result()
require.NoError(t, err)
require.EqualValues(t, 10, card)
})
}

0 comments on commit f9d7297

Please sign in to comment.