khepri_machine: "Update" projections when a snapshot is installed
When a snapshot is installed, the machine state is "swapped" from the
state before the snapshot to the one contained in the snapshot. This
swap jumps over potentially many commands in the log which we would
otherwise have used to trigger projections. When we install a snapshot
we need to update the projection state, but it's not as straightforward
as when "restoring" projections (on machine recovery, for example)
because the projection tables will already exist and may already
contain records.

When we detect that a snapshot has been installed, we need to diff the
old and new states to find which projections have themselves changed and
also to update their associated projection tables.
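
The fix boils down to set arithmetic over {Pattern, Projection} pairs taken
from the old and new machine states. Here is a minimal, standalone sketch of
that idea (placeholder atoms stand in for real patterns and projection
records; the actual implementation lives in `update_projections/2' below):

    Old = sets:from_list([{pat_a, proj_a}, {pat_b, proj_b}], [{version, 2}]),
    New = sets:from_list([{pat_b, proj_b}, {pat_c, proj_c}], [{version, 2}]),
    Common  = sets:intersection(Old, New), %% unchanged: re-trigger with new records
    Deleted = sets:subtract(Old, Common),  %% unregistered: drop their tables
    Created = sets:subtract(New, Common),  %% newly registered: restore from new tree
    [{pat_a, proj_a}] = sets:to_list(Deleted),
    [{pat_c, proj_c}] = sets:to_list(Created).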
the-mikedavis committed Sep 4, 2024
1 parent 8b6c63e commit 09f2b9f
Showing 2 changed files with 232 additions and 40 deletions.
148 changes: 144 additions & 4 deletions src/khepri_machine.erl
@@ -78,7 +78,7 @@
handle_aux/6,
apply/3,
state_enter/2,
snapshot_installed/2,
snapshot_installed/4,
overview/1,
version/0,
which_module/1]).
@@ -1565,9 +1565,23 @@ state_enter(_StateName, _State) ->

%% @private

snapshot_installed(_Meta, _State) ->
SideEffect = {aux, restore_projections},
[SideEffect].
snapshot_installed(
#{machine_version := NewMacVer}, NewState,
#{machine_version := OldMacVer}, OldState) ->
%% A snapshot might be installed on a follower member that has fallen
%% sufficiently far behind in replicating the log from the leader. When
%% a member installs a snapshot it needs to update its projections: new
%% projections may have been registered since the snapshot or old ones
%% unregistered. Projections which did not change need to be triggered
%% with the new changes to state, similar to the `restore_projections' aux
%% effect. Also see `update_projections/2'.
%%
%% Note that the snapshot installation might bump the effective machine
%% version so we need to convert the old state to the new machine version.
OldState1 = convert_state(OldState, OldMacVer, NewMacVer),
ok = update_projections(OldState1, NewState),
ok = clear_compiled_projection_tree(),
[].

%% @private

@@ -2331,3 +2345,129 @@ convert_state(State, 0, 1) ->
State1 = list_to_tuple(Fields1),
?assert(is_state(State1)),
State1.

-spec update_projections(OldState, NewState) -> ok when
OldState :: khepri_machine:state(),
NewState :: khepri_machine:state().
%% @doc Updates the machine's projections to account for changes between two
%% states.
%%
%% This is used when installing a snapshot: the state will jump from the
%% given `OldState' before the snapshot was installed to the given `NewState'
%% after. When we swap states we need to update the projections: the records
%% in the projection tables themselves but also which projection tables exist.
%% The changes glossed over by the snapshot may include projection
%% registrations and unregistrations so we need to initialize new projections
%% and delete unregistered ones, and we need to ensure that the projection
%% tables are up to date for any projections which didn't change.
%%
%% @private

update_projections(OldState, NewState) ->
OldTree = get_tree(OldState),
OldProjections = set_of_projections(get_projections(OldState)),
NewTree = get_tree(NewState),
NewProjections = set_of_projections(get_projections(NewState)),

CommonProjections = sets:intersection(OldProjections, NewProjections),
DeletedProjections = sets:subtract(OldProjections, CommonProjections),
CreatedProjections = sets:subtract(NewProjections, CommonProjections),

%% Tear down any projections which were unregistered.
sets:fold(
fun({_Pattern, Projection}, _Acc) ->
_ = khepri_projection:delete(Projection),
ok
end, ok, DeletedProjections),

%% Initialize any new projections which were registered.
sets:fold(
fun({Pattern, Projection}, _Acc) ->
ok = restore_projection(Projection, NewTree, Pattern)
end, ok, CreatedProjections),

%% Update in-place any projections which were not changed themselves (i.e.
%% the projection name, function and pattern) between old and new states.
%% To do this we will find the matching nodes in the old and new tree for
%% the projection's pattern and trigger the projection based on each
%% matching path's old and new properties.
sets:fold(
fun({Pattern, Projection}, _Acc) ->
ok = update_projection(Pattern, Projection, OldTree, NewTree)
end, ok, CommonProjections),

ok.

-spec set_of_projections(ProjectionTree) -> Projections when
ProjectionTree :: khepri_machine:projection_tree(),
Element :: {khepri_path:native_pattern(),
khepri_projection:projection()},
Projections :: sets:set(Element).
%% Folds the set of projections in a projection tree into a version 2 {@link
%% sets:set()}.
%%
%% @private

set_of_projections(ProjectionTree) ->
khepri_pattern_tree:fold(
ProjectionTree,
fun(Pattern, Projections, Acc) ->
lists:foldl(
fun(Projection, Acc1) ->
Entry = {Pattern, Projection},
sets:add_element(Entry, Acc1)
end, Acc, Projections)
end, sets:new([{version, 2}])).
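
%% As a quick illustration (hypothetical values: `Tree' maps the single
%% pattern [?KHEPRI_WILDCARD_STAR_STAR] to two projections P1 and P2), the
%% fold flattens the tree into one set of pairs:
%%
%%     Set = set_of_projections(Tree),
%%     true = sets:is_element({[?KHEPRI_WILDCARD_STAR_STAR], P1}, Set),
%%     true = sets:is_element({[?KHEPRI_WILDCARD_STAR_STAR], P2}, Set),
%%     2 = sets:size(Set).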

update_projection(Pattern, Projection, OldTree, NewTree) ->
TreeOptions = #{props_to_return => ?PROJECTION_PROPS_TO_RETURN,
include_root_props => true},
case khepri_tree:find_matching_nodes(OldTree, Pattern, TreeOptions) of
{ok, OldMatchingNodes} ->
Result = khepri_tree:find_matching_nodes(
NewTree, Pattern, TreeOptions),
case Result of
{ok, NewMatchingNodes} ->
Updates = diff_matching_nodes(
OldMatchingNodes, NewMatchingNodes),
maps:foreach(
fun(Path, {OldProps, NewProps}) ->
khepri_projection:trigger(
Projection, Path, OldProps, NewProps)
end, Updates);
Error ->
?LOG_DEBUG(
"Failed to refresh projection ~s due to an error "
"finding matching nodes in the new tree: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]})
end;
Error ->
?LOG_DEBUG(
"Failed to refresh projection ~s due to an error finding "
"matching nodes in the old tree: ~p",
[khepri_projection:name(Projection), Error],
#{domain => [khepri, ra_machine]})
end.

-spec diff_matching_nodes(OldNodeProps, NewNodeProps) -> Changes when
OldNodeProps :: khepri_adv:node_props_map(),
NewNodeProps :: khepri_adv:node_props_map(),
OldProps :: khepri:node_props(),
NewProps :: khepri:node_props(),
Changes :: #{khepri_path:native_path() => {OldProps, NewProps}}.
%% @private

diff_matching_nodes(OldNodeProps, NewNodeProps) ->
CommonProps = maps:intersect_with(
fun(_Path, OldProps, NewProps) -> {OldProps, NewProps} end,
OldNodeProps, NewNodeProps),
CommonPaths = maps:keys(CommonProps),
AllProps = maps:fold(
fun(Path, OldProps, Acc) ->
Acc#{Path => {OldProps, #{}}}
end, CommonProps, maps:without(CommonPaths, OldNodeProps)),
maps:fold(
fun(Path, NewProps, Acc) ->
Acc#{Path => {#{}, NewProps}}
end, AllProps, maps:without(CommonPaths, NewNodeProps)).
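
A worked example of the resulting change map (illustrative props; real
entries hold the props selected by ?PROJECTION_PROPS_TO_RETURN): a path
present in both inputs pairs its old and new props, a path only in the old
input pairs its props with an empty map (a delete), and a path only in the
new input does the reverse (a create):

    Old = #{[key1] => #{data => value1v1},
            [key2] => #{data => value2v1}},
    New = #{[key1] => #{data => value1v2},
            [key3] => #{data => value3v1}},
    Expected = #{[key1] => {#{data => value1v1}, #{data => value1v2}},
                 [key2] => {#{data => value2v1}, #{}},
                 [key3] => {#{}, #{data => value3v1}}},
    Expected = diff_matching_nodes(Old, New).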
124 changes: 88 additions & 36 deletions test/cluster_SUITE.erl
@@ -2021,16 +2021,17 @@ projections_are_updated_when_a_snapshot_is_installed(Config) ->
%% When this happens the member doesn't see the changes as regular
%% commands (i.e. handled in `ra_machine:apply/3'). Instead the machine
%% state is replaced entirely. So when a snapshot is installed we must
%% restore projections the same way we do as when we restart a member.
%% In `khepri_machine' this is done in the `snapshot_installed/2` callback
%% implementation.
%% restore projections. The machine is alive at this point, though, so we
%% can't restore projections the same way as when we restart a member. We
%% need to diff the old and new states first to handle any newly registered
%% or unregistered projections, and then update any existing projections
%% with updated or deleted records. In `khepri_machine' this is done in the
%% `snapshot_installed/4' callback implementation.
%%
%% To test this we stop a member, apply enough commands to cause the leader
%% to take a snapshot, and then restart the member and assert that the
%% projection contents are as expected.

ProjectionName = ?MODULE,

%% We call `khepri_projection:new/2' on the local node and thus need
%% Khepri.
?assertMatch({ok, _}, application:ensure_all_started(khepri)),
@@ -2041,9 +2042,9 @@
#{ra_system := RaSystem} = maps:get(Node1, PropsPerNode),
StoreId = RaSystem,
%% Set the snapshot interval low so that we can trigger a snapshot by
%% sending 4 commands.
%% sending a few commands.
RaServerConfig = #{cluster_name => StoreId,
machine_config => #{snapshot_interval => 4}},
machine_config => #{snapshot_interval => 20}},

ct:pal("Start database + cluster nodes"),
lists:foreach(
Expand All @@ -2061,44 +2062,78 @@ projections_are_updated_when_a_snapshot_is_installed(Config) ->
rpc:call(Node, khepri_cluster, join, [StoreId, Node3]))
end, [Node1, Node2]),

ct:pal("Register projection on node ~s", [Node1]),
Projection = khepri_projection:new(
ProjectionName,
fun(Path, Payload) -> {Path, Payload} end),
ProjectionName1 = projection_1,
Projection1 = khepri_projection:new(
ProjectionName1,
fun(Path, Payload) -> {Path, Payload} end),
ProjectionName2 = projection_2,
Projection2 = khepri_projection:new(
ProjectionName2,
fun(Path, Payload) -> {Path, Payload} end),
ProjectionName3 = projection_3,
Projection3 = khepri_projection:new(
ProjectionName3,
fun(Path, Payload) -> {Path, Payload} end),

ct:pal("Register projection ~ts on node ~s", [ProjectionName1, Node1]),
rpc:call(Node1,
khepri, register_projection,
[StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection]),
ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName),
[StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection1]),
ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName1),

ct:pal("Register projection ~ts on node ~s", [ProjectionName2, Node1]),
rpc:call(Node1,
khepri, register_projection,
[StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection2]),
ok = wait_for_projection_on_nodes([Node2, Node3], ProjectionName2),

?assertEqual(
ok,
rpc:call(Node3, khepri, put, [StoreId, [key1], value1v1,
#{reply_from => local}])),
?assertEqual(
value1v1,
rpc:call(Node3, ets, lookup_element, [ProjectionName, [key1], 2])),
rpc:call(Node3, ets, lookup_element, [ProjectionName1, [key1], 2])),
%% This key will be deleted.
?assertEqual(
ok,
rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1,
#{reply_from => local}])),
%% So far there isn't a snapshot.
?assertMatch(
{ok, #{log := #{snapshot_index := undefined}}, _},
ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))),

ct:pal(
"Stop cluster member ~s (quorum is maintained)", [Node1]),
ok = rpc:call(Node1, khepri, stop, [StoreId]),

?assertMatch(
{ok, #{log := #{snapshot_index := undefined}}, _},
ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3))),

ct:pal("Submit enough commands to trigger a snapshot"),
ct:pal("Modify paths which are watched by projections"),
ct:pal("- set key1:value1v2"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key1], value1v2]),
ct:pal("- set key2:value2v1"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key2], value2v1]),
ct:pal("- delete key2"),
ok = rpc:call(Node3, khepri, delete, [StoreId, [key2]]),
ct:pal("- set key3:value3v1"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key3], value3v1]),
ct:pal("- set key4:value4v1"),
ok = rpc:call(Node3, khepri, put, [StoreId, [key4], value4v1]),

{ok, #{log := #{snapshot_index := SnapshotIndex}}, _} =
ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3)),
?assert(is_number(SnapshotIndex) andalso SnapshotIndex > 4),
ct:pal("Register projection ~ts on node ~s", [ProjectionName3, Node3]),
rpc:call(Node3,
khepri, register_projection,
[StoreId, [?KHEPRI_WILDCARD_STAR_STAR], Projection3]),
ct:pal("Unregister projection ~ts on node ~s", [ProjectionName2, Node3]),
rpc:call(Node3,
khepri, unregister_projections, [StoreId, [ProjectionName2]]),

ct:pal("Send many commands to ensure a snapshot is triggered"),
[ok = rpc:call(Node3, khepri, put, [StoreId, [key5], value5v1])
|| _ <- lists:seq(1, 20)],

{ok, #{log := #{snapshot_index := SnapshotIndex}}, _} =
ra:member_overview(khepri_cluster:node_to_member(StoreId, Node3)),
ct:pal("New snapshot index: ~p", [SnapshotIndex]),
?assert(is_number(SnapshotIndex) andalso SnapshotIndex > 20),

ct:pal("Restart cluster member ~s", [Node1]),
{ok, StoreId} = rpc:call(Node1, khepri, start, [RaSystem, RaServerConfig]),
Expand All @@ -2112,20 +2147,37 @@ projections_are_updated_when_a_snapshot_is_installed(Config) ->
[key5], value5v1, #{reply_from => local}),
?assertEqual(
value5v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key5], 2])),

rpc:call(Node1, ets, lookup_element, [ProjectionName1, [key5], 2])),

ct:pal("Contents of projection table '~ts'", [ProjectionName1]),
[begin
Contents = rpc:call(Node, ets, tab2list, [ProjectionName1]),
ct:pal("- node ~ts:~n~p", [Node, Contents])
end || Node <- Nodes],

[begin
?assertEqual(
value1v2,
rpc:call(Node, ets, lookup_element, [ProjectionName1, [key1], 2])),
?assertEqual(
false,
rpc:call(Node, ets, member, [ProjectionName1, [key2]])),
?assertEqual(
value3v1,
rpc:call(Node, ets, lookup_element, [ProjectionName1, [key3], 2])),
?assertEqual(
value4v1,
rpc:call(Node, ets, lookup_element, [ProjectionName1, [key4], 2]))
end || Node <- Nodes],

%% Ensure that the projections themselves are also updated on Node1.
%% ProjectionName2 was unregistered and ProjectionName3 was registered.
?assertEqual(
undefined,
rpc:call(Node1, ets, info, [ProjectionName2])),
?assertEqual(
value1v2,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key1], 2])),
?assertEqual(
value2v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key2], 2])),
?assertEqual(
value3v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key3], 2])),
?assertEqual(
value4v1,
rpc:call(Node1, ets, lookup_element, [ProjectionName, [key4], 2])),
rpc:call(Node1, ets, lookup_element, [ProjectionName3, [key1], 2])),

ok.
