diff --git a/README.md b/README.md
index 1f728df..e6cba10 100644
--- a/README.md
+++ b/README.md
@@ -9,6 +9,7 @@
![CI](https://github.com/cabol/shards/workflows/CI/badge.svg)
[![Codecov](https://codecov.io/gh/cabol/shards/branch/master/graphs/badge.svg)](https://codecov.io/gh/cabol/shards/branch/master/graphs/badge.svg)
[![Hex Version](https://img.shields.io/hexpm/v/shards.svg)](https://hex.pm/packages/shards)
+[![Docs](https://img.shields.io/badge/docs-hexpm-blue.svg)](https://hexdocs.pm/shards)
Why might we need **Sharding/Partitioning** for the ETS tables? The main reason
is to keep the lock contention under control enabling ETS tables to scale out
diff --git a/guides/getting-started.md b/guides/getting-started.md
index 249d426..8fe9547 100644
--- a/guides/getting-started.md
+++ b/guides/getting-started.md
@@ -61,6 +61,9 @@ table and the options. But in addition to the options given by `ets:new/2`,
mode or not, for the applicable functions, e.g.: `select`, `match`, etc. By
default is set to `false`.
+ * `{parallel_timeout, timeout()}` - When `parallel` is set to `true`, it
+ specifies the max timeout for a parallel execution. Defaults to `infinity`.
+
Wen a new table is created, the [metadata][shards_meta] is created for that
table as well. The purpose of the **metadata** is to store information related
to that table, such as: number of partitions, keyslot function, etc. To learn
diff --git a/src/shards.erl b/src/shards.erl
index 361cb38..defc12d 100644
--- a/src/shards.erl
+++ b/src/shards.erl
@@ -903,6 +903,10 @@ member(Tab, Key, Meta) ->
%% or not, for the applicable functions, e.g.: `select', `match', etc. By
%% default is set to `false'.
%%
+%%
+%% `{parallel_timeout, T}' - When `parallel' is set to `true', it specifies
+%% the max timeout for a parallel execution. Defaults to `infinity'.
+%%
%%
%%
%% Access:
@@ -1562,19 +1566,20 @@ mapred(Tab, Map, Meta) ->
mapred(Tab, Map, nil, Meta) ->
mapred(Tab, Map, fun(E, Acc) -> [E | Acc] end, Meta);
mapred(Tab, Map, Reduce, {PartitionFun, Meta}) ->
- Partitions = shards_meta:partitions(Meta),
- Parallel = shards_meta:parallel(Meta),
- do_mapred(Tab, Map, Reduce, PartitionFun, Partitions, Parallel);
+ do_mapred(Tab, Map, Reduce, PartitionFun, Meta);
mapred(Tab, Map, Reduce, Meta) ->
- Partitions = shards_meta:partitions(Meta),
- Parallel = shards_meta:parallel(Meta),
- do_mapred(Tab, Map, Reduce, tid, Partitions, Parallel).
+ do_mapred(Tab, Map, Reduce, tid, Meta).
%% @private
-do_mapred(Tab, Map, Reduce, PartFun, Partitions, true) when Partitions > 1 ->
- p_mapred(Tab, Map, Reduce, PartFun, Partitions);
-do_mapred(Tab, Map, Reduce, PartFun, Partitions, _Parallel) ->
- s_mapred(Tab, Map, Reduce, PartFun, Partitions).
+do_mapred(Tab, Map, Reduce, PartFun, Meta) ->
+ case {shards_meta:partitions(Meta), shards_meta:parallel(Meta)} of
+ {Partitions, true} when Partitions > 1 ->
+ ParallelTimeout = shards_meta:parallel_timeout(Meta),
+ p_mapred(Tab, Map, Reduce, PartFun, Partitions, ParallelTimeout);
+
+ {Partitions, false} ->
+ s_mapred(Tab, Map, Reduce, PartFun, Partitions)
+ end.
%% @private
s_mapred(Tab, {MapFun, Args}, {ReduceFun, AccIn}, PartFun, Partitions) ->
@@ -1588,17 +1593,17 @@ s_mapred(Tab, MapFun, ReduceFun, PartFun, Partitions) ->
s_mapred(Tab, Map, Reduce, PartFun, Partitions).
%% @private
-p_mapred(Tab, {MapFun, Args}, {ReduceFun, AccIn}, PartFun, Partitions) ->
+p_mapred(Tab, {MapFun, Args}, {ReduceFun, AccIn}, PartFun, Partitions, ParallelTimeout) ->
MapResults =
shards_enum:pmap(fun(Idx) ->
PartitionId = shards_partition:PartFun(Tab, Idx),
apply(MapFun, [PartitionId | Args])
- end, lists:seq(0, Partitions - 1)),
+ end, ParallelTimeout, lists:seq(0, Partitions - 1)),
lists:foldl(ReduceFun, AccIn, MapResults);
-p_mapred(Tab, MapFun, ReduceFun, PartFun, Partitions) ->
+p_mapred(Tab, MapFun, ReduceFun, PartFun, Partitions, ParallelTimeout) ->
{Map, Reduce} = mapred_funs(MapFun, ReduceFun),
- p_mapred(Tab, Map, Reduce, PartFun, Partitions).
+ p_mapred(Tab, Map, Reduce, PartFun, Partitions, ParallelTimeout).
%% @private
mapred_funs(MapFun, ReduceFun) ->
diff --git a/src/shards_meta.erl b/src/shards_meta.erl
index 431d8e2..3576109 100644
--- a/src/shards_meta.erl
+++ b/src/shards_meta.erl
@@ -34,6 +34,7 @@
partitions/1,
keyslot_fun/1,
parallel/1,
+ parallel_timeout/1,
ets_opts/1
]).
@@ -62,12 +63,13 @@
%% Metadata definition
-record(meta, {
- tab_pid = undefined :: pid() | undefined,
- keypos = 1 :: pos_integer(),
- partitions = ?PARTITIONS :: pos_integer(),
- keyslot_fun = fun erlang:phash2/2 :: keyslot_fun(),
- parallel = false :: boolean(),
- ets_opts = [] :: [term()]
+ tab_pid = undefined :: pid() | undefined,
+ keypos = 1 :: pos_integer(),
+ partitions = ?PARTITIONS :: pos_integer(),
+ keyslot_fun = fun erlang:phash2/2 :: keyslot_fun(),
+ parallel = false :: boolean(),
+ parallel_timeout = infinity :: timeout(),
+ ets_opts = [] :: [term()]
}).
%% @type t() = #meta{}.
@@ -76,22 +78,24 @@
-type t() :: #meta{}.
%% @type meta_map() = #{
-%% tab_pid => pid(),
-%% keypos => pos_integer(),
-%% partitions => pos_integer(),
-%% keyslot_fun => keyslot_fun(),
-%% parallel => boolean(),
-%% ets_opts => [term()]
+%% tab_pid => pid(),
+%% keypos => pos_integer(),
+%% partitions => pos_integer(),
+%% keyslot_fun => keyslot_fun(),
+%% parallel => boolean(),
+%% parallel_timeout => timeout(),
+%% ets_opts => [term()]
%% }.
%%
%% Defines the map representation for the metadata data type.
-type meta_map() :: #{
- tab_pid => pid(),
- keypos => pos_integer(),
- partitions => pos_integer(),
- keyslot_fun => keyslot_fun(),
- parallel => boolean(),
- ets_opts => [term()]
+ tab_pid => pid(),
+ keypos => pos_integer(),
+ partitions => pos_integer(),
+ keyslot_fun => keyslot_fun(),
+ parallel => boolean(),
+ parallel_timeout => timeout(),
+ ets_opts => [term()]
}.
%% Exported types
@@ -117,12 +121,13 @@ new() -> #meta{}.
-spec from_map(Map :: #{atom() => term()}) -> t().
from_map(Map) ->
#meta{
- tab_pid = maps:get(tab_pid, Map, self()),
- keypos = maps:get(keypos, Map, 1),
- partitions = maps:get(partitions, Map, ?PARTITIONS),
- keyslot_fun = maps:get(keyslot_fun, Map, fun erlang:phash2/2),
- parallel = maps:get(parallel, Map, false),
- ets_opts = maps:get(ets_opts, Map, [])
+ tab_pid = maps:get(tab_pid, Map, self()),
+ keypos = maps:get(keypos, Map, 1),
+ partitions = maps:get(partitions, Map, ?PARTITIONS),
+ keyslot_fun = maps:get(keyslot_fun, Map, fun erlang:phash2/2),
+ parallel = maps:get(parallel, Map, false),
+ parallel_timeout = maps:get(parallel_timeout, Map, infinity),
+ ets_opts = maps:get(ets_opts, Map, [])
}.
%% @doc
@@ -131,12 +136,13 @@ from_map(Map) ->
-spec to_map(t()) -> meta_map().
to_map(Meta) ->
#{
- tab_pid => Meta#meta.tab_pid,
- keypos => Meta#meta.keypos,
- partitions => Meta#meta.partitions,
- keyslot_fun => Meta#meta.keyslot_fun,
- parallel => Meta#meta.parallel,
- ets_opts => Meta#meta.ets_opts
+ tab_pid => Meta#meta.tab_pid,
+ keypos => Meta#meta.keypos,
+ partitions => Meta#meta.partitions,
+ keyslot_fun => Meta#meta.keyslot_fun,
+ parallel => Meta#meta.parallel,
+ parallel_timeout => Meta#meta.parallel_timeout,
+ ets_opts => Meta#meta.ets_opts
}.
%% @doc
@@ -229,37 +235,43 @@ partitions_info(Tab, KeyPrefix) ->
%%%===================================================================
-spec tab_pid(t() | shards:tab()) -> pid().
-tab_pid(#meta{tab_pid = TabPid}) ->
- TabPid;
+tab_pid(#meta{tab_pid = Value}) ->
+ Value;
tab_pid(Tab) when is_atom(Tab); is_reference(Tab) ->
tab_pid(?MODULE:get(Tab)).
-spec keypos(t() | shards:tab()) -> pos_integer().
-keypos(#meta{keypos = Keypos}) ->
- Keypos;
+keypos(#meta{keypos = Value}) ->
+ Value;
keypos(Tab) when is_atom(Tab); is_reference(Tab) ->
keypos(?MODULE:get(Tab)).
-spec partitions(t() | shards:tab()) -> pos_integer().
-partitions(#meta{partitions = Partitions}) ->
- Partitions;
+partitions(#meta{partitions = Value}) ->
+ Value;
partitions(Tab) when is_atom(Tab); is_reference(Tab) ->
partitions(?MODULE:get(Tab)).
-spec keyslot_fun(t() | shards:tab()) -> keyslot_fun().
-keyslot_fun(#meta{keyslot_fun = KeyslotFun}) ->
- KeyslotFun;
+keyslot_fun(#meta{keyslot_fun = Value}) ->
+ Value;
keyslot_fun(Tab) when is_atom(Tab); is_reference(Tab) ->
keyslot_fun(?MODULE:get(Tab)).
-spec parallel(t() | shards:tab()) -> boolean().
-parallel(#meta{parallel = Parallel}) ->
- Parallel;
+parallel(#meta{parallel = Value}) ->
+ Value;
parallel(Tab) when is_atom(Tab); is_reference(Tab) ->
parallel(?MODULE:get(Tab)).
+-spec parallel_timeout(t() | shards:tab()) -> timeout().
+parallel_timeout(#meta{parallel_timeout = Value}) ->
+ Value;
+parallel_timeout(Tab) when is_atom(Tab); is_reference(Tab) ->
+ parallel_timeout(?MODULE:get(Tab)).
+
-spec ets_opts(t() | shards:tab()) -> [term()].
-ets_opts(#meta{ets_opts = EtsOpts}) ->
- EtsOpts;
+ets_opts(#meta{ets_opts = Value}) ->
+ Value;
ets_opts(Tab) when is_atom(Tab); is_reference(Tab) ->
ets_opts(?MODULE:get(Tab)).
diff --git a/src/shards_opts.erl b/src/shards_opts.erl
index 8e1eb0d..9ee49e4 100644
--- a/src/shards_opts.erl
+++ b/src/shards_opts.erl
@@ -56,6 +56,9 @@ parse_opts([{keyslot_fun, Val} | Opts], Acc) when is_function(Val, 2) ->
parse_opts(Opts, Acc#{keyslot_fun := Val});
parse_opts([{parallel, Val} | Opts], Acc) when is_boolean(Val) ->
parse_opts(Opts, Acc#{parallel := Val});
+parse_opts([{parallel_timeout, Val} | Opts], Acc)
+ when (is_integer(Val) andalso Val >= 0) orelse Val == infinity ->
+ parse_opts(Opts, Acc#{parallel_timeout := Val});
parse_opts([{restore, _, _} = Opt | Opts], #{ets_opts := NOpts} = Acc) ->
parse_opts(Opts, Acc#{ets_opts := [Opt | NOpts]});
parse_opts([{heir, _, _} = Opt | Opts], #{ets_opts := NOpts} = Acc) ->
diff --git a/test/shards_meta_SUITE.erl b/test/shards_meta_SUITE.erl
index cb1a535..57e5a5f 100644
--- a/test/shards_meta_SUITE.erl
+++ b/test/shards_meta_SUITE.erl
@@ -44,15 +44,24 @@ t_getters(_Config) ->
true = ?PARTITIONS == shards_meta:partitions(Meta0),
true = fun erlang:phash2/2 == shards_meta:keyslot_fun(Meta0),
false = shards_meta:parallel(Meta0),
+ infinity = shards_meta:parallel_timeout(Meta0),
[] = shards_meta:ets_opts(Meta0),
- Meta1 = shards_meta:from_map(#{keypos => 2, partitions => 4, parallel => true}),
+ Meta1 =
+ shards_meta:from_map(#{
+ keypos => 2,
+ partitions => 4,
+ parallel => true,
+ parallel_timeout => 5000
+ }),
+
Self = self(),
Self = shards_meta:tab_pid(Meta1),
2 = shards_meta:keypos(Meta1),
4 = shards_meta:partitions(Meta1),
true = fun erlang:phash2/2 == shards_meta:keyslot_fun(Meta1),
true = shards_meta:parallel(Meta1),
+ 5000 = shards_meta:parallel_timeout(Meta1),
[] = shards_meta:ets_opts(Meta1),
ok.
@@ -68,6 +77,7 @@ t_getters_with_table(_Config) ->
true = ?PARTITIONS == shards_meta:partitions(Tab),
true = fun erlang:phash2/2 == shards_meta:keyslot_fun(Tab),
false = shards_meta:parallel(Tab),
+ infinity = shards_meta:parallel_timeout(Tab),
[] = shards_meta:ets_opts(Tab),
true = shards:delete(Tab).
@@ -78,12 +88,13 @@ t_to_map(_Config) ->
Parts = ?PARTITIONS,
#{
- tab_pid := undefined,
- keypos := 1,
- partitions := Parts,
- keyslot_fun := KeyslotFun,
- parallel := false,
- ets_opts := []
+ tab_pid := undefined,
+ keypos := 1,
+ partitions := Parts,
+ keyslot_fun := KeyslotFun,
+ parallel := false,
+ parallel_timeout := infinity,
+ ets_opts := []
} = shards_meta:to_map(Meta0),
true = fun erlang:phash2/2 == KeyslotFun.
diff --git a/test/shards_tests.erl b/test/shards_tests.erl
index 85e877d..f80f24f 100644
--- a/test/shards_tests.erl
+++ b/test/shards_tests.erl
@@ -649,7 +649,8 @@ init_shards_new(Opts) ->
duplicate_bag,
{partitions, 5},
{keyslot_fun, fun ?MODULE:pick_shard/2},
- {parallel, true}
+ {parallel, true},
+ {parallel_timeout, 5000}
| Opts
]),