diff --git a/lib/archethic/mining.ex b/lib/archethic/mining.ex index f953a3106..6b7df0400 100644 --- a/lib/archethic/mining.ex +++ b/lib/archethic/mining.ex @@ -258,12 +258,12 @@ defmodule Archethic.Mining do node_public_key :: Crypto.key() ) :: :ok def confirm_replication(tx_address, signature, node_public_key) do - pid = get_mining_process!(tx_address, 1000) + pid = get_mining_process!(tx_address) if pid, do: send(pid, {:ack_replication, signature, node_public_key}) :ok end - defp get_mining_process!(tx_address, timeout \\ 3_000) do + defp get_mining_process!(tx_address, timeout \\ 1000) do retry_while with: constant_backoff(100) |> expiry(timeout) do case Registry.lookup(WorkflowRegistry, tx_address) do [{pid, _}] -> diff --git a/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex b/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex index a24c18b83..a965eaff8 100644 --- a/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex +++ b/lib/archethic/p2p/message/replicate_pending_transaction_chain.ex @@ -3,8 +3,14 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do defstruct [:address, :genesis_address, :proof_of_validation] + use Retry + alias Archethic.Crypto - alias Archethic.Utils + alias Archethic.Election + alias Archethic.P2P + alias Archethic.P2P.Message.Ok + alias Archethic.P2P.Message.Error + alias Archethic.P2P.Message.AcknowledgeStorage alias Archethic.Replication alias Archethic.TaskSupervisor alias Archethic.TransactionChain @@ -17,10 +23,7 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do alias Archethic.TransactionChain.TransactionInput alias Archethic.TransactionChain.VersionedTransactionInput alias Archethic.TransactionChain.TransactionSummary - alias Archethic.P2P - alias Archethic.P2P.Message.Ok - alias Archethic.P2P.Message.Error - alias Archethic.P2P.Message.AcknowledgeStorage + alias Archethic.Utils @type t() :: %__MODULE__{ address: Crypto.prepended_hash(), @@ -37,23 +40,67 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do }, sender_public_key ) do - with {:ok, tx, validation_inputs} <- Replication.get_transaction_in_commit_pool(address), - true <- ProofOfValidation.valid?(proof, tx.validation_stamp) do - Task.Supervisor.start_child(TaskSupervisor, fn -> + Task.Supervisor.start_child(TaskSupervisor, fn -> + authorized_nodes = P2P.authorized_and_available_nodes() + node_public_key = Crypto.first_node_public_key() + + with true <- Election.chain_storage_node?(address, node_public_key, authorized_nodes), + {:ok, tx, validation_inputs} <- get_transaction_data(address), + true <- ProofOfValidation.valid?(proof, tx.validation_stamp) do replicate_transaction(tx, validation_inputs, genesis_address, sender_public_key) - end) + else + _ -> :skip + end + end) + + %Ok{} + end - %Ok{} - else - _ -> %Error{reason: :invalid_transaction} + defp get_transaction_data(address) do + # As validation can happen without all node returned the validation response + # it is possible to receive this message before processing the validation + case get_data_in_tx_pool(address) do + res = {:ok, _, _} -> res + _ -> fetch_tx_data(address) + end + end + + defp get_data_in_tx_pool(address) do + retry_while with: constant_backoff(100) |> expiry(2000) do + case Replication.get_transaction_in_commit_pool(address) do + {:ok, tx, validation_utxo} -> + validation_inputs = convert_unspent_outputs_to_inputs(validation_utxo) + {:halt, {:ok, tx, validation_inputs}} + + er -> + {:cont, er} + end + end + end + + defp fetch_tx_data(address) do + storage_nodes = Election.storage_nodes(address, P2P.authorized_and_available_nodes()) + + res = + [ + Task.async(fn -> + TransactionChain.fetch_transaction(address, storage_nodes, search_mode: :remote) + end), + Task.async(fn -> TransactionChain.fetch_inputs(address, storage_nodes) end) + ] + |> Task.await_many(P2P.Message.get_max_timeout() + 100) + + case res do + [{:ok, tx}, validation_inputs] -> {:ok, tx, validation_inputs} + _ -> {:error, :transaction_not_exists} end end defp replicate_transaction( - %Transaction{ + tx = %Transaction{ address: tx_address, validation_stamp: %ValidationStamp{timestamp: validation_time} - } = tx, + }, validation_inputs, genesis_address, sender_public_key @@ -61,11 +108,7 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do authorized_nodes = P2P.authorized_and_available_nodes(validation_time) Replication.sync_transaction_chain(tx, genesis_address, authorized_nodes) - - TransactionChain.write_inputs( - tx_address, - convert_unspent_outputs_to_inputs(validation_inputs) - ) + TransactionChain.write_inputs(tx_address, validation_inputs) P2P.send_message(sender_public_key, get_ack_storage(tx, genesis_address)) end diff --git a/lib/archethic/p2p/message/validate_transaction.ex b/lib/archethic/p2p/message/validate_transaction.ex index 46b9ede8c..528247866 100644 --- a/lib/archethic/p2p/message/validate_transaction.ex +++ b/lib/archethic/p2p/message/validate_transaction.ex @@ -14,7 +14,6 @@ defmodule Archethic.P2P.Message.ValidateTransaction do alias Archethic.Replication alias Archethic.TaskSupervisor alias Archethic.TransactionChain.Transaction - alias Archethic.TransactionChain.Transaction.CrossValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput @@ -70,15 +69,14 @@ defmodule Archethic.P2P.Message.ValidateTransaction do defp do_validate_transaction(tx, contract_context, inputs, validation_nodes) do %Transaction{address: tx_address, validation_stamp: %ValidationStamp{error: stamp_error}} = tx - cross_stamp = - %CrossValidationStamp{inconsistencies: inconsistencies} = - Replication.validate_transaction(tx, contract_context, inputs) + # Since the transaction can be validated before a node finish processing this message + # we store the transaction directly in the waiting pool + if stamp_error == nil, do: Replication.add_transaction_to_commit_pool(tx, inputs) - if stamp_error == nil and Enum.empty?(inconsistencies) do - Replication.add_transaction_to_commit_pool(tx, inputs) - end - - message = %CrossValidationDone{address: tx_address, cross_validation_stamp: cross_stamp} + message = %CrossValidationDone{ + address: tx_address, + cross_validation_stamp: Replication.validate_transaction(tx, contract_context, inputs) + } P2P.broadcast_message(validation_nodes, message) end