From 727bec2e69958a170fc94c4ffec77e07ced25fce Mon Sep 17 00:00:00 2001 From: tutububug <40481744+tutububug@users.noreply.github.com> Date: Wed, 31 Jul 2024 13:34:23 +0800 Subject: [PATCH 1/4] feat(hyperloglog): add support of the Hyperloglog data structure (#2142) Co-authored-by: yangxiao Co-authored-by: hulk Co-authored-by: mwish --- src/commands/cmd_hll.cc | 81 +++++++++ src/storage/redis_metadata.cc | 25 ++- src/storage/redis_metadata.h | 26 ++- src/storage/storage.h | 4 + src/types/hyperloglog.cc | 232 ++++++++++++++++++++++++ src/types/hyperloglog.h | 72 ++++++++ src/types/redis_bitmap.cc | 1 - src/types/redis_hyperloglog.cc | 223 +++++++++++++++++++++++ src/types/redis_hyperloglog.h | 45 +++++ src/vendor/murmurhash2.h | 106 +++++++++++ tests/cppunit/types/hyperloglog_test.cc | 77 ++++++++ 11 files changed, 888 insertions(+), 4 deletions(-) create mode 100644 src/commands/cmd_hll.cc create mode 100644 src/types/hyperloglog.cc create mode 100644 src/types/hyperloglog.h create mode 100644 src/types/redis_hyperloglog.cc create mode 100644 src/types/redis_hyperloglog.h create mode 100644 src/vendor/murmurhash2.h create mode 100644 tests/cppunit/types/hyperloglog_test.cc diff --git a/src/commands/cmd_hll.cc b/src/commands/cmd_hll.cc new file mode 100644 index 00000000000..343aa322b4e --- /dev/null +++ b/src/commands/cmd_hll.cc @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include + +#include "commander.h" +#include "commands/command_parser.h" +#include "commands/error_constants.h" +#include "error_constants.h" +#include "parse_util.h" +#include "server/redis_reply.h" +#include "server/server.h" +#include "storage/redis_metadata.h" + +namespace redis { + +/// PFADD key [element [element ...]] +/// Complexity: O(1) for each element added. +class CommandPfAdd final : public Commander { + public: + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::HyperLogLog hll(srv->storage, conn->GetNamespace()); + std::vector hashes(args_.size() - 1); + for (size_t i = 1; i < args_.size(); i++) { + hashes[i - 1] = redis::HyperLogLog::HllHash(args_[i]); + } + uint64_t ret{}; + auto s = hll.Add(args_[0], hashes, &ret); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::Integer(ret); + return Status::OK(); + } +}; + +/// PFCOUNT key [key ...] +/// Complexity: O(1) with a very small average constant time when called with a single key. +/// O(N) with N being the number of keys, and much bigger constant times, +/// when called with multiple keys. +/// +/// TODO(mwish): Currently we don't supports merge, so only one key is supported. 
+class CommandPfCount final : public Commander { + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::HyperLogLog hll(srv->storage, conn->GetNamespace()); + uint64_t ret{}; + auto s = hll.Count(args_[0], &ret); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + if (s.IsNotFound()) { + ret = 0; + } + *output = redis::Integer(ret); + return Status::OK(); + } +}; + +REDIS_REGISTER_COMMANDS(MakeCmdAttr("pfadd", -2, "write", 1, 1, 1), + MakeCmdAttr("pfcount", 2, "read-only", 1, 1, 1), ); + +} // namespace redis diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index e44b39cad7c..76403faaef3 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -329,7 +329,7 @@ bool Metadata::ExpireAt(uint64_t expired_ts) const { bool Metadata::IsSingleKVType() const { return Type() == kRedisString || Type() == kRedisJson; } bool Metadata::IsEmptyableType() const { - return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter; + return IsSingleKVType() || Type() == kRedisStream || Type() == kRedisBloomFilter || Type() == kRedisHyperLogLog; } bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); } @@ -472,3 +472,26 @@ rocksdb::Status JsonMetadata::Decode(Slice *input) { return rocksdb::Status::OK(); } + +void HyperLogLogMetadata::Encode(std::string *dst) const { + Metadata::Encode(dst); + PutFixed8(dst, static_cast(this->encode_type)); +} + +rocksdb::Status HyperLogLogMetadata::Decode(Slice *input) { + if (auto s = Metadata::Decode(input); !s.ok()) { + return s; + } + + uint8_t encoded_type = 0; + if (!GetFixed8(input, &encoded_type)) { + return rocksdb::Status::InvalidArgument(kErrMetadataTooShort); + } + // Check validity of encode type + if (encoded_type > 0) { + return rocksdb::Status::InvalidArgument(fmt::format("Invalid encode type {}", encoded_type)); + } + this->encode_type = static_cast(encoded_type); + + return 
rocksdb::Status::OK(); +} diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 68f36b2c994..5590609be37 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -49,6 +49,7 @@ enum RedisType : uint8_t { kRedisStream = 8, kRedisBloomFilter = 9, kRedisJson = 10, + kRedisHyperLogLog = 11, }; struct RedisTypes { @@ -90,8 +91,9 @@ enum RedisCommand { kRedisCmdLMove, }; -const std::vector RedisTypeNames = {"none", "string", "hash", "list", "set", "zset", - "bitmap", "sortedint", "stream", "MBbloom--", "ReJSON-RL"}; +const std::vector RedisTypeNames = {"none", "string", "hash", "list", + "set", "zset", "bitmap", "sortedint", + "stream", "MBbloom--", "ReJSON-RL", "hyperloglog"}; constexpr const char *kErrMsgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value"; constexpr const char *kErrMsgKeyExpired = "the key was expired"; @@ -313,3 +315,23 @@ class JsonMetadata : public Metadata { void Encode(std::string *dst) const override; rocksdb::Status Decode(Slice *input) override; }; + +class HyperLogLogMetadata : public Metadata { + public: + enum class EncodeType : uint8_t { + // Redis-style dense encoding implement as bitmap like sub keys to + // store registers by segment in data column family. + // The registers are stored in 6-bit format and each segment contains + // 768 registers. 
+ DENSE = 0, + // TODO(mwish): sparse encoding + // SPARSE = 1, + }; + + explicit HyperLogLogMetadata(bool generate_version = true) : Metadata(kRedisHyperLogLog, generate_version) {} + + void Encode(std::string *dst) const override; + rocksdb::Status Decode(Slice *input) override; + + EncodeType encode_type = EncodeType::DENSE; +}; diff --git a/src/storage/storage.h b/src/storage/storage.h index 7f31fc451a2..aa93fa27d4a 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -42,6 +42,10 @@ #include "observer_or_unique.h" #include "status.h" +#if defined(__sparc__) || defined(__arm__) +#define USE_ALIGNED_ACCESS +#endif + enum class StorageEngineType : uint16_t { RocksDB, Speedb, diff --git a/src/types/hyperloglog.cc b/src/types/hyperloglog.cc new file mode 100644 index 00000000000..80923181d2f --- /dev/null +++ b/src/types/hyperloglog.cc @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/* Redis HyperLogLog probabilistic cardinality approximation. + * This file implements the algorithm and the exported Redis commands. + * + * Copyright (c) 2014, Salvatore Sanfilippo + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + +// NOTE: this file is copy from redis's source: `src/hyperloglog.c` + +#include "hyperloglog.h" + +#include "vendor/murmurhash2.h" + +uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t register_index) { + uint32_t byte = (register_index * kHyperLogLogRegisterBits) / 8; + uint8_t fb = (register_index * kHyperLogLogRegisterBits) & 7; + uint8_t fb8 = 8 - fb; + uint8_t b0 = registers[byte]; + uint8_t b1 = 0; + if (fb > 8 - kHyperLogLogRegisterBits) { + b1 = registers[byte + 1]; + } + return ((b0 >> fb) | (b1 << fb8)) & kHyperLogLogRegisterMax; +} + +void HllDenseSetRegister(uint8_t *registers, uint32_t register_index, uint8_t val) { + uint32_t byte = register_index * kHyperLogLogRegisterBits / 8; + uint8_t fb = register_index * kHyperLogLogRegisterBits & 7; + uint8_t fb8 = 8 - fb; + uint8_t v = val; + registers[byte] &= ~(kHyperLogLogRegisterMax << fb); + registers[byte] |= v << fb; + if (fb > 8 - kHyperLogLogRegisterBits) { + registers[byte + 1] &= ~(kHyperLogLogRegisterMax >> fb8); + registers[byte + 1] |= v >> fb8; + } +} + +/* ========================= HyperLogLog algorithm ========================= */ + +// Reference: +// https://github.com/valkey-io/valkey/blob/14e09e981e0039edbf8c41a208a258c18624cbb7/src/hyperloglog.c#L457 +// +// Given a string element to add to the HyperLogLog, returns the length of the pattern 000..1 of the element +// hash. As a side effect 'regp' is *set to the register index this element hashes to +DenseHllResult ExtractDenseHllResult(uint64_t hash) { + /* Count the number of zeroes starting from bit kHyperLogLogRegisterCount + * (that is a power of two corresponding to the first bit we don't use + * as index). The max run can be 64-kHyperLogLogRegisterCountPow+1 = kHyperLogLogHashBitCount+1 bits. 
+ * + * Note that the final "1" ending the sequence of zeroes must be + * included in the count, so if we find "001" the count is 3, and + * the smallest count possible is no zeroes at all, just a 1 bit + * at the first position, that is a count of 1. + * + * This may sound like inefficient, but actually in the average case + * there are high probabilities to find a 1 after a few iterations. */ + uint32_t index = hash & kHyperLogLogRegisterCountMask; /* Register index. */ + DCHECK_LT(index, kHyperLogLogRegisterCount); + hash >>= kHyperLogLogRegisterCountPow; /* Remove bits used to address the register. */ + hash |= (static_cast(1U) << kHyperLogLogHashBitCount); + uint8_t ctz = __builtin_ctzll(hash) + 1; + return DenseHllResult{index, ctz}; +} + +/* + * Compute the register histogram in the dense representation. + */ +void HllDenseRegHisto(nonstd::span registers, int *reghisto) { + /* Redis default is to use 16384 registers 6 bits each. The code works + * with other values by modifying the defines, but for our target value + * we take a faster path with unrolled loops. */ + const uint8_t *r = registers.data(); + unsigned long r0 = 0, r1 = 0, r2 = 0, r3 = 0, r4 = 0, r5 = 0, r6 = 0, r7 = 0, r8 = 0, r9 = 0, r10 = 0, r11 = 0, + r12 = 0, r13 = 0, r14 = 0, r15 = 0; + for (size_t j = 0; j < kHyperLogLogSegmentRegisters / 16; j++) { + /* Handle 16 registers per iteration. 
*/ + r0 = r[0] & kHyperLogLogRegisterMax; + r1 = (r[0] >> 6 | r[1] << 2) & kHyperLogLogRegisterMax; + r2 = (r[1] >> 4 | r[2] << 4) & kHyperLogLogRegisterMax; + r3 = (r[2] >> 2) & kHyperLogLogRegisterMax; + r4 = r[3] & kHyperLogLogRegisterMax; + r5 = (r[3] >> 6 | r[4] << 2) & kHyperLogLogRegisterMax; + r6 = (r[4] >> 4 | r[5] << 4) & kHyperLogLogRegisterMax; + r7 = (r[5] >> 2) & kHyperLogLogRegisterMax; + r8 = r[6] & kHyperLogLogRegisterMax; + r9 = (r[6] >> 6 | r[7] << 2) & kHyperLogLogRegisterMax; + r10 = (r[7] >> 4 | r[8] << 4) & kHyperLogLogRegisterMax; + r11 = (r[8] >> 2) & kHyperLogLogRegisterMax; + r12 = r[9] & kHyperLogLogRegisterMax; + r13 = (r[9] >> 6 | r[10] << 2) & kHyperLogLogRegisterMax; + r14 = (r[10] >> 4 | r[11] << 4) & kHyperLogLogRegisterMax; + r15 = (r[11] >> 2) & kHyperLogLogRegisterMax; + + reghisto[r0]++; + reghisto[r1]++; + reghisto[r2]++; + reghisto[r3]++; + reghisto[r4]++; + reghisto[r5]++; + reghisto[r6]++; + reghisto[r7]++; + reghisto[r8]++; + reghisto[r9]++; + reghisto[r10]++; + reghisto[r11]++; + reghisto[r12]++; + reghisto[r13]++; + reghisto[r14]++; + reghisto[r15]++; + + r += 12; + } +} + +/* ========================= HyperLogLog Count ============================== + * This is the core of the algorithm where the approximated count is computed. + * The function uses the lower level HllDenseRegHisto() + * functions as helpers to compute histogram of register values part of the + * computation, which is representation-specific, while all the rest is common. */ + +/* Helper function sigma as defined in + * "New cardinality estimation algorithms for HyperLogLog sketches" + * Otmar Ertl, arXiv:1702.01284 */ +double HllSigma(double x) { + if (x == 1.) 
return INFINITY; + double z_prime = NAN; + double y = 1; + double z = x; + do { + x *= x; + z_prime = z; + z += x * y; + y += y; + } while (z_prime != z); + return z; +} + +/* Helper function tau as defined in + * "New cardinality estimation algorithms for HyperLogLog sketches" + * Otmar Ertl, arXiv:1702.01284 */ +double HllTau(double x) { + if (x == 0. || x == 1.) return 0.; + double z_prime = NAN; + double y = 1.0; + double z = 1 - x; + do { + x = sqrt(x); + z_prime = z; + y *= 0.5; + z -= pow(1 - x, 2) * y; + } while (z_prime != z); + return z / 3; +} + +/* Return the approximated cardinality of the set based on the harmonic + * mean of the registers values. */ +uint64_t HllDenseEstimate(const std::vector> ®isters) { + constexpr double m = kHyperLogLogRegisterCount; + int j = 0; + /* Note that reghisto size could be just kHyperLogLogHashBitCount+2, because kHyperLogLogHashBitCount+1 is + * the maximum frequency of the "000...1" sequence the hash function is + * able to return. However it is slow to check for sanity of the + * input: instead we history array at a safe size: overflows will + * just write data to wrong, but correctly allocated, places. */ + int reghisto[64] = {0}; + + /* Compute register histogram */ + for (const auto &r : registers) { + if (r.empty()) { + // Empty segment + reghisto[0] += kHyperLogLogSegmentRegisters; + } else { + HllDenseRegHisto(r, reghisto); + } + } + + /* Estimate cardinality from register histogram. 
See: + * "New cardinality estimation algorithms for HyperLogLog sketches" + * Otmar Ertl, arXiv:1702.01284 */ + double z = m * HllTau((m - reghisto[kHyperLogLogHashBitCount + 1]) / m); + for (j = kHyperLogLogHashBitCount; j >= 1; --j) { + z += reghisto[j]; + z *= 0.5; + } + z += m * HllSigma(reghisto[0] / m); + return static_cast(llroundl(kHyperLogLogAlpha * m * m / z)); +} diff --git a/src/types/hyperloglog.h b/src/types/hyperloglog.h new file mode 100644 index 00000000000..c99efe5c76f --- /dev/null +++ b/src/types/hyperloglog.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include +#include + +#include "redis_bitmap.h" + +/* The greater is Pow, the smaller the error. */ +constexpr uint32_t kHyperLogLogRegisterCountPow = 14; +/* The number of bits of the hash value used for determining the number of leading zeros. */ +constexpr uint32_t kHyperLogLogHashBitCount = 50; +constexpr uint32_t kHyperLogLogRegisterCount = 1 << kHyperLogLogRegisterCountPow; /* With Pow=14, 16384 registers. 
*/ + +constexpr size_t kHyperLogLogSegmentBytes = 768; +constexpr size_t kHyperLogLogSegmentRegisters = 1024; + +constexpr uint32_t kHyperLogLogSegmentCount = kHyperLogLogRegisterCount / kHyperLogLogSegmentRegisters; +constexpr uint32_t kHyperLogLogRegisterBits = 6; +constexpr uint32_t kHyperLogLogRegisterCountMask = kHyperLogLogRegisterCount - 1; /* Mask to index register. */ +constexpr uint32_t kHyperLogLogRegisterMax = ((1 << kHyperLogLogRegisterBits) - 1); +/* constant for 0.5/ln(2) */ +constexpr double kHyperLogLogAlpha = 0.721347520444481703680; +constexpr uint32_t kHyperLogLogRegisterBytes = (kHyperLogLogRegisterCount * kHyperLogLogRegisterBits + 7) / 8; +// Copied from redis +// https://github.com/valkey-io/valkey/blob/14e09e981e0039edbf8c41a208a258c18624cbb7/src/hyperloglog.c#L472 +constexpr uint32_t kHyperLogLogHashSeed = 0xadc83b19; + +struct DenseHllResult { + uint32_t register_index; + uint8_t hll_trailing_zero; +}; + +DenseHllResult ExtractDenseHllResult(uint64_t hash); + +/** + * Store the value of the register at position 'index' into variable 'val'. + * 'registers' is an array of unsigned bytes. + */ +uint8_t HllDenseGetRegister(const uint8_t *registers, uint32_t register_index); +/** + * Set the value of the register at position 'index' to 'val'. + * 'registers' is an array of unsigned bytes. + */ +void HllDenseSetRegister(uint8_t *registers, uint32_t register_index, uint8_t val); +/** + * Estimate the cardinality of the HyperLogLog data structure. + * + * @param registers The HyperLogLog data structure. The element should be either empty + * or a kHyperLogLogSegmentBytes sized array. 
+ */ +uint64_t HllDenseEstimate(const std::vector> ®isters); diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc index 9a08c1fe5fd..9d108d3b5b1 100644 --- a/src/types/redis_bitmap.cc +++ b/src/types/redis_bitmap.cc @@ -665,7 +665,6 @@ class Bitmap::SegmentCacheStore { metadata_cf_handle_(metadata_cf_handle), ns_key_(std::move(namespace_key)), metadata_(bitmap_metadata) {} - // Get a read-only segment by given index rocksdb::Status Get(uint32_t index, const std::string **cache) { std::string *res = nullptr; diff --git a/src/types/redis_hyperloglog.cc b/src/types/redis_hyperloglog.cc new file mode 100644 index 00000000000..aef0cc6680c --- /dev/null +++ b/src/types/redis_hyperloglog.cc @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "redis_hyperloglog.h" + +#include +#include + +#include "hyperloglog.h" +#include "vendor/murmurhash2.h" + +namespace redis { + +/// Cache for writing to a HyperLogLog. +/// +/// This is a bit like Bitmap::SegmentCacheStore, but simpler because +/// 1. We would only use it for writing, hll reading always traverses all segments. +/// 2. 
Some write access doesn't mark the segment as dirty, because the update value +/// is less than the current value. So that we need to export `SegmentEntry` to +/// the caller. +/// +/// When read from storage, if the segment exists and it size is not equal to +/// `kHyperLogLogRegisterBytesPerSegment`, it will be treated as a corruption. +class HllSegmentCache { + public: + struct SegmentEntry { + /// The segment data, it's would always equal to `kHyperLogLogRegisterBytesPerSegment`. + std::string data; + bool dirty; + }; + std::map segments; + + /// Get the segment from cache or storage. + /// + /// If the segment in not in the cache and storage, it will be initialized with + /// string(kHyperLogLogSegmentBytes, 0) and return OK. + template + rocksdb::Status Get(uint32_t segment_index, const GetSegmentFn &get_segment, SegmentEntry **entry) { + auto iter = segments.find(segment_index); + if (iter == segments.end()) { + std::string segment_data; + auto s = get_segment(segment_index, &segment_data); + if (!s.ok()) { + if (s.IsNotFound()) { + iter = segments.emplace(segment_index, SegmentEntry{std::move(segment_data), false}).first; + // Initialize the segment with 0 + iter->second.data.resize(kHyperLogLogSegmentBytes, 0); + *entry = &iter->second; + return rocksdb::Status::OK(); + } + return s; + } + iter = segments.emplace(segment_index, SegmentEntry{std::move(segment_data), false}).first; + } + if (iter->second.data.size() != kHyperLogLogSegmentBytes) { + return rocksdb::Status::Corruption("invalid segment size: expect=" + std::to_string(kHyperLogLogSegmentBytes) + + ", actual=" + std::to_string(iter->second.data.size())); + } + *entry = &iter->second; + return rocksdb::Status::OK(); + } +}; + +rocksdb::Status HyperLogLog::GetMetadata(Database::GetOptions get_options, const Slice &ns_key, + HyperLogLogMetadata *metadata) { + return Database::GetMetadata(get_options, {kRedisHyperLogLog}, ns_key, metadata); +} + +uint64_t HyperLogLog::HllHash(std::string_view 
element) { + DCHECK(element.size() <= std::numeric_limits::max()); + return HllMurMurHash64A(element.data(), static_cast(element.size()), kHyperLogLogHashSeed); +} + +/* the max 0 pattern counter of the subset the element belongs to is incremented if needed */ +rocksdb::Status HyperLogLog::Add(const Slice &user_key, const std::vector &element_hashes, uint64_t *ret) { + *ret = 0; + std::string ns_key = AppendNamespacePrefix(user_key); + + LockGuard guard(storage_->GetLockManager(), ns_key); + HyperLogLogMetadata metadata{}; + rocksdb::Status s = GetMetadata(GetOptions(), ns_key, &metadata); + if (!s.ok() && !s.IsNotFound()) return s; + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisHyperLogLog); + batch->PutLogData(log_data.Encode()); + + HllSegmentCache cache; + for (uint64_t element_hash : element_hashes) { + DenseHllResult dense_hll_result = ExtractDenseHllResult(element_hash); + uint32_t segment_index = dense_hll_result.register_index / kHyperLogLogSegmentRegisters; + uint32_t register_index_in_segment = dense_hll_result.register_index % kHyperLogLogSegmentRegisters; + HllSegmentCache::SegmentEntry *entry{nullptr}; + s = cache.Get( + segment_index, + [this, &ns_key, &metadata](uint32_t segment_index, std::string *segment) -> rocksdb::Status { + std::string sub_key = + InternalKey(ns_key, std::to_string(segment_index), metadata.version, storage_->IsSlotIdEncoded()) + .Encode(); + return storage_->Get(rocksdb::ReadOptions(), sub_key, segment); + }, + &entry); + if (!s.ok()) return s; + DCHECK(entry != nullptr); + DCHECK_EQ(kHyperLogLogSegmentBytes, entry->data.size()); + auto *segment_data = reinterpret_cast(entry->data.data()); + uint8_t old_count = HllDenseGetRegister(segment_data, register_index_in_segment); + if (dense_hll_result.hll_trailing_zero > old_count) { + HllDenseSetRegister(segment_data, register_index_in_segment, dense_hll_result.hll_trailing_zero); + entry->dirty = true; + *ret = 1; + } + } + // Nothing changed, no 
need to flush the segments + if (*ret == 0) return rocksdb::Status::OK(); + + // Flush dirty segments + // Release memory after batch is written + for (auto &[segment_index, entry] : cache.segments) { + if (entry.dirty) { + std::string sub_key = + InternalKey(ns_key, std::to_string(segment_index), metadata.version, storage_->IsSlotIdEncoded()).Encode(); + batch->Put(sub_key, entry.data); + entry.data.clear(); + } + } + cache.segments.clear(); + // Update metadata + { + metadata.encode_type = HyperLogLogMetadata::EncodeType::DENSE; + std::string bytes; + metadata.Encode(&bytes); + batch->Put(metadata_cf_handle_, ns_key, bytes); + } + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status HyperLogLog::Count(const Slice &user_key, uint64_t *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + *ret = 0; + std::vector registers; + { + LatestSnapShot ss(storage_); + Database::GetOptions get_options(ss.GetSnapShot()); + auto s = getRegisters(get_options, ns_key, ®isters); + if (!s.ok()) return s; + } + DCHECK_EQ(kHyperLogLogSegmentCount, registers.size()); + std::vector> register_segments; + register_segments.reserve(kHyperLogLogSegmentCount); + for (const auto ®ister_segment : registers) { + if (register_segment.empty()) { + // Empty segment + register_segments.emplace_back(); + continue; + } + // NOLINTNEXTLINE + const uint8_t *segment_data_ptr = reinterpret_cast(register_segment.data()); + register_segments.emplace_back(segment_data_ptr, register_segment.size()); + } + *ret = HllDenseEstimate(register_segments); + return rocksdb::Status::OK(); +} + +rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, const Slice &ns_key, + std::vector *register_segments) { + HyperLogLogMetadata metadata; + rocksdb::Status s = GetMetadata(get_options, ns_key, &metadata); + if (!s.ok()) { + if (s.IsNotFound()) { + // return empty registers with the right size. 
+ register_segments->resize(kHyperLogLogSegmentCount); + return rocksdb::Status::OK(); + } + return s; + } + + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + read_options.snapshot = get_options.snapshot; + // Multi get all segments + std::vector sub_segment_keys; + sub_segment_keys.reserve(kHyperLogLogSegmentCount); + for (uint32_t i = 0; i < kHyperLogLogSegmentCount; i++) { + std::string sub_key = + InternalKey(ns_key, std::to_string(i), metadata.version, storage_->IsSlotIdEncoded()).Encode(); + sub_segment_keys.push_back(std::move(sub_key)); + } + std::vector sub_segment_slices; + sub_segment_slices.reserve(kHyperLogLogSegmentCount); + for (const auto &sub_key : sub_segment_keys) { + sub_segment_slices.emplace_back(sub_key); + } + std::vector values(kHyperLogLogSegmentCount); + std::vector statuses(kHyperLogLogSegmentCount); + storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), kHyperLogLogSegmentCount, + sub_segment_slices.data(), values.data(), statuses.data()); + for (size_t i = 0; i < kHyperLogLogSegmentCount; i++) { + if (!statuses[i].ok() && !statuses[i].IsNotFound()) { + return statuses[i]; + } + register_segments->push_back(std::move(values[i])); + } + return rocksdb::Status::OK(); +} + +} // namespace redis diff --git a/src/types/redis_hyperloglog.h b/src/types/redis_hyperloglog.h new file mode 100644 index 00000000000..d18e0335980 --- /dev/null +++ b/src/types/redis_hyperloglog.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include "storage/redis_db.h" +#include "storage/redis_metadata.h" + +namespace redis { + +class HyperLogLog : public Database { + public: + explicit HyperLogLog(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {} + rocksdb::Status Add(const Slice &user_key, const std::vector &element_hashes, uint64_t *ret); + rocksdb::Status Count(const Slice &user_key, uint64_t *ret); + // TODO(mwish): Supports merge operation and related commands + // rocksdb::Status Merge(const std::vector &user_keys); + + static uint64_t HllHash(std::string_view); + + private: + rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice &ns_key, HyperLogLogMetadata *metadata); + /// Using multi-get to acquire the register_segments + rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key, + std::vector *register_segments); +}; + +} // namespace redis diff --git a/src/vendor/murmurhash2.h b/src/vendor/murmurhash2.h new file mode 100644 index 00000000000..1cd7f6639d0 --- /dev/null +++ b/src/vendor/murmurhash2.h @@ -0,0 +1,106 @@ +/* Redis HyperLogLog probabilistic cardinality approximation. + * This file implements the algorithm and the exported Redis commands. + * + * Copyright (c) 2014, Salvatore Sanfilippo + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#pragma once + +#include + +#ifndef USE_ALIGNED_ACCESS +#if defined(__sparc__) || defined(__arm__) +#define USE_ALIGNED_ACCESS +#endif +#endif + +// NOLINTBEGIN + +/* MurmurHash2, 64 bit version. + * It was modified for Redis in order to provide the same result in + * big and little endian archs (endian neutral). 
*/ +inline uint64_t HllMurMurHash64A(const void *key, int len, uint32_t seed) { + const uint64_t m = 0xc6a4a7935bd1e995; + const int r = 47; + uint64_t h = seed ^ (len * m); + const auto *data = (const uint8_t *)key; + const uint8_t *end = data + (len - (len & 7)); + + while (data != end) { + uint64_t k = 0; + +#if (__BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__) +#ifdef USE_ALIGNED_ACCESS + memcpy(&k, data, sizeof(uint64_t)); +#else + k = *((uint64_t *)data); +#endif +#else + k = (uint64_t)data[0]; + k |= (uint64_t)data[1] << 8; + k |= (uint64_t)data[2] << 16; + k |= (uint64_t)data[3] << 24; + k |= (uint64_t)data[4] << 32; + k |= (uint64_t)data[5] << 40; + k |= (uint64_t)data[6] << 48; + k |= (uint64_t)data[7] << 56; +#endif + + k *= m; + k ^= k >> r; + k *= m; + h ^= k; + h *= m; + data += 8; + } + + switch (len & 7) { + case 7: + h ^= (uint64_t)data[6] << 48; /* fall-thru */ + case 6: + h ^= (uint64_t)data[5] << 40; /* fall-thru */ + case 5: + h ^= (uint64_t)data[4] << 32; /* fall-thru */ + case 4: + h ^= (uint64_t)data[3] << 24; /* fall-thru */ + case 3: + h ^= (uint64_t)data[2] << 16; /* fall-thru */ + case 2: + h ^= (uint64_t)data[1] << 8; /* fall-thru */ + case 1: + h ^= (uint64_t)data[0]; + h *= m; /* fall-thru */ + }; + + h ^= h >> r; + h *= m; + h ^= h >> r; + return h; +} + +// NOLINTEND diff --git a/tests/cppunit/types/hyperloglog_test.cc b/tests/cppunit/types/hyperloglog_test.cc new file mode 100644 index 00000000000..bf7c4914499 --- /dev/null +++ b/tests/cppunit/types/hyperloglog_test.cc @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include + +#include "test_base.h" +#include "types/redis_hyperloglog.h" + +class RedisHyperLogLogTest : public TestBase { + protected: + explicit RedisHyperLogLogTest() : TestBase() { + hll_ = std::make_unique(storage_.get(), "hll_ns"); + } + ~RedisHyperLogLogTest() override = default; + + std::unique_ptr hll_; + + static std::vector computeHashes(const std::vector &elements) { + std::vector hashes; + hashes.reserve(elements.size()); + for (const auto &element : elements) { + hashes.push_back(redis::HyperLogLog::HllHash(element)); + } + return hashes; + } +}; + +TEST_F(RedisHyperLogLogTest, PFADD) { + uint64_t ret = 0; + ASSERT_TRUE(hll_->Add("hll", {}, &ret).ok() && ret == 0); + // Approximated cardinality after creation is zero + ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 0); + // PFADD returns 1 when at least 1 reg was modified + ASSERT_TRUE(hll_->Add("hll", computeHashes({"a", "b", "c"}), &ret).ok()); + ASSERT_EQ(1, ret); + ASSERT_TRUE(hll_->Count("hll", &ret).ok()); + ASSERT_EQ(3, ret); + // PFADD returns 0 when no reg was modified + ASSERT_TRUE(hll_->Add("hll", computeHashes({"a", "b", "c"}), &ret).ok() && ret == 0); + // PFADD works with empty string + ASSERT_TRUE(hll_->Add("hll", computeHashes({""}), &ret).ok() && ret == 1); + // PFADD works with similar hash, which is likely to be in the same bucket + ASSERT_TRUE(hll_->Add("hll", {1, 2, 3, 2, 1}, &ret).ok() && ret == 1); + ASSERT_TRUE(hll_->Count("hll", &ret).ok()); + ASSERT_EQ(7, ret); +} + +TEST_F(RedisHyperLogLogTest, 
PFCOUNT_returns_approximated_cardinality_of_set) { + uint64_t ret = 0; + // pf add "1" to "5" + ASSERT_TRUE(hll_->Add("hll", computeHashes({"1", "2", "3", "4", "5"}), &ret).ok() && ret == 1); + // pf count is 5 + ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 5); + // pf add "6" to "10" + ASSERT_TRUE(hll_->Add("hll", computeHashes({"6", "7", "8", "8", "9", "10"}), &ret).ok() && ret == 1); + // pf count is 10 + ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 10); +} From f436ebc1eade0bb857831c7a033dae5caf5fc412 Mon Sep 17 00:00:00 2001 From: Wayne Date: Wed, 31 Jul 2024 19:39:43 +0900 Subject: [PATCH 2/4] feat(ci): integrate Rocky Linux environment into the CI workflow (#2451) Signed-off-by: Ruihua Wen --- .github/workflows/kvrocks.yaml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/.github/workflows/kvrocks.yaml b/.github/workflows/kvrocks.yaml index 78491de3789..cb0835010c7 100644 --- a/.github/workflows/kvrocks.yaml +++ b/.github/workflows/kvrocks.yaml @@ -426,6 +426,12 @@ jobs: - name: ArchLinux image: archlinux:base compiler: gcc + - name: Rocky Linux 8 + image: rockylinux:8 + compiler: gcc + - name: Rocky Linux 9 + image: rockylinux:9 + compiler: gcc runs-on: ubuntu-22.04 container: @@ -448,6 +454,30 @@ jobs: update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-11 100 echo "NPROC=$(nproc)" >> $GITHUB_ENV + - name: Setup Rocky Linux 8 + if: ${{ startsWith(matrix.image, 'rockylinux:8') }} + run: | + dnf install -y epel-release + dnf config-manager --set-enabled powertools + dnf install -y git gcc-toolset-12 autoconf automake libtool libstdc++-static python3 python3-pip openssl-devel which cmake + source /opt/rh/gcc-toolset-12/enable + update-alternatives --install /usr/bin/g++ g++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100 + update-alternatives --install /usr/bin/cc cc /opt/rh/gcc-toolset-12/root/usr/bin/gcc 100 + update-alternatives --install /usr/bin/c++ c++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100 + echo 
"NPROC=$(nproc)" >> $GITHUB_ENV + + - name: Setup Rocky Linux 9 + if: ${{ startsWith(matrix.image, 'rockylinux:9') }} + run: | + dnf install -y epel-release + dnf config-manager --set-enabled crb + dnf install -y git gcc-toolset-12 autoconf automake libtool libstdc++-static python3 python3-pip openssl-devel which cmake + source /opt/rh/gcc-toolset-12/enable + update-alternatives --install /usr/bin/g++ g++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100 + update-alternatives --install /usr/bin/cc cc /opt/rh/gcc-toolset-12/root/usr/bin/gcc 100 + update-alternatives --install /usr/bin/c++ c++ /opt/rh/gcc-toolset-12/root/usr/bin/g++ 100 + echo "NPROC=$(nproc)" >> $GITHUB_ENV + - name: Cache redis id: cache-redis uses: actions/cache@v3 From b919343c70b9c20a627fc0dd8d79b3213910ed84 Mon Sep 17 00:00:00 2001 From: Twice Date: Wed, 31 Jul 2024 23:44:30 +0900 Subject: [PATCH 3/4] build(cmake): update compiler version requirement (#2455) --- CMakeLists.txt | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1719aa0c38c..3542cf6fc27 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,20 +42,20 @@ endif() find_package(Backtrace REQUIRED) if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") - if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 7) - message(FATAL_ERROR "It is expected to build kvrocks with GCC 7 or above") + if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8) + message(FATAL_ERROR "It is expected to build kvrocks with GCC 8 or above") endif() elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 5) - message(FATAL_ERROR "It is expected to build kvrocks with Clang 5 or above") + if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 8) + message(FATAL_ERROR "It is expected to build kvrocks with Clang 8 or above") endif() elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang") - if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 10) - message(FATAL_ERROR "It is expected to build kvrocks with Xcode toolchains 10 
or above") + if(CMAKE_CXX_COMPILER_VERSION VERSION_LESS 11) + message(FATAL_ERROR "It is expected to build kvrocks with Xcode toolchains 11 or above") endif() else() message(WARNING "The compiler you are currently using is not officially supported, - so you can try switching to GCC>=7 or Clang>=5 if you encounter problems") + so you can try switching to GCC>=8 or Clang>=8 if you encounter problems") endif() if(CMAKE_GENERATOR STREQUAL "Ninja") From a3863f9cb8f84c15bec97a5382cd4e1bb76aa04e Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 1 Aug 2024 20:58:11 +0800 Subject: [PATCH 4/4] feat(hyperloglog): Add support of PFMERGE command (#2457) --- src/commands/cmd_hll.cc | 38 +++++-- src/types/hyperloglog.cc | 25 +++++ src/types/hyperloglog.h | 6 ++ src/types/redis_hyperloglog.cc | 136 +++++++++++++++++++++--- src/types/redis_hyperloglog.h | 20 +++- tests/cppunit/types/hyperloglog_test.cc | 99 +++++++++++++++++ 6 files changed, 298 insertions(+), 26 deletions(-) diff --git a/src/commands/cmd_hll.cc b/src/commands/cmd_hll.cc index 343aa322b4e..88545427f18 100644 --- a/src/commands/cmd_hll.cc +++ b/src/commands/cmd_hll.cc @@ -24,12 +24,8 @@ #include "commander.h" #include "commands/command_parser.h" -#include "commands/error_constants.h" -#include "error_constants.h" -#include "parse_util.h" #include "server/redis_reply.h" #include "server/server.h" -#include "storage/redis_metadata.h" namespace redis { @@ -57,13 +53,17 @@ class CommandPfAdd final : public Commander { /// Complexity: O(1) with a very small average constant time when called with a single key. /// O(N) with N being the number of keys, and much bigger constant times, /// when called with multiple keys. -/// -/// TODO(mwish): Currently we don't supports merge, so only one key is supported. 
class CommandPfCount final : public Commander { Status Execute(Server *srv, Connection *conn, std::string *output) override { redis::HyperLogLog hll(srv->storage, conn->GetNamespace()); uint64_t ret{}; - auto s = hll.Count(args_[0], &ret); + rocksdb::Status s; + if (args_.size() > 1) { + std::vector keys(args_.begin(), args_.end()); + s = hll.CountMultiple(keys, &ret); + } else { + s = hll.Count(args_[0], &ret); + } if (!s.ok() && !s.IsNotFound()) { return {Status::RedisExecErr, s.ToString()}; } @@ -75,7 +75,29 @@ class CommandPfCount final : public Commander { } }; +/// PFMERGE destkey [sourcekey [sourcekey ...]] +/// +/// complexity: O(N) to merge N HyperLogLogs, but with high constant times. +class CommandPfMerge final : public Commander { + Status Execute(Server *srv, Connection *conn, std::string *output) override { + redis::HyperLogLog hll(srv->storage, conn->GetNamespace()); + std::vector keys(args_.begin() + 1, args_.end()); + std::vector src_user_keys; + src_user_keys.reserve(args_.size() - 1); + for (size_t i = 1; i < args_.size(); i++) { + src_user_keys.emplace_back(args_[i]); + } + auto s = hll.Merge(/*dest_user_key=*/args_[0], src_user_keys); + if (!s.ok() && !s.IsNotFound()) { + return {Status::RedisExecErr, s.ToString()}; + } + *output = redis::SimpleString("OK"); + return Status::OK(); + } +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("pfadd", -2, "write", 1, 1, 1), - MakeCmdAttr("pfcount", 2, "read-only", 1, 1, 1), ); + MakeCmdAttr("pfcount", -2, "read-only", 1, -1, 1), + MakeCmdAttr("pfmerge", -2, "write", 1, -1, 1), ); } // namespace redis diff --git a/src/types/hyperloglog.cc b/src/types/hyperloglog.cc index 80923181d2f..831988debef 100644 --- a/src/types/hyperloglog.cc +++ b/src/types/hyperloglog.cc @@ -157,6 +157,31 @@ void HllDenseRegHisto(nonstd::span registers, int *reghisto) { } } +void HllMerge(std::vector *dest_registers, const std::vector> ®isters) { + for (size_t segment_id = 0; segment_id < kHyperLogLogSegmentCount; segment_id++) { + 
std::string *dest_segment = &dest_registers->at(segment_id);
+    nonstd::span src_segment = registers[segment_id];
+    if (src_segment.empty()) {
+      continue;
+    }
+    if (dest_segment->empty()) {
+      dest_segment->resize(src_segment.size());
+      memcpy(dest_segment->data(), src_segment.data(), src_segment.size());
+      continue;
+    }
+    // Do physical merge
+    // NOLINTNEXTLINE
+    uint8_t *dest_segment_data = reinterpret_cast(dest_segment->data());
+    for (size_t register_idx = 0; register_idx < kHyperLogLogSegmentRegisters; register_idx++) {
+      uint8_t val = HllDenseGetRegister(src_segment.data(), register_idx);
+      uint8_t previous_val = HllDenseGetRegister(dest_segment_data, register_idx);
+      if (val > previous_val) {
+        HllDenseSetRegister(dest_segment_data, register_idx, val);
+      }
+    }
+  }
+}
+
 /* ========================= HyperLogLog Count ==============================
  * This is the core of the algorithm where the approximated count is computed.
  * The function uses the lower level HllDenseRegHisto()
diff --git a/src/types/hyperloglog.h b/src/types/hyperloglog.h
index c99efe5c76f..2071f90060b 100644
--- a/src/types/hyperloglog.h
+++ b/src/types/hyperloglog.h
@@ -70,3 +70,9 @@
  * or a kHyperLogLogSegmentBytes sized array. */
 uint64_t HllDenseEstimate(const std::vector> &registers);
+
+/**
+ * Merge the source HyperLogLog 'registers' into 'dest_registers' by computing, for each of
+ * the kHyperLogLogRegisterCount registers, MAX(dest_registers[i], registers[i]).
+ */ +void HllMerge(std::vector *dest_registers, const std::vector> ®isters); diff --git a/src/types/redis_hyperloglog.cc b/src/types/redis_hyperloglog.cc index aef0cc6680c..f83c7e93048 100644 --- a/src/types/redis_hyperloglog.cc +++ b/src/types/redis_hyperloglog.cc @@ -28,6 +28,25 @@ namespace redis { +namespace { +template +std::vector> TransformToSpan(const std::vector ®isters) { + std::vector> register_segments; + register_segments.reserve(kHyperLogLogSegmentCount); + for (const auto ®ister_segment : registers) { + if (register_segment.empty()) { + // Empty segment + register_segments.emplace_back(); + continue; + } + // NOLINTNEXTLINE + const uint8_t *segment_data_ptr = reinterpret_cast(register_segment.data()); + register_segments.emplace_back(segment_data_ptr, register_segment.size()); + } + return register_segments; +} +} // namespace + /// Cache for writing to a HyperLogLog. /// /// This is a bit like Bitmap::SegmentCacheStore, but simpler because @@ -163,22 +182,101 @@ rocksdb::Status HyperLogLog::Count(const Slice &user_key, uint64_t *ret) { if (!s.ok()) return s; } DCHECK_EQ(kHyperLogLogSegmentCount, registers.size()); - std::vector> register_segments; - register_segments.reserve(kHyperLogLogSegmentCount); - for (const auto ®ister_segment : registers) { - if (register_segment.empty()) { - // Empty segment - register_segments.emplace_back(); + std::vector> register_segments = TransformToSpan(registers); + *ret = HllDenseEstimate(register_segments); + return rocksdb::Status::OK(); +} + +rocksdb::Status HyperLogLog::mergeUserKeys(Database::GetOptions get_options, const std::vector &user_keys, + std::vector *register_segments) { + DCHECK_GE(user_keys.size(), static_cast(1)); + + std::string first_ns_key = AppendNamespacePrefix(user_keys[0]); + rocksdb::Status s = getRegisters(get_options, first_ns_key, register_segments); + if (!s.ok()) return s; + // The set of keys that have been seen so far + std::unordered_set seend_user_keys; + 
seend_user_keys.emplace(user_keys[0].ToStringView()); + + for (size_t idx = 1; idx < user_keys.size(); idx++) { + rocksdb::Slice source_user_key = user_keys[idx]; + if (!seend_user_keys.emplace(source_user_key.ToStringView()).second) { + // Skip duplicate keys continue; } - // NOLINTNEXTLINE - const uint8_t *segment_data_ptr = reinterpret_cast(register_segment.data()); - register_segments.emplace_back(segment_data_ptr, register_segment.size()); + std::string source_key = AppendNamespacePrefix(source_user_key); + std::vector source_registers; + s = getRegisters(get_options, source_key, &source_registers); + if (!s.ok()) return s; + DCHECK_EQ(kHyperLogLogSegmentCount, source_registers.size()); + DCHECK_EQ(kHyperLogLogSegmentCount, register_segments->size()); + std::vector> source_register_span = TransformToSpan(source_registers); + HllMerge(register_segments, source_register_span); } - *ret = HllDenseEstimate(register_segments); return rocksdb::Status::OK(); } +rocksdb::Status HyperLogLog::CountMultiple(const std::vector &user_key, uint64_t *ret) { + DCHECK_GT(user_key.size(), static_cast(1)); + std::vector register_segments; + // Using same snapshot for all get operations + LatestSnapShot ss(storage_); + Database::GetOptions get_options(ss.GetSnapShot()); + auto s = mergeUserKeys(get_options, user_key, ®ister_segments); + if (!s.ok()) return s; + std::vector> register_segment_span = TransformToSpan(register_segments); + *ret = HllDenseEstimate(register_segment_span); + return rocksdb::Status::OK(); +} + +rocksdb::Status HyperLogLog::Merge(const Slice &dest_user_key, const std::vector &source_user_keys) { + if (source_user_keys.empty()) { + return rocksdb::Status::OK(); + } + + std::string dest_key = AppendNamespacePrefix(dest_user_key); + LockGuard guard(storage_->GetLockManager(), dest_key); + // Using same snapshot for all get operations + LatestSnapShot ss(storage_); + Database::GetOptions get_options(ss.GetSnapShot()); + HyperLogLogMetadata metadata; + 
rocksdb::Status s = GetMetadata(get_options, dest_user_key, &metadata); + if (!s.ok() && !s.IsNotFound()) return s; + std::vector registers; + { + std::vector all_user_keys; + all_user_keys.reserve(source_user_keys.size() + 1); + all_user_keys.push_back(dest_user_key); + for (const auto &source_user_key : source_user_keys) { + all_user_keys.push_back(source_user_key); + } + s = mergeUserKeys(get_options, all_user_keys, ®isters); + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisHyperLogLog); + batch->PutLogData(log_data.Encode()); + for (uint32_t i = 0; i < kHyperLogLogSegmentCount; i++) { + if (registers[i].empty()) { + continue; + } + std::string sub_key = + InternalKey(dest_key, std::to_string(i), metadata.version, storage_->IsSlotIdEncoded()).Encode(); + batch->Put(sub_key, registers[i]); + // Release memory after batch is written + registers[i].clear(); + } + // Metadata + { + metadata.encode_type = HyperLogLogMetadata::EncodeType::DENSE; + std::string bytes; + metadata.Encode(&bytes); + batch->Put(metadata_cf_handle_, dest_key, bytes); + } + + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, const Slice &ns_key, std::vector *register_segments) { HyperLogLogMetadata metadata; @@ -207,15 +305,27 @@ rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, cons for (const auto &sub_key : sub_segment_keys) { sub_segment_slices.emplace_back(sub_key); } - std::vector values(kHyperLogLogSegmentCount); + register_segments->resize(kHyperLogLogSegmentCount); std::vector statuses(kHyperLogLogSegmentCount); storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), kHyperLogLogSegmentCount, - sub_segment_slices.data(), values.data(), statuses.data()); + sub_segment_slices.data(), register_segments->data(), statuses.data()); for (size_t i = 0; i < kHyperLogLogSegmentCount; i++) { if 
(!statuses[i].ok() && !statuses[i].IsNotFound()) { + register_segments->at(i).clear(); return statuses[i]; } - register_segments->push_back(std::move(values[i])); + } + return rocksdb::Status::OK(); +} + +rocksdb::Status HyperLogLog::getRegisters(Database::GetOptions get_options, const Slice &ns_key, + std::vector *register_segments) { + std::vector pinnable_slices; + rocksdb::Status s = getRegisters(get_options, ns_key, &pinnable_slices); + if (!s.ok()) return s; + register_segments->reserve(kHyperLogLogSegmentCount); + for (auto &pinnable_slice : pinnable_slices) { + register_segments->push_back(pinnable_slice.ToString()); } return rocksdb::Status::OK(); } diff --git a/src/types/redis_hyperloglog.h b/src/types/redis_hyperloglog.h index d18e0335980..6b2e441b668 100644 --- a/src/types/redis_hyperloglog.h +++ b/src/types/redis_hyperloglog.h @@ -30,16 +30,26 @@ class HyperLogLog : public Database { explicit HyperLogLog(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {} rocksdb::Status Add(const Slice &user_key, const std::vector &element_hashes, uint64_t *ret); rocksdb::Status Count(const Slice &user_key, uint64_t *ret); - // TODO(mwish): Supports merge operation and related commands - // rocksdb::Status Merge(const std::vector &user_keys); + /// The count when user_keys.size() is greater than 1. 
+ rocksdb::Status CountMultiple(const std::vector &user_key, uint64_t *ret);
+ rocksdb::Status Merge(const Slice &dest_user_key, const std::vector &source_user_keys);

 static uint64_t HllHash(std::string_view);

 private:
- rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice &ns_key, HyperLogLogMetadata *metadata);
+ [[nodiscard]] rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice &ns_key,
+ HyperLogLogMetadata *metadata);
+
+ [[nodiscard]] rocksdb::Status mergeUserKeys(Database::GetOptions get_options, const std::vector &user_keys,
+ std::vector *register_segments);
 /// Using multi-get to acquire the register_segments
- rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key,
- std::vector *register_segments);
+ ///
+ /// If the metadata is not found, register_segments will be initialized with 16 empty slices.
+ [[nodiscard]] rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key,
+ std::vector *register_segments);
+ /// Same as getRegisters, but the result is stored in a vector of strings. 
+ [[nodiscard]] rocksdb::Status getRegisters(Database::GetOptions get_options, const Slice &ns_key, + std::vector *register_segments); }; } // namespace redis diff --git a/tests/cppunit/types/hyperloglog_test.cc b/tests/cppunit/types/hyperloglog_test.cc index bf7c4914499..234b688e8ba 100644 --- a/tests/cppunit/types/hyperloglog_test.cc +++ b/tests/cppunit/types/hyperloglog_test.cc @@ -32,6 +32,22 @@ class RedisHyperLogLogTest : public TestBase { } ~RedisHyperLogLogTest() override = default; + void SetUp() override { + TestBase::SetUp(); + [[maybe_unused]] auto s = hll_->Del("hll"); + for (int x = 1; x <= 3; x++) { + s = hll_->Del("hll" + std::to_string(x)); + } + } + + void TearDown() override { + TestBase::SetUp(); + [[maybe_unused]] auto s = hll_->Del("hll"); + for (int x = 1; x <= 3; x++) { + s = hll_->Del("hll" + std::to_string(x)); + } + } + std::unique_ptr hll_; static std::vector computeHashes(const std::vector &elements) { @@ -75,3 +91,86 @@ TEST_F(RedisHyperLogLogTest, PFCOUNT_returns_approximated_cardinality_of_set) { // pf count is 10 ASSERT_TRUE(hll_->Count("hll", &ret).ok() && ret == 10); } + +TEST_F(RedisHyperLogLogTest, PFMERGE_results_on_the_cardinality_of_union_of_sets) { + uint64_t ret = 0; + // pf add hll1 a b c + ASSERT_TRUE(hll_->Add("hll1", computeHashes({"a", "b", "c"}), &ret).ok() && ret == 1); + // pf add hll2 b c d + ASSERT_TRUE(hll_->Add("hll2", computeHashes({"b", "c", "d"}), &ret).ok() && ret == 1); + // pf add hll3 c d e + ASSERT_TRUE(hll_->Add("hll3", computeHashes({"c", "d", "e"}), &ret).ok() && ret == 1); + // pf merge hll hll1 hll2 hll3 + ASSERT_TRUE(hll_->Merge("hll", {"hll1", "hll2", "hll3"}).ok()); + // pf count hll is 5 + ASSERT_TRUE(hll_->Count("hll", &ret).ok()); + ASSERT_EQ(5, ret); +} + +TEST_F(RedisHyperLogLogTest, PFCOUNT_multiple) { + uint64_t ret = 0; + ASSERT_TRUE(hll_->CountMultiple({"hll1", "hll2", "hll3"}, &ret).ok()); + ASSERT_EQ(0, ret); + // pf add hll1 a b c + ASSERT_TRUE(hll_->Add("hll1", computeHashes({"a", 
"b", "c"}), &ret).ok() && ret == 1);
+  ASSERT_TRUE(hll_->Count("hll1", &ret).ok());
+  ASSERT_EQ(3, ret);
+  ASSERT_TRUE(hll_->CountMultiple({"hll1", "hll2", "hll3"}, &ret).ok());
+  ASSERT_EQ(3, ret);
+  // pf add hll2 b c d
+  ASSERT_TRUE(hll_->Add("hll2", computeHashes({"b", "c", "d"}), &ret).ok() && ret == 1);
+  ASSERT_TRUE(hll_->CountMultiple({"hll1", "hll2", "hll3"}, &ret).ok());
+  ASSERT_EQ(4, ret);
+  // pf add hll3 c d e
+  ASSERT_TRUE(hll_->Add("hll3", computeHashes({"c", "d", "e"}), &ret).ok() && ret == 1);
+  ASSERT_TRUE(hll_->CountMultiple({"hll1", "hll2", "hll3"}, &ret).ok());
+  ASSERT_EQ(5, ret);
+  // pf merge hll hll1 hll2 hll3
+  ASSERT_TRUE(hll_->Merge("hll", {"hll1", "hll2", "hll3"}).ok());
+  // pf count hll is 5
+  ASSERT_TRUE(hll_->Count("hll", &ret).ok());
+  ASSERT_EQ(5, ret);
+  ASSERT_TRUE(hll_->CountMultiple({"hll1", "hll2", "hll3", "hll"}, &ret).ok());
+  ASSERT_EQ(5, ret);
+}
+
+TEST_F(RedisHyperLogLogTest, PFCOUNT_multiple_keys_merge_returns_cardinality_of_union_1) {
+  for (int x = 1; x < 1000; x++) {
+    uint64_t ret = 0;
+    ASSERT_TRUE(hll_->Add("hll0", computeHashes({"foo-" + std::to_string(x)}), &ret).ok());
+    ASSERT_TRUE(hll_->Add("hll1", computeHashes({"bar-" + std::to_string(x)}), &ret).ok());
+    ASSERT_TRUE(hll_->Add("hll2", computeHashes({"zap-" + std::to_string(x)}), &ret).ok());
+    std::vector cards(3);
+    ASSERT_TRUE(hll_->Count("hll0", &cards[0]).ok());
+    ASSERT_TRUE(hll_->Count("hll1", &cards[1]).ok());
+    ASSERT_TRUE(hll_->Count("hll2", &cards[2]).ok());
+    auto card = static_cast(cards[0] + cards[1] + cards[2]);
+    double realcard = x * 3;
+    // assert the ABS of 'card' and 'realcard' is within 5% of the cardinality
+    double left = std::abs(card - realcard);
+    double right = card / 100 * 5;
+    ASSERT_LT(left, right) << "left : " << left << ", right: " << right;
+  }
+}
+
+TEST_F(RedisHyperLogLogTest, PFCOUNT_multiple_keys_merge_returns_cardinality_of_union_2) {
+  std::srand(time(nullptr));
+  std::vector realcard_vec;
+  for (auto i = 1; i < 1000; 
i++) { + for (auto j = 0; j < 3; j++) { + uint64_t ret = 0; + int rint = std::rand() % 20000; + ASSERT_TRUE(hll_->Add("hll" + std::to_string(j), computeHashes({std::to_string(rint)}), &ret).ok()); + realcard_vec.push_back(rint); + } + } + std::vector cards(3); + ASSERT_TRUE(hll_->Count("hll0", &cards[0]).ok()); + ASSERT_TRUE(hll_->Count("hll1", &cards[1]).ok()); + ASSERT_TRUE(hll_->Count("hll2", &cards[2]).ok()); + auto card = static_cast(cards[0] + cards[1] + cards[2]); + auto realcard = static_cast(realcard_vec.size()); + double left = std::abs(card - realcard); + double right = card / 100 * 5; + ASSERT_LT(left, right) << "left : " << left << ", right: " << right; +}