Skip to content

Commit

Permalink
connection backoff and wake up
Browse files Browse the repository at this point in the history
heartbeat mechanism

unit test the heartbeat mechanism

remove max_connection_attempts

add code_change/4 to add the missing fields in the state
  • Loading branch information
bchamagne committed Sep 19, 2024
1 parent 3a76aac commit 77a511e
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 12 deletions.
5 changes: 5 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ config :archethic, Archethic.P2P.GeoPatch.GeoIP, MockGeoIP

config :archethic, Archethic.P2P.BootstrappingSeeds, enabled: false

config :archethic, Archethic.P2P.Client.Connection,
backoff_strategy: :static,
heartbeat_interval: 200,
reconnect_delay: 50

config :archethic, Archethic.Mining.PendingTransactionValidation, validate_node_ip: true

config :archethic, Archethic.Metrics.Poller, enabled: false
Expand Down
128 changes: 118 additions & 10 deletions lib/archethic/p2p/client/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,20 @@ defmodule Archethic.P2P.Client.Connection do
require Logger

use GenStateMachine, callback_mode: [:handle_event_function, :state_enter], restart: :temporary
@vsn 1
@vsn 2
@table_name :connection_status

@heartbeat_interval Keyword.get(
Application.compile_env(:archethic, __MODULE__, []),
:heartbeat_interval,
10_000
)
@reconnect_delay Keyword.get(
Application.compile_env(:archethic, __MODULE__, []),
:reconnect_delay,
500
)

@doc """
Starts a new connection
"""
Expand Down Expand Up @@ -59,6 +70,18 @@ defmodule Archethic.P2P.Client.Connection do
end
end

@doc """
When called, if disconnect, it will try to connect to socket
Noop if it's already connected
It's used when some node has been offline for a long time
It has connected to us so we know we can connect to it as well
"""
@spec wake_up(Crypto.key()) :: :ok
def wake_up(public_key) do
GenStateMachine.cast(via_tuple(public_key), :wake_up)
end

@doc """
Get the availability timer and reset it with a new start time if it was already started
"""
Expand Down Expand Up @@ -102,7 +125,10 @@ defmodule Archethic.P2P.Client.Connection do
request_id: 0,
messages: %{},
send_tasks: %{},
availability_timer: {nil, 0}
availability_timer: {nil, 0},
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
}

{:ok, :initializing, data, [{:next_event, :internal, {:connect, from}}]}
Expand Down Expand Up @@ -190,7 +216,7 @@ defmodule Archethic.P2P.Client.Connection do
end)

# Reconnect with backoff
actions = [{{:timeout, :reconnect}, 500, nil} | actions]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil} | actions]
{:keep_state, new_data, actions}
end

Expand All @@ -204,20 +230,25 @@ defmodule Archethic.P2P.Client.Connection do

# Start availability timer
new_data =
Map.update!(data, :availability_timer, fn
data
|> Map.put(:reconnect_attempts, 0)
|> Map.put(:heartbeats_sent, 0)
|> Map.put(:heartbeats_received, 0)
|> Map.update!(:availability_timer, fn
{nil, time} ->
{System.monotonic_time(:second), time}

timer ->
timer
end)

Process.send_after(self(), :heartbeat, @heartbeat_interval)

{:keep_state, new_data}
end

def handle_event(:enter, _old_state, :initializing, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, :disconnected, _data), do: :keep_state_and_data
def handle_event(:enter, _old_state, {:connected, _socket}, _data), do: :keep_state_and_data

# called from the :disconnected or :initializing state
def handle_event(
Expand Down Expand Up @@ -258,9 +289,11 @@ defmodule Archethic.P2P.Client.Connection do
end

# this message is used to delay next connection attempt
def handle_event({:timeout, :reconnect}, _event_data, _state, _data) do
def handle_event({:timeout, :reconnect}, _event_data, _state, data) do
actions = [{:next_event, :internal, {:connect, nil}}]
{:keep_state_and_data, actions}

new_data = Map.update!(data, :reconnect_attempts, &(&1 + 1))
{:keep_state, new_data, actions}
end

def handle_event(
Expand All @@ -273,6 +306,25 @@ defmodule Archethic.P2P.Client.Connection do
:keep_state_and_data
end

def handle_event(
:cast,
:wake_up,
:disconnected,
data
) do
actions = [{:next_event, :internal, {:connect, nil}}]
{:keep_state, %{data | reconnect_attempts: 0}, actions}
end

def handle_event(
:cast,
:wake_up,
_,
_data
) do
:keep_state_and_data
end

def handle_event(
:cast,
{:send_message, ref, from, message, timeout},
Expand Down Expand Up @@ -381,6 +433,35 @@ defmodule Archethic.P2P.Client.Connection do
end
end

def handle_event(
:info,
:heartbeat,
{:connected, socket},
data = %{
transport: transport,
heartbeats_sent: heartbeats_sent,
heartbeats_received: heartbeats_received
}
) do
# disconnect if missed more than 2 heartbeats
if heartbeats_sent - heartbeats_received >= 2 do
{:next_state, :disconnected, data}
else
transport.handle_send(socket, "hb")
Process.send_after(self(), :heartbeat, @heartbeat_interval)
{:keep_state, %{data | heartbeats_sent: heartbeats_sent + 1}}
end
end

def handle_event(
:info,
:heartbeat,
_state,
_data
) do
:keep_state_and_data
end

def handle_event(:info, {ref, :ok}, {:connected, _socket}, data = %{send_tasks: send_tasks}) do
case Map.pop(send_tasks, ref) do
{nil, _} ->
Expand Down Expand Up @@ -440,13 +521,13 @@ defmodule Archethic.P2P.Client.Connection do

# Task.async sending us the result of the handle_connect
def handle_event(:info, {_ref, {:error, _reason, nil}}, _, data) do
actions = [{{:timeout, :reconnect}, 500, nil}]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}]
{:next_state, :disconnected, data, actions}
end

def handle_event(:info, {_ref, {:error, reason, from}}, _, data) do
send(from, {:error, reason})
actions = [{{:timeout, :reconnect}, 500, nil}]
actions = [{{:timeout, :reconnect}, backoff(data.reconnect_attempts), nil}]
{:next_state, :disconnected, data, actions}
end

Expand All @@ -456,7 +537,8 @@ defmodule Archethic.P2P.Client.Connection do
{:connected, _socket},
data = %{
transport: transport,
node_public_key: node_public_key
node_public_key: node_public_key,
heartbeats_received: heartbeats_received
}
) do
case transport.handle_message(event) do
Expand All @@ -467,6 +549,9 @@ defmodule Archethic.P2P.Client.Connection do

{:next_state, :disconnected, data}

{:ok, "hb"} ->
{:keep_state, %{data | heartbeats_received: heartbeats_received + 1}}

{:ok, msg} ->
set_node_connected(node_public_key)

Expand Down Expand Up @@ -540,5 +625,28 @@ defmodule Archethic.P2P.Client.Connection do
:ets.delete(@table_name, node_public_key)
end

def code_change(1, state, data, _extra) do
{:ok, state,
data
|> Map.merge(%{
reconnect_attempts: 0,
heartbeats_sent: 0,
heartbeats_received: 0
})}
end

def code_change(_old_vsn, state, data, _extra), do: {:ok, state, data}

defp backoff(attempts) do
config = Application.get_env(:archethic, __MODULE__, [])

case Keyword.get(config, :backoff_strategy, :exponential) do
:static ->
@reconnect_delay

:exponential ->
# cap at 24hours
min(:timer.hours(24), 2 ** attempts * @reconnect_delay)
end
end
end
18 changes: 18 additions & 0 deletions lib/archethic/p2p/listener_protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Archethic.P2P.ListenerProtocol do

alias Archethic.Crypto
alias Archethic.P2P
alias Archethic.P2P.Client.Connection
alias Archethic.P2P.Message
alias Archethic.P2P.MessageEnvelop
alias Archethic.TaskSupervisor
Expand All @@ -35,6 +36,19 @@ defmodule Archethic.P2P.ListenerProtocol do
})
end

def handle_info(
{_transport, socket, "hb"},
state = %{transport: transport}
) do
:inet.setopts(socket, active: :once)

Task.Supervisor.start_child(TaskSupervisor, fn ->
transport.send(socket, "hb")
end)

{:noreply, state}
end

def handle_info(
{_transport, socket, err},
state = %{transport: transport, ip: ip}
Expand All @@ -45,6 +59,7 @@ defmodule Archethic.P2P.ListenerProtocol do
end

transport.close(socket)

{:noreply, state}
end

Expand Down Expand Up @@ -84,6 +99,9 @@ defmodule Archethic.P2P.ListenerProtocol do
)

if valid_signature? do
# we may attempt to wakeup a connection that offline
Connection.wake_up(sender_pkey)

message
|> process_msg(sender_pkey)
|> encode_response(message_id, sender_pkey)
Expand Down
66 changes: 64 additions & 2 deletions test/archethic/p2p/client/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ defmodule Archethic.P2P.Client.ConnectionTest do

alias Archethic.Utils

@heartbeat_interval Keyword.get(
Application.compile_env(:archethic, Connection, []),
:heartbeat_interval,
10_000
)

test "start_link/1 should open a socket and a connection worker and initialize the backlog and lookup tables" do
{:ok, pid} =
Connection.start_link(
Expand Down Expand Up @@ -166,7 +172,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do
{:ok, make_ref()}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok
def handle_send(_socket, _), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end
Expand Down Expand Up @@ -555,6 +561,57 @@ defmodule Archethic.P2P.Client.ConnectionTest do
end
end

describe "Stale detection" do
test "should change state to disconnected once a few heartbeats are missed" do
defmodule MockTransportStale do
alias Archethic.P2P.Client.Transport

@behaviour Transport

def handle_connect({127, 0, 0, 1}, _port) do
conn_count = :persistent_term.get(:conn_count, 0)
:persistent_term.put(:conn_count, conn_count + 1)

if conn_count == 0 do
{:ok, make_ref()}
else
{:error, :timeout}
end
end

def handle_send(_socket, "hb") do
hb_count = :persistent_term.get(:hb_count, 0)
:persistent_term.put(:hb_count, hb_count + 1)

# become stale after 5 hbs
if hb_count <= 5 do
send(self(), {:tcp, make_ref(), "hb"})
end

:ok
end

def handle_send(_socket, _), do: :ok

def handle_message({_, _, data}), do: {:ok, data}
end

{:ok, pid} =
Connection.start_link(
transport: MockTransportStale,
ip: {127, 0, 0, 1},
port: 3000,
node_public_key: Crypto.first_node_public_key()
)

Process.sleep(@heartbeat_interval * 5)
assert {{:connected, _}, _} = :sys.get_state(pid)

Process.sleep(@heartbeat_interval * 5)
assert {:disconnected, _} = :sys.get_state(pid)
end
end

defmodule MockTransport do
alias Archethic.P2P.Client.Transport

Expand All @@ -564,6 +621,11 @@ defmodule Archethic.P2P.Client.ConnectionTest do
{:ok, make_ref()}
end

def handle_send(_socket, "hb") do
send(self(), {:tcp, make_ref(), "hb"})
:ok
end

def handle_send(_socket, _data), do: :ok

def handle_message({_, _, data}), do: {:ok, data}
Expand All @@ -578,7 +640,7 @@ defmodule Archethic.P2P.Client.ConnectionTest do
{:ok, make_ref()}
end

def handle_send(_socket, <<0::32, _rest::bitstring>>), do: :ok
def handle_send(_socket, _), do: :ok

def handle_message({_, _, _}), do: {:error, :closed}
end
Expand Down

0 comments on commit 77a511e

Please sign in to comment.