Commit
Fix dialyzer warnings (#10)
* make the should_deflate flag part of the state (#2)

To control whether or not to deflate, the library accepted a flag on kpl_agg:finish/2.

However, that flag was not applied to aggregated records created implicitly
when adding new items.

This PR makes should_deflate part of the state and allows it to be initialized
via kpl_agg:new/1 (the default is not to deflate); see the usage sketch below.

Note: this is a backward-incompatible change, as kpl_agg:finish/2 is removed.
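
For illustration, a minimal usage sketch of the updated API (the key and payload
values are invented; the return shapes follow deflate_test further down in this diff):

    Agg0 = kpl_agg:new(true),    % deflate chosen once, when the aggregator is created
    {undefined, Agg1} = kpl_agg:add(Agg0, {<<"pk1">>, <<"data1">>, <<"ehk1">>}),
    {{_PartitionKey, _Data, _ExplicitHashKey}, _FreshAgg} = kpl_agg:finish(Agg1).

kpl_agg:finish/1 hands back a fresh aggregator that keeps the same should_deflate
setting, so the flag no longer needs to be passed on every call.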

* Avoid quadratic complexity, keep track of the current list length (#3)

Do not compute length(List) on each item addition; keep track of the current
buffer length instead, as sketched below.
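
The same idea on a generic record (names here are illustrative, not the library's #keyset{}):

    -record(buffer, {items = [] :: [term()],
                     len   = 0  :: non_neg_integer()}).

    %% Appending stays O(1): the counter is updated together with the list,
    %% so length/1 never has to walk it.
    push(Item, #buffer{items = Items, len = Len} = Buf) ->
        Buf#buffer{items = [Item | Items], len = Len + 1}.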

* Remove dialyzer warnings

* Fix #stream_record.timestamp type
elbrujohalcon authored Jul 19, 2018
1 parent b9cd2f3 commit 25d28e6
Showing 5 changed files with 46 additions and 28 deletions.
4 changes: 2 additions & 2 deletions include/erlmld.hrl
@@ -25,10 +25,10 @@

 -record(stream_record, {
     partition_key :: binary(),
-    timestamp :: non_neg_integer(), % approximate arrival time (ms)
+    timestamp :: undefined | non_neg_integer(), % approximate arrival time (ms)
     delay :: non_neg_integer(), % approximate delay between this record and tip of stream (ms)
     sequence_number :: sequence_number(),
-    data :: binary()
+    data :: term()
 }).

 -type worker_state() :: term().
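
The undefined | T unions added throughout this commit follow the usual dialyzer
convention: a record field without a default value starts out as undefined, so its
declared type has to include undefined or dialyzer flags records that are built
without setting that field. A minimal illustration, not code from this repository:

    -record(example, {
        %% no default: the field is undefined until set, so the type must allow it
        timestamp :: undefined | non_neg_integer(),
        %% a default is supplied, so no undefined union is needed
        data = <<>> :: binary()
    }).
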
15 changes: 15 additions & 0 deletions rebar.config
@@ -42,3 +42,18 @@
    ]}
  ]}
 ]}.
+
+{dialyzer, [
+  {warnings, [unknown, no_return, error_handling]},
+  {plt_apps, top_level_deps},
+  {plt_extra_apps, []},
+  {plt_location, local},
+  {base_plt_apps, [erts, stdlib, kernel, sasl, runtime_tools, tools]},
+  {base_plt_location, global}
+]}.
+
+{xref_checks,[
+  undefined_function_calls,
+  locals_not_used,
+  deprecated_function_calls
+]}.
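
With this configuration in place, the checks are run through rebar3's standard providers:

    rebar3 dialyzer
    rebar3 xref
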
1 change: 1 addition & 0 deletions src/erlmld.app.src
@@ -17,6 +17,7 @@
   {applications,
    [kernel,
     stdlib,
+    crypto,
     erlexec,
     jiffy,
     b64fast]},
12 changes: 6 additions & 6 deletions src/erlmld_wrk_statem.erl
@@ -119,23 +119,23 @@
     handler_data :: term(),

     %% connected socket owned by this process:
-    socket :: gen_tcp:socket(),
+    socket :: undefined | gen_tcp:socket(),

     %% input buffer; responses are small and we need no output buffer:
-    buf = [] :: list(binary()),
+    buf = [] :: [binary()],

     %% worker state returned from handler module init:
-    worker_state :: term(),
+    worker_state :: undefined | term(),

     %% if true, the MLD made a processRecords call with the V2 format (supplied
     %% millisBehindLatest), so we will checkpoint using the V2 checkpoint format:
     is_v2 = false :: boolean(),

     %% most recent action name from the peer:
-    last_request :: binary(),
+    last_request :: undefined | binary(),

     %% last attempted checkpoint:
-    last_checkpoint :: checkpoint()
+    last_checkpoint :: undefined | checkpoint()
 }).

 -define(INTERNAL, internal).
@@ -590,7 +590,7 @@ next_line(Bin, #data{buf = Buf} = Data) ->
 %% and remaining data. an "action" is a line which should have been a json-encoded map
 %% containing an "action" key. if decoding fails with a thrown error, that error is
 %% returned as the decoded value.
--spec next_action(binary(), #data{}) -> {map() | undefined, #data{}, binary()}.
+-spec next_action(binary(), #data{}) -> {map() | undefined | {error, term()}, #data{}, binary()}.
 next_action(Bin, Data) ->
     case next_line(Bin, Data) of
         {undefined, NData, Rest} ->
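
The widened return type matches the behaviour described in the comment above: a decode
failure thrown by the JSON decoder is handed back as the "decoded" value rather than
crashing the state machine. A rough sketch of that pattern, assuming jiffy as the decoder
(it is a listed dependency) and using a hypothetical helper name:

    %% Hypothetical helper, not part of erlmld_wrk_statem: decode one line,
    %% returning a thrown {error, _} term as the result instead of raising.
    decode_line(Line) ->
        try
            jiffy:decode(Line, [return_maps])
        catch
            throw:Error -> Error
        end.
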
42 changes: 22 additions & 20 deletions src/kpl_agg.erl
@@ -35,8 +35,8 @@
 %%% end
 %%%
 %%% The result currently uses a non-standard magic prefix to prevent the KCL from
-%%% deaggregating the record automatically. To use compression, use
-%%% `kpl_agg:finish/2` with `true` as the second argument, which uses another
+%%% deaggregating the record automatically. To use compression, instantiate the
+%%% aggregator using kpl_agg:new(true), which uses another
 %%% non-standard magic prefix.
 %%%
 %%% @end
@@ -45,7 +45,7 @@
 -module(kpl_agg).

 %% API
--export([new/0, count/1, size_bytes/1, finish/1, finish/2, add/2, add_all/2]).
+-export([new/0, new/1, count/1, size_bytes/1, finish/1, add/2, add_all/2]).

 -define(MD5_DIGEST_BYTES, 16).
 %% From http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html:
@@ -57,7 +57,8 @@
 %% A set of keys, mapping each key to a unique index.
 -record(keyset, {
     rev_keys = [] :: list(binary()),          %% list of known keys in reverse order
-    key_to_index = maps:new() :: map()        %% maps each known key to a 0-based index
+    rev_keys_length = 0 :: non_neg_integer(), %% length of the rev_keys list
+    key_to_index = maps:new() :: map()        %% maps each known key to a 0-based index
 }).

 %% Internal state of a record aggregator. It stores an aggregated record that
@@ -76,7 +77,9 @@
     explicit_hash_keyset = #keyset{} :: #keyset{},

     %% List if user records added so far, in reverse order.
-    rev_records = [] :: [#'Record'{}]
+    rev_records = [] :: [#'Record'{}],
+
+    should_deflate = false
 }).

@@ -85,7 +88,9 @@
 %%%===================================================================

 new() ->
-    #state{}.
+    new(false).
+new(ShouldDeflate) ->
+    #state{should_deflate = ShouldDeflate}.

 count(#state{num_user_records = Num} = _State) ->
     Num.
@@ -101,15 +106,12 @@ size_bytes(#state{agg_size_bytes = Size,
          end)
         + byte_size(kpl_agg_pb:encode_msg(#'AggregatedRecord'{})).

-finish(#state{num_user_records = 0} = State, _) ->
+finish(#state{num_user_records = 0} = State) ->
     {undefined, State};

-finish(#state{agg_partition_key = AggPK, agg_explicit_hash_key = AggEHK} = State, ShouldDeflate) ->
+finish(#state{agg_partition_key = AggPK, agg_explicit_hash_key = AggEHK, should_deflate = ShouldDeflate} = State) ->
     AggRecord = {AggPK, serialize_data(State, ShouldDeflate), AggEHK},
-    {AggRecord, new()}.
-
-finish(State) ->
-    finish(State, false).
+    {AggRecord, new(ShouldDeflate)}.


 add(State, {PartitionKey, Data} = _Record) ->
@@ -277,23 +279,23 @@ is_key(Key, #keyset{key_to_index = KeyToIndex} = _KeySet) ->

 get_or_add_key(undefined, KeySet) ->
     {undefined, KeySet};
-get_or_add_key(Key, #keyset{rev_keys = RevKeys, key_to_index = KeyToIndex} = KeySet) ->
+get_or_add_key(Key, #keyset{rev_keys = RevKeys, rev_keys_length = Length, key_to_index = KeyToIndex} = KeySet) ->
     case maps:get(Key, KeyToIndex, not_found) of
         not_found ->
-            Index = length(RevKeys),
             NewKeySet = KeySet#keyset{
                 rev_keys = [Key | RevKeys],
-                key_to_index = maps:put(Key, Index, KeyToIndex)
+                rev_keys_length = Length + 1,
+                key_to_index = maps:put(Key, Length, KeyToIndex)
             },
-            {Index, NewKeySet};
+            {Length, NewKeySet};
         Index ->
             {Index, KeySet}
     end.


-potential_index(Key, #keyset{rev_keys = RevKeys, key_to_index = KeyToIndex} = _KeySet) ->
+potential_index(Key, #keyset{rev_keys_length = Length, key_to_index = KeyToIndex} = _KeySet) ->
     case maps:get(Key, KeyToIndex, not_found) of
-        not_found -> length(RevKeys);
+        not_found -> Length;
         Index -> Index
     end.

Expand Down Expand Up @@ -443,9 +445,9 @@ full_record_test() ->


deflate_test() ->
Agg0 = new(),
Agg0 = new(true),
{undefined, Agg1} = add(Agg0, {<<"pk1">>, <<"data1">>, <<"ehk1">>}),
{{_, Data, _}, _} = finish(Agg1, true),
{{_, Data, _}, _} = finish(Agg1),
<<Magic:4/binary, Deflated/binary>> = Data,
?assertEqual(?KPL_AGG_MAGIC_DEFLATED, Magic),
Inflated = zlib:uncompress(Deflated),