Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a repair function to the quorum read #1586

Merged
merged 8 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/archethic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ defmodule Archethic do
P2P.authorized_and_available_nodes()
|> Enum.reject(&(&1.first_public_key == welcome_node_key))
|> Enum.sort_by(& &1.first_public_key)
|> P2P.nearest_nodes(welcome_node_patch)
|> P2P.sort_by_nearest_nodes(welcome_node_patch)
|> Enum.filter(&P2P.node_connected?/1)

this_node = Crypto.first_node_public_key()
Expand Down
4 changes: 2 additions & 2 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ defmodule Archethic.BeaconChain do
subset
|> Election.beacon_storage_nodes(next_summary_date, authorized_nodes)
|> Enum.filter(&P2P.node_connected?/1)
|> P2P.nearest_nodes()
|> P2P.sort_by_nearest_nodes()
|> Enum.take(3)
|> Enum.map(&{&1, subset})
end)
Expand Down Expand Up @@ -516,7 +516,7 @@ defmodule Archethic.BeaconChain do
case P2P.quorum_read(
nodes,
%GetBeaconSummariesAggregate{date: summary_time},
conflict_resolver
conflict_resolver: conflict_resolver
) do
{:ok, aggregate = %SummaryAggregate{}} ->
{:ok, aggregate}
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/mining/pending_transaction_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ defmodule Archethic.Mining.PendingTransactionValidation do

previous_address
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> P2P.nearest_nodes()
|> P2P.sort_by_nearest_nodes()
|> Enum.filter(&P2P.node_connected?/1)
|> get_first_public_key(previous_address)
end
Expand Down
6 changes: 3 additions & 3 deletions lib/archethic/mining/smart_contract_validation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ defmodule Archethic.Mining.SmartContractValidation do
|> Election.chain_storage_nodes(P2P.authorized_and_available_nodes())
|> Election.get_synchronized_nodes_before(previous_summary_time)

conflicts_resolver = fn results ->
conflict_resolver = fn results ->
%SmartContractCallValidation{last_chain_sync_date: highest_date} =
Enum.max_by(results, & &1.last_chain_sync_date, DateTime)

Expand All @@ -108,8 +108,8 @@ defmodule Archethic.Mining.SmartContractValidation do
transaction: transaction,
timestamp: validation_time
},
conflicts_resolver,
@timeout
conflict_resolver: conflict_resolver,
timeout: @timeout
) do
{:ok, %SmartContractCallValidation{status: :ok, fee: fee}} ->
{:ok, fee}
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/networking/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ defmodule Archethic.Networking.Scheduler do
nodes =
P2P.authorized_and_available_nodes()
|> Enum.filter(&P2P.node_connected?/1)
|> P2P.nearest_nodes()
|> P2P.sort_by_nearest_nodes()

case Utils.await_confirmation(tx_address, nodes) do
{:ok, validated_transaction = %Transaction{address: ^tx_address, data: ^transaction_data}} ->
Expand Down
240 changes: 93 additions & 147 deletions lib/archethic/p2p.ex
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ defmodule Archethic.P2P do
case quorum_read(
nodes,
%ListNodes{authorized_and_available?: authorized_and_available?},
conflict_resolver
conflict_resolver: conflict_resolver
) do
{:ok, %NodeList{nodes: nodes}} ->
{:ok, nodes}
Expand Down Expand Up @@ -445,11 +445,11 @@ defmodule Archethic.P2P do
@doc """
Return the nearest storages nodes from the local node
"""
@spec nearest_nodes(list(Node.t())) :: list(Node.t())
def nearest_nodes(storage_nodes) when is_list(storage_nodes) do
@spec sort_by_nearest_nodes(list(Node.t())) :: list(Node.t())
def sort_by_nearest_nodes(storage_nodes) when is_list(storage_nodes) do
case get_node_info(Crypto.first_node_public_key()) do
{:ok, %Node{network_patch: network_patch}} ->
nearest_nodes(storage_nodes, network_patch)
sort_by_nearest_nodes(storage_nodes, network_patch)

{:error, :not_found} ->
storage_nodes
Expand All @@ -467,7 +467,7 @@ defmodule Archethic.P2P do
...> %Node{network_patch: "3A2"}
...> ]
...>
...> P2P.nearest_nodes(list_nodes, "12F")
...> P2P.sort_by_nearest_nodes(list_nodes, "12F")
[
%Node{network_patch: "3A2"},
%Node{network_patch: "AA0"},
Expand All @@ -480,15 +480,16 @@ defmodule Archethic.P2P do
...> %Node{network_patch: "3A2"}
...> ]
...>
...> P2P.nearest_nodes(list_nodes, "C3A")
...> P2P.sort_by_nearest_nodes(list_nodes, "C3A")
[
%Node{network_patch: "F50"},
%Node{network_patch: "AA0"},
%Node{network_patch: "3A2"}
]
"""
@spec nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) :: Enumerable.t()
def nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do
@spec sort_by_nearest_nodes(node_list :: Enumerable.t(), network_patch :: binary()) ::
Enumerable.t()
def sort_by_nearest_nodes(storage_nodes, network_patch) when is_binary(network_patch) do
Enum.sort_by(storage_nodes, &network_distance(&1.network_patch, network_patch))
end

Expand Down Expand Up @@ -732,165 +733,110 @@ defmodule Archethic.P2P do

@doc """
Send a message to a list of nodes and perform a read quorum

Opts:
timeout :: non_neg_integer(),
conflict_resolver :: (list(Message.t()) -> Message.t()),
acceptance_resolver :: (Message.t() -> boolean()),
consistency_level :: pos_integer(),
repair_fun :: (nil | Message.t(), list(Message.t()) -> :ok)
"""
@spec quorum_read(
node_list :: list(Node.t()),
message :: Message.t(),
conflict_resolver :: (list(Message.t()) -> Message.t()),
timeout :: non_neg_integer(),
acceptance_resolver :: (Message.t() -> boolean()),
consistency_level :: pos_integer()
) ::
{:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed}
opts :: Keyword.t()
) :: {:ok, Message.t()} | {:error, :network_issue} | {:error, :acceptance_failed}
def quorum_read(
nodes,
message,
conflict_resolver \\ fn results -> List.first(results) end,
timeout \\ 0,
acceptance_resolver \\ fn _ -> true end,
consistency_level \\ 3
)

def quorum_read(
nodes,
message,
conflict_resolver,
timeout,
acceptance_resolver,
consistency_level
opts \\ []
) do
timeout = Keyword.get(opts, :timeout, 0)
conflict_resolver = Keyword.get(opts, :conflict_resolver, &List.first(&1))
acceptance_resolver = Keyword.get(opts, :acceptance_resolver, fn _ -> true end)
consistency_level = Keyword.get(opts, :consistency_level, 3)
repair_fun = Keyword.get(opts, :repair_fun, fn _, _ -> :ok end)

nodes
|> Enum.filter(&node_connected?/1)
|> nearest_nodes()
|> sort_by_nearest_nodes()
|> unprioritize_node(Crypto.first_node_public_key())
|> do_quorum_read(
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
0,
nil
)
|> Enum.chunk_every(consistency_level)
|> Enum.reduce_while({nil, []}, fn nodes, {previous_result_acc, all_results_acc} ->
results_by_node = send_message_and_filter_results(nodes, message, timeout)
all_results_acc = results_by_node ++ all_results_acc
{_nodes, results} = Enum.unzip(results_by_node)

bchamagne marked this conversation as resolved.
Show resolved Hide resolved
with {:ok, results} <- enough_results(previous_result_acc, results),
result <- resolve_conflicts(results, conflict_resolver),
:ok <- result_accepted(result, acceptance_resolver) do
{:halt, {result, all_results_acc}}
else
{:error, previous_result} ->
{:cont, {previous_result, all_results_acc}}
end
end)
|> then(fn {result, all_results} ->
cond do
is_nil(result) ->
maybe_async_repair(nil, all_results, repair_fun)
{:error, :network_issue}

acceptance_resolver.(result) ->
maybe_async_repair(result, all_results, repair_fun)
{:ok, result}

true ->
maybe_async_repair(nil, all_results, repair_fun)
{:error, :acceptance_failed}
end
end)
end

defp do_quorum_read([], _, _, _, _, _, 0, nil), do: {:error, :network_issue}
defp do_quorum_read([], _, _, _, _, _, _, nil), do: {:error, :acceptance_failed}
defp do_quorum_read([], _, _, _, _, _, _, previous_result), do: {:ok, previous_result}

defp do_quorum_read(
nodes,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count,
previous_result
) do
# We determine how many nodes to fetch for the quorum from the consistency level
{group, rest} = Enum.split(nodes, consistency_level)

timeout = if timeout == 0, do: Message.get_timeout(message), else: timeout

results =
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
group,
&send_message(&1, message, timeout),
ordered: false,
on_timeout: :kill_task,
timeout: timeout + 2000
)
|> Stream.filter(&match?({:ok, {:ok, _}}, &1))
|> Stream.map(fn {:ok, {:ok, res}} -> res end)
|> Enum.to_list()

# If no nodes answered we try another group
case length(results) do
0 ->
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count,
previous_result
)

1 ->
quorum_result =
if previous_result do
do_quorum([previous_result | results], conflict_resolver)
else
List.first(results)
end

with true <- acceptance_resolver.(quorum_result),
nil <- previous_result do
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count,
quorum_result
)
else
false ->
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count + 1,
previous_result
)

_ ->
{:ok, quorum_result}
end

_ ->
results = if previous_result != nil, do: [previous_result | results], else: results
quorum_result = do_quorum(results, conflict_resolver)

if acceptance_resolver.(quorum_result) do
{:ok, quorum_result}
else
do_quorum_read(
rest,
message,
conflict_resolver,
acceptance_resolver,
timeout,
consistency_level,
acceptance_failures_count + 1,
previous_result
)
end
defp send_message_and_filter_results(nodes, message, timeout) do
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
nodes,
&{&1.first_public_key, send_message(&1, message, timeout)},
ordered: false,
on_timeout: :kill_task,
timeout: timeout + 2000
)
|> Stream.filter(&match?({:ok, {_, {:ok, _}}}, &1))
|> Enum.map(fn {:ok, {node_public_key, {:ok, res}}} -> {node_public_key, res} end)
end

# returns ok if we have 2 results
# we should probably use the consistency level
defp enough_results(nil, []), do: {:error, nil}
defp enough_results(nil, [result]), do: {:error, result}
defp enough_results(nil, results), do: {:ok, results}
defp enough_results(previous_result, []), do: {:error, previous_result}
defp enough_results(previous_result, results), do: {:ok, [previous_result | results]}

defp resolve_conflicts(results, conflict_resolver) do
case Enum.uniq(results) do
[result] -> result
results -> conflict_resolver.(results)
end
end

defp do_quorum(results, conflict_resolver) do
distinct_elems = Enum.dedup(results)

# If the results are the same, then we reached consistency
if length(distinct_elems) == 1 do
List.first(distinct_elems)
defp result_accepted(result, acceptance_resolver) do
if acceptance_resolver.(result) do
:ok
else
# If the results differ, we can apply a conflict resolver to merge the result into
# a consistent response
conflict_resolver.(distinct_elems)
{:error, result}
end
end

defp maybe_async_repair(_result, [], _repair_fun), do: :ok

defp maybe_async_repair(result, results, repair_fun) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
repair_fun.(result, results)
end)
end

@doc """
Update the node's network patch
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/archethic/p2p/message/get_bootstraping_nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule Archethic.P2P.Message.GetBootstrappingNodes do

closest_nodes =
top_nodes
|> P2P.nearest_nodes(patch)
|> P2P.sort_by_nearest_nodes(patch)
|> Enum.take(5)

%BootstrappingNodes{
Expand Down
Loading
Loading