Skip to content

Commit

Permalink
khepri_tx_adv: Prefer list prepending to build side effect lists
Browse files Browse the repository at this point in the history
In a transaction we build the set of `ra_machine:effect()`s for the
machine to handle by using `++/2` with the old and new effects for each
`khepri_tx`/`khepri_tx_adv` command. This is a noticeable bottleneck
though when there are many (thousands or more) commands used in a
transaction.

`++/2` works by prepending every element in the left-hand side list
(reversed) to the right-hand side list. As a transaction calls more
`khepri_tx`/ `khepri_tx_adv` commands, the left-hand side list (old side
effects) gets much longer than the right-hand side list (new side
effects) and time it takes to append increases.

We can avoid `++/2` by building the side effect list gradually and
prepend every effect. The ordering of effects within a command isn't
consequential: the effects update triggers and projections and each
update is disjoint from the others. The ordering of the full side effect
list is important though - a delete's effects should be handled before
a put for the same path pattern for example - so we need to reverse the
side effect list for the whole transaction before handling the effects.
  • Loading branch information
the-mikedavis committed Aug 24, 2024
1 parent be71afe commit ea0ea26
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 50 deletions.
77 changes: 40 additions & 37 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
split_query_options/1,
split_command_options/1,
split_put_options/1,
insert_or_update_node/5,
delete_matching_nodes/3,
insert_or_update_node/6,
delete_matching_nodes/4,
handle_tx_exception/1,
process_query/3,
process_command/3]).
Expand Down Expand Up @@ -1295,13 +1295,13 @@ apply(
State) ->
{TreeOptions, PutOptions} = split_put_options(TreeAndPutOptions),
Ret = insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions),
State, PathPattern, Payload, PutOptions, TreeOptions, []),
post_apply(Ret, Meta);
apply(
Meta,
#delete{path = PathPattern, options = TreeOptions},
State) ->
Ret = delete_matching_nodes(State, PathPattern, TreeOptions),
Ret = delete_matching_nodes(State, PathPattern, TreeOptions, []),
post_apply(Ret, Meta);
apply(
Meta,
Expand Down Expand Up @@ -1709,54 +1709,58 @@ failed_to_locate_sproc(Reason) ->
khepri_tx:abort(Reason).

-spec insert_or_update_node(
State, PathPattern, Payload, PutOptions, TreeOptions) -> Ret when
State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) -> Ret when
State :: state(),
PathPattern :: khepri_path:native_pattern(),
Payload :: khepri_payload:payload(),
PutOptions :: khepri:put_options(),
TreeOptions :: khepri:tree_options(),
Ret :: {State, Result} | {State, Result, ra_machine:effects()},
SideEffects :: ra_machine:effects(),
Ret :: {State, Result, ra_machine:effects()},
Result :: khepri_machine:common_ret().
%% @private

insert_or_update_node(State, PathPattern, Payload, PutOptions, TreeOptions) ->
insert_or_update_node(State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) ->
Tree = get_tree(State),
Ret1 = khepri_tree:insert_or_update_node(
Tree, PathPattern, Payload, PutOptions, TreeOptions),
case Ret1 of
{ok, Tree1, AppliedChanges, Ret2} ->
State1 = set_tree(State, Tree1),
{State2, SideEffects} = create_tree_change_side_effects(
State, State1, Ret2, AppliedChanges),
{State2, {ok, Ret2}, SideEffects};
{State2, SideEffects1} = add_tree_change_side_effects(
State, State1, Ret2, AppliedChanges, SideEffects),
{State2, {ok, Ret2}, SideEffects1};
Error ->
{State, Error}
{State, Error, SideEffects}
end.

-spec delete_matching_nodes(State, PathPattern, TreeOptions) -> Ret when
-spec delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) ->
Ret when
State :: state(),
PathPattern :: khepri_path:native_pattern(),
TreeOptions :: khepri:tree_options(),
Ret :: {State, Result} | {State, Result, ra_machine:effects()},
SideEffects :: ra_machine:effects(),
Ret :: {State, Result, ra_machine:effects()},
Result :: khepri_machine:common_ret().
%% @private

delete_matching_nodes(State, PathPattern, TreeOptions) ->
delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) ->
Tree = get_tree(State),
Ret = khepri_tree:delete_matching_nodes(
Tree, PathPattern, #{}, TreeOptions),
case Ret of
{ok, Tree1, AppliedChanges, Ret2} ->
State1 = set_tree(State, Tree1),
{State2, SideEffects} = create_tree_change_side_effects(
State, State1, Ret2, AppliedChanges),
{State2, {ok, Ret2}, SideEffects};
{State2, SideEffects1} = add_tree_change_side_effects(
State, State1, Ret2, AppliedChanges,
SideEffects),
{State2, {ok, Ret2}, SideEffects1};
Error ->
{State, Error}
{State, Error, SideEffects}
end.

create_tree_change_side_effects(
InitialState, NewState, Ret, KeepWhileAftermath) ->
add_tree_change_side_effects(
InitialState, NewState, Ret, KeepWhileAftermath, SideEffects) ->
%% We make a map where for each affected tree node, we indicate the type
%% of change.
Changes0 = maps:merge(Ret, KeepWhileAftermath),
Expand All @@ -1766,24 +1770,23 @@ create_tree_change_side_effects(
(_, #{} = _NodeProps) -> update;
(_, delete) -> delete
end, Changes0),
ProjectionEffects = create_projection_side_effects(
InitialState, NewState, Changes),
{NewState1, TriggerEffects} = create_trigger_side_effects(
InitialState, NewState, Changes),
{NewState1, ProjectionEffects ++ TriggerEffects}.
SideEffects1 = add_projection_side_effects(
InitialState, NewState, Changes, SideEffects),
add_trigger_side_effects(
InitialState, NewState, Changes, SideEffects1).

create_projection_side_effects(InitialState, NewState, Changes) ->
add_projection_side_effects(InitialState, NewState, Changes, SideEffects) ->
InitialTree = get_tree(InitialState),
NewTree = get_tree(NewState),
ProjectionTree0 = get_projections(NewState),
ProjectionTree = get_compiled_projection_tree(ProjectionTree0),
maps:fold(
fun(Path, Change, Effects) ->
create_projection_side_effects1(
add_projection_side_effects1(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects)
end, [], Changes).
end, SideEffects, Changes).

create_projection_side_effects1(
add_projection_side_effects1(
InitialTree, NewTree, ProjectionTree, Path, delete = Change, Effects) ->
%% Deletion changes recursively delete the subtree below the deleted tree
%% node. Find any children in the tree that were also deleted by this
Expand All @@ -1793,7 +1796,7 @@ create_projection_side_effects1(
ChildrenPattern = Path ++ [?KHEPRI_WILDCARD_STAR_STAR],
EffectsForChildrenFun =
fun(ChildPath, _NodeProps, EffectAcc) ->
create_projection_side_effects2(
add_projection_side_effects2(
InitialTree, NewTree, ProjectionTree,
ChildPath, Change, EffectAcc)
end,
Expand All @@ -1802,14 +1805,14 @@ create_projection_side_effects1(
EffectsForChildrenFun, Effects,
ChildrenFindOptions),
%% Also trigger a change for the deleted path itself.
create_projection_side_effects2(
add_projection_side_effects2(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects1);
create_projection_side_effects1(
add_projection_side_effects1(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects) ->
create_projection_side_effects2(
add_projection_side_effects2(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects).

create_projection_side_effects2(
add_projection_side_effects2(
InitialTree, NewTree, ProjectionTree, Path, Change, Effects) ->
PatternMatchingTree = case Change of
create ->
Expand Down Expand Up @@ -1869,15 +1872,15 @@ evaluate_projection(
Effect = {aux, Trigger},
[Effect | Effects].

create_trigger_side_effects(InitialState, NewState, Changes) ->
add_trigger_side_effects(InitialState, NewState, Changes, SideEffects) ->
%% We want to consider the new state (with the updated tree), but we want
%% to use triggers from the initial state, in case they were updated too.
%% In other words, we want to evaluate triggers in the state they were at
%% the time the change to the tree was requested.
Triggers = get_triggers(InitialState),
case Triggers =:= #{} of
true ->
{NewState, []};
{NewState, SideEffects};
false ->
EmittedTriggers = get_emitted_triggers(InitialState),
#config{store_id = StoreId} = get_config(NewState),
Expand All @@ -1903,7 +1906,7 @@ create_trigger_side_effects(InitialState, NewState, Changes) ->
khepri_event_handler,
handle_triggered_sprocs,
[StoreId, TriggeredStoredProcs]},
{NewState1, [SideEffect]}
{NewState1, [SideEffect | SideEffects]}
end.

list_triggered_sprocs(Tree, Changes, Triggers) ->
Expand Down
25 changes: 12 additions & 13 deletions src/khepri_tx_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,10 @@ put_many(PathPattern, Data, Options) ->
{TreeOptions, PutOptions} =
khepri_machine:split_put_options(TreeAndPutOptions),
%% TODO: Ensure `CommandOptions' is unset.
Fun = fun(State) ->
Fun = fun(State, SideEffects) ->
khepri_machine:insert_or_update_node(
State, PathPattern1, Payload1, PutOptions, TreeOptions)
State, PathPattern1, Payload1, PutOptions, TreeOptions,
SideEffects)
end,
handle_state_for_call(Fun).

Expand Down Expand Up @@ -421,9 +422,9 @@ delete_many(PathPattern, Options) ->
khepri_machine:split_command_options(Options),
%% TODO: Ensure `CommandOptions' is empty and `TreeOptions' doesn't
%% contains put options.
Fun = fun(State) ->
Fun = fun(State, SideEffects) ->
khepri_machine:delete_matching_nodes(
State, PathPattern1, TreeOptions)
State, PathPattern1, TreeOptions, SideEffects)
end,
handle_state_for_call(Fun).

Expand Down Expand Up @@ -985,7 +986,10 @@ run(State, StandaloneFun, Args, AllowUpdates)
NewTxProps = erlang:erase(?TX_PROPS),
khepri_machine:ensure_is_state(NewState),
?assertEqual(TxProps, NewTxProps),
{NewState, Ret, NewSideEffects}
%% The side effect list is built using prepends so the list needs to
%% be reversed to process the effects in order.
NewSideEffects1 = lists:reverse(NewSideEffects),
{NewState, Ret, NewSideEffects1}
catch
Class:Reason:Stacktrace ->
_ = erlang:erase(?TX_STATE_KEY),
Expand All @@ -996,14 +1000,9 @@ run(State, StandaloneFun, Args, AllowUpdates)

handle_state_for_call(Fun) ->
{State, SideEffects} = get_tx_state(),
case Fun(State) of
{NewState, Ret, NewSideEffects} ->
set_tx_state(NewState, SideEffects ++ NewSideEffects),
?raise_exception_if_any(Ret);
{NewState, Ret} ->
set_tx_state(NewState, SideEffects),
?raise_exception_if_any(Ret)
end.
{NewState, Ret, SideEffects1} = Fun(State, SideEffects),
set_tx_state(NewState, SideEffects1),
?raise_exception_if_any(Ret).

-spec get_tx_state() -> {State, SideEffects} when
State :: khepri_machine:state(),
Expand Down

0 comments on commit ea0ea26

Please sign in to comment.