diff --git a/src/khepri_machine.erl b/src/khepri_machine.erl
index 42cde609..25353106 100644
--- a/src/khepri_machine.erl
+++ b/src/khepri_machine.erl
@@ -78,7 +78,7 @@
          handle_aux/6,
          apply/3,
          state_enter/2,
-         snapshot_installed/2,
+         snapshot_installed/4,
          overview/1,
          version/0,
          which_module/1]).
@@ -1253,6 +1253,12 @@ handle_aux(
                  Projection <- Projections]
          end),
     {no_reply, AuxState, LogState};
+handle_aux(
+  _RaState, cast, {update_projections, OldState}, AuxState, LogState,
+  NewState) ->
+    ok = update_projections(OldState, NewState),
+    ok = clear_compiled_projection_tree(),
+    {no_reply, AuxState, LogState};
 handle_aux(
   _RaState, cast,
   #restore_projection{projection = Projection, pattern = PathPattern},
@@ -1565,8 +1571,20 @@ state_enter(_StateName, _State) ->
 
 %% @private
 
-snapshot_installed(_Meta, _State) ->
-    SideEffect = {aux, restore_projections},
+snapshot_installed(
+  #{machine_version := NewMacVer}, OldMacVer, OldState, _NewState) ->
+    %% A snapshot might be installed on a follower member that has fallen
+    %% sufficiently far behind the leader in log replication. When a member
+    %% installs a snapshot it needs to update its projections: new
+    %% projections may have been registered since the snapshot or old ones
+    %% unregistered. Projections which did not change need to be triggered
+    %% with the new changes to state, similar to the `restore_projections'
+    %% aux effect. Also see `update_projections/2'.
+    %%
+    %% Note that the old state could be an older state machine version which
+    %% should be converted to the current version.
+    OldState1 = convert_state(OldState, OldMacVer, NewMacVer),
+    SideEffect = {aux, {update_projections, OldState1}},
     [SideEffect].
 
 %% @private
@@ -2322,3 +2340,129 @@ convert_state(State, 0, 1) ->
     State1 = list_to_tuple(Fields1),
     ?assert(is_state(State1)),
     State1.
+
+-spec update_projections(OldState, NewState) -> ok when
+      OldState :: khepri_machine:state(),
+      NewState :: khepri_machine:state().
+%% @doc Updates the machine's projections to account for changes between two
+%% states.
+%%
+%% This is used when installing a snapshot - the state will jump from the
+%% given `OldState' before the snapshot was installed to the given `NewState'
+%% after. When we swap states we need to update the projections: the records
+%% in the projection tables themselves but also which projection tables
+%% exist. The changes glossed over by the snapshot may include projection
+%% registrations and unregistrations, so we need to initialize new
+%% projections and delete unregistered ones, and we need to ensure that the
+%% projection tables are up to date for any projections which didn't change.
+%%
+%% @private
+
+update_projections(OldState, NewState) ->
+    OldTree = get_tree(OldState),
+    OldProjections = set_of_projections(get_projections(OldState)),
+    NewTree = get_tree(NewState),
+    NewProjections = set_of_projections(get_projections(NewState)),
+
+    CommonProjections = sets:intersection(OldProjections, NewProjections),
+    DeletedProjections = sets:subtract(OldProjections, CommonProjections),
+    CreatedProjections = sets:subtract(NewProjections, CommonProjections),
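+    %% Illustrative example (hypothetical projections A, B and C): if the
+    %% old state held {A, B} and the new state holds {B, C}, then A is
+    %% deleted, C is created, and only B is diffed and updated in place.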
+
+    %% Tear down any projections which were unregistered.
+    sets:fold(
+      fun({_Pattern, Projection}, _Acc) ->
+              _ = khepri_projection:delete(Projection),
+              ok
+      end, ok, DeletedProjections),
+
+    %% Initialize any new projections which were registered.
+    sets:fold(
+      fun({Pattern, Projection}, _Acc) ->
+              ok = restore_projection(Projection, NewTree, Pattern)
+      end, ok, CreatedProjections),
+
+    %% Update in place any projections which were not changed themselves (i.e.
+    %% the projection name, function and pattern) between the old and new
+    %% states. To do this we find the matching nodes in the old and new trees
+    %% for the projection's pattern and trigger the projection with each
+    %% matching path's old and new properties.
+    sets:fold(
+      fun({Pattern, Projection}, _Acc) ->
+              ok = update_projection(Pattern, Projection, OldTree, NewTree)
+      end, ok, CommonProjections),
+
+    ok.
+
+-spec set_of_projections(ProjectionTree) -> Projections when
+      ProjectionTree :: khepri_machine:projection_tree(),
+      Element :: {khepri_path:native_pattern(),
+                  khepri_projection:projection()},
+      Projections :: sets:set(Element).
+%% @doc Folds the projections in a projection tree into a version 2 {@link
+%% sets:set()}.
+%%
+%% @private
+
+set_of_projections(ProjectionTree) ->
+    khepri_pattern_tree:fold(
+      ProjectionTree,
+      fun(Pattern, Projections, Acc) ->
+              lists:foldl(
+                fun(Projection, Acc1) ->
+                        Entry = {Pattern, Projection},
+                        sets:add_element(Entry, Acc1)
+                end, Acc, Projections)
+      end, sets:new([{version, 2}])).
+
+update_projection(Pattern, Projection, OldTree, NewTree) ->
+    TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
+                    include_root_props => true},
+    case khepri_tree:find_matching_nodes(OldTree, Pattern, TreeOptions) of
+        {ok, OldMatchingNodes} ->
+            Result = khepri_tree:find_matching_nodes(
+                       NewTree, Pattern, TreeOptions),
+            case Result of
+                {ok, NewMatchingNodes} ->
+                    Updates = diff_matching_nodes(
+                                OldMatchingNodes, NewMatchingNodes),
+                    maps:foreach(
+                      fun(Path, {OldProps, NewProps}) ->
+                              khepri_projection:trigger(
+                                Projection, Path, OldProps, NewProps)
+                      end, Updates);
+                Error ->
+                    ?LOG_DEBUG(
+                       "Failed to refresh projection ~s due to an error "
+                       "finding matching nodes in the new tree: ~p",
+                       [khepri_projection:name(Projection), Error],
+                       #{domain => [khepri, ra_machine]})
+            end;
+        Error ->
+            ?LOG_DEBUG(
+               "Failed to refresh projection ~s due to an error finding "
+               "matching nodes in the old tree: ~p",
+               [khepri_projection:name(Projection), Error],
+               #{domain => [khepri, ra_machine]})
+    end.
+
+-spec diff_matching_nodes(OldNodeProps, NewNodeProps) -> Changes when
+      OldNodeProps :: khepri_adv:node_props_map(),
+      NewNodeProps :: khepri_adv:node_props_map(),
+      OldProps :: khepri:node_props(),
+      NewProps :: khepri:node_props(),
+      Changes :: #{khepri_path:native_path() => {OldProps, NewProps}}.
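+%% Sketch of the mapping this produces, for hypothetical paths and props:
+%% a path only in `OldNodeProps' yields `{OldProps, #{}}' (node deleted), a
+%% path only in `NewNodeProps' yields `{#{}, NewProps}' (node created), and
+%% a path present in both yields `{OldProps, NewProps}' (node updated).
+%%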
+%% @private
+
+diff_matching_nodes(OldNodeProps, NewNodeProps) ->
+    CommonProps = maps:intersect_with(
+                    fun(_Path, OldProps, NewProps) -> {OldProps, NewProps} end,
+                    OldNodeProps, NewNodeProps),
+    CommonPaths = maps:keys(CommonProps),
+    AllProps = maps:fold(
+                 fun(Path, OldProps, Acc) ->
+                         Acc#{Path => {OldProps, #{}}}
+                 end, CommonProps, maps:without(CommonPaths, OldNodeProps)),
+    maps:fold(
+      fun(Path, NewProps, Acc) ->
+              Acc#{Path => {#{}, NewProps}}
+      end, AllProps, maps:without(CommonPaths, NewNodeProps)).
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index a6913f17..0ccc6974 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -1876,16 +1876,17 @@ projections_are_updated_when_a_snapshot_is_installed(Config) ->
     %% When this happens the member doesn't see the changes as regular
     %% commands (i.e. handled in `ra_machine:apply/3'). Instead the machine
     %% state is replaced entirely. So when a snapshot is installed we must
-    %% restore projections the same way we do as when we restart a member.
-    %% In `khepri_machine' this is done in the `snapshot_installed/2` callback
-    %% implementation.
+    %% restore projections. The machine is alive at this point though, so we
+    %% can't restore projections the same way as when we restart a member.
+    %% We need to diff the old and new states first to handle any newly
+    %% registered or unregistered projections and then update any existing
+    %% projections with updated or deleted records. In `khepri_machine' this
+    %% is done in the `snapshot_installed/4' callback implementation.
     %%
     %% To test this we stop a member, apply enough commands to cause the leader
     %% to take a snapshot, and then restart the member and assert that the
     %% projection contents are as expected.
 
-    ProjectionName = ?MODULE,
-
     %% We call `khepri_projection:new/2' on the local node and thus need
     %% Khepri.
     ?assertMatch({ok, _}, application:ensure_all_started(khepri)),
@@ -1896,9 +1897,9 @@
     #{ra_system := RaSystem} = maps:get(Node1, PropsPerNode),
     StoreId = RaSystem,
     %% Set the snapshot interval low so that we can trigger a snapshot by
-    %% sending 4 commands.
+    %% sending around twenty commands.
     RaServerConfig = #{cluster_name => StoreId,
-                       machine_config => #{snapshot_interval => 4}},
+                       machine_config => #{snapshot_interval => 20}},
 
     ct:pal("Start database + cluster nodes"),
     lists:foreach(
@@ -1916,14 +1917,30 @@
             rpc:call(Node, khepri_cluster, join, [StoreId, Node3]))
      end, [Node1, Node2]),
 
-    ct:pal("Register projection on node ~s", [Node1]),
-    Projection = khepri_projection:new(
-                   ProjectionName,
-                   fun(Path, Payload) -> {Path, Payload} end),
+    ProjectionName1 = projection_1,
+    Projection1 = khepri_projection:new(
+                    ProjectionName1,
+                    fun(Path, Payload) -> {Path, Payload} end),
+    ProjectionName2 = projection_2,
+    Projection2 = khepri_projection:new(
+                    ProjectionName2,
+                    fun(Path, Payload) -> {Path, Payload} end),
+    ProjectionName3 = projection_3,
+    Projection3 = khepri_projection:new(
+                    ProjectionName3,
+                    fun(Path, Payload) -> {Path, Payload} end),
+
+    ct:pal("Register projection ~ts on node ~s", [ProjectionName1, Node1]),
     rpc:call(Node1,
              khepri, register_projection,
-             [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection]),
-    ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName),
+             [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection1]),
+    ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName1),
+
+    ct:pal("Register projection ~ts on node ~s", [ProjectionName2, Node1]),
+    rpc:call(Node1,
+             khepri, register_projection,
+             [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection2]),
+    ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName2),
 
     ?assertEqual(
        ok,
@@ -1931,29 +1948,48 @@
                                     #{reply_from => local}])),
     ?assertEqual(
        value1v1,
-       rpc:call(Node3, ets, lookup_element, [ProjectionName, [key1], 2])),
+       rpc:call(Node3, ets, lookup_element, [ProjectionName1, [key1], 2])),
+    %% This key will be deleted.
+    ?assertEqual(
+       ok,
+       rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1,
+                                     #{reply_from => local}])),
+    %% So far there isn't a snapshot.
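+    %% This matters: the snapshot taken later must cover the commands that
+    %% Node1 misses while it is stopped, so that Node1 receives them through
+    %% snapshot installation rather than by replaying individual commands.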
+    ?assertMatch(
+       {ok, #{log := #{snapshot_index := undefined}}, _},
+       ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))),
 
     ct:pal(
       "Stop cluster member ~s (quorum is maintained)",
      [Node1]),
     ok = rpc:call(Node1, khepri, stop, [StoreId]),
 
-    ?assertMatch(
-       {ok, #{log := #{snapshot_index := undefined}}, _},
-       ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))),
-
-    ct:pal("Submit enough commands to trigger a snapshot"),
+    ct:pal("Modify paths which are watched by projections"),
     ct:pal("- set key1:value1v2"),
     ok = rpc:call(Node3, khepri, put, [StoreId, [key1], value1v2]),
-    ct:pal("- set key2:value2v1"),
-    ok = rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1]),
+    ct:pal("- delete key2"),
+    ok = rpc:call(Node3, khepri, delete, [StoreId, [key2]]),
     ct:pal("- set key3:value3v1"),
     ok = rpc:call(Node3, khepri, put, [StoreId, [key3], value3v1]),
     ct:pal("- set key4:value4v1"),
     ok = rpc:call(Node3, khepri, put, [StoreId, [key4], value4v1]),
 
+    ct:pal("Register projection ~ts on node ~s", [ProjectionName3, Node3]),
+    rpc:call(Node3,
+             khepri, register_projection,
+             [StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection3]),
+    ct:pal("Unregister projection ~ts on node ~s", [ProjectionName2, Node3]),
+    rpc:call(Node3,
+             khepri, unregister_projections, [StoreId, [ProjectionName2]]),
+
+    ct:pal("Send many commands to ensure a snapshot is triggered"),
+    [ok = rpc:call(Node3, khepri, put, [StoreId, [key5], value5v1])
+     || _ <- lists:seq(1, 20)],
+
+    {ok, #{log := #{snapshot_index := SnapshotIndex}}, _} =
+        ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3)),
+    ct:pal("New snapshot index: ~p", [SnapshotIndex]),
+    ?assert(is_number(SnapshotIndex) andalso SnapshotIndex > 20),
 
     ct:pal("Restart cluster member ~s", [Node1]),
     {ok, StoreId} = rpc:call(Node1, khepri, start, [RaSystem, RaServerConfig]),
@@ -1967,20 +2003,37 @@
                  [key5], value5v1, #{reply_from => local}),
     ?assertEqual(
        value5v1,
-       rpc:call(Node1, ets, lookup_element, [ProjectionName, [key5], 2])),
-
+       rpc:call(Node1, ets, lookup_element, [ProjectionName1, [key5], 2])),
+
+    ct:pal("Contents of projection table '~ts'", [ProjectionName1]),
+    [begin
+         Contents = rpc:call(Node, ets, tab2list, [ProjectionName1]),
+         ct:pal("- node ~ts:~n~p", [Node, Contents])
+     end || Node <- Nodes],
+
+    [begin
+         ?assertEqual(
+            value1v2,
+            rpc:call(Node, ets, lookup_element, [ProjectionName1, [key1], 2])),
+         ?assertEqual(
+            false,
+            rpc:call(Node, ets, member, [ProjectionName1, [key2]])),
+         ?assertEqual(
+            value3v1,
+            rpc:call(Node, ets, lookup_element, [ProjectionName1, [key3], 2])),
+         ?assertEqual(
+            value4v1,
+            rpc:call(Node, ets, lookup_element, [ProjectionName1, [key4], 2]))
+     end || Node <- Nodes],
+
+    %% Ensure that the projections themselves are also updated on Node1.
+    %% ProjectionName2 was unregistered and ProjectionName3 was registered.
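+    %% `ets:info/1' returns `undefined' for a table that no longer exists,
+    %% which is how we observe that ProjectionName2's table was dropped.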
+    ?assertEqual(
+       undefined,
+       rpc:call(Node1, ets, info, [ProjectionName2])),
     ?assertEqual(
        value1v2,
-       rpc:call(Node1, ets, lookup_element, [ProjectionName, [key1], 2])),
-    ?assertEqual(
-       value2v1,
-       rpc:call(Node1, ets, lookup_element, [ProjectionName, [key2], 2])),
-    ?assertEqual(
-       value3v1,
-       rpc:call(Node1, ets, lookup_element, [ProjectionName, [key3], 2])),
-    ?assertEqual(
-       value4v1,
-       rpc:call(Node1, ets, lookup_element, [ProjectionName, [key4], 2])),
+       rpc:call(Node1, ets, lookup_element, [ProjectionName3, [key1], 2])),
 
     ok.