Skip to content

Commit

Permalink
[Feat] support 4.x observer (#86)
Browse files Browse the repository at this point in the history
* [Feat] support route 4.x

* [Feat] support 4.x

* [Fix] cargo soft

* [Fix] table loc & err no

* [Fix] trace id & default atomic flag

* [Fix] empty rowkey exception

* [Fix] Value::from(None)

* [Fix] from(()) & err msg & exec sql in test

* [Fix] op err without peer addr

* [Fix] err message

* [Fix] refresh error code

* [Fix] review

* [Fix] review

* [Fix] review

* [Fix] review
  • Loading branch information
IHEII authored Dec 31, 2023
1 parent 5adbe29 commit 0bb657b
Show file tree
Hide file tree
Showing 22 changed files with 1,556 additions and 664 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ prometheus-client = { workspace = true }
quick-error = "1.2"
r2d2 = "0.8.3"
rand = "0.8"
regex = "1.7"
reqwest = { version = "0.11.13", default-features = false, features = ["rustls-tls", "blocking"] }
scheduled-thread-pool = "0.2"
serde = "1.0"
Expand Down
13 changes: 7 additions & 6 deletions src/client/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
/// Query API for ob table
use super::ObTable;
use crate::{
client::table_client::{StreamQuerier, OBKV_CLIENT_METRICS},
client::table_client::{PartInfo, StreamQuerier, OBKV_CLIENT_METRICS},
error::{CommonErrCode, Error::Common as CommonErr, Result},
rpc::protocol::{
payloads::ObTableEntityType,
Expand All @@ -51,7 +51,7 @@ pub struct QueryStreamResult {
operation_timeout: Option<Duration>,
table_name: String,
entity_type: ObTableEntityType,
expectant: HashMap<i64, (i64, Arc<ObTable>)>,
expectant: HashMap<i64, (PartInfo, Arc<ObTable>)>,
cache_properties: Vec<String>,
cache_rows: VecDeque<Vec<Value>>,
partition_last_result: PartitionQueryResultDeque,
Expand Down Expand Up @@ -89,11 +89,12 @@ impl QueryStreamResult {

async fn refer_to_new_partition(
&mut self,
(part_id, ob_table): (i64, Arc<ObTable>),
(part_info, ob_table): (PartInfo, Arc<ObTable>),
) -> Result<i64> {
let mut req = ObTableQueryRequest::new(
&self.table_name,
part_id,
part_info.table_id,
part_info.part_id,
self.entity_type.to_owned(),
self.table_query.to_owned(),
self.operation_timeout
Expand All @@ -104,7 +105,7 @@ impl QueryStreamResult {
let result = self
.querier
.clone()
.execute_query(self, (part_id, ob_table), &mut req)
.execute_query(self, (part_info.part_id, ob_table), &mut req)
.await;

if result.is_err() {
Expand Down Expand Up @@ -151,7 +152,7 @@ impl QueryStreamResult {
self.table_name = table_name.to_owned();
}

pub fn set_expectant(&mut self, expectant: HashMap<i64, (i64, Arc<ObTable>)>) {
pub fn set_expectant(&mut self, expectant: HashMap<i64, (PartInfo, Arc<ObTable>)>) {
self.expectant = expectant;
}

Expand Down
Loading

0 comments on commit 0bb657b

Please sign in to comment.