Skip to content

Commit

Permalink
Merge branch 'unstable' into add-command-XCLAIM
Browse files Browse the repository at this point in the history
  • Loading branch information
Beihao-Zhou authored May 12, 2024
2 parents 5d3f353 + 1e23484 commit 61881f1
Show file tree
Hide file tree
Showing 66 changed files with 3,259 additions and 287 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/kvrocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ jobs:
GOCASE_RUN_ARGS=""
if [[ -n "${{ matrix.with_openssl }}" ]] && [[ "${{ matrix.os }}" == ubuntu* ]]; then
git clone https://github.com/jsha/minica
cd minica && go build && cd ..
cd minica && git checkout 96a5c93723cf3d34b50b3e723a9f05cd3765bc67 && go build && cd ..
./minica/minica --domains localhost
cp localhost/cert.pem tests/gocase/tls/cert/server.crt
cp localhost/key.pem tests/gocase/tls/cert/server.key
Expand Down Expand Up @@ -376,7 +376,7 @@ jobs:
name: Check Docker image
needs: [precondition, check-and-lint, check-typos]
if: ${{ needs.precondition.outputs.docs_only != 'true' }}
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- name: Get core numbers
Expand Down
4 changes: 4 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The Apache Software Foundation (http://www.apache.org/).

================================================================

Thanks to designers Lingyu Tian and Shili Fan for contributing the logo of Kvrocks.

================================================================

This product includes a number of Dependencies with separate copyright notices
and license terms. Your use of these submodules is subject to the terms and
conditions of the following licenses.
Expand Down
37 changes: 0 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ Kvrocks has the following key features:
* High Availability: Support Redis sentinel to failover when master or slave was failed.
* Cluster: Centralized management but accessible via any Redis cluster client.

Thanks to designers [Lingyu Tian](https://github.com/tianlingyu1997) and Shili Fan for contributing the logo of Kvrocks.

## Who uses Kvrocks

You can find Kvrocks users at [the Users page](https://kvrocks.apache.org/users/).
Expand Down Expand Up @@ -185,41 +183,6 @@ Documents are hosted at the [official website](https://kvrocks.apache.org/docs/g

Kvrocks community welcomes all forms of contribution and you can find out how to get involved on the [Community](https://kvrocks.apache.org/community/) and [How to Contribute](https://kvrocks.apache.org/community/contributing) pages.

## Performance

### Hardware

* CPU: 48 cores Intel(R) Xeon(R) CPU E5-2650 v4 @ 2.20GHz
* Memory: 32 GiB
* NET: Intel Corporation I350 Gigabit Network Connection
* DISK: 2TB NVMe Intel SSD DC P4600

> Benchmark Client: multi-thread redis-benchmark(unstable branch)
### 1. Commands QPS

> kvrocks: workers = 16, benchmark: 8 threads/ 512 conns / 128 payload
latency: 99.9% < 10ms

![image](assets/chart-commands.png)

### 2. QPS on different payloads

> kvrocks: workers = 16, benchmark: 8 threads/ 512 conns
latency: 99.9% < 10ms

![image](assets/chart-values.png)

### 3. QPS on different workers

> kvrocks: workers = 16, benchmark: 8 threads/ 512 conns / 128 payload
latency: 99.9% < 10ms

![image](assets/chart-threads.png)

## License

Apache Kvrocks is licensed under the Apache License Version 2.0. See the [LICENSE](LICENSE) file for details.
Expand Down
Binary file added assets/KQIR.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed assets/chart-commands.png
Binary file not shown.
Binary file removed assets/chart-threads.png
Binary file not shown.
Binary file removed assets/chart-values.png
Binary file not shown.
4 changes: 2 additions & 2 deletions cmake/jsoncons.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(jsoncons
danielaparker/jsoncons v0.174.0
MD5=1e620831477adbed19e85248c33cbb89
danielaparker/jsoncons v0.175.0
MD5=1ee4a655719dc3333b5c1fbf5a6e9321
)

FetchContent_MakeAvailableWithArgs(jsoncons
Expand Down
96 changes: 48 additions & 48 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

#include <config/config_util.h>

#include <array>
#include <cstring>
#include <fstream>
#include <memory>
#include <vector>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
Expand All @@ -37,11 +39,11 @@
#include "time_util.h"

ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots)
const std::bitset<kClusterSlots> &slots)
: id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {}

Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
: srv_(srv), binds_(std::move(binds)), port_(port), size_(0), version_(-1), myself_(nullptr) {
: srv_(srv), binds_(std::move(binds)), port_(port) {
for (auto &slots_node : slots_nodes_) {
slots_node = nullptr;
}
Expand All @@ -53,10 +55,10 @@ Cluster::Cluster(Server *srv, std::vector<std::string> binds, int port)
// cluster data, so these commands should be executed exclusively, and ReadWriteLock
// also can guarantee accessing data is safe.
bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
for (auto v : {"setnodes", "setnodeid", "setslot", "import", "reset"}) {
if (util::EqualICase(v, subcommand)) return true;
}
return false;
std::array subcommands = {"setnodes", "setnodeid", "setslot", "import", "reset"};

return std::any_of(std::begin(subcommands), std::end(subcommands),
[&subcommand](const std::string &val) { return util::EqualICase(val, subcommand); });
}

Status Cluster::SetNodeId(const std::string &node_id) {
Expand Down Expand Up @@ -170,26 +172,26 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b
size_ = 0;

// Update slots to nodes
for (const auto &n : slots_nodes) {
slots_nodes_[n.first] = nodes_[n.second];
for (const auto &[slot, node_id] : slots_nodes) {
slots_nodes_[slot] = nodes_[node_id];
}

// Update replicas info and size
for (auto &n : nodes_) {
if (n.second->role == kClusterSlave) {
if (nodes_.find(n.second->master_id) != nodes_.end()) {
nodes_[n.second->master_id]->replicas.push_back(n.first);
for (const auto &[node_id, node] : nodes_) {
if (node->role == kClusterSlave) {
if (nodes_.find(node->master_id) != nodes_.end()) {
nodes_[node->master_id]->replicas.push_back(node_id);
}
}
if (n.second->role == kClusterMaster && n.second->slots.count() > 0) {
if (node->role == kClusterMaster && node->slots.count() > 0) {
size_++;
}
}

if (myid_.empty() || force) {
for (auto &n : nodes_) {
if (n.second->port == port_ && util::MatchListeningIP(binds_, n.second->host)) {
myid_ = n.first;
for (const auto &[node_id, node] : nodes_) {
if (node->port == port_ && util::MatchListeningIP(binds_, node->host)) {
myid_ = node_id;
break;
}
}
Expand All @@ -210,9 +212,9 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b

// Clear data of migrated slots
if (!migrated_slots_.empty()) {
for (auto &it : migrated_slots_) {
if (slots_nodes_[it.first] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, it.first);
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
}
Expand Down Expand Up @@ -521,34 +523,32 @@ std::string Cluster::genNodesDescription() {

auto now = util::GetTimeStampMS();
std::string nodes_desc;
for (const auto &item : nodes_) {
const std::shared_ptr<ClusterNode> n = item.second;

for (const auto &[_, node] : nodes_) {
std::string node_str;
// ID, host, port
node_str.append(n->id + " ");
node_str.append(fmt::format("{}:{}@{} ", n->host, n->port, n->port + kClusterPortIncr));
node_str.append(node->id + " ");
node_str.append(fmt::format("{}:{}@{} ", node->host, node->port, node->port + kClusterPortIncr));

// Flags
if (n->id == myid_) node_str.append("myself,");
if (n->role == kClusterMaster) {
if (node->id == myid_) node_str.append("myself,");
if (node->role == kClusterMaster) {
node_str.append("master - ");
} else {
node_str.append("slave " + n->master_id + " ");
node_str.append("slave " + node->master_id + " ");
}

// Ping sent, pong received, config epoch, link status
node_str.append(fmt::format("{} {} {} connected", now - 1, now, version_));

if (n->role == kClusterMaster) {
auto iter = slots_infos.find(n->id);
if (iter != slots_infos.end() && iter->second.size() > 0) {
if (node->role == kClusterMaster) {
auto iter = slots_infos.find(node->id);
if (iter != slots_infos.end() && !iter->second.empty()) {
node_str.append(" " + iter->second);
}
}

// Just for MYSELF node to show the importing/migrating slot
if (n->id == myid_) {
if (node->id == myid_) {
if (srv_->slot_migrator) {
auto migrating_slot = srv_->slot_migrator->GetMigratingSlot();
if (migrating_slot != -1) {
Expand All @@ -567,10 +567,10 @@ std::string Cluster::genNodesDescription() {
return nodes_desc;
}

std::map<std::string, std::string> Cluster::getClusterNodeSlots() const {
std::map<std::string, std::string, std::less<>> Cluster::getClusterNodeSlots() const {
int start = -1;
// node id => slots info string
std::map<std::string, std::string> slots_infos;
std::map<std::string, std::string, std::less<>> slots_infos;

std::shared_ptr<ClusterNode> n = nullptr;
for (int i = 0; i <= kClusterSlots; i++) {
Expand Down Expand Up @@ -600,30 +600,29 @@ std::map<std::string, std::string> Cluster::getClusterNodeSlots() const {
return slots_infos;
}

std::string Cluster::genNodesInfo() {
std::string Cluster::genNodesInfo() const {
auto slots_infos = getClusterNodeSlots();

std::string nodes_info;
for (const auto &item : nodes_) {
const std::shared_ptr<ClusterNode> &n = item.second;
for (const auto &[_, node] : nodes_) {
std::string node_str;
node_str.append("node ");
// ID
node_str.append(n->id + " ");
node_str.append(node->id + " ");
// Host + Port
node_str.append(fmt::format("{} {} ", n->host, n->port));
node_str.append(fmt::format("{} {} ", node->host, node->port));

// Role
if (n->role == kClusterMaster) {
if (node->role == kClusterMaster) {
node_str.append("master - ");
} else {
node_str.append("slave " + n->master_id + " ");
node_str.append("slave " + node->master_id + " ");
}

// Slots
if (n->role == kClusterMaster) {
auto iter = slots_infos.find(n->id);
if (iter != slots_infos.end() && iter->second.size() > 0) {
if (node->role == kClusterMaster) {
auto iter = slots_infos.find(node->id);
if (iter != slots_infos.end() && !iter->second.empty()) {
node_str.append(" " + iter->second);
}
}
Expand Down Expand Up @@ -694,7 +693,7 @@ Status Cluster::LoadClusterNodes(const std::string &file_path) {
Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes) {
std::vector<std::string> nodes_info = util::Split(nodes_str, "\n");
if (nodes_info.size() == 0) {
if (nodes_info.empty()) {
return {Status::ClusterInvalidInfo, errInvalidClusterNodeInfo};
}

Expand Down Expand Up @@ -803,16 +802,17 @@ Status Cluster::parseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return Status::OK();
}

bool Cluster::IsWriteForbiddenSlot(int slot) { return srv_->slot_migrator->GetForbiddenSlot() == slot; }
bool Cluster::IsWriteForbiddenSlot(int slot) const { return srv_->slot_migrator->GetForbiddenSlot() == slot; }

Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn) {
std::vector<int> keys_indexes;
auto s = redis::CommandTable::GetKeysFromCommand(attributes, cmd_tokens, &keys_indexes);

// No keys
if (!s.IsOK()) return Status::OK();
if (auto s = redis::CommandTable::GetKeysFromCommand(attributes, cmd_tokens, &keys_indexes); !s.IsOK())
return Status::OK();

if (keys_indexes.size() == 0) return Status::OK();
if (keys_indexes.empty()) return Status::OK();

int slot = -1;
for (auto i : keys_indexes) {
Expand Down
12 changes: 6 additions & 6 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
class ClusterNode {
public:
explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots);
const std::bitset<kClusterSlots> &slots);
std::string id;
std::string host;
int port;
Expand Down Expand Up @@ -81,7 +81,7 @@ class Cluster {
int64_t GetVersion() const { return version_; }
static bool IsValidSlot(int slot) { return slot >= 0 && slot < kClusterSlots; }
bool IsNotMaster();
bool IsWriteForbiddenSlot(int slot);
bool IsWriteForbiddenSlot(int slot) const;
Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector<std::string> &cmd_tokens,
redis::Connection *conn);
Status SetMasterSlaveRepl();
Expand All @@ -97,16 +97,16 @@ class Cluster {
private:
std::string getNodeIDBySlot(int slot) const;
std::string genNodesDescription();
std::string genNodesInfo();
std::map<std::string, std::string> getClusterNodeSlots() const;
std::string genNodesInfo() const;
std::map<std::string, std::string, std::less<>> getClusterNodeSlots() const;
SlotInfo genSlotNodeInfo(int start, int end, const std::shared_ptr<ClusterNode> &n);
static Status parseClusterNodes(const std::string &nodes_str, ClusterNodes *nodes,
std::unordered_map<int, std::string> *slots_nodes);
Server *srv_;
std::vector<std::string> binds_;
int port_;
int size_;
int64_t version_;
int size_ = 0;
int64_t version_ = -1;
std::string myid_;
std::shared_ptr<ClusterNode> myself_;
ClusterNodes nodes_;
Expand Down
2 changes: 0 additions & 2 deletions src/cluster/redis_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

#include "redis_slot.h"

#include <stdlib.h>

#include <algorithm>
#include <cstdlib>
#include <string>
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ void ReplicationThread::CallbacksStateMachine::ReadWriteCB(bufferevent *bev) {
assert(handler_idx_ <= handlers_.size());
DLOG(INFO) << "[replication] Execute handler[" << getHandlerName(handler_idx_) << "]";
auto st = getHandlerFunc(handler_idx_)(repl_, bev);
repl_->last_io_time_.store(util::GetTimeStamp(), std::memory_order_relaxed);
repl_->last_io_time_secs_.store(util::GetTimeStamp(), std::memory_order_relaxed);
switch (st) {
case CBState::NEXT:
++handler_idx_;
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
void Stop();
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
time_t LastIOTime() { return last_io_time_.load(std::memory_order_relaxed); }
int64_t LastIOTimeSecs() const { return last_io_time_secs_.load(std::memory_order_relaxed); }

void TimerCB(int, int16_t);

Expand Down Expand Up @@ -155,7 +155,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
Server *srv_ = nullptr;
engine::Storage *storage_ = nullptr;
std::atomic<ReplState> repl_state_;
std::atomic<time_t> last_io_time_ = 0;
std::atomic<int64_t> last_io_time_secs_ = 0;
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;

Expand Down
5 changes: 5 additions & 0 deletions src/commands/cmd_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,11 @@ class CommandJsonStrLen : public Commander {

Optionals<uint64_t> results;
auto s = json.StrLen(args_[1], path, &results);
if (s.IsNotFound()) {
*output = conn->NilString();
return Status::OK();
}

if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = OptionalsToString(conn, results);
Expand Down
Loading

0 comments on commit 61881f1

Please sign in to comment.