Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Parallelize build_hash_chain #1334

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ _build
*.iml
rebar3.crashdump
data/
!src/data/
.DS_Store
src/pb/
src/grpc/autogen
Expand Down
184 changes: 166 additions & 18 deletions src/blockchain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1460,26 +1460,116 @@ build(Height, Blockchain, N, Acc) ->
end
end.

%% Builds the list of block hashes leading from (but excluding) StopHash up to
%% and including the hash of StartBlock, ordered oldest-first.
%%
%% Every {Hash, SerializedBlock} pair in the given column family is
%% deserialized to build a ChildHash => ParentHash map, which is then walked
%% backwards from StartBlock by trace_lineage/3.
%%
%% If the map reveals orphans (see find_orphans/1), a warning is logged, but
%% the (possibly truncated) chain is still returned.
-spec build_hash_chain(
    H,
    blockchain_block:block(),
    blockchain(),
    rocksdb:cf_handle()
) ->
    [H, ...] when H :: blockchain_block:hash().
build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) ->
    %% XXX This parallelized build_hash_chain has a potential PERF DRAWBACK:
    %%
    %% Because it processes the WHOLE of blocks CF BEFORE knowing what will
    %% actually be needed, if the caller needs a significantly smaller
    %% segment than the whole - we end up doing a lot of wasted
    %% deserializations.
    %%
    %% Some ideas for solutions:
    %%  A. eliminate the deserialization step entirely, by maintaining
    %%     a rocksdb CF of ChildHash->ParentHash mappings;
    %%  B. dispatch between serial and parallel versions based on some
    %%     conditions/configs;
    %%  C. accept the redundancy, but create an opportunity to terminate
    %%     early once a long-enough segment is built - reimplement Parents
    %%     as a cache process which does two things:
    %%       1. starts a background job which fills the cache with parents;
    %%       2. accepts parent requests and either retrieves responses from
    %%          cache or computes them (updating the cache).
    Parents =
        maps:from_list(
            data_stream:pmap_to_bag(
                rocksdb_stream(DB, CF),
                fun ({<<ChildHash/binary>>, <<ChildBlockBin/binary>>}) ->
                    ChildBlock = blockchain_block:deserialize(ChildBlockBin),
                    <<ParentHash/binary>> = blockchain_block:prev_hash(ChildBlock),
                    {ChildHash, ParentHash}
                end
            )
        ),
    StartHash = blockchain_block:hash_block(StartBlock),
    HashChain = trace_lineage(Parents, StopHash, StartHash),
    %% TODO Predicate the disjointness check on a config?
    case find_orphans(Parents) of
        [] ->
            ok;
        [_|_]=Orphans ->
            %% Argument order must follow the order of the ~p placeholders:
            %% orphans first, then the chain length.
            lager:warning(
                "Disjoint blocks database. "
                "Hash chain may not be valid. "
                "Found orphan blocks: ~p, "
                "Hash chain length: ~p.",
                [Orphans, length(HashChain)]
            )
    end,
    HashChain.

%% Serial variant: seeds the accumulator with the starting block's parent hash
%% followed by its own hash, then delegates the backwards walk to
%% build_hash_chain_/4.
-spec build_hash_chain(blockchain_block:hash(), blockchain_block:block(), blockchain(), rocksdb:cf_handle()) -> [blockchain_block:hash(), ...].
build_hash_chain(StopHash, StartingBlock, Blockchain, CF) ->
    OwnHash = blockchain_block:hash_block(StartingBlock),
    PrevHash = blockchain_block:prev_hash(StartingBlock),
    Seed = [PrevHash, OwnHash],
    build_hash_chain_(StopHash, CF, Blockchain, Seed).
%% Returns every parent hash that (a) does not itself appear as a child in the
%% map and (b) is not an all-zero (genesis) hash.
%%
%% Example: given #{A => 0, C => B} and the expected chain [0, A, B, C],
%% the link B => A is missing. We cannot assume the expected chain, but we can
%% spot B as a non-genesis parent which has no parent of its own - which
%% implies the chain is disjoint.
%%
%% Results follow the order of maps:to_list/1 and are not de-duplicated.
-spec find_orphans(#{B => B}) -> [B] when B :: binary().
find_orphans(ChildToParent) ->
    IsOrphan =
        fun (Hash) ->
            %% No parent known for this hash ...
            (not maps:is_key(Hash, ChildToParent))
                %% ... and at least one non-zero byte, i.e. non-genesis.
                andalso lists:any(fun (Byte) -> Byte > 0 end, binary_to_list(Hash))
        end,
    ParentHashes = [Parent || {_Child, Parent} <- maps:to_list(ChildToParent)],
    lists:filter(IsOrphan, ParentHashes).

%% Follows Child => Parent links backwards from Youngest, returning the
%% lineage ordered oldest-first with Youngest as the last element.
%%
%% The walk stops when the next parent is Oldest (which is excluded from the
%% result) or when the current head has no entry in Parents.
-spec trace_lineage(#{A => A}, A, A) -> [A, ...].
trace_lineage(Parents, Oldest, Youngest) ->
    trace_lineage_from(Parents, Oldest, [Youngest]).

%% Extends the lineage by one ancestor at a time until a stop condition holds.
trace_lineage_from(Parents, Oldest, [Head | _]=Lineage) ->
    case maps:find(Head, Parents) of
        {ok, Ancestor} when Ancestor =/= Oldest ->
            trace_lineage_from(Parents, Oldest, [Ancestor | Lineage]);
        _ ->
            %% Either no known parent, or the parent is the excluded Oldest.
            Lineage
    end.

%% Returns a lazy stream (a zero-arity fun) over all {Key, Value} pairs of the
%% given column family.
%%
%% Nothing is read until the fun is called. The first call opens an iterator,
%% positions it at the first record and hands the rest of the iteration over
%% to rocksdb_stream/1.
%%
%% An empty column family yields `none` (an empty stream) rather than an
%% error, consistent with how rocksdb_stream/1 treats iterator exhaustion.
%% The iterator is closed on every path that does not hand it over.
%%
%% Raises error({rocks_key_streaming_failure, _}) if the iterator cannot be
%% created or the initial move fails for any other reason.
-spec rocksdb_stream(rocksdb:db_handle(), rocksdb:cf_handle()) ->
    data_stream:t({K :: binary(), V :: binary()}).
rocksdb_stream(DB, CF) ->
    fun () ->
        case rocksdb:iterator(DB, CF, []) of
            {error, Reason} ->
                error({rocks_key_streaming_failure, Reason});
            {ok, Iter} ->
                case rocksdb:iterator_move(Iter, first) of
                    {ok, K, V} ->
                        {some, {{K, V}, rocksdb_stream(Iter)}};
                    {error, invalid_iterator} ->
                        %% Nothing to iterate over: empty column family.
                        rocksdb:iterator_close(Iter),
                        none;
                    Error ->
                        rocksdb:iterator_close(Iter),
                        error({rocks_key_streaming_failure, Error})
                end
        end
    end.

%% Serial backwards walk over the block hashes.
%%
%% The accumulator's head is the hash of the next block to look up. When that
%% head equals StopHash the walk is over and the tail (everything after the
%% stop hash) is returned. Otherwise the block is fetched from the column
%% family by hash, deserialized, and its prev_hash is pushed onto the
%% accumulator before recursing.
-spec build_hash_chain_(blockchain_block:hash(), rocksdb:cf_handle(), blockchain(), [blockchain_block:hash(), ...]) -> [blockchain_block:hash()].
build_hash_chain_(StopHash, CF, Blockchain = #blockchain{db=DB}, [ParentHash|Tail]=Acc) ->
case ParentHash == StopHash of
true ->
%% reached the end
Tail;
false ->
case rocksdb:get(DB, CF, ParentHash, []) of
{ok, BinBlock} ->
build_hash_chain_(StopHash, CF, Blockchain, [blockchain_block:prev_hash(blockchain_block:deserialize(BinBlock))|Acc]);
_ ->
%% Any result other than {ok, _} (e.g. the block is not in this column
%% family) ends the walk; the accumulator is returned as-is, with the
%% hash that could not be looked up still at its head.
Acc
end
%% Returns a lazy stream (a zero-arity fun) over the records that follow the
%% iterator's current position.
%%
%% Each call advances the iterator by one record. When the iterator becomes
%% invalid (no more records) the stream ends with `none`. Any other failure is
%% raised as error({rocks_key_streaming_failure, Reason}).
%%
%% The iterator handle is closed as soon as the stream terminates - either by
%% exhaustion or by error - so that it does not linger until garbage
%% collection.
-spec rocksdb_stream(rocksdb:itr_handle()) ->
    data_stream:t({K :: binary(), V :: binary()}).
rocksdb_stream(Iter) ->
    fun () ->
        case rocksdb:iterator_move(Iter, next) of
            {ok, K, V} ->
                {some, {{K, V}, rocksdb_stream(Iter)}};
            {error, invalid_iterator} ->
                %% Exhausted: release the iterator before ending the stream.
                rocksdb:iterator_close(Iter),
                none;
            {error, Reason} ->
                rocksdb:iterator_close(Iter),
                error({rocks_key_streaming_failure, Reason})
        end
    end.

-spec fold_chain(fun((Blk :: blockchain_block:block(), AccIn :: any()) -> NewAcc :: any()),
Expand Down Expand Up @@ -3356,4 +3446,62 @@ block_info_upgrade_test() ->
V2BlockInfo = upgrade_block_info(V1BlockInfo, Block, Chain),
?assertMatch(ExpV2BlockInfo, V2BlockInfo).

%% Table-driven checks that trace_lineage/3 returns the ancestry oldest-first,
%% ends with the youngest element and leaves out the designated oldest one.
trace_lineage_test_() ->
    %% {ExpectedLineage, ChildToParent, Oldest, Youngest}
    Cases =
        [
            {
                [middle, youngest],
                #{youngest => middle, middle => oldest},
                oldest,
                youngest
            },
            {
                [b, c, d, e],
                #{e => d, d => c, c => b, b => a},
                a,
                e
            }
        ],
    [
        ?_assertEqual(Expected, trace_lineage(Parents, Oldest, Youngest))
    ||
        {Expected, Parents, Oldest, Youngest} <- Cases
    ].

%% Table-driven checks for find_orphans/1.
find_orphans_test_() ->
    %% {ExpectedOrphans, ChildToParent}
    Cases =
        [
            %% Chain [0, A, B, C] stored as #{A => 0, C => B}:
            %% the link B=>A is missing, so B is an orphan.
            {
                [<<"B">>],
                #{<<"A">> => <<0>>, <<"C">> => <<"B">>}
            },
            %% Chain [X, A, B, C] stored as #{A => X, C => B}:
            %% the link B=>A is missing and X is not genesis,
            %% so both X and B are orphans.
            {
                [<<"X">>, <<"B">>],
                #{<<"A">> => <<"X">>, <<"C">> => <<"B">>}
            }
        ],
    [
        ?_assertEqual(Expected, find_orphans(ChildToParent))
    ||
        {Expected, ChildToParent} <- Cases
    ].

-endif.
2 changes: 2 additions & 0 deletions src/blockchain_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include("blockchain_vars.hrl").

-export([
cpus/0,
shuffle_from_hash/2,
shuffle/1,
rand_from_hash/1, rand_state/1,
Expand Down Expand Up @@ -290,6 +291,7 @@ validation_width() ->
N
end.

%% Number of workers derived from the online scheduler count: half of the
%% schedulers (rounded up) plus one, with a floor of 2.
-spec cpus() -> non_neg_integer().
cpus() ->
    Online = erlang:system_info(schedulers_online),
    %% Integer form of ceil(Online / 2).
    HalfRoundedUp = (Online + 1) div 2,
    erlang:max(2, HalfRoundedUp + 1).
Expand Down
Loading