From 2c41faf63b5136ab22a2ab3ac972c9752dc4a7bf Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Thu, 19 Sep 2024 16:41:04 +0800 Subject: [PATCH] fix: revert log format breakage from #16249 (#16470) * fix: revert log format breakage from #16249 * fix --------- Co-authored-by: Bohu --- Cargo.lock | 7 ++- src/common/tracing/Cargo.toml | 4 +- src/common/tracing/src/init.rs | 52 ++++++++++------- src/common/tracing/src/loggers.rs | 93 +++++++++++++++++++++++++++++-- 4 files changed, 129 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a6a243e7bf1..2388d322e6ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4748,6 +4748,7 @@ dependencies = [ name = "databend-common-tracing" version = "0.1.0" dependencies = [ + "anyhow", "backtrace", "chrono", "color-backtrace", @@ -4764,6 +4765,7 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry_sdk", "serde", + "serde_json", "strip-ansi-escapes", "tonic 0.11.0", ] @@ -10023,8 +10025,9 @@ dependencies = [ [[package]] name = "logforth" -version = "0.11.0" -source = "git+http://github.com/andylokandy/logforth?rev=0ca61ca#0ca61ca0fa3c87b5af5a08aa0354d96604e685c0" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633080680671612565f637d1e33c5bcb7d58fb12c7d658baa166a03487265e80" dependencies = [ "anyhow", "colored", diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml index bf3b1cd70ce8..58ebe2903dbf 100644 --- a/src/common/tracing/Cargo.toml +++ b/src/common/tracing/Cargo.toml @@ -11,6 +11,7 @@ doctest = false test = true [dependencies] +anyhow = { workspace = true } backtrace = { workspace = true } chrono = { workspace = true } color-backtrace = { version = "0.6" } @@ -22,7 +23,7 @@ fastrace-opentelemetry = { workspace = true } itertools = { workspace = true } libc = "0.2.153" log = { workspace = true } -logforth = { version = "0.11", git = "http://github.com/andylokandy/logforth", rev = "0ca61ca", features = [ +logforth = { version = "0.12", features = [ 'json', 'rolling_file', 'opentelemetry', @@ -32,6 +33,7 @@ opentelemetry = { workspace = true } opentelemetry-otlp = { workspace = true } opentelemetry_sdk = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } strip-ansi-escapes = "0.2" tonic = { workspace = true } diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 7d8a28250aec..52097dca157c 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -224,15 +224,19 @@ pub fn init_logging( let labels = labels .iter() .chain(&cfg.otlp.endpoint.labels) - .map(|(k, v)| (k.clone().into(), v.clone().into())) - .chain([("category".into(), "system".into())]); - let otel = logforth::append::OpentelemetryLog::new( + .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) + .chain([(Cow::from("category"), Cow::from("system"))]); + let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, &cfg.otlp.endpoint.endpoint, - cfg.otlp.endpoint.protocol.into(), - labels, ) - .expect("initialize opentelemetry logger"); + .with_protocol(cfg.otlp.endpoint.protocol.into()); + for (k, v) in labels { + otel_builder = otel_builder.add_label(k, v); + } + let otel = otel_builder + .build() + .expect("initialize opentelemetry logger"); let dispatch = Dispatch::new() .filter(TargetFilter::level_for( "databend::log::query", @@ -297,15 +301,19 @@ pub fn init_logging( let labels = labels .iter() .chain(&endpoint.labels) - .map(|(k, v)| (k.clone().into(), v.clone().into())) - .chain([("category".into(), "query".into())]); - let otel = logforth::append::OpentelemetryLog::new( + .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) + .chain([(Cow::from("category"), Cow::from("query"))]); + let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &endpoint.endpoint, - endpoint.protocol.into(), - labels, + &cfg.otlp.endpoint.endpoint, ) - .expect("initialize opentelemetry logger"); + .with_protocol(cfg.otlp.endpoint.protocol.into()); + for (k, v) in labels { + otel_builder = otel_builder.add_label(k, v); + } + let otel = otel_builder + .build() + .expect("initialize opentelemetry logger"); let dispatch = Dispatch::new() .filter(TargetFilter::level_for_not( "databend::log::query", @@ -335,15 +343,19 @@ pub fn init_logging( let labels = labels .iter() .chain(&endpoint.labels) - .map(|(k, v)| (k.clone().into(), v.clone().into())) - .chain([("category".into(), "profile".into())]); - let otel = logforth::append::OpentelemetryLog::new( + .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) + .chain([(Cow::from("category"), Cow::from("profile"))]); + let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, - &endpoint.endpoint, - endpoint.protocol.into(), - labels, + &cfg.otlp.endpoint.endpoint, ) - .expect("initialize opentelemetry logger"); + .with_protocol(cfg.otlp.endpoint.protocol.into()); + for (k, v) in labels { + otel_builder = otel_builder.add_label(k, v); + } + let otel = otel_builder + .build() + .expect("initialize opentelemetry logger"); let dispatch = Dispatch::new() .filter(TargetFilter::level_for_not( "databend::log::profile", diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index 08ff7ed96aad..6285a438fb57 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -12,13 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Arguments; +use std::path::Path; + +use databend_common_base::runtime::ThreadTracker; +use log::Record; use logforth::append::rolling_file::NonBlockingBuilder; use logforth::append::rolling_file::RollingFileWriter; use logforth::append::rolling_file::Rotation; use logforth::append::RollingFile; -use logforth::layout::JsonLayout; -use logforth::layout::TextLayout; +use logforth::layout::collect_kvs; +use logforth::layout::CustomLayout; +use logforth::layout::KvDisplay; use logforth::Layout; +use serde_json::Map; /// Create a `BufWriter` for a rolling file logger. pub(crate) fn new_rolling_file_appender( @@ -41,8 +48,86 @@ pub(crate) fn new_rolling_file_appender( pub fn get_layout(format: &str) -> Layout { match format { - "text" => TextLayout::default().into(), - "json" => JsonLayout::default().into(), + "text" => text_layout(), + "json" => json_layout(), _ => unimplemented!("file logging format {format} is not supported"), } } + +fn text_layout() -> Layout { + CustomLayout::new( + |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| { + match ThreadTracker::query_id() { + None => { + f(format_args!( + "{} {:>5} {}: {}:{} {}{}", + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + record.module_path().unwrap_or(""), + Path::new(record.file().unwrap_or_default()) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(), + record.line().unwrap_or(0), + record.args(), + KvDisplay::new(record.key_values()), + ))?; + } + Some(query_id) => { + f(format_args!( + "{} {} {:>5} {}: {}:{} {}{}", + query_id, + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + record.module_path().unwrap_or(""), + Path::new(record.file().unwrap_or_default()) + .file_name() + .and_then(|name| name.to_str()) + .unwrap_or_default(), + record.line().unwrap_or(0), + record.args(), + KvDisplay::new(record.key_values()), + ))?; + } + } + + Ok(()) + }, + ) + .into() +} + +fn json_layout() -> Layout { + CustomLayout::new( + |record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| { + let mut fields = Map::new(); + fields.insert("message".to_string(), format!("{}", record.args()).into()); + for (k, v) in collect_kvs(record.key_values()) { + fields.insert(k, v.into()); + } + + match ThreadTracker::query_id() { + None => { + f(format_args!( + r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#, + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + serde_json::to_string(&fields).unwrap_or_default(), + ))?; + } + Some(query_id) => { + f(format_args!( + r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#, + chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + record.level(), + query_id, + serde_json::to_string(&fields).unwrap_or_default(), + ))?; + } + } + + Ok(()) + }, + ) + .into() +}