Skip to content

Commit

Permalink
fix: switch to SHUTTING_DOWN state unconditionally
Browse files Browse the repository at this point in the history
During the shutdown sequence always switch to SHUTTING_DOWN.
Make sure that the rest of the code does not break if it can not switch to the desired
global state + some clean ups around state transitions.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jan 6, 2025
1 parent 7860a16 commit 7daf643
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 45 deletions.
30 changes: 11 additions & 19 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,11 @@ void Service::Shutdown() {
VLOG(1) << "Service::Shutdown";

// We mark that we are shutting down. After this incoming requests will be
// rejected
// rejected.
mu_.lock();
global_state_ = GlobalState::SHUTTING_DOWN;
mu_.unlock();

pp_.AwaitFiberOnAll([](ProactorBase* pb) {
ServerState::tlocal()->EnterLameDuck();
facade::Connection::ShutdownThreadLocal();
Expand Down Expand Up @@ -2503,27 +2507,20 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
return to;
}

void Service::RequestLoadingState() {
bool switch_state = false;
{
bool Service::RequestLoadingState() {
if (SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING) {
util::fb2::LockGuard lk(mu_);
++loading_state_counter_;
if (global_state_ != GlobalState::LOADING) {
DCHECK_EQ(global_state_, GlobalState::ACTIVE);
switch_state = true;
}
}
if (switch_state) {
SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
loading_state_counter_++;
return true;
}
return false;
}

void Service::RemoveLoadingState() {
bool switch_state = false;
{
util::fb2::LockGuard lk(mu_);
DCHECK_EQ(global_state_, GlobalState::LOADING);
DCHECK_GT(loading_state_counter_, 0u);
CHECK_GT(loading_state_counter_, 0u);
--loading_state_counter_;
switch_state = loading_state_counter_ == 0;
}
Expand All @@ -2532,11 +2529,6 @@ void Service::RemoveLoadingState() {
}
}

GlobalState Service::GetGlobalState() const {
util::fb2::LockGuard lk(mu_);
return global_state_;
}

void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
// We skip authentication on privileged listener if the flag admin_nopass is set
// We also skip authentication if requirepass is empty
Expand Down
4 changes: 1 addition & 3 deletions src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,9 @@ class Service : public facade::ServiceInterface {
// Upon switch, updates cached global state in threadlocal ServerState struct.
GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_);

void RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_);
bool RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_);
void RemoveLoadingState() ABSL_LOCKS_EXCLUDED(mu_);

GlobalState GetGlobalState() const ABSL_LOCKS_EXCLUDED(mu_);

void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
void OnConnectionClose(facade::ConnectionContext* cntx) final;

Expand Down
12 changes: 10 additions & 2 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,11 @@ error_code Replica::InitiatePSync() {
io::PrefixSource ps{io_buf.InputBuffer(), Sock()};

// Set LOADING state.
service_.RequestLoadingState();
if (!service_.RequestLoadingState()) {
return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
"Failed to enter LOADING state");
}

absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); };

if (slot_range_.has_value()) {
Expand Down Expand Up @@ -502,10 +506,14 @@ error_code Replica::InitiateDflySync() {
for (auto& flow : shard_flows_)
flow->Cancel();
};

RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));

// Make sure we're in LOADING state.
service_.RequestLoadingState();
if (!service_.RequestLoadingState()) {
return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
"Failed to enter LOADING state");
}

// Start full sync flows.
state_mask_.fetch_or(R_SYNCING);
Expand Down
29 changes: 18 additions & 11 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1168,10 +1168,12 @@ void ServerFamily::SnapshotScheduling() {
return;
}

const auto loading_check_interval = std::chrono::seconds(10);
while (service_.GetGlobalState() == GlobalState::LOADING) {
schedule_done_.WaitFor(loading_check_interval);
}
ServerState* ss = ServerState::tlocal();
do {
if (schedule_done_.WaitFor(100ms)) {
return;
}
} while (ss->gstate() == GlobalState::LOADING);

while (true) {
const std::chrono::time_point now = std::chrono::system_clock::now();
Expand Down Expand Up @@ -1657,9 +1659,10 @@ GenericError ServerFamily::DoSave(bool ignore_state) {

GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename,
Transaction* trans, bool ignore_state) {
auto state = service_.GetGlobalState();
auto state = ServerState::tlocal()->gstate();

// In some cases we want to create a snapshot even if server is not active, f.e in takeover
if (!ignore_state && (state != GlobalState::ACTIVE)) {
if (!ignore_state && (state != GlobalState::ACTIVE && state != GlobalState::SHUTTING_DOWN)) {
return GenericError{make_error_code(errc::operation_in_progress),
StrCat(GlobalStateName(state), " - can not save database")};
}
Expand Down Expand Up @@ -2242,6 +2245,8 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
absl::StrAppend(&info, a1, ":", a2, "\r\n");
};

ServerState* ss = ServerState::tlocal();

if (should_enter("SERVER")) {
auto kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";
Expand Down Expand Up @@ -2467,7 +2472,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
append("last_saved_file", save_info.file_name);
append("last_success_save_duration_sec", save_info.success_duration_sec);

size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING;
unsigned is_loading = (ss->gstate() == GlobalState::LOADING);
append("loading", is_loading);
append("saving", is_saving);
append("current_save_duration_sec", curent_durration_sec);
Expand Down Expand Up @@ -2752,7 +2757,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time

// We should not execute replica of command while loading from snapshot.
if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) {
ServerState* ss = ServerState::tlocal();
if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
builder->SendError(kLoadingErr);
return;
}
Expand All @@ -2766,7 +2772,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply

// If NO ONE was supplied, just stop the current replica (if it exists)
if (replicaof_args->IsReplicaOfNoOne()) {
if (!ServerState::tlocal()->is_master) {
if (!ss->is_master) {
CHECK(replica_);

SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
Expand All @@ -2776,8 +2782,9 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
StopAllClusterReplicas();
}

CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";
// May not switch if we are at shutdown state, and the process is shutting down
// at the same time.
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

return builder->SendOk();
}
Expand Down
24 changes: 14 additions & 10 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1976,18 +1976,22 @@ async def test_replicaof_reject_on_load(df_factory, df_seeder_factory):
replica.stop()
replica.start()
c_replica = replica.client()

@assert_eventually
async def check_replica_isloading():
persistence = await c_replica.info("PERSISTENCE")
assert persistence["loading"] == 1

# If this fails adjust `keys` and the `assert dbsize >= 30000` above.
# Keep in mind that if the assert False is triggered below, it doesn't mean
# that there is a bug because it could be the case that while executing
# INFO PERSISTENCE df is in loading state but when we call REPLICAOF df
# is no longer in loading state and the assertion false is triggered.
await check_replica_isloading()

# Check replica of not alowed while loading snapshot
try:
# If this fails adjust `keys` and the `assert dbsize >= 30000` above.
# Keep in mind that if the assert False is triggered below, it doesn't mean
# that there is a bug because it could be the case that while executing
# INFO PERSISTENCE df is in loading state but when we call REPLICAOF df
# is no longer in loading state and the assertion false is triggered.
assert "loading:1" in (await c_replica.execute_command("INFO PERSISTENCE"))
with pytest.raises(aioredis.BusyLoadingError):
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert False
except aioredis.BusyLoadingError as e:
assert "Dragonfly is loading the dataset in memory" in str(e)

# Check one we finish loading snapshot replicaof success
await wait_available_async(c_replica, timeout=180)
Expand Down

0 comments on commit 7daf643

Please sign in to comment.