From 626873f50f206eedcd0f3df9871c0e9df998dff3 Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 11 Sep 2024 13:31:27 +0900 Subject: [PATCH] Introduce an observability crate using opentelemetry-rust (#2535) (#2609) * feat: add observability crate * fix: fix crate name * feat: add module files * feat: create config * fix: implement from trait * feat: add tracer and meter macro * feat: add license * fix: use full module path * fix: add shutdown method * feat: add build method to create tracer provider and meter provider * fix: module path * fix: add export timeout duration and comment * feat: update interface * feat: add endpoint configuration to root configuration * fix: update deps * fix: small refactor * fix: deleted build method to create trace and metrics provider --------- Signed-off-by: hlts2 --- rust/libs/observability/src/config.rs | 142 ++++++++++++++++++ rust/libs/observability/src/lib.rs | 19 +++ rust/libs/observability/src/macros.rs | 150 +++++++++++++++++++ rust/libs/observability/src/observability.rs | 124 +++++++++++++++ 4 files changed, 435 insertions(+) create mode 100644 rust/libs/observability/src/config.rs create mode 100644 rust/libs/observability/src/lib.rs create mode 100644 rust/libs/observability/src/macros.rs create mode 100644 rust/libs/observability/src/observability.rs diff --git a/rust/libs/observability/src/config.rs b/rust/libs/observability/src/config.rs new file mode 100644 index 0000000000..1faa8fd90e --- /dev/null +++ b/rust/libs/observability/src/config.rs @@ -0,0 +1,142 @@ +// +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+use std::collections::HashMap;
+use std::time::Duration;
+
+use opentelemetry::KeyValue;
+use opentelemetry_sdk::{self, Resource};
+
+/// Top-level observability settings.
+///
+/// Every setter takes `self` by value and returns it, so a configuration is
+/// assembled by chaining, e.g.
+/// `Config::new().enabled(true).tracer(Tracer::new().enabled(true))`.
+#[derive(Clone, Debug)]
+pub struct Config {
+    /// Master switch; `false` by default.
+    pub enabled: bool,
+    /// Collector endpoint, stored as given; empty by default.
+    pub endpoint: String,
+    /// Key/value pairs that `Resource::from(&Config)` turns into `KeyValue`s.
+    pub attributes: HashMap<String, String>,
+    /// Tracing settings.
+    pub tracer: Tracer,
+    /// Metrics settings.
+    pub meter: Meter,
+}
+
+/// Tracing settings; disabled by default.
+#[derive(Clone, Debug, Default)]
+pub struct Tracer {
+    /// Whether tracing is enabled.
+    pub enabled: bool,
+}
+
+/// Metrics settings; see the `Default` impl for the initial values.
+#[derive(Clone, Debug)]
+pub struct Meter {
+    /// Whether metrics are enabled.
+    pub enabled: bool,
+    /// Export interval (1 second by default).
+    pub export_duration: Duration,
+    /// Export timeout (5 seconds by default).
+    pub export_timeout_duration: Duration,
+}
+
+impl Config {
+    /// Returns the default configuration (same as `Config::default()`).
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Sets the master switch.
+    pub fn enabled(mut self, enabled: bool) -> Self {
+        self.enabled = enabled;
+        self
+    }
+
+    /// Sets the collector endpoint; the string is copied as-is, no validation happens here.
+    pub fn endpoint(mut self, endpoint: &str) -> Self {
+        self.endpoint = endpoint.to_string();
+        self
+    }
+
+    /// Replaces the whole attribute map, discarding any attributes set earlier.
+    pub fn attributes(mut self, attrs: HashMap<String, String>) -> Self {
+        self.attributes = attrs;
+        self
+    }
+
+    /// Inserts a single attribute, overwriting an existing value stored under the same key.
+    pub fn attribute(mut self, key: &str, value: &str) -> Self {
+        self.attributes.insert(key.to_string(), value.to_string());
+        self
+    }
+
+    /// Replaces the tracing settings.
+    pub fn tracer(mut self, cfg: Tracer) -> Self {
+        self.tracer = cfg;
+        self
+    }
+
+    /// Replaces the metrics settings.
+    pub fn meter(mut self, cfg: Meter) -> Self {
+        self.meter = cfg;
+        self
+    }
+}
+
+impl Default for Config {
+    /// Everything disabled, empty endpoint, no attributes, default `Tracer` and `Meter`.
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            endpoint: String::new(),
+            attributes: HashMap::new(),
+            tracer: Tracer::default(),
+            meter: Meter::default(),
+        }
+    }
+}
+
+impl From<&Config> for Resource {
+    /// Builds a `Resource` holding one `KeyValue` per entry of `Config::attributes`.
+    ///
+    /// Keys and values are cloned, so the configuration is left untouched.
+    fn from(value: &Config) -> Self {
+        let key_values: Vec<KeyValue> = value
+            .attributes
+            .iter()
+            .map(|(key, val)| KeyValue::new(key.clone(), val.clone()))
+            .collect();
+        Resource::new(key_values)
+    }
+}
+
+impl Tracer {
+    /// Returns the default (disabled) tracing settings.
+    pub fn new() -> Self {
+        Tracer::default()
+    }
+
+    /// Enables or disables tracing.
+    pub fn enabled(mut self, enabled: bool) -> Self {
+        self.enabled = enabled;
+        self
+    }
+}
+
+impl Meter {
+    /// Returns the default metrics settings (same as `Meter::default()`).
+    pub fn new() -> Self {
+        Meter::default()
+    }
+
+    /// Enables or disables metrics.
+    pub fn enabled(mut self, enabled: bool) -> Self {
+        self.enabled = enabled;
+        self
+    }
+
+    /// Sets the export interval.
+    pub fn export_duration(mut self, dur: Duration) -> Self {
+        self.export_duration = dur;
+        self
+    }
+
+    /// Sets the export timeout.
+    pub fn export_timeout_duration(mut self, dur: Duration) -> Self {
+        self.export_timeout_duration = dur;
+        self
+    }
+}
+
+impl Default for Meter {
+    /// Disabled, exporting every second with a five second timeout.
+    fn default() -> Self {
+        Self {
+            enabled: false,
+            export_duration: Duration::from_secs(1),
+            export_timeout_duration: Duration::from_secs(5),
+        }
+    }
+}
diff --git a/rust/libs/observability/src/lib.rs b/rust/libs/observability/src/lib.rs
new file mode 100644
index 0000000000..bf0aef235c
--- /dev/null
+++ b/rust/libs/observability/src/lib.rs
@@ -0,0 +1,19 @@
+//
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+pub mod config;
+pub mod macros;
+pub mod observability;
diff --git a/rust/libs/observability/src/macros.rs b/rust/libs/observability/src/macros.rs
new file mode 100644
index 0000000000..21b57f5331
--- /dev/null
+++ b/rust/libs/observability/src/macros.rs
@@ -0,0 +1,150 @@
+//
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+/// Expands to `opentelemetry::global::tracer($name)`; with no argument the name is `"vald"`.
+///
+/// The expansion names `opentelemetry` by path, so that crate must be resolvable
+/// at the call site.
+#[macro_export]
+macro_rules! tracer {
+    () => {{
+        // `$crate::` keeps the nested call resolvable even when the caller imported
+        // this macro by path without also importing its siblings.
+        $crate::tracer!("vald")
+    }};
+
+    ($name:expr) => {{
+        opentelemetry::global::tracer($name)
+    }};
+}
+
+/// Starts a span named `$name` whose parent is taken from `$ctx` and evaluates to
+/// `opentelemetry::Context::current_with_span(span)`.
+///
+/// * `$ctx`  - expression of type `&opentelemetry::Context`.
+/// * `$name` - span name, passed to `span_builder`.
+/// * `$kind` - optional span kind; `SpanKind::Internal` when omitted.
+///
+/// The span is created with the default `tracer!()`.
+///
+/// NOTE(review): the expansion calls `span_builder`, `start_with_context` and
+/// `current_with_span`, which look like trait methods; the call site presumably
+/// needs the matching `opentelemetry::trace` traits in scope - confirm.
+#[macro_export]
+macro_rules! ctx_span {
+    ($ctx:expr, $name:expr) => {{
+        $crate::ctx_span!($ctx, $name, opentelemetry::trace::SpanKind::Internal)
+    }};
+
+    ($ctx:expr, $name:expr, $kind:expr) => {{
+        let tracer = $crate::tracer!();
+        // The annotation forces `$ctx` to be a `&Context` and yields a clear type error otherwise.
+        let parent_ctx: &opentelemetry::Context = $ctx;
+        let span = tracer
+            .span_builder($name)
+            .with_kind($kind)
+            .start_with_context(&tracer, parent_ctx);
+        opentelemetry::Context::current_with_span(span)
+    }};
+}
+
+/// Expands to `opentelemetry::global::meter($name)`; with no argument the name is `"vald"`.
+///
+/// The expansion names `opentelemetry` by path, so that crate must be resolvable
+/// at the call site.
+#[macro_export]
+macro_rules! meter {
+    () => {{
+        $crate::meter!("vald")
+    }};
+
+    ($name:expr) => {{
+        opentelemetry::global::meter($name)
+    }};
+}
+
+/// Instrument families understood by `instrument!`.
+///
+/// `instrument!` matches the path `InstrumentKind::<Variant>` as literal tokens to
+/// select an arm; it does not evaluate a value of this type.
+#[derive(Debug, PartialEq)]
+pub enum InstrumentKind {
+    UpdownCounter,
+    Counter,
+    Histogram,
+    Gauge,
+}
+
+/// Builds an instrument on the default `meter!()`.
+///
+/// Two call shapes exist:
+///
+/// * `instrument!(InstrumentKind::K, typ, name, description, unit)` calls
+///   `meter.<typ>_<kind>(name)`, sets the description and unit, then `.init()`.
+///   Available for `Counter`, `UpdownCounter`, `Histogram` and `Gauge`.
+/// * `instrument!(InstrumentKind::K, typ, name, description, unit, measurement, key_value)`
+///   calls `meter.<typ>_observable_<kind>(name)` and additionally registers a
+///   callback that runs `observe.observe(measurement, key_value)`.
+///   Available for `Counter`, `UpdownCounter` and `Gauge`.
+///
+/// `typ` is pasted in front of the method suffix with `paste::paste!`, so `paste`
+/// and `opentelemetry` must both be resolvable at the call site.
+#[macro_export]
+macro_rules! instrument {
+    (InstrumentKind::Counter, $typ:ty, $name:expr, $disc:expr, $unit:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _counter>]($name) // typ = f64 or u64
+                .with_description($disc)
+                .with_unit($unit)
+                .init()
+        }
+    }};
+
+    (InstrumentKind::Counter, $typ:ty, $name:expr, $disc:expr, $unit:expr, $measurement:expr, $key_value:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _observable_counter>]($name) // typ = f64 or u64
+                .with_description($disc)
+                .with_unit($unit)
+                .with_callback(|observe| {
+                    observe.observe($measurement, $key_value);
+                })
+                .init()
+        }
+    }};
+
+    (InstrumentKind::UpdownCounter, $typ:ty, $name:expr, $disc:expr, $unit:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _up_down_counter>]($name) // typ = f64 or i64
+                .with_description($disc)
+                .with_unit($unit)
+                .init()
+        }
+    }};
+
+    (InstrumentKind::UpdownCounter, $typ:ty, $name:expr, $disc:expr, $unit:expr, $measurement:expr, $key_value:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _observable_up_down_counter>]($name) // typ = f64 or i64
+                .with_description($disc)
+                .with_unit($unit)
+                .with_callback(|observe| {
+                    observe.observe($measurement, $key_value);
+                })
+                .init()
+        }
+    }};
+
+    (InstrumentKind::Histogram, $typ:ty, $name:expr, $disc:expr, $unit:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _histogram>]($name) // typ = f64 or i64
+                .with_description($disc)
+                .with_unit($unit)
+                .init()
+        }
+    }};
+
+    (InstrumentKind::Gauge, $typ:ty, $name:expr, $disc:expr, $unit:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _gauge>]($name) // typ = f64 or i64 or u64
+                .with_description($disc)
+                .with_unit($unit)
+                .init()
+        }
+    }};
+
+    (InstrumentKind::Gauge, $typ:ty, $name:expr, $disc:expr, $unit:expr, $measurement:expr, $key_value:expr) => {{
+        let meter = $crate::meter!();
+        paste::paste! {
+            meter
+                .[<$typ _observable_gauge>]($name) // typ = f64 or i64 or u64
+                .with_description($disc)
+                .with_unit($unit)
+                .with_callback(|observe| {
+                    observe.observe($measurement, $key_value);
+                })
+                .init()
+        }
+    }};
+}
diff --git a/rust/libs/observability/src/observability.rs b/rust/libs/observability/src/observability.rs
new file mode 100644
index 0000000000..a80ab6d92a
--- /dev/null
+++ b/rust/libs/observability/src/observability.rs
@@ -0,0 +1,124 @@
+//
+// Copyright (C) 2019-2024 vdaas.org vald team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// You may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+use anyhow::{Context, Ok, Result};
+use opentelemetry::global::{self, shutdown_tracer_provider};
+use opentelemetry_otlp::WithExportConfig;
+use opentelemetry_sdk::metrics::SdkMeterProvider;
+use opentelemetry_sdk::propagation::TraceContextPropagator;
+use opentelemetry_sdk::trace::{self, TracerProvider};
+use opentelemetry_sdk::{runtime, Resource};
+use url::Url;
+
+use crate::config::Config;
+
+/// Re-export of the semantic-convention key for the service name resource attribute.
+pub const SERVICE_NAME: &str = opentelemetry_semantic_conventions::resource::SERVICE_NAME;
+
+/// Lifecycle hook for an installed observability stack.
+pub trait Observability {
+    /// Flushes pending telemetry and shuts the providers down.
+    fn shutdown(&mut self) -> Result<()>;
+}
+
+/// Owns the configuration and the providers that `new` installed globally.
+///
+/// A provider field is `Some` only when the matching signal was enabled at
+/// construction time.
+pub struct ObservabilityImpl {
+    config: Config,
+    meter_provider: Option<SdkMeterProvider>,
+    tracer_provider: Option<TracerProvider>,
+}
+
+impl ObservabilityImpl {
+    /// Builds the OTLP pipelines requested by `cfg` and registers them as the
+    /// global meter/tracer providers.
+    ///
+    /// * When `cfg.enabled` is `false` nothing is installed and both provider
+    ///   fields stay `None`.
+    /// * When `cfg.meter.enabled`, a metrics pipeline is built on the Tokio runtime
+    ///   with the configured period, timeout and resource, exporting over tonic to
+    ///   `<endpoint>` joined with `/v1/metrics`.
+    /// * When `cfg.tracer.enabled`, a batch tracing pipeline (always-on sampler,
+    ///   random id generator) is installed, exporting to `<endpoint>` joined with
+    ///   `/v1/traces`, and the W3C `TraceContextPropagator` becomes the global
+    ///   text-map propagator.
+    ///
+    /// # Errors
+    ///
+    /// Fails when `cfg.endpoint` cannot be parsed or joined as a URL, when either
+    /// pipeline fails to build, or when the installed tracer exposes no provider.
+    pub fn new(cfg: Config) -> Result<Self> {
+        let mut obj = ObservabilityImpl {
+            config: cfg,
+            meter_provider: None,
+            tracer_provider: None,
+        };
+
+        if !obj.config.enabled {
+            return Ok(obj);
+        }
+
+        if obj.config.meter.enabled {
+            // NOTE: Since the agent implementation does not use views, we will use the simplest implementation for the current phase.
+            // If we want flexibility and customization, use SdkMeterProvider::builder.
+            let provider = opentelemetry_otlp::new_pipeline()
+                .metrics(runtime::Tokio)
+                .with_period(obj.config.meter.export_duration)
+                .with_resource(Resource::from(obj.config()))
+                .with_timeout(obj.config.meter.export_timeout_duration)
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter().tonic().with_endpoint(
+                        Url::parse(obj.config.endpoint.as_str())?
+                            .join("/v1/metrics")?
+                            .as_str(),
+                    ),
+                )
+                .build()?;
+            // Keep one handle for `shutdown`; the global registry takes the other.
+            obj.meter_provider = Some(provider.clone());
+            global::set_meter_provider(provider);
+        }
+
+        if obj.config.tracer.enabled {
+            let tracer = opentelemetry_otlp::new_pipeline()
+                .tracing()
+                .with_exporter(
+                    opentelemetry_otlp::new_exporter().tonic().with_endpoint(
+                        Url::parse(obj.config.endpoint.as_str())?
+                            .join("/v1/traces")?
+                            .as_str(),
+                    ),
+                )
+                .with_trace_config(
+                    trace::config()
+                        .with_sampler(trace::Sampler::AlwaysOn)
+                        .with_resource(Resource::from(obj.config()))
+                        .with_id_generator(trace::RandomIdGenerator::default()),
+                )
+                .install_batch(runtime::Tokio)?;
+            let provider = tracer.provider().context("failed to get provider")?;
+            obj.tracer_provider = Some(provider.clone());
+            global::set_text_map_propagator(TraceContextPropagator::new());
+            global::set_tracer_provider(provider);
+        }
+        Ok(obj)
+    }
+
+    /// Borrows the configuration this instance was built from.
+    fn config(&self) -> &Config {
+        &self.config
+    }
+}
+
+impl Observability for ObservabilityImpl {
+    /// Flushes and shuts down whichever providers were installed by `new`.
+    ///
+    /// Does nothing when observability is disabled. The meter provider is handled
+    /// first; the first flush or shutdown error is returned immediately.
+    fn shutdown(&mut self) -> Result<()> {
+        if !self.config.enabled {
+            return Ok(());
+        }
+
+        if self.config.meter.enabled {
+            if let Some(ref provider) = self.meter_provider {
+                provider.force_flush()?;
+                provider.shutdown()?;
+            }
+        }
+
+        // This must test the tracer flag, not the meter flag: otherwise a
+        // tracer-only configuration would never flush or shut down its spans.
+        if self.config.tracer.enabled {
+            if let Some(ref provider) = self.tracer_provider {
+                for result in provider.force_flush() {
+                    result?;
+                }
+                shutdown_tracer_provider();
+            }
+        }
+        Ok(())
+    }
+}