Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Beihao-Zhou committed Apr 29, 2024
1 parent 0981283 commit f9a4ef0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
36 changes: 32 additions & 4 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,14 @@ class CommandXClaim : public Commander {
stream_name_ = GET_OR_RET(parser.TakeStr());
group_name_ = GET_OR_RET(parser.TakeStr());
consumer_name_ = GET_OR_RET(parser.TakeStr());
min_idle_time_ = GET_OR_RET(parser.TakeInt<uint64_t>());
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
min_idle_time_ = parse_result.GetValue();
if (min_idle_time_ < 0) {
min_idle_time_ = 0;
}

while (parser.Good() && !isOption(parser.RawPeek())) {
auto raw_id = GET_OR_RET(parser.TakeStr());
Expand All @@ -267,13 +274,34 @@ class CommandXClaim : public Commander {

while (parser.Good()) {
if (parser.EatEqICase("idle")) {
stream_claim_options_.idle_time = GET_OR_RET(parser.TakeInt<uint64_t>());
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (parse_result.GetValue() < 0) {
return {Status::RedisParseErr, "IDLE for XCLAIM must be non-negative"};
}
stream_claim_options_.idle_time = parse_result.GetValue();
} else if (parser.EatEqICase("time")) {
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (parse_result.GetValue() < 0) {
return {Status::RedisParseErr, "TIME for XCLAIM must be non-negative"};
}
stream_claim_options_.with_time = true;
stream_claim_options_.last_delivery_time = GET_OR_RET(parser.TakeInt<uint64_t>());
stream_claim_options_.last_delivery_time = parse_result.GetValue();
} else if (parser.EatEqICase("retrycount")) {
auto parse_result = parser.TakeInt<int64_t>();
if (!parse_result.IsOK()) {
return {Status::RedisParseErr, errValueNotInteger};
}
if (parse_result.GetValue() < 0) {
return {Status::RedisParseErr, "RETRYCOUNT for XCLAIM must be non-negative"};
}
stream_claim_options_.with_retry_count = true;
stream_claim_options_.last_delivery_count = GET_OR_RET(parser.TakeInt<uint64_t>());
stream_claim_options_.last_delivery_count = parse_result.GetValue();
} else if (parser.EatEqICase("force")) {
stream_claim_options_.force = true;
} else if (parser.EatEqICase("justid")) {
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str
pel_entry = decodeStreamPelEntryValue(value);
}

if (s.ok() || (!s.ok() && s.IsNotFound() && options.force)) {
if (s.ok() || (s.IsNotFound() && options.force)) {
if (now - pel_entry.last_delivery_time < min_idle_time) continue;

std::vector<std::string> values;
Expand Down Expand Up @@ -476,7 +476,7 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str
}
}

if (options.with_last_id) {
if (options.with_last_id && options.last_delivered_id > group_metadata.last_delivered_id) {
group_metadata.last_delivered_id = options.last_delivered_id;
}

Expand Down

0 comments on commit f9a4ef0

Please sign in to comment.