From 66cb34a132dfcaa76c3188578cfedf60a54d7789 Mon Sep 17 00:00:00 2001 From: Austin Reifsteck Date: Thu, 6 Mar 2025 15:26:47 -0500 Subject: [PATCH 1/5] feat: added interleave/4 which puts a transaction after every previously defined transaction --- lib/sage.ex | 61 ++++++++++++++++++ test/sage_test.exs | 64 +++++++++++++++++++ .../test_intermediate_transaction_handler.ex | 5 ++ 3 files changed, 130 insertions(+) create mode 100644 test/support/test_intermediate_transaction_handler.ex diff --git a/lib/sage.ex b/lib/sage.ex index 5b5dae2..7ae8696 100644 --- a/lib/sage.ex +++ b/lib/sage.ex @@ -163,6 +163,16 @@ defmodule Sage do defguardp is_transaction(value) when is_function(value, 2) or is_mfa(value) + @typedoc """ + Just like a transaction, although it receives extra argument of the name of the previous transaction + """ + @type intermediate_transaction :: + (effects_so_far :: effects(), attrs :: any(), previous_stage :: stage_name() -> + {:ok | :error | :abort, any()}) + | mfa() + + defguardp is_intermediate_transaction(value) when is_function(value, 3) or is_mfa(value) + @typedoc """ Tracer callback, can be a module that implements `Sage.Tracer` behaviour, an anonymous function, or an `{module, function, [arguments]}` tuple. @@ -437,6 +447,57 @@ defmodule Sage do def run_async(sage, name, transaction, compensation, opts \\ []), do: add_stage(sage, name, build_operation!(:run_async, transaction, compensation, opts)) + @doc """ + For a given Sage S with transactions :t1 -> :t2 -> :t3, a call to `interleave(S, :name, f)` + will yield a saga with transactions :t1 -> :name_1 -> :t2 -> :name_2 -> :t3 -> :name_3. + + This can be useful if you are trying to do a long computation and want to do something with + the intermediate results, such as logging or persistence. + + Note: + - This isn't strict interleaving because a transaction is still appended at the end. + - Calling this function twice with the same name will give a `Sage.DuplicateStageError`, but + calling the function twice with a different name will compound its effects. + - Calling this function before the end of your saga definition will mean any stages you add after the + call to `interleave` will not have the intermediate stages added after them + """ + @spec interleave( + sage :: t(), + name :: stage_name(), + intermediate_transaction :: intermediate_transaction(), + compensation :: compensation() + ) :: t() + def interleave(sage, name, intermediate_transaction, compensation \\ :noop) + when is_intermediate_transaction(intermediate_transaction) and is_compensation(compensation) do + sage.stages + |> Enum.reverse() + |> Enum.with_index(1) + |> Enum.flat_map(fn {stage, index} -> + name = String.to_atom("#{name}_#{index}") + {stage_name, _} = stage + + transaction = + case intermediate_transaction do + {m, f, a} -> + {m, f, [stage_name | a]} + + _ -> + &intermediate_transaction.(&1, &2, stage_name) + end + + [stage, {name, build_operation!(:run, transaction, compensation)}] + end) + |> Enum.reduce(new(), fn stage, sage -> + case stage do + {name, {:run, transaction, comp, _opts}} -> + run(sage, name, transaction, comp) + + {name, {:run_async, transaction, comp, opts}} -> + run_async(sage, name, transaction, comp, opts) + end + end) + end + @doc """ Executes a Sage. diff --git a/test/sage_test.exs b/test/sage_test.exs index c4c09ad..a77ff7b 100644 --- a/test/sage_test.exs +++ b/test/sage_test.exs @@ -254,6 +254,70 @@ defmodule SageTest do end end + describe "interleaves/3" do + test "adds a step between every transaction" do + sage = + new() + |> run(:t1, transaction(:t1)) + |> run(:t2, transaction(:t2)) + |> run_async(:t_async, transaction(:t_async), :noop) + |> run(:t3, transaction(:t3)) + |> interleave(:i, fn _effects, _args, previous_stage_name -> {:ok, previous_stage_name} end) + + assert [i_4: _, t3: _, i_3: _, t_async: _, i_2: _, t2: _, i_1: _, t1: _] = sage.stages + assert {:ok, _, %{i_4: :t3, i_3: :t_async, i_2: :t2, i_1: :t1}} = execute(sage) + end + + test "adds nothing if there are no transactions" do + assert [] = + new() + |> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end) + |> Map.get(:stages) + end + + test "adds a transaction at the end if there is one transaction" do + assert [i_1: _, t1: _] = + new() + |> run(:t1, transaction(:t1)) + |> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end) + |> Map.get(:stages) + end + + test "works with mfa" do + sage = + new() + |> run(:t1, transaction(:t1)) + |> run(:t2, transaction(:t2)) + |> run(:t3, transaction(:t3)) + |> interleave(:i, {TestIntermediateTransactionHandler, :intermediate_transaction_handler, [:foo]}) + + assert {:ok, _, %{i_3: {:t3, :foo}, i_2: {:t2, :foo}, i_1: {:t1, :foo}}} = execute(sage) + end + + test "can run a compensations" do + new() + |> run(:t1, transaction(:t1)) + |> run(:t2, transaction(:t2)) + |> run(:t3, transaction_with_error(:t3)) + |> interleave(:i, fn _effects, _args, _previous_stage_name -> {:ok, nil} end, fn _errored_effect, _effects_so_far, _attrs -> + send(self(), :compensating) + :ok + end) + |> execute() + + for _ <- 1..2, do: assert_received(:compensating) + end + + test "errors if used more than once" do + assert_raise Sage.DuplicateStageError, fn -> + new() + |> run(:t1, transaction(:t1)) + |> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end) + |> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end) + end + end + end + def dummy_transaction_for_mfa(_effects_so_far, _opts), do: raise("Not implemented") def dummy_compensation_for_mfa(_effect_to_compensate, _opts), do: raise("Not implemented") def dummy_final_cb(_status, _opts, _return), do: raise("Not implemented") diff --git a/test/support/test_intermediate_transaction_handler.ex b/test/support/test_intermediate_transaction_handler.ex new file mode 100644 index 0000000..a5d3906 --- /dev/null +++ b/test/support/test_intermediate_transaction_handler.ex @@ -0,0 +1,5 @@ +defmodule TestIntermediateTransactionHandler do + def intermediate_transaction_handler(_effects, _args, previous_stage_name, something_else) do + {:ok, {previous_stage_name, something_else}} + end +end From 18f32edc97c4d4ff0d9701df7aafb7f23b9c5ef9 Mon Sep 17 00:00:00 2001 From: Austin Reifsteck Date: Fri, 7 Mar 2025 14:48:38 -0500 Subject: [PATCH 2/5] test: refactor test according to PR feedback --- test/sage_test.exs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/sage_test.exs b/test/sage_test.exs index a77ff7b..8267414 100644 --- a/test/sage_test.exs +++ b/test/sage_test.exs @@ -269,10 +269,9 @@ defmodule SageTest do end test "adds nothing if there are no transactions" do - assert [] = - new() - |> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end) - |> Map.get(:stages) + sage = interleave(new(), :i, fn _effects, _args, _previous_stage_name -> :ok end) + + assert sage.stages == [] end test "adds a transaction at the end if there is one transaction" do From 51516f7d6ca0f89b8f1665cff707e25c103232e4 Mon Sep 17 00:00:00 2001 From: Austin Reifsteck Date: Fri, 7 Mar 2025 16:20:09 -0500 Subject: [PATCH 3/5] refactor: do not rebuild the saga when interleaving stages --- lib/sage.ex | 89 +++++++++++++++++++++++++++++++---------------------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/lib/sage.ex b/lib/sage.ex index 7ae8696..e565de2 100644 --- a/lib/sage.ex +++ b/lib/sage.ex @@ -453,7 +453,7 @@ defmodule Sage do This can be useful if you are trying to do a long computation and want to do something with the intermediate results, such as logging or persistence. - + Note: - This isn't strict interleaving because a transaction is still appended at the end. - Calling this function twice with the same name will give a `Sage.DuplicateStageError`, but @@ -469,33 +469,27 @@ defmodule Sage do ) :: t() def interleave(sage, name, intermediate_transaction, compensation \\ :noop) when is_intermediate_transaction(intermediate_transaction) and is_compensation(compensation) do - sage.stages - |> Enum.reverse() - |> Enum.with_index(1) - |> Enum.flat_map(fn {stage, index} -> - name = String.to_atom("#{name}_#{index}") - {stage_name, _} = stage - - transaction = - case intermediate_transaction do - {m, f, a} -> - {m, f, [stage_name | a]} - - _ -> - &intermediate_transaction.(&1, &2, stage_name) - end - - [stage, {name, build_operation!(:run, transaction, compensation)}] - end) - |> Enum.reduce(new(), fn stage, sage -> - case stage do - {name, {:run, transaction, comp, _opts}} -> - run(sage, name, transaction, comp) - - {name, {:run_async, transaction, comp, opts}} -> - run_async(sage, name, transaction, comp, opts) - end - end) + new_stages = + sage.stages + |> Enum.reverse() + |> Enum.with_index(1) + |> Enum.flat_map(fn {stage, index} -> + name = String.to_atom("#{name}_#{index}") + {stage_name, _} = stage + + transaction = + case intermediate_transaction do + {m, f, a} -> + {m, f, [stage_name | a]} + + _ -> + &intermediate_transaction.(&1, &2, stage_name) + end + + [stage, {name, build_operation!(:run, transaction, compensation)}] + end) + + add_stages(%{sage | stage_names: MapSet.new(), stages: []}, new_stages) end @doc """ @@ -558,16 +552,39 @@ defmodule Sage do end defp add_stage(sage, name, operation) do + add_stages(sage, [{name, operation}]) + end + + defp add_stages(sage, name_operation_pairs) do %{stages: stages, stage_names: names} = sage - if MapSet.member?(names, name) do - raise Sage.DuplicateStageError, sage: sage, name: name - else - %{ - sage - | stages: [{name, operation} | stages], - stage_names: MapSet.put(names, name) - } + {names_to_add_list, _operations} = + Enum.unzip(name_operation_pairs) + + names_to_add_set = MapSet.new(names_to_add_list) + + duplicates = + names_to_add_set + |> MapSet.intersection(names) + |> MapSet.to_list() + + cond do + # There is a duplicate between what was existing in the struct beforehand and + # what was passed to this function + !Enum.empty?(duplicates) -> + raise Sage.DuplicateStageError, sage: sage, name: hd(duplicates) + + # There was a duplicate within name_operation_pairs + length(names_to_add_list) != MapSet.size(names_to_add_set) -> + duplicates = names_to_add_list -- MapSet.to_list(names_to_add_set) + raise Sage.DuplicateStageError, sage: sage, name: hd(duplicates) + + true -> + %{ + sage + | stages: Enum.reverse(name_operation_pairs) ++ stages, + stage_names: MapSet.union(names, names_to_add_set) + } end end From 6449ebe1f4702e76a3818c80362739ce2c2239be Mon Sep 17 00:00:00 2001 From: Austin Reifsteck Date: Sat, 12 Apr 2025 21:58:39 -0400 Subject: [PATCH 4/5] refactor: namespace interleaves --- lib/sage.ex | 2 +- test/sage_test.exs | 33 ++++++++++++++++++++++++++++----- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/lib/sage.ex b/lib/sage.ex index e565de2..d229573 100644 --- a/lib/sage.ex +++ b/lib/sage.ex @@ -474,7 +474,7 @@ defmodule Sage do |> Enum.reverse() |> Enum.with_index(1) |> Enum.flat_map(fn {stage, index} -> - name = String.to_atom("#{name}_#{index}") + name = {:interleave, name, index} {stage_name, _} = stage transaction = diff --git a/test/sage_test.exs b/test/sage_test.exs index 8267414..a4fb092 100644 --- a/test/sage_test.exs +++ b/test/sage_test.exs @@ -264,8 +264,24 @@ defmodule SageTest do |> run(:t3, transaction(:t3)) |> interleave(:i, fn _effects, _args, previous_stage_name -> {:ok, previous_stage_name} end) - assert [i_4: _, t3: _, i_3: _, t_async: _, i_2: _, t2: _, i_1: _, t1: _] = sage.stages - assert {:ok, _, %{i_4: :t3, i_3: :t_async, i_2: :t2, i_1: :t1}} = execute(sage) + assert [ + {{:interleave, :i, 4}, _}, + {:t3, _}, + {{:interleave, :i, 3}, _}, + {:t_async, _}, + {{:interleave, :i, 2}, _}, + {:t2, _}, + {{:interleave, :i, 1}, _}, + t1: _ + ] = sage.stages + + assert {:ok, _, + %{ + {:interleave, :i, 4} => :t3, + {:interleave, :i, 3} => :t_async, + {:interleave, :i, 2} => :t2, + {:interleave, :i, 1} => :t1 + }} = execute(sage) end test "adds nothing if there are no transactions" do @@ -275,7 +291,7 @@ defmodule SageTest do end test "adds a transaction at the end if there is one transaction" do - assert [i_1: _, t1: _] = + assert [{{:interleave, :i, 1}, _}, t1: _] = new() |> run(:t1, transaction(:t1)) |> interleave(:i, fn _effects, _args, _previous_stage_name -> :ok end) @@ -290,7 +306,12 @@ defmodule SageTest do |> run(:t3, transaction(:t3)) |> interleave(:i, {TestIntermediateTransactionHandler, :intermediate_transaction_handler, [:foo]}) - assert {:ok, _, %{i_3: {:t3, :foo}, i_2: {:t2, :foo}, i_1: {:t1, :foo}}} = execute(sage) + assert {:ok, _, + %{ + {:interleave, :i, 3} => {:t3, :foo}, + {:interleave, :i, 2} => {:t2, :foo}, + {:interleave, :i, 1} => {:t1, :foo} + }} = execute(sage) end test "can run a compensations" do @@ -298,7 +319,9 @@ defmodule SageTest do |> run(:t1, transaction(:t1)) |> run(:t2, transaction(:t2)) |> run(:t3, transaction_with_error(:t3)) - |> interleave(:i, fn _effects, _args, _previous_stage_name -> {:ok, nil} end, fn _errored_effect, _effects_so_far, _attrs -> + |> interleave(:i, fn _effects, _args, _previous_stage_name -> {:ok, nil} end, fn _errored_effect, + _effects_so_far, + _attrs -> send(self(), :compensating) :ok end) From 554f537fc029c146d46c8c992283df8a4a0fa8c2 Mon Sep 17 00:00:00 2001 From: Austin Reifsteck Date: Sun, 13 Apr 2025 11:23:15 -0400 Subject: [PATCH 5/5] fix: update doc --- lib/sage.ex | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/sage.ex b/lib/sage.ex index d229573..6c7438d 100644 --- a/lib/sage.ex +++ b/lib/sage.ex @@ -448,8 +448,9 @@ defmodule Sage do do: add_stage(sage, name, build_operation!(:run_async, transaction, compensation, opts)) @doc """ - For a given Sage S with transactions :t1 -> :t2 -> :t3, a call to `interleave(S, :name, f)` - will yield a saga with transactions :t1 -> :name_1 -> :t2 -> :name_2 -> :t3 -> :name_3. + For a given Sage S with transactions `:t1` -> `:t2` -> `:t3`, a call to `interleave(S, :name, f)` + will yield a saga with transactions `:t1` -> `{:interleave, :name, 1}` -> `:t2` -> `{:interleave, :name, 2}` + -> `:t3` -> `{:interleave, :name, 3}`. This can be useful if you are trying to do a long computation and want to do something with the intermediate results, such as logging or persistence.