Skip to content

Commit

Permalink
feat: Bump hive_metastore to use pure rust thrift impl volo (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Feb 5, 2024
1 parent 390cd51 commit 09765db
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 47 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ typed-builder = "^0.18"
url = "2"
urlencoding = "2"
uuid = "1.6.1"

volo-thrift = "0.9.2"
hive_metastore = "0.0.2"
tera = "1"
10 changes: 3 additions & 7 deletions crates/catalog/hms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,9 @@ license = { workspace = true }
keywords = ["iceberg", "hive", "catalog"]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
hive_metastore = "0.0.1"
hive_metastore = { workspace = true }
iceberg = { workspace = true }
# The upstream thrift crate suffers from a lack of regular Rust releases.
#
# [tent-rs](https://github.com/tent-rs) is an organization that helps resolve this
# issue, and [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift
# crate, built from the thrift upstream with only the version bumped.
thrift = { package = "tent-thrift", version = "0.18.1" }
typed-builder = { workspace = true }
volo-thrift = { workspace = true }
76 changes: 38 additions & 38 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

use super::utils::*;
use async_trait::async_trait;
use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient};
use hive_metastore::ThriftHiveMetastoreClient;
use hive_metastore::ThriftHiveMetastoreClientBuilder;
use iceberg::table::Table;
use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent};
use iceberg::{
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
TableIdent,
};
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};
use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol};
use thrift::transport::{
ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf,
};
use std::net::ToSocketAddrs;
use typed_builder::TypedBuilder;

/// Hive metastore Catalog configuration.
Expand All @@ -35,24 +35,7 @@ pub struct HmsCatalogConfig {
address: String,
}

/// TODO: We only support binary protocol for now.
type HmsClientType = ThriftHiveMetastoreSyncClient<
TBinaryInputProtocol<TBufferedReadTransport<ReadHalf<thrift::transport::TTcpChannel>>>,
TBinaryOutputProtocol<TBufferedWriteTransport<WriteHalf<thrift::transport::TTcpChannel>>>,
>;

/// # TODO
///
/// We are using the same connection every time; we should support a
/// connection pool in the future.
struct HmsClient(Arc<Mutex<HmsClientType>>);

impl HmsClient {
    /// Runs `f` against the shared client while holding its mutex, and maps
    /// any thrift error returned by `f` through `from_thrift_error`.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the client is poisoned.
    fn call<T>(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result<T>) -> Result<T> {
        let mut guard = self.0.lock().unwrap();
        let outcome = f(&mut guard);
        outcome.map_err(from_thrift_error)
    }
}
struct HmsClient(ThriftHiveMetastoreClient);

/// Hive metastore Catalog.
pub struct HmsCatalog {
Expand All @@ -71,19 +54,29 @@ impl Debug for HmsCatalog {
impl HmsCatalog {
/// Create a new hms catalog.
pub fn new(config: HmsCatalogConfig) -> Result<Self> {
let mut channel = thrift::transport::TTcpChannel::new();
channel
.open(config.address.as_str())
.map_err(from_thrift_error)?;
let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?;
let i_chan = TBufferedReadTransport::new(i_chan);
let o_chan = TBufferedWriteTransport::new(o_chan);
let i_proto = TBinaryInputProtocol::new(i_chan, true);
let o_proto = TBinaryOutputProtocol::new(o_chan, true);
let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto);
let address = config
.address
.as_str()
.to_socket_addrs()
.map_err(from_io_error)?
.next()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
format!("invalid address: {}", config.address),
)
})?;

let client = ThriftHiveMetastoreClientBuilder::new("hms")
.address(address)
// Framed thrift RPC is not enabled by default in HMS, so we use the
// buffered codec instead.
.make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered())
.build();

Ok(Self {
config,
client: HmsClient(Arc::new(Mutex::new(client))),
client: HmsClient(client),
})
}
}
Expand All @@ -103,10 +96,17 @@ impl Catalog for HmsCatalog {
let dbs = if parent.is_some() {
return Ok(vec![]);
} else {
self.client.call(|client| client.get_all_databases())?
self.client
.0
.get_all_databases()
.await
.map_err(from_thrift_error)?
};

Ok(dbs.into_iter().map(NamespaceIdent::new).collect())
Ok(dbs
.into_iter()
.map(|v| NamespaceIdent::new(v.into()))
.collect())
}

async fn create_namespace(
Expand Down
17 changes: 16 additions & 1 deletion crates/catalog/hms/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,28 @@
// specific language governing permissions and limitations
// under the License.

use anyhow::anyhow;
use iceberg::{Error, ErrorKind};
use std::fmt::Debug;
use std::io;

/// Convert a thrift error into an iceberg error.
pub fn from_thrift_error(error: thrift::Error) -> Error {
pub fn from_thrift_error<T>(error: volo_thrift::error::ResponseError<T>) -> Error
where
T: Debug,
{
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting thrift error".to_string(),
)
.with_source(anyhow!("thrift error: {:?}", error))
}

/// Format an io error into iceberg error.
pub fn from_io_error(error: io::Error) -> Error {
Error::new(
ErrorKind::Unexpected,
"operation failed for hitting io error".to_string(),
)
.with_source(error)
}

0 comments on commit 09765db

Please sign in to comment.