diff --git a/src/ra.erl b/src/ra.erl index 1a00d4bbe..5d6669ebf 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -61,8 +61,10 @@ %% membership changes add_member/2, add_member/3, + pipeline_add_member/4, remove_member/2, remove_member/3, + pipeline_remove_member/4, leave_and_terminate/3, leave_and_terminate/4, leave_and_delete_server/3, @@ -613,6 +615,38 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). +%% @doc Asynchronously add a ra server id to a ra cluster's membership +%% configuration. +%% +%% This is the same operation as {@link add_member/2} but the membership +%% change is pipelined in the same way as {@link pipeline_command/4}. +%% +%% @param ServerRef the ra server or servers to send the command to +%% @param ServerId the ra server id of the server to add +%% @param Correlation a correlation identifier to be included to receive an +%% async notification after the command is applied to the state machine. If the +%% Correlation is set to `no_correlation' then no notifications will be sent. When the change is rejected the notification carries `{error, Reason}' instead of `ok'. +%% @param Priority command priority. `low' priority commands will be held back +%% and appended to the Raft log in batches. NB: A `normal' priority command sent +%% from the same process can overtake a low priority command that was +%% sent before. There is no high priority. +%% Only use priority level of `low' with commands that +%% do not rely on total execution ordering. +%% @see add_member/2 +%% @end +-spec pipeline_add_member(ServerRef :: ra_server_id() | [ra_server_id()], + ServerId :: ra_server_id(), + Correlation :: ra_server:command_correlation() | + no_correlation, + Priority :: ra_server:command_priority()) -> ok. 
+pipeline_add_member(ServerRef, ServerId, no_correlation, Priority) -> + Cmd = {'$ra_join', ServerId, noreply}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd); +pipeline_add_member(ServerRef, ServerId, Correlation, Priority) -> + Cmd = {'$ra_join', ServerId, {notify, Correlation, self()}}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd). + + %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change %% command to the log. @@ -647,6 +681,37 @@ remove_member(ServerRef, ServerId, Timeout) -> {'$ra_leave', ServerId, after_log_append}, Timeout). +%% @doc Asynchronously remove a ra server id from a ra cluster's membership +%% configuration. +%% +%% This is the same operation as {@link remove_member/2} but the membership +%% change is pipelined in the same way as {@link pipeline_command/4}. +%% +%% @param ServerRef the ra server or servers to send the command to +%% @param ServerId the ra server id of the server to remove +%% @param Correlation a correlation identifier to be included to receive an +%% async notification after the command is applied to the state machine. If the +%% Correlation is set to `no_correlation' then no notifications will be sent. When the change is rejected (e.g. `not_member') the notification carries `{error, Reason}' instead of `ok'. +%% @param Priority command priority. `low' priority commands will be held back +%% and appended to the Raft log in batches. NB: A `normal' priority command sent +%% from the same process can overtake a low priority command that was +%% sent before. There is no high priority. +%% Only use priority level of `low' with commands that +%% do not rely on total execution ordering. +%% @see remove_member/2 +%% @end +-spec pipeline_remove_member(ServerRef :: ra_server_id() | [ra_server_id()], + ServerId :: ra_server_id(), + Correlation :: ra_server:command_correlation() | + no_correlation, + Priority :: ra_server:command_priority()) -> ok. 
+pipeline_remove_member(ServerRef, ServerId, no_correlation, Priority) -> + Cmd = {'$ra_leave', ServerId, noreply}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd); +pipeline_remove_member(ServerRef, ServerId, Correlation, Priority) -> + Cmd = {'$ra_leave', ServerId, {notify, Correlation, self()}}, + ra_server_proc:cast_command(ServerRef, Priority, Cmd). + %% @doc Makes the server enter a pre-vote state and attempt to become the leader. %% It is necessary to call this function when starting a new cluster as a %% brand new Ra server (node) will not automatically enter the pre-vote state. diff --git a/src/ra_server.erl b/src/ra_server.erl index 61cc16940..00daf16fd 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -3518,6 +3518,8 @@ append_error_reply(Cmd, Reason, Effects0) -> case Cmd of {_, #{from := From}, _, _} -> [{reply, From, {error, Reason}} | Effects0]; + {_, _, _, {notify, Corr, Pid}} -> %% notify reply mode: emit a notify effect carrying {Corr, {error, Reason}} keyed by Pid + [{notify, #{Pid => [{Corr, {error, Reason}}]}} | Effects0]; _ -> Effects0 end. diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index d16812352..98f633572 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -60,6 +60,7 @@ all_tests() -> snapshot_installation, snapshot_installation_with_call_crash, add_member, + pipeline_membership_changes, queue_example, ramp_up_and_ramp_down, start_and_join_then_leave_and_terminate, @@ -877,6 +878,31 @@ add_member(Config) -> {ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end), terminate_cluster([C | Cluster]). 
+pipeline_membership_changes(Config) -> %% pipelined remove / repeated remove / re-add of member C, matching the notification received for each correlation + Name = ?config(test_name, Config), + [A, B, C] = Cluster0 = start_local_cluster(3, Name, add_machine()), + {ok, _, Leader0} = ra:process_command(A, 9), + Corr1 = make_ref(), + ok = ra:pipeline_remove_member(Leader0, C, Corr1, normal), + [{Corr1, ok}] = gather_applied([], 0), + stop_server(C), + {ok, Members, Leader} = ra:members(A), + ?assertEqual(lists:sort([A, B]), lists:sort(Members)), + %% Process a command to ensure that the cluster change command has + %% been committed - this prevents spurious failures of + %% `cluster_change_not_permitted` from the next leave command: + {ok, _, Leader} = ra:process_command(Leader, 4), + Corr2 = make_ref(), + ok = ra:pipeline_remove_member(Leader, C, Corr2, normal), + [{Corr2, {error, not_member}}] = gather_applied([], 0), + ok = ra:start_server(default, Name, C, add_machine(), [A, B]), + Corr3 = make_ref(), + ok = ra:pipeline_add_member(Leader, C, Corr3, normal), + [{Corr3, ok}] = gather_applied([], 0), + {ok, Members1, _Leader} = ra:members(Leader), + ?assertEqual(lists:sort(Cluster0), lists:sort(Members1)), + terminate_cluster(Cluster0). + server_catches_up(Config) -> N1 = nth_server_name(Config, 1), N2 = nth_server_name(Config, 2),