Skip to content

Commit

Permalink
Refactor TTL processing in string type (apache#2250)
Browse files Browse the repository at this point in the history
Co-authored-by: hulk <[email protected]>
  • Loading branch information
PragmaTwice and git-hulk authored Apr 15, 2024
1 parent 433dcfb commit aa7c745
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 94 deletions.
59 changes: 22 additions & 37 deletions src/commands/cmd_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ class CommandGetEx : public Commander {
CommandParser parser(args, 2);
std::string_view ttl_flag;
while (parser.Good()) {
if (auto v = GET_OR_RET(ParseTTL(parser, ttl_flag))) {
ttl_ = *v;
if (auto v = GET_OR_RET(ParseExpireFlags(parser, ttl_flag))) {
expire_ = *v;
} else if (parser.EatEqICaseFlag("PERSIST", ttl_flag)) {
persist_ = true;
expire_ = 0;
} else {
return parser.InvalidSyntax();
}
Expand All @@ -80,7 +80,7 @@ class CommandGetEx : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::string value;
redis::String string_db(srv->storage, conn->GetNamespace());
auto s = string_db.GetEx(args_[1], &value, ttl_, persist_);
auto s = string_db.GetEx(args_[1], &value, expire_);

// The IsInvalidArgument error means the key type maybe a bitmap
// which we need to fall back to the bitmap's GetString according
Expand All @@ -90,12 +90,8 @@ class CommandGetEx : public Commander {
uint32_t max_btos_size = static_cast<uint32_t>(config->max_bitmap_to_string_mb) * MiB;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
s = bitmap_db.GetString(args_[1], max_btos_size, &value);
if (s.ok()) {
if (ttl_ > 0) {
s = bitmap_db.Expire(args_[1], ttl_ + util::GetTimeStampMS());
} else if (persist_) {
s = bitmap_db.Expire(args_[1], 0);
}
if (s.ok() && expire_) {
s = bitmap_db.Expire(args_[1], expire_.value());
}
}
if (!s.ok() && !s.IsNotFound()) {
Expand All @@ -107,8 +103,7 @@ class CommandGetEx : public Commander {
}

private:
uint64_t ttl_ = 0;
bool persist_ = false;
std::optional<uint64_t> expire_;
};

class CommandStrlen : public Commander {
Expand Down Expand Up @@ -282,8 +277,8 @@ class CommandSet : public Commander {
CommandParser parser(args, 3);
std::string_view ttl_flag, set_flag;
while (parser.Good()) {
if (auto v = GET_OR_RET(ParseTTL(parser, ttl_flag))) {
ttl_ = *v;
if (auto v = GET_OR_RET(ParseExpireFlags(parser, ttl_flag))) {
expire_ = *v;
} else if (parser.EatEqICaseFlag("KEEPTTL", ttl_flag)) {
keep_ttl_ = true;
} else if (parser.EatEqICaseFlag("NX", set_flag)) {
Expand All @@ -304,17 +299,7 @@ class CommandSet : public Commander {
std::optional<std::string> ret;
redis::String string_db(srv->storage, conn->GetNamespace());

if (ttl_ < 0) {
auto s = string_db.Del(args_[1]);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::SimpleString("OK");
return Status::OK();
}

rocksdb::Status s;
s = string_db.Set(args_[1], args_[2], {ttl_, set_flag_, get_, keep_ttl_}, ret);
rocksdb::Status s = string_db.Set(args_[1], args_[2], {expire_, set_flag_, get_, keep_ttl_}, ret);

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand All @@ -337,7 +322,7 @@ class CommandSet : public Commander {
}

private:
uint64_t ttl_ = 0;
uint64_t expire_ = 0;
bool get_ = false;
bool keep_ttl_ = false;
StringSetType set_flag_ = StringSetType::NONE;
Expand All @@ -353,20 +338,20 @@ class CommandSetEX : public Commander {

if (*parse_result <= 0) return {Status::RedisParseErr, errInvalidExpireTime};

ttl_ = *parse_result;
expire_ = *parse_result * 1000 + util::GetTimeStampMS();

return Commander::Parse(args);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::String string_db(srv->storage, conn->GetNamespace());
auto s = string_db.SetEX(args_[1], args_[3], ttl_ * 1000);
auto s = string_db.SetEX(args_[1], args_[3], expire_);
*output = redis::SimpleString("OK");
return Status::OK();
}

private:
uint64_t ttl_ = 0;
uint64_t expire_ = 0;
};

class CommandPSetEX : public Commander {
Expand All @@ -379,20 +364,20 @@ class CommandPSetEX : public Commander {

if (*ttl_ms <= 0) return {Status::RedisParseErr, errInvalidExpireTime};

ttl_ = *ttl_ms;
expire_ = *ttl_ms + util::GetTimeStampMS();

return Commander::Parse(args);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::String string_db(srv->storage, conn->GetNamespace());
auto s = string_db.SetEX(args_[1], args_[3], ttl_);
auto s = string_db.SetEX(args_[1], args_[3], expire_);
*output = redis::SimpleString("OK");
return Status::OK();
}

private:
int64_t ttl_ = 0;
uint64_t expire_ = 0;
};

class CommandMSet : public Commander {
Expand All @@ -412,7 +397,7 @@ class CommandMSet : public Commander {
kvs.emplace_back(StringPair{args_[i], args_[i + 1]});
}

auto s = string_db.MSet(kvs);
auto s = string_db.MSet(kvs, 0);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -581,8 +566,8 @@ class CommandCAS : public Commander {
CommandParser parser(args, 4);
std::string_view flag;
while (parser.Good()) {
if (auto v = GET_OR_RET(ParseTTL(parser, flag))) {
ttl_ = *v;
if (auto v = GET_OR_RET(ParseExpireFlags(parser, flag))) {
expire_ = *v;
} else {
return parser.InvalidSyntax();
}
Expand All @@ -593,7 +578,7 @@ class CommandCAS : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::String string_db(srv->storage, conn->GetNamespace());
int ret = 0;
auto s = string_db.CAS(args_[1], args_[2], args_[3], ttl_, &ret);
auto s = string_db.CAS(args_[1], args_[2], args_[3], expire_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand All @@ -603,7 +588,7 @@ class CommandCAS : public Commander {
}

private:
uint64_t ttl_ = 0;
uint64_t expire_ = 0;
};

class CommandCAD : public Commander {
Expand Down
10 changes: 5 additions & 5 deletions src/commands/ttl_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ template <typename T>
constexpr auto TTL_RANGE = NumericRange<T>{1, std::numeric_limits<T>::max()};

template <typename T>
StatusOr<std::optional<int64_t>> ParseTTL(CommandParser<T> &parser, std::string_view &curr_flag) {
StatusOr<std::optional<uint64_t>> ParseExpireFlags(CommandParser<T> &parser, std::string_view &curr_flag) {
if (parser.EatEqICaseFlag("EX", curr_flag)) {
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000;
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000 + util::GetTimeStampMS();
} else if (parser.EatEqICaseFlag("EXAT", curr_flag)) {
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000 - util::GetTimeStampMS();
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) * 1000;
} else if (parser.EatEqICaseFlag("PX", curr_flag)) {
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>));
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) + util::GetTimeStampMS();
} else if (parser.EatEqICaseFlag("PXAT", curr_flag)) {
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>)) - util::GetTimeStampMS();
return GET_OR_RET(parser.template TakeInt<int64_t>(TTL_RANGE<int64_t>));
} else {
return std::nullopt;
}
Expand Down
6 changes: 5 additions & 1 deletion src/storage/rdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,11 @@ Status RDB::saveRdbObject(int type, const std::string &key, const RedisObjValue
if (type == RDBTypeString) {
const auto &value = std::get<std::string>(obj);
redis::String string_db(storage_, ns_);
db_status = string_db.SetEX(key, value, ttl_ms);
uint64_t expire = 0;
if (ttl_ms > 0) {
expire = ttl_ms + util::GetTimeStampMS();
}
db_status = string_db.SetEX(key, value, expire);
} else if (type == RDBTypeSet || type == RDBTypeSetIntSet || type == RDBTypeSetListPack) {
const auto &members = std::get<std::vector<std::string>>(obj);
redis::Set set_db(storage_, ns_);
Expand Down
51 changes: 17 additions & 34 deletions src/types/redis_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,7 @@ rocksdb::Status String::Get(const std::string &user_key, std::string *value) {
return getValue(ns_key, value);
}

rocksdb::Status String::GetEx(const std::string &user_key, std::string *value, uint64_t ttl, bool persist) {
uint64_t expire = 0;
if (ttl > 0) {
uint64_t now = util::GetTimeStampMS();
expire = now + ttl;
}
rocksdb::Status String::GetEx(const std::string &user_key, std::string *value, std::optional<uint64_t> expire) {
std::string ns_key = AppendNamespacePrefix(user_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
Expand All @@ -162,8 +157,8 @@ rocksdb::Status String::GetEx(const std::string &user_key, std::string *value, u

std::string raw_data;
Metadata metadata(kRedisString, false);
if (ttl > 0 || persist) {
metadata.expire = expire;
if (expire.has_value()) {
metadata.expire = expire.value();
} else {
// If there is no ttl or persist is false, then skip the following updates.
return rocksdb::Status::OK();
Expand All @@ -181,7 +176,7 @@ rocksdb::Status String::GetEx(const std::string &user_key, std::string *value, u

rocksdb::Status String::GetSet(const std::string &user_key, const std::string &new_value,
std::optional<std::string> &old_value) {
auto s = Set(user_key, new_value, {/*ttl=*/0, StringSetType::NONE, /*get=*/true, /*keep_ttl=*/false}, old_value);
auto s = Set(user_key, new_value, {/*expire=*/0, StringSetType::NONE, /*get=*/true, /*keep_ttl=*/false}, old_value);
return s;
}
rocksdb::Status String::GetDel(const std::string &user_key, std::string *value) {
Expand All @@ -196,7 +191,7 @@ rocksdb::Status String::GetDel(const std::string &user_key, std::string *value)

rocksdb::Status String::Set(const std::string &user_key, const std::string &value) {
std::vector<StringPair> pairs{StringPair{user_key, value}};
return MSet(pairs, /*ttl=*/0, /*lock=*/true);
return MSet(pairs, /*expire=*/0, /*lock=*/true);
}

rocksdb::Status String::Set(const std::string &user_key, const std::string &value, StringSetArgs args,
Expand Down Expand Up @@ -247,9 +242,8 @@ rocksdb::Status String::Set(const std::string &user_key, const std::string &valu
}

// Handle expire time
if (args.ttl > 0) {
uint64_t now = util::GetTimeStampMS();
expire = now + args.ttl;
if (!args.keep_ttl) {
expire = args.expire;
}

// Create new value
Expand All @@ -261,21 +255,21 @@ rocksdb::Status String::Set(const std::string &user_key, const std::string &valu
return updateRawValue(ns_key, new_raw_value);
}

rocksdb::Status String::SetEX(const std::string &user_key, const std::string &value, uint64_t ttl) {
rocksdb::Status String::SetEX(const std::string &user_key, const std::string &value, uint64_t expire) {
std::optional<std::string> ret;
return Set(user_key, value, {ttl, StringSetType::NONE, /*get=*/false, /*keep_ttl=*/false}, ret);
return Set(user_key, value, {expire, StringSetType::NONE, /*get=*/false, /*keep_ttl=*/false}, ret);
}

rocksdb::Status String::SetNX(const std::string &user_key, const std::string &value, uint64_t ttl, bool *flag) {
rocksdb::Status String::SetNX(const std::string &user_key, const std::string &value, uint64_t expire, bool *flag) {
std::optional<std::string> ret;
auto s = Set(user_key, value, {ttl, StringSetType::NX, /*get=*/false, /*keep_ttl=*/false}, ret);
auto s = Set(user_key, value, {expire, StringSetType::NX, /*get=*/false, /*keep_ttl=*/false}, ret);
*flag = ret.has_value();
return s;
}

rocksdb::Status String::SetXX(const std::string &user_key, const std::string &value, uint64_t ttl, bool *flag) {
rocksdb::Status String::SetXX(const std::string &user_key, const std::string &value, uint64_t expire, bool *flag) {
std::optional<std::string> ret;
auto s = Set(user_key, value, {ttl, StringSetType::XX, /*get=*/false, /*keep_ttl=*/false}, ret);
auto s = Set(user_key, value, {expire, StringSetType::XX, /*get=*/false, /*keep_ttl=*/false}, ret);
*flag = ret.has_value();
return s;
}
Expand Down Expand Up @@ -390,13 +384,7 @@ rocksdb::Status String::IncrByFloat(const std::string &user_key, double incremen
return updateRawValue(ns_key, raw_value);
}

rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t ttl, bool lock) {
uint64_t expire = 0;
if (ttl > 0) {
uint64_t now = util::GetTimeStampMS();
expire = now + ttl;
}

rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t expire, bool lock) {
// Data race, key string maybe overwrite by other key while didn't lock the keys here,
// to improve the set performance
std::optional<MultiLockGuard> guard;
Expand Down Expand Up @@ -425,7 +413,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair> &pairs, uint64_t ttl,
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t ttl, bool *flag) {
rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t expire, bool *flag) {
*flag = false;

int exists = 0;
Expand All @@ -447,7 +435,7 @@ rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t tt
return rocksdb::Status::OK();
}

rocksdb::Status s = MSet(pairs, /*ttl=*/ttl, /*lock=*/false);
rocksdb::Status s = MSet(pairs, /*expire=*/expire, /*lock=*/false);
if (!s.ok()) return s;

*flag = true;
Expand All @@ -460,7 +448,7 @@ rocksdb::Status String::MSetNX(const std::vector<StringPair> &pairs, uint64_t tt
// -1 if the user_key does not exist
// 0 if the operation fails
rocksdb::Status String::CAS(const std::string &user_key, const std::string &old_value, const std::string &new_value,
uint64_t ttl, int *flag) {
uint64_t expire, int *flag) {
*flag = 0;

std::string current_value;
Expand All @@ -480,12 +468,7 @@ rocksdb::Status String::CAS(const std::string &user_key, const std::string &old_

if (old_value == current_value) {
std::string raw_value;
uint64_t expire = 0;
Metadata metadata(kRedisString, false);
if (ttl > 0) {
uint64_t now = util::GetTimeStampMS();
expire = now + ttl;
}
metadata.expire = expire;
metadata.Encode(&raw_value);
raw_value.append(new_value);
Expand Down
16 changes: 8 additions & 8 deletions src/types/redis_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ struct StringPair {
enum class StringSetType { NONE, NX, XX };

struct StringSetArgs {
uint64_t ttl;
uint64_t expire;
StringSetType type;
bool get;
bool keep_ttl;
Expand Down Expand Up @@ -78,24 +78,24 @@ class String : public Database {
explicit String(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
rocksdb::Status Append(const std::string &user_key, const std::string &value, uint64_t *new_size);
rocksdb::Status Get(const std::string &user_key, std::string *value);
rocksdb::Status GetEx(const std::string &user_key, std::string *value, uint64_t ttl, bool persist);
rocksdb::Status GetEx(const std::string &user_key, std::string *value, std::optional<uint64_t> expire);
rocksdb::Status GetSet(const std::string &user_key, const std::string &new_value,
std::optional<std::string> &old_value);
rocksdb::Status GetDel(const std::string &user_key, std::string *value);
rocksdb::Status Set(const std::string &user_key, const std::string &value);
rocksdb::Status Set(const std::string &user_key, const std::string &value, StringSetArgs args,
std::optional<std::string> &ret);
rocksdb::Status SetEX(const std::string &user_key, const std::string &value, uint64_t ttl);
rocksdb::Status SetNX(const std::string &user_key, const std::string &value, uint64_t ttl, bool *flag);
rocksdb::Status SetXX(const std::string &user_key, const std::string &value, uint64_t ttl, bool *flag);
rocksdb::Status SetEX(const std::string &user_key, const std::string &value, uint64_t expire);
rocksdb::Status SetNX(const std::string &user_key, const std::string &value, uint64_t expire, bool *flag);
rocksdb::Status SetXX(const std::string &user_key, const std::string &value, uint64_t expire, bool *flag);
rocksdb::Status SetRange(const std::string &user_key, size_t offset, const std::string &value, uint64_t *new_size);
rocksdb::Status IncrBy(const std::string &user_key, int64_t increment, int64_t *new_value);
rocksdb::Status IncrByFloat(const std::string &user_key, double increment, double *new_value);
std::vector<rocksdb::Status> MGet(const std::vector<Slice> &keys, std::vector<std::string> *values);
rocksdb::Status MSet(const std::vector<StringPair> &pairs, uint64_t ttl = 0, bool lock = true);
rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, uint64_t ttl, bool *flag);
rocksdb::Status MSet(const std::vector<StringPair> &pairs, uint64_t expire, bool lock = true);
rocksdb::Status MSetNX(const std::vector<StringPair> &pairs, uint64_t expire, bool *flag);
rocksdb::Status CAS(const std::string &user_key, const std::string &old_value, const std::string &new_value,
uint64_t ttl, int *flag);
uint64_t expire, int *flag);
rocksdb::Status CAD(const std::string &user_key, const std::string &value, int *flag);
rocksdb::Status LCS(const std::string &user_key1, const std::string &user_key2, StringLCSArgs args,
StringLCSResult *rst);
Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/iterator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ TEST_F(WALIteratorTest, BasicString) {
auto start_seq = storage_->GetDB()->GetLatestSequenceNumber();
redis::String string(storage_.get(), "test_ns0");
string.Set("a", "1");
string.MSet({{"b", "2"}, {"c", "3"}});
string.MSet({{"b", "2"}, {"c", "3"}}, 0);
ASSERT_TRUE(string.Del("b").ok());

std::vector<std::string> put_keys, delete_keys;
Expand Down
Loading

0 comments on commit aa7c745

Please sign in to comment.