From cddb83570d8accf5b528254f53502b00385ee268 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 24 Oct 2023 13:57:18 +0000 Subject: [PATCH 01/28] chore(deps): Bump the clap group with 1 update (#18906) Bumps the clap group with 1 update: [clap-verbosity-flag](https://github.com/clap-rs/clap-verbosity-flag). - [Changelog](https://github.com/clap-rs/clap-verbosity-flag/blob/master/CHANGELOG.md) - [Commits](https://github.com/clap-rs/clap-verbosity-flag/compare/v2.0.1...v2.1.0) --- updated-dependencies: - dependency-name: clap-verbosity-flag dependency-type: direct:production update-type: version-update:semver-minor dependency-group: clap ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 4 ++-- vdev/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcdf1a92b2d9f..805d01e59721c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2077,9 +2077,9 @@ dependencies = [ [[package]] name = "clap-verbosity-flag" -version = "2.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1eef05769009513df2eb1c3b4613e7fad873a14c600ff025b08f250f59fee7de" +checksum = "e5fdbb015d790cfb378aca82caf9cc52a38be96a7eecdb92f31b4366a8afc019" dependencies = [ "clap 4.4.6", "log", diff --git a/vdev/Cargo.toml b/vdev/Cargo.toml index 54bd6bcfd1bbc..4b0fcc62099c8 100644 --- a/vdev/Cargo.toml +++ b/vdev/Cargo.toml @@ -13,7 +13,7 @@ atty = "0.2.14" cached = "0.46.0" chrono = { version = "0.4.31", default-features = false, features = ["serde", "clock"] } clap = { version = "4.4.6", features = ["derive"] } -clap-verbosity-flag = "2.0.1" +clap-verbosity-flag = "2.1.0" clap_complete = "4.4.3" confy = "0.5.1" directories = "5.0.1" From 249330a3a01b132feea17cd3fbd4b0ed22429524 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 24 Oct 2023 11:31:20 -0600 Subject: [PATCH 02/28] chore(deps): Bump toml from 0.8.3 to 0.8.4 (#18913) Bumps [toml](https://github.com/toml-rs/toml) from 0.8.3 to 0.8.4. - [Commits](https://github.com/toml-rs/toml/compare/toml-v0.8.3...toml-v0.8.4) --- updated-dependencies: - dependency-name: toml dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 24 ++++++++++++------------ Cargo.toml | 2 +- lib/vector-config/Cargo.toml | 2 +- lib/vector-core/Cargo.toml | 4 ++-- vdev/Cargo.toml | 2 +- 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 805d01e59721c..53a279c8c4ad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1872,7 +1872,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3f9629bc6c4388ea699781dc988c2b99766d7679b151c81990b4fa1208fafd3" dependencies = [ "serde", - "toml 0.8.3", + "toml 0.8.4", ] [[package]] @@ -7813,7 +7813,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af5ae5f42c16d60b098ae5d4afd75c1d3b6512e6ca5d0b9b916e2ced30df264c" dependencies = [ - "toml 0.8.3", + "toml 0.8.4", ] [[package]] @@ -8936,9 +8936,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b150d2f463da7b52f12110d3995dc86598bf90d535e929e5f5af15ab89155011" +checksum = "2ef75d881185fd2df4a040793927c153d863651108a93c7e17a9e591baa95cc6" dependencies = [ "serde", "serde_spanned", @@ -8948,18 +8948,18 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51cc078118ed25af325985ff674c00c8416b0f962be67da4946854ebfc99f334" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.20.3" +version = "0.20.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a2534c1aa199edef7108fb7d970facaa17f8f8cc5ce6bde75372cfa1051ed91" +checksum = "380f9e8120405471f7c9ad1860a713ef5ece6a670c7eae39225e477340f32fc4" dependencies = [ "indexmap 2.0.2", "serde", @@ -9672,7 +9672,7 @@ dependencies = [ "serde_yaml 0.9.25", "sha2", "tempfile", - "toml 0.8.3", + "toml 0.8.4", ] [[package]] @@ -9848,7 +9848,7 @@ dependencies = [ "tokio-test", "tokio-tungstenite", "tokio-util", - "toml 0.8.3", + "toml 0.8.4", "tonic 0.10.2", "tonic-build 0.10.2", "tower", @@ -9996,7 +9996,7 @@ dependencies = [ "serde_json", "serde_with 3.4.0", "snafu", - "toml 0.8.3", + "toml 0.8.4", "tracing 0.1.37", "url", "vector-config-common", @@ -10097,7 +10097,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tokio-util", - "toml 0.8.3", + "toml 0.8.4", "tonic 0.10.2", "tracing 0.1.37", "tracing-core 0.1.30", diff --git a/Cargo.toml b/Cargo.toml index 3d48ecf208b66..7637cac5ff91e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -324,7 +324,7 @@ syslog = { version = "6.1.0", default-features = false, optional = true } tikv-jemallocator = { version = "0.5.4", default-features = false, optional = true } tokio-postgres = { version = "0.7.10", default-features = false, features = ["runtime", "with-chrono-0_4"], optional = true } tokio-tungstenite = {version = "0.20.1", default-features = false, features = ["connect"], optional = true} -toml = { version = "0.8.3", default-features = false, features = ["parse", "display"] } +toml = { version = "0.8.4", default-features = false, features = ["parse", "display"] } tonic = { version = "0.10", optional = true, default-features = false, features = ["transport", "codegen", "prost", "tls", "tls-roots", "gzip"] } trust-dns-proto = { version = "0.23.2", default-features = false, features = ["dnssec"], optional = true } typetag = { version = "0.2.13", default-features = false } diff --git a/lib/vector-config/Cargo.toml b/lib/vector-config/Cargo.toml index 86fde0b7d2c44..3011248797838 100644 --- a/lib/vector-config/Cargo.toml +++ b/lib/vector-config/Cargo.toml @@ -22,7 +22,7 @@ serde = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["std"] } serde_with = { version = "3.4.0", default-features = false, features = ["std"] } snafu = { version = "0.7.5", default-features = false } -toml = { version = "0.8.3", default-features = false } +toml = { version = "0.8.4", default-features = false } tracing = { version = "0.1.34", default-features = false } url = { version = "2.4.1", default-features = false, features = ["serde"] } http = { version = "0.2.9", default-features = false } diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 9b1e4009fef74..c33f3f54485f7 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -50,7 +50,7 @@ tokio = { version = "1.33.0", default-features = false, features = ["net"] } tokio-openssl = { version = "0.6.3", default-features = false } tokio-stream = { version = "0.1", default-features = false, features = ["time"], optional = true } tokio-util = { version = "0.7.0", default-features = false, features = ["time"] } -toml = { version = "0.8.3", default-features = false } +toml = { version = "0.8.4", default-features = false } tonic = { version = "0.10", default-features = false, features = ["transport"] } tracing = { version = "0.1.34", default-features = false } tracing-core = { version = "0.1.26", default-features = false } @@ -84,7 +84,7 @@ quickcheck_macros = "1" proptest = "1.3" similar-asserts = "1.5.0" tokio-test = "0.4.3" -toml = { version = "0.8.3", default-features = false, features = ["parse"] } +toml = { version = "0.8.4", default-features = false, features = ["parse"] } ndarray = "0.15.6" ndarray-stats = "0.5.1" noisy_float = "0.2.0" diff --git a/vdev/Cargo.toml b/vdev/Cargo.toml index 4b0fcc62099c8..ce33910260155 100644 --- a/vdev/Cargo.toml +++ b/vdev/Cargo.toml @@ -37,4 +37,4 @@ serde_json = "1.0.107" serde_yaml = "0.9.25" sha2 = "0.10.8" tempfile = "3.8.0" -toml = { version = "0.8.3", default-features = false, features = ["parse"] } +toml = { version = "0.8.4", default-features = false, features = ["parse"] } From 26f430c77138ef2373e86182966d5d5085b68514 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 14:16:33 -0400 Subject: [PATCH 03/28] fix(amqp sink): remove unnecessary unwrap & emit event dropped errors (#18923) * fix(amqp sink): remove unnecessary unwrap & emit event dropped errors * return error too * fix checks * feedback --- src/internal_events/amqp.rs | 36 ++++++++++++++++++++++++++++++------ src/sinks/amqp/service.rs | 25 ++++++++++--------------- src/sinks/amqp/sink.rs | 1 + 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/src/internal_events/amqp.rs b/src/internal_events/amqp.rs index dd5dde7b8e7b8..a042b86278ed3 100644 --- a/src/internal_events/amqp.rs +++ b/src/internal_events/amqp.rs @@ -105,9 +105,9 @@ pub mod sink { impl InternalEvent for AmqpDeliveryError<'_> { fn emit(self) { - let deliver_reason = "Unable to deliver."; + const DELIVER_REASON: &str = "Unable to deliver."; - error!(message = deliver_reason, + error!(message = DELIVER_REASON, error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, @@ -120,7 +120,7 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: deliver_reason + reason: DELIVER_REASON }); } } @@ -132,9 +132,9 @@ pub mod sink { impl InternalEvent for AmqpAcknowledgementError<'_> { fn emit(self) { - let ack_reason = "Acknowledgement failed."; + const ACK_REASON: &str = "Acknowledgement failed."; - error!(message = ack_reason, + error!(message = ACK_REASON, error = ?self.error, error_type = error_type::REQUEST_FAILED, stage = error_stage::SENDING, @@ -147,7 +147,31 @@ pub mod sink { ); emit!(ComponentEventsDropped:: { count: 1, - reason: ack_reason + reason: ACK_REASON + }); + } + } + + #[derive(Debug)] + pub struct AmqpNackError; + + impl InternalEvent for AmqpNackError { + fn emit(self) { + const DELIVER_REASON: &str = "Received Negative Acknowledgement from AMQP broker."; + error!( + message = DELIVER_REASON, + error_type = error_type::ACKNOWLEDGMENT_FAILED, + stage = error_stage::SENDING, + internal_log_rate_limit = true, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::ACKNOWLEDGMENT_FAILED, + "stage" => error_stage::SENDING, + ); + emit!(ComponentEventsDropped:: { + count: 1, + reason: DELIVER_REASON }); } } diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index 42ccf467e5692..36e2845afa821 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -1,7 +1,7 @@ //! The main tower service that takes the request created by the request builder //! and sends it to `AMQP`. use crate::{ - internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError}, + internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError, AmqpNackError}, sinks::prelude::*, }; use bytes::Bytes; @@ -88,10 +88,13 @@ pub(super) struct AmqpService { #[derive(Debug, Snafu)] pub(super) enum AmqpError { #[snafu(display("Failed retrieving Acknowledgement: {}", error))] - AmqpAcknowledgementFailed { error: lapin::Error }, + AcknowledgementFailed { error: lapin::Error }, #[snafu(display("Failed AMQP request: {}", error))] - AmqpDeliveryFailed { error: lapin::Error }, + DeliveryFailed { error: lapin::Error }, + + #[snafu(display("Received Negative Acknowledgement from AMQP broker."))] + Nack, } impl Service for AmqpService { @@ -109,11 +112,6 @@ impl Service for AmqpService { let channel = Arc::clone(&self.channel); Box::pin(async move { - channel - .confirm_select(lapin::options::ConfirmSelectOptions::default()) - .await - .unwrap(); - let byte_size = req.body.len(); let fut = channel .basic_publish( @@ -128,16 +126,13 @@ impl Service for AmqpService { match fut { Ok(result) => match result.await { Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => { - warn!("Received Negative Acknowledgement from AMQP server."); - Ok(AmqpResponse { - json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), - byte_size, - }) + emit!(AmqpNackError); + Err(AmqpError::Nack) } Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpAcknowledgementError { error: &error }); - Err(AmqpError::AmqpAcknowledgementFailed { error }) + Err(AmqpError::AcknowledgementFailed { error }) } Ok(_) => Ok(AmqpResponse { json_size: req.metadata.into_events_estimated_json_encoded_byte_size(), @@ -147,7 +142,7 @@ impl Service for AmqpService { Err(error) => { // TODO: In due course the caller could emit these on error. emit!(AmqpDeliveryError { error: &error }); - Err(AmqpError::AmqpDeliveryFailed { error }) + Err(AmqpError::DeliveryFailed { error }) } } }) diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index 7039c6217993d..922065a58a7a3 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -43,6 +43,7 @@ impl AmqpSink { .await .map_err(|e| BuildError::AmqpCreateFailed { source: e })?; + // Enable confirmations on the channel. channel .confirm_select(ConfirmSelectOptions::default()) .await From bf56ac5b98569902b5a58e94b8afecd846545d14 Mon Sep 17 00:00:00 2001 From: neuronull Date: Tue, 24 Oct 2023 12:52:49 -0600 Subject: [PATCH 04/28] chore(ci): filter team members from gardener issue comment workflow (#18915) * chore(ci): filter team members from gardener issue comment workflow * fix logic --- .github/workflows/gardener_issue_comment.yml | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/.github/workflows/gardener_issue_comment.yml b/.github/workflows/gardener_issue_comment.yml index f25dc9da5e02a..b55221ef32a42 100644 --- a/.github/workflows/gardener_issue_comment.yml +++ b/.github/workflows/gardener_issue_comment.yml @@ -1,7 +1,8 @@ # Gardener Issue Comment # # This workflow moves GH issues from the Gardener board's "Blocked / Waiting" column -# to the "Triage", so that the Gardener can assess the issue in light of new information. +# to "Triage", when a comment is posted on an issue from a non-team member +# so that the Gardener can assess the issue in light of new information. name: Gardener Issue Comment @@ -15,7 +16,23 @@ jobs: runs-on: ubuntu-latest if: contains(github.event.issue.url, 'issues') steps: + - name: Generate authentication token + id: generate_token + uses: tibdex/github-app-token@3beb63f4bd073e61482598c45c71c1019b59b73a + with: + app_id: ${{ secrets.GH_APP_DATADOG_VECTOR_CI_APP_ID }} + private_key: ${{ secrets.GH_APP_DATADOG_VECTOR_CI_APP_PRIVATE_KEY }} + + - name: Get PR comment author + id: comment + uses: tspascoal/get-user-teams-membership@v3 + with: + username: ${{ github.actor }} + team: 'Vector' + GITHUB_TOKEN: ${{ steps.generate_token.outputs.token }} + - name: Move issue back to Triage if status is Blocked/Waiting + if: steps.comment.outputs.isTeamMember == 'false' env: GH_TOKEN: ${{ secrets.GH_PROJECT_PAT }} run: | From 78934c211d085dabe3be4c183b804b05d49303c4 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 24 Oct 2023 15:03:54 -0600 Subject: [PATCH 05/28] chore(deps): Bump the clap group with 2 updates (#18925) Bumps the clap group with 2 updates: [clap](https://github.com/clap-rs/clap) and [clap_complete](https://github.com/clap-rs/clap). Updates `clap` from 4.4.6 to 4.4.7 - [Release notes](https://github.com/clap-rs/clap/releases) - [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md) - [Commits](https://github.com/clap-rs/clap/compare/v4.4.6...v4.4.7) Updates `clap_complete` from 4.4.3 to 4.4.4 - [Release notes](https://github.com/clap-rs/clap/releases) - [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md) - [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.4.3...clap_complete-v4.4.4) --- updated-dependencies: - dependency-name: clap dependency-type: direct:production update-type: version-update:semver-patch dependency-group: clap - dependency-name: clap_complete dependency-type: direct:production update-type: version-update:semver-patch dependency-group: clap ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- Cargo.lock | 40 ++++++++++++++++---------------- Cargo.toml | 2 +- lib/vector-api-client/Cargo.toml | 2 +- lib/vector-buffers/Cargo.toml | 2 +- lib/vector-vrl/cli/Cargo.toml | 2 +- lib/vector-vrl/tests/Cargo.toml | 2 +- vdev/Cargo.toml | 4 ++-- 7 files changed, 27 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53a279c8c4ad7..389d7355930c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2067,9 +2067,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.6" +version = "4.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" +checksum = "ac495e00dcec98c83465d5ad66c5c4fabd652fd6686e7c6269b117e729a6f17b" dependencies = [ "clap_builder", "clap_derive", @@ -2081,15 +2081,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5fdbb015d790cfb378aca82caf9cc52a38be96a7eecdb92f31b4366a8afc019" dependencies = [ - "clap 4.4.6", + "clap 4.4.7", "log", ] [[package]] name = "clap_builder" -version = "4.4.6" +version = "4.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" +checksum = "c77ed9a32a62e6ca27175d00d29d05ca32e396ea1eb5fb01d8256b669cec7663" dependencies = [ "anstream", "anstyle 1.0.0", @@ -2100,18 +2100,18 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.4.3" +version = "4.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3ae8ba90b9d8b007efe66e55e48fb936272f5ca00349b5b0e89877520d35ea7" +checksum = "bffe91f06a11b4b9420f62103854e90867812cd5d01557f853c5ee8e791b12ae" dependencies = [ - "clap 4.4.6", + "clap 4.4.7", ] [[package]] name = "clap_derive" -version = "4.4.2" +version = "4.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0862016ff20d69b84ef8247369fabf5c008a7417002411897d40ee1f4532b873" +checksum = "cf9804afaaf59a91e75b022a30fb7229a7901f60c755489cc61c9b423b836442" dependencies = [ "heck 0.4.1", "proc-macro2 1.0.69", @@ -2121,9 +2121,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" +checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1" [[package]] name = "clipboard-win" @@ -2439,7 +2439,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.4.6", + "clap 4.4.7", "criterion-plot", "futures 0.3.28", "is-terminal", @@ -9649,7 +9649,7 @@ dependencies = [ "atty", "cached", "chrono", - "clap 4.4.6", + "clap 4.4.7", "clap-verbosity-flag", "clap_complete", "confy", @@ -9726,7 +9726,7 @@ dependencies = [ "bytesize", "chrono", "cidr-utils", - "clap 4.4.6", + "clap 4.4.7", "codecs", "colored", "console-subscriber", @@ -9888,7 +9888,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", - "clap 4.4.6", + "clap 4.4.7", "futures 0.3.28", "graphql_client", "indoc", @@ -9911,7 +9911,7 @@ dependencies = [ "async-trait", "bytecheck", "bytes 1.5.0", - "clap 4.4.6", + "clap 4.4.7", "crc32fast", "criterion", "crossbeam-queue", @@ -10148,7 +10148,7 @@ dependencies = [ name = "vector-vrl-cli" version = "0.1.0" dependencies = [ - "clap 4.4.6", + "clap 4.4.7", "vector-vrl-functions", "vrl", ] @@ -10167,7 +10167,7 @@ dependencies = [ "ansi_term", "chrono", "chrono-tz", - "clap 4.4.6", + "clap 4.4.7", "enrichment", "glob", "prettydiff", @@ -10228,7 +10228,7 @@ dependencies = [ "chrono", "chrono-tz", "cidr-utils", - "clap 4.4.6", + "clap 4.4.7", "codespan-reporting", "community-id", "crypto_secretbox", diff --git a/Cargo.toml b/Cargo.toml index 7637cac5ff91e..cf794391c783c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -257,7 +257,7 @@ bytes = { version = "1.5.0", default-features = false, features = ["serde"] } bytesize = { version = "1.3.0", default-features = false } chrono = { version = "0.4.31", default-features = false, features = ["serde"] } cidr-utils = { version = "0.5.11", default-features = false } -clap = { version = "4.4.6", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } +clap = { version = "4.4.7", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } colored = { version = "2.0.4", default-features = false } csv = { version = "1.3", default-features = false } derivative = { version = "2.2.0", default-features = false } diff --git a/lib/vector-api-client/Cargo.toml b/lib/vector-api-client/Cargo.toml index df849abbc3683..2a408c0c74bb0 100644 --- a/lib/vector-api-client/Cargo.toml +++ b/lib/vector-api-client/Cargo.toml @@ -30,7 +30,7 @@ tokio-tungstenite = { version = "0.20.1", default-features = false, features = [ # External libs chrono = { version = "0.4.31", default-features = false, features = ["serde"] } -clap = { version = "4.4.6", default-features = false, features = ["derive"] } +clap = { version = "4.4.7", default-features = false, features = ["derive"] } url = { version = "2.4.1", default-features = false } uuid = { version = "1", default-features = false, features = ["serde", "v4"] } indoc = { version = "2.0.4", default-features = false } diff --git a/lib/vector-buffers/Cargo.toml b/lib/vector-buffers/Cargo.toml index 18f031fbbf694..5e1938772f0b0 100644 --- a/lib/vector-buffers/Cargo.toml +++ b/lib/vector-buffers/Cargo.toml @@ -32,7 +32,7 @@ vector-config-macros = { path = "../vector-config-macros", default-features = fa vector-common = { path = "../vector-common", default-features = false, features = ["byte_size_of", "serde"] } [dev-dependencies] -clap = "4.4.6" +clap = "4.4.7" criterion = { version = "0.5", features = ["html_reports", "async_tokio"] } crossbeam-queue = "0.3.8" hdrhistogram = "7.5.2" diff --git a/lib/vector-vrl/cli/Cargo.toml b/lib/vector-vrl/cli/Cargo.toml index 5e06e58ca9c3b..e374afc1894c3 100644 --- a/lib/vector-vrl/cli/Cargo.toml +++ b/lib/vector-vrl/cli/Cargo.toml @@ -7,6 +7,6 @@ publish = false license = "MPL-2.0" [dependencies] -clap = { version = "4.4.6", features = ["derive"] } +clap = { version = "4.4.7", features = ["derive"] } vector-vrl-functions = { path = "../functions" } vrl.workspace = true diff --git a/lib/vector-vrl/tests/Cargo.toml b/lib/vector-vrl/tests/Cargo.toml index 73e6d9e5df72f..067e8cae41fe1 100644 --- a/lib/vector-vrl/tests/Cargo.toml +++ b/lib/vector-vrl/tests/Cargo.toml @@ -13,7 +13,7 @@ vector-vrl-functions = { path = "../../vector-vrl/functions" } ansi_term = "0.12" chrono = "0.4" chrono-tz = "0.8" -clap = { version = "4.4.6", features = ["derive"] } +clap = { version = "4.4.7", features = ["derive"] } glob = "0.3" prettydiff = "0.6" regex = "1" diff --git a/vdev/Cargo.toml b/vdev/Cargo.toml index ce33910260155..f82e632feb1a9 100644 --- a/vdev/Cargo.toml +++ b/vdev/Cargo.toml @@ -12,9 +12,9 @@ anyhow = "1.0.75" atty = "0.2.14" cached = "0.46.0" chrono = { version = "0.4.31", default-features = false, features = ["serde", "clock"] } -clap = { version = "4.4.6", features = ["derive"] } +clap = { version = "4.4.7", features = ["derive"] } clap-verbosity-flag = "2.1.0" -clap_complete = "4.4.3" +clap_complete = "4.4.4" confy = "0.5.1" directories = "5.0.1" # remove this when stabilized https://doc.rust-lang.org/stable/std/path/fn.absolute.html From a1863e65cc22cd83d12c7806ec13baa6f65f8491 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Tue, 24 Oct 2023 17:21:51 -0400 Subject: [PATCH 06/28] fix(kafka sink): Make KafkaService return `Poll::Pending` when producer queue is full (#18770) * fix(kafka sink): set concurrency limits equal to kafka producer queue limits * use send_result to better track state * nits * clippy --- src/sinks/kafka/service.rs | 95 ++++++++++++++++++++++++++++++-------- src/sinks/kafka/sink.rs | 9 +--- 2 files changed, 78 insertions(+), 26 deletions(-) diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 0f1d122b7750c..607e7cd5fd4ea 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -1,11 +1,18 @@ -use std::task::{Context, Poll}; +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; use bytes::Bytes; use rdkafka::{ error::KafkaError, message::OwnedHeaders, producer::{FutureProducer, FutureRecord}, - util::Timeout, + types::RDKafkaErrorCode, }; use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; @@ -59,16 +66,38 @@ impl MetaDescriptive for KafkaRequest { } } +/// BlockedRecordState manages state for a record blocked from being enqueued on the producer. +struct BlockedRecordState { + records_blocked: Arc, +} + +impl BlockedRecordState { + fn new(records_blocked: Arc) -> Self { + records_blocked.fetch_add(1, Ordering::Relaxed); + Self { records_blocked } + } +} + +impl Drop for BlockedRecordState { + fn drop(&mut self) { + self.records_blocked.fetch_sub(1, Ordering::Relaxed); + } +} + #[derive(Clone)] pub struct KafkaService { kafka_producer: FutureProducer, + + /// The number of records blocked from being enqueued on the producer. + records_blocked: Arc, } impl KafkaService { - pub(crate) const fn new( - kafka_producer: FutureProducer, - ) -> KafkaService { - KafkaService { kafka_producer } + pub(crate) fn new(kafka_producer: FutureProducer) -> KafkaService { + KafkaService { + kafka_producer, + records_blocked: Arc::new(AtomicUsize::new(0)), + } } } @@ -78,13 +107,21 @@ impl Service for KafkaService { type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + // The Kafka service is at capacity if any records are currently blocked from being enqueued + // on the producer. + if self.records_blocked.load(Ordering::Relaxed) > 0 { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } } fn call(&mut self, request: KafkaRequest) -> Self::Future { let this = self.clone(); Box::pin(async move { + let raw_byte_size = + request.body.len() + request.metadata.key.as_ref().map_or(0, |x| x.len()); let event_byte_size = request .request_metadata .into_events_estimated_json_encoded_byte_size(); @@ -101,17 +138,39 @@ impl Service for KafkaService { record = record.headers(headers); } - // rdkafka will internally retry forever if the queue is full - match this.kafka_producer.send(record, Timeout::Never).await { - Ok((_partition, _offset)) => { - let raw_byte_size = - request.body.len() + request.metadata.key.map_or(0, |x| x.len()); - Ok(KafkaResponse { - event_byte_size, - raw_byte_size, - }) - } - Err((kafka_err, _original_record)) => Err(kafka_err), + // Manually poll [FutureProducer::send_result] instead of [FutureProducer::send] to track + // records that fail to be enqueued on the producer. + let mut blocked_state: Option = None; + loop { + match this.kafka_producer.send_result(record) { + // Record was successfully enqueued on the producer. + Ok(fut) => { + // Drop the blocked state (if any), as the producer is no longer blocked. + drop(blocked_state.take()); + return fut + .await + .expect("producer unexpectedly dropped") + .map(|_| KafkaResponse { + event_byte_size, + raw_byte_size, + }) + .map_err(|(err, _)| err); + } + // Producer queue is full. + Err(( + KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + original_record, + )) => { + if blocked_state.is_none() { + blocked_state = + Some(BlockedRecordState::new(Arc::clone(&this.records_blocked))); + } + record = original_record; + tokio::time::sleep(Duration::from_millis(100)).await; + } + // A different error occurred. + Err((err, _)) => return Err(err), + }; } }) } diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 141c32f7cb3b7..db4395db15799 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -6,7 +6,6 @@ use rdkafka::{ }; use snafu::{ResultExt, Snafu}; use tokio::time::Duration; -use tower::limit::ConcurrencyLimit; use vrl::path::OwnedTargetPath; use super::config::{KafkaRole, KafkaSinkConfig}; @@ -62,11 +61,6 @@ impl KafkaSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - // rdkafka will internally retry forever, so we need some limit to prevent this from overflowing. - // 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying - // buffer is full. - let service = ConcurrencyLimit::new(self.service.clone(), 64); - let request_builder = KafkaRequestBuilder { key_field: self.key_field, headers_key: self.headers_key, @@ -100,8 +94,7 @@ impl KafkaSink { Ok(req) => Some(req), } }) - .into_driver(service) - .protocol("kafka") + .into_driver(self.service) .protocol("kafka") .run() .await From 5c1707f5972ff37d6bcd5782a157afac89efaa3d Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 25 Oct 2023 10:28:24 +0100 Subject: [PATCH 07/28] chore(prometheus_remote_write sink): remote write sink rewrite (#18676) * Refactor prometheus remote write sink Signed-off-by: Stephen Wakely * WIP Signed-off-by: Stephen Wakely * Made tests pass Signed-off-by: Stephen Wakely * Use shared compression Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Some comments Signed-off-by: Stephen Wakely * Use HttpResponse Signed-off-by: Stephen Wakely * Update request builder default Signed-off-by: Stephen Wakely * Update PartitionBatcher to use BatchConfig Signed-off-by: Stephen Wakely * Update PartitionBatcher to use BatchConfig Signed-off-by: Stephen Wakely * Remove zorkwonk Signed-off-by: Stephen Wakely * Make into fns as fns instead Signed-off-by: Stephen Wakely * Spelling Signed-off-by: Stephen Wakely * Don't box the closure Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Only insert timeout if we add the batch to the list Signed-off-by: Stephen Wakely * Allow the timer to remove an item Signed-off-by: Stephen Wakely * Allow partitions to be types other than Vec Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Added test for aggregation Signed-off-by: Stephen Wakely * Allow a custom object to be used for the reducer Signed-off-by: Stephen Wakely * Make aggregating optional Signed-off-by: Stephen Wakely * Adde test for non aggregation Signed-off-by: Stephen Wakely * Clippy Signed-off-by: Stephen Wakely * Component docs Signed-off-by: Stephen Wakely * Feedback from Kyle and Doug Signed-off-by: Stephen Wakely * Use generic compression options Signed-off-by: Stephen Wakely * Default compression to Snappy Signed-off-by: Stephen Wakely * Update docs Signed-off-by: Stephen Wakely * Add snappy to the compression docs Signed-off-by: Stephen Wakely * Remove proptest file Signed-off-by: Stephen Wakely * Snappy is no longer optional Signed-off-by: Stephen Wakely --------- Signed-off-by: Stephen Wakely --- Cargo.toml | 8 +- lib/vector-stream/src/partitioned_batcher.rs | 55 +- src/sinks/prometheus/remote_write.rs | 790 ------------------ src/sinks/prometheus/remote_write/config.rs | 229 +++++ .../remote_write/integration_tests.rs | 109 +++ src/sinks/prometheus/remote_write/mod.rs | 48 ++ .../remote_write/request_builder.rs | 131 +++ src/sinks/prometheus/remote_write/service.rs | 139 +++ src/sinks/prometheus/remote_write/sink.rs | 233 ++++++ src/sinks/prometheus/remote_write/tests.rs | 290 +++++++ src/sinks/util/buffer/compression.rs | 34 +- src/sinks/util/buffer/metrics/normalize.rs | 12 +- src/sinks/util/buffer/mod.rs | 9 + src/sinks/util/builder.rs | 4 +- src/sinks/util/compressor.rs | 11 +- src/sinks/util/http.rs | 4 +- src/sinks/util/mod.rs | 1 + src/sinks/util/snappy.rs | 88 ++ src/sinks/vector/sink.rs | 8 +- src/sources/prometheus/remote_write.rs | 214 +++-- .../components/sinks/base/appsignal.cue | 5 + .../sinks/base/aws_cloudwatch_logs.cue | 5 + .../sinks/base/aws_cloudwatch_metrics.cue | 5 + .../sinks/base/aws_kinesis_firehose.cue | 5 + .../sinks/base/aws_kinesis_streams.cue | 5 + .../components/sinks/base/aws_s3.cue | 5 + .../reference/components/sinks/base/axiom.cue | 5 + .../components/sinks/base/azure_blob.cue | 5 + .../components/sinks/base/clickhouse.cue | 5 + .../components/sinks/base/datadog_logs.cue | 5 + .../components/sinks/base/datadog_traces.cue | 5 + .../components/sinks/base/elasticsearch.cue | 5 + .../sinks/base/gcp_cloud_storage.cue | 5 + .../reference/components/sinks/base/http.cue | 5 + .../components/sinks/base/humio_logs.cue | 5 + .../components/sinks/base/humio_metrics.cue | 5 + .../components/sinks/base/new_relic.cue | 5 + .../sinks/base/prometheus_remote_write.cue | 39 +- .../components/sinks/base/splunk_hec_logs.cue | 5 + .../sinks/base/splunk_hec_metrics.cue | 5 + .../components/sinks/base/webhdfs.cue | 5 + 41 files changed, 1597 insertions(+), 959 deletions(-) delete mode 100644 src/sinks/prometheus/remote_write.rs create mode 100644 src/sinks/prometheus/remote_write/config.rs create mode 100644 src/sinks/prometheus/remote_write/integration_tests.rs create mode 100644 src/sinks/prometheus/remote_write/mod.rs create mode 100644 src/sinks/prometheus/remote_write/request_builder.rs create mode 100644 src/sinks/prometheus/remote_write/service.rs create mode 100644 src/sinks/prometheus/remote_write/sink.rs create mode 100644 src/sinks/prometheus/remote_write/tests.rs create mode 100644 src/sinks/util/snappy.rs diff --git a/Cargo.toml b/Cargo.toml index cf794391c783c..3a5b607be302d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -316,7 +316,7 @@ seahash = { version = "4.1.0", default-features = false } semver = { version = "1.0.20", default-features = false, features = ["serde", "std"], optional = true } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snafu = { version = "0.7.5", default-features = false, features = ["futures"] } -snap = { version = "1.1.0", default-features = false, optional = true } +snap = { version = "1.1.0", default-features = false } socket2 = { version = "0.5.5", default-features = false } stream-cancel = { version = "0.8.1", default-features = false } strip-ansi-escapes = { version = "0.2.0", default-features = false } @@ -560,9 +560,9 @@ sources-splunk_hec = ["dep:roaring"] sources-statsd = ["sources-utils-net", "tokio-util/net"] sources-stdin = ["tokio-util/io"] sources-syslog = ["codecs-syslog", "sources-utils-net", "tokio-util/net"] -sources-utils-http = ["dep:snap", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error", "sources-utils-http-prelude"] +sources-utils-http = ["sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error", "sources-utils-http-prelude"] sources-utils-http-auth = ["sources-utils-http-error"] -sources-utils-http-encoding = ["dep:snap", "sources-utils-http-error"] +sources-utils-http-encoding = ["sources-utils-http-error"] sources-utils-http-error = [] sources-utils-http-prelude = ["sources-utils-http", "sources-utils-http-auth", "sources-utils-http-encoding", "sources-utils-http-error"] sources-utils-http-query = [] @@ -715,7 +715,7 @@ sinks-nats = ["dep:async-nats", "dep:nkeys"] sinks-new_relic_logs = ["sinks-http"] sinks-new_relic = [] sinks-papertrail = ["dep:syslog"] -sinks-prometheus = ["dep:base64", "dep:prometheus-parser", "dep:snap"] +sinks-prometheus = ["dep:base64", "dep:prometheus-parser"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] diff --git a/lib/vector-stream/src/partitioned_batcher.rs b/lib/vector-stream/src/partitioned_batcher.rs index 0d73cc5267098..b865216c1f764 100644 --- a/lib/vector-stream/src/partitioned_batcher.rs +++ b/lib/vector-stream/src/partitioned_batcher.rs @@ -15,7 +15,7 @@ use vector_core::{partition::Partitioner, time::KeyedTimer, ByteSizeOf}; use crate::batcher::{ config::BatchConfigParts, - data::BatchReduce, + data::BatchData, limiter::{ByteSizeOfItemSize, ItemBatchSize, SizeLimit}, BatchConfig, }; @@ -155,16 +155,15 @@ impl BatcherSettings { } /// A batcher config using the `ItemBatchSize` trait to determine batch sizes. - /// The output is built with the supplied reducer function. - pub fn into_reducer_config( - self, + /// The output is built with the supplied object implementing [`BatchData`]. + pub fn as_reducer_config( + &self, item_size: I, - reducer: F, - ) -> BatchConfigParts, BatchReduce> + reducer: B, + ) -> BatchConfigParts, B> where I: ItemBatchSize, - F: FnMut(&mut S, T), - S: Default, + B: BatchData, { BatchConfigParts { batch_limiter: SizeLimit { @@ -173,14 +172,14 @@ impl BatcherSettings { current_size: 0, item_size_calculator: item_size, }, - batch_data: BatchReduce::new(reducer), + batch_data: reducer, timeout: self.timeout, } } } #[pin_project] -pub struct PartitionedBatcher +pub struct PartitionedBatcher where Prt: Partitioner, { @@ -193,7 +192,7 @@ where /// The store of 'closed' batches. When this is not empty it will be /// preferentially flushed prior to consuming any new items from the /// underlying stream. - closed_batches: Vec<(Prt::Key, Vec)>, + closed_batches: Vec<(Prt::Key, B)>, /// The queue of pending batch expirations timer: KT, /// The partitioner for this `Batcher` @@ -203,7 +202,7 @@ where stream: Fuse, } -impl PartitionedBatcher, C, F> +impl PartitionedBatcher, C, F, B> where St: Stream, Prt: Partitioner + Unpin, @@ -226,7 +225,7 @@ where } #[cfg(test)] -impl PartitionedBatcher +impl PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, @@ -247,17 +246,17 @@ where } } -impl Stream for PartitionedBatcher +impl Stream for PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, KT: KeyedTimer, - C: BatchConfig>, + C: BatchConfig, F: Fn() -> C + Send, { - type Item = (Prt::Key, Vec); + type Item = (Prt::Key, B); fn size_hint(&self) -> (usize, Option) { self.stream.size_hint() @@ -270,20 +269,18 @@ where return Poll::Ready(this.closed_batches.pop()); } match this.stream.as_mut().poll_next(cx) { - Poll::Pending => { - match this.timer.poll_expired(cx) { - // Unlike normal streams, `DelayQueue` can return `None` - // here but still be usable later if more entries are added. - Poll::Pending | Poll::Ready(None) => return Poll::Pending, - Poll::Ready(Some(item_key)) => { - let mut batch = this - .batches - .remove(&item_key) - .expect("batch should exist if it is set to expire"); - this.closed_batches.push((item_key, batch.take_batch())); - } + Poll::Pending => match this.timer.poll_expired(cx) { + // Unlike normal streams, `DelayQueue` can return `None` + // here but still be usable later if more entries are added. + Poll::Pending | Poll::Ready(None) => return Poll::Pending, + Poll::Ready(Some(item_key)) => { + let mut batch = this + .batches + .remove(&item_key) + .expect("batch should exist if it is set to expire"); + this.closed_batches.push((item_key, batch.take_batch())); } - } + }, Poll::Ready(None) => { // Now that the underlying stream is closed, we need to // clear out our batches, including all expiration diff --git a/src/sinks/prometheus/remote_write.rs b/src/sinks/prometheus/remote_write.rs deleted file mode 100644 index 043254630b63e..0000000000000 --- a/src/sinks/prometheus/remote_write.rs +++ /dev/null @@ -1,790 +0,0 @@ -use std::io::Read; -use std::sync::Arc; -use std::task; - -#[cfg(feature = "aws-core")] -use aws_credential_types::provider::SharedCredentialsProvider; -#[cfg(feature = "aws-core")] -use aws_types::region::Region; -use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream, FutureExt, SinkExt}; -use http::{Request, Uri}; -use prost::Message; -use snafu::{ResultExt, Snafu}; -use tower::Service; -use vector_config::configurable_component; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; - -use super::collector::{self, MetricCollector as _}; -use crate::{ - config::{self, AcknowledgementsConfig, Input, SinkConfig}, - event::{Event, Metric}, - http::HttpClient, - internal_events::{EndpointBytesSent, TemplateRenderingError}, - sinks::{ - self, - prometheus::PrometheusRemoteWriteAuth, - util::{ - auth::Auth, - batch::BatchConfig, - buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer}, - http::HttpRetryLogic, - uri, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings, - TowerRequestConfig, - }, - }, - template::Template, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Clone, Copy, Debug, Default)] -pub struct PrometheusRemoteWriteDefaultBatchSettings; - -impl SinkBatchSettings for PrometheusRemoteWriteDefaultBatchSettings { - const MAX_EVENTS: Option = Some(1_000); - const MAX_BYTES: Option = None; - const TIMEOUT_SECS: f64 = 1.0; -} - -#[derive(Debug, Snafu)] -enum Errors { - #[snafu(display(r#"Prometheus remote_write sink cannot accept "set" metrics"#))] - SetMetricInvalid, - #[cfg(feature = "aws-core")] - #[snafu(display("aws.region required when AWS authentication is in use"))] - AwsRegionRequired, -} - -/// Configuration for the `prometheus_remote_write` sink. -#[configurable_component(sink( - "prometheus_remote_write", - "Deliver metric data to a Prometheus remote write endpoint." -))] -#[derive(Clone, Debug, Default)] -#[serde(deny_unknown_fields)] -pub struct RemoteWriteConfig { - /// The endpoint to send data to. - /// - /// The endpoint should include the scheme and the path to write to. - #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))] - pub endpoint: String, - - /// The default namespace for any metrics sent. - /// - /// This namespace is only used if a metric has no existing namespace. When a namespace is - /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`). - /// - /// It should follow the Prometheus [naming conventions][prom_naming_docs]. - /// - /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names - #[configurable(metadata(docs::examples = "service"))] - #[configurable(metadata(docs::advanced))] - pub default_namespace: Option, - - /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms. - /// - /// [dist_metric_docs]: https://vector.dev/docs/about/under-the-hood/architecture/data-model/metric/#distribution - #[serde(default = "super::default_histogram_buckets")] - #[configurable(metadata(docs::advanced))] - pub buckets: Vec, - - /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary. - /// - /// [dist_metric_docs]: https://vector.dev/docs/about/under-the-hood/architecture/data-model/metric/#distribution - #[serde(default = "super::default_summary_quantiles")] - #[configurable(metadata(docs::advanced))] - pub quantiles: Vec, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - /// The tenant ID to send. - /// - /// If set, a header named `X-Scope-OrgID` is added to outgoing requests with the value of this setting. - /// - /// This may be used by Cortex or other remote services to identify the tenant making the request. - #[serde(default)] - #[configurable(metadata(docs::examples = "my-domain"))] - #[configurable(metadata(docs::advanced))] - pub tenant_id: Option