diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index f883f6df0f1..7492171f358 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -253,7 +253,14 @@ class CommandXClaim : public Commander { stream_name_ = GET_OR_RET(parser.TakeStr()); group_name_ = GET_OR_RET(parser.TakeStr()); consumer_name_ = GET_OR_RET(parser.TakeStr()); - min_idle_time_ = GET_OR_RET(parser.TakeInt()); + auto parse_result = parser.TakeInt(); + if (!parse_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + min_idle_time_ = parse_result.GetValue(); + if (min_idle_time_ < 0) { + min_idle_time_ = 0; + } while (parser.Good() && !isOption(parser.RawPeek())) { auto raw_id = GET_OR_RET(parser.TakeStr()); @@ -267,13 +274,34 @@ class CommandXClaim : public Commander { while (parser.Good()) { if (parser.EatEqICase("idle")) { - stream_claim_options_.idle_time = GET_OR_RET(parser.TakeInt()); + auto parse_result = parser.TakeInt(); + if (!parse_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + if (parse_result.GetValue() < 0) { + return {Status::RedisParseErr, "IDLE for XCLAIM must be non-negative"}; + } + stream_claim_options_.idle_time = parse_result.GetValue(); } else if (parser.EatEqICase("time")) { + auto parse_result = parser.TakeInt(); + if (!parse_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + if (parse_result.GetValue() < 0) { + return {Status::RedisParseErr, "TIME for XCLAIM must be non-negative"}; + } stream_claim_options_.with_time = true; - stream_claim_options_.last_delivery_time = GET_OR_RET(parser.TakeInt()); + stream_claim_options_.last_delivery_time = parse_result.GetValue(); } else if (parser.EatEqICase("retrycount")) { + auto parse_result = parser.TakeInt(); + if (!parse_result.IsOK()) { + return {Status::RedisParseErr, errValueNotInteger}; + } + if (parse_result.GetValue() < 0) { + return {Status::RedisParseErr, "RETRYCOUNT for XCLAIM must be non-negative"}; + } stream_claim_options_.with_retry_count = true; - stream_claim_options_.last_delivery_count = GET_OR_RET(parser.TakeInt()); + stream_claim_options_.last_delivery_count = parse_result.GetValue(); } else if (parser.EatEqICase("force")) { stream_claim_options_.force = true; } else if (parser.EatEqICase("justid")) { diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 907de277fd3..49cad19fdf5 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -427,7 +427,7 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str pel_entry = decodeStreamPelEntryValue(value); } - if (s.ok() || (!s.ok() && s.IsNotFound() && options.force)) { + if (s.ok() || (s.IsNotFound() && options.force)) { if (now - pel_entry.last_delivery_time < min_idle_time) continue; std::vector values; @@ -476,7 +476,7 @@ rocksdb::Status Stream::ClaimPelEntries(const Slice &stream_name, const std::str } } - if (options.with_last_id) { + if (options.with_last_id && options.last_delivered_id > group_metadata.last_delivered_id) { group_metadata.last_delivered_id = options.last_delivered_id; }