Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug修复及功能支持 #247

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,7 @@ void stripslashes(std::string& str, bool is_gbk);
extern void update_schema_conf_common(const std::string& table_name, const pb::SchemaConf& schema_conf, pb::SchemaConf* p_conf);
extern void update_op_version(pb::SchemaConf* p_conf, const std::string& desc);
extern int primitive_to_proto_type(pb::PrimitiveType type);
extern int primitive_type_bytes_len(pb::PrimitiveType type);
extern int get_physical_room(const std::string& ip_and_port_str, std::string& host);
extern int get_instance_from_bns(int* ret,
const std::string& bns_name,
Expand Down
4 changes: 3 additions & 1 deletion include/common/histogram.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,9 @@ class PacketSample {
sample_sorter.insert_row(batch->get_row().get());
}
batch->reset();
_batch_vector.push_back(batch);
if (batch->size() > 0) {
_batch_vector.push_back(batch);
}
} while (!eos);

sample_sorter.insert_done();
Expand Down
2 changes: 1 addition & 1 deletion include/exec/dml_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DMLNode : public ExecNode {
virtual void find_place_holder(std::unordered_multimap<int, ExprNode*>& placeholders);
int insert_row(RuntimeState* state, SmartRecord record, bool is_update = false);
int delete_row(RuntimeState* state, SmartRecord record, MemRow* row);
int get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row);
int get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row, int64_t& ttl_ts);
int remove_row(RuntimeState* state, SmartRecord record,
const std::string& pk_str, bool delete_primary = true);
int update_row(RuntimeState* state, SmartRecord record, MemRow* row);
Expand Down
1 change: 1 addition & 0 deletions include/logical_plan/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class QueryContext {
return plan.add_nodes();
}
int create_plan_tree();
int destroy_plan_tree();

void add_sub_ctx(std::shared_ptr<QueryContext>& ctx) {
std::unique_lock<bthread::Mutex> lck(_kill_lock);
Expand Down
7 changes: 0 additions & 7 deletions include/meta_server/namespace_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,6 @@ class NamespaceManager {
}
return _namespace_id_map[namespace_name];
}
const std::string get_resource_tag(const int64_t& namespace_id) {
BAIDU_SCOPED_LOCK(_namespace_mutex);
if (_namespace_info_map.find(namespace_id) == _namespace_info_map.end()) {
return "";
}
return _namespace_info_map[namespace_id].resource_tag();
}
int get_namespace_info(const int64_t& namespace_id, pb::NameSpaceInfo& namespace_info) {
BAIDU_SCOPED_LOCK(_namespace_mutex);
if (_namespace_info_map.find(namespace_id) == _namespace_info_map.end()) {
Expand Down
23 changes: 13 additions & 10 deletions include/meta_server/table_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,11 @@ class TableManager {
auto src_type = src_field.mysql_type();
auto target_type = target_field.mysql_type();
if (src_type == target_type) {
// if (src_type == pb::DATETIME || src_type == pb::FLOAT || src_type == pb::DOUBLE) {
// if (src_field.float_precision_len() > target_field.float_precision_len()) {
// return false;
// }
// }
if (src_type == pb::DATETIME) {
if (src_field.float_precision_len() > target_field.float_precision_len()) {
return false;
}
}
return true;
}
switch (src_type) {
Expand All @@ -1007,11 +1007,14 @@ class TableManager {
}
int s = primitive_to_proto_type(src_type);
int t = primitive_to_proto_type(target_type);
if (s == t) return true;
if (s == FieldDescriptorProto::TYPE_SINT32 && t == FieldDescriptorProto::TYPE_SINT64) return true;
if (s == FieldDescriptorProto::TYPE_SINT64 && t == FieldDescriptorProto::TYPE_SINT32) return true;
if (s == FieldDescriptorProto::TYPE_UINT32 && t == FieldDescriptorProto::TYPE_UINT64) return true;
if (s == FieldDescriptorProto::TYPE_UINT64 && t == FieldDescriptorProto::TYPE_UINT32) return true;
if (s != t) return false;
if (primitive_type_bytes_len(src_type) <= primitive_type_bytes_len(target_type)) {
return true;
}
// if (s == FieldDescriptorProto::TYPE_SINT32 && t == FieldDescriptorProto::TYPE_SINT64) return true;
// if (s == FieldDescriptorProto::TYPE_SINT64 && t == FieldDescriptorProto::TYPE_SINT32) return true;
// if (s == FieldDescriptorProto::TYPE_UINT32 && t == FieldDescriptorProto::TYPE_UINT64) return true;
// if (s == FieldDescriptorProto::TYPE_UINT64 && t == FieldDescriptorProto::TYPE_UINT32) return true;
return false;
}
int get_index_state(int64_t table_id, int64_t index_id, pb::IndexState& index_state) {
Expand Down
1 change: 1 addition & 0 deletions include/physical_plan/join_reorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace baikaldb {
class JoinReorder {
public:
int analyze(QueryContext* ctx);
int reorder(QueryContext* ctx, ExecNode* node);
};
}

Expand Down
2 changes: 1 addition & 1 deletion include/sqlparser/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ struct String {
size_t fast = 0;
bool has_slash = false;
static std::unordered_map<char, char> trans_map = {
{'0', '\x00'},
{'\\', '\\'},
{'\"', '\"'},
{'\'', '\''},
Expand All @@ -96,7 +97,6 @@ struct String {
{'n', '\n'},
{'b', '\b'},
{'Z', '\x1A'},
{'0', '\0'},
};
while (fast < length) {
if (has_slash) {
Expand Down
4 changes: 4 additions & 0 deletions proto/meta.interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ message NameSpaceInfo {
optional int64 byte_size_per_record = 9;
optional int64 replica_num = 10;
optional int64 region_split_lines = 11; //region分裂行数阈值
repeated ReplicaDist dists = 12;
optional bool if_exist = 13;
optional string main_logical_room = 14;
};

message DataBaseInfo {
Expand All @@ -38,6 +40,8 @@ message DataBaseInfo {
optional int64 byte_size_per_record = 11;
optional int64 replica_num = 12;
optional int64 region_split_lines = 13; //region分裂行数阈值
repeated ReplicaDist dists = 14;
optional string main_logical_room = 15;
};

enum Status {
Expand Down
1 change: 1 addition & 0 deletions proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ message UpdateNode {
repeated SlotDescriptor update_slots = 3;
repeated Expr update_exprs = 4;
//repeated int64 affect_index_ids = 5;
optional int64 row_ttl_duration = 6; //row ttl support, compatible whit prepared stmt
};

message PacketNode {
Expand Down
18 changes: 18 additions & 0 deletions src/common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ void stripslashes(std::string& str, bool is_gbk) {
size_t fast = 0;
bool has_slash = false;
static std::unordered_map<char, char> trans_map = {
{'0', '\x00'},
{'\\', '\\'},
{'\"', '\"'},
{'\'', '\''},
Expand Down Expand Up @@ -439,6 +440,23 @@ void update_schema_conf_common(const std::string& table_name, const pb::SchemaCo
DB_WARNING("%s schema conf UPDATE TO : %s", table_name.c_str(), schema_conf.ShortDebugString().c_str());
}

int primitive_type_bytes_len(pb::PrimitiveType type) {
static std::unordered_map<int32_t, int32_t> _mysql_pb_type_bytes_count = {
{ pb::INT8, 1},
{ pb::INT16, 2},
{ pb::INT32, 3},
{ pb::INT64, 4},
{ pb::UINT8, 1},
{ pb::UINT16, 2},
{ pb::UINT32, 3},
{ pb::UINT64, 4}
};
if (_mysql_pb_type_bytes_count.count(type) == 0) {
return 0xFFFF;
}
return _mysql_pb_type_bytes_count[type];
}

int primitive_to_proto_type(pb::PrimitiveType type) {
using google::protobuf::FieldDescriptorProto;
static std::unordered_map<int32_t, int32_t> _mysql_pb_type_mapping = {
Expand Down
2 changes: 1 addition & 1 deletion src/common/information_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ void InformationSchema::init_tables() {
record->set_value(record->get_field_by_name("CREATE_TIME"), ct.cast_to(pb::DATETIME));
record->set_string(record->get_field_by_name("TABLE_COLLATION"), coll);
record->set_string(record->get_field_by_name("CREATE_OPTIONS"), "");
record->set_string(record->get_field_by_name("TABLE_COMMENT"), "");
record->set_string(record->get_field_by_name("TABLE_COMMENT"), table_info->comment);
record->set_int64(record->get_field_by_name("TABLE_ID"), table_info->id);
records.emplace_back(record);
}
Expand Down
3 changes: 2 additions & 1 deletion src/exec/access_path.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ bool AccessPath::check_sort_use_index(Property& sort_property) {
std::vector<ExprNode*>& order_exprs = sort_property.slot_order_exprs;
SlotRef* slot_ref = static_cast<SlotRef*>(order_exprs[0]);
size_t idx = 0;
auto& fields = index_info_ptr->fields;
std::vector<FieldInfo>fields(index_info_ptr->fields.begin(), index_info_ptr->fields.end());
fields.insert(fields.end(),index_info_ptr->pk_fields.begin(), index_info_ptr->pk_fields.end());
for (; idx < fields.size(); ++idx) {
if (tuple_id == slot_ref->tuple_id() && fields[idx].id == slot_ref->field_id()) {
break;
Expand Down
19 changes: 14 additions & 5 deletions src/exec/dml_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ int DMLNode::insert_row(RuntimeState* state, SmartRecord record, bool is_update)
return ++affected_rows;
}

int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row) {
int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string* pk_str, MemRow* row, int64_t& ttl_ts) {
int ret = 0;
MutTableKey pk_key;
ret = record->encode_key(*_pri_info, pk_key, -1, false);
Expand All @@ -522,12 +522,12 @@ int DMLNode::get_lock_row(RuntimeState* state, SmartRecord record, std::string*
record->decode_key(*_pri_info, *pk_str);
}
//delete requires all fields (index and non-index fields)
ret = _txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_LOCK, true);
ret = _txn->get_update_primary(_region_id, *_pri_info, record, _field_ids, GET_LOCK, true, ttl_ts);
if (ret < 0) {
return ret;
}
if (row != nullptr && _tuple_desc != nullptr
&& (_node_type == pb::DELETE_NODE || _node_type == pb::UPDATE_NODE)) {
&& (_node_type == pb::DELETE_NODE || _node_type == pb::UPDATE_NODE || _node_type == pb::LOCK_PRIMARY_NODE)) {
for (auto slot : _tuple_desc->slots()) {
auto field = record->get_field_by_tag(slot.field_id());
row->set_value(slot.tuple_id(), slot.slot_id(),
Expand Down Expand Up @@ -637,7 +637,8 @@ int DMLNode::remove_row(RuntimeState* state, SmartRecord record,
int DMLNode::delete_row(RuntimeState* state, SmartRecord record, MemRow* row) {
int ret = 0;
std::string pk_str;
ret = get_lock_row(state, record, &pk_str, row);
int64_t ttl_ts = 0;
ret = get_lock_row(state, record, &pk_str, row, ttl_ts);
if (ret == -3) {
//DB_WARNING_STATE(state, "key not in this region:%ld", _region_id);
return 0;
Expand Down Expand Up @@ -673,7 +674,8 @@ bool DMLNode::satisfy_condition_again(RuntimeState* state, MemRow* row) {
int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) {
int ret = 0;
std::string pk_str;
ret = get_lock_row(state, record, &pk_str, row);
int64_t ttl_ts = 0;
ret = get_lock_row(state, record, &pk_str, row, ttl_ts);
if (ret == -3) {
//DB_WARNING_STATE(state, "key not in this region:%ld", _region_id);
return 0;
Expand All @@ -689,6 +691,13 @@ int DMLNode::update_row(RuntimeState* state, SmartRecord record, MemRow* row) {
// UndoGetForUpdate(pk_str)? 同一个txn GetForUpdate与UndoGetForUpdate之间不要写pk_str
return 0;
}
// _row_ttl_duration == -1 代表保持原ttl意思
// TODO: 全局索引 keep ttl功能
if (_row_ttl_duration == -1 && _ttl_timestamp_us > 0 && ttl_ts > 0) {
_ttl_timestamp_us = ttl_ts;
_txn->set_write_ttl_timestamp_us(_ttl_timestamp_us);
DB_DEBUG("keep ttl_timestamp_us: %ld", _ttl_timestamp_us);
}
_indexes_ptr = &_affected_indexes;
// 影响了主键需要删除旧的行
ret = remove_row(state, record, pk_str, _update_affect_primary);
Expand Down
3 changes: 2 additions & 1 deletion src/exec/lock_primary_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ int LockPrimaryNode::open(RuntimeState* state) {
case pb::LOCK_GET_DML: {
for (auto& record : delete_records) {
//DB_WARNING_STATE(state,"record:%s", record->debug_string().c_str());
ret = delete_row(state, record, nullptr);
std::unique_ptr<MemRow> row = state->mem_row_desc()->fetch_mem_row();
ret = delete_row(state, record, row.get());
if (ret < 0) {
DB_WARNING_STATE(state, "delete_row fail");
return -1;
Expand Down
12 changes: 7 additions & 5 deletions src/exec/packet_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,10 @@ int PacketNode::open(RuntimeState* state) {
}
_send_buf = state->send_buf();
int ret = 0;
if (state->explain_type == EXPLAIN_SHOW_COST) {
handle_show_cost(state);
return 0;
}
if (!_return_empty || op_type() == pb::OP_SELECT) {
ret = ExecNode::open(state);
if (ret < 0) {
Expand Down Expand Up @@ -484,10 +488,6 @@ int PacketNode::open(RuntimeState* state) {
}
return 0;
}
if (state->explain_type == EXPLAIN_SHOW_COST) {
handle_show_cost(state);
return 0;
}
state->set_num_affected_rows(ret);
if (op_type() != pb::OP_SELECT && op_type() != pb::OP_UNION) {
pack_ok(state->num_affected_rows(), _client);
Expand Down Expand Up @@ -682,7 +682,9 @@ int PacketNode::open_analyze(RuntimeState* state) {
return ret;
}
state->inc_num_returned_rows(batch->size());
batch_vector.push_back(batch);
if (batch->size() > 0) {
batch_vector.push_back(batch);
}
} while (!eos);

std::vector<ExprNode*> slot_order_exprs;
Expand Down
2 changes: 2 additions & 0 deletions src/exec/update_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ int UpdateNode::init(const pb::PlanNode& node) {
}
_table_id = node.derive_node().update_node().table_id();
_global_index_id = _table_id;
_row_ttl_duration = node.derive_node().update_node().row_ttl_duration();
DB_DEBUG("_row_ttl_duration:%ld", _row_ttl_duration);
_primary_slots.clear();
_primary_slots.reserve(node.derive_node().update_node().primary_slots_size());
for (auto& slot : node.derive_node().update_node().primary_slots()) {
Expand Down
2 changes: 1 addition & 1 deletion src/logical_plan/insert_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ int InsertPlanner::plan() {
insert->set_need_ignore(_insert_stmt->is_ignore);
insert->set_is_replace(_insert_stmt->is_replace);
insert->set_is_merge(_insert_stmt->is_merge);
if (_ctx->row_ttl_duration > 0) {
if (_ctx->row_ttl_duration > 0 || _ctx->row_ttl_duration == -1) {
insert->set_row_ttl_duration(_ctx->row_ttl_duration);
DB_DEBUG("row_ttl_duration: %ld", _ctx->row_ttl_duration);
}
Expand Down
Loading
Loading