Skip to content

Commit

Permalink
Handle validation lag
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix committed Oct 16, 2024
1 parent 452f890 commit 8d78af7
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 32 deletions.
4 changes: 2 additions & 2 deletions lib/archethic/mining.ex
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ defmodule Archethic.Mining do
node_public_key :: Crypto.key()
) :: :ok
def confirm_replication(tx_address, signature, node_public_key) do
pid = get_mining_process!(tx_address, 1000)
pid = get_mining_process!(tx_address)
if pid, do: send(pid, {:ack_replication, signature, node_public_key})
:ok
end

defp get_mining_process!(tx_address, timeout \\ 3_000) do
defp get_mining_process!(tx_address, timeout \\ 1000) do
retry_while with: constant_backoff(100) |> expiry(timeout) do
case Registry.lookup(WorkflowRegistry, tx_address) do
[{pid, _}] ->
Expand Down
82 changes: 63 additions & 19 deletions lib/archethic/p2p/message/replicate_pending_transaction_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do

defstruct [:address, :genesis_address, :proof_of_validation]

use Retry

alias Archethic.Crypto
alias Archethic.Utils
alias Archethic.Election
alias Archethic.P2P
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.Error
alias Archethic.P2P.Message.AcknowledgeStorage
alias Archethic.Replication
alias Archethic.TaskSupervisor
alias Archethic.TransactionChain
Expand All @@ -17,10 +23,7 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do
alias Archethic.TransactionChain.TransactionInput
alias Archethic.TransactionChain.VersionedTransactionInput
alias Archethic.TransactionChain.TransactionSummary
alias Archethic.P2P
alias Archethic.P2P.Message.Ok
alias Archethic.P2P.Message.Error
alias Archethic.P2P.Message.AcknowledgeStorage
alias Archethic.Utils

@type t() :: %__MODULE__{
address: Crypto.prepended_hash(),
Expand All @@ -37,35 +40,76 @@ defmodule Archethic.P2P.Message.ReplicatePendingTransactionChain do
},
sender_public_key
) do
with {:ok, tx, validation_inputs} <- Replication.get_transaction_in_commit_pool(address),
true <- ProofOfValidation.valid?(proof, tx.validation_stamp) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
Task.Supervisor.start_child(TaskSupervisor, fn ->
node_public_key = Crypto.first_node_public_key()

with {:ok, tx, validation_inputs} <- get_transaction_data(address),
authorized_nodes <- P2P.authorized_and_available_nodes(tx.validation_stamp.timestamp),
true <- Election.chain_storage_node?(address, node_public_key, authorized_nodes),
sorted_nodes <- ProofOfValidation.sort_nodes(authorized_nodes),
true <- ProofOfValidation.valid?(sorted_nodes, proof, tx.validation_stamp) do
replicate_transaction(tx, validation_inputs, genesis_address, sender_public_key)
end)
else
_ -> :skip
end
end)

%Ok{}
end

%Ok{}
else
_ -> %Error{reason: :invalid_transaction}
defp get_transaction_data(address) do
# As validation can happen without all node returned the validation response
# it is possible to receive this message before processing the validation
case get_data_in_tx_pool(address) do
res = {:ok, _, _} -> res
_ -> fetch_tx_data(address)
end
end

defp get_data_in_tx_pool(address) do
retry_while with: constant_backoff(100) |> expiry(2000) do
case Replication.get_transaction_in_commit_pool(address) do
{:ok, tx, validation_utxo} ->
validation_inputs = convert_unspent_outputs_to_inputs(validation_utxo)
{:halt, {:ok, tx, validation_inputs}}

er ->
{:cont, er}
end
end
end

defp fetch_tx_data(address) do
storage_nodes = Election.storage_nodes(address, P2P.authorized_and_available_nodes())

res =
[
Task.async(fn ->
TransactionChain.fetch_transaction(address, storage_nodes, search_mode: :remote)
end),
Task.async(fn -> TransactionChain.fetch_inputs(address, storage_nodes) end)
]
|> Task.await_many(P2P.Message.get_max_timeout() + 100)

case res do
[{:ok, tx}, validation_inputs] -> {:ok, tx, validation_inputs}
_ -> {:error, :transaction_not_exists}
end
end

defp replicate_transaction(
%Transaction{
tx = %Transaction{
address: tx_address,
validation_stamp: %ValidationStamp{timestamp: validation_time}
} = tx,
},
validation_inputs,
genesis_address,
sender_public_key
) do
authorized_nodes = P2P.authorized_and_available_nodes(validation_time)

Replication.sync_transaction_chain(tx, genesis_address, authorized_nodes)

TransactionChain.write_inputs(
tx_address,
convert_unspent_outputs_to_inputs(validation_inputs)
)
TransactionChain.write_inputs(tx_address, validation_inputs)

P2P.send_message(sender_public_key, get_ack_storage(tx, genesis_address))
end
Expand Down
10 changes: 8 additions & 2 deletions lib/archethic/p2p/message/replicate_transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ defmodule Archethic.P2P.Message.ReplicateTransaction do
@spec process(__MODULE__.t(), Crypto.key()) :: Ok.t()
def process(
%__MODULE__{
transaction: tx = %Transaction{validation_stamp: stamp},
transaction:
tx = %Transaction{
validation_stamp: stamp = %ValidationStamp{timestamp: validation_time}
},
genesis_address: genesis_address,
proof_of_validation: proof_of_validation
},
_
) do
if ProofOfValidation.valid?(proof_of_validation, stamp) do
sorted_nodes =
validation_time |> P2P.authorized_and_available_nodes() |> ProofOfValidation.sort_nodes()

if ProofOfValidation.valid?(sorted_nodes, proof_of_validation, stamp) do
Task.Supervisor.start_child(TaskSupervisor, fn ->
replicate_transaction(tx, genesis_address)
end)
Expand Down
16 changes: 7 additions & 9 deletions lib/archethic/p2p/message/validate_transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ defmodule Archethic.P2P.Message.ValidateTransaction do
alias Archethic.Replication
alias Archethic.TaskSupervisor
alias Archethic.TransactionChain.Transaction
alias Archethic.TransactionChain.Transaction.CrossValidationStamp
alias Archethic.TransactionChain.Transaction.ValidationStamp

alias Archethic.TransactionChain.Transaction.ValidationStamp.LedgerOperations.VersionedUnspentOutput
Expand Down Expand Up @@ -70,15 +69,14 @@ defmodule Archethic.P2P.Message.ValidateTransaction do
defp do_validate_transaction(tx, contract_context, inputs, validation_nodes) do
%Transaction{address: tx_address, validation_stamp: %ValidationStamp{error: stamp_error}} = tx

cross_stamp =
%CrossValidationStamp{inconsistencies: inconsistencies} =
Replication.validate_transaction(tx, contract_context, inputs)
# Since the transaction can be validated before a node finish processing this message
# we store the transaction directly in the waiting pool
if stamp_error == nil, do: Replication.add_transaction_to_commit_pool(tx, inputs)

if stamp_error == nil and Enum.empty?(inconsistencies) do
Replication.add_transaction_to_commit_pool(tx, inputs)
end

message = %CrossValidationDone{address: tx_address, cross_validation_stamp: cross_stamp}
message = %CrossValidationDone{
address: tx_address,
cross_validation_stamp: Replication.validate_transaction(tx, contract_context, inputs)
}

P2P.broadcast_message(validation_nodes, message)
end
Expand Down

0 comments on commit 8d78af7

Please sign in to comment.