Skip to content

Commit

Permalink
fix: revert log format breakage from #16249 (#16470)
Browse files Browse the repository at this point in the history
* fix: revert log format breakage from #16249

* fix

---------

Co-authored-by: Bohu <[email protected]>
  • Loading branch information
andylokandy and BohuTANG committed Sep 19, 2024
1 parent d9131cc commit 2c41faf
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 27 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/common/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ doctest = false
test = true

[dependencies]
anyhow = { workspace = true }
backtrace = { workspace = true }
chrono = { workspace = true }
color-backtrace = { version = "0.6" }
Expand All @@ -22,7 +23,7 @@ fastrace-opentelemetry = { workspace = true }
itertools = { workspace = true }
libc = "0.2.153"
log = { workspace = true }
logforth = { version = "0.11", git = "http://github.com/andylokandy/logforth", rev = "0ca61ca", features = [
logforth = { version = "0.12", features = [
'json',
'rolling_file',
'opentelemetry',
Expand All @@ -32,6 +33,7 @@ opentelemetry = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry_sdk = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
strip-ansi-escapes = "0.2"
tonic = { workspace = true }

Expand Down
52 changes: 32 additions & 20 deletions src/common/tracing/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,19 @@ pub fn init_logging(
let labels = labels
.iter()
.chain(&cfg.otlp.endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "system".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("system"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&cfg.otlp.endpoint.endpoint,
cfg.otlp.endpoint.protocol.into(),
labels,
)
.expect("initialize opentelemetry logger");
.with_protocol(cfg.otlp.endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for(
"databend::log::query",
Expand Down Expand Up @@ -297,15 +301,19 @@ pub fn init_logging(
let labels = labels
.iter()
.chain(&endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "query".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("query"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&endpoint.endpoint,
endpoint.protocol.into(),
labels,
&cfg.otlp.endpoint.endpoint,
)
.expect("initialize opentelemetry logger");
.with_protocol(cfg.otlp.endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for_not(
"databend::log::query",
Expand Down Expand Up @@ -335,15 +343,19 @@ pub fn init_logging(
let labels = labels
.iter()
.chain(&endpoint.labels)
.map(|(k, v)| (k.clone().into(), v.clone().into()))
.chain([("category".into(), "profile".into())]);
let otel = logforth::append::OpentelemetryLog::new(
.map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone())))
.chain([(Cow::from("category"), Cow::from("profile"))]);
let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new(
log_name,
&endpoint.endpoint,
endpoint.protocol.into(),
labels,
&cfg.otlp.endpoint.endpoint,
)
.expect("initialize opentelemetry logger");
.with_protocol(cfg.otlp.endpoint.protocol.into());
for (k, v) in labels {
otel_builder = otel_builder.add_label(k, v);
}
let otel = otel_builder
.build()
.expect("initialize opentelemetry logger");
let dispatch = Dispatch::new()
.filter(TargetFilter::level_for_not(
"databend::log::profile",
Expand Down
93 changes: 89 additions & 4 deletions src/common/tracing/src/loggers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Arguments;
use std::path::Path;

use databend_common_base::runtime::ThreadTracker;
use log::Record;
use logforth::append::rolling_file::NonBlockingBuilder;
use logforth::append::rolling_file::RollingFileWriter;
use logforth::append::rolling_file::Rotation;
use logforth::append::RollingFile;
use logforth::layout::JsonLayout;
use logforth::layout::TextLayout;
use logforth::layout::collect_kvs;
use logforth::layout::CustomLayout;
use logforth::layout::KvDisplay;
use logforth::Layout;
use serde_json::Map;

/// Create a `BufWriter<NonBlocking>` for a rolling file logger.
pub(crate) fn new_rolling_file_appender(
Expand All @@ -41,8 +48,86 @@ pub(crate) fn new_rolling_file_appender(

pub fn get_layout(format: &str) -> Layout {
match format {
"text" => TextLayout::default().into(),
"json" => JsonLayout::default().into(),
"text" => text_layout(),
"json" => json_layout(),
_ => unimplemented!("file logging format {format} is not supported"),
}
}

fn text_layout() -> Layout {
CustomLayout::new(
|record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| {
match ThreadTracker::query_id() {
None => {
f(format_args!(
"{} {:>5} {}: {}:{} {}{}",
chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
record.level(),
record.module_path().unwrap_or(""),
Path::new(record.file().unwrap_or_default())
.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default(),
record.line().unwrap_or(0),
record.args(),
KvDisplay::new(record.key_values()),
))?;
}
Some(query_id) => {
f(format_args!(
"{} {} {:>5} {}: {}:{} {}{}",
query_id,
chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
record.level(),
record.module_path().unwrap_or(""),
Path::new(record.file().unwrap_or_default())
.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default(),
record.line().unwrap_or(0),
record.args(),
KvDisplay::new(record.key_values()),
))?;
}
}

Ok(())
},
)
.into()
}

fn json_layout() -> Layout {
CustomLayout::new(
|record: &Record, f: &dyn Fn(Arguments) -> anyhow::Result<()>| {
let mut fields = Map::new();
fields.insert("message".to_string(), format!("{}", record.args()).into());
for (k, v) in collect_kvs(record.key_values()) {
fields.insert(k, v.into());
}

match ThreadTracker::query_id() {
None => {
f(format_args!(
r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#,
chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
record.level(),
serde_json::to_string(&fields).unwrap_or_default(),
))?;
}
Some(query_id) => {
f(format_args!(
r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#,
chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true),
record.level(),
query_id,
serde_json::to_string(&fields).unwrap_or_default(),
))?;
}
}

Ok(())
},
)
.into()
}

0 comments on commit 2c41faf

Please sign in to comment.