From 4c6f5d56a5f7e6a89bfdf5e4d231cb32c5f678ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Mon, 31 Jul 2023 10:53:13 +0200 Subject: [PATCH] session: require schema agreement to fetch schema version It can be a nasty source of race bugs when schema version is fetched without awaiting for schema agreement. It is especially risky when one awaits schema agreement and subsequently fetches schema version, because the schema version can change in the meantime and schema may no longer be in agreement. To cope with the problem, `await_schema_agreement()` now returns the agreed schema version, and `fetch_schema_version()` is removed alltogether. --- docs/source/queries/schema-agreement.md | 15 +---- examples/schema_agreement.rs | 18 +++--- scylla/src/transport/session.rs | 79 ++++++------------------- scylla/src/transport/session_test.rs | 8 +-- 4 files changed, 30 insertions(+), 90 deletions(-) diff --git a/docs/source/queries/schema-agreement.md b/docs/source/queries/schema-agreement.md index aa70a84ebc..189fc17f8e 100644 --- a/docs/source/queries/schema-agreement.md +++ b/docs/source/queries/schema-agreement.md @@ -4,19 +4,6 @@ Sometimes after performing queries some nodes have not been updated, so we need There is a number of methods in `Session` that assist us. Every method raise `QueryError` if something goes wrong, but they should never raise any errors, unless there is a DB or connection malfunction. -### Checking schema version -`Session::fetch_schema_version` returns an `Uuid` of local node's schema version. - -```rust -# extern crate scylla; -# use scylla::Session; -# use std::error::Error; -# async fn check_only_compiles(session: &Session) -> Result<(), Box> { -println!("Local schema version is: {}", session.fetch_schema_version().await?); -# Ok(()) -# } -``` - ### Awaiting schema agreement `Session::await_schema_agreement` returns a `Future` that can be `await`ed as long as schema is not in an agreement. @@ -62,7 +49,7 @@ If you want to check if schema is in agreement now, without retrying after failu # use scylla::Session; # use std::error::Error; # async fn check_only_compiles(session: &Session) -> Result<(), Box> { -if session.check_schema_agreement().await? { +if session.check_schema_agreement().await?.is_some() { println!("SCHEMA AGREED"); } else { println!("SCHEMA IS NOT IN AGREEMENT"); diff --git a/examples/schema_agreement.rs b/examples/schema_agreement.rs index 44e2b32859..08e5a59384 100644 --- a/examples/schema_agreement.rs +++ b/examples/schema_agreement.rs @@ -1,4 +1,5 @@ -use anyhow::Result; +use anyhow::{bail, Result}; +use scylla::transport::errors::QueryError; use scylla::transport::session::{IntoTypedRows, Session}; use scylla::SessionBuilder; use std::env; @@ -17,16 +18,17 @@ async fn main() -> Result<()> { .build() .await?; - let schema_version = session.fetch_schema_version().await?; + let schema_version = session.await_schema_agreement().await?; + println!("Schema version: {}", schema_version); session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}", &[]).await?; - if session.await_schema_agreement().await? { - println!("Schema is in agreement"); - } else { - println!("Schema is NOT in agreement"); - } + match session.await_schema_agreement().await { + Ok(_schema_version) => println!("Schema is in agreement in time"), + Err(QueryError::RequestTimeout(_)) => println!("Schema is NOT in agreement in time"), + Err(err) => bail!(err), + }; session .query( "CREATE TABLE IF NOT EXISTS ks.t (a int, b int, c text, primary key (a, b))", @@ -66,7 +68,7 @@ async fn main() -> Result<()> { } println!("Ok."); - let schema_version = session.fetch_schema_version().await?; + let schema_version = session.await_schema_agreement().await?; println!("Schema version: {}", schema_version); Ok(()) diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 629efdf3b7..0811a465b3 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -35,7 +35,7 @@ use std::time::Duration; use tokio::net::lookup_host; use tokio::time::timeout; use tracing::warn; -use tracing::{debug, error, trace, trace_span, Instrument}; +use tracing::{debug, trace, trace_span, Instrument}; use uuid::Uuid; use super::cluster::ContactPoint; @@ -58,7 +58,7 @@ use crate::frame::value::{ use crate::prepared_statement::{PartitionKeyError, PreparedStatement}; use crate::query::Query; use crate::routing::Token; -use crate::statement::{Consistency, SerialConsistency}; +use crate::statement::Consistency; use crate::tracing::{TracingEvent, TracingInfo}; use crate::transport::cluster::{Cluster, ClusterData, ClusterNeatDebug}; use crate::transport::connection::{Connection, ConnectionConfig, VerifiedKeyspaceName}; @@ -747,8 +747,7 @@ impl Session { }; self.handle_set_keyspace_response(&response).await?; - self.handle_auto_await_schema_agreement(&query.contents, &response) - .await?; + self.handle_auto_await_schema_agreement(&response).await?; let result = response.into_query_result()?; span.record_result_fields(&result); @@ -773,18 +772,11 @@ impl Session { async fn handle_auto_await_schema_agreement( &self, - contents: &str, response: &NonErrorQueryResponse, ) -> Result<(), QueryError> { if self.schema_agreement_automatic_waiting { - if response.as_schema_change().is_some() && !self.await_schema_agreement().await? { - // TODO: The TimeoutError should allow to provide more context. - // For now, print an error to the logs - error!( - "Failed to reach schema agreement after a schema-altering statement: {}", - contents, - ); - return Err(QueryError::TimeoutError); + if response.as_schema_change().is_some() { + self.await_schema_agreement().await?; } if self.refresh_metadata_on_auto_schema_agreement @@ -1099,8 +1091,7 @@ impl Session { }; self.handle_set_keyspace_response(&response).await?; - self.handle_auto_await_schema_agreement(prepared.get_statement(), &response) - .await?; + self.handle_auto_await_schema_agreement(&response).await?; let result = response.into_query_result()?; span.record_result_fields(&result); @@ -1848,23 +1839,27 @@ impl Session { last_error.map(Result::Err) } - async fn await_schema_agreement_indefinitely(&self) -> Result<(), QueryError> { - while !self.check_schema_agreement().await? { - tokio::time::sleep(self.schema_agreement_interval).await + async fn await_schema_agreement_indefinitely(&self) -> Result { + loop { + tokio::time::sleep(self.schema_agreement_interval).await; + if let Some(agreed_version) = self.check_schema_agreement().await? { + return Ok(agreed_version); + } } - Ok(()) } - pub async fn await_schema_agreement(&self) -> Result { + pub async fn await_schema_agreement(&self) -> Result { timeout( self.schema_agreement_timeout, self.await_schema_agreement_indefinitely(), ) .await - .map_or(Ok(false), |res| res.and(Ok(true))) + .unwrap_or(Err(QueryError::RequestTimeout( + "schema agreement not reached in time".to_owned(), + ))) } - pub async fn check_schema_agreement(&self) -> Result { + pub async fn check_schema_agreement(&self) -> Result, QueryError> { let connections = self.cluster.get_working_connections().await?; let handles = connections.iter().map(|c| c.fetch_schema_version()); @@ -1872,38 +1867,7 @@ impl Session { let local_version: Uuid = versions[0]; let in_agreement = versions.into_iter().all(|v| v == local_version); - Ok(in_agreement) - } - - pub async fn fetch_schema_version(&self) -> Result { - // We ignore custom Consistency that a retry policy could decide to put here, using the default instead. - let info = RoutingInfo::default(); - let config = StatementConfig { - is_idempotent: true, - serial_consistency: Some(Some(SerialConsistency::LocalSerial)), - ..Default::default() - }; - - let span = RequestSpan::new_none(); - - match self - .run_query( - info, - &config, - self.get_default_execution_profile_handle().access(), - |node: Arc| async move { node.random_connection().await }, - |connection: Arc, _: Consistency, _: &ExecutionProfileInner| async move { - connection.fetch_schema_version().await - }, - &span, - ) - .await? - { - RunQueryResult::IgnoredWriteError => Err(QueryError::ProtocolError( - "Retry policy has made the driver ignore fetching schema version query.", - )), - RunQueryResult::Completed(result) => Ok(result), - } + Ok(in_agreement.then_some(local_version)) } fn calculate_partition_key( @@ -2189,13 +2153,6 @@ impl RequestSpan { } } - pub(crate) fn new_none() -> Self { - Self { - span: tracing::Span::none(), - speculative_executions: 0.into(), - } - } - pub(crate) fn record_shard_id(&self, conn: &Connection) { if let Some(info) = conn.get_shard_info() { self.span.record("shard", info.shard); diff --git a/scylla/src/transport/session_test.rs b/scylla/src/transport/session_test.rs index ead2481be6..30899da278 100644 --- a/scylla/src/transport/session_test.rs +++ b/scylla/src/transport/session_test.rs @@ -1103,16 +1103,10 @@ async fn assert_in_tracing_table(session: &Session, tracing_uuid: Uuid) { panic!("No rows for tracing with this session id!"); } -#[tokio::test] -async fn test_fetch_schema_version() { - let session = create_new_session_builder().build().await.unwrap(); - session.fetch_schema_version().await.unwrap(); -} - #[tokio::test] async fn test_await_schema_agreement() { let session = create_new_session_builder().build().await.unwrap(); - session.await_schema_agreement().await.unwrap(); + let _schema_version = session.await_schema_agreement().await.unwrap(); } #[tokio::test]