Skip to content

Commit

Permalink
fix(sources): always emit HttpBytesReceived after decompression (#19048)
Browse files Browse the repository at this point in the history
* fix(sources): always emit HttpBytesReceived after decompression

* remove breaking change

* add upgrade guide entry
  • Loading branch information
dsmith3197 authored and neuronull committed Nov 16, 2023
1 parent 52c3648 commit 0fffcb8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub(crate) fn build_warp_filter(
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::body::bytes())
.and_then(move |encoding_header: Option<String>, body: Bytes| {
let events = decode(&encoding_header, body).and_then(|body| {
let events = decode(encoding_header.as_deref(), body).and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
decode_body(body, log_namespace, &events_received)
});
Expand Down
18 changes: 7 additions & 11 deletions src/sources/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,18 @@ impl RemoteWriteSource {
}

impl HttpSource for RemoteWriteSource {
fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
// Default to snappy decoding the request body.
decode(encoding_header.or(Some("snappy")), body)
}

fn build_events(
&self,
mut body: Bytes,
header_map: &HeaderMap,
body: Bytes,
_header_map: &HeaderMap,
_query_parameters: &HashMap<String, String>,
_full_path: &str,
) -> Result<Vec<Event>, ErrorMessage> {
// If `Content-Encoding` header isn't `snappy` HttpSource won't decode it for us
// se we need to.
if header_map
.get("Content-Encoding")
.map(|header| header.as_ref())
!= Some(&b"snappy"[..])
{
body = decode(&Some("snappy".to_string()), body)?;
}
let events = self.decode_body(body)?;
Ok(events)
}
Expand Down
16 changes: 8 additions & 8 deletions src/sources/splunk_hec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,26 @@ impl SplunkSource {
let mut out = out.clone();
let idx_ack = idx_ack.clone();
let events_received = events_received.clone();
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path: path.as_str(),
protocol,
});

async move {
if idx_ack.is_some() && channel.is_none() {
return Err(Rejection::from(ApiError::MissingChannel));
}

let mut data = Vec::new();
let body = if gzip {
let (byte_size, body) = if gzip {
MultiGzDecoder::new(body.reader())
.read_to_end(&mut data)
.map_err(|_| Rejection::from(ApiError::BadRequest))?;
String::from_utf8_lossy(data.as_slice())
(data.len(), String::from_utf8_lossy(data.as_slice()))
} else {
String::from_utf8_lossy(body.as_ref())
(body.len(), String::from_utf8_lossy(body.as_ref()))
};
emit!(HttpBytesReceived {
byte_size,
http_path: path.as_str(),
protocol,
});

let (batch, receiver) =
BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
Expand Down
2 changes: 1 addition & 1 deletion src/sources/util/http/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use warp::http::StatusCode;
use super::error::ErrorMessage;
use crate::internal_events::HttpDecompressError;

pub fn decode(header: &Option<String>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
if let Some(encodings) = header {
for encoding in encodings.rsplit(',').map(str::trim) {
body = match encoding {
Expand Down
19 changes: 11 additions & 8 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
path: &str,
) -> Result<Vec<Event>, ErrorMessage>;

fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
decode(encoding_header, body)
}

#[allow(clippy::too_many_arguments)]
fn run(
self,
Expand Down Expand Up @@ -123,23 +127,22 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.and_then(
move |path: FullPath,
auth_header,
encoding_header,
encoding_header: Option<String>,
headers: HeaderMap,
body: Bytes,
query_parameters: HashMap<String, String>| {
debug!(message = "Handling HTTP request.", headers = ?headers);
let http_path = path.as_str();

emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});

let events = auth
.is_valid(&auth_header)
.and_then(|()| decode(&encoding_header, body))
.and_then(|()| self.decode(encoding_header.as_deref(), body))
.and_then(|body| {
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});
self.build_events(body, &headers, &query_parameters, path.as_str())
})
.map(|mut events| {
Expand Down
11 changes: 11 additions & 0 deletions website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ Vector's 0.35.0 release includes **deprecations**:

1. [Deprecation of `file` internal metric tag for file-based components](#deprecate-file-tag)

and **potentially impactful changes**:

1. [Update `component_received_bytes_total` to correctly report decompressed bytes for all sources](#component-received-bytes-total)

We cover them below to help you upgrade quickly:

## Upgrade guide
Expand All @@ -26,3 +30,10 @@ File-based components (file source, Kubernetes logs source, file sink) now inclu
component's corresponding internal metrics. This config option defaults to `true` for now to retain the
existing behavior. In the next release, the config option will be updated to default to `false`, as this
`tag` is likely to be of high cardinality.

### Potentially impactful changes

#### Update `component_received_bytes_total` to correctly report decompressed bytes for all sources {#component-received-bytes-total}

The Heroku Logs, HTTP Server, Prometheus Remote Write, and Splunk HEC sources now correctly report decompressed bytes,
rather than compressed bytes, for the `component_received_bytes_total` internal metric.

0 comments on commit 0fffcb8

Please sign in to comment.