[Fix] Liveness (#77)
* [Fix] refresh meta
* [Fix] liveness in RPC timeout
* [Chore] update toolchain
* [Fix] panic error
* [Fix] connection all invalid
* [Chore] change default config
* [Fix] rebalance error
* [Fix] sync_refresh_metadata
IHEII authored Oct 30, 2023
1 parent fc81c46 commit d2ab15c
Showing 11 changed files with 204 additions and 47 deletions.
46 changes: 46 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -31,6 +31,7 @@ tokio = { version = "1", features = ["full"] }

 [dependencies]
 anyhow = { workspace = true }
+backtrace = "0.3"
 byteorder = "1.2"
 bytes = "1.4"
 chrono = "0.4"
14 changes: 7 additions & 7 deletions src/client/mod.rs
@@ -87,30 +87,30 @@ impl Default for ClientConfig {
             sys_password: "".to_owned(),
             metadata_mysql_conn_pool_min_size: 1,
             metadata_mysql_conn_pool_max_size: 3,
-            metadata_refresh_interval: Duration::from_secs(60),
+            metadata_refresh_interval: Duration::from_secs(3),
             ocp_model_cache_file: "/tmp/ocp_model_cache.json".to_owned(),
 
             rslist_acquire_timeout: Duration::from_secs(10),
             rslist_acquire_try_times: 3,
             rslist_acquire_retry_interval: Duration::from_millis(100),
 
-            table_entry_acquire_connect_timeout: Duration::from_secs(5),
+            table_entry_acquire_connect_timeout: Duration::from_secs(3),
             table_entry_acquire_read_timeout: Duration::from_secs(3),
-            table_entry_refresh_interval_base: Duration::from_millis(100),
-            table_entry_refresh_interval_ceiling: Duration::from_millis(1600),
+            table_entry_refresh_interval_base: Duration::from_secs(60),
+            table_entry_refresh_interval_ceiling: Duration::from_secs(120),
             table_entry_refresh_try_times: 3,
             table_entry_refresh_try_interval: Duration::from_millis(20),
             table_entry_refresh_continuous_failure_ceiling: 10,
 
             server_address_priority_timeout: Duration::from_secs(1800),
-            runtime_continuous_failure_ceiling: 100,
+            runtime_continuous_failure_ceiling: 10,
 
-            rpc_connect_timeout: Duration::from_secs(5),
+            rpc_connect_timeout: Duration::from_secs(3),
             rpc_read_timeout: Duration::from_secs(3),
             rpc_login_timeout: Duration::from_secs(3),
             rpc_operation_timeout: Duration::from_secs(3),
             rpc_retry_limit: 3,
-            rpc_retry_interval: Duration::from_secs(0),
+            rpc_retry_interval: Duration::from_millis(500),
 
             refresh_workers_num: 5,

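These defaults trade slower background polling for faster failure detection: connect timeouts tighten from 5 s to 3 s, the continuous-failure ceiling drops from 100 to 10, and the default retry interval becomes 500 ms instead of zero. A caller who needs different trade-offs can override individual fields with struct-update syntax; a minimal sketch (field names are from the diff above, the import path and chosen values are assumptions):

```rust
use std::time::Duration;

use obkv::ClientConfig; // assumed re-export path for the config struct

fn custom_config() -> ClientConfig {
    // Start from the new defaults and adjust only what this workload needs;
    // the values here are illustrative, not recommendations.
    ClientConfig {
        rpc_retry_interval: Duration::from_millis(200),
        metadata_refresh_interval: Duration::from_secs(10),
        ..ClientConfig::default()
    }
}
```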
5 changes: 5 additions & 0 deletions src/client/table.rs
@@ -82,6 +82,11 @@ impl ObTable {

         result.into()
     }
+
+    /// Return the server address as "ip:port".
+    pub fn addr(&self) -> String {
+        format!("{}:{}", self.ip, self.port)
+    }
 }
 
 pub struct Builder {
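A small illustration of where the new accessor might be used; `describe_failure` is a hypothetical helper, not part of the commit:

```rust
// Hypothetical helper: tag a failure message with the address of the ObTable
// that served the request, using the addr() accessor added above.
fn describe_failure(table: &ObTable, err: &str) -> String {
    format!("operation failed on server {}: {}", table.addr(), err)
}
```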
117 changes: 96 additions & 21 deletions src/client/table_client.rs
@@ -29,6 +29,7 @@ use std::{

 use rand::{seq::SliceRandom, thread_rng};
 use scheduled_thread_pool::ScheduledThreadPool;
+use tokio::time::sleep;
 
 use super::{
     ocp::{ObOcpModelManager, OcpModel},
@@ -214,6 +215,8 @@ struct ObTableClientInner {
     refresh_metadata_mutex: Lock,
     last_refresh_metadata_ts: AtomicUsize,
 
+    refresh_sender: std::sync::mpsc::SyncSender<String>,
+
     // query concurrency control
     query_permits: Option<Permits>,
 }
@@ -230,6 +233,7 @@ impl ObTableClientInner {
         database: String,
         running_mode: RunningMode,
         config: ClientConfig,
+        refresh_sender: std::sync::mpsc::SyncSender<String>,
         runtimes: Arc<ObClientRuntimes>,
     ) -> Result<Self> {
         let ocp_manager =
@@ -268,6 +272,7 @@
             refresh_metadata_mutex: Mutex::new(0),
             last_refresh_metadata_ts: AtomicUsize::new(0),
 
+            refresh_sender,
             query_permits,
         })
     }
@@ -296,7 +301,17 @@
table_name:{}, err:{}",
table_name, error
);
self.get_or_refresh_table_entry_non_blocking(table_name, true)?;

match self.refresh_sender.try_send(table_name.to_owned()) {
Ok(_) => {
warn!("ObTableClientInner::on_table_op_failure: Need Refresh / try to refresh schema actively succeed, table_name:{table_name}");
}
Err(error) => {
warn!("ObTableClientInner::on_table_op_failure: Need Refresh / try to refresh schema actively failed, maybe other thread has sent, table_name:{table_name}, error:{error}");
}
}

return Ok(());
}

let counter = {
@@ -321,7 +336,16 @@
         {
             warn!("ObTableClientInner::on_table_op_failure refresh table entry {} while execute failed times exceeded {}, err: {}.",
                 table_name, self.config.runtime_continuous_failure_ceiling, error);
-            self.get_or_refresh_table_entry(table_name, true)?;
+
+            match self.refresh_sender.try_send(table_name.to_owned()) {
+                Ok(_) => {
+                    warn!("ObTableClientInner::on_table_op_failure: Continuous Failure / try to refresh schema actively succeeded, table_name:{table_name}");
+                }
+                Err(error) => {
+                    warn!("ObTableClientInner::on_table_op_failure: Continuous Failure / try to refresh schema actively failed, another thread may have already sent, table_name:{table_name}, error:{error}");
+                }
+            }
+
             counter.store(0, Ordering::SeqCst);
         }

Expand Down Expand Up @@ -839,7 +863,14 @@ impl ObTableClientInner {
))
}

fn need_refresh_table_entry(&self, table_entry: &Arc<TableEntry>) -> bool {
fn need_refresh_table_entry(
&self,
table_entry: &Arc<TableEntry>,
active_refresh: bool,
) -> bool {
if active_refresh {
return true;
}
let ratio = 2_f64.powi(self.server_roster.max_priority() as i32);

let interval_ms =
@@ -889,10 +920,12 @@
         refresh: bool,
         blocking: bool,
     ) -> Result<Arc<TableEntry>> {
+        // Currently, blocking is false when refreshing actively
+        let active_refresh = !blocking;
         // Attempt to retrieve it from cache, avoid locking.
         if let Some(table_entry) = self.get_table_entry_from_cache(table_name) {
             // If refresh is false, the caller tolerates stale data
-            if !refresh || !self.need_refresh_table_entry(&table_entry) {
+            if !refresh || !self.need_refresh_table_entry(&table_entry, active_refresh) {
                 return Ok(table_entry);
             }
         }
@@ -953,7 +986,7 @@
             // Double-check whether a refresh is needed
             if let Some(table_entry) = self.get_table_entry_from_cache(table_name) {
                 // If refresh is false, the caller tolerates stale data
-                if !refresh || !self.need_refresh_table_entry(&table_entry) {
+                if !refresh || !self.need_refresh_table_entry(&table_entry, active_refresh) {
                     debug!(
                         "ObTableClientInner::get_or_refresh_table_entry: double check found no need \
                          to refresh, table_name:{}",
@@ -1001,6 +1034,13 @@
                 entry for table {}, error is {:?}",
                 table_name, e
             );
+            if e.is_err()
+                && e.as_ref().err().unwrap().is_ob_exception()
+                && e.as_ref().err().unwrap().ob_result_code().unwrap()
+                    == ResultCodes::OB_ERR_UNKNOWN_TABLE
+            {
+                return Err(e.err().unwrap());
+            }
             if self
                 .table_entry_refresh_continuous_failure_count
                 .fetch_add(1, Ordering::SeqCst)
@@ -1123,7 +1163,9 @@
             if let Err(e) = self.get_or_refresh_table_entry(&table_name, true) {
                 warn!("ObTableClientInner::refresh_all_table_entries fail to refresh table entry for table: {}, err: {}.",
                     table_name, e);
-                self.invalidate_table(&table_name);
+                if e.need_invalidate_table() {
+                    self.invalidate_table(&table_name);
+                }
             }
         }
         OBKV_CLIENT_METRICS.observe_sys_operation_rt("refresh_all_tables", start.elapsed());
@@ -1425,7 +1467,10 @@
                 OBKV_CLIENT_METRICS.inc_retry_times(ObClientOpRetryType::Execute);
 
                 if self.config.rpc_retry_interval.as_secs() > 0 {
-                    thread::sleep(self.config.rpc_retry_interval);
+                    sleep(Duration::from_millis(
+                        self.config.rpc_retry_interval.as_millis() as u64,
+                    ))
+                    .await;
                 }
                 continue;
             }
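For context: `std::thread::sleep` inside an async task parks the whole tokio worker thread, stalling every other task scheduled on it, while `tokio::time::sleep(...).await` yields the thread back to the runtime. Note also that the guard above compares whole seconds (`as_secs() > 0`), so a sub-second interval such as the new 500 ms default would not trigger the pause; the sketch below (illustrative, not from the commit) checks the full duration instead:

```rust
use std::time::Duration;
use tokio::time::sleep;

// Illustrative retry pause: awaiting tokio's sleep keeps the worker thread
// free to run other tasks during the back-off.
async fn retry_pause(interval: Duration) {
    if !interval.is_zero() {
        sleep(interval).await;
    }
}
```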
@@ -1837,7 +1882,10 @@
                 OBKV_CLIENT_METRICS.inc_retry_times(ObClientOpRetryType::ExecuteBatch);
 
                 if self.inner.config.rpc_retry_interval.as_secs() > 0 {
-                    thread::sleep(self.inner.config.rpc_retry_interval);
+                    sleep(Duration::from_millis(
+                        self.inner.config.rpc_retry_interval.as_millis() as u64,
+                    ))
+                    .await;
                 }
                 continue;
             }
@@ -2372,24 +2420,51 @@ impl Builder {
         assert_not_empty(&self.param_url, "Blank param url");
         assert_not_empty(&self.full_user_name, "Blank full user name");
         let runtimes = Arc::new(build_obkv_runtimes(&self.config));
+        let (sender, receiver) = std::sync::mpsc::sync_channel::<String>(1);
+        let inner_client = Arc::new(ObTableClientInner::internal_new(
+            self.param_url,
+            self.full_user_name,
+            self.password,
+            self.user_name,
+            self.tenant_name,
+            self.cluster_name,
+            self.database,
+            self.running_mode,
+            self.config,
+            sender,
+            runtimes,
+        )?);
+
+        // refresh schema in ActiveRefreshMetaThread
+        let inner = inner_client.clone();
+        let _handle = thread::Builder::new()
+            .name("ActiveRefreshMetaThread".to_string())
+            .spawn(move || {
+                loop {
+                    let message = {
+                        match receiver.recv() {
+                            Ok(message) => Some(message),
+                            Err(_) => None,
+                        }
+                    };
+
+                    if let Some(message) = message {
+                        if let Err(e) = inner.get_or_refresh_table_entry_non_blocking(&message, true) {
+                            error!("ActiveRefreshMetaThread fail to refresh table entry for table: {}, err: {}.",
+                                message, e);
+                        }
+                    } else {
+                        break;
+                    }
+                }
+            });
 
         Ok(ObTableClient {
-            inner: Arc::new(ObTableClientInner::internal_new(
-                self.param_url,
-                self.full_user_name,
-                self.password,
-                self.user_name,
-                self.tenant_name,
-                self.cluster_name,
-                self.database,
-                self.running_mode,
-                self.config,
-                runtimes,
-            )?),
+            inner: inner_client,
             refresh_thread_pool: Arc::new(
                 ScheduledThreadPool::builder()
                     .num_threads(2)
-                    .thread_name_pattern("RefreshMetaThread-")
+                    .thread_name_pattern("RefreshMetaThread")
                     .build(),
             ),
         })
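The spawned loop exits when `recv()` returns `Err`, which with std channels happens only once every `SyncSender` has been dropped, so the refresh thread winds down with the client. A small demonstration of that shutdown behavior (illustrative names, std semantics):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    let (sender, receiver) = sync_channel::<String>(1);

    let handle = thread::spawn(move || {
        // Mirrors the ActiveRefreshMetaThread loop: block on recv() until
        // every sender is gone, then fall out of the loop.
        while let Ok(table) = receiver.recv() {
            println!("would refresh table entry for {table}");
        }
    });

    sender.send("test_table".to_owned()).unwrap();
    drop(sender); // last sender dropped: the worker exits cleanly
    handle.join().unwrap();
}
```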
14 changes: 14 additions & 0 deletions src/error.rs
@@ -166,6 +166,20 @@ impl Error {
     pub fn need_refresh_table(&self) -> bool {
         if let Error::Common(CommonErrCode::ObException(code), _) = self {
             code.need_refresh_table()
+        } else if let Error::Common(CommonErrCode::ConnPool, message) = self {
+            // conn_pool will produce this error when all connections to a server
+            // have been shut down, which means we need a refresh
+            return message.ends_with("are all removed");
         } else {
             false
         }
     }
+
+    pub fn need_invalidate_table(&self) -> bool {
+        if let Error::Common(CommonErrCode::PartitionError, message) = self {
+            // Location::get_table_location_from_remote will produce this error
+            // if the table was dropped
+            message.starts_with("Location::get_table_location_from_remote: Table maybe dropped.")
+        } else {
+            false
+        }
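Taken together, the two predicates split failure handling into "refresh the routing" versus "drop the table from the cache", matching the call sites in table_client.rs above. A sketch of a consumer (the handler itself is hypothetical):

```rust
// Hypothetical consumer of the predicates added above; `Error` is the
// client's error type from src/error.rs.
fn on_refresh_failure(table_name: &str, e: &Error) {
    if e.need_invalidate_table() {
        // The table was likely dropped remotely: purge it from the cache.
        println!("invalidate {table_name}");
    } else if e.need_refresh_table() {
        // Routing is stale or the connection pool was emptied: refresh it.
        println!("refresh {table_name}");
    }
}
```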