khepri_cluster: Add wait_for_leader/{0,1,2}
[Why]
This new public API is useful if you want to be sure a clustered store
is ready before issuing writes and queries (see the usage sketch below).

Note that there is obviously no guarantee that the Raft quorum won't be
lost just after this call.

[How]
The implementation is very close to `members/{0,1,2}`. The differences
are:
* how errors are handled
* the return value
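
A minimal usage sketch of the new call (hypothetical caller code, not part
of this commit; the store ID, path and value are only illustrative):

ensure_leader_then_write(StoreId) ->
    case khepri_cluster:wait_for_leader(StoreId, 30000) of
        ok ->
            %% A leader is elected; writes and queries should now succeed.
            khepri:put(StoreId, [stock, wood], 100);
        {error, timeout} ->
            %% No leader was elected within the 30-second timeout.
            {error, timeout};
        {error, _} = Error ->
            Error
    end.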
dumbbell committed Aug 23, 2024
1 parent be71afe commit e4b60b0
Showing 2 changed files with 214 additions and 0 deletions.
69 changes: 69 additions & 0 deletions src/khepri_cluster.erl
@@ -115,6 +115,7 @@
locally_known_nodes/0,
locally_known_nodes/1,
locally_known_nodes/2,
wait_for_leader/0, wait_for_leader/1, wait_for_leader/2,
get_default_ra_system_or_data_dir/0,
get_default_store_id/0,
get_store_ids/0,
@@ -1367,6 +1368,74 @@ locally_known_nodes(StoreId, Timeout) ->
Error -> Error
end.

-spec wait_for_leader() -> Ret when
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% Calling this function is the same as calling `wait_for_leader(StoreId)'
%% with the default store ID (see {@link
%% khepri_cluster:get_default_store_id/0}).
%%
%% @see wait_for_leader/1.
%% @see wait_for_leader/2.

wait_for_leader() ->
StoreId = get_default_store_id(),
wait_for_leader(StoreId).

-spec wait_for_leader(StoreId) -> Ret when
StoreId :: khepri:store_id(),
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% Calling this function is the same as calling `wait_for_leader(StoreId,
%% DefaultTimeout)' where `DefaultTimeout' is returned by {@link
%% khepri_app:get_default_timeout/0}.
%%
%% @see wait_for_leader/2.

wait_for_leader(StoreId) ->
Timeout = khepri_app:get_default_timeout(),
wait_for_leader(StoreId, Timeout).

-spec wait_for_leader(StoreId, Timeout) -> Ret when
StoreId :: khepri:store_id(),
Timeout :: timeout(),
Ret :: ok | khepri:error().
%% @doc Waits for a leader to be elected.
%%
%% This is useful if you want to be sure the clustered store is ready before
%% issuing writes and queries. Note that there is obviously no guarantee
%% that the Raft quorum won't be lost just after this call.
%%
%% @param StoreId the ID of the store that should elect a leader before this
%% call can return successfully.
%% @param Timeout the maximum time to wait for a leader to be elected.
%%
%% @returns `ok' if a leader was elected or an `{error, Reason}' tuple.

wait_for_leader(StoreId, Timeout) ->
T0 = khepri_utils:start_timeout_window(Timeout),
ThisMember = this_member(StoreId),
case ra:members(ThisMember, Timeout) of
{ok, _Members, _LeaderId} ->
ok;
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse
Reason == noconnection orelse
Reason == nodedown orelse
Reason == shutdown) ->
NewTimeout0 = khepri_utils:end_timeout_window(Timeout, T0),
NewTimeout = khepri_utils:sleep(
?TRANSIENT_ERROR_RETRY_INTERVAL, NewTimeout0),
wait_for_leader(StoreId, NewTimeout);
{timeout, _} ->
{error, timeout};
{error, _} = Error ->
Error
end.

-spec node_to_member(StoreId, Node) -> Member when
StoreId :: khepri:store_id(),
Node :: node(),
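
As a side note, a hedged sketch of the variant that relies on the defaults
(hypothetical caller code, not part of the commit; the path and value are
illustrative): `khepri:start/0` starts the default store, then
`wait_for_leader/1` blocks for the default timeout returned by
`khepri_app:get_default_timeout/0` until a leader is elected.

wait_then_write() ->
    {ok, StoreId} = khepri:start(),
    ok = khepri_cluster:wait_for_leader(StoreId),
    khepri:put(StoreId, [config, max_connections], 1024).

The retry on noproc, noconnection, nodedown and shutdown in the
implementation above is what allows the call to be issued while the store is
still starting, as exercised by the parallel-start test below.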
145 changes: 145 additions & 0 deletions test/cluster_SUITE.erl
@@ -30,6 +30,7 @@
can_start_a_single_node/1,
can_restart_a_single_node_with_ra_server_config/1,
can_query_members_with_a_single_node/1,
can_wait_for_leader_with_a_single_node/1,
fail_to_start_with_bad_ra_server_config/1,
initial_members_are_ignored/1,
can_start_a_three_node_cluster/1,
@@ -38,6 +39,7 @@
can_restart_nodes_in_a_three_node_cluster/1,
can_reset_a_cluster_member/1,
can_query_members_with_a_three_node_cluster/1,
can_wait_for_leader_with_a_three_node_cluster/1,
fail_to_join_if_not_started/1,
fail_to_join_non_existing_node/1,
fail_to_join_non_existing_store/1,
@@ -55,6 +57,7 @@ all() ->
[can_start_a_single_node,
can_restart_a_single_node_with_ra_server_config,
can_query_members_with_a_single_node,
can_wait_for_leader_with_a_single_node,
fail_to_start_with_bad_ra_server_config,
initial_members_are_ignored,
can_start_a_three_node_cluster,
@@ -63,6 +66,7 @@ all() ->
can_restart_nodes_in_a_three_node_cluster,
can_reset_a_cluster_member,
can_query_members_with_a_three_node_cluster,
can_wait_for_leader_with_a_three_node_cluster,
fail_to_join_if_not_started,
fail_to_join_non_existing_node,
fail_to_join_non_existing_store,
@@ -108,6 +112,7 @@ init_per_testcase(Testcase, Config)
when Testcase =:= can_start_a_single_node orelse
Testcase =:= can_restart_a_single_node_with_ra_server_config orelse
Testcase =:= can_query_members_with_a_single_node orelse
Testcase =:= can_wait_for_leader_with_a_single_node orelse
Testcase =:= fail_to_start_with_bad_ra_server_config orelse
Testcase =:= initial_members_are_ignored orelse
Testcase =:= fail_to_join_non_existing_node orelse
@@ -123,6 +128,7 @@ init_per_testcase(Testcase, Config)
Testcase =:= can_restart_nodes_in_a_three_node_cluster orelse
Testcase =:= can_reset_a_cluster_member orelse
Testcase =:= can_query_members_with_a_three_node_cluster orelse
Testcase =:= can_wait_for_leader_with_a_three_node_cluster orelse
Testcase =:= fail_to_join_if_not_started orelse
Testcase =:= fail_to_join_non_existing_store orelse
Testcase =:= handle_leader_down_on_three_node_cluster_command orelse
@@ -361,6 +367,47 @@ can_query_members_with_a_single_node(Config) ->

ok.

can_wait_for_leader_with_a_single_node(Config) ->
Node = node(),
#{Node := #{ra_system := RaSystem}} = ?config(ra_system_props, Config),
StoreId = RaSystem,

ct:pal("Wait for leader before starting database"),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId)),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId, 2000)),

ct:pal("Start database and wait for it in parallel"),
Parent = self(),
_ = spawn_link(fun() ->
timer:sleep(2000),
?assertEqual(
{ok, StoreId},
khepri:start(RaSystem, StoreId)),
erlang:unlink(Parent)
end),
?assertEqual(
ok,
khepri_cluster:wait_for_leader(StoreId, 40000)),

ct:pal("Stop database"),
?assertEqual(
ok,
khepri:stop(StoreId)),

ct:pal("Wait for leader after stopping database"),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId)),
?assertEqual(
{error, noproc},
khepri_cluster:wait_for_leader(StoreId, 2000)),

ok.

fail_to_start_with_bad_ra_server_config(Config) ->
Node = node(),
#{Node := #{ra_system := RaSystem}} = ?config(ra_system_props, Config),
@@ -1298,6 +1345,100 @@ can_query_members_with_a_three_node_cluster(Config) ->

ok.

can_wait_for_leader_with_a_three_node_cluster(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
PeerPerNode = ?config(peer_nodes, Config),
[Node1, Node2, Node3] = Nodes = lists:sort(maps:keys(PropsPerNode)),

%% We assume all nodes are using the same Ra system name & store ID.
#{ra_system := RaSystem} = maps:get(Node1, PropsPerNode),
StoreId = RaSystem,

ct:pal("Wait for leader before starting database"),
lists:foreach(
fun(Node) ->
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, Nodes),

ct:pal("Start database + cluster nodes"),
lists:foreach(
fun(Node) ->
ct:pal("- khepri:start() from node ~s", [Node]),
?assertEqual(
{ok, StoreId},
rpc:call(Node, khepri, start, [RaSystem, StoreId]))
end, Nodes),
lists:foreach(
fun(Node) ->
ct:pal("- khepri_cluster:join() from node ~s", [Node]),
?assertEqual(
ok,
rpc:call(Node, khepri_cluster, join, [StoreId, Node3]))
end, [Node1, Node2]),

ct:pal("Wait for leader after starting database"),
lists:foreach(
fun(Node) ->
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, Nodes),

LeaderId1 = get_leader_in_store(StoreId, Nodes),
{StoreId, LeaderNode1} = LeaderId1,
ct:pal("Stop node ~s", [LeaderNode1]),
LeaderPeer1 = proplists:get_value(LeaderNode1, PeerPerNode),
?assertEqual(ok, stop_erlang_node(LeaderNode1, LeaderPeer1)),

ct:pal("Wait for leader after stopping leader"),
LeftNodes1 = Nodes -- [LeaderNode1],
lists:foreach(
fun(Node) ->
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
ok,
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, LeftNodes1),

lists:foreach(
fun(Node) ->
ct:pal("Stop node ~s", [Node]),
?assertEqual(
ok,
rpc:call(Node, khepri, stop, [StoreId]))
end, LeftNodes1),

ct:pal("Wait for leader after stopping database"),
lists:foreach(
fun(Node) ->
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId])),
?assertEqual(
{error, noproc},
erpc:call(
Node, khepri_cluster, wait_for_leader, [StoreId, 2000]))
end, LeftNodes1),

ok.

fail_to_join_if_not_started(Config) ->
PropsPerNode = ?config(ra_system_props, Config),
[Node1, Node2, _Node3] = maps:keys(PropsPerNode),
@@ -1396,6 +1537,8 @@ can_use_default_store_on_single_node(_Config) ->
?assertEqual({error, noproc}, khepri_cluster:nodes()),
?assertEqual({error, noproc}, khepri_cluster:locally_known_nodes()),

?assertEqual({error, noproc}, khepri_cluster:wait_for_leader()),

{ok, StoreId} = khepri:start(),
?assert(filelib:is_dir(DataDir)),

@@ -1493,6 +1636,8 @@ can_use_default_store_on_single_node(_Config) ->
?assertEqual({ok, [Node]}, khepri_cluster:nodes()),
?assertEqual({ok, [Node]}, khepri_cluster:locally_known_nodes()),

?assertEqual(ok, khepri_cluster:wait_for_leader()),

?assertEqual(ok, khepri:stop()),
?assertEqual({error, noproc}, khepri:get([foo])),
?assertEqual({error, noproc}, khepri:exists([foo])),