From 40305f164fb4e8f6976b63d6723c50d67d6b1af6 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Fri, 10 Nov 2023 14:30:32 -0500 Subject: [PATCH] fix(sources): always emit HttpBytesReceived after decompression (#19048) * fix(sources): always emit HttpBytesReceived after decompression * remove breaking change * add upgrade guide entry --- src/sources/opentelemetry/http.rs | 2 +- src/sources/prometheus/remote_write.rs | 18 +++++++----------- src/sources/splunk_hec/mod.rs | 16 ++++++++-------- src/sources/util/http/encoding.rs | 2 +- src/sources/util/http/prelude.rs | 19 +++++++++++-------- .../2023-12-19-0-35-0-upgrade-guide.md | 11 +++++++++++ 6 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 1d5e91b839b74..fe85dc04b8e76 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -84,7 +84,7 @@ pub(crate) fn build_warp_filter( .and(warp::header::optional::("content-encoding")) .and(warp::body::bytes()) .and_then(move |encoding_header: Option, body: Bytes| { - let events = decode(&encoding_header, body).and_then(|body| { + let events = decode(encoding_header.as_deref(), body).and_then(|body| { bytes_received.emit(ByteSize(body.len())); decode_body(body, log_namespace, &events_received) }); diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 523df670ec5b8..d619b9e7a03f9 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -122,22 +122,18 @@ impl RemoteWriteSource { } impl HttpSource for RemoteWriteSource { + fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result { + // Default to snappy decoding the request body. + decode(encoding_header.or(Some("snappy")), body) + } + fn build_events( &self, - mut body: Bytes, - header_map: &HeaderMap, + body: Bytes, + _header_map: &HeaderMap, _query_parameters: &HashMap, _full_path: &str, ) -> Result, ErrorMessage> { - // If `Content-Encoding` header isn't `snappy` HttpSource won't decode it for us - // se we need to. - if header_map - .get("Content-Encoding") - .map(|header| header.as_ref()) - != Some(&b"snappy"[..]) - { - body = decode(&Some("snappy".to_string()), body)?; - } let events = self.decode_body(body)?; Ok(events) } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 745e419c888ea..592c79e1e3ef6 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -351,11 +351,6 @@ impl SplunkSource { let mut out = out.clone(); let idx_ack = idx_ack.clone(); let events_received = events_received.clone(); - emit!(HttpBytesReceived { - byte_size: body.len(), - http_path: path.as_str(), - protocol, - }); async move { if idx_ack.is_some() && channel.is_none() { @@ -363,14 +358,19 @@ impl SplunkSource { } let mut data = Vec::new(); - let body = if gzip { + let (byte_size, body) = if gzip { MultiGzDecoder::new(body.reader()) .read_to_end(&mut data) .map_err(|_| Rejection::from(ApiError::BadRequest))?; - String::from_utf8_lossy(data.as_slice()) + (data.len(), String::from_utf8_lossy(data.as_slice())) } else { - String::from_utf8_lossy(body.as_ref()) + (body.len(), String::from_utf8_lossy(body.as_ref())) }; + emit!(HttpBytesReceived { + byte_size, + http_path: path.as_str(), + protocol, + }); let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(idx_ack.is_some()); diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index be3b8f8a6f076..39051f67acd82 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -8,7 +8,7 @@ use warp::http::StatusCode; use super::error::ErrorMessage; use crate::internal_events::HttpDecompressError; -pub fn decode(header: &Option, mut body: Bytes) -> Result { +pub fn decode(header: Option<&str>, mut body: Bytes) -> Result { if let Some(encodings) = header { for encoding in encodings.rsplit(',').map(str::trim) { body = match encoding { diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 1523427f392a4..391964a219681 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -65,6 +65,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static { path: &str, ) -> Result, ErrorMessage>; + fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result { + decode(encoding_header, body) + } + #[allow(clippy::too_many_arguments)] fn run( self, @@ -123,23 +127,22 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .and_then( move |path: FullPath, auth_header, - encoding_header, + encoding_header: Option, headers: HeaderMap, body: Bytes, query_parameters: HashMap| { debug!(message = "Handling HTTP request.", headers = ?headers); let http_path = path.as_str(); - emit!(HttpBytesReceived { - byte_size: body.len(), - http_path, - protocol, - }); - let events = auth .is_valid(&auth_header) - .and_then(|()| decode(&encoding_header, body)) + .and_then(|()| self.decode(encoding_header.as_deref(), body)) .and_then(|body| { + emit!(HttpBytesReceived { + byte_size: body.len(), + http_path, + protocol, + }); self.build_events(body, &headers, &query_parameters, path.as_str()) }) .map(|mut events| { diff --git a/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md b/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md index 2ec70b3c7e9b1..22592e25a92ab 100644 --- a/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md +++ b/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md @@ -13,6 +13,10 @@ Vector's 0.35.0 release includes **deprecations**: 1. [Deprecation of `file` internal metric tag for file-based components](#deprecate-file-tag) +and **potentially impactful changes**: + +1. [Update `component_received_bytes_total` to correctly report decompressed bytes for all sources](#component-received-bytes-total) + We cover them below to help you upgrade quickly: ## Upgrade guide @@ -26,3 +30,10 @@ File-based components (file source, Kubernetes logs source, file sink) now inclu component's corresponding internal metrics. This config option defaults to `true` for now to retain the existing behavior. In the next release, the config option will be updated to default to `false`, as this `tag` is likely to be of high cardinality. + +### Potentially impactful changes + +#### Update `component_received_bytes_total` to correctly report decompressed bytes for all sources {#component-received-bytes-total} + +The Heroku Logs, HTTP Server, Prometheus Remote Write, and Splunk HEC sources now correctly report decompressed bytes, +rather than compressed bytes, for the `component_received_bytes_total` internal metric.