%% Returns the list of block hashes produced by
%% build_hash_chain_in_parallel/4 for the lineage that ends at StartBlock.
%%
%% StopHash   - hash at which the lineage stops (passed on as the oldest hash).
%% StartBlock - block whose own hash is the youngest element of the lineage.
%% The third argument must be a #blockchain{} record; only its db handle is
%% used here.
%% CF         - column family holding the serialized blocks, keyed by hash.
-spec build_hash_chain(blockchain_block:hash(), blockchain_block:block(),
                       blockchain(), rocksdb:cf_handle()) -> [binary()].
build_hash_chain(StopHash, StartBlock, #blockchain{db=DB}, CF) ->
    build_hash_chain_in_parallel(
        DB,
        CF,
        StopHash,
        blockchain_block:hash_block(StartBlock)
    ).

-spec build_hash_chain_in_parallel(rocksdb:db_handle(), rocksdb:cf_handle(), H, H) ->
    [H] when H :: blockchain_block:hash().
%% Rebuilds the lineage of block hashes ending at Youngest.
%%
%% Every {Hash, SerializedBlock} pair streamed from the column family is
%% turned into a {ParentHash, ChildHash} edge (ParentHash being the block's
%% prev_hash). The edges are loaded into a temporary digraph, which is then
%% walked backwards from Youngest.
%%
%% The walk stops, returning the lineage gathered so far, when either:
%%   - the current hash has no recorded parent (for instance because the
%%     parent block is missing from the column family), or
%%   - the only recorded parent is Oldest, which is NOT included in the result.
%% The result is ordered oldest-first and its last element is always Youngest.
%%
%% Raises error({incorrect_chain, multiple_parents, Child, Parents}) when a
%% hash has more than one recorded parent.
build_hash_chain_in_parallel(DB, CF, Oldest, Youngest) ->
    %% The edges are gathered before the digraph exists, so a failure while
    %% streaming or deserializing cannot leave a digraph behind.
    Edges =
        data_stream:pmap_to_bag(
            rocksdb_stream(DB, CF),
            fun ({<<ChildHash/binary>>, <<ChildBlockBin/binary>>}) ->
                ChildBlock = blockchain_block:deserialize(ChildBlockBin),
                <<ParentHash/binary>> = blockchain_block:prev_hash(ChildBlock),
                %% XXX Can't update digraph in parallel - no concurrent writes.
                {ParentHash, ChildHash}
            end
        ),
    %% XXX 'cyclic' is a perf compromise: 'acyclic' is what we'd ideally want,
    %% but it is an expensive option, as it forces a cycles check ON EACH
    %% add_edge operation, so by using 'cyclic' we're just trusting that our
    %% prev_hash relationships are all correct, which they should be, if not -
    %% we have bigger problems.
    Relations = digraph:new([cyclic]),
    try
        lists:foreach(
            fun ({_, Child}) -> digraph:add_vertex(Relations, Child) end,
            Edges
        ),
        %% Only children are vertices, so add_edge returns an error tuple for
        %% a parent whose own block was never seen. That result is ignored on
        %% purpose: such a child simply ends up with no in-neighbours, which is
        %% how a broken path is detected below.
        lists:foreach(
            fun ({Parent, Child}) -> digraph:add_edge(Relations, Parent, Child) end,
            Edges
        ),
        %% XXX The simplest solution:
        %%      digraph:get_path(Relations, Oldest, Youngest)
        %% is fine in the ideal case, but doesn't work in the case of a broken
        %% path (when a block was missing and its parent could not be looked
        %% up), in which case we need to return the longest lineage found,
        %% traced back from the youngest hash. TraceBack works for either
        %% scenario.
        TraceBack =
            fun Walk([Child | _]=Lineage) ->
                case digraph:in_neighbours(Relations, Child) of
                    [] ->
                        Lineage;
                    %% Oldest is already bound, so this clause matches only
                    %% when the single parent IS the stop hash.
                    [Oldest] ->
                        Lineage;
                    [Parent] ->
                        Walk([Parent | Lineage]);
                    [_|_]=Parents ->
                        error({incorrect_chain, multiple_parents, Child, Parents})
                end
            end,
        TraceBack([Youngest])
    after
        %% Always release the digraph, including when the trace-back raises.
        true = digraph:delete(Relations)
    end.

-spec rocksdb_stream(rocksdb:db_handle(), rocksdb:cf_handle()) ->
    data_stream:t({K :: binary(), V :: binary()}).
%% Lazy stream over all {Key, Value} pairs of a column family.
%%
%% Nothing is read until the returned fun is called. That call opens an
%% iterator, moves it to the first entry and yields either
%% {some, {{K, V}, RestOfStream}} or none (when there is no first entry, i.e.
%% the column family is empty).
%%
%% Raises error({rocks_key_streaming_failure, _}) when the iterator cannot be
%% created or positioned.
rocksdb_stream(DB, CF) ->
    fun () ->
        case rocksdb:iterator(DB, CF, []) of
            {error, Reason} ->
                error({rocks_key_streaming_failure, Reason});
            {ok, Iter} ->
                case rocksdb:iterator_move(Iter, first) of
                    {ok, K, V} ->
                        {some, {{K, V}, rocksdb_stream(Iter)}};
                    {error, invalid_iterator} ->
                        %% No first entry: an empty stream, not a failure.
                        %% The close result is ignored so that a failed close
                        %% cannot change the outcome of the stream.
                        _ = rocksdb:iterator_close(Iter),
                        none;
                    Error ->
                        %% Best-effort cleanup before reporting the failure.
                        _ = rocksdb:iterator_close(Iter),
                        error({rocks_key_streaming_failure, Error})
                end
        end
    end.

%% Continuation of the stream over an already positioned iterator.
%%
%% Each call of the returned fun advances the iterator by one entry and yields
%% {some, {{K, V}, RestOfStream}}, or none once the iterator runs past the last
%% entry. The iterator is closed when the stream ends or fails.
%%
%% Raises error({rocks_key_streaming_failure, Reason}) on any other iterator
%% error.
-spec rocksdb_stream(rocksdb:itr_handle()) ->
    data_stream:t({K :: binary(), V :: binary()}).
rocksdb_stream(Iter) ->
    fun () ->
        case rocksdb:iterator_move(Iter, next) of
            {ok, K, V} ->
                {some, {{K, V}, rocksdb_stream(Iter)}};
            {error, invalid_iterator} ->
                %% Exhausted. The close result is ignored so that a failed
                %% close cannot change the outcome of the stream.
                _ = rocksdb:iterator_close(Iter),
                none;
            {error, Reason} ->
                %% Best-effort cleanup before reporting the failure.
                _ = rocksdb:iterator_close(Iter),
                error({rocks_key_streaming_failure, Reason})
        end
    end.