Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hive metastore catalog support (part 1/2) #237

Merged
merged 25 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

[advisories]
ignore = [
# rsa
# Marvin Attack: potential key recovery through timing sidechannels
# Issues: https://github.com/apache/iceberg-rust/issues/221
"RUSTSEC-2023-0071",
# rsa
# Marvin Attack: potential key recovery through timing sidechannels
# Issues: https://github.com/apache/iceberg-rust/issues/221
"RUSTSEC-2023-0071",
]
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

[workspace]
resolver = "2"
members = ["crates/catalog/*", "crates/examples", "crates/iceberg", "crates/test_utils"]
members = [
"crates/catalog/*",
"crates/examples",
"crates/iceberg",
"crates/test_utils",
]

[workspace.package]
version = "0.2.0"
Expand Down Expand Up @@ -55,6 +60,7 @@ once_cell = "1"
opendal = "0.45"
ordered-float = "4.0.0"
parquet = "50"
pilota = "0.10.0"
pretty_assertions = "1.4.0"
port_scanner = "0.1.5"
reqwest = { version = "^0.11", features = ["json"] }
Expand Down
7 changes: 7 additions & 0 deletions crates/catalog/hms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,12 @@ anyhow = { workspace = true }
async-trait = { workspace = true }
hive_metastore = { workspace = true }
iceberg = { workspace = true }
log = { workspace = true }
pilota = { workspace = true }
typed-builder = { workspace = true }
volo-thrift = { workspace = true }

[dev-dependencies]
iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
port_scanner = { workspace = true }
tokio = { workspace = true }
160 changes: 145 additions & 15 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::utils::*;
use async_trait::async_trait;
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
use hive_metastore::ThriftHiveMetastoreGetDatabaseException;
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
Expand All @@ -28,6 +29,7 @@ use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;
use volo_thrift::ResponseError;

/// Which variant of the thrift transport to communicate with HMS
/// See: <https://github.com/apache/thrift/blob/master/doc/specs/thrift-rpc.md#framed-vs-unframed-transport>
Expand Down Expand Up @@ -97,7 +99,6 @@ impl HmsCatalog {
}
}

/// Refer to <https://github.com/apache/iceberg/blob/main/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java> for implementation details.
#[async_trait]
impl Catalog for HmsCatalog {
/// HMS doesn't support nested namespaces.
Expand Down Expand Up @@ -125,36 +126,165 @@ impl Catalog for HmsCatalog {
.collect())
}

/// Creates a new namespace with the given identifier and properties.
///
/// Attempts to create a namespace defined by the `namespace`
/// parameter and configured with the specified `properties`.
///
/// This function can return an error in the following situations:
///
/// - If `hive.metastore.database.owner-type` is specified without
/// `hive.metastore.database.owner`,
/// - Errors from `validate_namespace` if the namespace identifier does not
/// meet validation criteria.
/// - Errors from `convert_to_database` if the properties cannot be
/// successfully converted into a database configuration.
/// - Errors from the underlying database creation process, converted using
/// `from_thrift_error`.
async fn create_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace> {
todo!()
let database = convert_to_database(namespace, &properties)?;

self.client
.0
.create_database(database)
.await
.map_err(from_thrift_error)?;

Ok(Namespace::new(namespace.clone()))
}

async fn get_namespace(&self, _namespace: &NamespaceIdent) -> Result<Namespace> {
todo!()
/// Retrieves a namespace by its identifier.
///
/// Validates the given namespace identifier and then queries the
/// underlying database client to fetch the corresponding namespace data.
/// Constructs a `Namespace` object with the retrieved data and returns it.
///
/// This function can return an error in any of the following situations:
/// - If the provided namespace identifier fails validation checks
/// - If there is an error querying the database, returned by
/// `from_thrift_error`.
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
let name = validate_namespace(namespace)?;

let db = self
.client
.0
.get_database(name.clone().into())
.await
.map_err(from_thrift_error)?;

let ns = convert_to_namespace(&db)?;

Ok(ns)
}

async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> Result<bool> {
todo!()
/// Checks if a namespace exists within the Hive Metastore.
///
/// Validates the namespace identifier by querying the Hive Metastore
/// to determine if the specified namespace (database) exists.
///
/// # Returns
/// A `Result<bool>` indicating the outcome of the check:
/// - `Ok(true)` if the namespace exists.
/// - `Ok(false)` if the namespace does not exist, identified by a specific
/// `UserException` variant.
/// - `Err(...)` if an error occurs during validation or the Hive Metastore
/// query, with the error encapsulating the issue.
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
let name = validate_namespace(namespace)?;

let resp = self.client.0.get_database(name.clone().into()).await;

match resp {
Ok(_) => Ok(true),
Err(err) => {
if let ResponseError::UserException(ThriftHiveMetastoreGetDatabaseException::O1(
_,
)) = &err
{
Ok(false)
} else {
Err(from_thrift_error(err))
}
}
}
}

/// Asynchronously updates properties of an existing namespace.
///
/// Converts the given namespace identifier and properties into a database
/// representation and then attempts to update the corresponding namespace
/// in the Hive Metastore.
///
/// # Returns
/// Returns `Ok(())` if the namespace update is successful. If the
/// namespace cannot be updated due to missing information or an error
/// during the update process, an `Err(...)` is returned.
async fn update_namespace(
&self,
_namespace: &NamespaceIdent,
_properties: HashMap<String, String>,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
todo!()
let db = convert_to_database(namespace, &properties)?;

let name = match &db.name {
Some(name) => name,
None => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Database name must be specified",
))
}
};

self.client
.0
.alter_database(name.clone(), db)
.await
.map_err(from_thrift_error)?;

Ok(())
}

async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
todo!()
/// Asynchronously drops a namespace from the Hive Metastore.
///
/// # Returns
/// A `Result<()>` indicating the outcome:
/// - `Ok(())` signifies successful namespace deletion.
/// - `Err(...)` signifies failure to drop the namespace due to validation
/// errors, connectivity issues, or Hive Metastore constraints.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let name = validate_namespace(namespace)?;

self.client
.0
.drop_database(name.into(), false, false)
.await
.map_err(from_thrift_error)?;

Ok(())
}

async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
todo!()
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let name = validate_namespace(namespace)?;

let tables = self
.client
.0
.get_all_tables(name.clone().into())
.await
.map_err(from_thrift_error)?;

let tables = tables
.iter()
.map(|table| TableIdent::new(namespace.clone(), table.to_string()))
.collect();

Ok(tables)
}

async fn create_table(
Expand Down
Loading
Loading