Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(driver-adapters): PlanetScale transactions #4967

Merged
merged 36 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5f41707
fix(driver-adapters): fix transaction ordering for ISOLATION LEVEL, i…
jkomyno Jul 23, 2024
bb8460f
chore(driver-adapters): uncomment fixed PlanetScale tests
jkomyno Jul 23, 2024
3ac4848
Merge branch 'main' into fix/planetscale-transactions
jkomyno Jul 23, 2024
9f21cf7
chore: retrigger CI
jkomyno Jul 23, 2024
35ba9f4
Merge branch 'fix/planetscale-transactions' of github.com:prisma/pris…
jkomyno Jul 23, 2024
f229b5b
feat(driver-adapters): add support for TransactionContext
jkomyno Jul 24, 2024
c926163
test(connector-test-kit-rs): uncomment succeeding "basic_serializable…
jkomyno Jul 24, 2024
37337e0
chore(driver-adapters): fix types in executor
jkomyno Jul 24, 2024
df97438
DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retri…
jkomyno Jul 24, 2024
4c35e3b
feat(driver-adapters): impl Send on JsTransactionContext on wasm32
jkomyno Jul 24, 2024
9eb1b4a
DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retri…
jkomyno Jul 24, 2024
7fd92e7
feat(driver-adapters): impl FromJsValue for JsTransactionContext on w…
jkomyno Jul 24, 2024
7d29edf
DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retri…
jkomyno Jul 24, 2024
6cb4abc
feat(driver-adapters): impl Send for TransactionContextProxy on wasm32
jkomyno Jul 25, 2024
5e7db17
DRIVER_ADAPTERS_BRANCH=feat-driver-adapters-transaction-context retri…
jkomyno Jul 25, 2024
3fabd89
chore: attempt Send/Sync-compatibility for JS transactions
jkomyno Jul 29, 2024
ae1817f
Merge branch 'main' into integration/fix-planetscale-transactions
jkomyno Aug 12, 2024
f272255
chore: add "parse_raw_query" to "JsTransactionContext"
jkomyno Aug 12, 2024
1352120
fix(driver-adapters): enable wasm32 compilation of "JsQueryable::star…
jkomyno Aug 12, 2024
6b0a78a
chore: remove commented-out method
jkomyno Aug 12, 2024
630e1df
test(connector-test-kit-rs): uncomment "interactive_tx" tests
jkomyno Aug 12, 2024
5154993
Merge branch 'main' into integration/fix-planetscale-transactions
jkomyno Aug 12, 2024
86454fd
DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore…
jkomyno Aug 12, 2024
cc0baab
tmp(connector-test-kit-rs): add (failing) concurrent_create_select re…
jkomyno Aug 14, 2024
290b8da
Revert "tmp(connector-test-kit-rs): add (failing) concurrent_create_s…
jkomyno Aug 26, 2024
10625e5
Merge branch 'main' into integration/fix-planetscale-transactions
jkomyno Aug 26, 2024
6885845
DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore…
jkomyno Aug 26, 2024
1624c79
feat(driver-adapters): rename "parse_raw_query" to "describe_query" a…
jkomyno Aug 26, 2024
c83aea2
DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore…
jkomyno Aug 26, 2024
578efcc
Merge branch 'main' into integration/fix-planetscale-transactions
jkomyno Sep 3, 2024
c2572c2
chore: don't leak concrete UnsafeFuture types
jkomyno Sep 3, 2024
808fdbe
chore(driver-adapters): nit, replace "::napi" with "napi"
jkomyno Sep 3, 2024
e2b566d
Revert "chore(driver-adapters): nit, replace "::napi" with "napi""
jkomyno Sep 3, 2024
7cb8cc9
DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore…
jkomyno Sep 3, 2024
63e3110
chore: unbox JsTransactionContext
jkomyno Sep 3, 2024
0deadde
DRIVER_ADAPTERS_BRANCH=feat/driver-adapters-transaction-context chore…
jkomyno Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ mod interactive_tx {
Ok(())
}

#[connector_test(exclude(Vitess("planetscale.js.wasm"), Sqlite("cfd1")))]
#[connector_test(exclude(Sqlite("cfd1")))]
async fn batch_queries_failure(mut runner: Runner) -> TestResult<()> {
// Tx expires after five second.
let tx_id = runner.start_tx(5000, 5000, None).await?;
Expand Down Expand Up @@ -256,7 +256,7 @@ mod interactive_tx {
Ok(())
}

#[connector_test(exclude(Vitess("planetscale.js.wasm")))]
#[connector_test]
async fn tx_expiration_failure_cycle(mut runner: Runner) -> TestResult<()> {
// Tx expires after one seconds.
let tx_id = runner.start_tx(5000, 1000, None).await?;
Expand Down Expand Up @@ -573,10 +573,7 @@ mod itx_isolation {
use query_engine_tests::*;

// All (SQL) connectors support serializable.
// However, there's a bug in the PlanetScale driver adapter:
// "Transaction characteristics can't be changed while a transaction is in progress
// (errno 1568) (sqlstate 25001) during query: SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"
#[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm"), Sqlite("cfd1")))]
#[connector_test(exclude(MongoDb, Sqlite("cfd1")))]
async fn basic_serializable(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, Some("Serializable".to_owned())).await?;
runner.set_active_tx(tx_id.clone());
Expand All @@ -598,9 +595,7 @@ mod itx_isolation {
Ok(())
}

// On PlanetScale, this fails with:
// `InteractiveTransactionError("Error in connector: Error querying the database: Server error: `ERROR 25001 (1568): Transaction characteristics can't be changed while a transaction is in progress'")`
#[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm"), Sqlite("cfd1")))]
#[connector_test(exclude(MongoDb, Sqlite("cfd1")))]
async fn casing_doesnt_matter(mut runner: Runner) -> TestResult<()> {
let tx_id = runner.start_tx(5000, 5000, Some("sErIaLiZaBlE".to_owned())).await?;
runner.set_active_tx(tx_id.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ mod transactional {
Ok(())
}

// On PlanetScale, this fails with:
// "Error in connector: Error querying the database: Server error: `ERROR 25001 (1568): Transaction characteristics can't be changed while a transaction is in progress'""
#[connector_test(exclude(MongoDb, Vitess("planetscale.js", "planetscale.js.wasm")))]
#[connector_test(exclude(MongoDb))]
async fn valid_isolation_level(runner: Runner) -> TestResult<()> {
let queries = vec![r#"mutation { createOneModelB(data: { id: 1 }) { id }}"#.to_string()];

Expand Down
4 changes: 2 additions & 2 deletions query-engine/driver-adapters/executor/src/recording.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function recorder(adapter: DriverAdapter, recordings: Recordings) {
return {
provider: adapter.provider,
adapterName: adapter.adapterName,
startTransaction: () => {
transactionContext: () => {
throw new Error("Not implemented");
},
getConnectionInfo: () => {
Expand All @@ -43,7 +43,7 @@ function replayer(adapter: DriverAdapter, recordings: Recordings) {
provider: adapter.provider,
adapterName: adapter.adapterName,
recordings: recordings,
startTransaction: () => {
transactionContext: () => {
throw new Error("Not implemented");
},
getConnectionInfo: () => {
Expand Down
39 changes: 34 additions & 5 deletions query-engine/driver-adapters/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::send_future::UnsafeFuture;
use crate::types::JsConnectionInfo;
pub use crate::types::{JSResultSet, Query, TransactionOptions};
use crate::{
from_js_value, get_named_property, get_optional_named_property, to_rust_str, AdapterMethod, JsObject, JsResult,
JsString, JsTransaction,
};
use crate::{send_future::UnsafeFuture, transaction::JsTransactionContext};

use futures::Future;
use metrics::increment_gauge;
Expand All @@ -28,8 +28,19 @@ pub(crate) struct CommonProxy {
/// This is a JS proxy for accessing the methods specific to top level
/// JS driver objects
pub(crate) struct DriverProxy {
start_transaction: AdapterMethod<(), JsTransaction>,
/// Retrieve driver-specific info, such as the maximum number of query parameters
get_connection_info: Option<AdapterMethod<(), JsConnectionInfo>>,

/// Provide a transaction context, in which raw commands are guaranteed to be executed in
/// the same scope as a future transaction, which can be spawned by via
/// [`driver_adapters::transaction::JsTransactionContext::start_transaction`].
/// This was first introduced for supporting Isolation Levels in PlanetScale.
transaction_context: AdapterMethod<(), JsTransactionContext>,
}

/// This is a JS proxy for accessing the methods specific to JS transaction contexts.
pub(crate) struct TransactionContextProxy {
start_transaction: AdapterMethod<(), JsTransaction>,
}

/// This a JS proxy for accessing the methods, specific
Expand All @@ -48,6 +59,7 @@ pub(crate) struct TransactionProxy {
closed: AtomicBool,
}

// TypeScript: Queryable
impl CommonProxy {
pub fn new(object: &JsObject) -> JsResult<Self> {
let provider: JsString = get_named_property(object, "provider")?;
Expand All @@ -68,11 +80,12 @@ impl CommonProxy {
}
}

// TypeScript: DriverAdapter
impl DriverProxy {
pub fn new(object: &JsObject) -> JsResult<Self> {
Ok(Self {
start_transaction: get_named_property(object, "startTransaction")?,
get_connection_info: get_optional_named_property(object, "getConnectionInfo")?,
transaction_context: get_named_property(object, "transactionContext")?,
})
}

Expand All @@ -87,6 +100,20 @@ impl DriverProxy {
.await
}

pub async fn transaction_context(&self) -> quaint::Result<JsTransactionContext> {
let ctx = self.transaction_context.call_as_async(()).await?;

Ok(ctx)
}
}

impl TransactionContextProxy {
pub fn new(object: &JsObject) -> JsResult<Self> {
let start_transaction = get_named_property(object, "startTransaction")?;

Ok(Self { start_transaction })
}

async fn start_transaction_inner(&self) -> quaint::Result<Box<JsTransaction>> {
let tx = self.start_transaction.call_as_async(()).await?;

Expand All @@ -98,7 +125,7 @@ impl DriverProxy {
Ok(Box::new(tx))
}

pub fn start_transaction(&self) -> UnsafeFuture<impl Future<Output = quaint::Result<Box<JsTransaction>>> + '_> {
pub fn start_transaction(&self) -> impl Future<Output = quaint::Result<Box<JsTransaction>>> + '_ {
UnsafeFuture(self.start_transaction_inner())
}
}
Expand Down Expand Up @@ -184,6 +211,8 @@ macro_rules! impl_send_sync_on_wasm {

// Assume the proxy object will not be sent to service workers, we can unsafe impl Send + Sync.
impl_send_sync_on_wasm!(TransactionProxy);
impl_send_sync_on_wasm!(JsTransaction);
impl_send_sync_on_wasm!(TransactionContextProxy);
impl_send_sync_on_wasm!(JsTransactionContext);
impl_send_sync_on_wasm!(DriverProxy);
impl_send_sync_on_wasm!(CommonProxy);
impl_send_sync_on_wasm!(JsTransaction);
36 changes: 27 additions & 9 deletions query-engine/driver-adapters/src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,33 +301,41 @@ impl QuaintQueryable for JsQueryable {
}
}

#[async_trait]
impl TransactionCapable for JsQueryable {
async fn start_transaction<'a>(
impl JsQueryable {
async fn start_transaction_inner<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> quaint::Result<Box<dyn Transaction + 'a>> {
let tx = self.driver_proxy.start_transaction().await?;
// 1. Obtain a transaction context from the driver.
// Any command run on this context is guaranteed to be part of the same session
// as the transaction spawned from it.
let tx_ctx = self.driver_proxy.transaction_context().await?;

let isolation_first = tx.requires_isolation_first();
let requires_isolation_first = tx_ctx.requires_isolation_first();

if isolation_first {
// 2. Set the isolation level (if specified) if the provider requires it to be set before
// creating the transaction.
if requires_isolation_first {
if let Some(isolation) = isolation {
tx.set_tx_isolation_level(isolation).await?;
tx_ctx.set_tx_isolation_level(isolation).await?;
}
}

let begin_stmt = tx.begin_statement();
// 3. Spawn a transaction from the context.
let tx = tx_ctx.start_transaction().await?;
jkomyno marked this conversation as resolved.
Show resolved Hide resolved

let begin_stmt = tx.begin_statement();
let tx_opts = tx.options();

if tx_opts.use_phantom_query {
let begin_stmt = JsBaseQueryable::phantom_query_message(begin_stmt);
tx.raw_phantom_cmd(begin_stmt.as_str()).await?;
} else {
tx.raw_cmd(begin_stmt).await?;
}

if !isolation_first {
// 4. Set the isolation level (if specified) if we didn't do it before.
if !requires_isolation_first {
if let Some(isolation) = isolation {
tx.set_tx_isolation_level(isolation).await?;
}
Expand All @@ -339,6 +347,16 @@ impl TransactionCapable for JsQueryable {
}
}

#[async_trait]
impl TransactionCapable for JsQueryable {
async fn start_transaction<'a>(
&'a self,
isolation: Option<IsolationLevel>,
) -> quaint::Result<Box<dyn Transaction + 'a>> {
UnsafeFuture(self.start_transaction_inner(isolation)).await
}
}

pub fn from_js(driver: JsObject) -> JsQueryable {
let common = CommonProxy::new(&driver).unwrap();
let driver_proxy = DriverProxy::new(&driver).unwrap();
Expand Down
99 changes: 98 additions & 1 deletion query-engine/driver-adapters/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::future::Future;

use async_trait::async_trait;
use metrics::decrement_gauge;
use quaint::{
Expand All @@ -6,10 +8,78 @@ use quaint::{
Value,
};

use crate::proxy::{TransactionOptions, TransactionProxy};
use crate::proxy::{TransactionContextProxy, TransactionOptions, TransactionProxy};
use crate::{proxy::CommonProxy, queryable::JsBaseQueryable, send_future::UnsafeFuture};
use crate::{JsObject, JsResult};

pub(crate) struct JsTransactionContext {
tx_ctx_proxy: TransactionContextProxy,
inner: JsBaseQueryable,
}

// Wrapper around JS transaction context objects that implements Queryable. Can be used in place of quaint transaction,
// context, but delegates most operations to JS
impl JsTransactionContext {
pub(crate) fn new(inner: JsBaseQueryable, tx_ctx_proxy: TransactionContextProxy) -> Self {
Self { inner, tx_ctx_proxy }
}

pub fn start_transaction(&self) -> impl Future<Output = quaint::Result<Box<JsTransaction>>> + '_ {
UnsafeFuture(self.tx_ctx_proxy.start_transaction())
}
}

#[async_trait]
impl Queryable for JsTransactionContext {
aqrln marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

@aqrln aqrln Sep 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kind of a slippery slope but I really feel like implementing Deref is appropriate in these cases. Let's not do it in this PR to avoid churning unrelated code, but I'd strongly consider doing it later. Others may disagree though.

async fn query(&self, q: QuaintQuery<'_>) -> quaint::Result<ResultSet> {
self.inner.query(q).await
}

async fn query_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<ResultSet> {
self.inner.query_raw(sql, params).await
}

async fn query_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<ResultSet> {
self.inner.query_raw_typed(sql, params).await
}

async fn execute(&self, q: QuaintQuery<'_>) -> quaint::Result<u64> {
self.inner.execute(q).await
}

async fn execute_raw(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.inner.execute_raw(sql, params).await
}

async fn execute_raw_typed(&self, sql: &str, params: &[Value<'_>]) -> quaint::Result<u64> {
self.inner.execute_raw_typed(sql, params).await
}

async fn raw_cmd(&self, cmd: &str) -> quaint::Result<()> {
self.inner.raw_cmd(cmd).await
}

async fn version(&self) -> quaint::Result<Option<String>> {
self.inner.version().await
}

async fn describe_query(&self, sql: &str) -> quaint::Result<DescribedQuery> {
self.inner.describe_query(sql).await
}

fn is_healthy(&self) -> bool {
self.inner.is_healthy()
}

async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> quaint::Result<()> {
self.inner.set_tx_isolation_level(isolation_level).await
}

fn requires_isolation_first(&self) -> bool {
self.inner.requires_isolation_first()
}
}

// Wrapper around JS transaction objects that implements Queryable
// and quaint::Transaction. Can be used in place of quaint transaction,
// but delegates most operations to JS
Expand Down Expand Up @@ -149,3 +219,30 @@ impl ::napi::bindgen_prelude::FromNapiValue for JsTransaction {
Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_proxy))
}
}

#[cfg(target_arch = "wasm32")]
impl super::wasm::FromJsValue for JsTransactionContext {
fn from_js_value(value: wasm_bindgen::prelude::JsValue) -> JsResult<Self> {
use wasm_bindgen::JsCast;

let object = value.dyn_into::<JsObject>()?;
let common_proxy = CommonProxy::new(&object)?;
let base = JsBaseQueryable::new(common_proxy);
let tx_ctx_proxy = TransactionContextProxy::new(&object)?;

Ok(Self::new(base, tx_ctx_proxy))
}
}

/// Implementing unsafe `from_napi_value` allows retrieving a threadsafe `JsTransactionContext` in `DriverProxy`
/// while keeping derived futures `Send`.
#[cfg(not(target_arch = "wasm32"))]
impl ::napi::bindgen_prelude::FromNapiValue for JsTransactionContext {
jkomyno marked this conversation as resolved.
Show resolved Hide resolved
unsafe fn from_napi_value(env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> JsResult<Self> {
let object = JsObject::from_napi_value(env, napi_val)?;
let common_proxy = CommonProxy::new(&object)?;
let tx_ctx_proxy = TransactionContextProxy::new(&object)?;

Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_ctx_proxy))
}
}
Loading