Skip to content

Commit

Permalink
[Feat] Log Stream operation (phase 2)
Browse files Browse the repository at this point in the history
  • Loading branch information
IHEII committed Feb 22, 2024
1 parent 2ab4571 commit 3b5c031
Show file tree
Hide file tree
Showing 12 changed files with 1,073 additions and 150 deletions.
2 changes: 1 addition & 1 deletion src/client/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ impl FilterEncoder for ObTableFilterList {
}
}

#[derive(Debug, Clone)]
/// Only support [`ObTableValueFilter`] on numeric type and string type
/// The value will be encoded into string and will be parsed into filter in the server
#[derive(Debug, Clone)]
pub struct ObTableValueFilter {
pub op: ObCompareOperator,
pub column_name: String,
Expand Down
48 changes: 42 additions & 6 deletions src/client/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
use std::{fmt::Formatter, time::Duration};

use super::{ClientConfig, TableOpResult};
use crate::payloads::ObTableOperationType::CheckAndInsertUp;
use crate::{
error::{CommonErrCode, Error::Common as CommonErr, Result},
location::OB_INVALID_ID,
payloads::ObTableOperationType::InsertOrUpdate,
rpc::{
protocol::{codes::ResultCodes, lsop::*, payloads::*, ObPayload},
proxy::Proxy,
Expand Down Expand Up @@ -70,7 +70,7 @@ impl ObTable {
/// Execute batch operation
pub async fn execute_batch(
&self,
_table_name: &str,
table_name: &str,
mut batch_op: ObTableBatchOperation,
) -> Result<Vec<TableOpResult>> {
// check Log Stream Operation
Expand All @@ -85,7 +85,7 @@ impl ObTable {

// check operation type
for op in batch_op.get_ops() {
if op.get_type() != InsertOrUpdate {
if op.get_type() != CheckAndInsertUp {
return Err(CommonErr(
CommonErrCode::InvalidParam,
"All operations should be InsertOrUpdate if we use filter now".to_owned(),
Expand All @@ -95,13 +95,24 @@ impl ObTable {

// generate ObTableTabletOp from batch operation
let mut tablet_op = batch_op.generate_tablet_ops();
tablet_op.set_table_id(batch_op.table_id());
tablet_op.set_partition_id(batch_op.partition_id());

// construct ObTableLSOperation
let mut ls_option = ObTableLSOpFlag::default();
ls_option.set_flag_is_same_type(true);
let ls_op = ObTableLSOperation::internal_new(OB_INVALID_ID, ls_option, vec![tablet_op]);
let mut ls_op = ObTableLSOperation::internal_new(
OB_INVALID_ID,
table_name.to_string(),
batch_op.table_id(),
Vec::new(),
Vec::new(),
ls_option,
Vec::new(),
);
ls_op.add_op(tablet_op);

// adjust ObTableLSOperation
ls_op.prepare();

let mut payload = ObTableLSOpRequest::new(
ls_op,
Expand Down Expand Up @@ -229,6 +240,31 @@ fn process_op_results(op_results: Vec<ObTableOperationResult>) -> Result<Vec<Tab
Ok(results)
}

fn process_single_op_results(op_results: Vec<ObTableSingleOpResult>) -> Result<Vec<TableOpResult>> {
let mut results = Vec::with_capacity(op_results.len());
for op_res in op_results {
let error_no = op_res.header().errorno();
let result_code = ResultCodes::from_i32(error_no);

if result_code == ResultCodes::OB_SUCCESS {
let table_op_result = if op_res.operation_type() == ObTableOperationType::Get {
TableOpResult::RetrieveRows(op_res.take_entity().take_properties())
} else {
TableOpResult::AffectedRows(op_res.affected_rows())
};
results.push(table_op_result);
} else {
return Err(CommonErr(
CommonErrCode::ObException(result_code),
format!(
"OBKV server return exception in log stream operations response: {op_res:?}."
),
));
}
}
Ok(results)
}

impl From<ObTableBatchOperationResult> for Result<Vec<TableOpResult>> {
fn from(batch_result: ObTableBatchOperationResult) -> Result<Vec<TableOpResult>> {
let op_results = batch_result.take_op_results();
Expand All @@ -239,6 +275,6 @@ impl From<ObTableBatchOperationResult> for Result<Vec<TableOpResult>> {
impl From<ObTableLSOpResult> for Result<Vec<TableOpResult>> {
fn from(ls_result: ObTableLSOpResult) -> Result<Vec<TableOpResult>> {
let op_results = ls_result.take_op_results();
process_op_results(op_results)
process_single_op_results(op_results)
}
}
2 changes: 1 addition & 1 deletion src/client/table_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1641,7 +1641,7 @@ impl ObTableClient {
// we need origin_idx to recover the result
let mut part_batch_ops = HashMap::with_capacity(1);
for (idx, op) in batch_op.take_raw_ops().into_iter().enumerate() {
let phy_id = self.inner.get_partition(&table_entry, &op.1)?;
let phy_id = self.inner.get_partition(&table_entry, &op.2)?;
let (idx_vec, batch_op) = part_batch_ops
.entry(phy_id)
.or_insert_with(|| (Vec::new(), ObTableBatchOperation::new()));
Expand Down
5 changes: 5 additions & 0 deletions src/monitors/client_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub enum ObClientOpRecordType {
Batch = 8,
Query = 9,
StreamQuery = 10,
CheckAndDo = 11,
Invalid = 12,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
Expand All @@ -57,6 +59,9 @@ impl From<ObTableOperationType> for ObClientOpRecordType {
ObTableOperationType::Replace => ObClientOpRecordType::Replace,
ObTableOperationType::Increment => ObClientOpRecordType::Increment,
ObTableOperationType::Append => ObClientOpRecordType::Append,
ObTableOperationType::CheckAndInsertUp => ObClientOpRecordType::CheckAndDo,
ObTableOperationType::Invalid => ObClientOpRecordType::Invalid,
_ => unimplemented!(),
}
}
}
Expand Down
Loading

0 comments on commit 3b5c031

Please sign in to comment.