Skip to content

Commit

Permalink
Merge pull request #283 from rabbitmq/improve-fence-mechanism-perform…
Browse files Browse the repository at this point in the history
…ance

Improve the fence mechanism performances
  • Loading branch information
dumbbell authored Aug 14, 2024
2 parents 503a29a + 4c25407 commit 94e673c
Showing 1 changed file with 115 additions and 36 deletions.
151 changes: 115 additions & 36 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,12 @@ do_process_sync_command(StoreId, Command, Options) ->
CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
T0 = khepri_utils:start_timeout_window(Timeout),
Dest = case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined -> LeaderId;
undefined -> RaServer
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
LeaderId;
undefined ->
sending_sync_command_locally(StoreId),
RaServer
end,
case ra:process_command(Dest, Command, CommandOptions) of
{ok, Ret, _LeaderId} ->
Expand Down Expand Up @@ -903,15 +907,18 @@ process_async_command(
StoreId, Command, ?DEFAULT_RA_COMMAND_CORRELATION = Correlation, Priority) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority);
process_async_command(
StoreId, Command, Correlation, Priority) ->
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
ra:pipeline_command(LeaderId, Command, Correlation, Priority);
undefined ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority)
end.

Expand Down Expand Up @@ -983,6 +990,7 @@ process_query(StoreId, QueryFun, Options) ->
end.

process_query1(StoreId, QueryFun, Options) ->
sending_query_locally(StoreId),
LocalServerId = {StoreId, node()},
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
Expand All @@ -1009,9 +1017,10 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% the order of operations between updates and queries. We have to follow
%% several steps to prepare that condition.
%%
%% We first send an arbitrary query to the local Ra server. This is to
%% make sure that previously submitted pipelined commands were processed
%% by that server.
%% If the last message from the calling process to the local Ra server was
%% an async command or if it never sent a command yet, we first send an
%% arbitrary query to the local Ra server. This is to make sure that
%% previously submitted pipelined commands were processed by that server.
%%
%% For instance, if there was a pipelined command without any correlation
%% ID, it ensures it was forwarded to the leader. Likewise for a
Expand All @@ -1020,35 +1029,45 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% We can't have this guarantee for pipelined commands with a correlation
%% because the caller is responsible for receiving the rejection from the
%% follower and handling the redirect to the leader.
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
InternalOptions = #{favor => low_latency,
timeout => Timeout},
case process_query(StoreId, QueryFun, InternalOptions) of
case can_skip_fence_preliminary_query(StoreId) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
add_applied_condition2(StoreId, Options, Timeout);
false ->
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
end
end.

add_applied_condition2(StoreId, Options, Timeout) ->
%% Second step of the fence: find out who the leader is.
%%
%% After the previous local query or sync command, if there was one, there
%% is a good chance that the leader was cached, though this is not 100%
%% guaranteed.
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
add_applied_condition3(StoreId, Options, LeaderId, Timeout);
undefined ->
%% If the leader is unknown, executing a preliminary query should
%% tell us who the leader is. We reset the "can skip" flag first so
%% that add_applied_condition1/3 actually runs that query instead of
%% skipping it.
ask_fence_preliminary_query(StoreId),
add_applied_condition1(StoreId, Options, Timeout)
end.

add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%% We query the leader to know the last index it committed. We also
%% double-check it is still the leader; if it is not, we recurse.
%% We query the leader to know the last index it committed in which term.
%%
%% We pay attention to its state because a map is still returned even if
%% the Ra server is stopped.
T0 = khepri_utils:start_timeout_window(Timeout),
case ra:member_overview(LeaderId, Timeout) of
{ok, Overview, LeaderId} ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
try ra:key_metrics(LeaderId, Timeout) of
#{last_index := LastIndex, term := Term, state := State}
when State =/= noproc andalso State =/= unknown ->
NewTimeout1 = khepri_utils:end_timeout_window(Timeout, T0),

%% Now that we know the last committed index of the leader, we can
%% perform an arbitrary query on the local server. The query will
Expand All @@ -1057,26 +1076,21 @@ add_applied_condition3(StoreId, Options, LeaderId, Timeout) ->
%%
%% We don't care about the result of that query. We just want to
%% block until the latest commands are applied locally.
#{log := #{last_index := LastIndex},
current_term := CurrentTerm} = Overview,
Condition = {applied, {LastIndex, CurrentTerm}},
Condition = {applied, {LastIndex, Term}},
Options1 = Options#{condition => Condition,
timeout => NewTimeout},
timeout => NewTimeout1},
{ok, Options1};
{ok, _Overview, NewLeaderId} ->
_ ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition3(StoreId, Options, NewLeaderId, NewTimeout);
{timeout, _LeaderId} ->
add_applied_condition1(StoreId, Options, NewTimeout)
catch
error:{erpc, timeout} ->
{error, timeout};
{error, Reason}
when ?HAS_TIME_LEFT(Timeout) andalso
(Reason == noproc orelse Reason == nodedown orelse
Reason == shutdown) ->
error:{erpc, noconnection} ->
timer:sleep(200),
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout);
Error ->
Error
NewTimeout2 = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition1(StoreId, Options, NewTimeout2)
end.

-spec get_timeout(Options) -> Timeout when
Expand All @@ -1096,6 +1110,71 @@ get_timeout(_) -> khepri_app:get_default_timeout().
clear_cache(_StoreId) ->
ok.

%% Process dictionary key under which the calling process remembers, for a
%% given store, whether the fence preliminary query can be skipped. The key
%% is either associated with `true' or erased.
-define(CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
{?MODULE, can_skip_fence_preliminary_query, StoreId}).

-spec sending_sync_command_locally(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Flags, in the calling process, that a synchronous command is about to
%% be submitted to the local Ra server of the given store.
%%
%% The flag is stored in the process dictionary. As long as it is set,
%% {@link can_skip_fence_preliminary_query/1} returns `true' for that store.

sending_sync_command_locally(StoreId) ->
    %% The previous value, if any, is irrelevant: we only care that the flag
    %% is now `true'.
    _PreviousValue = erlang:put(
                       ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId), true),
    ok.

-spec sending_query_locally(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Flags, in the calling process, that a query is about to be executed
%% against the local Ra server of the given store.
%%
%% A local query is handled exactly like a local synchronous command: once it
%% is recorded, the fence preliminary query can be skipped.

sending_query_locally(StoreId) ->
    %% Delegate to the sync command variant; both set the same flag.
    sending_sync_command_locally(StoreId).

-spec sending_async_command_locally(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Flags, in the calling process, that an asynchronous command is about
%% to be submitted to the local Ra server of the given store.
%%
%% The flag kept in the process dictionary is erased. After that,
%% {@link can_skip_fence_preliminary_query/1} returns `false' for that store.

sending_async_command_locally(StoreId) ->
    %% Whether the flag was set before or not does not matter.
    _PreviousValue = erlang:erase(
                       ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId)),
    ok.

-spec sending_command_remotely(StoreId) -> ok when
      StoreId :: khepri:store_id().
%% @doc Flags, in the calling process, that a command is about to be sent to
%% a remote store.
%%
%% This is handled exactly like a local asynchronous command: once it is
%% recorded, the fence preliminary query can no longer be skipped.

sending_command_remotely(StoreId) ->
    %% Delegate to the local async variant; both erase the same flag.
    sending_async_command_locally(StoreId).

-spec ask_fence_preliminary_query(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Explicitly requests that a subsequent call to {@link
%% can_skip_fence_preliminary_query/1} returns `false', i.e. that the fence
%% preliminary query is executed instead of being skipped.
%%
%% This holds until a local synchronous command or a local query is recorded
%% again by the calling process.

ask_fence_preliminary_query(StoreId) ->
%% Same behavior as a local async command: the flag stored in the process
%% dictionary is erased.
sending_async_command_locally(StoreId).

-spec can_skip_fence_preliminary_query(StoreId) -> LastMsgWasSync when
      StoreId :: khepri:store_id(),
      LastMsgWasSync :: boolean().
%% @doc Tells whether the fence preliminary query can be skipped by the
%% calling process for the given store.
%%
%% @returns `true' if the last thing the calling process recorded for the
%% given store was a local synchronous command or a local query, `false' if
%% the calling process never sent anything to the given store, if the last
%% message was an asynchronous command, or if the last message was sent to a
%% remote store.

can_skip_fence_preliminary_query(StoreId) ->
    %% The flag is either `true' or absent (`undefined'); anything but `true'
    %% means the preliminary query is required.
    case erlang:get(?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId)) of
        true -> true;
        _    -> false
    end.

%% -------------------------------------------------------------------
%% ra_machine callbacks.
%% -------------------------------------------------------------------
Expand Down

0 comments on commit 94e673c

Please sign in to comment.