Skip to content

Commit

Permalink
enhancement(http_server and heroku_logs sources): Added wildcard supp…
Browse files Browse the repository at this point in the history
…ort for query parameters (#21375)

* Added wildcard support for query parameters in http_server and heroku_logs sources

* Fixed unsafe unwrap by parsing glob patterns sooner

* Added changelog fragment

* Fixed changelog filename

* Fixed formatting
  • Loading branch information
uricorin authored Oct 7, 2024
1 parent 43eaf93 commit b58e1b2
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added wildcard support for `query_parameters` setting in `http_server` and `heroku_logs` sources.

authors: uricorin
70 changes: 64 additions & 6 deletions src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ use crate::{
http::KeepaliveConfig,
internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
serde::{bool_or_struct, default_decoding, default_framing_message_based},
sources::util::{
http::{add_query_parameters, HttpMethod},
ErrorMessage, HttpSource, HttpSourceAuthConfig,
sources::{
http_server::{build_param_matcher, remove_duplicates, HttpConfigParamKind},
util::{
http::{add_query_parameters, HttpMethod},
ErrorMessage, HttpSource, HttpSourceAuthConfig,
},
},
tls::TlsEnableableConfig,
};
Expand All @@ -51,9 +54,16 @@ pub struct LogplexConfig {

/// A list of URL query parameters to include in the log event.
///
/// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
///
/// Specifying "*" results in all query parameters included in the log event.
///
/// These override any values included in the body with conflicting names.
#[serde(default)]
#[configurable(metadata(docs::examples = "application", docs::examples = "source"))]
#[configurable(metadata(docs::examples = "application"))]
#[configurable(metadata(docs::examples = "source"))]
#[configurable(metadata(docs::examples = "param*"))]
#[configurable(metadata(docs::examples = "*"))]
query_parameters: Vec<String>,

#[configurable(derived)]
Expand Down Expand Up @@ -173,7 +183,10 @@ impl SourceConfig for LogplexConfig {
.build()?;

let source = LogplexSource {
query_parameters: self.query_parameters.clone(),
query_parameters: build_param_matcher(&remove_duplicates(
self.query_parameters.clone(),
"query_parameters",
))?,
decoder,
log_namespace,
};
Expand Down Expand Up @@ -213,7 +226,7 @@ impl SourceConfig for LogplexConfig {

#[derive(Clone, Default)]
struct LogplexSource {
query_parameters: Vec<String>,
query_parameters: Vec<HttpConfigParamKind>,
decoder: Decoder,
log_namespace: LogNamespace,
}
Expand Down Expand Up @@ -549,6 +562,51 @@ mod tests {
}).await;
}

// Exercises the wildcard form of the query-parameter list: the source is built
// with the single entry `"*"` and the request carries `appname=lumberjack-store`
// in its query string. The first collected event must then expose `appname`.
#[tokio::test]
async fn logplex_query_parameters_wildcard() {
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let auth = make_auth();

// NOTE(review): the second argument is presumably the `query_parameters`
// list of the source config — confirm against the `source` test helper.
let (rx, addr) = source(
Some(auth.clone()),
vec!["*".to_string()],
EventStatus::Delivered,
true,
)
.await;

// Send the sample body and expect an HTTP 200; collect one event per line
// of `SAMPLE_BODY`.
let mut events = spawn_collect_n(
async move {
assert_eq!(
200,
send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
)
},
rx,
SAMPLE_BODY.lines().count(),
)
.await;

// Only the first event is inspected.
let event = events.remove(0);
let log = event.as_log();

assert_eq!(
*log.get_message().unwrap(),
r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
);
assert_eq!(
log[log_schema().timestamp_key().unwrap().to_string()],
"2020-01-08T22:33:57.353034+00:00"
.parse::<DateTime<Utc>>()
.unwrap()
.into()
);
assert_eq!(*log.get_host().unwrap(), "host".into());
assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
// The query parameter matched by `*` is present on the event under its own name.
assert_eq!(log["appname"], "lumberjack-store".into());
}).await;
}

#[tokio::test]
async fn logplex_handles_failures() {
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
Expand Down
57 changes: 52 additions & 5 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,16 @@ pub struct SimpleHttpConfig {

/// A list of URL query parameters to include in the log event.
///
/// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
///
/// Specifying "*" results in all query parameters included in the log event.
///
/// These override any values included in the body with conflicting names.
#[serde(default)]
#[configurable(metadata(docs::examples = "application"))]
#[configurable(metadata(docs::examples = "source"))]
#[configurable(metadata(docs::examples = "param*"))]
#[configurable(metadata(docs::examples = "*"))]
query_parameters: Vec<String>,

#[configurable(derived)]
Expand Down Expand Up @@ -306,7 +312,7 @@ const fn default_http_response_code() -> StatusCode {
}

/// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed.
fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
list.sort();

let mut dedup = false;
Expand All @@ -332,12 +338,12 @@ fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
}

#[derive(Clone)]
enum HttpConfigParamKind {
pub enum HttpConfigParamKind {
Glob(glob::Pattern),
Exact(String),
}

fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
list.iter()
.map(|s| match s.contains('*') {
true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
Expand All @@ -355,7 +361,10 @@ impl SourceConfig for SimpleHttpConfig {

let source = SimpleHttpSource {
headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
query_parameters: remove_duplicates(self.query_parameters.clone(), "query_parameters"),
query_parameters: build_param_matcher(&remove_duplicates(
self.query_parameters.clone(),
"query_parameters",
))?,
path_key: self.path_key.clone(),
host_key: self.host_key.clone(),
decoder,
Expand Down Expand Up @@ -403,7 +412,7 @@ impl SourceConfig for SimpleHttpConfig {
#[derive(Clone)]
struct SimpleHttpSource {
headers: Vec<HttpConfigParamKind>,
query_parameters: Vec<String>,
query_parameters: Vec<HttpConfigParamKind>,
path_key: OptionalValuePath,
host_key: OptionalValuePath,
decoder: Decoder,
Expand Down Expand Up @@ -1188,6 +1197,44 @@ mod tests {
}
}

// Exercises the wildcard form of the query-parameter list for the HTTP server
// source: the source is built with the single entry `"*"`, a JSON body is posted
// with the query string `source=staging&region=gb`, and the resulting event must
// contain the body field as well as both query parameters.
#[tokio::test]
async fn http_query_wildcard() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
// NOTE(review): positional arguments follow the `source` test helper, whose
// signature is not visible here; the first two vectors are presumably the
// `headers` and `query_parameters` lists — confirm.
let (rx, addr) = source(
vec![],
vec!["*".to_string()],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;

// Exactly one event is collected for the single JSON object sent.
spawn_ok_collect_n(
send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging&region=gb"),
rx,
1,
)
.await
})
.await;

{
let event = events.remove(0);
let log = event.as_log();
// Field decoded from the JSON body.
assert_eq!(log["key1"], "value1".into());
// Fields taken from the query string, neither of which was named explicitly.
assert_eq!(log["source"], "staging".into());
assert_eq!(log["region"], "gb".into());
assert_event_metadata(log).await;
}
}

#[tokio::test]
async fn http_gzip_deflate() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
Expand Down
120 changes: 105 additions & 15 deletions src/sources/util/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,76 @@ use vector_lib::{
event::Event,
};

use crate::sources::http_server::HttpConfigParamKind;

pub fn add_query_parameters(
events: &mut [Event],
query_parameters_config: &[String],
query_parameters_config: &[HttpConfigParamKind],
query_parameters: &HashMap<String, String>,
log_namespace: LogNamespace,
source_name: &'static str,
) {
for query_parameter_name in query_parameters_config {
let value = query_parameters.get(query_parameter_name);
for event in events.iter_mut() {
if let Event::Log(log) = event {
log_namespace.insert_source_metadata(
source_name,
log,
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters", query_parameter_name),
crate::event::Value::from(value.map(String::to_owned)),
);
for qp in query_parameters_config {
match qp {
// Add each non-wildcard containing query_parameter that was specified
// in the `query_parameters` config option to the event if an exact match
// is found.
HttpConfigParamKind::Exact(query_parameter_name) => {
let value = query_parameters.get(query_parameter_name);

for event in events.iter_mut() {
if let Event::Log(log) = event {
log_namespace.insert_source_metadata(
source_name,
log,
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters", query_parameter_name),
crate::event::Value::from(value.map(String::to_owned)),
);
}
}
}
// Add all query_parameters that match against wildcard patterns specified
// in the `query_parameters` config option to the event.
HttpConfigParamKind::Glob(query_parameter_pattern) => {
for query_parameter_name in query_parameters.keys() {
if query_parameter_pattern
.matches_with(query_parameter_name.as_str(), glob::MatchOptions::default())
{
let value = query_parameters.get(query_parameter_name);

for event in events.iter_mut() {
if let Event::Log(log) = event {
log_namespace.insert_source_metadata(
source_name,
log,
Some(LegacyKey::Overwrite(path!(query_parameter_name))),
path!("query_parameters", query_parameter_name),
crate::event::Value::from(value.map(String::to_owned)),
);
}
}
}
}
}
}
};
}
}

#[cfg(test)]
mod tests {
use crate::event::LogEvent;
use crate::sources::util::add_query_parameters;
use crate::sources::{http_server::HttpConfigParamKind, util::add_query_parameters};

use vector_lib::config::LogNamespace;
use vrl::{path, value};

#[test]
fn multiple_query_params() {
let query_params_names = ["param1".into(), "param2".into()];
let query_params_names = [
HttpConfigParamKind::Exact("param1".into()),
HttpConfigParamKind::Exact("param2".into()),
];
let query_params = [
("param1".into(), "value1".into()),
("param2".into(), "value2".into()),
Expand Down Expand Up @@ -72,4 +109,57 @@ mod tests {
.unwrap()
);
}
// Verifies that a single `Glob` config entry built from the pattern `*` causes
// all three supplied query parameters to be added to the event, under both the
// legacy and the Vector log namespace.
#[test]
fn multiple_query_params_wildcard() {
let query_params_names = [HttpConfigParamKind::Glob(glob::Pattern::new("*").unwrap())];
let query_params = [
("param1".into(), "value1".into()),
("param2".into(), "value2".into()),
("param3".into(), "value3".into()),
]
.into();

// Apply the same config and parameters to an empty log event once per namespace.
let mut base_log = [LogEvent::from(value!({})).into()];
add_query_parameters(
&mut base_log,
&query_params_names,
&query_params,
LogNamespace::Legacy,
"test",
);
let mut namespaced_log = [LogEvent::from(value!({})).into()];
add_query_parameters(
&mut namespaced_log,
&query_params_names,
&query_params,
LogNamespace::Vector,
"test",
);

let log = base_log[0].as_log();
// The whole legacy event value must equal what the namespaced event stores in
// its metadata at `test.query_parameters`.
assert_eq!(
log.value(),
namespaced_log[0]
.metadata()
.value()
.get(path!("test", "query_parameters"))
.unwrap(),
"Checking legacy and namespaced log contain query parameters string"
);
// Each parameter is individually present on the legacy event.
assert_eq!(
log["param1"],
"value1".into(),
"Checking log contains first query parameter"
);
assert_eq!(
log["param2"],
"value2".into(),
"Checking log contains second query parameter"
);
assert_eq!(
log["param3"],
"value3".into(),
"Checking log contains third query parameter"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,16 @@ base: components: sources: heroku_logs: configuration: {
description: """
A list of URL query parameters to include in the log event.
Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
Specifying "*" results in all query parameters included in the log event.
These override any values included in the body with conflicting names.
"""
required: false
type: array: {
default: []
items: type: string: examples: ["application", "source"]
items: type: string: examples: ["application", "source", "param*", "*"]
}
}
tls: {
Expand Down
Loading

0 comments on commit b58e1b2

Please sign in to comment.