Skip to content

Commit

Permalink
[Fix] review
Browse files Browse the repository at this point in the history
  • Loading branch information
IHEII committed Jan 17, 2024
1 parent 75057f3 commit 43979a5
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 168 deletions.
194 changes: 106 additions & 88 deletions src/client/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,39 @@
* #L%
*/

use std::{any::Any, fmt::Write};
use std::fmt::Write;

const TABLE_COMPARE_FILTER_PREFIX: &str = "TableCompareFilter";

pub trait Filter: Any {
fn as_any(&self) -> &dyn Any;
fn string(&self) -> String;
fn concat_string(&self, filter_string: &mut String);
pub trait FilterEncoder {
/// Encode the filter as string.
fn encode(&self) -> String;

/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String);
}

pub enum Filter {
Value(ObTableValueFilter),
List(ObTableFilterList),
}

impl FilterEncoder for Filter {
/// Encode the filter as string.
fn encode(&self) -> String {
match self {
Filter::List(filter) => filter.encode(),
Filter::Value(filter) => filter.encode(),
}
}

/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String) {
match self {
Filter::List(filter) => filter.encode_to(buffer),
Filter::Value(filter) => filter.encode_to(buffer),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -60,48 +85,35 @@ pub enum FilterOp {

pub struct ObTableFilterList {
pub op: FilterOp,
pub filters: Vec<Box<dyn Filter>>,
}

#[macro_export]
macro_rules! filter_list {
($op:expr, $($filter:expr),+ $(,)?) => {
ObTableFilterList {
op: $op,
filters: vec![$(Box::new($filter) as Box<dyn Filter>),+],
}
};
pub filters: Vec<Filter>,
}

impl ObTableFilterList {
pub fn new<I>(op: FilterOp, filters: I) -> Self
where
I: IntoIterator<Item = Box<dyn Filter>>,
I: IntoIterator<Item = Filter>,
{
ObTableFilterList {
op,
filters: filters.into_iter().collect(),
}
}

pub fn add_filter(&mut self, filter: Box<dyn Filter>) {
pub fn add_filter(&mut self, filter: Filter) {
self.filters.push(filter)
}

pub fn add_filters<I>(&mut self, filters: I)
where
I: IntoIterator<Item = Box<dyn Filter>>,
I: IntoIterator<Item = Filter>,
{
self.filters.extend(filters.into_iter())
}
}

impl Filter for ObTableFilterList {
fn as_any(&self) -> &dyn Any {
self
}

fn string(&self) -> String {
impl FilterEncoder for ObTableFilterList {
/// Encode the filter as string.
fn encode(&self) -> String {
let string_op = match self.op {
FilterOp::And => " && ",
FilterOp::Or => " || ",
Expand All @@ -111,61 +123,63 @@ impl Filter for ObTableFilterList {
self.filters
.iter()
.map(|filter| {
let filter_string = filter.string();
if filter.as_any().is::<ObTableValueFilter>() {
filter_string
} else {
format!("({})", filter_string)
let filter_string = filter.encode();
match filter {
Filter::List(_) => {
format!("({})", filter_string)
}
Filter::Value(_) => filter_string,
}
})
.collect::<Vec<_>>()
.join(string_op)
}

fn concat_string(&self, filter_string: &mut String) {
/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String) {
let string_op = match self.op {
FilterOp::And => " && ",
FilterOp::Or => " || ",
};

for (i, filter) in self.filters.iter().enumerate() {
if i != 0 {
filter_string.push_str(string_op);
buffer.push_str(string_op);
}
if filter.as_any().is::<ObTableValueFilter>() {
filter.concat_string(filter_string);
} else {
filter_string.push('(');
filter.concat_string(filter_string);
filter_string.push(')');
match filter {
Filter::List(filter_list) => {
buffer.push('(');
filter_list.encode_to(buffer);
buffer.push(')');
}
Filter::Value(value_filter) => value_filter.encode_to(buffer),
}
}
}
}

#[derive(Debug, Clone)]
/// Only support [`ObTableValueFilter`] on numeric type and string type
/// The value will be encoded into string and will be parsed into filter in the server
pub struct ObTableValueFilter {
pub op: ObCompareOperator,
pub column_name: String,
pub value: String,
}

impl ObTableValueFilter {
pub fn new<U: ToString, V: ToString>(op: ObCompareOperator, column_name: U, value: V) -> Self {
pub fn new<V: ToString>(op: ObCompareOperator, column_name: String, value: V) -> Self {
ObTableValueFilter {
op,
column_name: column_name.to_string(),
column_name,
value: value.to_string(),
}
}
}

impl Filter for ObTableValueFilter {
fn as_any(&self) -> &dyn Any {
self
}

fn string(&self) -> String {
impl FilterEncoder for ObTableValueFilter {
/// Encode the filter as string.
fn encode(&self) -> String {
if self.column_name.is_empty() {
return String::new();
}
Expand All @@ -178,10 +192,11 @@ impl Filter for ObTableValueFilter {
)
}

fn concat_string(&self, filter_string: &mut String) {
/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String) {
if !self.column_name.is_empty() {
if let Err(e) = write!(
filter_string,
buffer,
"{}({},'{}:{}')",
TABLE_COMPARE_FILTER_PREFIX,
self.op.string(),
Expand All @@ -205,41 +220,43 @@ mod test {
let string_column_name = "string_column";

// create ObTableValueFilter by micro rules
let filter_i16 = ObTableValueFilter::new(op.clone(), column_name, 51i16);
let filter_i32 = ObTableValueFilter::new(op.clone(), string_column_name, 51i32);
let filter_i64 = ObTableValueFilter::new(op.clone(), column_name, 51i64);
let filter_u16 = ObTableValueFilter::new(op.clone(), string_column_name, 51u16);
let filter_u32 = ObTableValueFilter::new(op.clone(), column_name, 51u32);
let filter_u64 = ObTableValueFilter::new(op.clone(), string_column_name, 51u64);
let filter_f32 = ObTableValueFilter::new(op.clone(), column_name, 51.0f32);
let filter_f64 = ObTableValueFilter::new(op.clone(), string_column_name, 51.0f64);
let filter_string = ObTableValueFilter::new(op.clone(), column_name, "51".to_string());
let filter_str = ObTableValueFilter::new(op.clone(), string_column_name, "51");
let filter_i16 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51i16);
let filter_i32 = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51i32);
let filter_i64 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51i64);
let filter_u16 = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51u16);
let filter_u32 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51u32);
let filter_u64 = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51u64);
let filter_f32 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51.0f32);
let filter_f64 =
ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51.0f64);
let filter_string =
ObTableValueFilter::new(op.clone(), column_name.to_string(), "51".to_string());
let filter_str = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), "51");

assert_eq!("TableCompareFilter(=,'column:51')", filter_i16.string());
assert_eq!("TableCompareFilter(=,'column:51')", filter_i16.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_i32.string()
filter_i32.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_i64.string());
assert_eq!("TableCompareFilter(=,'column:51')", filter_i64.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_u16.string()
filter_u16.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_u32.string());
assert_eq!("TableCompareFilter(=,'column:51')", filter_u32.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_u64.string()
filter_u64.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_f32.string());
assert_eq!("TableCompareFilter(=,'column:51')", filter_f32.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_f64.string()
filter_f64.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_string.string());
assert_eq!("TableCompareFilter(=,'column:51')", filter_string.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_str.string()
filter_str.encode()
);
}

Expand All @@ -249,44 +266,45 @@ mod test {
let filter_list_0 = ObTableFilterList::new(
FilterOp::And,
vec![
Box::new(ObTableValueFilter::new(
Filter::Value(ObTableValueFilter::new(
ObCompareOperator::Equal,
column_name,
column_name.to_string(),
"0",
)) as Box<dyn Filter>,
Box::new(ObTableValueFilter::new(
)),
Filter::Value(ObTableValueFilter::new(
ObCompareOperator::GreaterThan,
column_name,
column_name.to_string(),
"1",
)) as Box<dyn Filter>,
)),
],
);
let mut filter_list_component = ObTableFilterList::new(
FilterOp::And,
vec![Box::new(ObTableValueFilter::new(
vec![Filter::Value(ObTableValueFilter::new(
ObCompareOperator::Equal,
column_name,
column_name.to_string(),
2,
)) as Box<dyn Filter>],
))],
);
filter_list_component.add_filter(Box::new(ObTableValueFilter::new(
filter_list_component.add_filter(Filter::Value(ObTableValueFilter::new(
ObCompareOperator::GreaterThan,
column_name,
column_name.to_string(),
"3",
)) as Box<dyn Filter>);
let mut filter_list_1 = filter_list!(FilterOp::Or, filter_list_component,);
filter_list_1.add_filters(vec![Box::new(ObTableValueFilter::new(
)));
let mut filter_list_1 =
ObTableFilterList::new(FilterOp::Or, vec![Filter::List(filter_list_component)]);
filter_list_1.add_filters(vec![Filter::Value(ObTableValueFilter::new(
ObCompareOperator::GreaterThan,
column_name,
column_name.to_string(),
"4",
)) as Box<dyn Filter>]);
))]);

println!("{:?}", filter_list_0.string());
println!("{:?}", filter_list_1.string());
println!("{:?}", filter_list_0.encode());
println!("{:?}", filter_list_1.encode());
assert_eq!(
"TableCompareFilter(=,'column:0') && TableCompareFilter(>,'column:1')",
filter_list_0.string()
filter_list_0.encode()
);
assert_eq!("(TableCompareFilter(=,'column:2') && TableCompareFilter(>,'column:3')) || TableCompareFilter(>,'column:4')", filter_list_1.string());
assert_eq!("(TableCompareFilter(=,'column:2') && TableCompareFilter(>,'column:3')) || TableCompareFilter(>,'column:4')", filter_list_1.encode());
}
}
8 changes: 4 additions & 4 deletions src/rpc/protocol/payloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use bytes::{Buf, BufMut, BytesMut};
use super::{
BasePayLoad, ObPayload, ObTablePacketCode, ProtoDecoder, ProtoEncoder, Result, TraceId,
};
use crate::filter::FilterEncoder;
use crate::{
client::filter::Filter,
location::OB_INVALID_ID,
query::{ObNewRange, ObTableQuery},
rpc::protocol::{
Expand Down Expand Up @@ -725,12 +725,12 @@ impl ObTableBatchOperation {
/// whether meet the filter and execute the insertUp
/// if check_exist is true: check if any data meet the filter
/// if check_exist is false: check if all data do not meet the filter
pub fn check_and_insert_up(
pub fn check_and_insert_up<T: FilterEncoder>(
&mut self,
row_keys: Vec<Value>,
columns: Vec<String>,
properties: Vec<Value>,
filter: Box<dyn Filter>,
filter: T,
check_exist: bool,
) {
let mut option = RawObTableOperationFlag::new();
Expand All @@ -741,7 +741,7 @@ impl ObTableBatchOperation {
row_keys,
Some(columns),
Some(properties),
Some(filter.string()),
Some(filter.encode()),
Some(option),
))
}
Expand Down
Loading

0 comments on commit 43979a5

Please sign in to comment.