diff --git a/src/cli/blockchain_cli_snapshot.erl b/src/cli/blockchain_cli_snapshot.erl index d4c2155fdd..8de68de0f5 100644 --- a/src/cli/blockchain_cli_snapshot.erl +++ b/src/cli/blockchain_cli_snapshot.erl @@ -213,7 +213,7 @@ snapshot_list(["snapshot", "list"], [], []) -> Chain = blockchain_worker:blockchain(), Snapshots = blockchain:find_last_snapshots(Chain, 5), case Snapshots of - undefined -> ok; + undefined -> [clique_status:text("No snapshot found")]; _ -> [ clique_status:text(io_lib:format("Height ~p\nHash ~p (~p)\nHave ~p\n", [Height, Hash, binary_to_hex(Hash), diff --git a/src/handlers/blockchain_snapshot_handler.erl b/src/handlers/blockchain_snapshot_handler.erl index ffcb8c64f8..c3b5c3d60f 100644 --- a/src/handlers/blockchain_snapshot_handler.erl +++ b/src/handlers/blockchain_snapshot_handler.erl @@ -8,6 +8,7 @@ -behavior(libp2p_framed_stream). +-include_lib("kernel/include/file.hrl"). -include_lib("helium_proto/include/blockchain_snapshot_handler_pb.hrl"). %% ------------------------------------------------------------------ @@ -101,9 +102,8 @@ handle_data(server, Data, #state{chain = Chain} = State) -> #blockchain_snapshot_req_pb{height = _Height, hash = Hash} -> case blockchain:get_snapshot(Hash, Chain) of {ok, {file, FileName}} -> - {ok, Bin} = file:read_file(FileName), - Msg = #blockchain_snapshot_resp_pb{snapshot = Bin}, - {noreply, State, blockchain_snapshot_handler_pb:encode_msg(Msg)}; + lager:info("streaming snapshot from ~p", [FileName]), + {noreply, State, mk_file_stream_fun(FileName)}; {ok, Snap} -> lager:info("sending snapshot ~p", [Hash]), Msg = #blockchain_snapshot_resp_pb{snapshot = Snap}, @@ -121,3 +121,40 @@ handle_data(server, Data, #state{chain = Chain} = State) -> handle_info(_Type, _Msg, State) -> lager:info("unhandled message ~p ~p", [_Type, _Msg]), {noreply, State}. + +%% ------------------------------------------------------------------ +%% internal functions +%% ------------------------------------------------------------------ + +% convert integer to varint encoding for protocol buffer +varint(I, Bin) when I =< 127 -> <>; +varint(I, Bin) -> varint(I bsr 7, <>). + + +-define(FILE_STREAM_BLOCKSIZE, 4096). + +% send file bytes in chunks +mk_file_stream_fun(File) when is_pid(File)-> + case file:read(File, ?FILE_STREAM_BLOCKSIZE) of + {ok, Data} -> + fun() -> + {mk_file_stream_fun(File), Data} + end; + eof -> + file:close(File), + fun() -> ok end + end; +mk_file_stream_fun(FileName) -> + {ok, #file_info{size = Bytes}} = file:read_file_info(FileName), + {ok, File} = file:open(FileName, [read, binary]), + PBSize = varint(Bytes, <<>>), + %% 18 introduces a protocol buffers bytes type + Msg0 = <<18, PBSize/binary>>, + HdrSize = byte_size(Msg0), + {ok, Data} = file:read(File, ?FILE_STREAM_BLOCKSIZE - HdrSize), + Msg = <>, + {Bytes + HdrSize, + fun() -> + {mk_file_stream_fun(File), Msg} + end + }.