Skip to content

Commit

Permalink
Merge branch 'reorder-rules' of github.com:sundy-li/fuse-query into r…
Browse files Browse the repository at this point in the history
…eorder-rules
  • Loading branch information
sundy-li committed Sep 12, 2024
2 parents 6d92dd7 + 473d174 commit 5a1bcfa
Show file tree
Hide file tree
Showing 22 changed files with 465 additions and 180 deletions.
6 changes: 6 additions & 0 deletions src/common/building/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub fn add_building_env_vars() {
add_env_credits_info();
add_target_features();
add_env_version();
add_env_license();
}

pub fn set_env_config() {
Expand Down Expand Up @@ -87,6 +88,11 @@ fn discover_version() -> Result<String> {
}
}

pub fn add_env_license() {
let v = env::var("DATABEND_ENTERPRISE_LICENSE_EMBEDDED").unwrap_or_default();
println!("cargo:rustc-env=DATABEND_ENTERPRISE_LICENSE_EMBEDDED={v}");
}

pub fn add_env_commit_authors(repo: &Repository) {
match git::get_commit_authors(repo) {
Ok(authors) => println!("cargo:rustc-env=DATABEND_COMMIT_AUTHORS={}", authors),
Expand Down
71 changes: 26 additions & 45 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2865,56 +2865,35 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
};

let table_infos = do_get_table_history(self, db_filter, left_num).await?;
let take_num = left_num.unwrap_or(usize::MAX);

// check if reach the limit
if let Some(left_num) = left_num {
let num = min(left_num, table_infos.len());
for table_info in table_infos.iter().take(num) {
let (table_info, db_id) = table_info;
// A DB can be removed only when all its tables are removed.
if drop_db && take_num > table_infos.len() {
drop_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
tables: table_infos
.iter()
.map(|(table_info, _)| {
(table_info.ident.table_id, table_info.name.clone())
})
.collect(),
});
} else {
for (table_info, db_id) in table_infos.iter().take(take_num) {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
drop_table_infos.push(table_info.clone());
}

// if limit is Some, append DroppedId::Db only when table_infos is empty
if drop_db && table_infos.is_empty() {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
));
}
if num == left_num {
return Ok(ListDroppedTableResp {
drop_table_infos,
drop_ids,
});
}
} else {
table_infos.iter().for_each(|(table_info, db_id)| {
if !drop_db {
drop_ids.push(DroppedId::Table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
))
}
});
drop_table_infos.extend(
table_infos
.into_iter()
.map(|(table_info, _)| table_info)
.collect::<Vec<_>>(),
);
if drop_db {
drop_ids.push(DroppedId::Db(
db_info.database_id.db_id,
db_info.name_ident.database_name().to_string(),
));
}
}
drop_table_infos.extend(
table_infos
.iter()
.take(take_num)
.map(|(table_info, _)| table_info.clone()),
);
}

return Ok(ListDroppedTableResp {
Expand Down Expand Up @@ -2974,9 +2953,11 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
for drop_id in req.drop_ids {
match drop_id {
DroppedId::Db(db_id, db_name) => {
gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?
}
DroppedId::Db {
db_id,
db_name,
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Table(db_id, table_id, table_name) => {
gc_dropped_table_by_id(self, &req.tenant, db_id, table_id, table_name).await?
}
Expand Down
84 changes: 64 additions & 20 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4020,14 +4020,16 @@ impl SchemaApiTestSuite {
};

let res = mt.create_database(req).await?;
drop_ids_1.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
));
drop_ids_2.push(DroppedId::Db(
*res.db_id,
db_name.database_name().to_string(),
));
drop_ids_1.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});
drop_ids_2.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});

let req = CreateTableReq {
create_option: CreateOption::Create,
Expand Down Expand Up @@ -4063,7 +4065,11 @@ impl SchemaApiTestSuite {

let res = mt.create_database(create_db_req.clone()).await?;
let db_id = res.db_id;
drop_ids_2.push(DroppedId::Db(*db_id, "db2".to_string()));
drop_ids_2.push(DroppedId::Db {
db_id: *db_id,
db_name: "db2".to_string(),
tables: vec![],
});

info!("--- create and drop db2.tb1");
{
Expand Down Expand Up @@ -4262,15 +4268,47 @@ impl SchemaApiTestSuite {
left_table_id.cmp(right_table_id)
}
}
(DroppedId::Db(left_db_id, _), DroppedId::Db(right_db_id, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Db(left_db_id, _), DroppedId::Table(right_db_id, _, _)) => {
left_db_id.cmp(right_db_id)
}
(DroppedId::Table(left_db_id, _, _), DroppedId::Db(right_db_id, _)) => {
left_db_id.cmp(right_db_id)
}
(
DroppedId::Db {
db_id: left_db_id, ..
},
DroppedId::Db {
db_id: right_db_id, ..
},
) => left_db_id.cmp(right_db_id),
(
DroppedId::Db {
db_id: left_db_id,
db_name: _,
tables: _,
},
DroppedId::Table(right_db_id, _, _),
) => left_db_id.cmp(right_db_id),
(
DroppedId::Table(left_db_id, _, _),
DroppedId::Db {
db_id: right_db_id,
db_name: _,
tables: _,
},
) => left_db_id.cmp(right_db_id),
}
}
fn is_dropped_id_eq(l: &DroppedId, r: &DroppedId) -> bool {
match (l, r) {
(
DroppedId::Db {
db_id: left_db_id,
db_name: left_db_name,
tables: _,
},
DroppedId::Db {
db_id: right_db_id,
db_name: right_db_name,
tables: _,
},
) => left_db_id == right_db_id && left_db_name == right_db_name,
_ => l == r,
}
}
// case 1: test AllDroppedTables with filter time
Expand All @@ -4285,7 +4323,10 @@ impl SchemaApiTestSuite {
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_1);
assert_eq!(sort_drop_ids.len(), drop_ids_1.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_1.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
Expand Down Expand Up @@ -4314,7 +4355,10 @@ impl SchemaApiTestSuite {
// sort drop id by table id
let mut sort_drop_ids = resp.drop_ids;
sort_drop_ids.sort_by(cmp_dropped_id);
assert_eq!(sort_drop_ids, drop_ids_2);
assert_eq!(sort_drop_ids.len(), drop_ids_2.len());
for (id1, id2) in sort_drop_ids.iter().zip(drop_ids_2.iter()) {
assert!(is_dropped_id_eq(id1, id2));
}

let expected: BTreeSet<String> = [
"'db1'.'tb1'".to_string(),
Expand Down
24 changes: 21 additions & 3 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::time::Duration;
use anyerror::func_name;
use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::FieldIndex;
use databend_common_expression::TableField;
Expand Down Expand Up @@ -201,6 +202,20 @@ pub struct TableInfo {
pub db_type: DatabaseType,
}

impl TableInfo {
pub fn database_name(&self) -> Result<&str> {
if self.engine() != "FUSE" {
return Err(ErrorCode::Internal(format!(
"Invalid engine: {}",
self.engine()
)));
}
let database_name = self.desc.split('.').next().unwrap();
let database_name = &database_name[1..database_name.len() - 1];
Ok(database_name)
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableStatistics {
/// Number of rows
Expand Down Expand Up @@ -360,7 +375,7 @@ impl Default for TableMeta {
fn default() -> Self {
TableMeta {
schema: Arc::new(TableSchema::empty()),
engine: "".to_string(),
engine: "FUSE".to_string(),
engine_options: BTreeMap::new(),
storage_params: None,
part_prefix: "".to_string(),
Expand Down Expand Up @@ -907,8 +922,11 @@ pub struct ListDroppedTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DroppedId {
// db id, db name
Db(u64, String),
Db {
db_id: u64,
db_name: String,
tables: Vec<(u64, String)>,
},
// db id, table id, table name
Table(u64, u64, String),
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/service/src/configs/outer_v0.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ pub struct RaftConfig {
#[clap(long, default_value_t = get_default_raft_advertise_host())]
pub raft_advertise_host: String,

/// The listening port for metadata communication.
/// The listening port for raft communication.
#[clap(long, default_value = "28004")]
pub raft_api_port: u16,

Expand Down
36 changes: 30 additions & 6 deletions src/meta/service/src/meta_service/raft_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ impl RaftService for RaftServiceImpl {
request: Request<RaftRequest>,
) -> Result<Response<RaftReply>, Status> {
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
let remote_addr = remote_addr(&request);

async {
self.incr_meta_metrics_recv_bytes_from_peer(&request);
Expand All @@ -332,14 +333,20 @@ impl RaftService for RaftServiceImpl {
let req_summary = ae_req.summary();
let raft = &self.meta_node.raft;

info!("RaftServiceImpl::append_entries: {}", req_summary);
info!(
"RaftServiceImpl::append_entries: from:{remote_addr} {}",
req_summary
);

let resp = raft
.append_entries(ae_req)
.await
.map_err(GrpcHelper::internal_err)?;

info!("RaftServiceImpl::append_entries: done: {}", req_summary);
info!(
"RaftServiceImpl::append_entries: from:{remote_addr} done: {}",
req_summary
);

GrpcHelper::ok_response(&resp)
}
Expand All @@ -365,6 +372,7 @@ impl RaftService for RaftServiceImpl {

async fn vote(&self, request: Request<RaftRequest>) -> Result<Response<RaftReply>, Status> {
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
let remote_addr = remote_addr(&request);

async {
self.incr_meta_metrics_recv_bytes_from_peer(&request);
Expand All @@ -373,13 +381,19 @@ impl RaftService for RaftServiceImpl {

let v_req_summary = v_req.summary();

info!("RaftServiceImpl::vote: start: {}", v_req_summary);
info!(
"RaftServiceImpl::vote: from:{remote_addr} start: {}",
v_req_summary
);

let raft = &self.meta_node.raft;

let resp = raft.vote(v_req).await.map_err(GrpcHelper::internal_err)?;

info!("RaftServiceImpl::vote: done: {}", v_req_summary);
info!(
"RaftServiceImpl::vote: from:{remote_addr} done: {}",
v_req_summary
);

GrpcHelper::ok_response(&resp)
}
Expand All @@ -392,21 +406,31 @@ impl RaftService for RaftServiceImpl {
request: Request<pb::TransferLeaderRequest>,
) -> Result<Response<Empty>, Status> {
let root = databend_common_tracing::start_trace_for_remote_request(full_name!(), &request);
let remote_addr = remote_addr(&request);

let fu = async {
let req = request.into_inner();
let req: TransferLeaderRequest = req.try_into()?;

let req_str = req.to_string();

info!("RaftServiceImpl::{}: start: {}", func_name!(), req_str);
info!(
"RaftServiceImpl::{}: from:{remote_addr} start: {}",
func_name!(),
req_str
);

let raft = &self.meta_node.raft;

raft.handle_transfer_leader(req)
.await
.map_err(GrpcHelper::internal_err)?;

info!("RaftServiceImpl::{}: done: {}", func_name!(), req_str);
info!(
"RaftServiceImpl::{}: from:{remote_addr} done: {}",
func_name!(),
req_str
);

Ok(Response::new(pb::Empty {}))
};
Expand Down
Loading

0 comments on commit 5a1bcfa

Please sign in to comment.