From ae069e8c58fe470857cf54e98ff2039f07be3c58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Sat, 2 Mar 2024 01:04:55 +0100 Subject: [PATCH] Cluster: get_endpoints and related methods return shards MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Wojciech Przytuła --- examples/compare-tokens.rs | 2 +- scylla/src/transport/cluster.rs | 13 +++++++------ scylla/src/transport/iterator.rs | 2 +- scylla/src/transport/session.rs | 14 +++++++------- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/examples/compare-tokens.rs b/examples/compare-tokens.rs index 294dc7842..47bad6418 100644 --- a/examples/compare-tokens.rs +++ b/examples/compare-tokens.rs @@ -41,7 +41,7 @@ async fn main() -> Result<()> { .get_cluster_data() .get_token_endpoints("examples_ks", Token { value: t }) .iter() - .map(|n| n.address) + .map(|(node, _shard)| node.address) .collect::>() ); diff --git a/scylla/src/transport/cluster.rs b/scylla/src/transport/cluster.rs index 4dbf2d385..698549888 100644 --- a/scylla/src/transport/cluster.rs +++ b/scylla/src/transport/cluster.rs @@ -1,7 +1,7 @@ /// Cluster manages up to date information and connections to database nodes use crate::frame::response::event::{Event, StatusChangeEvent}; use crate::prepared_statement::TokenCalculationError; -use crate::routing::Token; +use crate::routing::{Shard, Token}; use crate::transport::host_filter::HostFilter; use crate::transport::{ connection::{Connection, VerifiedKeyspaceName}, @@ -27,6 +27,7 @@ use tracing::{debug, warn}; use uuid::Uuid; use super::node::{KnownNode, NodeAddr}; +use super::NodeRef; use super::locator::ReplicaLocator; use super::partitioner::calculate_token_for_partition_key; @@ -408,9 +409,9 @@ impl ClusterData { } /// Access to replicas owning a given token - pub fn get_token_endpoints(&self, keyspace: &str, token: Token) -> Vec> { + pub fn get_token_endpoints(&self, keyspace: &str, token: Token) -> Vec<(Arc, Shard)> { self.get_token_endpoints_iter(keyspace, token) - .cloned() + .map(|(node, shard)| (node.clone(), shard)) .collect() } @@ -418,7 +419,7 @@ impl ClusterData { &self, keyspace: &str, token: Token, - ) -> impl Iterator> { + ) -> impl Iterator, Shard)> { let keyspace = self.keyspaces.get(keyspace); let strategy = keyspace .map(|k| &k.strategy) @@ -427,7 +428,7 @@ impl ClusterData { .replica_locator() .replicas_for_token(token, strategy, None); - replica_set.into_iter().map(|(node, _shard)| node) + replica_set.into_iter() } /// Access to replicas owning a given partition key (similar to `nodetool getendpoints`) @@ -436,7 +437,7 @@ impl ClusterData { keyspace: &str, table: &str, partition_key: &SerializedValues, - ) -> Result>, BadQuery> { + ) -> Result, Shard)>, BadQuery> { Ok(self.get_token_endpoints( keyspace, self.compute_token(keyspace, table, partition_key)?, diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index e4697fbd2..070c07a78 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -290,7 +290,7 @@ impl RowIterator { config .cluster_data .get_token_endpoints_iter(keyspace, token) - .cloned() + .map(|(node, shard)| (node.clone(), shard)) .collect(), ) } else { diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 9ba5805d1..090bb9ef8 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -2027,19 +2027,19 @@ impl RequestSpan { self.span.record("result_rows", rows.rows.len()); } - pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [impl Borrow>]) { - struct ReplicaIps<'a, N>(&'a [N]); + pub(crate) fn record_replicas<'a>(&'a self, replicas: &'a [(impl Borrow>, Shard)]) { + struct ReplicaIps<'a, N>(&'a [(N, Shard)]); impl<'a, N> Display for ReplicaIps<'a, N> where N: Borrow>, { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut nodes = self.0.iter(); - if let Some(node) = nodes.next() { - write!(f, "{}", node.borrow().address.ip())?; + let mut nodes_with_shards = self.0.iter(); + if let Some((node, shard)) = nodes_with_shards.next() { + write!(f, "{}-shard{}", node.borrow().address.ip(), shard)?; - for node in nodes { - write!(f, ",{}", node.borrow().address.ip())?; + for (node, shard) in nodes_with_shards { + write!(f, ",{}-shard{}", node.borrow().address.ip(), shard)?; } } Ok(())