From 0ebc1a11e1f8cbd06a2031e59f92f00e7e8a5497 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 23 Oct 2024 21:21:46 +0300 Subject: [PATCH] chore: pass SinkReplyBuilder and Transaction explicitly. Part4 (#3967) --- src/server/bitops_family.cc | 110 +++++++++++----------- src/server/hset_family.cc | 181 ++++++++++++++++++------------------ src/server/hset_family.h | 39 ++++---- 3 files changed, 164 insertions(+), 166 deletions(-) diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index c194392195b9..2145f9179cca 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -39,13 +39,13 @@ using BitsStrVec = vector; // The following is the list of the functions that would handle the // commands that handle the bit operations -void BitPos(CmdArgList args, ConnectionContext* cntx); -void BitCount(CmdArgList args, ConnectionContext* cntx); -void BitField(CmdArgList args, ConnectionContext* cntx); -void BitFieldRo(CmdArgList args, ConnectionContext* cntx); -void BitOp(CmdArgList args, ConnectionContext* cntx); -void GetBit(CmdArgList args, ConnectionContext* cntx); -void SetBit(CmdArgList args, ConnectionContext* cntx); +void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); +void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); +void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); +void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); +void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); +void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); +void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); OpResult ReadValue(const DbContext& context, string_view key, EngineShard* shard); OpResult ReadValueBitsetAt(const OpArgs& op_args, string_view key, uint32_t offset); @@ -489,22 +489,23 @@ OpResult RunBitOpOnShard(string_view op, const OpArgs& op_args, ShardArg return op_result; } -template void HandleOpValueResult(const OpResult& result, ConnectionContext* cntx) { +template +void HandleOpValueResult(const OpResult& result, SinkReplyBuilder* builder) { static_assert(std::is_integral::value, "we are only handling types that are integral types in the return types from " "here"); if (result) { - cntx->SendLong(result.value()); + builder->SendLong(result.value()); } else { switch (result.status()) { case OpStatus::WRONG_TYPE: - cntx->SendError(kWrongTypeErr); + builder->SendError(kWrongTypeErr); break; case OpStatus::OUT_OF_MEMORY: - cntx->SendError(kOutOfMemory); + builder->SendError(kOutOfMemory); break; default: - cntx->SendLong(0); // in case we don't have the value we should just send 0 + builder->SendLong(0); // in case we don't have the value we should just send 0 break; } } @@ -512,12 +513,12 @@ template void HandleOpValueResult(const OpResult& result, Connec // ------------------------------------------------------------------------- // // Impl for the command functions -void BitPos(CmdArgList args, ConnectionContext* cntx) { +void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { // Support for the command BITPOS // See details at https://redis.io/commands/bitpos/ if (args.size() < 1 || args.size() > 5) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } string_view key = ArgS(args, 0); @@ -528,19 +529,19 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) { bool as_bit = false; if (!absl::SimpleAtoi(ArgS(args, 1), &value)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } else if (value != 0 && value != 1) { - return cntx->SendError("The bit argument must be 1 or 0"); + return builder->SendError("The bit argument must be 1 or 0"); } if (args.size() >= 3) { if (!absl::SimpleAtoi(ArgS(args, 2), &start)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } if (args.size() >= 4) { if (!absl::SimpleAtoi(ArgS(args, 3), &end)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } if (args.size() >= 5) { @@ -550,7 +551,7 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) { } else if (arg == "BYTE") { as_bit = false; } else { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } } } @@ -559,12 +560,11 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit); }; - Transaction* trans = cntx->transaction; - OpResult res = trans->ScheduleSingleHopT(std::move(cb)); - HandleOpValueResult(res, cntx); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); + HandleOpValueResult(res, builder); } -void BitCount(CmdArgList args, ConnectionContext* cntx) { +void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { // Support for the command BITCOUNT // See details at https://redis.io/commands/bitcount/ // Please note that if the key don't exists, it would return 0 @@ -579,14 +579,13 @@ void BitCount(CmdArgList args, ConnectionContext* cntx) { bool as_bit = parser.HasNext() ? parser.MapNext("BYTE", false, "BIT", true) : false; if (!parser.Finalize()) { - return cntx->SendError(parser.Error()->MakeReply()); + return builder->SendError(parser.Error()->MakeReply()); } auto cb = [&](Transaction* t, EngineShard* shard) { return CountBitsForValue(t->GetOpArgs(shard), key, start, end, as_bit); }; - Transaction* trans = cntx->transaction; - OpResult res = trans->ScheduleSingleHopT(std::move(cb)); - HandleOpValueResult(res, cntx); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); + HandleOpValueResult(res, builder); } // GCC yields a wrong warning about uninitialized optional use @@ -1063,8 +1062,8 @@ nonstd::expected ParseToCommandList(CmdArgList args, bool r return result; } -void SendResults(const vector& results, ConnectionContext* cntx) { - auto* rb = static_cast(cntx->reply_builder()); +void SendResults(const vector& results, SinkReplyBuilder* builder) { + auto* rb = static_cast(builder); const size_t total = results.size(); if (total == 0) { rb->SendNullArray(); @@ -1082,9 +1081,9 @@ void SendResults(const vector& results, ConnectionContext* cntx) { } } -void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) { +void BitFieldGeneric(CmdArgList args, bool read_only, Transaction* tx, SinkReplyBuilder* builder) { if (args.size() == 1) { - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->SendNullArray(); return; } @@ -1092,7 +1091,7 @@ void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) { auto maybe_ops_list = ParseToCommandList(args.subspan(1), read_only); if (!maybe_ops_list.has_value()) { - cntx->SendError(maybe_ops_list.error()); + builder->SendError(maybe_ops_list.error()); return; } CommandList cmd_list = std::move(maybe_ops_list.value()); @@ -1102,30 +1101,29 @@ void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) { return executor.Execute(cmd_list); }; - Transaction* trans = cntx->transaction; - OpResult> res = trans->ScheduleSingleHopT(std::move(cb)); + OpResult> res = tx->ScheduleSingleHopT(std::move(cb)); if (res == OpStatus::WRONG_TYPE) { - cntx->SendError(kWrongTypeErr); + builder->SendError(kWrongTypeErr); return; } - SendResults(*res, cntx); + SendResults(*res, builder); } -void BitField(CmdArgList args, ConnectionContext* cntx) { - BitFieldGeneric(args, false, cntx); +void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + BitFieldGeneric(args, false, tx, builder); } -void BitFieldRo(CmdArgList args, ConnectionContext* cntx) { - BitFieldGeneric(args, true, cntx); +void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + BitFieldGeneric(args, true, tx, builder); } #ifndef __clang__ #pragma GCC diagnostic pop #endif -void BitOp(CmdArgList args, ConnectionContext* cntx) { +void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { static const std::array BITOP_OP_NAMES{OR_OP_NAME, XOR_OP_NAME, AND_OP_NAME, NOT_OP_NAME}; string op = absl::AsciiStrToUpper(ArgS(args, 0)); @@ -1134,7 +1132,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { [&op](auto val) { return op == val; }); if (illegal || (op == NOT_OP_NAME && args.size() > 3)) { - return cntx->SendError(kSyntaxErr); // too many arguments + return builder->SendError(kSyntaxErr); // too many arguments } // Multi shard access - read only @@ -1157,13 +1155,13 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do + tx->Execute(std::move(shard_bitop), false); // we still have more work to do // All result from each shard const auto joined_results = CombineResultOp(result_set, op); // Second phase - save to target key if successful if (!joined_results) { - cntx->transaction->Conclude(); - cntx->SendError(joined_results.status()); + tx->Conclude(); + builder->SendError(joined_results.status()); return; } else { auto op_result = joined_results.value(); @@ -1193,12 +1191,12 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Execute(std::move(store_cb), true); - cntx->SendLong(op_result.size()); + tx->Execute(std::move(store_cb), true); + builder->SendLong(op_result.size()); } } -void GetBit(CmdArgList args, ConnectionContext* cntx) { +void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { // Support for the command "GETBIT key offset" // see https://redis.io/commands/getbit/ @@ -1206,17 +1204,16 @@ void GetBit(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); if (!absl::SimpleAtoi(ArgS(args, 1), &offset)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } auto cb = [&](Transaction* t, EngineShard* shard) { return ReadValueBitsetAt(t->GetOpArgs(shard), key, offset); }; - Transaction* trans = cntx->transaction; - OpResult res = trans->ScheduleSingleHopT(std::move(cb)); - HandleOpValueResult(res, cntx); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); + HandleOpValueResult(res, builder); } -void SetBit(CmdArgList args, ConnectionContext* cntx) { +void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { // Support for the command "SETBIT key offset new_value" // see https://redis.io/commands/setbit/ @@ -1224,16 +1221,15 @@ void SetBit(CmdArgList args, ConnectionContext* cntx) { auto [key, offset, value] = parser.Next>(); if (auto err = parser.Error(); err) { - return cntx->SendError(err->MakeReply()); + return builder->SendError(err->MakeReply()); } auto cb = [&](Transaction* t, EngineShard* shard) { return BitNewValue(t->GetOpArgs(shard), key, offset, value != 0); }; - Transaction* trans = cntx->transaction; - OpResult res = trans->ScheduleSingleHopT(std::move(cb)); - HandleOpValueResult(res, cntx); + OpResult res = tx->ScheduleSingleHopT(std::move(cb)); + HandleOpValueResult(res, builder); } // ------------------------------------------------------------------------- // diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 8c55fe8eff3c..29f408904dfe 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -708,22 +708,22 @@ OpResult OpSet(const OpArgs& op_args, string_view key, CmdArgList valu return created; } -void HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask) { +void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return OpGetAll(t->GetOpArgs(shard), key, getall_mask); }; - OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult> result = tx->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); if (result) { bool is_map = (getall_mask == (VALUES | FIELDS)); rb->SendStringArr(absl::Span{*result}, is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } @@ -745,7 +745,7 @@ OpResult> OpHExpire(const OpArgs& op_args, string_view key, uint32_ } // HSETEX key [NX] tll_sec field value field value ... -void HSetEx(CmdArgList args, ConnectionContext* cntx) { +void HSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { CmdArgParser parser{args}; string_view key = parser.Next(); @@ -757,13 +757,13 @@ void HSetEx(CmdArgList args, ConnectionContext* cntx) { constexpr uint32_t kMaxTtl = (1UL << 26); if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } CmdArgList fields = parser.Tail(); if (fields.size() % 2 != 0) { - return cntx->SendError(facade::WrongNumArgsError(cntx->cid->name()), kSyntaxErrType); + return builder->SendError(facade::WrongNumArgsError(cntx->cid->name()), kSyntaxErrType); } OpSetParams op_sp{skip_if_exists, ttl_sec}; @@ -772,17 +772,17 @@ void HSetEx(CmdArgList args, ConnectionContext* cntx) { return OpSet(t->GetOpArgs(shard), key, fields, op_sp); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } } // namespace -void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -790,28 +790,28 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) { return OpDel(t->GetOpArgs(shard), key, args); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result || result.status() == OpStatus::KEY_NOTFOUND) { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); @@ -819,46 +819,46 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) { return OpExist(t->GetOpArgs(shard), key, field); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HExpire(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view key = parser.Next(); string_view ttl_str = parser.Next(); uint32_t ttl_sec; constexpr uint32_t kMaxTtl = (1UL << 26); if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } if (!static_cast(parser.Check("FIELDS"sv))) { - return cntx->SendError("Mandatory argument FIELDS is missing or not at the right position", - kSyntaxErrType); + return builder->SendError("Mandatory argument FIELDS is missing or not at the right position", + kSyntaxErrType); } string_view numFieldsStr = parser.Next(); uint32_t numFields; if (!absl::SimpleAtoi(numFieldsStr, &numFields) || numFields == 0) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } CmdArgList fields = parser.Tail(); if (fields.size() != numFields) { - return cntx->SendError("The `numfields` parameter must match the number of arguments", - kSyntaxErrType); + return builder->SendError("The `numfields` parameter must match the number of arguments", + kSyntaxErrType); } auto cb = [&](Transaction* t, EngineShard* shard) { return OpHExpire(t->GetOpArgs(shard), key, ttl_sec, fields); }; - OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + OpResult> result = tx->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); if (result) { rb->StartArray(result->size()); const auto& array = result.value(); @@ -866,11 +866,11 @@ void HSetFamily::HExpire(CmdArgList args, ConnectionContext* cntx) { rb->SendLong(v); } } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -878,12 +878,12 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { return OpHMGet(t->GetOpArgs(shard), key, args); }; - OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult> result = tx->ScheduleSingleHopT(std::move(cb)); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); if (result) { - SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); - auto* rb = static_cast(cntx->reply_builder()); + SinkReplyBuilder::ReplyAggregator agg(builder); + auto* rb = static_cast(builder); rb->StartArray(result->size()); for (const auto& val : *result) { if (val) { @@ -893,18 +893,18 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { } } } else if (result.status() == OpStatus::KEY_NOTFOUND) { - SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); + SinkReplyBuilder::ReplyAggregator agg(builder); rb->StartArray(args.size()); for (unsigned i = 0; i < args.size(); ++i) { rb->SendNull(); } } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); @@ -912,27 +912,27 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { return OpGet(t->GetOpArgs(shard), key, field); }; - auto* rb = static_cast(cntx->reply_builder()); - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { rb->SendBulkString(*result); } else { if (result.status() == OpStatus::KEY_NOTFOUND) { rb->SendNull(); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } } -void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); string_view incrs = ArgS(args, 2); int64_t ival = 0; if (!absl::SimpleAtoi(incrs, &ival)) { - return cntx->SendError(kInvalidIntErr); + return builder->SendError(kInvalidIntErr); } IncrByParam param{ival}; @@ -941,33 +941,33 @@ void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) { return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OK) { - cntx->SendLong(get(param)); + builder->SendLong(get(param)); } else { switch (status) { case OpStatus::INVALID_VALUE: - cntx->SendError("hash value is not an integer"); + builder->SendError("hash value is not an integer"); break; case OpStatus::OUT_OF_RANGE: - cntx->SendError(kIncrOverflow); + builder->SendError(kIncrOverflow); break; default: - cntx->SendError(status); + builder->SendError(status); break; } } } -void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); string_view incrs = ArgS(args, 2); double dval = 0; if (!absl::SimpleAtod(incrs, &dval)) { - return cntx->SendError(kInvalidFloatErr); + return builder->SendError(kInvalidFloatErr); } IncrByParam param{dval}; @@ -976,55 +976,55 @@ void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) { return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m); }; - OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); + OpStatus status = tx->ScheduleSingleHop(std::move(cb)); if (status == OpStatus::OK) { - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->SendDouble(get(param)); } else { switch (status) { case OpStatus::INVALID_VALUE: - cntx->SendError("hash value is not a float"); + builder->SendError("hash value is not a float"); break; default: - cntx->SendError(status); + builder->SendError(status); break; } } } -void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) { - HGetGeneric(args, cntx, FIELDS); +void HSetFamily::HKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + HGetGeneric(args, FIELDS, tx, builder); } -void HSetFamily::HVals(CmdArgList args, ConnectionContext* cntx) { - HGetGeneric(args, cntx, VALUES); +void HSetFamily::HVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + HGetGeneric(args, VALUES, tx, builder); } -void HSetFamily::HGetAll(CmdArgList args, ConnectionContext* cntx) { - HGetGeneric(args, cntx, GetAllMode::FIELDS | GetAllMode::VALUES); +void HSetFamily::HGetAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + HGetGeneric(args, GetAllMode::FIELDS | GetAllMode::VALUES, tx, builder); } -void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { std::string_view key = ArgS(args, 0); std::string_view token = ArgS(args, 1); uint64_t cursor = 0; if (!absl::SimpleAtoi(token, &cursor)) { - return cntx->SendError("invalid cursor"); + return builder->SendError("invalid cursor"); } // HSCAN key cursor [MATCH pattern] [COUNT count] if (args.size() > 6) { DVLOG(1) << "got " << args.size() << " this is more than it should be"; - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } OpResult ops = ScanOpts::TryFrom(args.subspan(2)); if (!ops) { DVLOG(1) << "HScan invalid args - return " << ops << " to the user"; - return cntx->SendError(ops.status()); + return builder->SendError(ops.status()); } ScanOpts scan_op = ops.value(); @@ -1033,8 +1033,8 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) { return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op); }; - auto* rb = static_cast(cntx->reply_builder()); - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result.status() != OpStatus::WRONG_TYPE) { rb->StartArray(2); rb->SendBulkString(absl::StrCat(cursor)); @@ -1043,17 +1043,18 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) { rb->SendBulkString(k); } } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string_view key = ArgS(args, 0); string_view cmd{cntx->cid->name()}; if (args.size() % 2 != 1) { - return cntx->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType); + return builder->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType); } args.remove_prefix(1); @@ -1061,16 +1062,16 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) { return OpSet(t->GetOpArgs(shard), key, args); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result && cmd == "HSET") { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); args.remove_prefix(1); @@ -1078,15 +1079,15 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) { return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{.skip_if_exists = true}); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } -void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view key = ArgS(args, 0); string_view field = ArgS(args, 1); @@ -1094,11 +1095,11 @@ void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) { return OpStrLen(t->GetOpArgs(shard), key, field); }; - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { - cntx->SendLong(*result); + builder->SendLong(*result); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } @@ -1110,10 +1111,10 @@ void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) { str_vec.emplace_back(absl::StrCat(lp.lval)); } -void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { +void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { if (args.size() > 3) { DVLOG(1) << "Wrong number of command arguments: " << args.size(); - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } string_view key = ArgS(args, 0); @@ -1121,19 +1122,19 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { bool with_values = false; if ((args.size() > 1) && (!SimpleAtoi(ArgS(args, 1), &count))) { - return cntx->SendError("count value is not an integer", kSyntaxErrType); + return builder->SendError("count value is not an integer", kSyntaxErrType); } if (args.size() == 3) { string arg = absl::AsciiStrToUpper(ArgS(args, 2)); if (arg != "WITHVALUES") - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); else with_values = true; } auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id()); + auto& db_slice = t->GetDbSlice(shard->shard_id()); DbContext db_context = t->GetDbContext(); auto it_res = db_slice.FindReadOnly(db_context, key, OBJ_HASH); @@ -1213,8 +1214,8 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { return str_vec; }; - auto* rb = static_cast(cntx->reply_builder()); - OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + auto* rb = static_cast(builder); + OpResult result = tx->ScheduleSingleHopT(std::move(cb)); if (result) { if ((result->size() == 1) && (args.size() == 1)) rb->SendBulkString(result->front()); @@ -1226,7 +1227,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { else rb->SendEmptyArray(); } else { - cntx->SendError(result.status()); + builder->SendError(result.status()); } } diff --git a/src/server/hset_family.h b/src/server/hset_family.h index 296051171607..f56ae254c7ad 100644 --- a/src/server/hset_family.h +++ b/src/server/hset_family.h @@ -12,9 +12,9 @@ namespace dfly { -class ConnectionContext; class CommandRegistry; class StringMap; +class Transaction; using facade::OpResult; using facade::OpStatus; @@ -34,24 +34,25 @@ class HSetFamily { PrimeValue* pv); private: - // TODO: to move it to anonymous namespace in cc file. - - static void HExpire(CmdArgList args, ConnectionContext* cntx); - static void HDel(CmdArgList args, ConnectionContext* cntx); - static void HLen(CmdArgList args, ConnectionContext* cntx); - static void HExists(CmdArgList args, ConnectionContext* cntx); - static void HGet(CmdArgList args, ConnectionContext* cntx); - static void HMGet(CmdArgList args, ConnectionContext* cntx); - static void HIncrBy(CmdArgList args, ConnectionContext* cntx); - static void HKeys(CmdArgList args, ConnectionContext* cntx); - static void HVals(CmdArgList args, ConnectionContext* cntx); - static void HGetAll(CmdArgList args, ConnectionContext* cntx); - static void HIncrByFloat(CmdArgList args, ConnectionContext* cntx); - static void HScan(CmdArgList args, ConnectionContext* cntx); - static void HSet(CmdArgList args, ConnectionContext* cntx); - static void HSetNx(CmdArgList args, ConnectionContext* cntx); - static void HStrLen(CmdArgList args, ConnectionContext* cntx); - static void HRandField(CmdArgList args, ConnectionContext* cntx); + using SinkReplyBuilder = facade::SinkReplyBuilder; + + static void HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HGetAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); }; } // namespace dfly