Skip to content

Commit

Permalink
feat: produce some telemetry spans and events
Browse files Browse the repository at this point in the history
Install an OpenTelemetryLayer configured with the OTLP exporter.
The tracing spans and events to export are selected by target
"movement_telemetry".
  • Loading branch information
mzabaluev committed Oct 15, 2024
1 parent 607ba0b commit 56899ca
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 73 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ pub async fn basic_coin_transfers(

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::Config::from_env()?;
let _guard = movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
&tracing_config,
)?;

// get the lead dot movement from the environment
let dot_movement = DotMovement::try_from_env()?;
Expand Down
9 changes: 4 additions & 5 deletions networks/suzuka/suzuka-full-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use std::process::ExitCode;

#[tokio::main]
async fn main() -> Result<ExitCode, anyhow::Error> {
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::telemetry::Config::from_env()?;
movement_tracing::telemetry::init_tracer_provider(
env!("CARGO_PKG_NAME"),
let tracing_config = movement_tracing::Config::from_env()?;
let _guard = movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
tracing_config,
&tracing_config,
)?;

// get the config file
Expand Down
4 changes: 2 additions & 2 deletions networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ where
})
.await??;

// get the transactions
// get the transactions count before the block is consumed
let transactions_count = block.transactions().len();
let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id);
let commitment =
Expand All @@ -152,7 +152,7 @@ where
self.da_db.set_synced_height(da_height - 1).await?;

// set the block as executed
self.da_db.add_executed_block(block_id.to_string()).await?;
self.da_db.add_executed_block(block_id.clone()).await?;

// todo: this needs defaults
if self.settlement_enabled() {
Expand Down
22 changes: 14 additions & 8 deletions networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use m1_da_light_node_util::config::Config as LightNodeConfig;
use maptos_dof_execution::SignedTransaction;

use tokio::sync::mpsc;
use tracing::{info, warn};
use tracing::{info, info_span, warn, Instrument};

use std::ops::ControlFlow;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::{self, AtomicU64};
use std::time::{Duration, Instant};

const LOGGING_UID: AtomicU64 = AtomicU64::new(0);
Expand All @@ -29,7 +29,16 @@ impl Task {
}

pub async fn run(mut self) -> anyhow::Result<()> {
while let ControlFlow::Continue(()) = self.spawn_write_next_transaction_batch().await? {}
loop {
let batch_id = LOGGING_UID.fetch_add(1, atomic::Ordering::Relaxed);
let span =
info_span!(target: "movement_telemetry", "write_batch", batch_id = %batch_id);
if let ControlFlow::Break(()) =
self.spawn_write_next_transaction_batch().instrument(span).await?
{
break;
}
}
Ok(())
}

Expand All @@ -45,7 +54,6 @@ impl Task {

let mut transactions = Vec::new();

let batch_id = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
loop {
let remaining = match half_building_time.checked_sub(start.elapsed().as_millis() as u64)
{
Expand All @@ -65,12 +73,11 @@ impl Task {
Ok(transaction) => match transaction {
Some(transaction) => {
info!(
target : "movement_telemetry",
batch_id = %batch_id,
target: "movement_telemetry",
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
sequence_number = transaction.sequence_number(),
"received transaction",
"received_transaction",
);
let serialized_aptos_transaction = serde_json::to_vec(&transaction)?;
let movement_transaction = movement_types::transaction::Transaction::new(
Expand All @@ -94,7 +101,6 @@ impl Task {
if transactions.len() > 0 {
info!(
target: "movement_telemetry",
batch_id = %batch_id,
transaction_count = transactions.len(),
"built_batch_write"
);
Expand Down
9 changes: 4 additions & 5 deletions protocol-units/da/m1/light-node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use m1_da_light_node::v1::{LightNodeV1, Manager};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
movement_tracing::init_tracing_subscriber();
let tracing_config = movement_tracing::telemetry::Config::from_env()?;
movement_tracing::telemetry::init_tracer_provider(
env!("CARGO_PKG_NAME"),
let tracing_config = movement_tracing::Config::from_env()?;
let _guard = movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
tracing_config,
&tracing_config,
)?;

let dot_movement = dot_movement::DotMovement::try_from_env()?;
Expand Down
5 changes: 2 additions & 3 deletions protocol-units/execution/opt-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,14 @@ aptos-mempool = { workspace = true }
aptos-temppath = { workspace = true }
aptos-faucet-core = { workspace = true }
aptos-cached-packages = { workspace = true }
maptos-execution-util = { workspace = true }
movement-types = { workspace = true }
aptos-indexer-grpc-fullnode = { workspace = true }
aptos-indexer-grpc-table-info = { workspace = true }
aptos-indexer = { workspace = true }
aptos-protos = { workspace = true }
aptos-logger = { workspace = true }
tonic = { workspace = true }
maptos-execution-util = { workspace = true }
movement-rest = { workspace = true }
movement-types = { workspace = true }

[dev-dependencies]
dirs = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion util/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ rust-version.workspace = true

[dependencies]
anyhow = { workspace = true }
tracing-subscriber = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] }
opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
#console-subscriber = { workspace = true }

[lints]
Expand Down
29 changes: 29 additions & 0 deletions util/tracing/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use anyhow::anyhow;
use std::env;

const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP";

/// Options for tracing configuration.
#[derive(Debug, Default)]
pub struct Config {
/// URL of the OpenTelemetry collector endpoint using the OTLP gRPC protocol.
/// If the value is `None`, telemetry is not exported.
pub otlp_grpc_url: Option<String>,
}

impl Config {
/// Get the tracing configuration from well-known environment variables.
pub fn from_env() -> Result<Self, anyhow::Error> {
let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) {
Ok(url) => Some(url),
Err(env::VarError::NotPresent) => None,
Err(env::VarError::NotUnicode(s)) => {
return Err(anyhow!(
"value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}",
s.to_string_lossy()
));
}
};
Ok(Self { otlp_grpc_url })
}
}
12 changes: 11 additions & 1 deletion util/tracing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
pub mod telemetry;
//! Tracing setup for Movement services.
//!
//! Exporting of tracing data via [OpenTelemetry] is optionally supported
//! by setting "movement_telemetry" as the target in tracing spans and events.
//!
//! [OpenTelemetry]: https://opentelemetry.io/

mod config;
mod telemetry;
mod tracing;

pub use config::Config;
pub use telemetry::ScopeGuard;
pub use tracing::init_tracing_subscriber;
83 changes: 39 additions & 44 deletions util/tracing/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,53 @@
//!
//! [tracing-opentelemetry#159]: https://github.com/tokio-rs/tracing-opentelemetry/issues/159

use anyhow::anyhow;
use opentelemetry::global::{self, BoxedTracer};
use opentelemetry::trace::noop::NoopTracerProvider;
use opentelemetry::KeyValue;
use crate::Config;

use opentelemetry::{trace::TracerProvider as _, KeyValue};
use opentelemetry_otlp::WithExportConfig as _;
use opentelemetry_sdk::{runtime, trace::Config as TraceConfig, Resource};
use opentelemetry_sdk::trace::{Config as TraceConfig, TracerProvider};
use opentelemetry_sdk::{runtime, Resource};
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use tracing::{error, Level, Subscriber};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::filter;
use tracing_subscriber::prelude::*;
use tracing_subscriber::registry::LookupSpan;

use std::env;

const OTLP_TRACING_ENV: &str = "MOVEMENT_OTLP";

/// Options for telemetry configuration.
#[derive(Debug, Default)]
pub struct Config {
/// URL of the collector endpoint using the OTLP gRPC protocol.
pub otlp_grpc_url: Option<String>,
}
/// The scope guard object for the background tasks of the tracing subsystem.
///
/// This object needs to be kept alive for the duration of the program.
#[must_use = "should be dropped at the end of the program scope"]
#[derive(Debug)]
pub struct ScopeGuard(Option<TracerProvider>);

impl Config {
/// Get the tracing configuration from well-known environment variables.
pub fn from_env() -> Result<Self, anyhow::Error> {
let otlp_grpc_url = match env::var(OTLP_TRACING_ENV) {
Ok(url) => Some(url),
Err(env::VarError::NotPresent) => None,
Err(env::VarError::NotUnicode(s)) => {
return Err(anyhow!(
"value of environment variable {OTLP_TRACING_ENV} is not valid UTF-8: {}",
s.to_string_lossy()
));
impl Drop for ScopeGuard {
fn drop(&mut self) {
if let Some(tracer_provider) = &self.0 {
// Make sure all batched traces are exported.
if let Err(e) = tracer_provider.shutdown() {
error!("OpenTelemetry tracer provider shutdown failed: {e}");
}
};
Ok(Self { otlp_grpc_url })
}
}
}

/// Global initialization of the OpenTelemetry tracer provider.
/// Adds an optional OpenTelemetry tracing layer to the provided subscriber.
///
/// This function should be called at the start of the program before any
/// threads are able to use OpenTelemetry tracers. The function will panic
/// if not called within a Tokio runtime.
pub fn init_tracer_provider(
pub(crate) fn init_tracing_layer<S>(
subscriber: S,
service_name: &'static str,
service_version: &'static str,
config: Config,
) -> Result<(), anyhow::Error> {
if let Some(endpoint) = config.otlp_grpc_url {
dbg!(&endpoint);
config: &Config,
) -> Result<(ScopeGuard, impl Subscriber), anyhow::Error>
where
S: Subscriber,
for<'span> S: LookupSpan<'span>,
{
let (tracer_provider, layer) = if let Some(endpoint) = &config.otlp_grpc_url {
let exporter = opentelemetry_otlp::new_exporter().tonic().with_endpoint(endpoint);
let provider = opentelemetry_otlp::new_pipeline()
.tracing()
Expand All @@ -63,15 +62,11 @@ pub fn init_tracer_provider(
KeyValue::new(SERVICE_VERSION, service_version),
])))
.install_batch(runtime::Tokio)?;
dbg!(&provider);
global::set_tracer_provider(provider);
let layer = OpenTelemetryLayer::new(provider.tracer("movement"))
.with_filter(filter::Targets::new().with_target("movement_telemetry", Level::INFO));
(Some(provider), Some(layer))
} else {
global::set_tracer_provider(NoopTracerProvider::new());
}
Ok(())
}

/// Get the tracer configured for the process.
pub fn tracer() -> BoxedTracer {
global::tracer("movement")
(None, None)
};
Ok((ScopeGuard(tracer_provider), subscriber.with(layer)))
}
20 changes: 18 additions & 2 deletions util/tracing/src/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
use crate::telemetry::{self, ScopeGuard};
use crate::Config;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, Registry};

/// Sets up the tracing subscribers for a Movement process. This should be
/// called at the beginning of a process' `main` function.
pub fn init_tracing_subscriber() {
///
/// If successful, returns a guard object that should be dropped at the end
/// of the process' `main` function scope.
pub fn init_tracing_subscriber(
service_name: &'static str,
service_version: &'static str,
config: &Config,
) -> Result<ScopeGuard, anyhow::Error> {
// TODO: compose console_subscriber as a layer
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
tracing_subscriber::fmt().with_env_filter(env_filter).init();
let fmt_layer = fmt::layer().with_filter(env_filter);
let subscriber = Registry::default().with(fmt_layer);
let (scope_guard, subscriber) =
telemetry::init_tracing_layer(subscriber, service_name, service_version, config)?;
subscriber.init();
Ok(scope_guard)
}

0 comments on commit 56899ca

Please sign in to comment.