Skip to content

Commit

Permalink
configurable topology refresh duration (#776)
Browse files Browse the repository at this point in the history
* fix(Cluster, ClusterWorker): New topology_refresh_interval introduced to ClusterWorker struct, which is now used instead of old creating new Dutaion instance

* fix(SessionConfig): SessionConfig now accepts cluster_topology_refresh_interval of type Duration which is consumed by ClusterWorker

* fix(GenericSessionBuilder): Added method cluster_topology_refresh_interval to set the Duration for refreshing the topology

* fix(docs): Updated docs to include information about setting cluster topology refresh duration

* Change non-zero to non-negative in docs

* fix(cluster, session, session_builder): Renamed the name to more relavent cluster_metadata_refesh_interval

* fix(conneting.md): Modified doc for cluster metadata refresh interval so that it iss more clear to understand

* fix(rename): Rename toppology refresh to metadata refresh

* fix(typo): Rename topology to metadata

* fix(typo): Rename topology to metadata

* fix(typo): Rename topology to metadata

---------

Co-authored-by: Rishabh Aryal <[email protected]>
  • Loading branch information
rishabharyal and Rishabh Aryal authored Aug 22, 2023
1 parent 148595a commit e1fa6ff
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/source/connecting/connecting.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.known_node("127.0.0.72:4321")
.known_node("localhost:8000")
.connection_timeout(Duration::from_secs(3))
.cluster_metadata_refresh_interval(Duration::from_secs(10))
.known_node_addr(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
9000,
Expand All @@ -34,6 +35,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
After successfully connecting to some specified node the driver will fetch topology information about
other nodes in this cluster and connect to them as well.

The driver refreshes the cluster metadata periodically, which contains information about cluster topology as well as the cluster schema. By default, the driver refreshes the cluster metadata every 60 seconds.
However, you can set the `cluster_metadata_refresh_interval` to a non-negative value to periodically refresh the cluster metadata. This is useful when you do not have unexpected amount of traffic or when you have an extra traffic causing topology to change frequently.

Scylla Serverless is an elastic and dynamic deployment model. When creating a `Session` you need to
specify the secure connection bundle as follows:

Expand Down
9 changes: 7 additions & 2 deletions scylla/src/transport/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ struct ClusterWorker {
// The host filter determines towards which nodes we should open
// connections
host_filter: Option<Arc<dyn HostFilter>>,

// This value determines how frequently the cluster
// worker will refresh the cluster metadata
cluster_metadata_refresh_interval: Duration,
}

#[derive(Debug)]
Expand All @@ -144,6 +148,7 @@ impl Cluster {
keyspaces_to_fetch: Vec<String>,
fetch_schema_metadata: bool,
host_filter: Option<Arc<dyn HostFilter>>,
cluster_metadata_refresh_interval: Duration,
) -> Result<Cluster, QueryError> {
let (refresh_sender, refresh_receiver) = tokio::sync::mpsc::channel(32);
let (use_keyspace_sender, use_keyspace_receiver) = tokio::sync::mpsc::channel(32);
Expand Down Expand Up @@ -185,6 +190,7 @@ impl Cluster {
used_keyspace: None,

host_filter,
cluster_metadata_refresh_interval,
};

let (fut, worker_handle) = worker.work().remote_handle();
Expand Down Expand Up @@ -473,15 +479,14 @@ impl ClusterWorker {
pub(crate) async fn work(mut self) {
use tokio::time::Instant;

let refresh_duration = Duration::from_secs(60); // Refresh topology every 60 seconds
let mut last_refresh_time = Instant::now();

loop {
let mut cur_request: Option<RefreshRequest> = None;

// Wait until it's time for the next refresh
let sleep_until: Instant = last_refresh_time
.checked_add(refresh_duration)
.checked_add(self.cluster_metadata_refresh_interval)
.unwrap_or_else(Instant::now);

let sleep_future = tokio::time::sleep_until(sleep_until);
Expand Down
8 changes: 8 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,12 @@ pub struct SessionConfig {
/// Consistency level of fetching [`TracingInfo`]
/// in [`Session::get_tracing_info`].
pub tracing_info_fetch_consistency: Consistency,

/// Interval between refreshing cluster metadata. This
/// can be configured according to the traffic pattern
/// for e.g: if they do not want unexpected traffic
/// or they expect the topology to change frequently.
pub cluster_metadata_refresh_interval: Duration,
}

/// Describes database server known on Session startup.
Expand Down Expand Up @@ -344,6 +350,7 @@ impl SessionConfig {
tracing_info_fetch_attempts: NonZeroU32::new(5).unwrap(),
tracing_info_fetch_interval: Duration::from_millis(3),
tracing_info_fetch_consistency: Consistency::One,
cluster_metadata_refresh_interval: Duration::from_secs(60),
}
}

Expand Down Expand Up @@ -580,6 +587,7 @@ impl Session {
config.keyspaces_to_fetch,
config.fetch_schema_metadata,
config.host_filter,
config.cluster_metadata_refresh_interval,
)
.await?;

Expand Down
38 changes: 38 additions & 0 deletions scylla/src/transport/session_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,30 @@ impl<K: SessionBuilderKind> GenericSessionBuilder<K> {
self.config.enable_write_coalescing = enable;
self
}

/// Set the interval at which the driver refreshes the cluster metadata which contains information
/// about the cluster topology as well as the cluster schema.
///
/// The default is 60 seconds.
///
/// In the given example, we have set the duration value to 20 seconds, which
/// means that the metadata is refreshed every 20 seconds.
/// # Example
/// ```
/// # use scylla::{Session, SessionBuilder};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let session: Session = SessionBuilder::new()
/// .known_node("127.0.0.1:9042")
/// .cluster_metadata_refresh_interval(std::time::Duration::from_secs(20))
/// .build()
/// .await?;
/// # Ok(())
/// # }
/// ```
pub fn cluster_metadata_refresh_interval(mut self, interval: Duration) -> Self {
self.config.cluster_metadata_refresh_interval = interval;
self
}
}

/// Creates a [`SessionBuilder`] with default configuration, same as [`SessionBuilder::new`]
Expand Down Expand Up @@ -1124,6 +1148,15 @@ mod tests {
);
}

#[test]
fn cluster_metadata_refresh_interval() {
let builder = SessionBuilder::new();
assert_eq!(
builder.config.cluster_metadata_refresh_interval,
std::time::Duration::from_secs(60)
);
}

#[test]
fn all_features() {
let mut builder = SessionBuilder::new();
Expand All @@ -1140,6 +1173,7 @@ mod tests {
builder = builder.tcp_nodelay(true);
builder = builder.use_keyspace("ks_name", true);
builder = builder.fetch_schema_metadata(false);
builder = builder.cluster_metadata_refresh_interval(Duration::from_secs(1));

assert_eq!(
builder.config.known_nodes,
Expand All @@ -1155,6 +1189,10 @@ mod tests {

assert_eq!(builder.config.compression, Some(Compression::Snappy));
assert!(builder.config.tcp_nodelay);
assert_eq!(
builder.config.cluster_metadata_refresh_interval,
Duration::from_secs(1)
);

assert_eq!(builder.config.used_keyspace, Some("ks_name".to_string()));

Expand Down

0 comments on commit e1fa6ff

Please sign in to comment.