Skip to content

Commit

Permalink
Merge pull request #124 from inaka/cabol.122.riak_backend_config_enha…
Browse files Browse the repository at this point in the history
…ncement

Cabol.122.riak backend config enhancement
  • Loading branch information
Brujo Benavides committed Mar 12, 2015
2 parents ecab5e4 + 0a7f213 commit d0a8dcd
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 58 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ First, let's create and activate a bucket type simply called maps that is set up
to store Riak maps:

$ riak-admin bucket-type create maps '{"props":{"datatype":"map"}}'
$ riak-admin bucket-type activate mapsdmin bucket-type activate maps
$ riak-admin bucket-type activate maps

Now, let's create a search index called `sumo_test_index` using the default
schema:
Expand Down
55 changes: 20 additions & 35 deletions src/sumo_backend_riak.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,27 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%%% Public API.
-export(
[ get_connection/1,
get_state/1
]).
-export([get_connection/1]).

%%% Exports for sumo_backend
-export(
[ start_link/2
]).
-export([start_link/2]).

%%% Exports for gen_server
-export(
[ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}).
-record(state, {host :: string(),
port :: non_neg_integer(),
opts :: [term()]}).
-type state() :: #state{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand All @@ -68,10 +64,6 @@ start_link(Name, Options) ->
get_connection(Name) ->
gen_server:call(Name, get_connection).

-spec get_state(atom() | pid()) -> state().
get_state(Name) ->
gen_server:call(Name, get_state).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server stuff.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand All @@ -82,23 +74,16 @@ init(Options) ->
Host = proplists:get_value(host, Options, "127.0.0.1"),
Port = proplists:get_value(port, Options, 8087),
Opts = riak_opts(Options),
%% Get DB parameters
BucketType = iolist_to_binary(
proplists:get_value(bucket_type, Options)),
Bucket = iolist_to_binary(
proplists:get_value(bucket, Options, <<"sumo_test">>)),
Index = iolist_to_binary(
proplists:get_value(index, Options, <<"sumo_test_index">>)),
%% Place Riak connection
{ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
%% Initial state
{ok, #state{conn = Conn, bucket = {BucketType, Bucket}, index = Index}}.
{ok, #state{host = Host, port = Port, opts = Opts}}.

%% @todo: implement connection pool.
%% In other cases is a built-in feature of the client.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(get_connection, _From, State = #state{conn = Conn}) ->
{reply, Conn, State};
handle_call(get_state, _From, State) ->
{reply, State, State}.
handle_call(get_connection,
_From,
State = #state{host = Host, port = Port, opts = Opts}) ->
{ok, Conn} = riakc_pb_socket:start_link(Host, Port, Opts),
{reply, Conn, State}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Unused Callbacks
Expand Down
68 changes: 50 additions & 18 deletions src/sumo_store_riak.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,21 @@
%% Types.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-record(state, {conn :: pid(), bucket :: binary(), index :: binary()}).
%% Riak quorum parameters.
%% @see <a href="http://docs.basho.com/riak/latest/dev/using/basics"/>
-type r_param() :: r | pr | notfound_ok.
-type w_param() :: w | pw | dw | returnbody.

%% conn: is the Pid of the gen_server that holds the connection with Riak
%% bucket: Riak bucket (per store)
%% index: Riak index to be used by Riak Search
%% read_quorum: Riak read quorum parameters.
%% write_quorum: Riak write quorum parameters.
-record(state, {conn :: pid(),
bucket :: binary(),
index :: binary(),
read_quorum :: [{r_param(), integer() | (true | false)}],
write_quorum :: [{w_param(), integer() | (true | false)}]}).
-type state() :: #state{}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand All @@ -61,19 +75,33 @@
-spec init(
term()
) -> {ok, term()}.
init(Options) ->
init(Opts) ->
% The storage backend key in the options specifies the name of the process
% which creates and initializes the storage backend.
Backend = proplists:get_value(storage_backend, Options),
State = sumo_backend_riak:get_state(Backend),
Backend = proplists:get_value(storage_backend, Opts),
Conn = sumo_backend_riak:get_connection(Backend),
BucketType = iolist_to_binary(
proplists:get_value(bucket_type, Opts, <<"maps">>)),
Bucket = iolist_to_binary(
proplists:get_value(bucket, Opts, <<"sumo">>)),
Index = iolist_to_binary(
proplists:get_value(index, Opts, <<"sumo_index">>)),
Rq = proplists:get_value(read_quorum, Opts, []),
Wq = proplists:get_value(write_quorum, Opts, []),
State = #state{conn = Conn,
bucket = {BucketType, Bucket},
index = Index,
read_quorum = Rq,
write_quorum = Wq},
{ok, State}.

-spec persist(
sumo_internal:doc(), state()
) -> sumo_store:result(sumo_internal:doc(), state()).
persist(Doc, #state{conn = Conn, bucket = Bucket} = State) ->
persist(Doc,
#state{conn = Conn, bucket = Bucket, write_quorum = Wq} = State) ->
{Id, NewDoc} = new_doc(Doc, State),
case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc)) of
case update_map(Conn, Bucket, Id, doc_to_rmap(NewDoc), Wq) of
{error, Error} ->
{error, Error, State};
_ ->
Expand Down Expand Up @@ -122,9 +150,10 @@ delete_all(_DocName, #state{conn = Conn, bucket = Bucket} = State) ->
-spec find_all(
sumo:schema_name(), state()
) -> sumo_store:result([sumo_internal:doc()], state()).
find_all(DocName, #state{conn = Conn, bucket = Bucket} = State) ->
find_all(DocName,
#state{conn = Conn, bucket = Bucket, read_quorum = Rq} = State) ->
Get = fun({C, B, Kst}, Acc) ->
fetch_map_bulk(DocName, C, B, Kst) ++ Acc
fetch_map_bulk(DocName, C, B, Kst, Rq) ++ Acc
end,
case stream_keys(Conn, Bucket, Get, []) of
{ok, Docs} -> {ok, Docs, State};
Expand Down Expand Up @@ -158,11 +187,14 @@ find_by(DocName,
Conditions,
Limit,
Offset,
#state{conn = Conn, bucket = Bucket, index = Index} = State) ->
#state{conn = Conn,
bucket = Bucket,
index = Index,
read_quorum = Rq} = State) ->
IdField = sumo_internal:id_field_name(DocName),
case lists:keyfind(IdField, 1, Conditions) of
{_K, Key} ->
case fetch_map(Conn, Bucket, iolist_to_binary(Key)) of
case fetch_map(Conn, Bucket, iolist_to_binary(Key), Rq) of
{ok, RMap} ->
Val = rmap_to_doc(DocName, RMap),
{ok, [Val], State};
Expand Down Expand Up @@ -205,12 +237,12 @@ doc_id(Doc) ->
sumo_internal:get_field(IdField, Doc).

%% @private
new_doc(Doc, #state{conn = Conn, bucket = Bucket}) ->
new_doc(Doc, #state{conn = Conn, bucket = Bucket, write_quorum = Wq}) ->
DocName = sumo_internal:doc_name(Doc),
IdField = sumo_internal:id_field_name(DocName),
Id = case sumo_internal:get_field(IdField, Doc) of
undefined ->
case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc)) of
case update_map(Conn, Bucket, undefined, doc_to_rmap(Doc), Wq) of
{ok, RiakMapId} -> RiakMapId;
{error, Error} -> throw(Error);
_ -> throw(unexpected)
Expand Down Expand Up @@ -281,13 +313,13 @@ normalize_doc_fields(Src) ->
[{return, binary}, global]).

%% @private
fetch_map(Conn, Bucket, Key) ->
riakc_pb_socket:fetch_type(Conn, Bucket, Key).
fetch_map(Conn, Bucket, Key, Opts) ->
riakc_pb_socket:fetch_type(Conn, Bucket, Key, Opts).

%% @private
fetch_map_bulk(DocName, Conn, Bucket, Keys) ->
fetch_map_bulk(DocName, Conn, Bucket, Keys, Opts) ->
Fun = fun(K, Acc) ->
case fetch_map(Conn, Bucket, K) of
case fetch_map(Conn, Bucket, K, Opts) of
{ok, M} -> [rmap_to_doc(DocName, M) | Acc];
_ -> Acc
end
Expand All @@ -299,8 +331,8 @@ delete_map(Conn, Bucket, Key) ->
riakc_pb_socket:delete(Conn, Bucket, Key).

%% @private
update_map(Conn, Bucket, Key, Map) ->
riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map)).
update_map(Conn, Bucket, Key, Map, Opts) ->
riakc_pb_socket:update_type(Conn, Bucket, Key, riakc_map:to_op(Map), Opts).

%% @private
search(Conn, Index, Query, 0, 0) ->
Expand Down
11 changes: 7 additions & 4 deletions test/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
sumo_backend_riak,
[{host, "127.0.0.1"},
{port, 8087},
{bucket_type, "maps"},
{bucket, "sumo_test"},
{index, "sumo_test_index"}]
{poolsize, 10}]
}
]
},
Expand Down Expand Up @@ -73,7 +71,12 @@
{sumo_test_riak,
sumo_store_riak,
[{storage_backend, sumo_test_backend_riak},
{workers, 10}]
{workers, 10},
{bucket_type, "maps"},
{bucket, "sumo_test"},
{index, "sumo_test_index"},
{w_args, [{w, 2}, {pw, 0}, {dw, 0}, {returnbody, false}]},
{r_args, [{r, 2}, {pr, 0}]}]
}
]
},
Expand Down

0 comments on commit d0a8dcd

Please sign in to comment.