From d2ab15c2aab8e71930596fe370c9cd58dfbe1e8a Mon Sep 17 00:00:00 2001 From: IHEII Date: Mon, 30 Oct 2023 09:44:12 +0800 Subject: [PATCH] [Fix] Liveness (#77) * [Fix] refresh meta * [Fix] liveness in RPC timeout * [Chore] update toolchain * [Fix] panic error * [Fix] connection all invalid * [Chore] change default config * [Fix] rebalance error * [Fix] sync_refresh_metadata --- Cargo.lock | 46 +++++++++++ Cargo.toml | 1 + src/client/mod.rs | 14 ++-- src/client/table.rs | 5 ++ src/client/table_client.rs | 117 ++++++++++++++++++++++----- src/error.rs | 14 ++++ src/location/mod.rs | 4 +- src/rpc/conn_pool.rs | 18 ++++- src/rpc/mod.rs | 15 ++-- tests/test_table_client_base.rs | 13 +-- ycsb-rs/workloads/workload_obkv.toml | 4 +- 11 files changed, 204 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef4d406..167afa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -108,6 +117,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" +dependencies = [ + "addr2line", + "cc", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.21.0" @@ -827,6 +851,12 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + [[package]] name = "glob" version = "0.3.1" @@ -1461,11 +1491,21 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.30.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b4680b86d9cfafba8fc491dc9b6df26b68cf40e9e6cd73909194759a63c385" +dependencies = [ + "memchr", +] + [[package]] name = "obkv-table-client-rs" version = "0.2.0" dependencies = [ "anyhow", + "backtrace", "byteorder", "bytes", "chrono", @@ -1893,6 +1933,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustc-hash" version = "1.1.0" diff --git a/Cargo.toml b/Cargo.toml index 031e706..ab8f922 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ tokio = { version = "1", features = ["full"] } [dependencies] anyhow = { workspace = true } +backtrace = "0.3" byteorder = "1.2" bytes = "1.4" chrono = "0.4" diff --git a/src/client/mod.rs b/src/client/mod.rs index a95826d..8ba03e1 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -87,30 +87,30 @@ impl Default for ClientConfig { sys_password: "".to_owned(), metadata_mysql_conn_pool_min_size: 1, metadata_mysql_conn_pool_max_size: 3, - metadata_refresh_interval: Duration::from_secs(60), + metadata_refresh_interval: Duration::from_secs(3), ocp_model_cache_file: 
"/tmp/ocp_model_cache.json".to_owned(), rslist_acquire_timeout: Duration::from_secs(10), rslist_acquire_try_times: 3, rslist_acquire_retry_interval: Duration::from_millis(100), - table_entry_acquire_connect_timeout: Duration::from_secs(5), + table_entry_acquire_connect_timeout: Duration::from_secs(3), table_entry_acquire_read_timeout: Duration::from_secs(3), - table_entry_refresh_interval_base: Duration::from_millis(100), - table_entry_refresh_interval_ceiling: Duration::from_millis(1600), + table_entry_refresh_interval_base: Duration::from_secs(60), + table_entry_refresh_interval_ceiling: Duration::from_secs(120), table_entry_refresh_try_times: 3, table_entry_refresh_try_interval: Duration::from_millis(20), table_entry_refresh_continuous_failure_ceiling: 10, server_address_priority_timeout: Duration::from_secs(1800), - runtime_continuous_failure_ceiling: 100, + runtime_continuous_failure_ceiling: 10, - rpc_connect_timeout: Duration::from_secs(5), + rpc_connect_timeout: Duration::from_secs(3), rpc_read_timeout: Duration::from_secs(3), rpc_login_timeout: Duration::from_secs(3), rpc_operation_timeout: Duration::from_secs(3), rpc_retry_limit: 3, - rpc_retry_interval: Duration::from_secs(0), + rpc_retry_interval: Duration::from_millis(500), refresh_workers_num: 5, diff --git a/src/client/table.rs b/src/client/table.rs index 62aa309..611bf9b 100644 --- a/src/client/table.rs +++ b/src/client/table.rs @@ -82,6 +82,11 @@ impl ObTable { result.into() } + + /// return addr + pub fn addr(&self) -> String { + format!("{}:{}", self.ip, self.port) + } } pub struct Builder { diff --git a/src/client/table_client.rs b/src/client/table_client.rs index 9452e7f..1e5815b 100644 --- a/src/client/table_client.rs +++ b/src/client/table_client.rs @@ -29,6 +29,7 @@ use std::{ use rand::{seq::SliceRandom, thread_rng}; use scheduled_thread_pool::ScheduledThreadPool; +use tokio::time::sleep; use super::{ ocp::{ObOcpModelManager, OcpModel}, @@ -214,6 +215,8 @@ struct ObTableClientInner { refresh_metadata_mutex: Lock, last_refresh_metadata_ts: AtomicUsize, + refresh_sender: std::sync::mpsc::SyncSender, + // query concurrency control query_permits: Option, } @@ -230,6 +233,7 @@ impl ObTableClientInner { database: String, running_mode: RunningMode, config: ClientConfig, + refresh_sender: std::sync::mpsc::SyncSender, runtimes: Arc, ) -> Result { let ocp_manager = @@ -268,6 +272,7 @@ impl ObTableClientInner { refresh_metadata_mutex: Mutex::new(0), last_refresh_metadata_ts: AtomicUsize::new(0), + refresh_sender, query_permits, }) } @@ -296,7 +301,17 @@ impl ObTableClientInner { table_name:{}, err:{}", table_name, error ); - self.get_or_refresh_table_entry_non_blocking(table_name, true)?; + + match self.refresh_sender.try_send(table_name.to_owned()) { + Ok(_) => { + warn!("ObTableClientInner::on_table_op_failure: Need Refresh / try to refresh schema actively succeed, table_name:{table_name}"); + } + Err(error) => { + warn!("ObTableClientInner::on_table_op_failure: Need Refresh / try to refresh schema actively failed, maybe other thread has sent, table_name:{table_name}, error:{error}"); + } + } + + return Ok(()); } let counter = { @@ -321,7 +336,16 @@ impl ObTableClientInner { { warn!("ObTableClientInner::on_table_op_failure refresh table entry {} while execute failed times exceeded {}, err: {}.", table_name, self.config.runtime_continuous_failure_ceiling, error); - self.get_or_refresh_table_entry(table_name, true)?; + + match self.refresh_sender.try_send(table_name.to_owned()) { + Ok(_) => { + 
warn!("ObTableClientInner::on_table_op_failure: Continuous Failure / try to refresh schema actively succeed, table_name:{table_name}"); + } + Err(error) => { + warn!("ObTableClientInner::on_table_op_failure: Continuous Failure / try to refresh schema actively failed, maybe other thread has sent, table_name:{table_name}, error:{error}"); + } + } + counter.store(0, Ordering::SeqCst); } @@ -839,7 +863,14 @@ impl ObTableClientInner { )) } - fn need_refresh_table_entry(&self, table_entry: &Arc) -> bool { + fn need_refresh_table_entry( + &self, + table_entry: &Arc, + active_refresh: bool, + ) -> bool { + if active_refresh { + return true; + } let ratio = 2_f64.powi(self.server_roster.max_priority() as i32); let interval_ms = @@ -889,10 +920,12 @@ impl ObTableClientInner { refresh: bool, blocking: bool, ) -> Result> { + // Now blocking is false when refresh actively + let active_refresh = !blocking; // Attempt to retrieve it from cache, avoid locking. if let Some(table_entry) = self.get_table_entry_from_cache(table_name) { //If the refresh is false indicates that user tolerate not the latest data - if !refresh || !self.need_refresh_table_entry(&table_entry) { + if !refresh || !self.need_refresh_table_entry(&table_entry, active_refresh) { return Ok(table_entry); } } @@ -953,7 +986,7 @@ impl ObTableClientInner { //double-check whether need to do refreshing if let Some(table_entry) = self.get_table_entry_from_cache(table_name) { //If the refresh is false indicates that user tolerate not the latest data - if !refresh || !self.need_refresh_table_entry(&table_entry) { + if !refresh || !self.need_refresh_table_entry(&table_entry, active_refresh) { debug!( "ObTableClientInner::get_or_refresh_table_entry: double check found no need \ to refresh, table_name:{}", @@ -1001,6 +1034,13 @@ impl ObTableClientInner { entry for table {}, error is {:?}", table_name, e ); + if e.is_err() + && e.as_ref().err().unwrap().is_ob_exception() + && e.as_ref().err().unwrap().ob_result_code().unwrap() + == ResultCodes::OB_ERR_UNKNOWN_TABLE + { + return Err(e.err().unwrap()); + } if self .table_entry_refresh_continuous_failure_count .fetch_add(1, Ordering::SeqCst) @@ -1123,7 +1163,9 @@ impl ObTableClientInner { if let Err(e) = self.get_or_refresh_table_entry(&table_name, true) { warn!("ObTableClientInner::refresh_all_table_entries fail to refresh table entry for table: {}, err: {}.", table_name, e); - self.invalidate_table(&table_name); + if e.need_invalidate_table() { + self.invalidate_table(&table_name); + } } } OBKV_CLIENT_METRICS.observe_sys_operation_rt("refresh_all_tables", start.elapsed()); @@ -1425,7 +1467,10 @@ impl ObTableClientInner { OBKV_CLIENT_METRICS.inc_retry_times(ObClientOpRetryType::Execute); if self.config.rpc_retry_interval.as_secs() > 0 { - thread::sleep(self.config.rpc_retry_interval); + sleep(Duration::from_millis( + self.config.rpc_retry_interval.as_millis() as u64, + )) + .await; } continue; } @@ -1837,7 +1882,10 @@ impl ObTableClient { OBKV_CLIENT_METRICS.inc_retry_times(ObClientOpRetryType::ExecuteBatch); if self.inner.config.rpc_retry_interval.as_secs() > 0 { - thread::sleep(self.inner.config.rpc_retry_interval); + sleep(Duration::from_millis( + self.inner.config.rpc_retry_interval.as_millis() as u64, + )) + .await; } continue; } @@ -2372,24 +2420,51 @@ impl Builder { assert_not_empty(&self.param_url, "Blank param url"); assert_not_empty(&self.full_user_name, "Blank full user name"); let runtimes = Arc::new(build_obkv_runtimes(&self.config)); + let (sender, receiver) = 
std::sync::mpsc::sync_channel::(1); + let inner_client = Arc::new(ObTableClientInner::internal_new( + self.param_url, + self.full_user_name, + self.password, + self.user_name, + self.tenant_name, + self.cluster_name, + self.database, + self.running_mode, + self.config, + sender, + runtimes, + )?); + + // refresh schema in ActiveRefreshSchemaThread + let inner = inner_client.clone(); + let _handle = thread::Builder::new() + .name("ActiveRefreshMetaThread".to_string()) + .spawn(move || { + loop { + let message = { + match receiver.recv() { + Ok(message) => Some(message), + Err(_) => None, + } + }; + + if let Some(message) = message { + if let Err(e) = inner.get_or_refresh_table_entry_non_blocking(&message, true) { + error!("ActiveRefreshMetaThread fail to refresh table entry for table: {}, err: {}.", + message, e); + } + } else { + break; + } + } + }); Ok(ObTableClient { - inner: Arc::new(ObTableClientInner::internal_new( - self.param_url, - self.full_user_name, - self.password, - self.user_name, - self.tenant_name, - self.cluster_name, - self.database, - self.running_mode, - self.config, - runtimes, - )?), + inner: inner_client, refresh_thread_pool: Arc::new( ScheduledThreadPool::builder() .num_threads(2) - .thread_name_pattern("RefreshMetaThread-") + .thread_name_pattern("RefreshMetaThread") .build(), ), }) diff --git a/src/error.rs b/src/error.rs index 76ae888..12728d1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -166,6 +166,20 @@ impl Error { pub fn need_refresh_table(&self) -> bool { if let Error::Common(CommonErrCode::ObException(code), _) = self { code.need_refresh_table() + } else if let Error::Common(CommonErrCode::ConnPool, message) = self { + // conn_pool will produced this error if all connection to a server is shutdown + // which means we need refresh + return message.ends_with("are all removed"); + } else { + false + } + } + + pub fn need_invalidate_table(&self) -> bool { + if let Error::Common(CommonErrCode::PartitionError, message) = self { + // Location::get_table_location_from_remote will produce this error if the table + // is dropped + message.starts_with("Location::get_table_location_from_remote: Table maybe dropped.") } else { false } diff --git a/src/location/mod.rs b/src/location/mod.rs index 5515fa6..e0ad753 100644 --- a/src/location/mod.rs +++ b/src/location/mod.rs @@ -1068,7 +1068,7 @@ impl ObTableLocation { part_id, table_entry, partition_location); return Err(CommonErr( CommonErrCode::PartitionError, - format!("Location::get_table_location_from_remote: partition num={part_id} is not exists, table={table_entry:?}, locations={partition_location:?}"), + format!("Location::get_table_location_from_remote: Table maybe dropped. 
partition num={part_id} is not exists, table={table_entry:?}, locations={partition_location:?}"), )); } @@ -1078,7 +1078,7 @@ impl ObTableLocation { return Err(CommonErr( CommonErrCode::PartitionError, - format!("Location::get_table_location_from_remote: partition num={part_id} has no leader, table={table_entry:?}, locations={partition_location:?}"), + format!("Location::get_table_location_from_remote: partition num={part_id} has no leader, table={table_entry:?}, locations={partition_location:?}, maybe rebalancing"), )); } } diff --git a/src/rpc/conn_pool.rs b/src/rpc/conn_pool.rs index 483dfe4..1d15f3b 100644 --- a/src/rpc/conn_pool.rs +++ b/src/rpc/conn_pool.rs @@ -256,6 +256,7 @@ impl ConnPool { // TODO: may use better name for the timeout here let end = Instant::now() + pool.conn_builder.connect_timeout; + let mut all_moved = false; let mut inner = pool.inner.lock().unwrap(); loop { @@ -274,6 +275,9 @@ impl ConnPool { return Ok(conn); } (None, removed) => { + if !all_moved { + all_moved = true; + } warn!("ConnPool::get fail to get active connection so have to wait for a new one, removed:{}", removed); } } @@ -295,13 +299,23 @@ impl ConnPool { ), )); } + if all_moved { + // error.rs will refresh depends on 'are all removed' + return Err(CommonErr( + CommonErrCode::ConnPool, + format!( + "ConnPool::all connection to addr:{}, port:{} are all removed", + pool.conn_builder.ip, pool.conn_builder.port + ), + )); + } let wait_res = pool.cond.wait_timeout(inner, end - now).unwrap(); if wait_res.1.timed_out() { return Err(CommonErr( CommonErrCode::ConnPool, format!( - "ConnPool::get wait for a connection timeout, timeout:{:?}, addr:{}", - pool.conn_builder.connect_timeout, pool.conn_builder.ip + "ConnPool::get wait for a connection timeout, timeout:{:?}, addr:{}, port:{}", + pool.conn_builder.connect_timeout, pool.conn_builder.ip, pool.conn_builder.port ), )); } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 6aff15e..d0d7b83 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -570,7 +570,7 @@ impl Connection { self.on_recv_timeout(); return Err(CommonErr( CommonErrCode::Rpc, - format!("wait for rpc response timeout, err:{err}"), + format!("wait for rpc response timeout, err:{err}, trace_id:{trace_id}"), )); } }.map_err(|err| CommonErr(CommonErrCode::Rpc, format!("Tokio timeout error: {err:?}")))?; @@ -935,14 +935,13 @@ impl Builder { let tokio_socket = TcpSocket::from_std_stream(socket2_socket.into()); - let stream = tokio_socket - .connect(addr) - .await - .map_err(|e| { + let stream = match tokio_socket.connect(addr).await { + Ok(stream) => stream, + Err(e) => { error!("Builder::build fail to connect to {}, err: {}.", addr, e); - e - }) - .unwrap(); + return Err(e.into()); + } + }; let id = Self::generate_uniqueid(stream.local_addr().unwrap()); diff --git a/tests/test_table_client_base.rs b/tests/test_table_client_base.rs index b217088..b615b73 100644 --- a/tests/test_table_client_base.rs +++ b/tests/test_table_client_base.rs @@ -106,12 +106,12 @@ impl BaseTest { let mut handles = vec![]; let start = SystemTime::now(); let counter = Arc::new(AtomicUsize::new(0)); - for _ in 0..10 { + for _ in 0..BaseTest::THREAD_NUM { let client = self.client.clone(); let counter = counter.clone(); handles.push(task::spawn(async move { - for i in 0..100 { - let key: i64 = i; + for i in 0..BaseTest::ROW_NUM { + let key: i64 = i.try_into().unwrap(); let value = format!("value{i}"); let result = client .insert_or_update( @@ -141,11 +141,14 @@ impl BaseTest { for handle in handles { handle.await.unwrap(); } - 
assert_eq!(1000, counter.load(Ordering::SeqCst)); + assert_eq!( + BaseTest::THREAD_NUM * BaseTest::ROW_NUM, + counter.load(Ordering::SeqCst) + ); println!( "{} seconds for insert_or_update {} rows.", start.elapsed().unwrap().as_secs(), - 1000 + BaseTest::THREAD_NUM * BaseTest::ROW_NUM ); } diff --git a/ycsb-rs/workloads/workload_obkv.toml b/ycsb-rs/workloads/workload_obkv.toml index 4764755..d98acde 100644 --- a/ycsb-rs/workloads/workload_obkv.toml +++ b/ycsb-rs/workloads/workload_obkv.toml @@ -44,9 +44,9 @@ test_sys_user_name = "" test_sys_password = "" # How may YCSB Client will use a OBKV Client -obkv_client_reuse = 400 +obkv_client_reuse = 200 -rpc_connect_timeout = 5000 +rpc_connect_timeout = 1000 rpc_read_timeout = 3000 rpc_login_timeout = 3000 rpc_operation_timeout = 3000
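
Note: the core of this fix is the refresh_sender / ActiveRefreshMetaThread pair added in src/client/table_client.rs. Operation failures no longer refresh table metadata inline; they push the table name into a bounded std::sync::mpsc::sync_channel(1), and a dedicated background thread drains that channel and calls get_or_refresh_table_entry_non_blocking. The standalone sketch below shows only that channel pattern; the worker body, the stubbed refresh, and the table name are illustrative stand-ins, not the client's actual code.

use std::{sync::mpsc, thread, time::Duration};

fn main() {
    // Bounded channel of size 1: at most one refresh request can be queued,
    // so bursts of failures coalesce instead of piling up behind each other.
    let (sender, receiver) = mpsc::sync_channel::<String>(1);

    // Background worker, analogous to the patch's ActiveRefreshMetaThread:
    // it blocks on recv() and performs the (slow) metadata refresh off the
    // request path. The actual refresh call is stubbed out for this sketch.
    let worker = thread::spawn(move || {
        while let Ok(table_name) = receiver.recv() {
            println!("refreshing metadata for table {table_name}");
            thread::sleep(Duration::from_millis(50));
        }
        // recv() returns Err once every sender is dropped, which ends the loop.
    });

    // Failure paths call try_send(): if the single slot is already occupied,
    // another thread has requested a refresh and the error is simply logged,
    // mirroring on_table_op_failure in the patch.
    for _ in 0..3 {
        match sender.try_send("test_table".to_owned()) {
            Ok(_) => println!("refresh scheduled"),
            Err(e) => println!("refresh already pending or worker gone: {e}"),
        }
    }

    drop(sender);
    worker.join().unwrap();
}

Because the channel holds at most one pending request, repeated failures collapse into a single active refresh and callers return immediately instead of blocking on the refresh lock.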
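The retry paths in table_client.rs also replace thread::sleep with tokio::time::sleep, so a retry back-off no longer parks a runtime worker thread. A minimal sketch of that shape, using a hypothetical flaky_op and made-up retry settings rather than the client's real execute path:

use std::time::Duration;
use tokio::time::sleep;

// Hypothetical fallible operation used only for this sketch.
async fn flaky_op(attempt: u32) -> Result<&'static str, &'static str> {
    if attempt < 2 {
        Err("transient failure")
    } else {
        Ok("ok")
    }
}

// Retry loop in the same shape as the patched execute paths: on failure,
// await an async timer instead of calling thread::sleep, so the tokio
// worker thread keeps driving other tasks during the back-off.
async fn execute_with_retry(
    retry_limit: u32,
    retry_interval: Duration,
) -> Result<&'static str, &'static str> {
    let mut attempt = 0;
    loop {
        match flaky_op(attempt).await {
            Ok(v) => return Ok(v),
            Err(_) if attempt < retry_limit => {
                attempt += 1;
                sleep(retry_interval).await;
            }
            Err(e) => return Err(e),
        }
    }
}

#[tokio::main]
async fn main() {
    // 3 retries with a 500 ms interval mirrors the new ClientConfig defaults.
    let res = execute_with_retry(3, Duration::from_millis(500)).await;
    println!("result: {res:?}");
}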