Skip to content

Commit

Permalink
Prepared statements issue (#223)
Browse files Browse the repository at this point in the history
* fix prepared statements issue

* bumped notification server

* remove prepared statement

* add changelog

* updated changelog

* use self.0
  • Loading branch information
lassemand authored Sep 17, 2024
1 parent f1c87e6 commit 9f443d9
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 133 deletions.
6 changes: 6 additions & 0 deletions notification-server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.3.2
- Initiate prepared statements from within each function call
-
## 0.3.1
- No longer use aspn specific information

## 0.3.0
- Updated the Concordium Rust SDK to support the changes introduced in protocol 7.

Expand Down
2 changes: 1 addition & 1 deletion notification-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion notification-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Concordium AG [email protected]"]
edition = "2021"
name = "notification-server"
version = "0.3.1"
version = "0.3.2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
6 changes: 5 additions & 1 deletion notification-server/scripts/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ FROM ${base_image}
WORKDIR /usr/app

RUN apt-get update && \
apt-get install -y gnupg wget lsb-release && \
sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/postgres.list' && \
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \
apt-get update && \
apt-get -y install \
postgresql-client \
postgresql-client-14 \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

Expand Down
1 change: 0 additions & 1 deletion notification-server/src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ async fn process_device_subscription(
let decoded_accounts = decoded_accounts?;
state
.db_connection
.prepared
.upsert_subscription(
decoded_accounts,
subscription.preferences,
Expand Down
3 changes: 0 additions & 3 deletions notification-server/src/bin/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ async fn process_block(
);
let operation = || async {
match database_connection
.prepared
.get_devices_from_account(result.address())
.await
{
Expand Down Expand Up @@ -228,7 +227,6 @@ async fn process_block(
}
let operation = || async {
database_connection
.prepared
.insert_block(&block_hash, &finalized_block.height)
.await
.map_err(|err| match err {
Expand Down Expand Up @@ -405,7 +403,6 @@ async fn main() -> anyhow::Result<()> {
let database_connection = DatabaseConnection::create(args.db_connection).await?;
let mut concordium_client = Client::new(endpoint).await?;
let mut height = if let Some(height) = database_connection
.prepared
.get_processed_block_height()
.await
.context("Failed to get processed block height")?
Expand Down
188 changes: 62 additions & 126 deletions notification-server/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::models::device::{Device, Preference};
use anyhow::Context;
use concordium_rust_sdk::{
base::hashes::BlockHash, common::types::AccountAddress, types::AbsoluteBlockHeight,
};
use deadpool_postgres::{Manager, ManagerConfig, Pool, PoolError, RecyclingMethod};
use deadpool_postgres::{GenericClient, Manager, ManagerConfig, Pool, PoolError, RecyclingMethod};
use lazy_static::lazy_static;
use log::error;
use std::{
Expand All @@ -27,63 +26,31 @@ pub enum Error {
}

#[derive(Clone, Debug)]
pub struct PreparedStatements {
get_devices_from_account: tokio_postgres::Statement,
upsert_device: tokio_postgres::Statement,
get_latest_block_height: tokio_postgres::Statement,
insert_block: tokio_postgres::Statement,
pool: Pool,
}
pub struct DatabaseConnection(Pool);

impl PreparedStatements {
async fn new(pool: Pool) -> anyhow::Result<Self> {
let mut client = pool.get().await.context("Failed to get client")?;
let transaction = client
.transaction()
.await
.context("Failed to start a transaction")?;
let get_devices_from_account = transaction
.prepare(
"SELECT device_id, preferences FROM account_device_mapping WHERE address = $1 \
LIMIT 1000",
)
.await
.context("Failed to create account device mapping")?;
let upsert_device = transaction
.prepare(
"INSERT INTO account_device_mapping (address, device_id, preferences) VALUES ($1, \
$2, $3) ON CONFLICT (address, device_id) DO UPDATE SET preferences = \
EXCLUDED.preferences;",
)
.await
.context("Failed to create account device mapping")?;
let get_latest_block_height = transaction
.prepare(
impl DatabaseConnection {
pub async fn create(config: tokio_postgres::config::Config) -> anyhow::Result<Self> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
let mgr = Manager::from_config(config, NoTls, mgr_config);
let pool = Pool::builder(mgr)
.max_size(16)
.build()
.expect("Failed to create pool");
Ok(DatabaseConnection(pool))
}

pub async fn get_processed_block_height(&self) -> Result<Option<AbsoluteBlockHeight>, Error> {
let client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached(
"SELECT blocks.height FROM blocks WHERE blocks.id = (SELECT MAX(blocks.id) FROM \
blocks);",
)
.await
.context("Failed to create get latest block height")?;
let insert_block = transaction
.prepare("INSERT INTO blocks (hash, height) VALUES ($1, $2);")
.await
.context("Failed to create insert block")?;
transaction
.commit()
.await
.context("Failed to commit transaction")?;
Ok(PreparedStatements {
get_devices_from_account,
upsert_device,
get_latest_block_height,
insert_block,
pool,
})
}

pub async fn get_processed_block_height(&self) -> Result<Option<AbsoluteBlockHeight>, Error> {
let client = self.pool.get().await.map_err(Into::<Error>::into)?;
let row = client.query_opt(&self.get_latest_block_height, &[]).await?;
.map_err(Into::<Error>::into)?;
let row = client.query_opt(&stmt, &[]).await?;
row.map(|row| row.try_get::<_, i64>(0).map(|raw| (raw as u64).into()))
.transpose()
.map_err(Into::into)
Expand All @@ -93,10 +60,17 @@ impl PreparedStatements {
&self,
account_address: &AccountAddress,
) -> Result<Vec<Device>, Error> {
let client = self.pool.get().await.map_err(Into::<Error>::into)?;
let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&account_address.0.as_ref()];
let client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached(
"SELECT device_id, preferences FROM account_device_mapping WHERE address = $1 \
LIMIT 1000",
)
.await
.map_err(Into::<Error>::into)?;
let params: &[&(dyn ToSql + Sync)] = &[&account_address.0.as_ref()];
let rows = client
.query(&self.get_devices_from_account, params)
.query(&stmt, params)
.await
.map_err(Into::<Error>::into)?;
rows.iter()
Expand All @@ -114,13 +88,21 @@ impl PreparedStatements {
preferences: Vec<Preference>,
device_token: &str,
) -> Result<(), Error> {
let mut client = self.pool.get().await.map_err(Into::<Error>::into)?;
let mut client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached(
"INSERT INTO account_device_mapping (address, device_id, preferences) VALUES ($1, \
$2, $3) ON CONFLICT (address, device_id) DO UPDATE SET preferences = \
EXCLUDED.preferences;",
)
.await
.map_err(Into::<Error>::into)?;
let preferences_mask = preferences_to_bitmask(preferences.into_iter());
let transaction = client.transaction().await?;
for account in account_address {
let params: &[&(dyn ToSql + Sync)] =
&[&account.0.as_ref(), &device_token, &preferences_mask];
if let Err(e) = transaction.execute(&self.upsert_device, params).await {
if let Err(e) = transaction.execute(&stmt, params).await {
let _ = transaction.rollback().await;
return Err(e.into());
}
Expand All @@ -133,42 +115,23 @@ impl PreparedStatements {
hash: &BlockHash,
height: &AbsoluteBlockHeight,
) -> Result<(), Error> {
let client = self.pool.get().await.map_err(Into::<Error>::into)?;
let params: &[&(dyn ToSql + Sync); 2] = &[&hash.as_ref(), &(height.height as i64)];
client
.execute(&self.insert_block, params)
let client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached("INSERT INTO blocks (hash, height) VALUES ($1, $2);")
.await
.map_or_else(
|err| {
if let Some(db_err) = err.as_db_error() {
if db_err.code() == &SqlState::UNIQUE_VIOLATION {
return Err(Error::ConstraintViolation(*hash, *height));
}
};
Err(Error::DatabaseConnection(err))
},
|_| Ok(()),
)
}
}

#[derive(Clone, Debug)]
pub struct DatabaseConnection {
pub prepared: PreparedStatements,
}

impl DatabaseConnection {
pub async fn create(config: tokio_postgres::config::Config) -> anyhow::Result<Self> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
let mgr = Manager::from_config(config, NoTls, mgr_config);
let pool = Pool::builder(mgr)
.max_size(16)
.build()
.expect("Failed to create pool");
let prepared = PreparedStatements::new(pool).await?;
Ok(DatabaseConnection { prepared })
.map_err(Into::<Error>::into)?;
let params: &[&(dyn ToSql + Sync); 2] = &[&hash.as_ref(), &(height.height as i64)];
client.execute(&stmt, params).await.map_or_else(
|err| {
if let Some(db_err) = err.as_db_error() {
if db_err.code() == &SqlState::UNIQUE_VIOLATION {
return Err(Error::ConstraintViolation(*hash, *height));
}
};
Err(Error::DatabaseConnection(err))
},
|_| Ok(()),
)
}
}

Expand Down Expand Up @@ -322,7 +285,7 @@ mod tests {
.unwrap();
let db_connection = DatabaseConnection::create(config).await?;

let client = db_connection.prepared.pool.get().await?;
let client = db_connection.0.get().await?;
drop_all_tables(&client).await?;
create_sql(&client).await?;

Expand All @@ -337,7 +300,6 @@ mod tests {
AccountAddress::from_str("4FmiTW2L2AccyR9VjzsnpWFSAcohXWf7Vf797i36y526mqiEcp").unwrap();
let device = "device-1";
db_connection
.prepared
.upsert_subscription(
vec![account_address],
vec![Preference::CIS2Transaction],
Expand All @@ -346,7 +308,6 @@ mod tests {
.await
.unwrap();
let devices = db_connection
.prepared
.get_devices_from_account(&account_address)
.await
.unwrap();
Expand All @@ -366,12 +327,10 @@ mod tests {
AccountAddress::from_str("4FmiTW2L2AccyR9VjzsnpWFSAcohXWf7Vf797i36y526mqiEcp").unwrap();
let device = "device-1";
db_connection
.prepared
.upsert_subscription(vec![account_address], vec![CIS2Transaction], device)
.await
.unwrap();
db_connection
.prepared
.upsert_subscription(
vec![account_address],
vec![CIS2Transaction, CCDTransaction],
Expand All @@ -380,7 +339,6 @@ mod tests {
.await
.unwrap();
let devices = db_connection
.prepared
.get_devices_from_account(&account_address)
.await
.unwrap();
Expand All @@ -391,12 +349,10 @@ mod tests {
)]);

db_connection
.prepared
.upsert_subscription(vec![account_address], vec![], device)
.await
.unwrap();
let devices = db_connection
.prepared
.get_devices_from_account(&account_address)
.await
.unwrap();
Expand All @@ -414,32 +370,16 @@ mod tests {
let hash = BlockHash::new([0; 32]); // Example block hash
let height = AbsoluteBlockHeight::from(1);

db_connection
.prepared
.insert_block(&hash, &height)
.await
.unwrap();
db_connection.insert_block(&hash, &height).await.unwrap();

let latest_height = db_connection
.prepared
.get_processed_block_height()
.await
.unwrap();
let latest_height = db_connection.get_processed_block_height().await.unwrap();
assert_eq!(latest_height, Some(height));

let hash = BlockHash::new([1; 32]); // Example block hash
let height = AbsoluteBlockHeight::from(2);

db_connection
.prepared
.insert_block(&hash, &height)
.await
.unwrap();
let latest_height = db_connection
.prepared
.get_processed_block_height()
.await
.unwrap();
db_connection.insert_block(&hash, &height).await.unwrap();
let latest_height = db_connection.get_processed_block_height().await.unwrap();
assert_eq!(latest_height.unwrap().height, 2);
}

Expand All @@ -452,7 +392,6 @@ mod tests {
let expected_height = AbsoluteBlockHeight::from(1);

db_connection
.prepared
.insert_block(
&BlockHash::new(expected_hash),
&AbsoluteBlockHeight::from(2),
Expand All @@ -461,7 +400,6 @@ mod tests {
.unwrap();

if db_connection
.prepared
.insert_block(&BlockHash::new(expected_hash), &expected_height)
.await
.is_err()
Expand All @@ -479,13 +417,11 @@ mod tests {
let expected_height = AbsoluteBlockHeight::from(1);

db_connection
.prepared
.insert_block(&BlockHash::new([0; 32]), &expected_height)
.await
.unwrap();

match db_connection
.prepared
.insert_block(&BlockHash::new(expected_hash), &expected_height)
.await
{
Expand Down

0 comments on commit 9f443d9

Please sign in to comment.