From 2a491f6843609dffec2005f4ab59f422d9ab1dbe Mon Sep 17 00:00:00 2001 From: muzarski Date: Wed, 28 Aug 2024 17:11:15 +0200 Subject: [PATCH] node: ConnectionPoolError Introduced a ConnectionPoolError which appears when we were unable to select a connection from the connection pool. --- scylla-cql/src/errors.rs | 22 ++++++++++++++++ scylla/src/transport/connection_pool.rs | 26 ++++++++----------- .../src/transport/load_balancing/default.rs | 1 + scylla/src/transport/node.rs | 19 +++++++------- 4 files changed, 43 insertions(+), 25 deletions(-) diff --git a/scylla-cql/src/errors.rs b/scylla-cql/src/errors.rs index 97822f6fb..d2e62dc32 100644 --- a/scylla-cql/src/errors.rs +++ b/scylla-cql/src/errors.rs @@ -37,6 +37,10 @@ pub enum QueryError { #[error("IO Error: {0}")] IoError(Arc), + /// Selected node's connection pool is in invalid state. + #[error("No connections in the pool: {0}")] + ConnectionPoolError(#[from] ConnectionPoolError), + /// Unexpected message received #[error("Protocol Error: {0}")] ProtocolError(&'static str), @@ -431,6 +435,10 @@ pub enum NewSessionError { #[error("IO Error: {0}")] IoError(Arc), + /// Selected node's connection pool is in invalid state. + #[error("No connections in the pool: {0}")] + ConnectionPoolError(#[from] ConnectionPoolError), + /// Unexpected message received #[error("Protocol Error: {0}")] ProtocolError(&'static str), @@ -474,6 +482,19 @@ pub enum BadKeyspaceName { IllegalCharacter(String, char), } +/// An error that occurred when selecting a node connection +/// to perform a request on. +#[derive(Error, Debug, Clone)] +#[non_exhaustive] +pub enum ConnectionPoolError { + #[error("The pool is broken; Last connection failed with: {last_connection_error}")] + Broken { last_connection_error: String }, + #[error("Pool is still being initialized")] + Initializing, + #[error("The node has been disabled by a host filter")] + NodeDisabledByHostFilter, +} + #[derive(Error, Debug, Clone)] #[error("Connection broken, reason: {0}")] pub struct BrokenConnectionError(Arc); @@ -656,6 +677,7 @@ impl From for NewSessionError { QueryError::CqlResultParseError(e) => NewSessionError::CqlResultParseError(e), QueryError::CqlErrorParseError(e) => NewSessionError::CqlErrorParseError(e), QueryError::IoError(e) => NewSessionError::IoError(e), + QueryError::ConnectionPoolError(e) => NewSessionError::ConnectionPoolError(e), QueryError::ProtocolError(m) => NewSessionError::ProtocolError(m), QueryError::InvalidMessage(m) => NewSessionError::InvalidMessage(m), QueryError::TimeoutError => NewSessionError::TimeoutError, diff --git a/scylla/src/transport/connection_pool.rs b/scylla/src/transport/connection_pool.rs index bbcd81e4b..0bd10d2d1 100644 --- a/scylla/src/transport/connection_pool.rs +++ b/scylla/src/transport/connection_pool.rs @@ -20,6 +20,7 @@ use super::NodeAddr; use arc_swap::ArcSwap; use futures::{future::RemoteHandle, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use rand::Rng; +use scylla_cql::errors::ConnectionPoolError; use std::convert::TryInto; use std::io::ErrorKind; use std::num::NonZeroUsize; @@ -238,7 +239,7 @@ impl NodeConnectionPool { pub(crate) fn connection_for_shard( &self, shard: Shard, - ) -> Result, std::io::Error> { + ) -> Result, ConnectionPoolError> { trace!(shard = shard, "Selecting connection for shard"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { @@ -261,7 +262,7 @@ impl NodeConnectionPool { }) } - pub(crate) fn random_connection(&self) -> Result, std::io::Error> { + pub(crate) fn random_connection(&self) -> Result, ConnectionPoolError> { trace!("Selecting random connection"); self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => { @@ -345,7 +346,9 @@ impl NodeConnectionPool { } } - pub(crate) fn get_working_connections(&self) -> Result>, std::io::Error> { + pub(crate) fn get_working_connections( + &self, + ) -> Result>, ConnectionPoolError> { self.with_connections(|pool_conns| match pool_conns { PoolConnections::NotSharded(conns) => conns.clone(), PoolConnections::Sharded { connections, .. } => { @@ -377,21 +380,14 @@ impl NodeConnectionPool { fn with_connections( &self, f: impl FnOnce(&PoolConnections) -> T, - ) -> Result { + ) -> Result { let conns = self.conns.load_full(); match &*conns { MaybePoolConnections::Ready(pool_connections) => Ok(f(pool_connections)), - MaybePoolConnections::Broken(err) => Err(std::io::Error::new( - ErrorKind::Other, - format!( - "No connections in the pool; last connection failed with: {}", - err - ), - )), - MaybePoolConnections::Initializing => Err(std::io::Error::new( - ErrorKind::Other, - "No connections in the pool, pool is still being initialized", - )), + MaybePoolConnections::Broken(err) => Err(ConnectionPoolError::Broken { + last_connection_error: format!("{err}"), + }), + MaybePoolConnections::Initializing => Err(ConnectionPoolError::Initializing), } } } diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index 38eb6915e..35d1f926b 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -2838,6 +2838,7 @@ mod latency_awareness { // "fast" errors, i.e. ones that are returned quickly after the query begins QueryError::BadQuery(_) | QueryError::BrokenConnection(_) + | QueryError::ConnectionPoolError(_) | QueryError::TooManyOrphanedStreamIds(_) | QueryError::UnableToAllocStreamId | QueryError::DbError(DbError::IsBootstrapping, _) diff --git a/scylla/src/transport/node.rs b/scylla/src/transport/node.rs index e698820a8..79895d65c 100644 --- a/scylla/src/transport/node.rs +++ b/scylla/src/transport/node.rs @@ -1,3 +1,4 @@ +use scylla_cql::errors::ConnectionPoolError; use tokio::net::lookup_host; use tracing::warn; use uuid::Uuid; @@ -157,7 +158,7 @@ impl Node { pub(crate) async fn connection_for_shard( &self, shard: Shard, - ) -> Result, std::io::Error> { + ) -> Result, ConnectionPoolError> { self.get_pool()?.connection_for_shard(shard) } @@ -186,7 +187,9 @@ impl Node { Ok(()) } - pub(crate) fn get_working_connections(&self) -> Result>, std::io::Error> { + pub(crate) fn get_working_connections( + &self, + ) -> Result>, ConnectionPoolError> { self.get_pool()?.get_working_connections() } @@ -196,14 +199,10 @@ impl Node { } } - fn get_pool(&self) -> Result<&NodeConnectionPool, std::io::Error> { - self.pool.as_ref().ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::Other, - "No connections in the pool: the node has been disabled \ - by the host filter", - ) - }) + fn get_pool(&self) -> Result<&NodeConnectionPool, ConnectionPoolError> { + self.pool + .as_ref() + .ok_or(ConnectionPoolError::NodeDisabledByHostFilter) } }