Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] Log Stream operation #93

Merged
merged 21 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ crossbeam = "0.8.2"
futures = "0.1"
futures-cpupool = "0.1"
lazy_static = "1.3"
linked-hash-map = "0.5"
log = { workspace = true }
murmur2 = "0.1"
mysql = { version = "24.0.0", default-features = false, features = ["default-rustls"] }
Expand Down
16 changes: 13 additions & 3 deletions docs/test_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ CREATE TABLE `TEST_TABLE_BATCH_KEY` (
`c1` varchar(20) NOT NULL,
`c1sk` varchar(20) NOT NULL,
`c2` varchar(20) DEFAULT NULL,
`c3` bigint DEFAULT 0,
PRIMARY KEY (`c1`, `c1sk`)
) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'lz4_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10
partition by key(`c1`) partitions 16;
Expand Down Expand Up @@ -154,12 +155,21 @@ CREATE TABLE `TEST_STREAM_QUERY_TABLE_HASH` (
partition by hash(c1) partitions 16;

CREATE TABLE `TEST_TABLE_BATCH_HASH` (
`c1` bigint NOT NULL,
`c1sk` varchar(20) NOT NULL,
`c2` varchar(20) DEFAULT NULL,
`c1` bigint NOT NULL,
`c1sk` varchar(20) NOT NULL,
`c2` varchar(20) DEFAULT NULL,
`c3` bigint DEFAULT 0,
PRIMARY KEY (`c1`, `c1sk`)) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'lz4_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10
partition by hash(`c1`) partitions 16;

CREATE TABLE `TEST_UINT_FILTER` (
`c1` tinyint unsigned DEFAULT 0,
`c2` smallint unsigned DEFAULT 0,
`c3` int unsigned DEFAULT 0,
`c4` bigint unsigned DEFAULT 0,
PRIMARY KEY (`c1`)) DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'lz4_1.0' REPLICA_NUM = 3 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10
partition by hash(`c1`) partitions 16;

CREATE TABLE `test_varchar_table` (
`c1` varchar(20) NOT NULL,
`c2` varchar(20) DEFAULT NULL,
Expand Down
308 changes: 308 additions & 0 deletions src/client/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the
* Mulan PSL v2. You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY
* KIND, EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

use std::fmt::Write;

const TABLE_COMPARE_FILTER_PREFIX: &str = "TableCompareFilter";

pub trait FilterEncoder {
/// Encode the filter as string.
fn encode(&self) -> String;

/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String);
}

pub enum Filter {
Value(ObTableValueFilter),
List(ObTableFilterList),
}

impl FilterEncoder for Filter {
/// Encode the filter as string.
fn encode(&self) -> String {
match self {
Filter::List(filter) => filter.encode(),
Filter::Value(filter) => filter.encode(),
}
}

/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String) {
match self {
Filter::List(filter) => filter.encode_to(buffer),
Filter::Value(filter) => filter.encode_to(buffer),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ObCompareOperator {
LessThan = 0,
GreaterThan = 1,
LessOrEqualThan = 2,
GreaterOrEqualThan = 3,
NotEqual = 4,
Equal = 5,
IsNull = 6,
IsNotNull = 7,
}

impl ObCompareOperator {
pub fn string(&self) -> &'static str {
match self {
ObCompareOperator::LessThan => "<",
ObCompareOperator::GreaterThan => ">",
ObCompareOperator::LessOrEqualThan => "<=",
ObCompareOperator::GreaterOrEqualThan => ">=",
ObCompareOperator::NotEqual => "!=",
ObCompareOperator::Equal => "=",
ObCompareOperator::IsNull => "IS",
ObCompareOperator::IsNotNull => "IS_NOT",
}
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FilterOp {
And = 0,
Or = 1,
}

pub struct ObTableFilterList {
pub op: FilterOp,
pub filters: Vec<Filter>,
}

impl ObTableFilterList {
pub fn new<I>(op: FilterOp, filters: I) -> Self
where
I: IntoIterator<Item = Filter>,
{
ObTableFilterList {
op,
filters: filters.into_iter().collect(),
}
}

pub fn add_filter(&mut self, filter: Filter) {
self.filters.push(filter)
}

pub fn add_filters<I>(&mut self, filters: I)
where
I: IntoIterator<Item = Filter>,
{
self.filters.extend(filters.into_iter())
}
}

impl FilterEncoder for ObTableFilterList {
/// Encode the filter as string.
fn encode(&self) -> String {
let string_op = match self.op {
FilterOp::And => " && ",
FilterOp::Or => " || ",
};

// Use an iterator with map and collect to efficiently concatenate strings
self.filters
.iter()
.map(|filter| {
let filter_string = filter.encode();
match filter {
Filter::List(_) => {
format!("({})", filter_string)
}
Filter::Value(_) => filter_string,
}
})
.collect::<Vec<_>>()
.join(string_op)
}

/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String) {
let string_op = match self.op {
FilterOp::And => " && ",
FilterOp::Or => " || ",
};

for (i, filter) in self.filters.iter().enumerate() {
if i != 0 {
buffer.push_str(string_op);
}
match filter {
Filter::List(filter_list) => {
buffer.push('(');
filter_list.encode_to(buffer);
buffer.push(')');
}
Filter::Value(value_filter) => value_filter.encode_to(buffer),
}
}
}
}

/// Only support [`ObTableValueFilter`] on numeric type and string type
/// The value will be encoded into string and will be parsed into filter in the server
#[derive(Debug, Clone)]
pub struct ObTableValueFilter {
pub op: ObCompareOperator,
pub column_name: String,
pub value: String,
}

impl ObTableValueFilter {
pub fn new<V: ToString>(op: ObCompareOperator, column_name: String, value: V) -> Self {
ObTableValueFilter {
op,
column_name,
value: value.to_string(),
}
}
}

impl FilterEncoder for ObTableValueFilter {
/// Encode the filter as string.
fn encode(&self) -> String {
if self.column_name.is_empty() {
return String::new();
}
format!(
"{}({},'{}:{}')",
TABLE_COMPARE_FILTER_PREFIX,
self.op.string(),
self.column_name,
self.value
)
}

/// Encode the filter to the buffer.
fn encode_to(&self, buffer: &mut String) {
if !self.column_name.is_empty() {
if let Err(e) = write!(
buffer,
"{}({},'{}:{}')",
TABLE_COMPARE_FILTER_PREFIX,
self.op.string(),
self.column_name,
self.value
) {
warn!("Failed to write to filter_string: {}", e);
}
}
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_value_filter() {
let op = ObCompareOperator::Equal;
let column_name = "column";
let string_column_name = "string_column";

// create ObTableValueFilter by micro rules
let filter_i16 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51i16);
let filter_i32 = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51i32);
let filter_i64 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51i64);
let filter_u16 = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51u16);
let filter_u32 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51u32);
let filter_u64 = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51u64);
let filter_f32 = ObTableValueFilter::new(op.clone(), column_name.to_string(), 51.0f32);
let filter_f64 =
ObTableValueFilter::new(op.clone(), string_column_name.to_string(), 51.0f64);
let filter_string =
ObTableValueFilter::new(op.clone(), column_name.to_string(), "51".to_string());
let filter_str = ObTableValueFilter::new(op.clone(), string_column_name.to_string(), "51");

assert_eq!("TableCompareFilter(=,'column:51')", filter_i16.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_i32.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_i64.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_u16.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_u32.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_u64.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_f32.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_f64.encode()
);
assert_eq!("TableCompareFilter(=,'column:51')", filter_string.encode());
assert_eq!(
"TableCompareFilter(=,'string_column:51')",
filter_str.encode()
);
}

#[test]
fn test_filter_list() {
let column_name = "column";
let filter_list_0 = ObTableFilterList::new(
FilterOp::And,
vec![
Filter::Value(ObTableValueFilter::new(
ObCompareOperator::Equal,
column_name.to_string(),
"0",
)),
Filter::Value(ObTableValueFilter::new(
ObCompareOperator::GreaterThan,
column_name.to_string(),
"1",
)),
],
);
let mut filter_list_component = ObTableFilterList::new(
FilterOp::And,
vec![Filter::Value(ObTableValueFilter::new(
ObCompareOperator::Equal,
column_name.to_string(),
2,
))],
);
filter_list_component.add_filter(Filter::Value(ObTableValueFilter::new(
ObCompareOperator::GreaterThan,
column_name.to_string(),
"3",
)));
let mut filter_list_1 =
ObTableFilterList::new(FilterOp::Or, vec![Filter::List(filter_list_component)]);
filter_list_1.add_filters(vec![Filter::Value(ObTableValueFilter::new(
ObCompareOperator::GreaterThan,
column_name.to_string(),
"4",
))]);

assert_eq!(
"TableCompareFilter(=,'column:0') && TableCompareFilter(>,'column:1')",
filter_list_0.encode()
);
assert_eq!("(TableCompareFilter(=,'column:2') && TableCompareFilter(>,'column:3')) || TableCompareFilter(>,'column:4')", filter_list_1.encode());
}
}
2 changes: 2 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ use std::{collections::HashMap, time::Duration};

use crate::{rpc::protocol::DEFAULT_FLAG, serde_obkv::value::Value};

pub mod filter;
mod ocp;
pub mod query;
pub mod table;
pub mod table_client;

use self::table::ObTable;

#[derive(Clone, Debug)]
Expand Down
Loading
Loading