Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus: Add speculative block metrics #1502

Merged
merged 7 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,29 +146,49 @@ class producer_plugin : public appbase::plugin<producer_plugin> {

static void set_test_mode(bool m) { test_mode_ = m; }

struct produced_block_metrics {
struct speculative_block_metrics {
account_name block_producer{};
uint32_t block_num = 0;
int64_t block_total_time_us = 0;
int64_t block_idle_us = 0;
std::size_t num_success_trx = 0;
int64_t success_trx_time_us = 0;
std::size_t num_fail_trx = 0;
int64_t fail_trx_time_us = 0;
std::size_t num_transient_trx = 0;
int64_t transient_trx_time_us = 0;
int64_t block_other_time_us = 0;
};

struct produced_block_metrics : public speculative_block_metrics {
std::size_t unapplied_transactions_total = 0;
std::size_t blacklisted_transactions_total = 0;
std::size_t subjective_bill_account_size_total = 0;
std::size_t scheduled_trxs_total = 0;
std::size_t trxs_produced_total = 0;
uint64_t cpu_usage_us = 0;
int64_t total_elapsed_time_us = 0;
int64_t total_time_us = 0;
uint64_t net_usage_us = 0;

uint32_t last_irreversible = 0;
uint32_t head_block_num = 0;
};

struct incoming_block_metrics {
std::size_t trxs_incoming_total = 0;
uint64_t cpu_usage_us = 0;
uint64_t net_usage_us = 0;
std::size_t trxs_incoming_total = 0;
uint64_t cpu_usage_us = 0;
int64_t total_elapsed_time_us = 0;
int64_t total_time_us = 0;
uint64_t net_usage_us = 0;
int64_t block_latency_us = 0;

uint32_t last_irreversible = 0;
uint32_t head_block_num = 0;
};

void register_update_produced_block_metrics(std::function<void(produced_block_metrics)>&&);
void register_update_speculative_block_metrics(std::function<void(speculative_block_metrics)>&&);
void register_update_incoming_block_metrics(std::function<void(incoming_block_metrics)>&&);

inline static bool test_mode_{false}; // to be moved into appbase (application_base)
Expand Down
72 changes: 47 additions & 25 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/hashed_index.hpp>
Expand Down Expand Up @@ -307,11 +305,11 @@ struct block_time_tracker {
last_time_point = now;
}

void report(uint32_t block_num, account_name producer) {
void report(uint32_t block_num, account_name producer, producer_plugin::speculative_block_metrics& metrics) {
using namespace std::string_literals;
assert(!paused);
auto now = fc::time_point::now();
if( _log.is_enabled( fc::log_level::debug ) ) {
auto now = fc::time_point::now();
auto diff = now - clear_time_point - block_idle_time - trx_success_time - trx_fail_time - transient_trx_time - other_time;
fc_dlog( _log, "Block #${n} ${p} trx idle: ${i}us out of ${t}us, success: ${sn}, ${s}us, fail: ${fn}, ${f}us, "
"transient: ${ttn}, ${tt}us, other: ${o}us${rest}",
Expand All @@ -321,6 +319,17 @@ struct block_time_tracker {
("ttn", transient_trx_num)("tt", transient_trx_time)
("o", other_time)("rest", diff.count() > 5 ? ", diff: "s + std::to_string(diff.count()) + "us"s : ""s ) );
}
metrics.block_producer = producer;
metrics.block_num = block_num;
metrics.block_total_time_us = (now - clear_time_point).count();
metrics.block_idle_us = block_idle_time.count();
metrics.num_success_trx = trx_success_num;
metrics.success_trx_time_us = trx_success_time.count();
metrics.num_fail_trx = trx_fail_num;
metrics.fail_trx_time_us = trx_fail_time.count();
metrics.num_transient_trx = transient_trx_num;
metrics.transient_trx_time_us = transient_trx_time.count();
metrics.block_other_time_us = other_time.count();
}

void clear() {
Expand Down Expand Up @@ -571,6 +580,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
snapshot_scheduler _snapshot_scheduler;

std::function<void(producer_plugin::produced_block_metrics)> _update_produced_block_metrics;
std::function<void(producer_plugin::speculative_block_metrics)> _update_speculative_block_metrics;
std::function<void(producer_plugin::incoming_block_metrics)> _update_incoming_block_metrics;

// ro for read-only
Expand Down Expand Up @@ -666,7 +676,10 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

if (block_info) {
auto[block_num, block_producer] = *block_info;
_time_tracker.report(block_num, block_producer);
producer_plugin::speculative_block_metrics metrics;
_time_tracker.report(block_num, block_producer, metrics);
if (_update_speculative_block_metrics)
_update_speculative_block_metrics(metrics);
}
_time_tracker.clear();
}
Expand Down Expand Up @@ -764,11 +777,14 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
}
}
if (_update_incoming_block_metrics) {
_update_incoming_block_metrics({.trxs_incoming_total = block->transactions.size(),
.cpu_usage_us = br.total_cpu_usage_us,
.net_usage_us = br.total_net_usage,
.last_irreversible = chain.last_irreversible_block_num(),
.head_block_num = chain.head_block_num()});
_update_incoming_block_metrics({.trxs_incoming_total = block->transactions.size(),
.cpu_usage_us = br.total_cpu_usage_us,
.total_elapsed_time_us = br.total_elapsed_time.count(),
.total_time_us = br.total_time.count(),
.net_usage_us = br.total_net_usage,
.block_latency_us = (now - block->timestamp).count(),
.last_irreversible = chain.last_irreversible_block_num(),
.head_block_num = blk_num});
}

return true;
Expand Down Expand Up @@ -2722,31 +2738,33 @@ void producer_plugin_impl::produce_block() {
chain.commit_block();

block_state_ptr new_bs = chain.head_block_state();

producer_plugin::produced_block_metrics metrics;
br.total_time += fc::time_point::now() - start;

if (_update_produced_block_metrics) {
_update_produced_block_metrics(
{.unapplied_transactions_total = _unapplied_transactions.size(),
.blacklisted_transactions_total = _blacklisted_transactions.size(),
.subjective_bill_account_size_total = chain.get_subjective_billing().get_account_cache_size(),
.scheduled_trxs_total = chain.db().get_index<generated_transaction_multi_index, by_delay>().size(),
.trxs_produced_total = new_bs->block->transactions.size(),
.cpu_usage_us = br.total_cpu_usage_us,
.net_usage_us = br.total_net_usage,
.last_irreversible = chain.last_irreversible_block_num(),
.head_block_num = chain.head_block_num()});
}

ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} "
"[trxs: ${count}, lib: ${lib}, confirmed: ${confs}, net: ${net}, cpu: ${cpu}, elapsed: ${et}, time: ${tt}]",
("p", new_bs->header.producer)("id", new_bs->id.str().substr(8, 16))("n", new_bs->block_num)("t", new_bs->header.timestamp)
("count", new_bs->block->transactions.size())("lib", chain.last_irreversible_block_num())("net", br.total_net_usage)
("cpu", br.total_cpu_usage_us)("et", br.total_elapsed_time)("tt", br.total_time)("confs", new_bs->header.confirmed));

_time_tracker.add_other_time();
_time_tracker.report(new_bs->block_num, new_bs->block->producer);
_time_tracker.report(new_bs->block_num, new_bs->block->producer, metrics);
_time_tracker.clear();

if (_update_produced_block_metrics) {
metrics.unapplied_transactions_total = _unapplied_transactions.size();
metrics.blacklisted_transactions_total = _blacklisted_transactions.size();
metrics.subjective_bill_account_size_total = chain.get_subjective_billing().get_account_cache_size();
metrics.scheduled_trxs_total = chain.db().get_index<generated_transaction_multi_index, by_delay>().size();
metrics.trxs_produced_total = new_bs->block->transactions.size();
metrics.cpu_usage_us = br.total_cpu_usage_us;
metrics.total_elapsed_time_us = br.total_elapsed_time.count();
metrics.total_time_us = br.total_time.count();
metrics.net_usage_us = br.total_net_usage;
metrics.last_irreversible = chain.last_irreversible_block_num();
metrics.head_block_num = chain.head_block_num();
_update_produced_block_metrics(metrics);
}
}

void producer_plugin::received_block(uint32_t block_num) {
Expand Down Expand Up @@ -2980,6 +2998,10 @@ void producer_plugin::register_update_produced_block_metrics(std::function<void(
my->_update_produced_block_metrics = std::move(fun);
}

void producer_plugin::register_update_speculative_block_metrics(std::function<void(speculative_block_metrics)> && fun) {
my->_update_speculative_block_metrics = std::move(fun);
}

void producer_plugin::register_update_incoming_block_metrics(std::function<void(producer_plugin::incoming_block_metrics)>&& fun) {
my->_update_incoming_block_metrics = std::move(fun);
}
Expand Down
Loading