Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
stream snapshot when reading from file
Browse files Browse the repository at this point in the history
  • Loading branch information
joecaswell committed Mar 11, 2022
1 parent 9433396 commit 98d6b3a
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/cli/blockchain_cli_snapshot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ snapshot_list(["snapshot", "list"], [], []) ->
Chain = blockchain_worker:blockchain(),
Snapshots = blockchain:find_last_snapshots(Chain, 5),
case Snapshots of
undefined -> ok;
undefined -> [clique_status:text("No snapshot found")];
_ ->
[ clique_status:text(io_lib:format("Height ~p\nHash ~p (~p)\nHave ~p\n",
[Height, Hash, binary_to_hex(Hash),
Expand Down
43 changes: 40 additions & 3 deletions src/handlers/blockchain_snapshot_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
-behavior(libp2p_framed_stream).


-include_lib("kernel/include/file.hrl").
-include_lib("helium_proto/include/blockchain_snapshot_handler_pb.hrl").

%% ------------------------------------------------------------------
Expand Down Expand Up @@ -101,9 +102,8 @@ handle_data(server, Data, #state{chain = Chain} = State) ->
#blockchain_snapshot_req_pb{height = _Height, hash = Hash} ->
case blockchain:get_snapshot(Hash, Chain) of
{ok, {file, FileName}} ->
{ok, Bin} = file:read_file(FileName),
Msg = #blockchain_snapshot_resp_pb{snapshot = Bin},
{noreply, State, blockchain_snapshot_handler_pb:encode_msg(Msg)};
lager:info("streaming snapshot from ~p", [FileName]),
{noreply, State, mk_file_stream_fun(FileName)};
{ok, Snap} ->
lager:info("sending snapshot ~p", [Hash]),
Msg = #blockchain_snapshot_resp_pb{snapshot = Snap},
Expand All @@ -121,3 +121,40 @@ handle_data(server, Data, #state{chain = Chain} = State) ->
handle_info(_Type, _Msg, State) ->
lager:info("unhandled message ~p ~p", [_Type, _Msg]),
{noreply, State}.

%% ------------------------------------------------------------------
%% internal functions
%% ------------------------------------------------------------------

% convert integer to varint encoding for protocol buffer
varint(I, Bin) when I =< 127 -> <<Bin/binary, I>>;
varint(I, Bin) -> varint(I bsr 7, <<Bin/binary, (I band 127 bor 128)>>).


-define(FILE_STREAM_BLOCKSIZE, 4096).

% send file bytes in chunks
mk_file_stream_fun(File) when is_pid(File)->
case file:read(File, ?FILE_STREAM_BLOCKSIZE) of
{ok, Data} ->
fun() ->
{mk_file_stream_fun(File), Data}
end;
eof ->
file:close(File),
fun() -> ok end
end;
mk_file_stream_fun(FileName) ->
{ok, #file_info{size = Bytes}} = file:read_file_info(FileName),
{ok, File} = file:open(FileName, [read, binary]),
PBSize = varint(Bytes, <<>>),
%% 18 introduces a protocol buffers bytes type
Msg0 = <<18, PBSize/binary>>,
HdrSize = byte_size(Msg0),
{ok, Data} = file:read(File, ?FILE_STREAM_BLOCKSIZE - HdrSize),
Msg = <<Msg0/binary, Data/binary>>,
{Bytes + HdrSize,
fun() ->
{mk_file_stream_fun(File), Msg}
end
}.

0 comments on commit 98d6b3a

Please sign in to comment.