From 71c387f4b0a1803d6ed0aa855d3568e57e8a4681 Mon Sep 17 00:00:00 2001 From: Jan Kaul Date: Tue, 12 Mar 2024 18:10:52 +0100 Subject: [PATCH] use prepared statements --- crates/catalog/sql/Cargo.toml | 6 +- crates/catalog/sql/src/catalog.rs | 112 +++++++++++------------------- 2 files changed, 42 insertions(+), 76 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 1c1c7244f..5440935b1 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -29,18 +29,18 @@ license = { workspace = true } keywords = ["iceberg", "sql", "catalog"] [dependencies] -anyhow = "1" +anyhow = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } dashmap = "5.5.3" futures = { workspace = true } iceberg = { workspace = true } -log = "0.4.20" +log = { workspace = true } opendal = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } serde_json = { workspace = true } -sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } +sqlx = { version = "0.7.2", features = ["tls-rustls", "any", "sqlite", "postgres", "mysql"], default-features = false } typed-builder = { workspace = true } url = { workspace = true } urlencoding = { workspace = true } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index cec5b3f8e..fc4296be9 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; -use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt}; +use futures::{AsyncReadExt, AsyncWriteExt}; use sqlx::{ - any::{install_default_drivers, AnyConnectOptions, AnyRow}, - AnyConnection, ConnectOptions, Connection, Row, + any::{install_default_drivers, AnyRow}, + AnyPool, Row, }; use std::collections::HashMap; @@ -38,7 +38,7 @@ use crate::error::from_sqlx_error; /// Sql catalog implementation. pub struct SqlCatalog { name: String, - connection: Arc>, + connection: AnyPool, storage: FileIO, cache: Arc>, } @@ -48,17 +48,10 @@ impl SqlCatalog { pub async fn new(url: &str, name: &str, storage: FileIO) -> Result { install_default_drivers(); - let mut connection = AnyConnectOptions::connect( - &AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?, - ) - .await - .map_err(from_sqlx_error)?; + let pool = AnyPool::connect(url).await.map_err(from_sqlx_error)?; - connection - .transaction(|txn| { - Box::pin(async move { - sqlx::query( - "create table if not exists iceberg_tables ( + sqlx::query( + "create table if not exists iceberg_tables ( catalog_name text not null, table_namespace text not null, table_name text not null, @@ -66,36 +59,27 @@ impl SqlCatalog { previous_metadata_location text, primary key (catalog_name, table_namespace, table_name) );", - ) - .execute(&mut **txn) - .await - }) - }) - .await - .map_err(from_sqlx_error)?; + ) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; - connection - .transaction(|txn| { - Box::pin(async move { - sqlx::query( - "create table if not exists iceberg_namespace_properties ( + sqlx::query( + "create table if not exists iceberg_namespace_properties ( catalog_name text not null, namespace text not null, property_key text, property_value text, primary key (catalog_name, namespace, property_key) );", - ) - .execute(&mut **txn) - .await - }) - }) - .await - .map_err(from_sqlx_error)?; + ) + .execute(&pool) + .await + .map_err(from_sqlx_error)?; Ok(SqlCatalog { name: name.to_owned(), - connection: Arc::new(Mutex::new(connection)), + connection: pool, storage, cache: Arc::new(DashMap::new()), }) @@ -135,12 +119,14 @@ impl Catalog for SqlCatalog { &self, _parent: Option<&NamespaceIdent>, ) -> Result> { - let mut connection = self.connection.lock().await; - let rows = connection.transaction(|txn|{ - let name = self.name.clone(); - Box::pin(async move { - sqlx::query(&format!("select distinct table_namespace from iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let name = self.name.clone(); + let rows = sqlx::query( + "select distinct table_namespace from iceberg_tables where catalog_name = ?;", + ) + .bind(&name) + .fetch_all(&self.connection) + .await + .map_err(from_sqlx_error)?; let iter = rows.iter().map(|row| row.try_get::(0)); Ok(iter @@ -185,13 +171,9 @@ impl Catalog for SqlCatalog { } async fn list_tables(&self, namespace: &NamespaceIdent) -> Result> { - let mut connection = self.connection.lock().await; - let rows = connection.transaction(|txn|{ - let name = self.name.clone(); - let namespace = namespace.encode_in_url(); - Box::pin(async move { - sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}';",&name, &namespace)).fetch_all(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let name = self.name.clone(); + let namespace = namespace.encode_in_url(); + let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ?;").bind(&name).bind(&namespace).fetch_all(&self.connection).await.map_err(from_sqlx_error)?; let iter = rows.iter().map(query_map); Ok(iter @@ -212,16 +194,10 @@ impl Catalog for SqlCatalog { } async fn stat_table(&self, identifier: &TableIdent) -> Result { - let mut connection = self.connection.lock().await; - let rows = connection.transaction(|txn|{ - let catalog_name = self.name.clone(); - let namespace = identifier.namespace().encode_in_url(); - let name = identifier.name().to_string(); - Box::pin(async move { - sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - &namespace, - &name)).fetch_all(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let catalog_name = self.name.clone(); + let namespace = identifier.namespace().encode_in_url(); + let name = identifier.name().to_string(); + let rows = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_all(&self.connection).await.map_err(from_sqlx_error)?; let mut iter = rows.iter().map(query_map); Ok(iter.next().is_some()) @@ -233,16 +209,10 @@ impl Catalog for SqlCatalog { async fn load_table(&self, identifier: &TableIdent) -> Result { let metadata_location = { - let mut connection = self.connection.lock().await; - let row = connection.transaction(|txn|{ let catalog_name = self.name.clone(); let namespace = identifier.namespace().encode_in_url(); let name = identifier.name().to_string(); - Box::pin(async move { - sqlx::query(&format!("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = '{}' and table_namespace = '{}' and table_name = '{}';",&catalog_name, - &namespace, - &name)).fetch_one(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let row = sqlx::query("select table_namespace, table_name, metadata_location, previous_metadata_location from iceberg_tables where catalog_name = ? and table_namespace = ? and table_name = ?;").bind(&catalog_name).bind(&namespace).bind(&name).fetch_one(&self.connection).await.map_err(from_sqlx_error)?; let row = query_map(&row).map_err(from_sqlx_error)?; row.metadata_location @@ -301,15 +271,11 @@ impl Catalog for SqlCatalog { .write_all(&serde_json::to_vec(&metadata)?) .await?; { - let mut connection = self.connection.lock().await; - connection.transaction(|txn|{ - let catalog_name = self.name.clone(); - let namespace = namespace.encode_in_url(); - let name = creation.name.clone(); - let metadata_location = metadata_location.to_string(); - Box::pin(async move { - sqlx::query(&format!("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values ('{}', '{}', '{}', '{}');",catalog_name,namespace,name, metadata_location)).execute(&mut **txn).await - })}).await.map_err(from_sqlx_error)?; + let catalog_name = self.name.clone(); + let namespace = namespace.encode_in_url(); + let name = creation.name.clone(); + let metadata_location = metadata_location.to_string(); + sqlx::query("insert into iceberg_tables (catalog_name, table_namespace, table_name, metadata_location) values (?, ?, ?, ?);").bind(&catalog_name).bind(&namespace).bind(&name).bind(&metadata_location).execute(&self.connection).await.map_err(from_sqlx_error)?; } Ok(Table::builder() .file_io(self.storage.clone())