Skip to content

Commit

Permalink
Merge branch 'master' into dougsmith/uncompressed-sent-bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmith3197 authored Nov 10, 2023
2 parents e60fa9a + 40305f1 commit 59f85fe
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 29 deletions.
2 changes: 1 addition & 1 deletion src/sources/opentelemetry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub(crate) fn build_warp_filter(
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::body::bytes())
.and_then(move |encoding_header: Option<String>, body: Bytes| {
let events = decode(&encoding_header, body).and_then(|body| {
let events = decode(encoding_header.as_deref(), body).and_then(|body| {
bytes_received.emit(ByteSize(body.len()));
decode_body(body, log_namespace, &events_received)
});
Expand Down
18 changes: 7 additions & 11 deletions src/sources/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,18 @@ impl RemoteWriteSource {
}

impl HttpSource for RemoteWriteSource {
fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
    // Prometheus remote-write request bodies are snappy-compressed by
    // default, so when no `Content-Encoding` header was supplied we fall
    // back to "snappy" before delegating to the shared decode helper.
    let encoding = encoding_header.unwrap_or("snappy");
    decode(Some(encoding), body)
}

fn build_events(
&self,
mut body: Bytes,
header_map: &HeaderMap,
body: Bytes,
_header_map: &HeaderMap,
_query_parameters: &HashMap<String, String>,
_full_path: &str,
) -> Result<Vec<Event>, ErrorMessage> {
// If the `Content-Encoding` header isn't `snappy`, HttpSource won't decode it for us,
// so we need to.
if header_map
.get("Content-Encoding")
.map(|header| header.as_ref())
!= Some(&b"snappy"[..])
{
body = decode(&Some("snappy".to_string()), body)?;
}
let events = self.decode_body(body)?;
Ok(events)
}
Expand Down
16 changes: 8 additions & 8 deletions src/sources/splunk_hec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,26 +351,26 @@ impl SplunkSource {
let mut out = out.clone();
let idx_ack = idx_ack.clone();
let events_received = events_received.clone();
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path: path.as_str(),
protocol,
});

async move {
if idx_ack.is_some() && channel.is_none() {
return Err(Rejection::from(ApiError::MissingChannel));
}

let mut data = Vec::new();
let body = if gzip {
let (byte_size, body) = if gzip {
MultiGzDecoder::new(body.reader())
.read_to_end(&mut data)
.map_err(|_| Rejection::from(ApiError::BadRequest))?;
String::from_utf8_lossy(data.as_slice())
(data.len(), String::from_utf8_lossy(data.as_slice()))
} else {
String::from_utf8_lossy(body.as_ref())
(body.len(), String::from_utf8_lossy(body.as_ref()))
};
emit!(HttpBytesReceived {
byte_size,
http_path: path.as_str(),
protocol,
});

let (batch, receiver) =
BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
Expand Down
2 changes: 1 addition & 1 deletion src/sources/util/http/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use warp::http::StatusCode;
use super::error::ErrorMessage;
use crate::internal_events::HttpDecompressError;

pub fn decode(header: &Option<String>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
if let Some(encodings) = header {
for encoding in encodings.rsplit(',').map(str::trim) {
body = match encoding {
Expand Down
19 changes: 11 additions & 8 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
path: &str,
) -> Result<Vec<Event>, ErrorMessage>;

/// Decode the raw request body according to the `Content-Encoding`
/// header value, delegating to the free `decode` helper. Implementors
/// may override this to customize decoding (e.g. to supply a default
/// encoding when the header is absent).
fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
decode(encoding_header, body)
}

#[allow(clippy::too_many_arguments)]
fn run(
self,
Expand Down Expand Up @@ -123,23 +127,22 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.and_then(
move |path: FullPath,
auth_header,
encoding_header,
encoding_header: Option<String>,
headers: HeaderMap,
body: Bytes,
query_parameters: HashMap<String, String>| {
debug!(message = "Handling HTTP request.", headers = ?headers);
let http_path = path.as_str();

emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});

let events = auth
.is_valid(&auth_header)
.and_then(|()| decode(&encoding_header, body))
.and_then(|()| self.decode(encoding_header.as_deref(), body))
.and_then(|body| {
emit!(HttpBytesReceived {
byte_size: body.len(),
http_path,
protocol,
});
self.build_events(body, &headers, &query_parameters, path.as_str())
})
.map(|mut events| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Vector's 0.35.0 release includes **deprecations**:
and **potentially impactful changes**:

1. [Update `component_sent_bytes_total` to correctly report uncompressed bytes for all sinks](#component-sent-bytes-total)
1. [Update `component_received_bytes_total` to correctly report decompressed bytes for all sources](#component-received-bytes-total)

We cover them below to help you upgrade quickly:

Expand All @@ -37,3 +38,8 @@ existing behavior. In the next release, the config option will be updated to def

The AppSignal, Datadog Metrics, GreptimeDB, GCP Cloud Monitoring, Honeycomb, and HTTP sinks now correctly
report uncompressed bytes, rather than compressed bytes, for the `component_sent_bytes_total` internal metric.

#### Update `component_received_bytes_total` to correctly report decompressed bytes for all sources {#component-received-bytes-total}

The Heroku Logs, HTTP Server, Prometheus Remote Write, and Splunk HEC sources now correctly report decompressed bytes,
rather than compressed bytes, for the `component_received_bytes_total` internal metric.

0 comments on commit 59f85fe

Please sign in to comment.