From c484a0c3eb4c99c37a9432472b76045c0b63f308 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?=
Date: Thu, 25 Jul 2024 19:02:06 +0200
Subject: [PATCH 1/3] khepri_machine: Use `ra:key_metrics/2` instead of
 `ra:member_overview/2`

[Why]
`ra:member_overview/2` is a very expensive call.

[How]
We just need the last index and the current term from the leader, and
`ra:key_metrics/2` provides this piece of information too.

The difference is huge: in my benchmark, the query rate goes from 15
queries per second to 100k.

This is in association with a related change in Ra; see rabbitmq/ra#462.
---
 src/khepri_machine.erl | 38 ++++++++++++++++++--------------------
 1 file changed, 18 insertions(+), 20 deletions(-)

diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl
index 561c4eb7..1afe8184 100644
--- a/src/khepri_machine.erl
+++ b/src/khepri_machine.erl
@@ -1043,12 +1043,15 @@ add_applied_condition2(StoreId, Options, Timeout) ->
     end.
 
 add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
-    %% We query the leader to know the last index it committed. We also
-    %% double-check it is still the leader; if it is not, we recurse.
+    %% We query the leader to know the last index it committed and in which term.
+    %%
+    %% We pay attention to its state because a map is still returned even if
+    %% the Ra server is stopped.
     T0 = khepri_utils:start_timeout_window(Timeout),
-    case ra:member_overview(LeaderId, Timeout) of
-        {ok, Overview, LeaderId} ->
-            NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
+    try ra:key_metrics(LeaderId, Timeout) of
+        #{last_index := LastIndex, term := Term, state := State}
+          when State =/= noproc andalso State =/= unknown ->
+            NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0),
 
             %% Now that we know the last committed index of the leader, we can
             %% perform an arbitrary query on the local server. The query will
@@ -1057,26 +1060,21 @@ add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
             %%
             %% We don't care about the result of that query. We just want to
             %% block until the latest commands are applied locally.
-            #{log := #{last_index := LastIndex},
-              current_term := CurrentTerm} = Overview,
-            Condition = {applied, {LastIndex, CurrentTerm}},
+            Condition = {applied, {LastIndex, Term}},
             Options1 = Options#{condition => Condition,
-                                timeout => NewTimeout},
+                                timeout => NewTimeout1},
             {ok, Options1};
-        {ok, _Overview, NewLeaderId} ->
+        _ ->
+            timer:sleep(200),
             NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
-            add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout);
-        {timeout, _LeaderId} ->
+            add_applied_condition1(StoreId, Options, NewTimeout)
+    catch
+        error:{erpc, timeout} ->
             {error, timeout};
-        {error, Reason}
-          when ?HAS_TIME_LEFT(Timeout) andalso
-               (Reason == noproc orelse Reason == nodedown orelse
-                Reason == shutdown) ->
+        error:{erpc, noconnection} ->
             timer:sleep(200),
-            NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
-            add_applied_condition1(StoreId, Options, NewTimeout);
-        Error ->
-            Error
+            NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0),
+            add_applied_condition1(StoreId, Options, NewTimeout2)
     end.
 
 -spec get_timeout(Options) -> Timeout when

From 56f009e93b9584cc82ed0ce424121829439dbcb9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?=
Date: Thu, 25 Jul 2024 19:05:09 +0200
Subject: [PATCH 2/3] khepri_machine: Call `process_query1/3` directly in
 `add_applied_condition1/3`

[Why]
There is no need to run the same checks as a user query.
We can execute the query directly.
---
 src/khepri_machine.erl | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl
index 1afe8184..512613dc 100644
--- a/src/khepri_machine.erl
+++ b/src/khepri_machine.erl
@@ -1022,9 +1022,7 @@ add_applied_condition1(StoreId, Options, Timeout) ->
     %% follower and handle the redirect to the leader.
     T0 = khepri_utils:start_timeout_window(Timeout),
     QueryFun = fun erlang:is_tuple/1,
-    InternalOptions = #{favor => low_latency,
-                        timeout => Timeout},
-    case process_query(StoreId, QueryFun, InternalOptions) of
+    case process_query1(StoreId, QueryFun, Timeout) of
         true ->
             NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
             add_applied_condition2(StoreId, Options, NewTimeout);

From 4c25407a766391e3bbe5531ff1ec08b6a0a5dd9f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?=
Date: Fri, 26 Jul 2024 16:11:53 +0200
Subject: [PATCH 3/3] khepri_machine: Skip extra query before a query with a
 fence

... if it is unneeded.

[Why]
We do that extra query to ensure that previous async commands were
handled by the local Ra server before we proceed with the query with a
fence. This comes with a performance penalty, of course.

We don't need that extra query if the previous command or query made by
the calling process was synchronous.

[How]
We now keep a flag in the calling process dictionary to indicate whether
the last command was synchronous or a query. The flag is cleared by an
async command.

When we have to perform a query with a fence, we look at this flag to
determine if the extra query is needed.
---
 src/khepri_machine.erl | 111 +++++++++++++++++++++++++++++++++++------
 1 file changed, 97 insertions(+), 14 deletions(-)

diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl
index 512613dc..3638e2e9 100644
--- a/src/khepri_machine.erl
+++ b/src/khepri_machine.erl
@@ -865,8 +865,12 @@ do_process_sync_command(StoreId, Command, Options) ->
     CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
     T0 = khepri_utils:start_timeout_window(Timeout),
     Dest = case ra_leaderboard:lookup_leader(StoreId) of
-               LeaderId when LeaderId =/= undefined -> LeaderId;
-               undefined -> RaServer
+               LeaderId when LeaderId =/= undefined ->
+                   sending_command_remotely(StoreId),
+                   LeaderId;
+               undefined ->
+                   sending_sync_command_locally(StoreId),
+                   RaServer
            end,
     case ra:process_command(Dest, Command, CommandOptions) of
         {ok, Ret, _LeaderId} ->
@@ -903,15 +907,18 @@ process_async_command(
   StoreId, Command, ?DEFAULT_RA_COMMAND_CORRELATION = Correlation, Priority) ->
     ThisNode = node(),
     RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
+    sending_async_command_locally(StoreId),
     ra:pipeline_command(RaServer, Command, Correlation, Priority);
 process_async_command(
   StoreId, Command, Correlation, Priority) ->
     case ra_leaderboard:lookup_leader(StoreId) of
         LeaderId when LeaderId =/= undefined ->
+            sending_command_remotely(StoreId),
             ra:pipeline_command(LeaderId, Command, Correlation, Priority);
         undefined ->
             ThisNode = node(),
             RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
+            sending_async_command_locally(StoreId),
             ra:pipeline_command(RaServer, Command, Correlation, Priority)
     end.
 
@@ -983,6 +990,7 @@ process_query(StoreId, QueryFun, Options) ->
     end.
 
 process_query1(StoreId, QueryFun, Options) ->
+    sending_query_locally(StoreId),
     LocalServerId = {StoreId, node()},
     case ra:local_query(LocalServerId, QueryFun, Options) of
         {ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
@@ -1009,9 +1017,10 @@ add_applied_condition1(StoreId, Options, Timeout) ->
     %% the order of operations between updates and queries. We have to follow
     %% several steps to prepare that condition.
     %%
-    %% We first send an arbitrary query to the local Ra server. This is to
-    %% make sure that previously submitted pipelined commands were processed
-    %% by that server.
+    %% If the last message from the calling process to the local Ra server was
+    %% an async command or if it has not sent a command yet, we first send an
+    %% arbitrary query to the local Ra server. This is to make sure that
+    %% previously submitted pipelined commands were processed by that server.
     %%
     %% For instance, if there was a pipelined command without any correlation
     %% ID, it ensures it was forwarded to the leader. Likewise for a
@@ -1020,23 +1029,32 @@ add_applied_condition1(StoreId, Options, Timeout) ->
     %% We can't have this guaranty for pipelined commands with a correlation
     %% because the caller is responsible for receiving the rejection from the
     %% follower and handle the redirect to the leader.
-    T0 = khepri_utils:start_timeout_window(Timeout),
-    QueryFun = fun erlang:is_tuple/1,
-    case process_query1(StoreId, QueryFun, Timeout) of
+    case can_skip_fence_preliminary_query(StoreId) of
         true ->
-            NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
-            add_applied_condition2(StoreId, Options, NewTimeout);
-        Other when Other =/= false ->
-            Other
+            add_applied_condition2(StoreId, Options, Timeout);
+        false ->
+            T0 = khepri_utils:start_timeout_window(Timeout),
+            QueryFun = fun erlang:is_tuple/1,
+            case process_query1(StoreId, QueryFun, Timeout) of
+                true ->
+                    NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
+                    add_applied_condition2(StoreId, Options, NewTimeout);
+                Other when Other =/= false ->
+                    Other
+            end
     end.
 
 add_applied_condition2(StoreId, Options, Timeout) ->
-    %% After the previous local query, there is a great chance that the leader
-    %% was cached, though not 100% guarantied.
+    %% After the previous local query or sync command, if there was one, there
+    %% is a great chance that the leader was cached, though not 100%
+    %% guaranteed.
     case ra_leaderboard:lookup_leader(StoreId) of
         LeaderId when LeaderId =/= undefined ->
             add_applied_condition3(StoreId, Options, LeaderId, Timeout);
         undefined ->
+            %% If the leader is unknown, executing a preliminary query should
+            %% tell us who the leader is.
+            ask_fence_preliminary_query(StoreId),
             add_applied_condition1(StoreId, Options, Timeout)
     end.
 
@@ -1092,6 +1110,71 @@ get_timeout(_) -> khepri_app:get_default_timeout().
 clear_cache(_StoreId) ->
     ok.
 
+-define(CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
+        {?MODULE, can_skip_fence_preliminary_query, StoreId}).
+
+-spec sending_sync_command_locally(StoreId) -> ok when
+      StoreId :: khepri:store_id().
+%% @doc Records that a synchronous command is about to be sent locally.
+%%
+%% After that, we know we don't need a fence preliminary query.
+
+sending_sync_command_locally(StoreId) ->
+    Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
+    _ = erlang:put(Key, true),
+    ok.
+
+-spec sending_query_locally(StoreId) -> ok when
+      StoreId :: khepri:store_id().
+%% @doc Records that a query is about to be executed locally.
+%%
+%% After that, we know we don't need a fence preliminary query.
+
+sending_query_locally(StoreId) ->
+    %% Same behavior as a local sync command.
+    sending_sync_command_locally(StoreId).
+
+-spec sending_async_command_locally(StoreId) -> ok when
+      StoreId :: khepri:store_id().
+%% @doc Records that an asynchronous command is about to be sent locally.
+
+sending_async_command_locally(StoreId) ->
+    Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
+    _ = erlang:erase(Key),
+    ok.
+
+-spec sending_command_remotely(StoreId) -> ok when
+      StoreId :: khepri:store_id().
+%% @doc Records that a command is about to be sent to a remote store.
+
+sending_command_remotely(StoreId) ->
+    %% Same behavior as a local async command.
+    sending_async_command_locally(StoreId).
+
+-spec ask_fence_preliminary_query(StoreId) -> ok when
+      StoreId :: khepri:store_id().
+%% @doc Explicitly requests that a call to {@link
+%% can_skip_fence_preliminary_query/1} returns `false'.
+
+ask_fence_preliminary_query(StoreId) ->
+    %% Same behavior as a local async command.
+    sending_async_command_locally(StoreId).
+
+-spec can_skip_fence_preliminary_query(StoreId) -> LastMsgWasSync when
+      StoreId :: khepri:store_id(),
+      LastMsgWasSync :: boolean().
+%% @doc Indicates if the calling process sent a synchronous command or a query
+%% before this call.
+%%
+%% @returns `true' if the calling process sent a synchronous command or a query
+%% to the given store before this call, `false' if the calling process never
+%% sent anything to the given store, if the last message was an asynchronous
+%% command, or if the last message was sent to a remote store.
+
+can_skip_fence_preliminary_query(StoreId) ->
+    Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
+    erlang:get(Key) =:= true.
+
 %% -------------------------------------------------------------------
 %% ra_machine callbacks.
 %% -------------------------------------------------------------------
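
For readers who want to experiment with the idea from PATCH 3/3 outside of
Khepri, here is a minimal, standalone sketch of the process-dictionary flag.
The khepri_machine code above is the authoritative implementation; the module
name `fence_flag_sketch', the demo/0 function and the `my_store' identifier
below are illustrative assumptions only, not part of the patches.

    %% Minimal sketch of the "can we skip the fence preliminary query?" flag.
    %% A local sync command or local query sets it; an async or remote command
    %% clears it. All names here are illustrative.
    -module(fence_flag_sketch).

    -export([demo/0]).

    -define(KEY(StoreId), {?MODULE, can_skip_preliminary_query, StoreId}).

    sent_sync_command_or_query(StoreId) ->
        _ = erlang:put(?KEY(StoreId), true),
        ok.

    sent_async_or_remote_command(StoreId) ->
        _ = erlang:erase(?KEY(StoreId)),
        ok.

    can_skip_preliminary_query(StoreId) ->
        erlang:get(?KEY(StoreId)) =:= true.

    demo() ->
        StoreId = my_store,
        sent_sync_command_or_query(StoreId),
        %% The local server is known to be caught up: skip the extra query.
        true = can_skip_preliminary_query(StoreId),
        sent_async_or_remote_command(StoreId),
        %% A pipelined or remote command may still be in flight: query first.
        false = can_skip_preliminary_query(StoreId),
        ok.

Because the flag lives in the calling process dictionary and its key includes
the store id, it is tracked per caller and per store, which matches how
khepri_machine records it in PATCH 3/3.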