Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
Parallelize build_hash_chain
Browse files Browse the repository at this point in the history
The main idea is to
1. asynchronously: enumerate all K->V pairs in the blocks CF;
2. in-parallel:
    2.1 deserialize blocks;
    2.2 lookup parents;
    2.3 accumulate `{Parent, Child}` pairs;
3. in-serial:
    3.1 build graph from above-built pairs;
    3.2 walk backwards from the youngest given hash and trace its
        longest possible lineage up to the oldest given hash.

The last step, 3.2, is essentially what the previous implementation
(that this one replaces) used to do, but this time all the expensive
deserialization has already been done, in parallel.
  • Loading branch information
xandkar committed May 10, 2022
1 parent 988ade6 commit cb82d65
Showing 1 changed file with 87 additions and 19 deletions.
106 changes: 87 additions & 19 deletions src/blockchain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1460,26 +1460,94 @@ build(Height, Blockchain, N, Acc) ->
end
end.


%% Serial hash-chain construction.
%% Seeds the accumulator with the starting block's parent hash followed by
%% the starting block's own hash, then hands over to build_hash_chain_/4,
%% which extends the accumulator at the front until StopHash is met.
-spec build_hash_chain(
    blockchain_block:hash(),
    blockchain_block:block(),
    blockchain(),
    rocksdb:cf_handle()
) ->
    [blockchain_block:hash(), ...].
build_hash_chain(StopHash, StartingBlock, Blockchain, CF) ->
    OwnHash = blockchain_block:hash_block(StartingBlock),
    PrevHash = blockchain_block:prev_hash(StartingBlock),
    Seed = [PrevHash, OwnHash],
    build_hash_chain_(StopHash, CF, Blockchain, Seed).

%% Recursive worker of the serial hash-chain walk.
%% The head of the accumulator is always the hash to be examined next:
%%   * head equals StopHash      -> the walk is complete; the accumulator is
%%                                  returned WITHOUT that head;
%%   * head's block is in CF     -> the block is deserialized, its prev_hash
%%                                  is pushed onto the accumulator and the
%%                                  walk continues;
%%   * any other rocksdb:get/4
%%     result                    -> the accumulator is returned as is, i.e.
%%                                  still including the head that could not
%%                                  be looked up.
%% One rocksdb read and one block deserialization are performed per step.
-spec build_hash_chain_(blockchain_block:hash(), rocksdb:cf_handle(), blockchain(), [blockchain_block:hash(), ...]) -> [blockchain_block:hash()].
build_hash_chain_(StopHash, CF, Blockchain = #blockchain{db=DB}, [ParentHash|Tail]=Acc) ->
case ParentHash == StopHash of
true ->
%% reached the end: StopHash itself is not part of the result
Tail;
false ->
case rocksdb:get(DB, CF, ParentHash, []) of
{ok, BinBlock} ->
%% step one block further back: the parent's own prev_hash becomes the new head
build_hash_chain_(StopHash, CF, Blockchain, [blockchain_block:prev_hash(blockchain_block:deserialize(BinBlock))|Acc]);
_ ->
%% lookup did not yield a block: stop here and return what was collected so far
Acc
%% Entry point for hash-chain construction.
%% Pulls the database handle out of the blockchain record and delegates to
%% build_hash_chain_in_parallel/4, passing StopHash as the oldest hash and
%% the hash of StartBlock as the youngest one.
-spec build_hash_chain(
    blockchain_block:hash(),
    blockchain_block:block(),
    blockchain(),
    rocksdb:cf_handle()
) ->
    [binary()].
build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) ->
    build_hash_chain_in_parallel(
        DB,
        CF,
        StopHash,
        blockchain_block:hash_block(StartBlock)
    ).

%% Build the list of block hashes leading up to Youngest.
%%
%% Phase 1 (parallel): every {Hash, SerializedBlock} pair of CF is
%% deserialized and reduced to a {ParentHash, ChildHash} pair.
%% Phase 2 (serial): the pairs are loaded into a digraph, which is then
%% walked backwards from Youngest, one parent at a time.
%%
%% The walk stops when:
%%   * the current hash has no recorded parent - the lineage found so far
%%     is returned (broken path);
%%   * the parent is Oldest - the lineage is returned; Oldest itself is
%%     NOT included;
%%   * more than one parent is recorded for a hash - raises
%%     error({incorrect_chain, multiple_parents, Child, Parents}).
%%
%% The result is ordered oldest-first and always ends with Youngest.
%%
%% The digraph is released in an 'after' clause, so it is freed even when
%% the walk raises.
-spec build_hash_chain_in_parallel(
    rocksdb:db_handle(),
    rocksdb:cf_handle(),
    H,
    H
) ->
    [H] when H :: blockchain_block:hash().
build_hash_chain_in_parallel(DB, CF, Oldest, Youngest) ->
    %% The expensive part (deserialization) runs before any graph exists, so
    %% a failure here has nothing to clean up.
    Edges =
        data_stream:pmap_to_bag(
            rocksdb_stream(DB, CF),
            fun ({<<ChildHash/binary>>, <<ChildBlockBin/binary>>}) ->
                ChildBlock = blockchain_block:deserialize(ChildBlockBin),
                <<ParentHash/binary>> = blockchain_block:prev_hash(ChildBlock),
                %% XXX Can't update digraph in parallel - no concurrent writes.
                {ParentHash, ChildHash}
            end
        ),
    %% XXX 'cyclic' is a perf compromise:
    %% 'acyclic' is what we'd ideally want, but it is an expensive option, as
    %% it forces a cycles check ON EACH add_edge operation, so by using
    %% 'cyclic' we're just trusting that our prev_hash relationships are all
    %% correct, which they should be, if not - we have bigger problems.
    Relations = digraph:new([cyclic]),
    try
        %% Only hashes that are keys in CF become vertices.
        lists:foreach(
            fun ({_, Child}) -> digraph:add_vertex(Relations, Child) end,
            Edges
        ),
        %% The result of add_edge is deliberately ignored: per the digraph
        %% documentation it returns {error, {bad_vertex, _}} instead of
        %% raising when the parent is not a vertex (i.e. the parent block is
        %% not in CF). Such a child simply ends up with no in-neighbours,
        %% which is how a broken path shows up in the walk below.
        lists:foreach(
            fun ({Parent, Child}) -> digraph:add_edge(Relations, Parent, Child) end,
            Edges
        ),
        %% XXX The simplest solution:
        %%      digraph:get_path(Relations, Oldest, Youngest)
        %% is fine in the ideal case, but doesn't work in the case of a
        %% broken path (when a block was missing and its parent could not
        %% be looked up), in which case we need to return the longest
        %% lineage found, traced back from the youngest hash. TraceBack
        %% works for either scenario.
        (fun TraceBack ([Child | _]=Lineage) ->
            case digraph:in_neighbours(Relations, Child) of
                [] ->
                    Lineage;
                [Oldest] ->
                    %% Oldest is bound above, so this clause matches only
                    %% when the single parent IS the stop hash.
                    Lineage;
                [Parent] ->
                    TraceBack([Parent | Lineage]);
                [_|_]=Parents ->
                    error({incorrect_chain, multiple_parents, Child, Parents})
            end
        end)([Youngest])
    after
        true = digraph:delete(Relations)
    end.

%% Lazy stream over all {Key, Value} pairs of column family CF in DB.
%%
%% Nothing is read until the returned fun is called. Calling it opens a
%% fresh iterator, positions it on the first entry and yields either
%%   {some, {{K, V}, Rest}} - Rest continues with the same iterator, or
%%   none                   - the column family has no entries.
%%
%% Raises error({rocks_key_streaming_failure, _}) when the iterator cannot
%% be created or the initial move fails for any reason other than the
%% iterator being invalid.
-spec rocksdb_stream(rocksdb:db_handle(), rocksdb:cf_handle()) ->
    data_stream:t({K :: binary(), V :: binary()}).
rocksdb_stream(DB, CF) ->
    fun () ->
        case rocksdb:iterator(DB, CF, []) of
            {error, Reason} ->
                error({rocks_key_streaming_failure, Reason});
            {ok, Iter} ->
                case rocksdb:iterator_move(Iter, first) of
                    {ok, K, V} ->
                        {some, {{K, V}, rocksdb_stream(Iter)}};
                    {error, invalid_iterator} ->
                        %% Nothing to iterate over: treat it as end of
                        %% stream, the same way the 'next' step does, and
                        %% release the iterator that was just opened.
                        ok = rocksdb:iterator_close(Iter),
                        none;
                    Error ->
                        error({rocks_key_streaming_failure, Error})
                end
        end
    end.

%% Continuation of a rocksdb-backed stream over an already-open iterator.
%%
%% Each call of the returned fun advances Iter by one entry and yields
%%   {some, {{K, V}, Rest}} - while entries remain, or
%%   none                   - once the iterator reports it is invalid, which
%%                            is taken to mean "past the last entry".
%%
%% The iterator is closed as soon as the end is reached, so it does not
%% have to wait for garbage collection to be released.
%% NOTE(review): this makes the stream strictly single-use - the terminal
%% fun must not be called a second time. The stream already advanced shared
%% iterator state on every call, so re-evaluation was never safe; confirm
%% no consumer relies on it.
%%
%% Raises error({rocks_key_streaming_failure, Reason}) on any other error.
-spec rocksdb_stream(rocksdb:itr_handle()) ->
    data_stream:t({K :: binary(), V :: binary()}).
rocksdb_stream(Iter) ->
    fun () ->
        case rocksdb:iterator_move(Iter, next) of
            {ok, K, V} ->
                {some, {{K, V}, rocksdb_stream(Iter)}};
            {error, invalid_iterator} ->
                ok = rocksdb:iterator_close(Iter),
                none;
            {error, Reason} ->
                error({rocks_key_streaming_failure, Reason})
        end
    end.

-spec fold_chain(fun((Blk :: blockchain_block:block(), AccIn :: any()) -> NewAcc :: any()),
Expand Down

0 comments on commit cb82d65

Please sign in to comment.