From ea0ea2695e13d2e352a7ff92c1f8d31b029363d0 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Fri, 23 Aug 2024 23:25:39 -0400 Subject: [PATCH] khepri_tx_adv: Prefer list prepending to build side effect lists In a transaction we build the set of `ra_machine:effect()`s for the machine to handle by using `++/2` with the old and new effects for each `khepri_tx`/`khepri_tx_adv` command. This is a noticeable bottleneck though when there are many (thousands or more) commands used in a transaction. `++/2` works by prepending every element in the left-hand side list (reversed) to the right-hand side list. As a transaction calls more `khepri_tx`/ `khepri_tx_adv` commands, the left-hand side list (old side effects) gets much longer than the right-hand side list (new side effects) and time it takes to append increases. We can avoid `++/2` by building the side effect list gradually and prepend every effect. The ordering of effects within a command isn't consequential: the effects update triggers and projections and each update is disjoint from the others. The ordering of the full side effect list is important though - a delete's effects should be handled before a put for the same path pattern for example - so we need to reverse the side effect list for the whole transaction before handling the effects. --- src/khepri_machine.erl | 77 ++++++++++++++++++++++-------------------- src/khepri_tx_adv.erl | 25 +++++++------- 2 files changed, 52 insertions(+), 50 deletions(-) diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl index b181a55f..41d9c27a 100644 --- a/src/khepri_machine.erl +++ b/src/khepri_machine.erl @@ -89,8 +89,8 @@ split_query_options/1, split_command_options/1, split_put_options/1, - insert_or_update_node/5, - delete_matching_nodes/3, + insert_or_update_node/6, + delete_matching_nodes/4, handle_tx_exception/1, process_query/3, process_command/3]). @@ -1295,13 +1295,13 @@ apply( State) -> {TreeOptions, PutOptions} = split_put_options(TreeAndPutOptions), Ret = insert_or_update_node( - State, PathPattern, Payload, PutOptions, TreeOptions), + State, PathPattern, Payload, PutOptions, TreeOptions, []), post_apply(Ret, Meta); apply( Meta, #delete{path = PathPattern, options = TreeOptions}, State) -> - Ret = delete_matching_nodes(State, PathPattern, TreeOptions), + Ret = delete_matching_nodes(State, PathPattern, TreeOptions, []), post_apply(Ret, Meta); apply( Meta, @@ -1709,54 +1709,58 @@ failed_to_locate_sproc(Reason) -> khepri_tx:abort(Reason). -spec insert_or_update_node( - State, PathPattern, Payload, PutOptions, TreeOptions) -> Ret when + State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) -> Ret when State :: state(), PathPattern :: khepri_path:native_pattern(), Payload :: khepri_payload:payload(), PutOptions :: khepri:put_options(), TreeOptions :: khepri:tree_options(), - Ret :: {State, Result} | {State, Result, ra_machine:effects()}, + SideEffects :: ra_machine:effects(), + Ret :: {State, Result, ra_machine:effects()}, Result :: khepri_machine:common_ret(). %% @private -insert_or_update_node(State, PathPattern, Payload, PutOptions, TreeOptions) -> +insert_or_update_node(State, PathPattern, Payload, PutOptions, TreeOptions, SideEffects) -> Tree = get_tree(State), Ret1 = khepri_tree:insert_or_update_node( Tree, PathPattern, Payload, PutOptions, TreeOptions), case Ret1 of {ok, Tree1, AppliedChanges, Ret2} -> State1 = set_tree(State, Tree1), - {State2, SideEffects} = create_tree_change_side_effects( - State, State1, Ret2, AppliedChanges), - {State2, {ok, Ret2}, SideEffects}; + {State2, SideEffects1} = add_tree_change_side_effects( + State, State1, Ret2, AppliedChanges, SideEffects), + {State2, {ok, Ret2}, SideEffects1}; Error -> - {State, Error} + {State, Error, SideEffects} end. --spec delete_matching_nodes(State, PathPattern, TreeOptions) -> Ret when +-spec delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) -> + Ret when State :: state(), PathPattern :: khepri_path:native_pattern(), TreeOptions :: khepri:tree_options(), - Ret :: {State, Result} | {State, Result, ra_machine:effects()}, + SideEffects :: ra_machine:effects(), + Ret :: {State, Result, ra_machine:effects()}, Result :: khepri_machine:common_ret(). %% @private -delete_matching_nodes(State, PathPattern, TreeOptions) -> +delete_matching_nodes(State, PathPattern, TreeOptions, SideEffects) -> Tree = get_tree(State), Ret = khepri_tree:delete_matching_nodes( Tree, PathPattern, #{}, TreeOptions), case Ret of {ok, Tree1, AppliedChanges, Ret2} -> State1 = set_tree(State, Tree1), - {State2, SideEffects} = create_tree_change_side_effects( - State, State1, Ret2, AppliedChanges), - {State2, {ok, Ret2}, SideEffects}; + {State2, SideEffects1} = add_tree_change_side_effects( + State, State1, Ret2, AppliedChanges, + SideEffects), + {State2, {ok, Ret2}, SideEffects1}; Error -> - {State, Error} + {State, Error, SideEffects} end. -create_tree_change_side_effects( - InitialState, NewState, Ret, KeepWhileAftermath) -> +add_tree_change_side_effects( + InitialState, NewState, Ret, KeepWhileAftermath, SideEffects) -> %% We make a map where for each affected tree node, we indicate the type %% of change. Changes0 = maps:merge(Ret, KeepWhileAftermath), @@ -1766,24 +1770,23 @@ create_tree_change_side_effects( (_, #{} = _NodeProps) -> update; (_, delete) -> delete end, Changes0), - ProjectionEffects = create_projection_side_effects( - InitialState, NewState, Changes), - {NewState1, TriggerEffects} = create_trigger_side_effects( - InitialState, NewState, Changes), - {NewState1, ProjectionEffects ++ TriggerEffects}. + SideEffects1 = add_projection_side_effects( + InitialState, NewState, Changes, SideEffects), + add_trigger_side_effects( + InitialState, NewState, Changes, SideEffects1). -create_projection_side_effects(InitialState, NewState, Changes) -> +add_projection_side_effects(InitialState, NewState, Changes, SideEffects) -> InitialTree = get_tree(InitialState), NewTree = get_tree(NewState), ProjectionTree0 = get_projections(NewState), ProjectionTree = get_compiled_projection_tree(ProjectionTree0), maps:fold( fun(Path, Change, Effects) -> - create_projection_side_effects1( + add_projection_side_effects1( InitialTree, NewTree, ProjectionTree, Path, Change, Effects) - end, [], Changes). + end, SideEffects, Changes). -create_projection_side_effects1( +add_projection_side_effects1( InitialTree, NewTree, ProjectionTree, Path, delete = Change, Effects) -> %% Deletion changes recursively delete the subtree below the deleted tree %% node. Find any children in the tree that were also deleted by this @@ -1793,7 +1796,7 @@ create_projection_side_effects1( ChildrenPattern = Path ++ [?KHEPRI_WILDCARD_STAR_STAR], EffectsForChildrenFun = fun(ChildPath, _NodeProps, EffectAcc) -> - create_projection_side_effects2( + add_projection_side_effects2( InitialTree, NewTree, ProjectionTree, ChildPath, Change, EffectAcc) end, @@ -1802,14 +1805,14 @@ create_projection_side_effects1( EffectsForChildrenFun, Effects, ChildrenFindOptions), %% Also trigger a change for the deleted path itself. - create_projection_side_effects2( + add_projection_side_effects2( InitialTree, NewTree, ProjectionTree, Path, Change, Effects1); -create_projection_side_effects1( +add_projection_side_effects1( InitialTree, NewTree, ProjectionTree, Path, Change, Effects) -> - create_projection_side_effects2( + add_projection_side_effects2( InitialTree, NewTree, ProjectionTree, Path, Change, Effects). -create_projection_side_effects2( +add_projection_side_effects2( InitialTree, NewTree, ProjectionTree, Path, Change, Effects) -> PatternMatchingTree = case Change of create -> @@ -1869,7 +1872,7 @@ evaluate_projection( Effect = {aux, Trigger}, [Effect | Effects]. -create_trigger_side_effects(InitialState, NewState, Changes) -> +add_trigger_side_effects(InitialState, NewState, Changes, SideEffects) -> %% We want to consider the new state (with the updated tree), but we want %% to use triggers from the initial state, in case they were updated too. %% In other words, we want to evaluate triggers in the state they were at @@ -1877,7 +1880,7 @@ create_trigger_side_effects(InitialState, NewState, Changes) -> Triggers = get_triggers(InitialState), case Triggers =:= #{} of true -> - {NewState, []}; + {NewState, SideEffects}; false -> EmittedTriggers = get_emitted_triggers(InitialState), #config{store_id = StoreId} = get_config(NewState), @@ -1903,7 +1906,7 @@ create_trigger_side_effects(InitialState, NewState, Changes) -> khepri_event_handler, handle_triggered_sprocs, [StoreId, TriggeredStoredProcs]}, - {NewState1, [SideEffect]} + {NewState1, [SideEffect | SideEffects]} end. list_triggered_sprocs(Tree, Changes, Triggers) -> diff --git a/src/khepri_tx_adv.erl b/src/khepri_tx_adv.erl index ea8e2b79..0f3fc9dd 100644 --- a/src/khepri_tx_adv.erl +++ b/src/khepri_tx_adv.erl @@ -217,9 +217,10 @@ put_many(PathPattern, Data, Options) -> {TreeOptions, PutOptions} = khepri_machine:split_put_options(TreeAndPutOptions), %% TODO: Ensure `CommandOptions' is unset. - Fun = fun(State) -> + Fun = fun(State, SideEffects) -> khepri_machine:insert_or_update_node( - State, PathPattern1, Payload1, PutOptions, TreeOptions) + State, PathPattern1, Payload1, PutOptions, TreeOptions, + SideEffects) end, handle_state_for_call(Fun). @@ -421,9 +422,9 @@ delete_many(PathPattern, Options) -> khepri_machine:split_command_options(Options), %% TODO: Ensure `CommandOptions' is empty and `TreeOptions' doesn't %% contains put options. - Fun = fun(State) -> + Fun = fun(State, SideEffects) -> khepri_machine:delete_matching_nodes( - State, PathPattern1, TreeOptions) + State, PathPattern1, TreeOptions, SideEffects) end, handle_state_for_call(Fun). @@ -985,7 +986,10 @@ run(State, StandaloneFun, Args, AllowUpdates) NewTxProps = erlang:erase(?TX_PROPS), khepri_machine:ensure_is_state(NewState), ?assertEqual(TxProps, NewTxProps), - {NewState, Ret, NewSideEffects} + %% The side effect list is built using prepends so the list needs to + %% be reversed to process the effects in order. + NewSideEffects1 = lists:reverse(NewSideEffects), + {NewState, Ret, NewSideEffects1} catch Class:Reason:Stacktrace -> _ = erlang:erase(?TX_STATE_KEY), @@ -996,14 +1000,9 @@ run(State, StandaloneFun, Args, AllowUpdates) handle_state_for_call(Fun) -> {State, SideEffects} = get_tx_state(), - case Fun(State) of - {NewState, Ret, NewSideEffects} -> - set_tx_state(NewState, SideEffects ++ NewSideEffects), - ?raise_exception_if_any(Ret); - {NewState, Ret} -> - set_tx_state(NewState, SideEffects), - ?raise_exception_if_any(Ret) - end. + {NewState, Ret, SideEffects1} = Fun(State, SideEffects), + set_tx_state(NewState, SideEffects1), + ?raise_exception_if_any(Ret). -spec get_tx_state() -> {State, SideEffects} when State :: khepri_machine:state(),