From 2ab45719743108f7d5c47d4fe671d2ffc4bf0c8f Mon Sep 17 00:00:00 2001 From: "zeli.lwb" Date: Mon, 5 Feb 2024 21:21:48 +0800 Subject: [PATCH] [Feat] Log Stream operation (phase 1) --- Cargo.lock | 7 + Cargo.toml | 1 + src/rpc/protocol/lsop.rs | 559 ++++++++++++++++++++++++++--------- src/rpc/protocol/payloads.rs | 51 +++- 4 files changed, 484 insertions(+), 134 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f8e5db..2b7280b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1154,6 +1154,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -1407,6 +1413,7 @@ dependencies = [ "futures 0.1.31", "futures-cpupool", "lazy_static", + "linked-hash-map", "log", "murmur2", "mysql", diff --git a/Cargo.toml b/Cargo.toml index 632397b..df5a090 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ crossbeam = "0.8.2" futures = "0.1" futures-cpupool = "0.1" lazy_static = "1.3" +linked-hash-map = "0.5" log = { workspace = true } murmur2 = "0.1" mysql = { version = "24.0.0", default-features = false, features = ["default-rustls"] } diff --git a/src/rpc/protocol/lsop.rs b/src/rpc/protocol/lsop.rs index 345e688..c0bce16 100644 --- a/src/rpc/protocol/lsop.rs +++ b/src/rpc/protocol/lsop.rs @@ -18,82 +18,66 @@ #![allow(dead_code)] use std::{io, time::Duration}; +use std::collections::HashMap; +use std::rc::Rc; use bytes::{BufMut, BytesMut}; +use linked_hash_map::LinkedHashMap; + +use crate::{location::OB_INVALID_ID, payloads::{ + ObTableConsistencyLevel, ObTableEntityType, ObTableOperationResult, +}, rpc::protocol::{ + BasePayLoad, ObPayload, ObTablePacketCode, + ProtoDecoder, ProtoEncoder, Result, +}, serde_obkv::util, util::duration_to_millis, Value}; +use crate::payloads::ObTableOperationType; +use crate::query::ObNewRange; +use crate::serde_obkv::util::decode_i8; +use crate::util::decode_value; + +/// Option flag for [`ObTableSingleOp`] +#[derive(Debug, Clone, PartialEq)] +pub struct ObTableSingleOpFlag { + flags: i64, +} -use crate::{ - location::OB_INVALID_ID, - payloads::{ - ObTableBatchOperation, ObTableConsistencyLevel, ObTableEntityType, ObTableOperationResult, - }, - query::ObTableQuery, - rpc::protocol::{ - query_and_mutate::ObTableQueryAndMutate, BasePayLoad, ObPayload, ObTablePacketCode, - ProtoDecoder, ProtoEncoder, - }, - serde_obkv::util, - util::duration_to_millis, -}; - -/// Type of Operation for [`ObTableSingleOp`] -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub enum ObTableSingleOpType { - SingleGet, - SingleInsert, - SingleDel, - SingleUpdate, - SingleInsertOrUpdate, - SingleReplace, - SingleIncrement, - SingleAppend, - SingleMax, // reserved - SyncQuery, - AsyncQuery, - QueryAndMutate, - SingleOpTypeMax, -} - -impl ObTableSingleOpType { - fn value(&self) -> i8 { - match self { - ObTableSingleOpType::SingleGet => 0, - ObTableSingleOpType::SingleInsert => 1, - ObTableSingleOpType::SingleDel => 2, - ObTableSingleOpType::SingleUpdate => 3, - ObTableSingleOpType::SingleInsertOrUpdate => 4, - ObTableSingleOpType::SingleReplace => 5, - ObTableSingleOpType::SingleIncrement => 6, - ObTableSingleOpType::SingleAppend => 7, - ObTableSingleOpType::SingleMax => 63, - ObTableSingleOpType::SyncQuery => 64, - ObTableSingleOpType::AsyncQuery => 65, - ObTableSingleOpType::QueryAndMutate => 66, - ObTableSingleOpType::SingleOpTypeMax => 67, - } - } -} - -impl From for ObTableSingleOpType { - fn from(value: i8) -> Self { - match value { - 0 => ObTableSingleOpType::SingleGet, - 1 => ObTableSingleOpType::SingleInsert, - 2 => ObTableSingleOpType::SingleDel, - 3 => ObTableSingleOpType::SingleUpdate, - 4 => ObTableSingleOpType::SingleInsertOrUpdate, - 5 => ObTableSingleOpType::SingleReplace, - 6 => ObTableSingleOpType::SingleIncrement, - 7 => ObTableSingleOpType::SingleAppend, - 63 => ObTableSingleOpType::SingleMax, - 64 => ObTableSingleOpType::SyncQuery, - 65 => ObTableSingleOpType::AsyncQuery, - 66 => ObTableSingleOpType::QueryAndMutate, - 67 => ObTableSingleOpType::SingleOpTypeMax, - _ => panic!("Invalid value for ObTableSingleOpType"), +impl Default for ObTableSingleOpFlag { + fn default() -> Self { + ObTableSingleOpFlag::new() + } +} + +impl ObTableSingleOpFlag { + const FLAG_IS_CHECK_NOT_EXISTS: i64 = 1 << 0; + + pub fn new() -> Self { + let mut flag = ObTableSingleOpFlag { flags: 0 }; + flag.set_flag_is_check_not_exist(false); + flag + } + + pub fn value(&self) -> i64 { + self.flags + } + + pub fn set_value(&mut self, flags: i64) { + self.flags = flags; + } + + pub fn set_flag_is_check_not_exist(&mut self, is_check_not_exist: bool) { + if is_check_not_exist { + self.flags |= Self::FLAG_IS_CHECK_NOT_EXISTS; + } else { + self.flags &= !Self::FLAG_IS_CHECK_NOT_EXISTS; } } + + pub fn is_check_not_exist(&self) -> bool { + (self.flags & Self::FLAG_IS_CHECK_NOT_EXISTS) != 0 + } } + /// Option flag for [`ObTableTabletOp`] #[derive(Debug, Clone, PartialEq)] pub struct ObTableTabletOpFlag { @@ -206,70 +190,310 @@ impl ObTableLSOpFlag { } } -/// Single operation in OBKV -/// [`ObTableSingleOp`] -> [`ObTableTabletOp`] -> [`ObTableLSOperation`] +/// Single operation query in OBKV +/// [`ObTableSingleOpQuery`]|[`ObTableSingleOpEntity`] -> [`ObTableSingleOp`] -> [`ObTableTabletOp`] -> [`ObTableLSOperation`] #[derive(Debug, Clone)] -pub struct ObTableSingleOp { +pub struct ObTableSingleOpQuery { base: BasePayLoad, - op_type: ObTableSingleOpType, - op: ObTableQueryAndMutate, + index_name: String, + scan_range_cols_bm: Vec, + key_ranges: Vec, + filter_string: String, + scan_range_len: i64, + scan_range_columns: Vec, + agg_column_names: Vec, } -impl Default for ObTableSingleOp { +impl Default for ObTableSingleOpQuery { fn default() -> Self { - ObTableSingleOp::new( - ObTableSingleOpType::SingleMax, - ObTableQueryAndMutate::dummy(), + ObTableSingleOpQuery::new(Vec::new(), Vec::new()) + } +} + +impl ObTableSingleOpQuery { + pub fn new(scan_range_columns: Vec, key_ranges: Vec,) -> Self { + Self { + base: BasePayLoad::dummy(), + index_name: String::new(), + scan_range_cols_bm: Vec::new(), + key_ranges, + filter_string: String::new(), + scan_range_len: 0i64, + scan_range_columns, + agg_column_names: Vec::new(), + } + } + + +} + +impl ObPayload for ObTableSingleOpQuery { + fn base(&self) -> &BasePayLoad { + &self.base + } + + fn base_mut(&mut self) -> &mut BasePayLoad { + &mut self.base + } + + // payload size, without header bytes + fn content_len(&self) -> Result { + let mut len: usize = 0; + len += util::encoded_length_vstring(&self.index_name); + len += util::encoded_length_vi64(self.scan_range_len); + len += self.scan_range_cols_bm.len(); + len += util::encoded_length_vi64(self.key_ranges.len() as i64); + for r in &self.key_ranges { + len += r.content_len()?; + } + len += util::encoded_length_vstring(&self.filter_string); + Ok(len) + } +} + +impl ProtoEncoder for ObTableSingleOpQuery { + fn encode(&self, buf: &mut BytesMut) -> Result<()> { + self.encode_header(buf)?; + // 1. encode index name + util::encode_vstring(&self.index_name, buf)?; + // 2. encode column name bitmap + util::encode_vi64(self.scan_range_len, buf)?; + for bm in &self.scan_range_cols_bm { + buf.put_i8(*bm); + } + // 3. encode key ranges + util::encode_vi64(self.key_ranges.len() as i64, buf)?; + for r in &self.key_ranges { + r.encode(buf)?; + } + // 4. encode filter string + util::encode_vstring(&self.filter_string, buf)?; + Ok(()) + } +} + +impl ProtoDecoder for ObTableSingleOpQuery { + fn decode(&mut self, _src: &mut BytesMut) -> Result<()> { + unimplemented!(); + } +} + +/// Single operation entity in OBKV +/// [`ObTableSingleOpQuery`]|[`ObTableSingleOpEntity`] -> [`ObTableSingleOp`] -> [`ObTableTabletOp`] -> [`ObTableLSOperation`] +#[derive(Debug, Clone)] +pub struct ObTableSingleOpEntity { + // encoded params + base: BasePayLoad, + row_key_bit_len: i64, + row_key_bitmap: Vec, + row_key: Vec, + properties_bit_len: i64, + properties_bitmap: Vec, + properties: Vec, + + // intermediate params + row_key_names: Vec, + properties_names: Vec, + agg_row_key_names: Option>>, + agg_properties_names: Option>>, + + // options + ignore_encode_properties_col_names: bool, +} + +impl Default for ObTableSingleOpEntity { + fn default() -> Self { + ObTableSingleOpEntity::new( + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), ) } } -impl ObTableSingleOp { - pub fn new(op_type: ObTableSingleOpType, op: ObTableQueryAndMutate) -> Self { +impl ObTableSingleOpEntity { + pub fn new(row_key_names: Vec, row_key: Vec, properties_names: Vec, properties: Vec,) -> Self { Self { base: BasePayLoad::dummy(), - op_type, - op, + row_key_bit_len: 0i64, + row_key_bitmap: Vec::new(), + row_key, + properties_bit_len: 0i64, + properties_bitmap: Vec::new(), + properties, + row_key_names, + properties_names, + agg_row_key_names: None, + agg_properties_names: None, + ignore_encode_properties_col_names: false, + } + } + + /// parse_bit_map is used by [`ProtoEncoder`] + /// Accept source byte, length and aggregation column names + /// Output bit map and corresponding column names + fn parse_bit_map(len: i64, agg_names: Rc>, src: &mut BytesMut, bit_map: &mut Vec, column_names: &mut Vec) { + bit_map.clear(); + column_names.clear(); + let bm_len = (len + 7) / 8; + for idx in 0..bm_len { + bit_map.push(decode_i8(src)?); + for bit_idx in 0usize..8 { + if bit_map[idx] & (1i8 << bit_idx) != 0 { + if idx * 8 + bit_idx < agg_names.len() as i64 { + column_names.push(agg_names[idx * 8 + bit_idx]); + } + } + } + } + } +} + +impl ObPayload for ObTableSingleOpEntity { + fn base(&self) -> &BasePayLoad { + &self.base + } + + fn base_mut(&mut self) -> &mut BasePayLoad { + &mut self.base + } + + // payload size, without header bytes + fn content_len(&self) -> Result { + let mut len: usize = 0; + len += util::encoded_length_vi64(self.row_key_bit_len); + len += self.row_key_bitmap.len(); + len += util::encoded_length_vi64(self.row_key.len() as i64); + for rk in &self.row_key { + len += rk.len(); } + if self.ignore_encode_properties_col_names { + len += util::encoded_length_vi64(0i64); + } else { + len += util::encoded_length_vi64(self.properties_bit_len); + len += self.properties_bitmap.len(); + } + len += util::encoded_length_vi64(self.properties.len() as i64); + for ppt in &self.properties { + len += ppt.len(); + } + Ok(len) } +} - pub fn mutations(&self) -> &Option { - self.op.mutations() +impl ProtoEncoder for ObTableSingleOpEntity { + fn encode(&self, buf: &mut BytesMut) -> Result<()> { + self.encode_header(buf)?; + // 1. encode rowkey bitmap + util::encode_vi64(self.row_key_bit_len, buf)?; + for bm in self.row_key_bitmap { + buf.put_i8(bm); + } + // 2. encode rowkey + util::encode_vi64(self.row_key.len() as i64, buf)?; + for key in &self.row_key { + key.encode(buf)?; + } + // 3. encode properties bitmap + if self.ignore_encode_properties_col_names { + util::encode_vi64(0i64, buf)?; + } else { + util::encode_vi64(self.properties_bit_len, buf)?; + for bm in self.properties_bitmap { + buf.put_i8(bm); + } + } + // 4. encode properties + util::encode_vi64(self.properties.len() as i64, buf)?; + for key in &self.properties { + key.encode(buf)?; + } + Ok(()) } +} + +impl ProtoDecoder for ObTableSingleOpEntity { + fn decode(&mut self, src: &mut BytesMut) -> Result<()> { + self.decode_base(src)?; - pub fn set_mutations(&mut self, mutations: ObTableBatchOperation) { - self.op.set_mutations(mutations) + // 1. row key bitmap + self.row_key_bit_len = util::decode_vi64(src)?; + self.parse_bit_map(self.row_key_bit_len, self.agg_row_key_names.unwrap_or_else(|| Rc::new(Vec::new())).clone(), src, &mut self.row_key_bitmap, &mut self.row_key_names); + + // 2. row key obobj + self.row_key.clear(); + let row_key_len = util::decode_vi64(src)?; + for _ in 0..row_key_len { + self.row_key.push(decode_value(src)?); + } + + // 3. properties bitmaps + self.properties_bit_len = util::decode_vi64(src)?; + self.parse_bit_map(self.properties_bit_len, self.agg_properties_names.unwrap_or_else(|| Rc::new(Vec::new())).clone(), src, &mut self.properties_bitmap, &mut self.properties_names); + + // 4. properties obobj + self.properties_names.clear(); + let properties_len = util::decode_vi64(src)?; + for _ in 0..properties_len { + self.properties.push(decode_value(src)?); + } + + Ok(()) } +} - pub fn query(&self) -> &Option { - self.op.query() +/// Single operation in OBKV +/// [`ObTableSingleOpQuery`]|[`ObTableSingleOpEntity`] -> [`ObTableSingleOp`] -> [`ObTableTabletOp`] -> [`ObTableLSOperation`] +#[derive(Debug, Clone)] +pub struct ObTableSingleOp { + base: BasePayLoad, + op_type: ObTableOperationType, + op_flag: ObTableSingleOpFlag, + query: Option, + entities: Vec, +} + +impl Default for ObTableSingleOp { + fn default() -> Self { + ObTableSingleOp::new( + ObTableOperationType::Invalid, + ) } +} - pub fn set_query(&mut self, query: ObTableQuery) { - self.op.set_query(query) +impl ObTableSingleOp { + pub fn new(op_type: ObTableOperationType) -> Self { + Self { + base: BasePayLoad::dummy(), + op_type, + op_flag: ObTableSingleOpFlag::new(), + query: None, + entities: Vec::new(), + } } - pub fn set_is_check_and_execute(&mut self, is_check_and_execute: bool) { - self.op.set_is_check_and_execute(is_check_and_execute) + pub fn query(&self) -> &Option { + &self.query } - pub fn is_check_and_execute(&self) -> bool { - self.op.is_check_and_execute() + pub fn set_query(&mut self, query: ObTableSingleOpQuery) { + self.query = Some(query) } pub fn set_is_check_not_exists(&mut self, is_check_not_exists: bool) { - self.op.set_is_check_not_exists(is_check_not_exists) + self.op_flag.set_flag_is_check_not_exist(is_check_not_exists) } pub fn is_check_not_exists(&self) -> bool { - self.op.is_check_not_exists() + self.op_flag.is_check_not_exists() } - pub fn single_op_type(&self) -> ObTableSingleOpType { + pub fn single_op_type(&self) -> ObTableOperationType { self.op_type } - pub fn set_single_op_type(&mut self, op_type: ObTableSingleOpType) { + pub fn set_single_op_type(&mut self, op_type: ObTableOperationType) { self.op_type = op_type } } @@ -284,25 +508,48 @@ impl ObPayload for ObTableSingleOp { } // payload size, without header bytes - fn content_len(&self) -> crate::rpc::protocol::Result { + fn content_len(&self) -> Result { let mut len: usize = 0; len += 1; // op type - len += self.op.len()?; + len += util::encoded_length_vi64(self.op_flag.value()); + if self.op_type.need_encode_query() { + len += self.query.map_or(0, |query| query.len()?); + } + len += util::encoded_length_vi64(self.entities.len() as i64); + for entity in &self.entities { + len += entity.len()?; + } Ok(len) } } impl ProtoEncoder for ObTableSingleOp { - fn encode(&self, buf: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn encode(&self, buf: &mut BytesMut) -> Result<()> { self.encode_header(buf)?; - buf.put_i8(self.op_type.value()); - self.op.encode(buf)?; + + // 1. op type + buf.put_i8(self.op_type as i8); + + // 2. op flag + util::encode_vi64(self.op_flag.value(), buf)?; + + // 3. single op query + if self.op_type.need_encode_query() { + self.query?.encode(buf)?; + } + + // 4. single op entity + util::encode_vi64(self.entities.len() as i64, buf)?; + for entity in &self.entities { + entity.encode(buf) + } + Ok(()) } } impl ProtoDecoder for ObTableSingleOp { - fn decode(&mut self, _src: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn decode(&mut self, _src: &mut BytesMut) -> Result<()> { unimplemented!(); } } @@ -312,7 +559,6 @@ impl ProtoDecoder for ObTableSingleOp { #[derive(Debug, Clone)] pub struct ObTableTabletOp { base: BasePayLoad, - table_id: i64, partition_id: i64, option_flag: ObTableTabletOpFlag, single_ops: Vec, @@ -326,14 +572,12 @@ impl Default for ObTableTabletOp { impl ObTableTabletOp { pub fn internal_new( - table_id: i64, partition_id: i64, option_flag: ObTableTabletOpFlag, single_ops: Vec, ) -> Self { Self { base: BasePayLoad::dummy(), - table_id, partition_id, option_flag, single_ops, @@ -342,7 +586,6 @@ impl ObTableTabletOp { pub fn new(ops_num: usize) -> Self { ObTableTabletOp::internal_new( - OB_INVALID_ID, 0, ObTableTabletOpFlag::default(), Vec::with_capacity(ops_num), @@ -361,14 +604,6 @@ impl ObTableTabletOp { self.single_ops = single_ops } - pub fn table_id(&self) -> i64 { - self.table_id - } - - pub fn set_table_id(&mut self, table_id: i64) { - self.table_id = table_id - } - pub fn partition_id(&self) -> i64 { self.partition_id } @@ -397,9 +632,8 @@ impl ObPayload for ObTableTabletOp { } // payload size, without header bytes - fn content_len(&self) -> crate::rpc::protocol::Result { + fn content_len(&self) -> Result { let mut len: usize = 0; - len += util::encoded_length_vi64(self.table_id); len += 8; // partition/tablet_id len += util::encoded_length_vi64(self.option_flag.value()); len += util::encoded_length_vi64(self.single_ops.len() as i64); @@ -411,9 +645,8 @@ impl ObPayload for ObTableTabletOp { } impl ProtoEncoder for ObTableTabletOp { - fn encode(&self, buf: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn encode(&self, buf: &mut BytesMut) -> Result<()> { self.encode_header(buf)?; - util::encode_vi64(self.table_id, buf)?; buf.put_i64(self.partition_id); util::encode_vi64(self.option_flag.value(), buf)?; util::encode_vi64(self.single_ops.len() as i64, buf)?; @@ -425,7 +658,7 @@ impl ProtoEncoder for ObTableTabletOp { } impl ProtoDecoder for ObTableTabletOp { - fn decode(&mut self, _src: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn decode(&mut self, _src: &mut BytesMut) -> Result<()> { unimplemented!(); } } @@ -470,19 +703,19 @@ impl ObPayload for ObTableTabletOpResult { } impl ProtoEncoder for ObTableTabletOpResult { - fn encode(&self, _buf: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn encode(&self, _buf: &mut BytesMut) -> Result<()> { unimplemented!(); } } impl ProtoDecoder for ObTableTabletOpResult { - fn decode(&mut self, src: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn decode(&mut self, src: &mut BytesMut) -> Result<()> { self.decode_base(src)?; let op_res_num = util::decode_vi64(src)?; if op_res_num < 0 { return Err(io::Error::new( - std::io::ErrorKind::InvalidData, + io::ErrorKind::InvalidData, format!("invalid operation results num:{op_res_num}"), )); } @@ -503,10 +736,21 @@ impl ProtoDecoder for ObTableTabletOpResult { /// [`ObTableSingleOp`] -> [`ObTableTabletOp`] -> [`ObTableLSOperation`] #[derive(Debug, Clone)] pub struct ObTableLSOperation { + // encoded params base: BasePayLoad, ls_id: i64, + table_name: String, + table_id: i64, + row_key_names: Vec, + properties_names: Vec, option_flag: ObTableLSOpFlag, tablet_ops: Vec, + + // intermediate params + row_key_names_set: LinkedHashMap, + properties_names_set: LinkedHashMap, + row_key_names_idx_map: HashMap, + properties_names_idx_map: HashMap, } impl Default for ObTableLSOperation { @@ -518,20 +762,36 @@ impl Default for ObTableLSOperation { impl ObTableLSOperation { pub fn internal_new( ls_id: i64, + table_name: String, + table_id: i64, + row_key_names: Vec, + properties_names: Vec, option_flag: ObTableLSOpFlag, tablet_ops: Vec, ) -> Self { Self { base: BasePayLoad::dummy(), ls_id, + table_name, + table_id, + row_key_names, + properties_names, option_flag, tablet_ops, + row_key_names_set: LinkedHashMap::new(), + properties_names_set: LinkedHashMap::new(), + row_key_names_idx_map: HashMap::new(), + properties_names_idx_map: HashMap::new(), } } pub fn new(ops_num: usize) -> Self { ObTableLSOperation::internal_new( OB_INVALID_ID, + String::new(), + OB_INVALID_ID, + Vec::default(), + Vec::default(), ObTableLSOpFlag::default(), Vec::with_capacity(ops_num), ) @@ -582,9 +842,19 @@ impl ObPayload for ObTableLSOperation { } // payload size, without header bytes - fn content_len(&self) -> crate::rpc::protocol::Result { + fn content_len(&self) -> Result { let mut len: usize = 0; len += 8; // ls_id + len += util::encoded_length_vstring(&self.table_name); + len += util::encoded_length_vi64(self.table_id); + len += util::encoded_length_vi64(self.row_key_names.len() as i64); + for rk_name in &self.row_key_names { + len += util::encoded_length_vstring(rk_name); + } + len += util::encoded_length_vi64(self.properties_names.len() as i64); + for ppt_name in &self.properties_names { + len += util::encoded_length_vstring(ppt_name); + } len += util::encoded_length_vi64(self.option_flag.value()); len += util::encoded_length_vi64(self.tablet_ops.len() as i64); for op in self.tablet_ops.iter() { @@ -595,20 +865,45 @@ impl ObPayload for ObTableLSOperation { } impl ProtoEncoder for ObTableLSOperation { - fn encode(&self, buf: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn encode(&self, buf: &mut BytesMut) -> Result<()> { self.encode_header(buf)?; + + // 1. Log Stream ID buf.put_i64(self.ls_id); + + // 2. table name + util::encode_vstring(&self.table_name, buf)?; + + // 3. table id + util::encode_vi64(self.table_id, buf)?; + + // 4. row key names + util::encode_vi64(self.row_key_names.len() as i64, buf)?; + for idx in 0..self.row_key_names.len() { + util::encode_vstring(&self.row_key_names[idx], buf)?; + } + + // 5. properties names + util::encode_vi64(self.properties_names.len() as i64, buf)?; + for idx in 0..self.properties_names.len() { + util::encode_vstring(&self.properties_names[idx], buf)?; + } + + // 6. option flag util::encode_vi64(self.option_flag.value(), buf)?; + + // 7. operation util::encode_vi64(self.tablet_ops.len() as i64, buf)?; for op in self.tablet_ops.iter() { op.encode(buf)?; } + Ok(()) } } impl ProtoDecoder for ObTableLSOperation { - fn decode(&mut self, _src: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn decode(&mut self, _src: &mut BytesMut) -> Result<()> { unimplemented!(); } } @@ -651,7 +946,7 @@ impl ObPayload for ObTableLSOpRequest { } // payload size, without header bytes - fn content_len(&self) -> crate::rpc::protocol::Result { + fn content_len(&self) -> Result { Ok(util::encoded_length_bytes_string(&self.credential) + util::encoded_length_i8(self.entity_type as i8) + util::encoded_length_i8(self.consistency_level as i8) @@ -664,7 +959,7 @@ impl ObPayload for ObTableLSOpRequest { } impl ProtoEncoder for ObTableLSOpRequest { - fn encode(&self, buf: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn encode(&self, buf: &mut BytesMut) -> Result<()> { self.encode_header(buf)?; util::encode_bytes_string(&self.credential, buf)?; buf.put_i8(self.entity_type as i8); @@ -675,7 +970,7 @@ impl ProtoEncoder for ObTableLSOpRequest { } impl ProtoDecoder for ObTableLSOpRequest { - fn decode(&mut self, _src: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn decode(&mut self, _src: &mut BytesMut) -> Result<()> { unimplemented!() } } @@ -735,13 +1030,13 @@ impl ObPayload for ObTableLSOpResult { } impl ProtoEncoder for ObTableLSOpResult { - fn encode(&self, _buf: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn encode(&self, _buf: &mut BytesMut) -> Result<()> { unimplemented!(); } } impl ProtoDecoder for ObTableLSOpResult { - fn decode(&mut self, src: &mut BytesMut) -> crate::rpc::protocol::Result<()> { + fn decode(&mut self, src: &mut BytesMut) -> Result<()> { self.decode_base(src)?; let op_res_num = util::decode_vi64(src)?; diff --git a/src/rpc/protocol/payloads.rs b/src/rpc/protocol/payloads.rs index 4ea3372..c3cee97 100644 --- a/src/rpc/protocol/payloads.rs +++ b/src/rpc/protocol/payloads.rs @@ -35,7 +35,7 @@ use crate::{ query::{ObNewRange, ObTableQuery}, rpc::protocol::{ codes::ResultCodes, - lsop::{ObTableSingleOp, ObTableSingleOpType, ObTableTabletOp, ObTableTabletOpFlag}, + lsop::{ObTableSingleOp, ObTableTabletOp, ObTableTabletOpFlag}, query_and_mutate::ObTableQueryAndMutate, }, serde_obkv::{util, value::Value}, @@ -67,6 +67,29 @@ pub enum ObTableOperationType { Replace = 5, Increment = 6, Append = 7, + Scan = 8, + TTL = 9, + CheckAndInsertUp = 10, + Invalid = 11, +} + +impl From for ObTableOperationType { + fn from(value: i8) -> Self { + match value { + 0 => ObTableOperationType::Get, + 1 => ObTableOperationType::Insert, + 2 => ObTableOperationType::Del, + 3 => ObTableOperationType::Update, + 4 => ObTableOperationType::InsertOrUpdate, + 5 => ObTableOperationType::Replace, + 6 => ObTableOperationType::Increment, + 7 => ObTableOperationType::Append, + 8 => ObTableOperationType::Scan, + 9 => ObTableOperationType::TTL, + 10 => ObTableOperationType::CheckAndInsertUp, + _ => panic!("Invalid value for ObTableSingleOpType"), + } + } } impl ObTableOperationType { @@ -80,6 +103,9 @@ impl ObTableOperationType { 5 => Ok(ObTableOperationType::Replace), 6 => Ok(ObTableOperationType::Increment), 7 => Ok(ObTableOperationType::Append), + 8 => Ok(ObTableOperationType::Scan), + 9 => Ok(ObTableOperationType::TTL), + 10 => Ok(ObTableOperationType::CheckAndInsertUp), _ => Err(io::Error::new( std::io::ErrorKind::InvalidData, format!("Invalid operation type: {i}"), @@ -98,6 +124,27 @@ impl ObTableOperationType { ObTableOperationType::Replace => "replace", ObTableOperationType::Increment => "increment", ObTableOperationType::Append => "append", + ObTableOperationType::Scan => "scan", + ObTableOperationType::TTL => "TTL", + ObTableOperationType::CheckAndInsertUp => "check_and_upsert", + ObTableOperationType::Invalid => "invalid_type" + } + } + + pub fn need_encode_query(&self) -> bool { + match self { + ObTableOperationType::Get => false, + ObTableOperationType::Insert => false, + ObTableOperationType::Del => false, + ObTableOperationType::Update => false, + ObTableOperationType::InsertOrUpdate => false, + ObTableOperationType::Replace => false, + ObTableOperationType::Increment => false, + ObTableOperationType::Append => false, + ObTableOperationType::Scan => false, + ObTableOperationType::TTL => false, + ObTableOperationType::CheckAndInsertUp => true, + ObTableOperationType::Invalid => false } } } @@ -869,7 +916,7 @@ impl ObTableBatchOperation { // generate singele operation let single_op = - ObTableSingleOp::new(ObTableSingleOpType::QueryAndMutate, query_and_mutate); + ObTableSingleOp::new(ObTableOperationType::CheckAndInsertUp, query_and_mutate); ops.push(single_op); }