Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OpenTelemetry OTLP setup for tracing, take 2 #697

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 259 additions & 49 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ move-vm-ext = { path = "types/move-vm-ext" }
num-derive = "0.4.2"
num-traits = "0.2.14"
once_cell = "1.8.0"
opentelemetry = { version = "0.26" }
opentelemetry-otlp = { version = "0.26" }
opentelemetry_sdk = { version = "0.26", features = ["rt-tokio"] }
opentelemetry-semantic-conventions = { version = "0.26" }
parking_lot = { version = "0.12.1" }
poem = { version = "=1.3.59", features = ["anyhow", "rustls"] }
poem-openapi = { version = "=2.0.11", features = ["swagger-ui", "url"] }
Expand Down Expand Up @@ -283,7 +287,7 @@ tonic-reflection = "0.11"
tonic-web = "0.11"
### To try (experimental) std support, add `features = [ "std" ]` to risc0-zkvm
tracing = "0.1.40"
tracing-appender = "0.2"
tracing-opentelemetry = { version = "0.27" }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = "0.2.5"
trie-db = "0.28.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use tokio::sync::RwLock;
use tracing::info;
use url::Url;

const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG";

pub fn get_suzuka_config(
dot_movement: &DotMovement,
) -> Result<suzuka_config::Config, anyhow::Error> {
Expand Down Expand Up @@ -247,10 +245,12 @@ pub async fn basic_coin_transfers(

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let tracing_config = movement_tracing::Config {
timing_log_path: std::env::var_os(TIMING_LOG_ENV).map(Into::into),
};
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
let tracing_config = movement_tracing::Config::from_env()?;
let _guard = movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
&tracing_config,
)?;

// get the lead dot movement from the environment
let dot_movement = DotMovement::try_from_env()?;
Expand Down
12 changes: 6 additions & 6 deletions networks/suzuka/suzuka-full-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use suzuka_full_node::manager::Manager;

use std::env;
use std::process::ExitCode;

const TIMING_LOG_ENV: &str = "SUZUKA_TIMING_LOG";

#[tokio::main]
async fn main() -> Result<ExitCode, anyhow::Error> {
let tracing_config =
movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) };
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
let tracing_config = movement_tracing::Config::from_env()?;
let _guard = movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
&tracing_config,
)?;

// get the config file
let dot_movement = dot_movement::DotMovement::try_from_env()?;
Expand Down
34 changes: 27 additions & 7 deletions networks/suzuka/suzuka-full-node/src/tasks/execute_settle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ where
})
.await??;

// get the transactions
// get the transactions count before the block is consumed
let transactions_count = block.transactions().len();
let span = info_span!(target: "movement_timing", "execute_block", id = %block_id);
let span = info_span!(target: "movement_telemetry", "execute_block", id = %block_id);
let commitment =
self.execute_block_with_retries(block, block_timestamp).instrument(span).await?;

Expand All @@ -152,7 +152,7 @@ where
self.da_db.set_synced_height(da_height - 1).await?;

// set the block as executed
self.da_db.add_executed_block(block_id.to_string()).await?;
self.da_db.add_executed_block(block_id.clone()).await?;

// todo: this needs defaults
if self.settlement_enabled() {
Expand Down Expand Up @@ -183,18 +183,22 @@ where
block: Block,
mut block_timestamp: u64,
) -> anyhow::Result<BlockCommitment> {
for _ in 0..self.execution_extension.block_retry_count {
let retry_count = self.execution_extension.block_retry_count;
for _ in 0..retry_count {
// we have to clone here because the block is supposed to be consumed by the executor
match self.execute_block(block.clone(), block_timestamp).await {
Ok(commitment) => return Ok(commitment),
Ok(commitment) => {
info!(target: "movement_telemetry", "execute_block_succeeded");
return Ok(commitment);
}
Err(e) => {
info!("Failed to execute block: {:?}. Retrying", e);
info!(target: "movement_telemetry", error = %e, "execute_block_failed");
block_timestamp += self.execution_extension.block_retry_increment_microseconds; // bump the timestamp by the configured retry increment (in microseconds) before the next attempt
}
}
}

anyhow::bail!("Failed to execute block after 5 retries")
anyhow::bail!("Failed to execute block after {retry_count} retries")
}

async fn execute_block(
Expand Down Expand Up @@ -226,6 +230,22 @@ where
continue;
}

// Instrumentation for aggregated metrics:
// Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422
// Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423
// Transaction failure rate: https://github.com/movementlabsxyz/movement/discussions/428
//
// TODO: since execution of a block may be retried several times,
// collect this data once and export it in telemetry
// only on the final success or failure.
info!(
target: "movement_telemetry",
tx_hash = %signed_transaction.committed_hash(),
sender = %signed_transaction.sender(),
sequence_number = signed_transaction.sequence_number(),
"executing_transaction"
);

let signature_verified_transaction = SignatureVerifiedTransaction::Valid(
Transaction::UserTransaction(signed_transaction),
);
Expand Down
36 changes: 18 additions & 18 deletions networks/suzuka/suzuka-full-node/src/tasks/transaction_ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ use m1_da_light_node_util::config::Config as LightNodeConfig;
use maptos_dof_execution::SignedTransaction;

use tokio::sync::mpsc;
use tracing::{info, warn};
use tracing::{info, info_span, warn, Instrument};

use std::ops::ControlFlow;
use std::sync::atomic::AtomicU64;
use std::time::{Duration, Instant};

const LOGGING_UID: AtomicU64 = AtomicU64::new(0);

pub struct Task {
transaction_receiver: mpsc::Receiver<SignedTransaction>,
da_light_node_client: LightNodeServiceClient<tonic::transport::Channel>,
Expand All @@ -29,14 +26,13 @@ impl Task {
}

pub async fn run(mut self) -> anyhow::Result<()> {
while let ControlFlow::Continue(()) = self.spawn_write_next_transaction_batch().await? {}
while let ControlFlow::Continue(()) = self.build_and_write_batch().await? {}
Ok(())
}

/// Constructs a batch of transactions then spawns the write request to the DA in the background.
async fn spawn_write_next_transaction_batch(
&mut self,
) -> Result<ControlFlow<(), ()>, anyhow::Error> {
#[tracing::instrument(target = "movement_telemetry", skip(self))]
async fn build_and_write_batch(&mut self) -> Result<ControlFlow<(), ()>, anyhow::Error> {
use ControlFlow::{Break, Continue};

// limit the total time batching transactions
Expand All @@ -45,7 +41,6 @@ impl Task {

let mut transactions = Vec::new();

let batch_id = LOGGING_UID.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
loop {
let remaining = match half_building_time.checked_sub(start.elapsed().as_millis() as u64)
{
Expand All @@ -64,13 +59,15 @@ impl Task {
{
Ok(transaction) => match transaction {
Some(transaction) => {
// Instrumentation for aggregated metrics:
// Transactions per second: https://github.com/movementlabsxyz/movement/discussions/422
// Transaction latency: https://github.com/movementlabsxyz/movement/discussions/423
info!(
target : "movement_timing",
batch_id = %batch_id,
target: "movement_telemetry",
tx_hash = %transaction.committed_hash(),
sender = %transaction.sender(),
sequence_number = transaction.sequence_number(),
"received transaction",
"received_transaction",
);
let serialized_aptos_transaction = serde_json::to_vec(&transaction)?;
let movement_transaction = movement_types::transaction::Transaction::new(
Expand All @@ -93,19 +90,22 @@ impl Task {

if transactions.len() > 0 {
info!(
target: "movement_timing",
batch_id = %batch_id,
target: "movement_telemetry",
transaction_count = transactions.len(),
"built_batch_write"
);
let batch_write = BatchWriteRequest { blobs: transactions };
// spawn the actual batch write request in the background
let mut da_light_node_client = self.da_light_node_client.clone();
tokio::spawn(async move {
if let Err(e) = da_light_node_client.batch_write(batch_write).await {
warn!("failed to write batch to DA: {:?}", e);
let write_span = info_span!(target: "movement_telemetry", "batch_write");
tokio::spawn(
async move {
if let Err(e) = da_light_node_client.batch_write(batch_write).await {
warn!("failed to write batch to DA: {:?}", e);
}
}
});
.instrument(write_span),
);
}

Ok(Continue(()))
Expand Down
24 changes: 24 additions & 0 deletions process-compose/suzuka-full-node/process-compose.telemetry.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: "3"

environment:

processes:
otlp-collector:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't seem to be getting anything served by this, and the e2e tests for the basic simulation are failing. Is this how I should be using it?

just suzuka-full-node native build.setup.eth-local.celestia-local.test.telemetry --keep-project

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Screenshot 2024-10-18 at 10 49 29 AM

This is all I'm seeing from the collector process when I start with the above.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a feed overlay in case, for some reason, this is not actually staying open with the --keep-project flag.

just suzuka-full-node native build.setup.eth-local.celestia-local.feed.telemetry --keep-project

is_daemon: true
command: |
docker run -d --rm --name suzuka-otlp-collector -p16686:16686 -p4317:4317 -e COLLECTOR_OTLP_ENABLED=true jaegertracing/all-in-one:latest
shutdown:
command: |
docker stop suzuka-otlp-collector
suzuka-full-node:
depends_on:
otlp-collector:
condition: process_started
environment:
- MOVEMENT_OTLP=http://localhost:4317
m1-da-light-node:
depends_on:
otlp-collector:
condition: process_started
environment:
- MOVEMENT_OTLP=http://localhost:4317
4 changes: 2 additions & 2 deletions process-compose/suzuka-full-node/process-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ processes:
suzuka-full-node:
command: |
suzuka-full-node
env:
RUST_LOG: info,aptos-indexer=debug
environment:
- RUST_LOG=info
depends_on:
m1-da-light-node:
condition: process_healthy
Expand Down
13 changes: 6 additions & 7 deletions protocol-units/da/m1/light-node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use m1_da_light_node::v1::{LightNodeV1, Manager};

use std::env;

const TIMING_LOG_ENV: &str = "M1_DA_LIGHT_NODE_TIMING_LOG";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let tracing_config =
movement_tracing::Config { timing_log_path: env::var_os(TIMING_LOG_ENV).map(Into::into) };
let _guard = movement_tracing::init_tracing_subscriber(tracing_config);
let tracing_config = movement_tracing::Config::from_env()?;
let _guard = movement_tracing::init_tracing_subscriber(
env!("CARGO_BIN_NAME"),
env!("CARGO_PKG_VERSION"),
&tracing_config,
)?;

let dot_movement = dot_movement::DotMovement::try_from_env()?;
let config_path = dot_movement.get_config_json_path();
Expand Down
1 change: 0 additions & 1 deletion protocol-units/da/m1/light-node/src/v1/passthrough.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ impl LightNodeV1 {
Ok(verified_blobs)
}

#[tracing::instrument(target = "movement_timing", level = "debug")]
async fn get_blobs_at_height(&self, height: u64) -> Result<Vec<Blob>, anyhow::Error> {
let celestia_blobs = self.get_celestia_blobs_at_height(height).await?;
let mut blobs = Vec::new();
Expand Down
Loading