Skip to content

Commit

Permalink
Minimum delay between connections (#49)
Browse files Browse the repository at this point in the history
* Minimum delay between connections

So far mero had support for throttling connections attempts when
there are connection failures.
This add a new, general throttling of connections (regardless if
they succed or not) as a min wait time between attempts.
It's useful when there are _large_ number of hosts connecting
to the same memcache backed.

* Formatting

* Spelling
  • Loading branch information
polvorin authored May 15, 2019
1 parent 8e9fdf2 commit f611ff8
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 34 deletions.
11 changes: 11 additions & 0 deletions src/mero_conf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
pool_connection_unused_max_time/1,
connection_unused_max_time/1,
pool_max_connection_delay_time/1,
pool_min_connection_interval/1,
max_connection_delay_time/1,
stat_callback/0,
stat_callback/1,
Expand Down Expand Up @@ -209,6 +210,16 @@ connection_unused_max_time(Val) ->
pool_max_connection_delay_time(Pool) ->
get_env_per_pool(max_connection_delay_time, Pool).

%% @doc: min delay between connection attempts
-spec pool_min_connection_interval(Pool :: atom()) -> integer().
pool_min_connection_interval(Pool) ->
try
get_env_per_pool(min_connection_interval, Pool)
catch _:_ ->
%% Don't want to make this mandatory, but the rest are mandatory already.
undefined
end.

-spec max_connection_delay_time(mero_conf_value(integer())) -> ok.
max_connection_delay_time(Val) ->
application:set_env(mero, max_connection_delay_time, Val).
Expand Down
81 changes: 47 additions & 34 deletions src/mero_pool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@
num_failed_connecting :: non_neg_integer(),

reconnect_wait_time :: non_neg_integer(),
min_connection_interval_ms :: non_neg_integer() | undefined,
worker_module :: atom(),
callback_info :: mfargs(),
pool :: term()}).
pool :: term(),
last_connection_attempt :: non_neg_integer()}).

-callback transaction(client(), atom(), [term()]) -> {error, term()} | {client(), {ok, any()}}.
-callback close(client(), Reason :: term()) -> _.
Expand Down Expand Up @@ -190,7 +192,8 @@ init(Parent, ClusterName, Host, Port, PoolName, WrkModule) ->
reconnect_wait_time = ?RECONNECT_WAIT_TIME,
pool = PoolName,
callback_info = CallBackInfo,
worker_module = WrkModule},
worker_module = WrkModule,
last_connection_attempt = 0},
timer:send_interval(5000, reload_pool_min_max_settings),
pool_loop(schedule_expiration(reload_pool_min_max_settings(State)), Parent, Deb)
end.
Expand Down Expand Up @@ -250,7 +253,9 @@ pool_loop(State, Parent, Deb) ->
State#pool_st.host,
State#pool_st.port,
State#pool_st.callback_info),
?MODULE:pool_loop(State, Parent, Deb)
?MODULE:pool_loop(State#pool_st{
last_connection_attempt = erlang:system_time(millisecond)},
Parent, Deb)
end;
reload_pool_min_max_settings ->
?MODULE:pool_loop(reload_pool_min_max_settings(State), Parent, Deb);
Expand Down Expand Up @@ -289,43 +294,50 @@ get_connection(State, {Pid, Ref} = _From) ->


maybe_spawn_connect(#pool_st{
cluster = ClusterName,
free = Free,
num_connecting = Connecting,
num_connected = Connected,
max_connections = MaxConn,
min_connections = MinConn,
num_connecting = Connecting,
num_failed_connecting = NumFailed,
worker_module = WrkModule,
callback_info = CallbackInfo,
reconnect_wait_time = WaitTime,
pool = Pool,
host = Host,
port = Port} = State) ->
min_connections = MinConn} = State) ->
%% Length could be big.. better to not have more than a few dozens of sockets
%% May be worth to keep track of the length of the free in a counter.

FreeSockets = length(Free),
Needed = calculate_needed(FreeSockets, Connected, Connecting, MaxConn, MinConn),
case {Needed, NumFailed, Connecting} of
%% Need sockets and no failed connections are reported..
%% we create new ones
{Needed, NumFailed, _} when Needed > 0, NumFailed < 1 ->
spawn_connections(ClusterName, Pool, WrkModule, Host, Port, CallbackInfo, Needed),
State#pool_st{num_connecting = Connecting + Needed};

%% Wait before reconnection if more than one successive
%% connection attempt has failed. Don't open more than
%% one connection until an attempt has succeeded again.
{Needed, _, 0} when Needed > 0 ->
erlang:send_after(WaitTime, self(), connect),
State#pool_st{num_connecting = Connecting + 1};

%% We dont need sockets or we have failed connections
%% we wait before reconnecting.
{_, _, _} ->
State
end.
Needed = max(0, calculate_needed(FreeSockets, Connected, Connecting, MaxConn, MinConn)),
maybe_spawn_connect(State, Needed, erlang:system_time(millisecond)).

%% Do not spawn new connections if
%% - There is no need for new connection
%% - There is minimum interval between connections, and that hasn't elapsed yet since the last
%% connection
%% - There are in-flight connection attempts
maybe_spawn_connect(State = #pool_st{min_connection_interval_ms = Min,
last_connection_attempt = Last,
num_connecting = Connecting},
Needed,
Now) when Min /= undefined, (Now - Last) < Min;
Connecting > 0;
Needed == 0->
State;
maybe_spawn_connect(State = #pool_st{num_failed_connecting = NumFailed,
reconnect_wait_time = WaitTime,
num_connecting = Connecting}, _Needed, _Now)
when NumFailed > 0 ->
%% Wait before reconnection if more than one successive
%% connection attempt has failed. Don't open more than
%% one connection until an attempt has succeeded again.
erlang:send_after(WaitTime, self(), connect),
State#pool_st{num_connecting = Connecting + 1};
maybe_spawn_connect(State = #pool_st{num_connecting = Connecting,
pool = Pool,
worker_module = WrkModule,
cluster = ClusterName,
host = Host,
port = Port,
callback_info = CallbackInfo}, Needed, Now) ->
spawn_connections(ClusterName, Pool, WrkModule, Host, Port, CallbackInfo, Needed),
State#pool_st{num_connecting = Connecting + Needed, last_connection_attempt = Now}.


calculate_needed(FreeSockets, Connected, Connecting, MaxConn, MinConn) ->
TotalSockets = Connected + Connecting,
Expand Down Expand Up @@ -503,7 +515,8 @@ filter_expired(#conn{updated = Updated} = Conn, {Now, TTL, ExpConns, ActConns})
%% terminate by themselves (because of timeouts, errors, inactivity, etc)
reload_pool_min_max_settings(State = #pool_st{cluster = ClusterName}) ->
State#pool_st{min_connections = mero_conf:pool_min_free_connections(ClusterName),
max_connections = mero_conf:pool_max_connections(ClusterName)}.
max_connections = mero_conf:pool_max_connections(ClusterName),
min_connection_interval_ms = mero_conf:pool_min_connection_interval(ClusterName)}.

safe_send(PoolName, Cmd) ->
catch PoolName ! Cmd.
Expand Down

0 comments on commit f611ff8

Please sign in to comment.