From 77a511ef86b2ec79d4d58b59b8183951f84afde2 Mon Sep 17 00:00:00 2001 From: bchamagne Date: Fri, 8 Dec 2023 11:32:29 +0100 Subject: [PATCH] connection backoff and wake up heartbeat mechanism unit test the heartbeat mechanism remove max_connection_attempts add code_change/4 to add the missing fields in the state --- config/test.exs | 5 + lib/archethic/p2p/client/connection.ex | 128 ++++++++++++++++-- lib/archethic/p2p/listener_protocol.ex | 18 +++ test/archethic/p2p/client/connection_test.exs | 66 ++++++++- 4 files changed, 205 insertions(+), 12 deletions(-) diff --git a/config/test.exs b/config/test.exs index f5854ff80..279842bce 100755 --- a/config/test.exs +++ b/config/test.exs @@ -136,6 +136,11 @@ config :archethic, Archethic.P2P.GeoPatch.GeoIP, MockGeoIP config :archethic, Archethic.P2P.BootstrappingSeeds, enabled: false +config :archethic, Archethic.P2P.Client.Connection, + backoff_strategy: :static, + heartbeat_interval: 200, + reconnect_delay: 50 + config :archethic, Archethic.Mining.PendingTransactionValidation, validate_node_ip: true config :archethic, Archethic.Metrics.Poller, enabled: false diff --git a/lib/archethic/p2p/client/connection.ex b/lib/archethic/p2p/client/connection.ex index 32601d416..3e4e35c21 100644 --- a/lib/archethic/p2p/client/connection.ex +++ b/lib/archethic/p2p/client/connection.ex @@ -21,9 +21,20 @@ defmodule Archethic.P2P.Client.Connection do require Logger use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary - @vsn 1 + @vsn 2 @table_name :connection_status + @heartbeat_interval Keyword.get( + Application.compile_env(:archethic, __MODULE__, []), + :heartbeat_interval, + 10_000 + ) + @reconnect_delay Keyword.get( + Application.compile_env(:archethic, __MODULE__, []), + :reconnect_delay, + 500 + ) + @doc """ Starts a new connection """ @@ -59,6 +70,18 @@ defmodule Archethic.P2P.Client.Connection do end end + @doc """ + When called, if disconnect, it will try to connect to socket + Noop if it's already connected + + It's used when some node has been offline for a long time + It has connected to us so we know we can connect to it as well + """ + @spec wake_up(Crypto.key()) :: :ok + def wake_up(public_key) do + GenStateMachine.cast(via_tuple(public_key), :wake_up) + end + @doc """ Get the availability timer and reset it with a new start time if it was already started """ @@ -102,7 +125,10 @@ defmodule Archethic.P2P.Client.Connection do request_id: 0, messages: %{}, send_tasks: %{}, - availability_timer: {nil, 0} + availability_timer: {nil, 0}, + reconnect_attempts: 0, + heartbeats_sent: 0, + heartbeats_received: 0 } {:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]} @@ -190,7 +216,7 @@ defmodule Archethic.P2P.Client.Connection do end) # Reconnect with backoff - actions = [{{:timeout, :reconnect}, 500, nil} | actions] + actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil} | actions] {:keep_state, new_data, actions} end @@ -204,7 +230,11 @@ defmodule Archethic.P2P.Client.Connection do # Start availability timer new_data = - Map.update!(data, :availability_timer, fn + data + |> Map.put(:reconnect_attempts, 0) + |> Map.put(:heartbeats_sent, 0) + |> Map.put(:heartbeats_received, 0) + |> Map.update!(:availability_timer, fn {nil, time} -> {System.monotonic_time(:second), time} @@ -212,12 +242,13 @@ defmodule Archethic.P2P.Client.Connection do timer end) + Process.send_after(self(), :heartbeat, @heartbeat_interval) + {:keep_state, new_data} end def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data - def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data # called from the :disconnected or :initializing state def handle_event( @@ -258,9 +289,11 @@ defmodule Archethic.P2P.Client.Connection do end # this message is used to delay next connection attempt - def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do + def handle_event({:timeout, :reconnect}, _event_data, _state, data) do actions = [{:next_event, :internal, {:connect, nil}}] - {:keep_state_and_data, actions} + + new_data = Map.update!(data, :reconnect_attempts, &(&1 + 1)) + {:keep_state, new_data, actions} end def handle_event( @@ -273,6 +306,25 @@ defmodule Archethic.P2P.Client.Connection do :keep_state_and_data end + def handle_event( + :cast, + :wake_up, + :disconnected, + data + ) do + actions = [{:next_event, :internal, {:connect, nil}}] + {:keep_state, %{data | reconnect_attempts: 0}, actions} + end + + def handle_event( + :cast, + :wake_up, + _, + _data + ) do + :keep_state_and_data + end + def handle_event( :cast, {:send_message, ref, from, message, timeout}, @@ -381,6 +433,35 @@ defmodule Archethic.P2P.Client.Connection do end end + def handle_event( + :info, + :heartbeat, + {:connected, socket}, + data = %{ + transport: transport, + heartbeats_sent: heartbeats_sent, + heartbeats_received: heartbeats_received + } + ) do + # disconnect if missed more than 2 heartbeats + if heartbeats_sent - heartbeats_received >= 2 do + {:next_state, :disconnected, data} + else + transport.handle_send(socket, "hb") + Process.send_after(self(), :heartbeat, @heartbeat_interval) + {:keep_state, %{data | heartbeats_sent: heartbeats_sent + 1}} + end + end + + def handle_event( + :info, + :heartbeat, + _state, + _data + ) do + :keep_state_and_data + end + def handle_event(:info, {ref, :ok}, {:connected, _socket}, data = %{send_tasks: send_tasks}) do case Map.pop(send_tasks, ref) do {nil, _} -> @@ -440,13 +521,13 @@ defmodule Archethic.P2P.Client.Connection do # Task.async sending us the result of the handle_connect def handle_event(:info, {_ref, {:error, _reason, nil}}, _, data) do - actions = [{{:timeout, :reconnect}, 500, nil}] + actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}] {:next_state, :disconnected, data, actions} end def handle_event(:info, {_ref, {:error, reason, from}}, _, data) do send(from, {:error, reason}) - actions = [{{:timeout, :reconnect}, 500, nil}] + actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}] {:next_state, :disconnected, data, actions} end @@ -456,7 +537,8 @@ defmodule Archethic.P2P.Client.Connection do {:connected, _socket}, data = %{ transport: transport, - node_public_key: node_public_key + node_public_key: node_public_key, + heartbeats_received: heartbeats_received } ) do case transport.handle_message(event) do @@ -467,6 +549,9 @@ defmodule Archethic.P2P.Client.Connection do {:next_state, :disconnected, data} + {:ok, "hb"} -> + {:keep_state, %{data | heartbeats_received: heartbeats_received + 1}} + {:ok, msg} -> set_node_connected(node_public_key) @@ -540,5 +625,28 @@ defmodule Archethic.P2P.Client.Connection do :ets.delete(@table_name, node_public_key) end + def code_change(1, state, data, _extra) do + {:ok, state, + data + |> Map.merge(%{ + reconnect_attempts: 0, + heartbeats_sent: 0, + heartbeats_received: 0 + })} + end + def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data} + + defp backoff(attempts) do + config = Application.get_env(:archethic, __MODULE__, []) + + case Keyword.get(config, :backoff_strategy, :exponential) do + :static -> + @reconnect_delay + + :exponential -> + # cap at 24hours + min(:timer.hours(24), 2 ** attempts * @reconnect_delay) + end + end end diff --git a/lib/archethic/p2p/listener_protocol.ex b/lib/archethic/p2p/listener_protocol.ex index 4c9894522..1dacd951d 100644 --- a/lib/archethic/p2p/listener_protocol.ex +++ b/lib/archethic/p2p/listener_protocol.ex @@ -9,6 +9,7 @@ defmodule Archethic.P2P.ListenerProtocol do alias Archethic.Crypto alias Archethic.P2P + alias Archethic.P2P.Client.Connection alias Archethic.P2P.Message alias Archethic.P2P.MessageEnvelop alias Archethic.TaskSupervisor @@ -35,6 +36,19 @@ defmodule Archethic.P2P.ListenerProtocol do }) end + def handle_info( + {_transport, socket, "hb"}, + state = %{transport: transport} + ) do + :inet.setopts(socket, active: :once) + + Task.Supervisor.start_child(TaskSupervisor, fn -> + transport.send(socket, "hb") + end) + + {:noreply, state} + end + def handle_info( {_transport, socket, err}, state = %{transport: transport, ip: ip} @@ -45,6 +59,7 @@ defmodule Archethic.P2P.ListenerProtocol do end transport.close(socket) + {:noreply, state} end @@ -84,6 +99,9 @@ defmodule Archethic.P2P.ListenerProtocol do ) if valid_signature? do + # we may attempt to wakeup a connection that offline + Connection.wake_up(sender_pkey) + message |> process_msg(sender_pkey) |> encode_response(message_id, sender_pkey) diff --git a/test/archethic/p2p/client/connection_test.exs b/test/archethic/p2p/client/connection_test.exs index e3cc0bf04..45f6a1a09 100644 --- a/test/archethic/p2p/client/connection_test.exs +++ b/test/archethic/p2p/client/connection_test.exs @@ -14,6 +14,12 @@ defmodule Archethic.P2P.Client.ConnectionTest do alias Archethic.Utils + @heartbeat_interval Keyword.get( + Application.compile_env(:archethic, Connection, []), + :heartbeat_interval, + 10_000 + ) + test "start_link/1 should open a socket and a connection worker and initialize the backlog and lookup tables" do {:ok, pid} = Connection.start_link( @@ -166,7 +172,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end - def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + def handle_send(_socket, _), do: :ok def handle_message({_, _, _}), do: {:error, :closed} end @@ -555,6 +561,57 @@ defmodule Archethic.P2P.Client.ConnectionTest do end end + describe "Stale detection" do + test "should change state to disconnected once a few heartbeats are missed" do + defmodule MockTransportStale do + alias Archethic.P2P.Client.Transport + + @behaviour Transport + + def handle_connect({127, 0, 0, 1}, _port) do + conn_count = :persistent_term.get(:conn_count, 0) + :persistent_term.put(:conn_count, conn_count + 1) + + if conn_count == 0 do + {:ok, make_ref()} + else + {:error, :timeout} + end + end + + def handle_send(_socket, "hb") do + hb_count = :persistent_term.get(:hb_count, 0) + :persistent_term.put(:hb_count, hb_count + 1) + + # become stale after 5 hbs + if hb_count <= 5 do + send(self(), {:tcp, make_ref(), "hb"}) + end + + :ok + end + + def handle_send(_socket, _), do: :ok + + def handle_message({_, _, data}), do: {:ok, data} + end + + {:ok, pid} = + Connection.start_link( + transport: MockTransportStale, + ip: {127, 0, 0, 1}, + port: 3000, + node_public_key: Crypto.first_node_public_key() + ) + + Process.sleep(@heartbeat_interval * 5) + assert {{:connected, _}, _} = :sys.get_state(pid) + + Process.sleep(@heartbeat_interval * 5) + assert {:disconnected, _} = :sys.get_state(pid) + end + end + defmodule MockTransport do alias Archethic.P2P.Client.Transport @@ -564,6 +621,11 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end + def handle_send(_socket, "hb") do + send(self(), {:tcp, make_ref(), "hb"}) + :ok + end + def handle_send(_socket, _data), do: :ok def handle_message({_, _, data}), do: {:ok, data} @@ -578,7 +640,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do {:ok, make_ref()} end - def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok + def handle_send(_socket, _), do: :ok def handle_message({_, _, _}), do: {:error, :closed} end