diff --git a/lib/archethic/transaction_chain.ex b/lib/archethic/transaction_chain.ex index 2a9fa9dff..a50352c4c 100644 --- a/lib/archethic/transaction_chain.ex +++ b/lib/archethic/transaction_chain.ex @@ -303,6 +303,27 @@ defmodule Archethic.TransactionChain do Enum.max_by(results, &DateTime.to_unix(&1.timestamp, :millisecond)) end + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %GetLastTransactionAddress{address: last_address}, results_by_node -> + # results_by_node + # |> Enum.reduce([], fn + # {_node_public_key, %GetLastTransactionAddress{address: ^last_address}}, acc -> + # acc + + # {node_public_key, %GetLastTransactionAddress{address: _}}, acc -> + # [node_public_key | acc] + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: last_address, + # io_addresses: [] + # }) + # end + case P2P.quorum_read( nodes, %GetLastTransactionAddress{address: address, timestamp: timestamp}, @@ -310,6 +331,7 @@ defmodule Archethic.TransactionChain do timeout: timeout, acceptance_resolver: acceptance_resolver, consistency_level: consistency_level + # repair_fun: repair_fun ) do {:ok, %LastTransactionAddress{address: last_address}} -> {:ok, last_address} @@ -341,10 +363,37 @@ defmodule Archethic.TransactionChain do Enum.sort_by(results, &length(&1.addresses), :desc) |> List.first() end + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %AddressList{addresses: []}, _results_by_nodes -> + # :ok + + # %AddressList{addresses: accepted_addresses}, results_by_nodes -> + # last_accepted_address = List.last(accepted_addresses) + + # results_by_nodes + # |> Enum.reduce([], fn {node_public_key, %AddressList{addresses: addresses}}, acc -> + # if last_accepted_address == List.last(addresses) do + # acc + # else + # [node_public_key | acc] + # end + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: last_accepted_address, + # io_addresses: [] + # }) + # end + case P2P.quorum_read( nodes, %GetNextAddresses{address: address, limit: limit}, conflict_resolver: conflict_resolver + # repair_fun: repair_fun ) do {:ok, %AddressList{addresses: addresses}} -> {:ok, addresses} {:error, :network_issue} = e -> e @@ -420,12 +469,40 @@ defmodule Archethic.TransactionChain do end) end + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %NotFound{}, _results_by_node -> + # :ok + + # %Error{}, _results_by_node -> + # :ok + + # %Transaction{address: ^address}, results_by_node -> + # results_by_node + # |> Enum.reduce([], fn + # {_node_public_key, %Transaction{address: ^address}}, acc -> + # acc + + # {node_public_key, _}, acc -> + # [node_public_key | acc] + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: address, + # io_addresses: [] + # }) + # end + case P2P.quorum_read( nodes, %GetTransaction{address: address}, conflict_resolver: conflict_resolver, timeout: timeout, acceptance_resolver: acceptance_resolver + # repair_fun: repair_fun ) do {:ok, %NotFound{}} -> {:error, :transaction_not_exists} @@ -694,10 +771,35 @@ defmodule Archethic.TransactionChain do |> List.first() end + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %TransactionInputList{inputs: accepted_versioned_inputs}, results_by_node -> + # accepted_length = length(accepted_versioned_inputs) + + # results_by_node + # |> Enum.reduce([], fn {node_public_key, %TransactionInputList{inputs: versioned_inputs}}, + # acc -> + # if length(versioned_inputs) < accepted_length do + # [node_public_key | acc] + # else + # acc + # end + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: address, + # io_addresses: Enum.map(versioned_inputs, & &1.input.from) |> Enum.uniq() + # }) + # end + case P2P.quorum_read( nodes, %GetTransactionInputs{address: address, offset: offset, limit: limit}, conflict_resolver: conflict_resolver + # repair_fun: repair_fun ) do {:ok, %TransactionInputList{inputs: versioned_inputs, more?: more?, offset: offset}} -> {versioned_inputs, more?, offset} @@ -773,10 +875,33 @@ defmodule Archethic.TransactionChain do } end + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %UnspentOutputList{unspent_outputs: accepted_utxos}, results_by_node -> + # results_by_node + # |> Enum.reduce([], fn {node_public_key, %UnspentOutputList{unspent_outputs: utxos}}, + # acc -> + # if utxos != accepted_utxos do + # [node_public_key | acc] + # else + # acc + # end + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: address, + # io_addresses: Enum.map(accepted_utxos, & &1.unspent_output.from) |> Enum.uniq() + # }) + # end + case P2P.quorum_read( nodes, %GetUnspentOutputs{address: address, offset: offset, limit: limit}, conflict_resolver: conflict_resolver + # repair_fun: repair_fun ) do {:ok, %UnspentOutputList{ @@ -806,10 +931,32 @@ defmodule Archethic.TransactionChain do Enum.max_by(results, & &1.length) end + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %TransactionChainLength{length: accepted_len}, results_by_node -> + # results_by_node + # |> Enum.reduce([], fn {node_public_key, %TransactionChainLength{length: len}} -> + # if len == accepted_len do + # acc + # else + # [node_public_key | acc] + # end + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: last_address, + # io_addresses: [] + # }) + # end + case P2P.quorum_read( nodes, %GetTransactionChainLength{address: address}, conflict_resolver: conflict_resolver + # repair_fun: repair_fun ) do {:ok, %TransactionChainLength{length: length}} -> {:ok, length} @@ -911,25 +1058,53 @@ defmodule Archethic.TransactionChain do end defp do_fetch_first_transaction_address(address, nodes) do - conflict_resolver = fn results -> - case results |> Enum.reject(&match?(%NotFound{}, &1)) do - [] -> - %NotFound{} - - results_filtered -> - Enum.min_by(results_filtered, & &1.timestamp, DateTime) - end - end - case get_first_transaction_address(address) do {:ok, {first_address, _}} -> {:ok, first_address} _ -> + conflict_resolver = fn results -> + case results |> Enum.reject(&match?(%NotFound{}, &1)) do + [] -> + %NotFound{} + + results_filtered -> + Enum.min_by(results_filtered, & &1.timestamp, DateTime) + end + end + + # repair_fun = fn + # nil, _results_by_node -> + # :ok + + # %NotFound{}, _results_by_node -> + # :ok + + # %FirstTransactionAddress{address: accepted_address}, results_by_node -> + # results_by_node + # |> Enum.reduce([], fn + # {node_public_key, %NotFound{}}, acc -> + # [node_public_key | acc] + + # {_node_public_key, %FirstTransactionAddress{address: ^accepted_address}}, acc -> + # acc + + # {node_public_key, %FirstTransactionAddress{address: _}}, acc -> + # [node_public_key | acc] + # end) + # |> P2P.get_nodes_info() + # |> P2P.broadcast_message(%ShardRepair{ + # genesis_address: genesis_address, + # storage_address: address, + # io_addresses: [] + # }) + # end + case P2P.quorum_read( nodes, %GetFirstTransactionAddress{address: address}, conflict_resolver: conflict_resolver + # repair_fun: repair_fun ) do {:ok, %NotFound{}} -> {:error, :does_not_exist}