Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] Filter & Log Stream Operation #88

Closed
wants to merge 16 commits into from
2 changes: 2 additions & 0 deletions docs/test_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ CREATE TABLE `TEST_TABLE_BATCH_KEY` (
`c1` varchar(20) NOT NULL,
`c1sk` varchar(20) NOT NULL,
`c2` varchar(20) DEFAULT NULL,
`c3` bigint DEFAULT 0,
PRIMARY KEY (`c1`, `c1sk`)
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'lz4_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10
partition by key(`c1`) partitions 16;
Expand Down Expand Up @@ -157,6 +158,7 @@ CREATE TABLE `TEST_TABLE_BATCH_HASH` (
`c1` bigint NOT NULL,
`c1sk` varchar(20) NOT NULL,
`c2` varchar(20) DEFAULT NULL,
`c3` bigint DEFAULT 0,
PRIMARY KEY (`c1`, `c1sk`)) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'lz4_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10
partition by hash(`c1`) partitions 16;

Expand Down
264 changes: 264 additions & 0 deletions src/client/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the
* Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
* KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

#![allow(dead_code)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to disable dead_code and unused_macros check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

#![allow(unused_macros)]
use std::{any::Any, fmt::Write};

const TABLE_COMPARE_FILTER_PREFIX: &str = "TableCompareFilter";

pub trait Filter: Any {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the rust, there are two ways to achieve the polymorphism:

  • trait object
  • enum

And here I find the as_any is used for downcasting, and it's not a good pattern when using trait object. And if the details of the underlying implementation are necessary, enum may be a good choice:

pub enum Filter {
     Value(ObTableValueFilter),
     List(ObTableFilterList),
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

fn as_any(&self) -> &dyn Any;
fn string(&self) -> String;
fn concat_string(&self, filter_string: &mut String);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about such names:

Suggested change
fn string(&self) -> String;
fn concat_string(&self, filter_string: &mut String);
/// Encode the filter as string.
fn encode(&self) -> String;
/// Encode the filter to the buffer.
fn encode_to&self, buffer: &mut String);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObCompareOperator {
LessThan = 0,
GreaterThan = 1,
LessOrEqualThan = 2,
GreaterOrEqualThan = 3,
NotEqual = 4,
Equal = 5,
IsNull = 6,
IsNotNull = 7,
}

impl ObCompareOperator {
pub fn string(&self) -> &'static str {
match self {
ObCompareOperator::LessThan => "<",
ObCompareOperator::GreaterThan => ">",
ObCompareOperator::LessOrEqualThan => "<=",
ObCompareOperator::GreaterOrEqualThan => ">=",
ObCompareOperator::NotEqual => "!=",
ObCompareOperator::Equal => "=",
ObCompareOperator::IsNull => "IS",
ObCompareOperator::IsNotNull => "IS_NOT",
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FilterOp {
And = 0,
Or = 1,
}

pub struct ObTableFilterList {
pub op: FilterOp,
pub filters: Vec<Box<dyn Filter>>,
}

#[macro_export]
macro_rules! filter_list {
($op:expr, $($filter:expr),+ $(,)?) => {
ObTableFilterList {
op: $op,
filters: vec![$(Box::new($filter) as Box<dyn Filter>),+],
}
};
}

impl Filter for ObTableFilterList {
fn as_any(&self) -> &dyn Any {
self
}

fn string(&self) -> String {
let string_op = match self.op {
FilterOp::And => " && ",
FilterOp::Or => " || ",
};

// Use an iterator with map and collect to efficiently concatenate strings
self.filters
.iter()
.map(|filter| {
let filter_string = filter.string();
if filter.as_any().is::<ObTableValueFilter>() {
filter_string
} else {
format!("({})", filter_string)
}
})
.collect::<Vec<_>>()
.join(string_op)
}

fn concat_string(&self, filter_string: &mut String) {
let string_op = match self.op {
FilterOp::And => " && ",
FilterOp::Or => " || ",
};

for (i, filter) in self.filters.iter().enumerate() {
if i != 0 {
filter_string.push_str(string_op);
}
if filter.as_any().is::<ObTableValueFilter>() {
filter.concat_string(filter_string);
} else {
filter_string.push('(');
filter.concat_string(filter_string);
filter_string.push(')');
}
}
}
}

#[derive(Debug, Clone)]
pub struct ObTableValueFilter {
pub op: ObCompareOperator,
pub column_name: String,
pub value: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filter only supports String column filtering (in rust it must be encoded as valid utf-8)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use string to pass the message to the server. The OB Server will parse this string to extract the filter ( Filter may be refactored in the future). Although I considered using T to store the value of this filter, the complexity of the existing code necessitates the continued use of a string for this purpose. ( Since we only support ObTableValueFilter on numeric type and string type, using string to store the value of filter is considerable)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we only support ObTableValueFilter on numeric type and string type

If so, I guess some comments about this should be added to this struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

}

#[macro_export]
macro_rules! value_filter {
($op:expr, $column_name:expr, $value:expr) => {
ObTableValueFilter {
op: $op,
column_name: $column_name.to_string(),
value: $value.to_string(),
}
};
}
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved

impl Filter for ObTableValueFilter {
fn as_any(&self) -> &dyn Any {
self
}

fn string(&self) -> String {
if self.column_name.is_empty() {
return String::new();
}
format!(
"{}({},'{}:{}')",
TABLE_COMPARE_FILTER_PREFIX,
self.op.string(),
self.column_name,
self.value
)
}

fn concat_string(&self, filter_string: &mut String) {
if !self.column_name.is_empty() {
if let Err(e) = write!(
filter_string,
"{}({},'{}:{}')",
TABLE_COMPARE_FILTER_PREFIX,
self.op.string(),
self.column_name,
self.value
) {
warn!("Failed to write to filter_string: {}", e);
}
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_value_filter_micro() {
let op = ObCompareOperator::Equal;
let column_name = "column";
let string_column_name = "string_column".to_string();

// create ObTableValueFilter by micro rules
let filter_i16 = value_filter!(op.clone(), column_name, 51i16);
let filter_i32 = value_filter!(op.clone(), string_column_name, 51i32);
let filter_i64 = value_filter!(op.clone(), column_name, 51i64);
let filter_u16 = value_filter!(op.clone(), string_column_name, 51u16);
let filter_u32 = value_filter!(op.clone(), column_name, 51u32);
let filter_u64 = value_filter!(op.clone(), string_column_name, 51u64);
let filter_f32 = value_filter!(op.clone(), column_name, 51.0f32);
let filter_f64 = value_filter!(op.clone(), string_column_name, 51.0f64);
let filter_string = value_filter!(op.clone(), column_name, "51".to_string());
let filter_str = value_filter!(op.clone(), string_column_name, "51");

println!("{:?}", filter_i16.string());
println!("{:?}", filter_i32.string());
println!("{:?}", filter_i64.string());
println!("{:?}", filter_u16.string());
println!("{:?}", filter_u32.string());
println!("{:?}", filter_u64.string());
println!("{:?}", filter_f32.string());
println!("{:?}", filter_f64.string());
println!("{:?}", filter_string.string());
println!("{:?}", filter_str.string());
ShiKaiWi marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!("TableCompareFilter(=,'column:51')", filter_i16.string());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_i32.string()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_i64.string());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_u16.string()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_u32.string());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_u64.string()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_f32.string());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_f64.string()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_string.string());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_str.string()
);
}

#[test]
fn test_filter_list() {
let column_name = "column";

let filter_list_0 = filter_list!(
FilterOp::And,
value_filter!(ObCompareOperator::Equal, column_name, "0"),
value_filter!(ObCompareOperator::GreaterThan, column_name, "1")
);
let filter_list_component = filter_list!(
FilterOp::And,
value_filter!(ObCompareOperator::Equal, column_name, 2),
value_filter!(ObCompareOperator::GreaterThan, column_name, "3")
);
let filter_list_1 = filter_list!(
FilterOp::Or,
filter_list_component,
value_filter!(ObCompareOperator::GreaterThan, column_name, "4")
);

println!("{:?}", filter_list_0.string());
println!("{:?}", filter_list_1.string());
assert_eq!(
"TableCompareFilter(=,'column:0') && TableCompareFilter(>,'column:1')",
filter_list_0.string()
);
assert_eq!("(TableCompareFilter(=,'column:2') && TableCompareFilter(>,'column:3')) || TableCompareFilter(>,'column:4')", filter_list_1.string());
}
}
2 changes: 2 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use std::{collections::HashMap, time::Duration};

use crate::{rpc::protocol::DEFAULT_FLAG, serde_obkv::value::Value};

pub mod filter;
mod ocp;
pub mod query;
pub mod table;
pub mod table_client;

use self::table::ObTable;

#[derive(Clone, Debug)]
Expand Down
Loading