Skip to content

Commit

Permalink
refactor(query): first check privilege in SystemEngine get_full_data (#โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ16421)

* optimize(query): first check privilege in SystemEngine get_full_data

* use mget_databases replace for
  • Loading branch information
TCeason committed Sep 10, 2024
1 parent 4bb036c commit 16c7d99
Show file tree
Hide file tree
Showing 17 changed files with 454 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -223,6 +224,13 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
// Get the db name by meta id.
async fn get_db_name_by_id(&self, db_ids: MetaId) -> Result<String>;

// Mget dbs by DatabaseNameIdent.
async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>>;

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
Expand Down
33 changes: 33 additions & 0 deletions src/query/service/src/catalogs/default/database_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -338,6 +339,38 @@ impl Catalog for DatabaseCatalog {
}
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let sys_dbs = self.immutable_catalog.list_databases(tenant).await?;
let sys_db_names: Vec<_> = sys_dbs
.iter()
.map(|sys_db| sys_db.get_db_info().name_ident.database_name())
.collect();

let mut mut_db_names: Vec<_> = Vec::new();
for db_name in db_names {
if !sys_db_names.contains(&db_name.database_name()) {
mut_db_names.push(db_name.clone());
}
}

let mut dbs = self
.immutable_catalog
.mget_databases(tenant, db_names)
.await?;

let other = self
.mutable_catalog
.mget_databases(tenant, &mut_db_names)
.await?;

dbs.extend(other);
Ok(dbs)
}

#[async_backtrace::framed]
async fn mget_database_names_by_ids(
&self,
Expand Down
18 changes: 18 additions & 0 deletions src/query/service/src/catalogs/default/immutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_catalog::catalog::Catalog;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -232,6 +233,23 @@ impl Catalog for ImmutableCatalog {
}
}

async fn mget_databases(
&self,
_tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let mut res: Vec<Arc<dyn Database>> = vec![];
for db_name in db_names {
let db_name = db_name.database_name();
if db_name == "system" {
res.push(self.sys_db.clone());
} else if db_name == "information_schema" {
res.push(self.info_schema_db.clone());
}
}
Ok(res)
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
29 changes: 29 additions & 0 deletions src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use databend_common_catalog::catalog::Catalog;
use databend_common_config::InnerConfig;
use databend_common_exception::Result;
use databend_common_meta_api::kv_app_error::KVAppError;
use databend_common_meta_api::name_id_value_api::NameIdValueApiCompat;
use databend_common_meta_api::SchemaApi;
use databend_common_meta_api::SequenceApi;
use databend_common_meta_app::app_error::AppError;
Expand Down Expand Up @@ -422,6 +423,34 @@ impl Catalog for MutableCatalog {
Ok(res)
}

// Mget dbs by DatabaseNameIdent.
async fn mget_databases(
&self,
_tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
let res = self
.ctx
.meta
.mget_id_value_compat(db_names.iter().cloned())
.await?;
let dbs = res
.map(|(name_ident, database_id, meta)| {
Arc::new(DatabaseInfo {
database_id,
name_ident,
meta,
})
})
.collect::<Vec<Arc<DatabaseInfo>>>();

dbs.iter().try_fold(vec![], |mut acc, item| {
let db = self.build_db_instance(item)?;
acc.push(db);
Ok(acc)
})
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/src/catalogs/default/session_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use databend_common_catalog::table_args::TableArgs;
use databend_common_catalog::table_function::TableFunction;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -305,6 +306,14 @@ impl Catalog for SessionCatalog {
self.inner.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.inner.mget_databases(tenant, db_names).await
}

// Mget the dbs name by meta ids.
async fn mget_database_names_by_ids(
&self,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use databend_common_meta_app::principal::RoleInfo;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -208,6 +209,14 @@ impl Catalog for FakedCatalog {
self.cat.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.cat.mget_databases(tenant, db_names).await
}

async fn mget_database_names_by_ids(
&self,
tenant: &Tenant,
Expand Down
9 changes: 9 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use databend_common_meta_app::principal::RoleInfo;
use databend_common_meta_app::principal::UserDefinedConnection;
use databend_common_meta_app::principal::UserInfo;
use databend_common_meta_app::principal::UserPrivilegeType;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CommitTableMetaReply;
Expand Down Expand Up @@ -957,6 +958,14 @@ impl Catalog for FakedCatalog {
self.cat.get_db_name_by_id(db_id).await
}

async fn mget_databases(
&self,
tenant: &Tenant,
db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
self.cat.mget_databases(tenant, db_names).await
}

#[async_backtrace::framed]
async fn mget_database_names_by_ids(
&self,
Expand Down
11 changes: 11 additions & 0 deletions src/query/storages/hive/hive/src/hive_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -385,6 +386,16 @@ impl Catalog for HiveCatalog {
))
}

async fn mget_databases(
&self,
_tenant: &Tenant,
_db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
Err(ErrorCode::Unimplemented(
"Cannot mget databases in HIVE catalog",
))
}

async fn mget_database_names_by_ids(
&self,
_tenant: &Tenant,
Expand Down
10 changes: 10 additions & 0 deletions src/query/storages/iceberg/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use databend_common_catalog::table_function::TableFunction;
use databend_common_config::InnerConfig;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::dictionary_name_ident::DictionaryNameIdent;
use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -295,6 +296,15 @@ impl Catalog for IcebergCatalog {
"Cannot get db name by id in ICEBERG catalog",
))
}
async fn mget_databases(
&self,
_tenant: &Tenant,
_db_names: &[DatabaseNameIdent],
) -> Result<Vec<Arc<dyn Database>>> {
Err(ErrorCode::Unimplemented(
"Cannot mget databases in ICEBERG catalog",
))
}

async fn mget_database_names_by_ids(
&self,
Expand Down
63 changes: 52 additions & 11 deletions src/query/storages/system/src/columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchemaRefExt;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -277,16 +278,7 @@ pub(crate) async fn dump_tables(

let mut final_dbs: Vec<(String, u64)> = Vec::new();

if databases.is_empty() {
let all_databases = catalog.list_databases(&tenant).await?;
for db in all_databases {
let db_id = db.get_db_info().database_id.db_id;
let db_name = db.name();
if visibility_checker.check_database_visibility(CATALOG_DEFAULT, db_name, db_id) {
final_dbs.push((db_name.to_string(), db_id));
}
}
} else {
if !databases.is_empty() {
for db in databases {
let db_id = catalog
.get_database(&tenant, &db)
Expand All @@ -298,12 +290,61 @@ pub(crate) async fn dump_tables(
final_dbs.push((db.to_string(), db_id));
}
}
} else {
let catalog_dbs = visibility_checker.get_visibility_database();
// None means has global level privileges
if let Some(catalog_dbs) = catalog_dbs {
for (catalog_name, dbs) in catalog_dbs {
if catalog_name == CATALOG_DEFAULT {
let mut catalog_db_ids = vec![];
let mut catalog_db_names = vec![];
catalog_db_names.extend(
dbs.iter()
.filter_map(|(db_name, _)| *db_name)
.map(|db_name| db_name.to_string()),
);
catalog_db_ids.extend(dbs.iter().filter_map(|(_, db_id)| *db_id));
if let Ok(databases) = catalog
.mget_database_names_by_ids(&tenant, &catalog_db_ids)
.await
{
catalog_db_names.extend(databases.into_iter().flatten());
} else {
let msg = format!("Failed to get database name by id: {}", catalog.name());
warn!("{}", msg);
}
let db_idents = catalog_db_names
.iter()
.map(|name| DatabaseNameIdent::new(&tenant, name))
.collect::<Vec<DatabaseNameIdent>>();
let dbs: Vec<(String, u64)> = catalog
.mget_databases(&tenant, &db_idents)
.await?
.iter()
.map(|db| (db.name().to_string(), db.get_db_info().database_id.db_id))
.collect();
final_dbs.extend(dbs);
}
}
} else {
let all_databases = catalog.list_databases(&tenant).await?;
for db in all_databases {
let db_id = db.get_db_info().database_id.db_id;
let db_name = db.name();
if visibility_checker.check_database_visibility(CATALOG_DEFAULT, db_name, db_id) {
final_dbs.push((db_name.to_string(), db_id));
}
}
}
}

let mut final_tables: Vec<(String, Vec<Arc<dyn Table>>)> = Vec::with_capacity(final_dbs.len());
for (database, db_id) in final_dbs {
let tables = if tables.is_empty() {
(catalog.list_tables(&tenant, &database).await).unwrap_or_default()
catalog
.list_tables(&tenant, &database)
.await
.unwrap_or_default()
} else {
let mut res = Vec::new();
for table in &tables {
Expand Down
Loading

0 comments on commit 16c7d99

Please sign in to comment.