Skip to content

Commit

Permalink
use prepared statements
Browse files Browse the repository at this point in the history
  • Loading branch information
JanKaul committed Mar 12, 2024
1 parent 04fa683 commit 71c387f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 76 deletions.
6 changes: 3 additions & 3 deletions crates/catalog/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,18 @@ license = { workspace = true }
keywords = ["iceberg", "sql", "catalog"]

[dependencies]
anyhow = "1"
anyhow = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
dashmap = "5.5.3"
futures = { workspace = true }
iceberg = { workspace = true }
log = "0.4.20"
log = { workspace = true }
opendal = { workspace = true }
serde = { workspace = true }
serde_derive = { workspace = true }
serde_json = { workspace = true }
sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false }
sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false }
typed-builder = { workspace = true }
url = { workspace = true }
urlencoding = { workspace = true }
Expand Down
112 changes: 39 additions & 73 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt};
use futures::{AsyncReadExt, AsyncWriteExt};
use sqlx::{
any::{install_default_drivers, AnyConnectOptions, AnyRow},
AnyConnection, ConnectOptions, Connection, Row,
any::{install_default_drivers, AnyRow},
AnyPool, Row,
};
use std::collections::HashMap;

Expand All @@ -38,7 +38,7 @@ use crate::error::from_sqlx_error;
/// Sql catalog implementation.
pub struct SqlCatalog {
name: String,
connection: Arc<Mutex<AnyConnection>>,
connection: AnyPool,
storage: FileIO,
cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
}
Expand All @@ -48,54 +48,38 @@ impl SqlCatalog {
pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
install_default_drivers();

let mut connection = AnyConnectOptions::connect(
&AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?,
)
.await
.map_err(from_sqlx_error)?;
let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?;

connection
.transaction(|txn| {
Box::pin(async move {
sqlx::query(
"create table if not exists iceberg_tables (
sqlx::query(
"create table if not exists iceberg_tables (
catalog_name text not null,
table_namespace text not null,
table_name text not null,
metadata_location text not null,
previous_metadata_location text,
primary key (catalog_name, table_namespace, table_name)
);",
)
.execute(&mut **txn)
.await
})
})
.await
.map_err(from_sqlx_error)?;
)
.execute(&pool)
.await
.map_err(from_sqlx_error)?;

connection
.transaction(|txn| {
Box::pin(async move {
sqlx::query(
"create table if not exists iceberg_namespace_properties (
sqlx::query(
"create table if not exists iceberg_namespace_properties (
catalog_name text not null,
namespace text not null,
property_key text,
property_value text,
primary key (catalog_name, namespace, property_key)
);",
)
.execute(&mut **txn)
.await
})
})
.await
.map_err(from_sqlx_error)?;
)
.execute(&pool)
.await
.map_err(from_sqlx_error)?;

Ok(SqlCatalog {
name: name.to_owned(),
connection: Arc::new(Mutex::new(connection)),
connection: pool,
storage,
cache: Arc::new(DashMap::new()),
})
Expand Down Expand Up @@ -135,12 +119,14 @@ impl Catalog for SqlCatalog {
&self,
_parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
let mut connection = self.connection.lock().await;
let rows = connection.transaction(|txn|{
let name = self.name.clone();
Box::pin(async move {
sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await
})}).await.map_err(from_sqlx_error)?;
let name = self.name.clone();
let rows = sqlx::query(
"select distinct table_namespace from iceberg_tables where catalog_name = ?;",
)
.bind(&name)
.fetch_all(&self.connection)
.await
.map_err(from_sqlx_error)?;
let iter = rows.iter().map(|row| row.try_get::<String, _>(0));

Ok(iter
Expand Down Expand Up @@ -185,13 +171,9 @@ impl Catalog for SqlCatalog {
}

async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
let mut connection = self.connection.lock().await;
let rows = connection.transaction(|txn|{
let name = self.name.clone();
let namespace = namespace.encode_in_url();
Box::pin(async move {
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut **txn).await
})}).await.map_err(from_sqlx_error)?;
let name = self.name.clone();
let namespace = namespace.encode_in_url();
let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ?;").bind(&name).bind(&namespace).fetch_all(&self.connection).await.map_err(from_sqlx_error)?;
let iter = rows.iter().map(query_map);

Ok(iter
Expand All @@ -212,16 +194,10 @@ impl Catalog for SqlCatalog {
}

async fn stat_table(&self, identifier: &TableIdent) -> Result<bool> {
let mut connection = self.connection.lock().await;
let rows = connection.transaction(|txn|{
let catalog_name = self.name.clone();
let namespace = identifier.namespace().encode_in_url();
let name = identifier.name().to_string();
Box::pin(async move {
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
&namespace,
&name)).fetch_all(&mut **txn).await
})}).await.map_err(from_sqlx_error)?;
let catalog_name = self.name.clone();
let namespace = identifier.namespace().encode_in_url();
let name = identifier.name().to_string();
let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_all(&self.connection).await.map_err(from_sqlx_error)?;
let mut iter = rows.iter().map(query_map);

Ok(iter.next().is_some())
Expand All @@ -233,16 +209,10 @@ impl Catalog for SqlCatalog {

async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
let metadata_location = {
let mut connection = self.connection.lock().await;
let row = connection.transaction(|txn|{
let catalog_name = self.name.clone();
let namespace = identifier.namespace().encode_in_url();
let name = identifier.name().to_string();
Box::pin(async move {
sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name,
&namespace,
&name)).fetch_one(&mut **txn).await
})}).await.map_err(from_sqlx_error)?;
let row = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_one(&self.connection).await.map_err(from_sqlx_error)?;
let row = query_map(&row).map_err(from_sqlx_error)?;

row.metadata_location
Expand Down Expand Up @@ -301,15 +271,11 @@ impl Catalog for SqlCatalog {
.write_all(&serde_json::to_vec(&metadata)?)
.await?;
{
let mut connection = self.connection.lock().await;
connection.transaction(|txn|{
let catalog_name = self.name.clone();
let namespace = namespace.encode_in_url();
let name = creation.name.clone();
let metadata_location = metadata_location.to_string();
Box::pin(async move {
sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut **txn).await
})}).await.map_err(from_sqlx_error)?;
let catalog_name = self.name.clone();
let namespace = namespace.encode_in_url();
let name = creation.name.clone();
let metadata_location = metadata_location.to_string();
sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?;
}
Ok(Table::builder()
.file_io(self.storage.clone())
Expand Down

0 comments on commit 71c387f

Please sign in to comment.