Skip to content

Commit

Permalink
Capture files will now exclude metrics that have not been emitted wit…
Browse files Browse the repository at this point in the history
…hin the last 2 seconds
  • Loading branch information
scottopell committed Sep 24, 2024
1 parent 7423899 commit 1843634
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tokio = { version = "1.40" }
tracing = { version = "0.1" }
uuid = { version = "1.6", default-features = false, features = ["v4", "serde"] }
once_cell = { version = "1.19" }
quanta = { version = "0.12", default-features = false, features = [] }

[profile.release]
lto = true # Optimize our binary at link stage.
Expand Down
1 change: 1 addition & 0 deletions lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ tracing = { workspace = true }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter"] }
uuid = { workspace = true }
zstd = "0.13.1"
quanta = { workspace = true }

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs = { version = "0.3", default-features = false, features = [] }
Expand Down
40 changes: 35 additions & 5 deletions lading/src/captures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::{
};

use lading_capture::json;
use metrics_util::registry::{AtomicStorage, Registry};
use metrics_util::{
registry::{GenerationalAtomicStorage, Recency, Registry},
MetricKindMask,
};
use rustc_hash::FxHashMap;
use tracing::{debug, info, warn};
use uuid::Uuid;
Expand All @@ -42,7 +45,8 @@ pub enum Error {
}

struct Inner {
registry: Registry<metrics::Key, AtomicStorage>,
registry: Registry<metrics::Key, GenerationalAtomicStorage>,
recency: Recency<metrics::Key>,
}

#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -86,7 +90,12 @@ impl CaptureManager {
_experiment_started: experiment_started,
target_running,
inner: Arc::new(Inner {
registry: Registry::atomic(),
registry: Registry::new(GenerationalAtomicStorage::atomic()),
recency: Recency::new(
quanta::Clock::new(),
MetricKindMask::COUNTER | MetricKindMask::GAUGE,
Some(Duration::from_secs(2)),
),
}),
global_labels: FxHashMap::default(),
})
Expand Down Expand Up @@ -120,6 +129,17 @@ impl CaptureManager {
self.inner
.registry
.visit_counters(|key: &metrics::Key, counter| {
let gen = counter.get_generation();

if !self
.inner
.recency
.should_store_counter(key, gen, &self.inner.registry)
{
// Skip this metric, its too old
return;
}

let mut labels = self.global_labels.clone();
for lbl in key.labels() {
// TODO we're allocating the same small strings over and over most likely
Expand All @@ -131,20 +151,30 @@ impl CaptureManager {
fetch_index: self.fetch_index,
metric_name: key.name().into(),
metric_kind: json::MetricKind::Counter,
value: json::LineValue::Int(counter.load(Ordering::Relaxed)),
value: json::LineValue::Int(counter.get_inner().load(Ordering::Relaxed)),
labels,
};
lines.push(line);
});
self.inner
.registry
.visit_gauges(|key: &metrics::Key, gauge| {
let gen = gauge.get_generation();

if !self
.inner
.recency
.should_store_gauge(key, gen, &self.inner.registry)
{
// Skip this metric, its too old
return;
}
let mut labels = self.global_labels.clone();
for lbl in key.labels() {
// TODO we're allocating the same small strings over and over most likely
labels.insert(lbl.key().into(), lbl.value().into());
}
let value: f64 = f64::from_bits(gauge.load(Ordering::Relaxed));
let value: f64 = f64::from_bits(gauge.get_inner().load(Ordering::Relaxed));
let line = json::Line {
run_id: self.run_id,
time: now_ms,
Expand Down

0 comments on commit 1843634

Please sign in to comment.