Skip to content

Commit

Permalink
session: Impl calculate_token_for_partition_key()
Browse files Browse the repository at this point in the history
So far, the driver has provided a way to calculate token based on
a PreparedStatement and values for that statement. However, if
a user knows:
  - the order of the columns in the partition key,
  - the values of the columns of the partition key,
  - the partitioner of the table that the statement operates on,

then having a PreparedStatement is not necessary and the token can be
calculated based on that information. The function
calculate_token_for_partition_key() is added, which does that
calculation, along with a test.
  • Loading branch information
wprzytula committed Jul 6, 2023
1 parent 41b133d commit ed127fd
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 0 deletions.
42 changes: 42 additions & 0 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::history::HistoryListener;
use crate::retry_policy::RetryPolicy;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::future::join_all;
use futures::future::try_join_all;
pub use scylla_cql::errors::TranslationError;
Expand Down Expand Up @@ -38,6 +40,7 @@ use super::connection::QueryResponse;
use super::connection::SslConfig;
use super::errors::{BadQuery, NewSessionError, QueryError};
use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner};
use super::partitioner::Partitioner;
use super::partitioner::PartitionerName;
use super::topology::UntranslatedPeer;
use super::NodeRef;
Expand Down Expand Up @@ -1853,6 +1856,45 @@ impl Session {
}
}

/// Calculates the token for given partitioner and serialized partition key.
///
/// The ordinary way to calculate token is based on a PreparedStatement
/// and values for that statement. However, if a user knows:
/// - the order of the columns in the partition key,
/// - the values of the columns of the partition key,
/// - the partitioner of the table that the statement operates on,
///
/// then having a `PreparedStatement` is not necessary and the token can
/// be calculated based on that information.
///
/// NOTE: the provided values must completely constitute partition key
/// and be in the order defined in CREATE TABLE statement.
pub fn calculate_token_for_partition_key<P: Partitioner>(
serialized_partition_key_values: &SerializedValues,
_partitioner: &P,
) -> Result<Token, PartitionKeyError> {
let mut buf: BytesMut = BytesMut::new();

if serialized_partition_key_values.len() == 1 {
let val = serialized_partition_key_values.iter().next().unwrap();
if let Some(val) = val {
buf.extend_from_slice(val);
}
} else {
for val in serialized_partition_key_values.iter().flatten() {
let val_len_u16: u16 = val
.len()
.try_into()
.map_err(|_| PartitionKeyError::ValueTooLong(val.len()))?;
buf.put_u16(val_len_u16);
buf.extend_from_slice(val);
buf.put_u8(0);
}
}

Ok(P::hash(&buf.freeze()))
}

/// Retrieves the handle to execution profile that is used by this session
/// by default, i.e. when an executed statement does not define its own handle.
pub fn get_default_execution_profile_handle(&self) -> &ExecutionProfileHandle {
Expand Down
95 changes: 95 additions & 0 deletions scylla/src/transport/session_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2798,3 +2798,98 @@ async fn simple_strategy_test() {

assert_eq!(rows, vec![(1, 2, 3), (4, 5, 6), (7, 8, 9)]);
}

#[tokio::test]
async fn test_manual_primary_key_computation() {
// Setup session
let ks = unique_keyspace_name();
let session = create_new_session_builder().build().await.unwrap();
session.query(format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1}}", ks), &[]).await.unwrap();
session.use_keyspace(&ks, true).await.unwrap();

async fn assert_tokens_equal(
session: &Session,
prepared: &PreparedStatement,
pk_values_in_pk_order: impl ValueList,
all_values_in_query_order: impl ValueList,
) {
let serialized_values_in_pk_order =
pk_values_in_pk_order.serialized().unwrap().into_owned();
let serialized_values_in_query_order =
all_values_in_query_order.serialized().unwrap().into_owned();

session
.execute(prepared, &serialized_values_in_query_order)
.await
.unwrap();

let token_by_prepared = session
.calculate_token(prepared, &serialized_values_in_query_order)
.unwrap()
.unwrap();
let token_by_hand = Session::calculate_token_for_partition_key(
&serialized_values_in_pk_order,
&Murmur3Partitioner,
)
.unwrap();
println!(
"by_prepared: {}, by_hand: {}",
token_by_prepared.value, token_by_hand.value
);
assert_eq!(token_by_prepared, token_by_hand);
}

// Single-column partition key
{
session
.query(
"CREATE TABLE IF NOT EXISTS t2 (a int, b int, c text, primary key (a, b))",
&[],
)
.await
.unwrap();

// Values are given non partition key order,
let prepared_simple_pk = session
.prepare("INSERT INTO t2 (a, b, c) VALUES (?, ?, ?)")
.await
.unwrap();

let pk_values_in_pk_order = (17_i32,);
let all_values_in_query_order = (17_i32, 16_i32, "I'm prepared!!!");

assert_tokens_equal(
&session,
&prepared_simple_pk,
pk_values_in_pk_order,
all_values_in_query_order,
)
.await;
}

// Composite partition key
{
session
.query("CREATE TABLE IF NOT EXISTS complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))", &[])
.await
.unwrap();

// Values are given in non partition key order, to check that such permutation
// still yields consistent hashes.
let prepared_complex_pk = session
.prepare("INSERT INTO complex_pk (a, d, c, b) VALUES (?, 7, ?, ?)")
.await
.unwrap();

let pk_values_in_pk_order = (17_i32, 16_i32, "I'm prepared!!!");
let all_values_in_query_order = (17_i32, "I'm prepared!!!", 16_i32);

assert_tokens_equal(
&session,
&prepared_complex_pk,
pk_values_in_pk_order,
all_values_in_query_order,
)
.await;
}
}

0 comments on commit ed127fd

Please sign in to comment.