Skip to content

Commit

Permalink
Add parallel_timeout option to specify max timeout for a parallel e…
Browse files Browse the repository at this point in the history
…xecutions
  • Loading branch information
cabol committed Oct 28, 2020
1 parent cf41eed commit 09b06a3
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 64 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
![CI](https://github.com/cabol/shards/workflows/CI/badge.svg)
[![Codecov](https://codecov.io/gh/cabol/shards/branch/master/graphs/badge.svg)](https://codecov.io/gh/cabol/shards/branch/master/graphs/badge.svg)
[![Hex Version](https://img.shields.io/hexpm/v/shards.svg)](https://hex.pm/packages/shards)
[![Docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/shards)

Why might we need **Sharding/Partitioning** for the ETS tables? The main reason
is to keep the lock contention under control enabling ETS tables to scale out
Expand Down
3 changes: 3 additions & 0 deletions guides/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ table and the options. But in addition to the options given by `ets:new/2`,
mode or not, for the applicable functions, e.g.: `select`, `match`, etc. By
default is set to `false`.

* `{parallel_timeout, timeout()}` - When `parallel` is set to `true`, it
specifies the max timeout for a parallel execution. Defaults to `infinity`.

Wen a new table is created, the [metadata][shards_meta] is created for that
table as well. The purpose of the **metadata** is to store information related
to that table, such as: number of partitions, keyslot function, etc. To learn
Expand Down
33 changes: 19 additions & 14 deletions src/shards.erl
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,10 @@ member(Tab, Key, Meta) ->
%% or not, for the applicable functions, e.g.: `select', `match', etc. By
%% default is set to `false'.
%% </li>
%% <li>
%% `{parallel_timeout, T}' - When `parallel' is set to `true', it specifies
%% the max timeout for a parallel execution. Defaults to `infinity'.
%% </li>
%% </ul>
%%
%% <h3>Access:</h3>
Expand Down Expand Up @@ -1562,19 +1566,20 @@ mapred(Tab, Map, Meta) ->
mapred(Tab, Map, nil, Meta) ->
mapred(Tab, Map, fun(E, Acc) -> [E | Acc] end, Meta);
mapred(Tab, Map, Reduce, {PartitionFun, Meta}) ->
Partitions = shards_meta:partitions(Meta),
Parallel = shards_meta:parallel(Meta),
do_mapred(Tab, Map, Reduce, PartitionFun, Partitions, Parallel);
do_mapred(Tab, Map, Reduce, PartitionFun, Meta);
mapred(Tab, Map, Reduce, Meta) ->
Partitions = shards_meta:partitions(Meta),
Parallel = shards_meta:parallel(Meta),
do_mapred(Tab, Map, Reduce, tid, Partitions, Parallel).
do_mapred(Tab, Map, Reduce, tid, Meta).

%% @private
do_mapred(Tab, Map, Reduce, PartFun, Partitions, true) when Partitions > 1 ->
p_mapred(Tab, Map, Reduce, PartFun, Partitions);
do_mapred(Tab, Map, Reduce, PartFun, Partitions, _Parallel) ->
s_mapred(Tab, Map, Reduce, PartFun, Partitions).
do_mapred(Tab, Map, Reduce, PartFun, Meta) ->
case {shards_meta:partitions(Meta), shards_meta:parallel(Meta)} of
{Partitions, true} when Partitions > 1 ->
ParallelTimeout = shards_meta:parallel_timeout(Meta),
p_mapred(Tab, Map, Reduce, PartFun, Partitions, ParallelTimeout);

{Partitions, false} ->
s_mapred(Tab, Map, Reduce, PartFun, Partitions)
end.

%% @private
s_mapred(Tab, {MapFun, Args}, {ReduceFun, AccIn}, PartFun, Partitions) ->
Expand All @@ -1588,17 +1593,17 @@ s_mapred(Tab, MapFun, ReduceFun, PartFun, Partitions) ->
s_mapred(Tab, Map, Reduce, PartFun, Partitions).

%% @private
p_mapred(Tab, {MapFun, Args}, {ReduceFun, AccIn}, PartFun, Partitions) ->
p_mapred(Tab, {MapFun, Args}, {ReduceFun, AccIn}, PartFun, Partitions, ParallelTimeout) ->
MapResults =
shards_enum:pmap(fun(Idx) ->
PartitionId = shards_partition:PartFun(Tab, Idx),
apply(MapFun, [PartitionId | Args])
end, lists:seq(0, Partitions - 1)),
end, ParallelTimeout, lists:seq(0, Partitions - 1)),

lists:foldl(ReduceFun, AccIn, MapResults);
p_mapred(Tab, MapFun, ReduceFun, PartFun, Partitions) ->
p_mapred(Tab, MapFun, ReduceFun, PartFun, Partitions, ParallelTimeout) ->
{Map, Reduce} = mapred_funs(MapFun, ReduceFun),
p_mapred(Tab, Map, Reduce, PartFun, Partitions).
p_mapred(Tab, Map, Reduce, PartFun, Partitions, ParallelTimeout).

%% @private
mapred_funs(MapFun, ReduceFun) ->
Expand Down
96 changes: 54 additions & 42 deletions src/shards_meta.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
partitions/1,
keyslot_fun/1,
parallel/1,
parallel_timeout/1,
ets_opts/1
]).

Expand Down Expand Up @@ -62,12 +63,13 @@

%% Metadata definition
-record(meta, {
tab_pid = undefined :: pid() | undefined,
keypos = 1 :: pos_integer(),
partitions = ?PARTITIONS :: pos_integer(),
keyslot_fun = fun erlang:phash2/2 :: keyslot_fun(),
parallel = false :: boolean(),
ets_opts = [] :: [term()]
tab_pid = undefined :: pid() | undefined,
keypos = 1 :: pos_integer(),
partitions = ?PARTITIONS :: pos_integer(),
keyslot_fun = fun erlang:phash2/2 :: keyslot_fun(),
parallel = false :: boolean(),
parallel_timeout = infinity :: timeout(),
ets_opts = [] :: [term()]
}).

%% @type t() = #meta{}.
Expand All @@ -76,22 +78,24 @@
-type t() :: #meta{}.

%% @type meta_map() = #{
%% tab_pid => pid(),
%% keypos => pos_integer(),
%% partitions => pos_integer(),
%% keyslot_fun => keyslot_fun(),
%% parallel => boolean(),
%% ets_opts => [term()]
%% tab_pid => pid(),
%% keypos => pos_integer(),
%% partitions => pos_integer(),
%% keyslot_fun => keyslot_fun(),
%% parallel => boolean(),
%% parallel_timeout => timeout(),
%% ets_opts => [term()]
%% }.
%%
%% Defines the map representation for the metadata data type.
-type meta_map() :: #{
tab_pid => pid(),
keypos => pos_integer(),
partitions => pos_integer(),
keyslot_fun => keyslot_fun(),
parallel => boolean(),
ets_opts => [term()]
tab_pid => pid(),
keypos => pos_integer(),
partitions => pos_integer(),
keyslot_fun => keyslot_fun(),
parallel => boolean(),
parallel_timeout => timeout(),
ets_opts => [term()]
}.

%% Exported types
Expand All @@ -117,12 +121,13 @@ new() -> #meta{}.
-spec from_map(Map :: #{atom() => term()}) -> t().
from_map(Map) ->
#meta{
tab_pid = maps:get(tab_pid, Map, self()),
keypos = maps:get(keypos, Map, 1),
partitions = maps:get(partitions, Map, ?PARTITIONS),
keyslot_fun = maps:get(keyslot_fun, Map, fun erlang:phash2/2),
parallel = maps:get(parallel, Map, false),
ets_opts = maps:get(ets_opts, Map, [])
tab_pid = maps:get(tab_pid, Map, self()),
keypos = maps:get(keypos, Map, 1),
partitions = maps:get(partitions, Map, ?PARTITIONS),
keyslot_fun = maps:get(keyslot_fun, Map, fun erlang:phash2/2),
parallel = maps:get(parallel, Map, false),
parallel_timeout = maps:get(parallel_timeout, Map, infinity),
ets_opts = maps:get(ets_opts, Map, [])
}.

%% @doc
Expand All @@ -131,12 +136,13 @@ from_map(Map) ->
-spec to_map(t()) -> meta_map().
to_map(Meta) ->
#{
tab_pid => Meta#meta.tab_pid,
keypos => Meta#meta.keypos,
partitions => Meta#meta.partitions,
keyslot_fun => Meta#meta.keyslot_fun,
parallel => Meta#meta.parallel,
ets_opts => Meta#meta.ets_opts
tab_pid => Meta#meta.tab_pid,
keypos => Meta#meta.keypos,
partitions => Meta#meta.partitions,
keyslot_fun => Meta#meta.keyslot_fun,
parallel => Meta#meta.parallel,
parallel_timeout => Meta#meta.parallel_timeout,
ets_opts => Meta#meta.ets_opts
}.

%% @doc
Expand Down Expand Up @@ -229,37 +235,43 @@ partitions_info(Tab, KeyPrefix) ->
%%%===================================================================

-spec tab_pid(t() | shards:tab()) -> pid().
tab_pid(#meta{tab_pid = TabPid}) ->
TabPid;
tab_pid(#meta{tab_pid = Value}) ->
Value;
tab_pid(Tab) when is_atom(Tab); is_reference(Tab) ->
tab_pid(?MODULE:get(Tab)).

-spec keypos(t() | shards:tab()) -> pos_integer().
keypos(#meta{keypos = Keypos}) ->
Keypos;
keypos(#meta{keypos = Value}) ->
Value;
keypos(Tab) when is_atom(Tab); is_reference(Tab) ->
keypos(?MODULE:get(Tab)).

-spec partitions(t() | shards:tab()) -> pos_integer().
partitions(#meta{partitions = Partitions}) ->
Partitions;
partitions(#meta{partitions = Value}) ->
Value;
partitions(Tab) when is_atom(Tab); is_reference(Tab) ->
partitions(?MODULE:get(Tab)).

-spec keyslot_fun(t() | shards:tab()) -> keyslot_fun().
keyslot_fun(#meta{keyslot_fun = KeyslotFun}) ->
KeyslotFun;
keyslot_fun(#meta{keyslot_fun = Value}) ->
Value;
keyslot_fun(Tab) when is_atom(Tab); is_reference(Tab) ->
keyslot_fun(?MODULE:get(Tab)).

-spec parallel(t() | shards:tab()) -> boolean().
parallel(#meta{parallel = Parallel}) ->
Parallel;
parallel(#meta{parallel = Value}) ->
Value;
parallel(Tab) when is_atom(Tab); is_reference(Tab) ->
parallel(?MODULE:get(Tab)).

-spec parallel_timeout(t() | shards:tab()) -> timeout().
parallel_timeout(#meta{parallel_timeout = Value}) ->
Value;
parallel_timeout(Tab) when is_atom(Tab); is_reference(Tab) ->
parallel_timeout(?MODULE:get(Tab)).

-spec ets_opts(t() | shards:tab()) -> [term()].
ets_opts(#meta{ets_opts = EtsOpts}) ->
EtsOpts;
ets_opts(#meta{ets_opts = Value}) ->
Value;
ets_opts(Tab) when is_atom(Tab); is_reference(Tab) ->
ets_opts(?MODULE:get(Tab)).
3 changes: 3 additions & 0 deletions src/shards_opts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ parse_opts([{keyslot_fun, Val} | Opts], Acc) when is_function(Val, 2) ->
parse_opts(Opts, Acc#{keyslot_fun := Val});
parse_opts([{parallel, Val} | Opts], Acc) when is_boolean(Val) ->
parse_opts(Opts, Acc#{parallel := Val});
parse_opts([{parallel_timeout, Val} | Opts], Acc)
when (is_integer(Val) andalso Val >= 0) orelse Val == infinity ->
parse_opts(Opts, Acc#{parallel_timeout := Val});
parse_opts([{restore, _, _} = Opt | Opts], #{ets_opts := NOpts} = Acc) ->
parse_opts(Opts, Acc#{ets_opts := [Opt | NOpts]});
parse_opts([{heir, _, _} = Opt | Opts], #{ets_opts := NOpts} = Acc) ->
Expand Down
25 changes: 18 additions & 7 deletions test/shards_meta_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,24 @@ t_getters(_Config) ->
true = ?PARTITIONS == shards_meta:partitions(Meta0),
true = fun erlang:phash2/2 == shards_meta:keyslot_fun(Meta0),
false = shards_meta:parallel(Meta0),
infinity = shards_meta:parallel_timeout(Meta0),
[] = shards_meta:ets_opts(Meta0),

Meta1 = shards_meta:from_map(#{keypos => 2, partitions => 4, parallel => true}),
Meta1 =
shards_meta:from_map(#{
keypos => 2,
partitions => 4,
parallel => true,
parallel_timeout => 5000
}),

Self = self(),
Self = shards_meta:tab_pid(Meta1),
2 = shards_meta:keypos(Meta1),
4 = shards_meta:partitions(Meta1),
true = fun erlang:phash2/2 == shards_meta:keyslot_fun(Meta1),
true = shards_meta:parallel(Meta1),
5000 = shards_meta:parallel_timeout(Meta1),
[] = shards_meta:ets_opts(Meta1),
ok.

Expand All @@ -68,6 +77,7 @@ t_getters_with_table(_Config) ->
true = ?PARTITIONS == shards_meta:partitions(Tab),
true = fun erlang:phash2/2 == shards_meta:keyslot_fun(Tab),
false = shards_meta:parallel(Tab),
infinity = shards_meta:parallel_timeout(Tab),
[] = shards_meta:ets_opts(Tab),

true = shards:delete(Tab).
Expand All @@ -78,12 +88,13 @@ t_to_map(_Config) ->
Parts = ?PARTITIONS,

#{
tab_pid := undefined,
keypos := 1,
partitions := Parts,
keyslot_fun := KeyslotFun,
parallel := false,
ets_opts := []
tab_pid := undefined,
keypos := 1,
partitions := Parts,
keyslot_fun := KeyslotFun,
parallel := false,
parallel_timeout := infinity,
ets_opts := []
} = shards_meta:to_map(Meta0),

true = fun erlang:phash2/2 == KeyslotFun.
Expand Down
3 changes: 2 additions & 1 deletion test/shards_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,8 @@ init_shards_new(Opts) ->
duplicate_bag,
{partitions, 5},
{keyslot_fun, fun ?MODULE:pick_shard/2},
{parallel, true}
{parallel, true},
{parallel_timeout, 5000}
| Opts
]),

Expand Down

0 comments on commit 09b06a3

Please sign in to comment.