Skip to content

Commit

Permalink
enhancement(components): Add customizable connection retry options fo…
Browse files Browse the repository at this point in the history
…r Pulsar client in Pulsar sink (#21245)

* feat(components) Add customizable connection retry options for Pulsar client in Pulsar sink

* review feedback

* cargo fmt
  • Loading branch information
FRosner authored Oct 21, 2024
1 parent fc6883f commit baf6a55
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Expose `connection_retry_options` in the Pulsar sink configuration to allow customizing the connection retry behaviour of the pulsar client. This includes the following options:

- `min_backoff_ms`: Minimum delay between connection retries.
- `max_backoff_secs`: Maximum delay between reconnection retries.
- `max_retries`: Maximum number of connection retries.
- `connection_timeout_secs`: Time limit to establish a connection.
- `keep_alive_secs`: Keep-alive interval for each broker connection.

authors: FRosner
71 changes: 69 additions & 2 deletions src/sinks/pulsar/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use pulsar::{
};
use pulsar::{error::AuthenticationError, OperationRetryOptions};
use snafu::ResultExt;
use std::time::Duration;
use vector_lib::codecs::{encoding::SerializerConfig, TextSerializerConfig};
use vector_lib::config::DataType;
use vector_lib::lookup::lookup_v2::OptionalTargetPath;
Expand Down Expand Up @@ -77,6 +78,10 @@ pub struct PulsarSinkConfig {
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[serde(default)]
pub connection_retry_options: Option<CustomConnectionRetryOptions>,
}

/// Event batching behavior.
Expand Down Expand Up @@ -171,6 +176,36 @@ pub enum PulsarCompression {
Snappy,
}

#[configurable_component]
#[configurable(
description = "Custom connection retry options configuration for the Pulsar client."
)]
#[derive(Clone, Debug)]
pub struct CustomConnectionRetryOptions {
/// Minimum delay between connection retries.
#[configurable(metadata(docs::type_unit = "milliseconds"))]
pub min_backoff_ms: Option<u64>,

/// Maximum delay between reconnection retries.
#[configurable(metadata(docs::type_unit = "seconds"))]
#[configurable(metadata(docs::examples = 30))]
pub max_backoff_secs: Option<u64>,

/// Maximum number of connection retries.
#[configurable(metadata(docs::examples = 12))]
pub max_retries: Option<u32>,

/// Time limit to establish a connection.
#[configurable(metadata(docs::type_unit = "seconds"))]
#[configurable(metadata(docs::examples = 10))]
pub connection_timeout_secs: Option<u64>,

/// Keep-alive interval for each broker connection.
#[configurable(metadata(docs::type_unit = "seconds"))]
#[configurable(metadata(docs::examples = 60))]
pub keep_alive_secs: Option<u64>,
}

impl Default for PulsarSinkConfig {
fn default() -> Self {
Self {
Expand All @@ -185,6 +220,7 @@ impl Default for PulsarSinkConfig {
encoding: TextSerializerConfig::default().into(),
auth: None,
acknowledgements: Default::default(),
connection_retry_options: None,
}
}
}
Expand Down Expand Up @@ -218,8 +254,39 @@ impl PulsarSinkConfig {
}

// Apply configuration for reconnection exponential backoff.
let retry_opts = ConnectionRetryOptions::default();
builder = builder.with_connection_retry_options(retry_opts);
let default_retry_options = ConnectionRetryOptions::default();
let retry_options =
self.connection_retry_options
.as_ref()
.map_or(default_retry_options.clone(), |opts| {
ConnectionRetryOptions {
min_backoff: opts
.min_backoff_ms
.map_or(default_retry_options.min_backoff, |ms| {
Duration::from_millis(ms)
}),
max_backoff: opts
.max_backoff_secs
.map_or(default_retry_options.max_backoff, |secs| {
Duration::from_secs(secs)
}),
max_retries: opts
.max_retries
.unwrap_or(default_retry_options.max_retries),
connection_timeout: opts
.connection_timeout_secs
.map_or(default_retry_options.connection_timeout, |secs| {
Duration::from_secs(secs)
}),
keep_alive: opts
.keep_alive_secs
.map_or(default_retry_options.keep_alive, |secs| {
Duration::from_secs(secs)
}),
}
});

builder = builder.with_connection_retry_options(retry_options);

// Apply configuration for retrying Pulsar operations.
let operation_retry_opts = OperationRetryOptions::default();
Expand Down
40 changes: 40 additions & 0 deletions website/cue/reference/components/sinks/base/pulsar.cue
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,46 @@ base: components: sinks: pulsar: configuration: {
}
}
}
connection_retry_options: {
description: "Custom connection retry options configuration for the Pulsar client."
required: false
type: object: options: {
connection_timeout_secs: {
description: "Time limit to establish a connection."
required: false
type: uint: {
examples: [10]
unit: "seconds"
}
}
keep_alive_secs: {
description: "Keep-alive interval for each broker connection."
required: false
type: uint: {
examples: [60]
unit: "seconds"
}
}
max_backoff_secs: {
description: "Maximum delay between reconnection retries."
required: false
type: uint: {
examples: [30]
unit: "seconds"
}
}
max_retries: {
description: "Maximum number of connection retries."
required: false
type: uint: examples: [12]
}
min_backoff_ms: {
description: "Minimum delay between connection retries."
required: false
type: uint: unit: "milliseconds"
}
}
}
encoding: {
description: "Configures how events are encoded into raw bytes."
required: true
Expand Down

0 comments on commit baf6a55

Please sign in to comment.