Skip to content

Commit

Permalink
Add a repair function to the quorum read (#1586)
Browse files Browse the repository at this point in the history
* rename function

* refactor the quorum using a reduce

* return the results zipped with the node + refactor

* quorum_read use a keyword list

* oops

* refactor for code quality

* provide the accepted_result to the repair_fun

* more lint after feedback, add a test
  • Loading branch information
bchamagne authored Oct 29, 2024
1 parent 8f6ddc9 commit 507c4be
Show file tree
Hide file tree
Showing 13 changed files with 299 additions and 272 deletions.
2 changes: 1 addition & 1 deletion lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ defmodule Archethic do
P2P.authorized_and_available_nodes()
|> Enum.reject(&(&1.first_public_key == welcome_node_key))
|> Enum.sort_by(& &1.first_public_key)
|> P2P.nearest_nodes(welcome_node_patch)
|> P2P.sort_by_nearest_nodes(welcome_node_patch)
|> Enum.filter(&P2P.node_connected?/1)

this_node = Crypto.first_node_public_key()
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ defmodule Archethic.BeaconChain do
subset
|> Election.beacon_storage_nodes(next_summary_date, authorized_nodes)
|> Enum.filter(&P2P.node_connected?/1)
|> P2P.nearest_nodes()
|> P2P.sort_by_nearest_nodes()
|> Enum.take(3)
|> Enum.map(&{&1, subset})
end)
Expand Down Expand Up @@ -516,7 +516,7 @@ defmodule Archethic.BeaconChain do
case P2P.quorum_read(
nodes,
%GetBeaconSummariesAggregate{date: summary_time},
conflict_resolver
conflict_resolver: conflict_resolver
) do
{:ok, aggregate = %SummaryAggregate{}} ->
{:ok, aggregate}
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do

previous_address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> P2P.sort_by_nearest_nodes()
|> Enum.filter(&P2P.node_connected?/1)
|> get_first_public_key(previous_address)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/mining/smart_contract_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ defmodule Archethic.Mining.SmartContractValidation do
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> Election.get_synchronized_nodes_before(previous_summary_time)

conflicts_resolver = fn results ->
conflict_resolver = fn results ->
%SmartContractCallValidation{last_chain_sync_date: highest_date} =
Enum.max_by(results, & &1.last_chain_sync_date, DateTime)

Expand All @@ -108,8 +108,8 @@ defmodule Archethic.Mining.SmartContractValidation do
transaction: transaction,
timestamp: validation_time
},
conflicts_resolver,
@timeout
conflict_resolver: conflict_resolver,
timeout: @timeout
) do
{:ok, %SmartContractCallValidation{status: :ok, fee: fee}} ->
{:ok, fee}
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/networking/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ defmodule Archethic.Networking.Scheduler do
nodes =
P2P.authorized_and_available_nodes()
|> Enum.filter(&P2P.node_connected?/1)
|> P2P.nearest_nodes()
|> P2P.sort_by_nearest_nodes()

case Utils.await_confirmation(tx_address, nodes) do
{:ok, validated_transaction = %Transaction{address: ^tx_address, data: ^transaction_data}} ->
Expand Down
240 changes: 93 additions & 147 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ defmodule Archethic.P2P do
case quorum_read(
nodes,
%ListNodes{authorized_and_available?: authorized_and_available?},
conflict_resolver
conflict_resolver: conflict_resolver
) do
{:ok, %NodeList{nodes: nodes}} ->
{:ok, nodes}
Expand Down Expand Up @@ -445,11 +445,11 @@ defmodule Archethic.P2P do
@doc """
Return the storage nodes sorted from the nearest to the farthest from the local node
"""
@spec nearest_nodes(list(Node.t())) :: list(Node.t())
def nearest_nodes(storage_nodes) when is_list(storage_nodes) do
@spec sort_by_nearest_nodes(list(Node.t())) :: list(Node.t())
def sort_by_nearest_nodes(storage_nodes) when is_list(storage_nodes) do
case get_node_info(Crypto.first_node_public_key()) do
{:ok, %Node{network_patch: network_patch}} ->
nearest_nodes(storage_nodes, network_patch)
sort_by_nearest_nodes(storage_nodes, network_patch)

{:error, :not_found} ->
storage_nodes
Expand All @@ -467,7 +467,7 @@ defmodule Archethic.P2P do
...> %Node{network_patch: "3A2"}
...> ]
...>
...> P2P.nearest_nodes(list_nodes, "12F")
...> P2P.sort_by_nearest_nodes(list_nodes, "12F")
[
%Node{network_patch: "3A2"},
%Node{network_patch: "AA0"},
Expand All @@ -480,15 +480,16 @@ defmodule Archethic.P2P do
...> %Node{network_patch: "3A2"}
...> ]
...>
...> P2P.nearest_nodes(list_nodes, "C3A")
...> P2P.sort_by_nearest_nodes(list_nodes, "C3A")
[
%Node{network_patch: "F50"},
%Node{network_patch: "AA0"},
%Node{network_patch: "3A2"}
]
"""
@spec nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) :: Enumerable.t()
def nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do
@spec sort_by_nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) ::
Enumerable.t()
def sort_by_nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do
Enum.sort_by(storage_nodes, &network_distance(&1.network_patch, network_patch))
end

Expand Down Expand Up @@ -732,165 +733,110 @@ defmodule Archethic.P2P do

@doc """
Send a message to a list of nodes and perform a read quorum
Opts:
timeout :: non_neg_integer(),
conflict_resolver :: (list(Message.t()) -> Message.t()),
acceptance_resolver :: (Message.t() -> boolean()),
consistency_level :: pos_integer(),
repair_fun :: (nil | Message.t(), list({node_first_public_key, Message.t()}) -> :ok)
"""
@spec quorum_read(
node_list :: list(Node.t()),
message :: Message.t(),
conflict_resolver :: (list(Message.t()) -> Message.t()),
timeout :: non_neg_integer(),
acceptance_resolver :: (Message.t() -> boolean()),
consistency_level :: pos_integer()
) ::
{:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed}
opts :: Keyword.t()
) :: {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed}
def quorum_read(
nodes,
message,
conflict_resolver \\ fn results -> List.first(results) end,
timeout \\ 0,
acceptance_resolver \\ fn _ -> true end,
consistency_level \\ 3
)

def quorum_read(
nodes,
message,
conflict_resolver,
timeout,
acceptance_resolver,
consistency_level
opts \\ []
) do
timeout = Keyword.get(opts, :timeout, 0)
conflict_resolver = Keyword.get(opts, :conflict_resolver, &List.first(&1))
acceptance_resolver = Keyword.get(opts, :acceptance_resolver, fn _ -> true end)
consistency_level = Keyword.get(opts, :consistency_level, 3)
repair_fun = Keyword.get(opts, :repair_fun, fn _, _ -> :ok end)

nodes
|> Enum.filter(&node_connected?/1)
|> nearest_nodes()
|> sort_by_nearest_nodes()
|> unprioritize_node(Crypto.first_node_public_key())
|> do_quorum_read(
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
0,
nil
)
|> Enum.chunk_every(consistency_level)
|> Enum.reduce_while({nil, []}, fn nodes, {previous_result_acc, all_results_acc} ->
results_by_node = send_message_and_filter_results(nodes, message, timeout)
all_results_acc = results_by_node ++ all_results_acc
{_nodes, results} = Enum.unzip(results_by_node)

with {:ok, results} <- enough_results(previous_result_acc, results),
result <- resolve_conflicts(results, conflict_resolver),
:ok <- result_accepted(result, acceptance_resolver) do
{:halt, {result, all_results_acc}}
else
{:error, previous_result} ->
{:cont, {previous_result, all_results_acc}}
end
end)
|> then(fn {result, all_results} ->
cond do
is_nil(result) ->
maybe_async_repair(nil, all_results, repair_fun)
{:error, :network_issue}

acceptance_resolver.(result) ->
maybe_async_repair(result, all_results, repair_fun)
{:ok, result}

true ->
maybe_async_repair(nil, all_results, repair_fun)
{:error, :acceptance_failed}
end
end)
end

defp do_quorum_read([], _, _, _, _, _, 0, nil), do: {:error, :network_issue}
defp do_quorum_read([], _, _, _, _, _, _, nil), do: {:error, :acceptance_failed}
defp do_quorum_read([], _, _, _, _, _, _, previous_result), do: {:ok, previous_result}

defp do_quorum_read(
nodes,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count,
previous_result
) do
# We determine how many nodes to fetch for the quorum from the consistency level
{group, rest} = Enum.split(nodes, consistency_level)

timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout

results =
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
group,
&send_message(&1, message, timeout),
ordered: false,
on_timeout: :kill_task,
timeout: timeout + 2000
)
|> Stream.filter(&match?({:ok, {:ok, _}}, &1))
|> Stream.map(fn {:ok, {:ok, res}} -> res end)
|> Enum.to_list()

# If no nodes answered we try another group
case length(results) do
0 ->
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count,
previous_result
)

1 ->
quorum_result =
if previous_result do
do_quorum([previous_result | results], conflict_resolver)
else
List.first(results)
end

with true <- acceptance_resolver.(quorum_result),
nil <- previous_result do
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count,
quorum_result
)
else
false ->
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count + 1,
previous_result
)

_ ->
{:ok, quorum_result}
end

_ ->
results = if previous_result != nil, do: [previous_result | results], else: results
quorum_result = do_quorum(results, conflict_resolver)

if acceptance_resolver.(quorum_result) do
{:ok, quorum_result}
else
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count + 1,
previous_result
)
end
# Sends `message` concurrently to every node in `nodes` and keeps only the
# successful replies.
#
# Returns a list of `{node_first_public_key, response}` tuples, in completion
# order (the stream is unordered). Nodes whose task fails, is killed on timeout
# or whose `send_message/3` call does not return `{:ok, _}` are dropped.
defp send_message_and_filter_results(nodes, message, timeout) do
  # A timeout of 0 is the "use the message's own timeout" sentinel. It must be
  # resolved before sizing the task deadline below, otherwise the tasks would be
  # killed after 2000ms regardless of the message's real timeout.
  timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout

  responses =
    Task.Supervisor.async_stream_nolink(
      TaskSupervisor,
      nodes,
      &{&1.first_public_key, send_message(&1, message, timeout)},
      ordered: false,
      on_timeout: :kill_task,
      # Leave some headroom over the request timeout before killing the task
      timeout: timeout + 2000
    )

  # The generator pattern silently skips exits, timeouts and error replies
  for {:ok, {node_public_key, {:ok, res}}} <- responses do
    {node_public_key, res}
  end
end

# Decides whether enough answers were gathered to attempt a quorum.
#
# `previous_result` is the single answer carried over from earlier batches
# (or nil), `results` are the answers of the current batch.
#
# Returns `{:ok, candidates}` when at least 2 answers are available in total,
# otherwise `{:error, carried_result}` where `carried_result` is the only
# answer known so far (or nil).
#
# NOTE: the threshold is fixed at 2; it should probably follow the consistency level.
defp enough_results(previous_result, results) do
  case {previous_result, results} do
    # first and only answer so far: carry it over to the next batch
    {nil, [lonely_result]} -> {:error, lonely_result}
    # nothing new in this batch: keep whatever we already had (possibly nil)
    {carried, []} -> {:error, carried}
    {nil, batch} -> {:ok, batch}
    {carried, batch} -> {:ok, [carried | batch]}
  end
end

# Reduces a list of answers to a single one.
#
# When every answer is identical, that answer is returned as is. Otherwise the
# de-duplicated answers are handed to `conflict_resolver`, whose return value
# is used.
defp resolve_conflicts(results, conflict_resolver) do
  distinct_results = Enum.uniq(results)

  if match?([_], distinct_results) do
    hd(distinct_results)
  else
    conflict_resolver.(distinct_results)
  end
end

defp do_quorum(results, conflict_resolver) do
distinct_elems = Enum.dedup(results)

# If the results are the same, then we reached consistency
if length(distinct_elems) == 1 do
List.first(distinct_elems)
defp result_accepted(result, acceptance_resolver) do
if acceptance_resolver.(result) do
:ok
else
# If the results differ, we can apply a conflict resolver to merge the result into
# a consistent response
conflict_resolver.(distinct_elems)
{:error, result}
end
end

# Triggers `repair_fun` in a separate supervised task, without waiting for it.
#
# `result` is the accepted quorum result (or nil when none was accepted) and
# `results` are all the answers gathered. Nothing is started when no answer was
# gathered at all, in which case `:ok` is returned; otherwise the return value
# of `Task.Supervisor.start_child/2` is returned.
defp maybe_async_repair(result, results, repair_fun) do
  if results == [] do
    :ok
  else
    Task.Supervisor.start_child(TaskSupervisor, fn -> repair_fun.(result, results) end)
  end
end

@doc """
Update the node's network patch
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/p2p/message/get_bootstraping_nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do

closest_nodes =
top_nodes
|> P2P.nearest_nodes(patch)
|> P2P.sort_by_nearest_nodes(patch)
|> Enum.take(5)

%BootstrappingNodes{
Expand Down
Loading

0 comments on commit 507c4be

Please sign in to comment.