diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 1d5e91b839b74..fe85dc04b8e76 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -84,7 +84,7 @@ pub(crate) fn build_warp_filter( .and(warp::header::optional::("content-encoding")) .and(warp::body::bytes()) .and_then(move |encoding_header: Option, body: Bytes| { - let events = decode(&encoding_header, body).and_then(|body| { + let events = decode(encoding_header.as_deref(), body).and_then(|body| { bytes_received.emit(ByteSize(body.len())); decode_body(body, log_namespace, &events_received) }); diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 523df670ec5b8..d619b9e7a03f9 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -122,22 +122,18 @@ impl RemoteWriteSource { } impl HttpSource for RemoteWriteSource { + fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result { + // Default to snappy decoding the request body. + decode(encoding_header.or(Some("snappy")), body) + } + fn build_events( &self, - mut body: Bytes, - header_map: &HeaderMap, + body: Bytes, + _header_map: &HeaderMap, _query_parameters: &HashMap, _full_path: &str, ) -> Result, ErrorMessage> { - // If `Content-Encoding` header isn't `snappy` HttpSource won't decode it for us - // se we need to. - if header_map - .get("Content-Encoding") - .map(|header| header.as_ref()) - != Some(&b"snappy"[..]) - { - body = decode(&Some("snappy".to_string()), body)?; - } let events = self.decode_body(body)?; Ok(events) } diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index 745e419c888ea..592c79e1e3ef6 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -351,11 +351,6 @@ impl SplunkSource { let mut out = out.clone(); let idx_ack = idx_ack.clone(); let events_received = events_received.clone(); - emit!(HttpBytesReceived { - byte_size: body.len(), - http_path: path.as_str(), - protocol, - }); async move { if idx_ack.is_some() && channel.is_none() { @@ -363,14 +358,19 @@ impl SplunkSource { } let mut data = Vec::new(); - let body = if gzip { + let (byte_size, body) = if gzip { MultiGzDecoder::new(body.reader()) .read_to_end(&mut data) .map_err(|_| Rejection::from(ApiError::BadRequest))?; - String::from_utf8_lossy(data.as_slice()) + (data.len(), String::from_utf8_lossy(data.as_slice())) } else { - String::from_utf8_lossy(body.as_ref()) + (body.len(), String::from_utf8_lossy(body.as_ref())) }; + emit!(HttpBytesReceived { + byte_size, + http_path: path.as_str(), + protocol, + }); let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(idx_ack.is_some()); diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index be3b8f8a6f076..39051f67acd82 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -8,7 +8,7 @@ use warp::http::StatusCode; use super::error::ErrorMessage; use crate::internal_events::HttpDecompressError; -pub fn decode(header: &Option, mut body: Bytes) -> Result { +pub fn decode(header: Option<&str>, mut body: Bytes) -> Result { if let Some(encodings) = header { for encoding in encodings.rsplit(',').map(str::trim) { body = match encoding { diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 1523427f392a4..391964a219681 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -65,6 +65,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static { path: &str, ) -> Result, ErrorMessage>; + fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result { + decode(encoding_header, body) + } + #[allow(clippy::too_many_arguments)] fn run( self, @@ -123,23 +127,22 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .and_then( move |path: FullPath, auth_header, - encoding_header, + encoding_header: Option, headers: HeaderMap, body: Bytes, query_parameters: HashMap| { debug!(message = "Handling HTTP request.", headers = ?headers); let http_path = path.as_str(); - emit!(HttpBytesReceived { - byte_size: body.len(), - http_path, - protocol, - }); - let events = auth .is_valid(&auth_header) - .and_then(|()| decode(&encoding_header, body)) + .and_then(|()| self.decode(encoding_header.as_deref(), body)) .and_then(|body| { + emit!(HttpBytesReceived { + byte_size: body.len(), + http_path, + protocol, + }); self.build_events(body, &headers, &query_parameters, path.as_str()) }) .map(|mut events| { diff --git a/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md b/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md index 9d7b8734af4df..2b3ecd24d66f8 100644 --- a/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md +++ b/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md @@ -16,6 +16,7 @@ Vector's 0.35.0 release includes **deprecations**: and **potentially impactful changes**: 1. [Update `component_sent_bytes_total` to correctly report uncompressed bytes for all sinks](#component-sent-bytes-total) +1. [Update `component_received_bytes_total` to correctly report decompressed bytes for all sources](#component-received-bytes-total) We cover them below to help you upgrade quickly: @@ -37,3 +38,8 @@ existing behavior. In the next release, the config option will be updated to def The AppSignal, Datadog Metrics, GreptimeDB, GCP Cloud Monitoring, Honeycomb, and HTTP sinks now correctly report uncompressed bytes, rather than compressed bytes, for the `component_sent_bytes_total` internal metric. + +#### Update `component_received_bytes_total` to correctly report decompressed bytes for all sources {#component-received-bytes-total} + +The Heroku Logs, HTTP Server, Prometheus Remote Write, and Splunk HEC sources now correctly report decompressed bytes, +rather than compressed bytes, for the `component_received_bytes_total` internal metric.