diff --git a/lib/archethic.ex b/lib/archethic.ex index 50a39c45b..b1d7fa9f1 100644 --- a/lib/archethic.ex +++ b/lib/archethic.ex @@ -157,7 +157,7 @@ defmodule Archethic do P2P.authorized_and_available_nodes() |> Enum.reject(&(&1.first_public_key == welcome_node_key)) |> Enum.sort_by(& &1.first_public_key) - |> P2P.nearest_nodes(welcome_node_patch) + |> P2P.sort_by_nearest_nodes(welcome_node_patch) |> Enum.filter(&P2P.node_connected?/1) this_node = Crypto.first_node_public_key() diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index b0f6d6a62..b832dce31 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -404,7 +404,7 @@ defmodule Archethic.BeaconChain do subset |> Election.beacon_storage_nodes(next_summary_date, authorized_nodes) |> Enum.filter(&P2P.node_connected?/1) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() |> Enum.take(3) |> Enum.map(&{&1, subset}) end) @@ -516,7 +516,7 @@ defmodule Archethic.BeaconChain do case P2P.quorum_read( nodes, %GetBeaconSummariesAggregate{date: summary_time}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, aggregate = %SummaryAggregate{}} -> {:ok, aggregate} diff --git a/lib/archethic/mining/pending_transaction_validation.ex b/lib/archethic/mining/pending_transaction_validation.ex index 2ddb2dc25..3357e56ec 100644 --- a/lib/archethic/mining/pending_transaction_validation.ex +++ b/lib/archethic/mining/pending_transaction_validation.ex @@ -978,7 +978,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do previous_address |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() |> Enum.filter(&P2P.node_connected?/1) |> get_first_public_key(previous_address) end diff --git a/lib/archethic/mining/smart_contract_validation.ex b/lib/archethic/mining/smart_contract_validation.ex index c3392be1e..d39e075eb 100644 --- a/lib/archethic/mining/smart_contract_validation.ex +++ b/lib/archethic/mining/smart_contract_validation.ex @@ -83,7 +83,7 @@ defmodule Archethic.Mining.SmartContractValidation do |> Election.chain_storage_nodes(P2P.authorized_and_available_nodes()) |> Election.get_synchronized_nodes_before(previous_summary_time) - conflicts_resolver = fn results -> + conflict_resolver = fn results -> %SmartContractCallValidation{last_chain_sync_date: highest_date} = Enum.max_by(results, & &1.last_chain_sync_date, DateTime) @@ -108,8 +108,8 @@ defmodule Archethic.Mining.SmartContractValidation do transaction: transaction, timestamp: validation_time }, - conflicts_resolver, - @timeout + conflict_resolver: conflict_resolver, + timeout: @timeout ) do {:ok, %SmartContractCallValidation{status: :ok, fee: fee}} -> {:ok, fee} diff --git a/lib/archethic/networking/scheduler.ex b/lib/archethic/networking/scheduler.ex index 43cc7b68c..eafc8c182 100644 --- a/lib/archethic/networking/scheduler.ex +++ b/lib/archethic/networking/scheduler.ex @@ -153,7 +153,7 @@ defmodule Archethic.Networking.Scheduler do nodes = P2P.authorized_and_available_nodes() |> Enum.filter(&P2P.node_connected?/1) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() case Utils.await_confirmation(tx_address, nodes) do {:ok, validated_transaction = %Transaction{address: ^tx_address, data: ^transaction_data}} -> diff --git a/lib/archethic/p2p.ex b/lib/archethic/p2p.ex index d92e46064..c8865d42a 100644 --- a/lib/archethic/p2p.ex +++ b/lib/archethic/p2p.ex @@ -163,7 +163,7 @@ defmodule Archethic.P2P do case quorum_read( nodes, %ListNodes{authorized_and_available?: authorized_and_available?}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %NodeList{nodes: nodes}} -> {:ok, nodes} @@ -445,11 +445,11 @@ defmodule Archethic.P2P do @doc """ Return the nearest storages nodes from the local node """ - @spec nearest_nodes(list(Node.t())) :: list(Node.t()) - def nearest_nodes(storage_nodes) when is_list(storage_nodes) do + @spec sort_by_nearest_nodes(list(Node.t())) :: list(Node.t()) + def sort_by_nearest_nodes(storage_nodes) when is_list(storage_nodes) do case get_node_info(Crypto.first_node_public_key()) do {:ok, %Node{network_patch: network_patch}} -> - nearest_nodes(storage_nodes, network_patch) + sort_by_nearest_nodes(storage_nodes, network_patch) {:error, :not_found} -> storage_nodes @@ -467,7 +467,7 @@ defmodule Archethic.P2P do ...> %Node{network_patch: "3A2"} ...> ] ...> - ...> P2P.nearest_nodes(list_nodes, "12F") + ...> P2P.sort_by_nearest_nodes(list_nodes, "12F") [ %Node{network_patch: "3A2"}, %Node{network_patch: "AA0"}, @@ -480,15 +480,16 @@ defmodule Archethic.P2P do ...> %Node{network_patch: "3A2"} ...> ] ...> - ...> P2P.nearest_nodes(list_nodes, "C3A") + ...> P2P.sort_by_nearest_nodes(list_nodes, "C3A") [ %Node{network_patch: "F50"}, %Node{network_patch: "AA0"}, %Node{network_patch: "3A2"} ] """ - @spec nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) :: Enumerable.t() - def nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do + @spec sort_by_nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) :: + Enumerable.t() + def sort_by_nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do Enum.sort_by(storage_nodes, &network_distance(&1.network_patch, network_patch)) end @@ -732,165 +733,110 @@ defmodule Archethic.P2P do @doc """ Send a message to a list of nodes and perform a read quorum + + Opts: + timeout :: non_neg_integer(), + conflict_resolver :: (list(Message.t()) -> Message.t()), + acceptance_resolver :: (Message.t() -> boolean()), + consistency_level :: pos_integer(), + repair_fun :: (nil | Message.t(), list(Message.t()) -> :ok) """ @spec quorum_read( node_list :: list(Node.t()), message :: Message.t(), - conflict_resolver :: (list(Message.t()) -> Message.t()), - timeout :: non_neg_integer(), - acceptance_resolver :: (Message.t() -> boolean()), - consistency_level :: pos_integer() - ) :: - {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed} + opts :: Keyword.t() + ) :: {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed} def quorum_read( nodes, message, - conflict_resolver \\ fn results -> List.first(results) end, - timeout \\ 0, - acceptance_resolver \\ fn _ -> true end, - consistency_level \\ 3 - ) - - def quorum_read( - nodes, - message, - conflict_resolver, - timeout, - acceptance_resolver, - consistency_level + opts \\ [] ) do + timeout = Keyword.get(opts, :timeout, 0) + conflict_resolver = Keyword.get(opts, :conflict_resolver, &List.first(&1)) + acceptance_resolver = Keyword.get(opts, :acceptance_resolver, fn _ -> true end) + consistency_level = Keyword.get(opts, :consistency_level, 3) + repair_fun = Keyword.get(opts, :repair_fun, fn _, _ -> :ok end) + nodes |> Enum.filter(&node_connected?/1) - |> nearest_nodes() + |> sort_by_nearest_nodes() |> unprioritize_node(Crypto.first_node_public_key()) - |> do_quorum_read( - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - 0, - nil - ) + |> Enum.chunk_every(consistency_level) + |> Enum.reduce_while({nil, []}, fn nodes, {previous_result_acc, all_results_acc} -> + results_by_node = send_message_and_filter_results(nodes, message, timeout) + all_results_acc = results_by_node ++ all_results_acc + {_nodes, results} = Enum.unzip(results_by_node) + + with {:ok, results} <- enough_results(previous_result_acc, results), + result <- resolve_conflicts(results, conflict_resolver), + :ok <- result_accepted(result, acceptance_resolver) do + {:halt, {result, all_results_acc}} + else + {:error, previous_result} -> + {:cont, {previous_result, all_results_acc}} + end + end) + |> then(fn {result, all_results} -> + cond do + is_nil(result) -> + maybe_async_repair(nil, all_results, repair_fun) + {:error, :network_issue} + + acceptance_resolver.(result) -> + maybe_async_repair(result, all_results, repair_fun) + {:ok, result} + + true -> + maybe_async_repair(nil, all_results, repair_fun) + {:error, :acceptance_failed} + end + end) end - defp do_quorum_read([], _, _, _, _, _, 0, nil), do: {:error, :network_issue} - defp do_quorum_read([], _, _, _, _, _, _, nil), do: {:error, :acceptance_failed} - defp do_quorum_read([], _, _, _, _, _, _, previous_result), do: {:ok, previous_result} - - defp do_quorum_read( - nodes, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count, - previous_result - ) do - # We determine how many nodes to fetch for the quorum from the consistency level - {group, rest} = Enum.split(nodes, consistency_level) - - timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout - - results = - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - group, - &send_message(&1, message, timeout), - ordered: false, - on_timeout: :kill_task, - timeout: timeout + 2000 - ) - |> Stream.filter(&match?({:ok, {:ok, _}}, &1)) - |> Stream.map(fn {:ok, {:ok, res}} -> res end) - |> Enum.to_list() - - # If no nodes answered we try another group - case length(results) do - 0 -> - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count, - previous_result - ) - - 1 -> - quorum_result = - if previous_result do - do_quorum([previous_result | results], conflict_resolver) - else - List.first(results) - end - - with true <- acceptance_resolver.(quorum_result), - nil <- previous_result do - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count, - quorum_result - ) - else - false -> - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count + 1, - previous_result - ) - - _ -> - {:ok, quorum_result} - end - - _ -> - results = if previous_result != nil, do: [previous_result | results], else: results - quorum_result = do_quorum(results, conflict_resolver) - - if acceptance_resolver.(quorum_result) do - {:ok, quorum_result} - else - do_quorum_read( - rest, - message, - conflict_resolver, - acceptance_resolver, - timeout, - consistency_level, - acceptance_failures_count + 1, - previous_result - ) - end + defp send_message_and_filter_results(nodes, message, timeout) do + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + nodes, + &{&1.first_public_key, send_message(&1, message, timeout)}, + ordered: false, + on_timeout: :kill_task, + timeout: timeout + 2000 + ) + |> Stream.filter(&match?({:ok, {_, {:ok, _}}}, &1)) + |> Enum.map(fn {:ok, {node_public_key, {:ok, res}}} -> {node_public_key, res} end) + end + + # returns ok if we have 2 results + # we should probably use the consistency level + defp enough_results(nil, []), do: {:error, nil} + defp enough_results(nil, [result]), do: {:error, result} + defp enough_results(nil, results), do: {:ok, results} + defp enough_results(previous_result, []), do: {:error, previous_result} + defp enough_results(previous_result, results), do: {:ok, [previous_result | results]} + + defp resolve_conflicts(results, conflict_resolver) do + case Enum.uniq(results) do + [result] -> result + results -> conflict_resolver.(results) end end - defp do_quorum(results, conflict_resolver) do - distinct_elems = Enum.dedup(results) - - # If the results are the same, then we reached consistency - if length(distinct_elems) == 1 do - List.first(distinct_elems) + defp result_accepted(result, acceptance_resolver) do + if acceptance_resolver.(result) do + :ok else - # If the results differ, we can apply a conflict resolver to merge the result into - # a consistent response - conflict_resolver.(distinct_elems) + {:error, result} end end + defp maybe_async_repair(_result, [], _repair_fun), do: :ok + + defp maybe_async_repair(result, results, repair_fun) do + Task.Supervisor.start_child(TaskSupervisor, fn -> + repair_fun.(result, results) + end) + end + @doc """ Update the node's network patch """ diff --git a/lib/archethic/p2p/message/get_bootstraping_nodes.ex b/lib/archethic/p2p/message/get_bootstraping_nodes.ex index 444922667..f3bc8296b 100644 --- a/lib/archethic/p2p/message/get_bootstraping_nodes.ex +++ b/lib/archethic/p2p/message/get_bootstraping_nodes.ex @@ -23,7 +23,7 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do closest_nodes = top_nodes - |> P2P.nearest_nodes(patch) + |> P2P.sort_by_nearest_nodes(patch) |> Enum.take(5) %BootstrappingNodes{ diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 2fbb47292..62b940c1c 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -305,10 +305,10 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetLastTransactionAddress{address: address, timestamp: timestamp}, - conflict_resolver, - timeout, - acceptance_resolver, - consistency_level + conflict_resolver: conflict_resolver, + timeout: timeout, + acceptance_resolver: acceptance_resolver, + consistency_level: consistency_level ) do {:ok, %LastTransactionAddress{address: last_address}} -> {:ok, last_address} @@ -343,7 +343,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetNextAddresses{address: address, limit: limit}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %AddressList{addresses: addresses}} -> {:ok, addresses} {:error, :network_issue} = e -> e @@ -422,9 +422,9 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetTransaction{address: address}, - conflict_resolver, - timeout, - acceptance_resolver + conflict_resolver: conflict_resolver, + timeout: timeout, + acceptance_resolver: acceptance_resolver ) do {:ok, %NotFound{}} -> {:error, :transaction_not_exists} @@ -612,8 +612,8 @@ defmodule Archethic.TransactionChain do paging_state: paging_state, order: order }, - conflict_resolver, - timeout + conflict_resolver: conflict_resolver, + timeout: timeout ) do {:ok, %TransactionList{ @@ -696,7 +696,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetTransactionInputs{address: address, offset: offset, limit: limit}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %TransactionInputList{inputs: versioned_inputs, more?: more?, offset: offset}} -> {versioned_inputs, more?, offset} @@ -775,7 +775,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetUnspentOutputs{address: address, offset: offset, limit: limit}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %UnspentOutputList{ @@ -808,7 +808,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetTransactionChainLength{address: address}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %TransactionChainLength{length: length}} -> {:ok, length} @@ -849,9 +849,9 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetGenesisAddress{address: address}, - conflict_resolver, - timeout, - acceptance_resolver + conflict_resolver: conflict_resolver, + timeout: timeout, + acceptance_resolver: acceptance_resolver ) do {:ok, %GenesisAddress{address: genesis_address}} -> {:ok, genesis_address} @@ -903,7 +903,7 @@ defmodule Archethic.TransactionChain do case P2P.quorum_read( nodes, %GetFirstTransactionAddress{address: address}, - conflict_resolver + conflict_resolver: conflict_resolver ) do {:ok, %NotFound{}} -> {:error, :does_not_exist} @@ -1138,7 +1138,9 @@ defmodule Archethic.TransactionChain do end end - case P2P.quorum_read(nodes, %GetTransactionSummary{address: address}, conflict_resolver) do + case P2P.quorum_read(nodes, %GetTransactionSummary{address: address}, + conflict_resolver: conflict_resolver + ) do {:ok, %TransactionSummaryMessage{transaction_summary: %TransactionSummary{address: ^address}}} -> true diff --git a/lib/archethic_web/api/graphql/schema/resolver.ex b/lib/archethic_web/api/graphql/schema/resolver.ex index 9bfb31d77..43af41c49 100644 --- a/lib/archethic_web/api/graphql/schema/resolver.ex +++ b/lib/archethic_web/api/graphql/schema/resolver.ex @@ -341,7 +341,7 @@ defmodule ArchethicWeb.API.GraphQL.Schema.Resolver do def nearest_endpoints(ip) do P2P.authorized_and_available_nodes() - |> P2P.nearest_nodes(P2P.get_geo_patch(ip)) + |> P2P.sort_by_nearest_nodes(P2P.get_geo_patch(ip)) |> Enum.map(&%{ip: :inet.ntoa(&1.ip), port: &1.http_port}) end diff --git a/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs b/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs index dbcf02543..f4ae9bc6f 100644 --- a/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs +++ b/priv/migration_tasks/prod/1.5.9@add_mining_bls_key.exs @@ -44,7 +44,7 @@ defmodule Migration_1_5_9 do nodes = P2P.authorized_and_available_nodes() |> Enum.filter(&P2P.node_connected?/1) - |> P2P.nearest_nodes() + |> P2P.sort_by_nearest_nodes() case Utils.await_confirmation(tx.address, nodes) do {:ok, _} -> diff --git a/test/archethic/p2p_test.exs b/test/archethic/p2p_test.exs index 5e4b7fa31..efd534184 100644 --- a/test/archethic/p2p_test.exs +++ b/test/archethic/p2p_test.exs @@ -1,5 +1,6 @@ defmodule Archethic.P2PTest do use ArchethicCase + import ArchethicCase alias Archethic.Crypto @@ -30,67 +31,10 @@ defmodule Archethic.P2PTest do assert %Node{ip: {127, 0, 0, 1}} = P2P.get_node_info() end - describe "quorum_read/4" do - setup do - pub1 = Crypto.generate_deterministic_keypair("node1") |> elem(0) - pub2 = Crypto.generate_deterministic_keypair("node2") |> elem(0) - pub3 = Crypto.generate_deterministic_keypair("node3") |> elem(0) - pub4 = Crypto.generate_deterministic_keypair("node4") |> elem(0) - pub5 = Crypto.generate_deterministic_keypair("node5") |> elem(0) - - nodes = [ - %Node{ - ip: {127, 0, 0, 1}, - port: 3002, - first_public_key: pub1, - last_public_key: pub1, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3003, - first_public_key: pub2, - last_public_key: pub2, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3004, - first_public_key: pub3, - last_public_key: pub3, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3005, - first_public_key: pub4, - last_public_key: pub4, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - }, - %Node{ - ip: {127, 0, 0, 1}, - port: 3006, - first_public_key: pub5, - last_public_key: pub5, - available?: true, - network_patch: "AAA", - geo_patch: "AAA" - } - ] - - Enum.each(nodes, &P2P.add_and_connect_node/1) - {:ok, %{nodes: nodes}} - end + describe "quorum_read/3" do + test "should return the first result when the same results are returned" do + nodes = add_and_connect_nodes(5) - test "should return the first result when the same results are returned", %{nodes: nodes} do MockClient |> expect( :send_message, @@ -103,7 +47,9 @@ defmodule Archethic.P2PTest do assert {:ok, %Transaction{}} = P2P.quorum_read(nodes, %GetTransaction{address: ""}) end - test "should run resolver conflicts when the results are different", %{nodes: nodes} do + test "should run resolver conflicts when the results are different" do + nodes = add_and_connect_nodes(5) + MockClient |> stub(:send_message, fn %Node{port: 3004}, %GetTransaction{}, _timeout -> @@ -120,21 +66,22 @@ defmodule Archethic.P2PTest do end) assert {:ok, %Transaction{}} = - P2P.quorum_read(nodes, %GetTransaction{address: ""}, fn results -> - case Enum.find(results, &match?(%Transaction{}, &1)) do - nil -> - %NotFound{} - - tx -> - tx + P2P.quorum_read(nodes, %GetTransaction{address: ""}, + conflict_resolver: fn results -> + case Enum.find(results, &match?(%Transaction{}, &1)) do + nil -> + %NotFound{} + + tx -> + tx + end end - end) + ) end - test "should try all nodes and return error when no response match acceptance resolver", - %{ - nodes: nodes - } do + test "should try all nodes and return error when no response match acceptance resolver" do + nodes = add_and_connect_nodes(5) + MockClient |> expect( :send_message, @@ -156,11 +103,137 @@ defmodule Archethic.P2PTest do P2P.quorum_read( nodes, %GetTransaction{address: ""}, - fn results -> List.last(results) end, - 0, - fn _ -> false end + acceptance_resolver: fn _ -> false end ) end + + test "should accept a single result for the entire set" do + nodes = [node | _] = add_and_connect_nodes(5) + + MockClient + |> stub(:send_message, fn + ^node, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + + _, %GetTransaction{}, _timeout -> + {:error, :timeout} + end) + + assert {:ok, %Transaction{}} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""} + ) + end + + test "should not accept a single result if not gone through the entire set" do + nodes = [node, _, _, _, node5] = add_and_connect_nodes(5) + + me = self() + + MockClient + |> stub(:send_message, fn + ^node, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + + ^node5, %GetTransaction{}, _timeout -> + send(me, :node5_requested) + + {:ok, %NotFound{}} + + _, %GetTransaction{}, _timeout -> + {:error, :timeout} + end) + + assert {:ok, %Transaction{}} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""} + ) + + assert_receive :node5_requested, 100 + end + + test "repair function should receive a nil accepted_result when no accepted response" do + nodes = add_and_connect_nodes(5) + + MockClient + |> stub(:send_message, fn + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + me = self() + + assert {:error, :acceptance_failed} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""}, + acceptance_resolver: fn _ -> false end, + repair_fun: fn accepted_result, results_by_node -> + assert is_nil(accepted_result) + assert match?([{_, _} | _], results_by_node) + send(me, {:repairing, length(results_by_node)}) + :ok + end + ) + + expected_size = length(nodes) + assert_receive({:repairing, ^expected_size}, 100) + end + + test "repair function should receive the accepted_result" do + nodes = add_and_connect_nodes(5) + + MockClient + |> stub(:send_message, fn + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + me = self() + + assert {:ok, %Transaction{}} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""}, + repair_fun: fn accepted_result, results_by_node -> + assert %Transaction{} = accepted_result + assert match?([{_, _} | _], results_by_node) + send(me, {:repairing, length(results_by_node)}) + :ok + end + ) + + # 3 is consistency_level + assert_receive({:repairing, 3}, 100) + end + + test "should call the repair function asynchronously" do + nodes = add_and_connect_nodes(5) + + MockClient + |> stub(:send_message, fn + _, %GetTransaction{}, _timeout -> + {:ok, %Transaction{}} + end) + + me = self() + + assert {:error, :acceptance_failed} = + P2P.quorum_read( + nodes, + %GetTransaction{address: ""}, + acceptance_resolver: fn _ -> false end, + repair_fun: fn _ -> + Process.sleep(10_000) + send(me, :repairing_done) + :ok + end + ) + + refute_received(:repairing_done) + end end describe "authorized_and_available_nodes/1" do diff --git a/test/archethic/replication/transaction_context_test.exs b/test/archethic/replication/transaction_context_test.exs index 6e0a89b7a..393adbf5c 100644 --- a/test/archethic/replication/transaction_context_test.exs +++ b/test/archethic/replication/transaction_context_test.exs @@ -34,7 +34,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %Transaction{address: address}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) assert %Transaction{} = TransactionContext.fetch_transaction(address) end @@ -48,7 +48,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %NotFound{}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) # no assert, we use expect(5) in the mock TransactionContext.fetch_transaction(address, acceptance_resolver: :accept_transaction) @@ -65,7 +65,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %GenesisAddress{address: genesis, timestamp: DateTime.utc_now()}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) assert genesis == TransactionContext.fetch_genesis_address(address) end @@ -79,7 +79,7 @@ defmodule Archethic.Replication.TransactionContextTest do {:ok, %GenesisAddress{address: address, timestamp: DateTime.utc_now()}} end) - connect_to_n_nodes(5) + add_and_connect_nodes(5) # no assert, we use expect(5) in the mock TransactionContext.fetch_genesis_address(address, @@ -244,20 +244,4 @@ defmodule Archethic.Replication.TransactionContextTest do assert [^v_utxo] = TransactionContext.fetch_transaction_unspent_outputs(genesis_address) end - - def connect_to_n_nodes(n) do - Enum.each(1..n, fn i -> - P2P.add_and_connect_node(%Node{ - ip: {127, 0, 0, 1}, - port: 3000 + i, - first_public_key: random_public_key(), - last_public_key: random_public_key(), - available?: true, - geo_patch: "AAA", - network_patch: "AAA", - authorized?: true, - authorization_date: DateTime.utc_now() - }) - end) - end end diff --git a/test/support/template.ex b/test/support/template.ex index 2bab9ab77..7cb0aeda1 100644 --- a/test/support/template.ex +++ b/test/support/template.ex @@ -19,6 +19,9 @@ defmodule ArchethicCase do alias Archethic.Governance.Pools.MemTable, as: PoolsMemTable alias Archethic.OracleChain.MemTable, as: OracleMemTable + + alias Archethic.P2P + alias Archethic.P2P.Node alias Archethic.P2P.MemTable, as: P2PMemTable alias Archethic.ContractFactory @@ -284,6 +287,25 @@ defmodule ArchethicCase do end) end + def add_and_connect_nodes(n) do + Enum.map(1..n, fn i -> + node = %Node{ + ip: {127, 0, 0, 1}, + port: 3000 + i, + first_public_key: random_public_key(), + last_public_key: random_public_key(), + available?: true, + geo_patch: "AAA", + network_patch: "AAA", + authorized?: true, + authorization_date: DateTime.utc_now() + } + + P2P.add_and_connect_node(node) + node + end) + end + def random_address() do <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>> end