Skip to content

Commit

Permalink
chore: pass SinkReplyBuilder and Transaction explicitly. Part4 (#3967)
Browse files Browse the repository at this point in the history
  • Loading branch information
romange authored Oct 23, 2024
1 parent 70614b8 commit 0ebc1a1
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 166 deletions.
110 changes: 53 additions & 57 deletions src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ using BitsStrVec = vector<string>;

// The following is the list of the functions that would handle the
// commands that handle the bit operations
void BitPos(CmdArgList args, ConnectionContext* cntx);
void BitCount(CmdArgList args, ConnectionContext* cntx);
void BitField(CmdArgList args, ConnectionContext* cntx);
void BitFieldRo(CmdArgList args, ConnectionContext* cntx);
void BitOp(CmdArgList args, ConnectionContext* cntx);
void GetBit(CmdArgList args, ConnectionContext* cntx);
void SetBit(CmdArgList args, ConnectionContext* cntx);
void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);

OpResult<string> ReadValue(const DbContext& context, string_view key, EngineShard* shard);
OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, string_view key, uint32_t offset);
Expand Down Expand Up @@ -489,35 +489,36 @@ OpResult<string> RunBitOpOnShard(string_view op, const OpArgs& op_args, ShardArg
return op_result;
}

template <typename T> void HandleOpValueResult(const OpResult<T>& result, ConnectionContext* cntx) {
template <typename T>
void HandleOpValueResult(const OpResult<T>& result, SinkReplyBuilder* builder) {
static_assert(std::is_integral<T>::value,
"we are only handling types that are integral types in the return types from "
"here");
if (result) {
cntx->SendLong(result.value());
builder->SendLong(result.value());
} else {
switch (result.status()) {
case OpStatus::WRONG_TYPE:
cntx->SendError(kWrongTypeErr);
builder->SendError(kWrongTypeErr);
break;
case OpStatus::OUT_OF_MEMORY:
cntx->SendError(kOutOfMemory);
builder->SendError(kOutOfMemory);
break;
default:
cntx->SendLong(0); // in case we don't have the value we should just send 0
builder->SendLong(0); // in case we don't have the value we should just send 0
break;
}
}
}

// ------------------------------------------------------------------------- //
// Impl for the command functions
void BitPos(CmdArgList args, ConnectionContext* cntx) {
void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command BITPOS
// See details at https://redis.io/commands/bitpos/

if (args.size() < 1 || args.size() > 5) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}

string_view key = ArgS(args, 0);
Expand All @@ -528,19 +529,19 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
bool as_bit = false;

if (!absl::SimpleAtoi(ArgS(args, 1), &value)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
} else if (value != 0 && value != 1) {
return cntx->SendError("The bit argument must be 1 or 0");
return builder->SendError("The bit argument must be 1 or 0");
}

if (args.size() >= 3) {
if (!absl::SimpleAtoi(ArgS(args, 2), &start)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}

if (args.size() >= 4) {
if (!absl::SimpleAtoi(ArgS(args, 3), &end)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}

if (args.size() >= 5) {
Expand All @@ -550,7 +551,7 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
} else if (arg == "BYTE") {
as_bit = false;
} else {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
}
}
Expand All @@ -559,12 +560,11 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit);
};
Transaction* trans = cntx->transaction;
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

void BitCount(CmdArgList args, ConnectionContext* cntx) {
void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command BITCOUNT
// See details at https://redis.io/commands/bitcount/
// Please note that if the key don't exists, it would return 0
Expand All @@ -579,14 +579,13 @@ void BitCount(CmdArgList args, ConnectionContext* cntx) {
bool as_bit = parser.HasNext() ? parser.MapNext("BYTE", false, "BIT", true) : false;

if (!parser.Finalize()) {
return cntx->SendError(parser.Error()->MakeReply());
return builder->SendError(parser.Error()->MakeReply());
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return CountBitsForValue(t->GetOpArgs(shard), key, start, end, as_bit);
};
Transaction* trans = cntx->transaction;
OpResult<std::size_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<std::size_t> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

// GCC yields a wrong warning about uninitialized optional use
Expand Down Expand Up @@ -1063,8 +1062,8 @@ nonstd::expected<CommandList, string> ParseToCommandList(CmdArgList args, bool r
return result;
}

void SendResults(const vector<ResultType>& results, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void SendResults(const vector<ResultType>& results, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
const size_t total = results.size();
if (total == 0) {
rb->SendNullArray();
Expand All @@ -1082,17 +1081,17 @@ void SendResults(const vector<ResultType>& results, ConnectionContext* cntx) {
}
}

void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) {
void BitFieldGeneric(CmdArgList args, bool read_only, Transaction* tx, SinkReplyBuilder* builder) {
if (args.size() == 1) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendNullArray();
return;
}
auto key = ArgS(args, 0);
auto maybe_ops_list = ParseToCommandList(args.subspan(1), read_only);

if (!maybe_ops_list.has_value()) {
cntx->SendError(maybe_ops_list.error());
builder->SendError(maybe_ops_list.error());
return;
}
CommandList cmd_list = std::move(maybe_ops_list.value());
Expand All @@ -1102,30 +1101,29 @@ void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) {
return executor.Execute(cmd_list);
};

Transaction* trans = cntx->transaction;
OpResult<vector<ResultType>> res = trans->ScheduleSingleHopT(std::move(cb));
OpResult<vector<ResultType>> res = tx->ScheduleSingleHopT(std::move(cb));

if (res == OpStatus::WRONG_TYPE) {
cntx->SendError(kWrongTypeErr);
builder->SendError(kWrongTypeErr);
return;
}

SendResults(*res, cntx);
SendResults(*res, builder);
}

void BitField(CmdArgList args, ConnectionContext* cntx) {
BitFieldGeneric(args, false, cntx);
void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, false, tx, builder);
}

void BitFieldRo(CmdArgList args, ConnectionContext* cntx) {
BitFieldGeneric(args, true, cntx);
void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, true, tx, builder);
}

#ifndef __clang__
#pragma GCC diagnostic pop
#endif

void BitOp(CmdArgList args, ConnectionContext* cntx) {
void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
static const std::array<string_view, 4> BITOP_OP_NAMES{OR_OP_NAME, XOR_OP_NAME, AND_OP_NAME,
NOT_OP_NAME};
string op = absl::AsciiStrToUpper(ArgS(args, 0));
Expand All @@ -1134,7 +1132,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
[&op](auto val) { return op == val; });

if (illegal || (op == NOT_OP_NAME && args.size() > 3)) {
return cntx->SendError(kSyntaxErr); // too many arguments
return builder->SendError(kSyntaxErr); // too many arguments
}

// Multi shard access - read only
Expand All @@ -1157,13 +1155,13 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};

cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do
tx->Execute(std::move(shard_bitop), false); // we still have more work to do
// All result from each shard
const auto joined_results = CombineResultOp(result_set, op);
// Second phase - save to target key if successful
if (!joined_results) {
cntx->transaction->Conclude();
cntx->SendError(joined_results.status());
tx->Conclude();
builder->SendError(joined_results.status());
return;
} else {
auto op_result = joined_results.value();
Expand Down Expand Up @@ -1193,47 +1191,45 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK;
};

cntx->transaction->Execute(std::move(store_cb), true);
cntx->SendLong(op_result.size());
tx->Execute(std::move(store_cb), true);
builder->SendLong(op_result.size());
}
}

void GetBit(CmdArgList args, ConnectionContext* cntx) {
void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command "GETBIT key offset"
// see https://redis.io/commands/getbit/

uint32_t offset{0};
string_view key = ArgS(args, 0);

if (!absl::SimpleAtoi(ArgS(args, 1), &offset)) {
return cntx->SendError(kInvalidIntErr);
return builder->SendError(kInvalidIntErr);
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return ReadValueBitsetAt(t->GetOpArgs(shard), key, offset);
};
Transaction* trans = cntx->transaction;
OpResult<bool> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

void SetBit(CmdArgList args, ConnectionContext* cntx) {
void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command "SETBIT key offset new_value"
// see https://redis.io/commands/setbit/

CmdArgParser parser(args);
auto [key, offset, value] = parser.Next<string_view, uint32_t, FInt<0, 1>>();

if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
}

auto cb = [&](Transaction* t, EngineShard* shard) {
return BitNewValue(t->GetOpArgs(shard), key, offset, value != 0);
};

Transaction* trans = cntx->transaction;
OpResult<bool> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, builder);
}

// ------------------------------------------------------------------------- //
Expand Down
Loading

0 comments on commit 0ebc1a1

Please sign in to comment.