Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IF: add get_blocks_result_v1 support to SHiP #2349

Merged
merged 12 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion libraries/state_history/abi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ extern const char* const state_history_plugin_abi = R"({
},
{
"name": "get_blocks_result_v0", "fields": [
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
{ "name": "prev_block", "type": "block_position?" },
{ "name": "block", "type": "bytes?" },
{ "name": "traces", "type": "bytes?" },
{ "name": "deltas", "type": "bytes?" }
]
},
{
"name": "get_blocks_result_v1", "fields": [
heifner marked this conversation as resolved.
Show resolved Hide resolved
{ "name": "head", "type": "block_position" },
{ "name": "last_irreversible", "type": "block_position" },
{ "name": "this_block", "type": "block_position?" },
Expand Down Expand Up @@ -576,7 +587,7 @@ extern const char* const state_history_plugin_abi = R"({
],
"variants": [
{ "name": "request", "types": ["get_status_request_v0", "get_blocks_request_v0", "get_blocks_request_v1", "get_blocks_ack_request_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0"] },
{ "name": "result", "types": ["get_status_result_v0", "get_blocks_result_v0", "get_blocks_result_v1"] },

{ "name": "action_receipt", "types": ["action_receipt_v0"] },
{ "name": "action_trace", "types": ["action_trace_v0", "action_trace_v1"] },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,18 @@ datastream<ST>& operator<<(datastream<ST>& ds, const history_context_wrapper_sta

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_v0& obj) {
fc::raw::pack(ds, obj.head);
fc::raw::pack(ds, obj.last_irreversible);
fc::raw::pack(ds, obj.this_block);
fc::raw::pack(ds, obj.prev_block);
history_pack_big_bytes(ds, obj.block);
history_pack_big_bytes(ds, obj.traces);
history_pack_big_bytes(ds, obj.deltas);
return ds;
}

template <typename ST>
datastream<ST>& operator<<(datastream<ST>& ds, const eosio::state_history::get_blocks_result_v1& obj) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
fc::raw::pack(ds, obj.head);
fc::raw::pack(ds, obj.last_irreversible);
fc::raw::pack(ds, obj.this_block);
Expand Down
9 changes: 7 additions & 2 deletions libraries/state_history/include/eosio/state_history/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,16 @@ struct get_blocks_result_base {
struct get_blocks_result_v0 : get_blocks_result_base {
std::optional<bytes> traces;
std::optional<bytes> deltas;
};

struct get_blocks_result_v1 : get_blocks_result_v0 {
std::optional<bytes> finality_data;
};

using state_request = std::variant<get_status_request_v0, get_blocks_request_v0, get_blocks_request_v1, get_blocks_ack_request_v0>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0, get_blocks_result_v1>;
using get_blocks_request = std::variant<get_blocks_request_v0, get_blocks_request_v1>;
using state_result = std::variant<get_status_result_v0, get_blocks_result_v0>;
using get_blocks_result = std::variant<get_blocks_result_v0, get_blocks_result_v1>;

} // namespace state_history
} // namespace eosio
Expand All @@ -142,5 +146,6 @@ FC_REFLECT(eosio::state_history::get_blocks_request_v0, (start_block_num)(end_bl
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_request_v1, (eosio::state_history::get_blocks_request_v0), (fetch_finality_data));
FC_REFLECT(eosio::state_history::get_blocks_ack_request_v0, (num_messages));
FC_REFLECT(eosio::state_history::get_blocks_result_base, (head)(last_irreversible)(this_block)(prev_block)(block));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas)(finality_data));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v0, (eosio::state_history::get_blocks_result_base), (traces)(deltas));
FC_REFLECT_DERIVED(eosio::state_history::get_blocks_result_v1, (eosio::state_history::get_blocks_result_v0), (finality_data));
// clang-format on
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class blocks_request_send_queue_entry : public send_queue_entry_base {
template <typename Session>
class blocks_result_send_queue_entry : public send_queue_entry_base, public std::enable_shared_from_this<blocks_result_send_queue_entry<Session>> {
std::shared_ptr<Session> session;
state_history::get_blocks_result_v0 r;
state_history::get_blocks_result result;
std::vector<char> data;
std::optional<locked_decompress_stream> stream;

Expand Down Expand Up @@ -264,46 +264,68 @@ class blocks_result_send_queue_entry : public send_queue_entry_base, public std:
});
}

// last to be sent
// last to be sent if result is get_blocks_result_v1
void send_finality_data() {
assert(std::holds_alternative<state_history::get_blocks_result_v1>(result));
stream.reset();
send_log(session->get_finality_data_log_entry(r, stream), true, [me=this->shared_from_this()]() {
send_log(session->get_finality_data_log_entry(std::get<state_history::get_blocks_result_v1>(result), stream), true, [me=this->shared_from_this()]() {
me->stream.reset();
me->session->session_mgr.pop_entry();
});
}

// second to be sent
// second to be sent if result is get_blocks_result_v1;
// last to be sent if result is get_blocks_result_v0
void send_deltas() {
stream.reset();
send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() {
me->send_finality_data();
});
std::visit(chain::overloaded{
[&](state_history::get_blocks_result_v0& r) {
send_log(session->get_delta_log_entry(r, stream), true, [me=this->shared_from_this()]() {
me->stream.reset();
me->session->session_mgr.pop_entry();}); },
[&](state_history::get_blocks_result_v1& r) {
send_log(session->get_delta_log_entry(r, stream), false, [me=this->shared_from_this()]() {
me->send_finality_data(); }); }},
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
result);
}

// first to be sent
void send_traces() {
stream.reset();
send_log(session->get_trace_log_entry(r, stream), false, [me=this->shared_from_this()]() {
send_log(session->get_trace_log_entry(result, stream), false, [me=this->shared_from_this()]() {
me->send_deltas();
});
}

public:
blocks_result_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_result_v0&& r)
blocks_result_send_queue_entry(std::shared_ptr<Session> s, state_history::get_blocks_result&& r)
: session(std::move(s)),
r(std::move(r)) {}
result(std::move(r)) {}

void send_entry() override {
assert(std::holds_alternative<state_history::get_blocks_result_v0>(result) ||
std::holds_alternative<state_history::get_blocks_result_v1>(result));
heifner marked this conversation as resolved.
Show resolved Hide resolved

// pack the state_result{get_blocks_result} excluding the fields `traces` and `deltas`
fc::datastream<size_t> ss;
fc::raw::pack(ss, fc::unsigned_int(1)); // pack the variant index of state_result{r}
fc::raw::pack(ss, static_cast<const state_history::get_blocks_result_base&>(r));
if(std::holds_alternative<state_history::get_blocks_result_v0>(result)) {
fc::raw::pack(ss, fc::unsigned_int(1)); // pack the variant index of state_result{r}, 1 for get_blocks_result_v0
} else {
fc::raw::pack(ss, fc::unsigned_int(2)); // pack the variant index of state_result{r}, 2 for get_blocks_result_v1
heifner marked this conversation as resolved.
Show resolved Hide resolved
}
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
std::visit([&](auto& r) {
fc::raw::pack(ss, static_cast<const state_history::get_blocks_result_base&>(r)); },
result);
data.resize(ss.tellp());
fc::datastream<char*> ds(data.data(), data.size());
fc::raw::pack(ds, fc::unsigned_int(1)); // pack the variant index of state_result{r}
fc::raw::pack(ds, static_cast<const state_history::get_blocks_result_base&>(r));

if(std::holds_alternative<state_history::get_blocks_result_v0>(result)) {
fc::raw::pack(ds, fc::unsigned_int(1)); // pack the variant index of state_result{r}, 1 for get_blocks_result_v0
} else {
fc::raw::pack(ds, fc::unsigned_int(2)); // pack the variant index of state_result{r}, 2 for get_blocks_result_v1
}
std::visit([&](auto& r) {
fc::raw::pack(ds, static_cast<const state_history::get_blocks_result_base&>(r)); },
result);
async_send(false, data, [me=this->shared_from_this()]() {
me->send_traces();
});
Expand Down Expand Up @@ -394,31 +416,31 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}
}

uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_log_entry_impl(const eosio::state_history::get_blocks_result& result,
bool has_value,
std::optional<state_history_log>& optional_log,
std::optional<locked_decompress_stream>& buf) {
if (has_value) {
if( optional_log ) {
buf.emplace( optional_log->create_locked_decompress_stream() );
return optional_log->get_unpacked_entry( result.this_block->block_num, *buf );
return std::visit([&](auto& r) { return optional_log->get_unpacked_entry( r.this_block->block_num, *buf ); }, result);
}
}
return 0;
}

uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_trace_log_entry(const eosio::state_history::get_blocks_result& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.traces.has_value(), plugin.get_trace_log(), buf);
return std::visit([&](auto& r) { return get_log_entry_impl(r, r.traces.has_value(), plugin.get_trace_log(), buf); }, result);
}

uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result_v0& result,
uint64_t get_delta_log_entry(const eosio::state_history::get_blocks_result& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.deltas.has_value(), plugin.get_chain_state_log(), buf);
return std::visit([&](auto& r) { return get_log_entry_impl(r, r.deltas.has_value(), plugin.get_chain_state_log(), buf); }, result);
}

uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v0& result,
std::optional<locked_decompress_stream>& buf) {
uint64_t get_finality_data_log_entry(const eosio::state_history::get_blocks_result_v1& result,
std::optional<locked_decompress_stream>& buf) {
return get_log_entry_impl(result, result.finality_data.has_value(), plugin.get_finality_data_log(), buf);
}

Expand Down Expand Up @@ -515,7 +537,8 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
current_request = std::move(req);
}

void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
template<typename T> // get_blocks_result_v0 or get_blocks_result_v1
void send_update(state_history::get_blocks_request_v0& request, bool fetch_finality_data, T result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
heifner marked this conversation as resolved.
Show resolved Hide resolved
need_to_send_update = true;

result.last_irreversible = plugin.get_last_irreversible();
Expand Down Expand Up @@ -565,8 +588,10 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
result.traces.emplace();
if (request.fetch_deltas && plugin.get_chain_state_log())
result.deltas.emplace();
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace(); // create finality_data (it's an optional field)
if constexpr (std::is_same_v<T, state_history::get_blocks_result_v1>) {
if (fetch_finality_data && plugin.get_finality_data_log()) {
result.finality_data.emplace();
}
}
}
++to_send_block_num;
Expand Down Expand Up @@ -601,7 +626,7 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
return !max_messages_in_flight;
}

void send_update(state_history::get_blocks_result_v0 result, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
void send_update(const state_history::block_position& head, const chain::signed_block_ptr& block, const chain::block_id_type& id) {
if (no_request_or_not_max_messages_in_flight()) {
session_mgr.pop_entry(false);
return;
Expand All @@ -613,9 +638,13 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock

std::visit(eosio::chain::overloaded{
[&](eosio::state_history::get_blocks_request_v0& request) {
send_update(request, false, result, block, id); },
state_history::get_blocks_result_v0 result;
result.head = head;
send_update(request, false, std::move(result), block, id); },
[&](eosio::state_history::get_blocks_request_v1& request) {
send_update(request, request.fetch_finality_data, result, block, id); } },
state_history::get_blocks_result_v1 result;
result.head = head;
send_update(request, request.fetch_finality_data, std::move(result), block, id); } },
*current_request);
}

Expand All @@ -626,17 +655,13 @@ struct session : session_base, std::enable_shared_from_this<session<Plugin, Sock
}

auto block_num = block->block_num();
state_history::get_blocks_result_v0 result;
result.head = {block_num, id};
to_send_block_num = std::min(block_num, to_send_block_num);
send_update(std::move(result), block, id);
send_update(state_history::block_position{block_num, id}, block, id);
}

void send_update(bool changed) override {
if (changed || need_to_send_update) {
state_history::get_blocks_result_v0 result;
result.head = plugin.get_block_head();
send_update(std::move(result), nullptr, chain::block_id_type{});
send_update(plugin.get_block_head(), nullptr, chain::block_id_type{});
} else {
session_mgr.pop_entry(false);
}
Expand Down
Loading
Loading