From bb89459b720dd132af5eeb9cbea542855f236644 Mon Sep 17 00:00:00 2001 From: Serhii Tatarintsev Date: Fri, 1 Sep 2023 01:03:37 +0200 Subject: [PATCH] feat(js-connectors): Proper JS error propagation (#4186) * feat(js-connectors): Proper JS error propogation The way it is implemented: 1. On TS side, we catch all errors from connector methods. In case we encounterd an error, we store it on per-connector error registry and return it's numeric id to the engine. 2. Engine propogates that numeric id all the way throuhg quaint up to query-engine-node-api, where it becomes new kind of error, `ExternalError` with a code of P2036 and error id in the meta field. 3. Client is then expected to pick the error from the registry and re-throw it. This PR implements this for smoke-tests, similar implementation needs to be done in the client. Implementation is done this way, rather than propogating `napi::Error` instance, to avoid dependencies on napi types in all intermediate crates other than `js-connectors` and `query-engine-node-api`. To better accomodate this pattern, `Result` type was introduced to the connectors. It allows to explicitly indicate if particular call succeeded or failed via return value. On Rust side, those values get parsed and converted into `std::result::Result` values. On Rust side, `AsyncJsFunction` wrapper over `ThreadsafeFunction` is introduced. It handles promises and JS result type conversions automatically and simplifies `Proxy` code. This also lays the foundation for supporting "Known" errors - JsError type could be extended in a future with new error types, that could be converted into standard prisma errors with normal error codes. Fix #prisma/team-orm#260 * Address review feedback --------- Co-authored-by: jkomyno --- .../src/query_engine/mod.rs | 7 ++ quaint/src/error.rs | 8 ++ .../connectors/query-connector/src/error.rs | 6 + .../sql-query-connector/src/error.rs | 5 + .../js/js-connector-utils/src/binder.ts | 67 +++++++++-- .../js/js-connector-utils/src/index.ts | 2 +- .../js/js-connector-utils/src/types.ts | 35 +++++- .../js/neon-js-connector/src/neon.ts | 32 ++--- .../js/pg-js-connector/src/pg.ts | 28 +++-- .../src/planetscale.ts | 30 ++--- .../js/smoke-test-js/src/test.ts | 68 +++++++---- .../js-connectors/src/async_js_function.rs | 70 +++++++++++ query-engine/js-connectors/src/lib.rs | 2 + query-engine/js-connectors/src/proxy.rs | 113 ++++++------------ query-engine/js-connectors/src/queryable.rs | 33 ++--- query-engine/js-connectors/src/result.rs | 78 ++++++++++++ query-engine/js-connectors/src/transaction.rs | 16 ++- .../query-engine-node-api/src/engine.rs | 2 +- 18 files changed, 410 insertions(+), 192 deletions(-) create mode 100644 query-engine/js-connectors/src/async_js_function.rs create mode 100644 query-engine/js-connectors/src/result.rs diff --git a/libs/user-facing-errors/src/query_engine/mod.rs b/libs/user-facing-errors/src/query_engine/mod.rs index 065189ee75d6..48833cb490a0 100644 --- a/libs/user-facing-errors/src/query_engine/mod.rs +++ b/libs/user-facing-errors/src/query_engine/mod.rs @@ -328,3 +328,10 @@ pub struct DatabaseAssertionViolation { /// Database error returned by the underlying connector driver. pub database_error: String, } + +#[derive(Debug, UserFacingError, Serialize)] +#[user_facing(code = "P2036", message = "Error in external connector (id {id})")] +pub struct ExternalError { + /// id of the error in external system, which would allow to retrieve it later + pub id: i32, +} diff --git a/quaint/src/error.rs b/quaint/src/error.rs index fcfe83da5952..73f88dc90b30 100644 --- a/quaint/src/error.rs +++ b/quaint/src/error.rs @@ -138,6 +138,11 @@ impl Error { pub fn raw_connector_error(status: String, reason: String) -> Error { Error::builder(ErrorKind::RawConnectorError { status, reason }).build() } + + // Builds an error from an externally stored error + pub fn external_error(error_id: i32) -> Error { + Error::builder(ErrorKind::ExternalError(error_id)).build() + } } impl fmt::Display for Error { @@ -272,6 +277,9 @@ pub enum ErrorKind { #[error("Column type '{}' could not be deserialized from the database.", column_type)] UnsupportedColumnType { column_type: String }, + + #[error("External error id#{}", _0)] + ExternalError(i32), } impl ErrorKind { diff --git a/query-engine/connectors/query-connector/src/error.rs b/query-engine/connectors/query-connector/src/error.rs index 903fca348ce1..96d8d9dcbacb 100644 --- a/query-engine/connectors/query-connector/src/error.rs +++ b/query-engine/connectors/query-connector/src/error.rs @@ -113,6 +113,9 @@ impl ConnectorError { message: message.clone(), }, )), + ErrorKind::ExternalError(id) => Some(user_facing_errors::KnownError::new( + user_facing_errors::query_engine::ExternalError { id: id.to_owned() }, + )), _ => None, }; @@ -266,6 +269,9 @@ pub enum ErrorKind { #[error("Unsupported connector: {0}")] UnsupportedConnector(String), + + #[error("External connector error")] + ExternalError(i32), } impl From for ConnectorError { diff --git a/query-engine/connectors/sql-query-connector/src/error.rs b/query-engine/connectors/sql-query-connector/src/error.rs index edbe087b34e4..070faba6ca14 100644 --- a/query-engine/connectors/sql-query-connector/src/error.rs +++ b/query-engine/connectors/sql-query-connector/src/error.rs @@ -175,6 +175,9 @@ pub enum SqlError { #[error("Cannot find a fulltext index to use for the search")] MissingFullTextSearchIndex, + + #[error("External connector error")] + ExternalError(i32), } impl SqlError { @@ -254,6 +257,7 @@ impl SqlError { } SqlError::MissingFullTextSearchIndex => ConnectorError::from_kind(ErrorKind::MissingFullTextSearchIndex), SqlError::InvalidIsolationLevel(msg) => ConnectorError::from_kind(ErrorKind::InternalConversionError(msg)), + SqlError::ExternalError(error_id) => ConnectorError::from_kind(ErrorKind::ExternalError(error_id)), } } } @@ -295,6 +299,7 @@ impl From for SqlError { QuaintKind::InvalidIsolationLevel(msg) => Self::InvalidIsolationLevel(msg), QuaintKind::TransactionWriteConflict => Self::TransactionWriteConflict, QuaintKind::RollbackWithoutBegin => Self::RollbackWithoutBegin, + QuaintKind::ExternalError(error_id) => Self::ExternalError(error_id), e @ QuaintKind::UnsupportedColumnType { .. } => SqlError::ConversionError(e.into()), e @ QuaintKind::TransactionAlreadyClosed(_) => SqlError::TransactionAlreadyClosed(format!("{e}")), e @ QuaintKind::IncorrectNumberOfParameters { .. } => SqlError::QueryError(e.into()), diff --git a/query-engine/js-connectors/js/js-connector-utils/src/binder.ts b/query-engine/js-connectors/js/js-connector-utils/src/binder.ts index 1f64f3e98700..640fdf0462a4 100644 --- a/query-engine/js-connectors/js/js-connector-utils/src/binder.ts +++ b/query-engine/js-connectors/js/js-connector-utils/src/binder.ts @@ -1,23 +1,64 @@ -import type { Connector, Transaction } from './types'; +import type { ErrorCapturingConnector, Connector, Transaction, ErrorRegistry, ErrorRecord, Result } from './types'; + + +class ErrorRegistryInternal implements ErrorRegistry { + private registeredErrors: ErrorRecord[] = [] + + consumeError(id: number): ErrorRecord | undefined { + return this.registeredErrors[id] + } + + registerNewError(error: unknown) { + let i=0; + while (this.registeredErrors[i] !== undefined) { + i++ + } + this.registeredErrors[i] = { error } + return i + } + +} // *.bind(connector) is required to preserve the `this` context of functions whose // execution is delegated to napi.rs. -export const bindConnector = (connector: Connector): Connector => ({ - queryRaw: connector.queryRaw.bind(connector), - executeRaw: connector.executeRaw.bind(connector), - flavour: connector.flavour, - startTransaction: connector.startTransaction.bind(connector), - close: connector.close.bind(connector) -}) +export const bindConnector = (connector: Connector): ErrorCapturingConnector => { + const errorRegistry = new ErrorRegistryInternal() + + return { + errorRegistry, + queryRaw: wrapAsync(errorRegistry, connector.queryRaw.bind(connector)), + executeRaw: wrapAsync(errorRegistry, connector.executeRaw.bind(connector)), + flavour: connector.flavour, + startTransaction: async (...args) => { + const result = await connector.startTransaction(...args); + if (result.ok) { + return { ok: true, value: bindTransaction(errorRegistry, result.value)} + } + return result + }, + close: wrapAsync(errorRegistry, connector.close.bind(connector)) + } +} // *.bind(transaction) is required to preserve the `this` context of functions whose // execution is delegated to napi.rs. -export const bindTransaction = (transaction: Transaction): Transaction => { +const bindTransaction = (errorRegistry: ErrorRegistryInternal, transaction: Transaction): Transaction => { return ({ flavour: transaction.flavour, - queryRaw: transaction.queryRaw.bind(transaction), - executeRaw: transaction.executeRaw.bind(transaction), - commit: transaction.commit.bind(transaction), - rollback: transaction.rollback.bind(transaction) + queryRaw: wrapAsync(errorRegistry, transaction.queryRaw.bind(transaction)), + executeRaw: wrapAsync(errorRegistry, transaction.executeRaw.bind(transaction)), + commit: wrapAsync(errorRegistry, transaction.commit.bind(transaction)), + rollback: wrapAsync(errorRegistry, transaction.rollback.bind(transaction)) }); +} + +function wrapAsync(registry: ErrorRegistryInternal, fn: (...args: A) => Promise>): (...args: A) => Promise> { + return async (...args) => { + try { + return await fn(...args) + } catch (error) { + const id = registry.registerNewError(error) + return { ok: false, error: { kind: 'GenericJsError', id } } + } + } } \ No newline at end of file diff --git a/query-engine/js-connectors/js/js-connector-utils/src/index.ts b/query-engine/js-connectors/js/js-connector-utils/src/index.ts index 921411d50987..704d996280e6 100644 --- a/query-engine/js-connectors/js/js-connector-utils/src/index.ts +++ b/query-engine/js-connectors/js/js-connector-utils/src/index.ts @@ -1,4 +1,4 @@ -export { bindConnector, bindTransaction } from './binder' +export { bindConnector } from './binder' export { ColumnTypeEnum } from './const' export { Debug } from './debug' export type * from './types' diff --git a/query-engine/js-connectors/js/js-connector-utils/src/types.ts b/query-engine/js-connectors/js/js-connector-utils/src/types.ts index 196ef2f7dbec..0ecce7b4edad 100644 --- a/query-engine/js-connectors/js/js-connector-utils/src/types.ts +++ b/query-engine/js-connectors/js/js-connector-utils/src/types.ts @@ -32,6 +32,19 @@ export type Query = { args: Array } +export type Error = { + kind: 'GenericJsError', + id: number +} + +export type Result = { + ok: true, + value: T +} | { + ok: false, + error: Error +} + export interface Queryable { readonly flavour: 'mysql' | 'postgres' @@ -41,7 +54,7 @@ export interface Queryable { * * This is the preferred way of executing `SELECT` queries. */ - queryRaw(params: Query): Promise + queryRaw(params: Query): Promise> /** * Execute a query given as SQL, interpolating the given parameters, @@ -50,7 +63,7 @@ export interface Queryable { * This is the preferred way of executing `INSERT`, `UPDATE`, `DELETE` queries, * as well as transactional queries. */ - executeRaw(params: Query): Promise + executeRaw(params: Query): Promise> } export interface Connector extends Queryable { @@ -58,23 +71,27 @@ export interface Connector extends Queryable { * Starts new transation with the specified isolation level * @param isolationLevel */ - startTransaction(isolationLevel?: string): Promise + startTransaction(isolationLevel?: string): Promise> /** * Closes the connection to the database, if any. */ - close: () => Promise + close: () => Promise> } export interface Transaction extends Queryable { /** * Commit the transaction */ - commit(): Promise + commit(): Promise> /** * Rolls back the transaction. */ - rollback(): Promise + rollback(): Promise> +} + +export interface ErrorCapturingConnector extends Connector { + readonly errorRegistry: ErrorRegistry } /** @@ -86,3 +103,9 @@ export type ConnectorConfig = { */ url: string, } + +export interface ErrorRegistry { + consumeError(id: number): ErrorRecord | undefined +} + +export type ErrorRecord = { error: unknown } diff --git a/query-engine/js-connectors/js/neon-js-connector/src/neon.ts b/query-engine/js-connectors/js/neon-js-connector/src/neon.ts index 2ab36c75bb62..d5b11301c4cc 100644 --- a/query-engine/js-connectors/js/neon-js-connector/src/neon.ts +++ b/query-engine/js-connectors/js/neon-js-connector/src/neon.ts @@ -1,8 +1,8 @@ import { FullQueryResults, PoolClient, neon, neonConfig } from '@neondatabase/serverless' import { NeonConfig, NeonQueryFunction, Pool, QueryResult } from '@neondatabase/serverless' import ws from 'ws' -import { bindConnector, bindTransaction, Debug } from '@jkomyno/prisma-js-connector-utils' -import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction } from '@jkomyno/prisma-js-connector-utils' +import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils' +import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' import { fieldToColumnType } from './conversion' neonConfig.webSocketConstructor = ws @@ -22,7 +22,7 @@ type PerformIOResult = QueryResult | FullQueryResults abstract class NeonQueryable implements Queryable { flavour = 'postgres' as const - async queryRaw(query: Query): Promise { + async queryRaw(query: Query): Promise> { const tag = '[js::query_raw]' debug(`${tag} %O`, query) @@ -35,15 +35,15 @@ abstract class NeonQueryable implements Queryable { rows: results.map(result => columns.map(column => result[column])), } - return resultSet + return { ok: true, value: resultSet } } - async executeRaw(query: Query): Promise { + async executeRaw(query: Query): Promise> { const tag = '[js::execute_raw]' debug(`${tag} %O`, query) const { rowCount: rowsAffected } = await this.performIO(query) - return rowsAffected + return { ok: true, value: rowsAffected } } abstract performIO(query: Query): Promise @@ -71,17 +71,19 @@ class NeonWsQueryable extends NeonQueryable { } class NeonTransaction extends NeonWsQueryable implements Transaction { - async commit(): Promise { + async commit(): Promise> { try { await this.client.query('COMMIT'); + return { ok: true, value: undefined } } finally { this.client.release() } } - async rollback(): Promise { + async rollback(): Promise> { try { await this.client.query('ROLLBACK'); + return { ok: true, value: undefined } } finally { this.client.release() } @@ -96,22 +98,22 @@ class NeonWsConnector extends NeonWsQueryable implements Connector { super(new Pool({ connectionString, ...rest })) } - async startTransaction(isolationLevel?: string | undefined): Promise { + async startTransaction(isolationLevel?: string | undefined): Promise> { const connection = await this.client.connect() await connection.query('BEGIN') if (isolationLevel) { await connection.query(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`) } - return bindTransaction(new NeonTransaction(connection)) + return { ok: true, value: new NeonTransaction(connection) } } async close() { - this.client.on('error', e => console.log(e)) if (this.isRunning) { await this.client.end() this.isRunning = false } + return { ok: true as const, value: undefined } } } @@ -129,15 +131,17 @@ class NeonHttpConnector extends NeonQueryable implements Connector { return await this.client(sql, values) } - startTransaction(): Promise { + startTransaction(): Promise> { return Promise.reject(new Error('Transactions are not supported in HTTP mode')) } - async close() {} + async close() { + return { ok: true as const, value: undefined } + } } -export const createNeonConnector = (config: PrismaNeonConfig): Connector => { +export const createNeonConnector = (config: PrismaNeonConfig): ErrorCapturingConnector => { const db = config.httpMode ? new NeonHttpConnector(config) : new NeonWsConnector(config) return bindConnector(db) } diff --git a/query-engine/js-connectors/js/pg-js-connector/src/pg.ts b/query-engine/js-connectors/js/pg-js-connector/src/pg.ts index 57fcb92677c7..2e98892f28ef 100644 --- a/query-engine/js-connectors/js/pg-js-connector/src/pg.ts +++ b/query-engine/js-connectors/js/pg-js-connector/src/pg.ts @@ -1,6 +1,6 @@ import * as pg from 'pg' -import { bindConnector, bindTransaction, Debug } from '@jkomyno/prisma-js-connector-utils' -import type { Connector, ConnectorConfig, Query, Queryable, ResultSet, Transaction } from '@jkomyno/prisma-js-connector-utils' +import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils' +import type { ErrorCapturingConnector, Connector, ConnectorConfig, Query, Queryable, Result, ResultSet, Transaction } from '@jkomyno/prisma-js-connector-utils' import { fieldToColumnType } from './conversion' const debug = Debug('prisma:js-connector:pg') @@ -20,7 +20,7 @@ class PgQueryable /** * Execute a query given as SQL, interpolating the given parameters. */ - async queryRaw(query: Query): Promise { + async queryRaw(query: Query): Promise> { const tag = '[js::query_raw]' debug(`${tag} %O`, query) @@ -33,7 +33,7 @@ class PgQueryable rows: results.map((result) => columns.map((column) => result[column])), } - return resultSet + return { ok: true, value: resultSet } } /** @@ -41,12 +41,12 @@ class PgQueryable * returning the number of affected rows. * Note: Queryable expects a u64, but napi.rs only supports u32. */ - async executeRaw(query: Query): Promise { + async executeRaw(query: Query): Promise> { const tag = '[js::execute_raw]' debug(`${tag} %O`, query) const { rowCount } = await this.performIO(query) - return rowCount + return { ok: true, value: rowCount } } /** @@ -74,23 +74,25 @@ class PgTransaction extends PgQueryable super(client) } - async commit(): Promise { + async commit(): Promise> { const tag = '[js::commit]' debug(`${tag} committing transaction`) try { await this.client.query('COMMIT') + return { ok: true, value: undefined } } finally { this.client.release() } } - async rollback(): Promise { + async rollback(): Promise> { const tag = '[js::rollback]' debug(`${tag} rolling back the transaction`) try { await this.client.query('ROLLBACK') + return { ok: true, value: undefined } } finally { this.client.release() } @@ -108,7 +110,7 @@ class PrismaPg extends PgQueryable implements Connector { super(client) } - async startTransaction(isolationLevel?: string): Promise { + async startTransaction(isolationLevel?: string): Promise> { const connection = await this.client.connect() await connection.query('BEGIN') @@ -118,13 +120,15 @@ class PrismaPg extends PgQueryable implements Connector { ) } - return bindTransaction(new PgTransaction(connection)) + return { ok: true, value: new PgTransaction(connection) } } - async close() {} + async close() { + return { ok: true as const, value: undefined } + } } -export const createPgConnector = (config: PrismaPgConfig): Connector => { +export const createPgConnector = (config: PrismaPgConfig): ErrorCapturingConnector => { const db = new PrismaPg(config) return bindConnector(db) } diff --git a/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts b/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts index dc11d548c781..b2b66c85b22f 100644 --- a/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts +++ b/query-engine/js-connectors/js/planetscale-js-connector/src/planetscale.ts @@ -1,7 +1,7 @@ import * as planetScale from '@planetscale/database' import type { Config as PlanetScaleConfig } from '@planetscale/database' -import { bindConnector, bindTransaction, Debug } from '@jkomyno/prisma-js-connector-utils' -import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction } from '@jkomyno/prisma-js-connector-utils' +import { bindConnector, Debug } from '@jkomyno/prisma-js-connector-utils' +import type { Connector, ResultSet, Query, ConnectorConfig, Queryable, Transaction, Result, ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' import { type PlanetScaleColumnType, fieldToColumnType } from './conversion' import { createDeferred, Deferred } from './deferred' @@ -29,7 +29,7 @@ class PlanetScaleQueryable { + async queryRaw(query: Query): Promise> { const tag = '[js::query_raw]' debug(`${tag} %O`, query) @@ -43,7 +43,7 @@ class PlanetScaleQueryable { + async executeRaw(query: Query): Promise> { const tag = '[js::execute_raw]' debug(`${tag} %O`, query) const { rowsAffected } = await this.performIO(query) - return rowsAffected + return { ok: true, value: rowsAffected } } /** @@ -83,18 +83,18 @@ class PlanetScaleTransaction extends PlanetScaleQueryable { + async commit(): Promise> { const tag = '[js::commit]' debug(`${tag} committing transaction`) this.txDeferred.resolve() - return this.txResultPromise; + return { ok: true, value: await this.txResultPromise }; } - rollback(): Promise { + async rollback(): Promise> { const tag = '[js::rollback]' debug(`${tag} rolling back the transaction`) this.txDeferred.reject(new RollbackError()) - return this.txResultPromise; + return { ok: true, value: await this.txResultPromise }; } } @@ -108,7 +108,7 @@ class PrismaPlanetScale extends PlanetScaleQueryable imp } async startTransaction(isolationLevel?: string) { - return new Promise((resolve) => { + return new Promise>((resolve) => { const txResultPromise = this.client.transaction(async tx => { if (isolationLevel) { await tx.execute(`SET TRANSACTION ISOLATION LEVEL ${isolationLevel}`) @@ -116,7 +116,7 @@ class PrismaPlanetScale extends PlanetScaleQueryable imp const [txDeferred, deferredPromise] = createDeferred() const txWrapper = new PlanetScaleTransaction(tx, txDeferred, txResultPromise) - resolve(bindTransaction(txWrapper)); + resolve({ ok: true, value: txWrapper }); return deferredPromise }).catch(error => { @@ -131,10 +131,12 @@ class PrismaPlanetScale extends PlanetScaleQueryable imp }) } - async close() {} + async close() { + return { ok: true as const, value: undefined } + } } -export const createPlanetScaleConnector = (config: PrismaPlanetScaleConfig): Connector => { +export const createPlanetScaleConnector = (config: PrismaPlanetScaleConfig): ErrorCapturingConnector => { const db = new PrismaPlanetScale(config) return bindConnector(db) } diff --git a/query-engine/js-connectors/js/smoke-test-js/src/test.ts b/query-engine/js-connectors/js/smoke-test-js/src/test.ts index f5ba493f25b6..4c297c9e39b2 100644 --- a/query-engine/js-connectors/js/smoke-test-js/src/test.ts +++ b/query-engine/js-connectors/js/smoke-test-js/src/test.ts @@ -1,9 +1,10 @@ import { setImmediate, setTimeout } from 'node:timers/promises' -import type { Connector } from '@jkomyno/prisma-js-connector-utils' +import type { ErrorCapturingConnector } from '@jkomyno/prisma-js-connector-utils' import type { QueryEngineInstance } from './engines/types/Library' import { initQueryEngine } from './util' +import { JsonQuery } from './engines/types/JsonProtocol' -export async function smokeTest(db: Connector, prismaSchemaRelativePath: string) { +export async function smokeTest(db: ErrorCapturingConnector, prismaSchemaRelativePath: string) { // wait for the database pool to be initialized await setImmediate(0) @@ -15,7 +16,7 @@ export async function smokeTest(db: Connector, prismaSchemaRelativePath: string) // console.log('[nodejs] isHealthy', await conn.isHealthy()) - const test = new SmokeTest(engine, db.flavour) + const test = new SmokeTest(engine, db, db.flavour) await test.testTypeTest2() await test.testFindManyTypeTest() @@ -45,7 +46,7 @@ export async function smokeTest(db: Connector, prismaSchemaRelativePath: string) } class SmokeTest { - constructor(private readonly engine: QueryEngineInstance, readonly flavour: Connector['flavour']) {} + constructor(private readonly engine: QueryEngineInstance, private readonly connector: ErrorCapturingConnector, readonly flavour: ErrorCapturingConnector['flavour']) {} async testTypeTest2() { const create = await this.engine.query(` @@ -112,7 +113,7 @@ class SmokeTest { return } - const resultSet = await this.engine.query(` + const resultSet = await this.doQuery( { "action": "findMany", "modelName": "type_test", @@ -141,9 +142,9 @@ class SmokeTest { "blob_column": true } } - } - `, 'trace', undefined) - console.log('[nodejs] findMany resultSet', JSON.stringify(JSON.parse(resultSet), null, 2)) + }) + + console.log('[nodejs] findMany resultSet', JSON.stringify(resultSet, null, 2)) return resultSet } @@ -153,7 +154,7 @@ class SmokeTest { return } - const resultSet = await this.engine.query(` + const resultSet = await this.doQuery( { "action": "findMany", "modelName": "type_test", @@ -178,14 +179,14 @@ class SmokeTest { } } } - `, 'trace', undefined) - console.log('[nodejs] findMany resultSet', JSON.stringify(JSON.parse(resultSet), null, 2)) + ) + console.log('[nodejs] findMany resultSet', JSON.stringify((resultSet), null, 2)) return resultSet } async createAutoIncrement() { - await this.engine.query(` + await this.doQuery( { "modelName": "Author", "action": "deleteMany", @@ -198,9 +199,9 @@ class SmokeTest { } } } - `, 'trace', undefined) + ) - const author = await this.engine.query(` + const author = await this.doQuery( { "modelName": "Author", "action": "createOne", @@ -219,8 +220,8 @@ class SmokeTest { } } } - `, 'trace', undefined) - console.log('[nodejs] author', JSON.stringify(JSON.parse(author), null, 2)) + ) + console.log('[nodejs] author', JSON.stringify(author, null, 2)) } async testCreateAndDeleteChildParent() { @@ -340,17 +341,34 @@ class SmokeTest { const tx_id = JSON.parse(startResponse).id console.log('[nodejs] transaction id', tx_id) - await this.engine.query(` - { - "action": "findMany", - "modelName": "Author", - "query": { - "selection": { "$scalars": true } - } - } - `, 'trace', tx_id) + await this.doQuery( + { + "action": "findMany", + "modelName": "Author", + "query": { + "selection": { "$scalars": true } + } + }, + tx_id + ) const commitResponse = await this.engine.commitTransaction(tx_id, 'trace') console.log('[nodejs] commited', commitResponse) } + + private async doQuery(query: JsonQuery, tx_id?: string) { + const result = await this.engine.query(JSON.stringify(query), 'trace', tx_id) + const parsedResult = JSON.parse(result) + if (parsedResult.errors) { + const error = parsedResult.errors[0]?.user_facing_error + if (error.error_code === 'P2036') { + const jsError = this.connector.errorRegistry.consumeError(error.meta.id) + if (!jsError) { + throw new Error(`Something went wrong. Engine reported external error with id ${error.meta.id}, but it was not registered.`) + } + throw jsError.error + } + } + return parsedResult + } } diff --git a/query-engine/js-connectors/src/async_js_function.rs b/query-engine/js-connectors/src/async_js_function.rs new file mode 100644 index 000000000000..5f535334ffb9 --- /dev/null +++ b/query-engine/js-connectors/src/async_js_function.rs @@ -0,0 +1,70 @@ +use std::marker::PhantomData; + +use napi::{ + bindgen_prelude::*, + threadsafe_function::{ErrorStrategy, ThreadsafeFunction}, +}; + +use crate::{ + error::{async_unwinding_panic, into_quaint_error}, + result::JsResult, +}; + +/// Wrapper for napi-rs's ThreadsafeFunction that is aware of +/// JS drivers conventions. Performs following things: +/// - Automatically unrefs the function so it won't hold off event loop +/// - Awaits for returned Promise +/// - Unpacks JS `Result` type into Rust `Result` type and converts the error +/// into `quaint::Error`. +/// - Catches panics and converts them to `quaint:Error` +pub(crate) struct AsyncJsFunction +where + ArgType: ToNapiValue + 'static, + ReturnType: FromNapiValue + 'static, +{ + threadsafe_fn: ThreadsafeFunction, + _phantom: PhantomData, +} + +impl AsyncJsFunction +where + ArgType: ToNapiValue + 'static, + ReturnType: FromNapiValue + 'static, +{ + fn from_threadsafe_function( + mut threadsafe_fn: ThreadsafeFunction, + env: Env, + ) -> napi::Result { + threadsafe_fn.unref(&env)?; + + Ok(AsyncJsFunction { + threadsafe_fn, + _phantom: PhantomData, + }) + } + + pub(crate) async fn call(&self, arg: ArgType) -> quaint::Result { + let js_result = async_unwinding_panic(async { + let promise = self + .threadsafe_fn + .call_async::>>(arg) + .await?; + promise.await + }) + .await + .map_err(into_quaint_error)?; + js_result.into() + } +} + +impl FromNapiValue for AsyncJsFunction +where + ArgType: ToNapiValue + 'static, + ReturnType: FromNapiValue + 'static, +{ + unsafe fn from_napi_value(napi_env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> napi::Result { + let env = Env::from_raw(napi_env); + let threadsafe_fn = ThreadsafeFunction::from_napi_value(napi_env, napi_val)?; + Self::from_threadsafe_function(threadsafe_fn, env) + } +} diff --git a/query-engine/js-connectors/src/lib.rs b/query-engine/js-connectors/src/lib.rs index 9e2664621e8d..00c75218e96a 100644 --- a/query-engine/js-connectors/src/lib.rs +++ b/query-engine/js-connectors/src/lib.rs @@ -7,8 +7,10 @@ //! plus some transformation of types to adhere to what a quaint::Value expresses. //! +mod async_js_function; mod error; mod proxy; mod queryable; +mod result; mod transaction; pub use queryable::{from_napi, JsQueryable}; diff --git a/query-engine/js-connectors/src/proxy.rs b/query-engine/js-connectors/src/proxy.rs index e69b818f1a57..8331a2e46638 100644 --- a/query-engine/js-connectors/src/proxy.rs +++ b/query-engine/js-connectors/src/proxy.rs @@ -1,11 +1,10 @@ use core::panic; use std::str::FromStr; -use crate::error::*; +use crate::async_js_function::AsyncJsFunction; use crate::transaction::JsTransaction; -use napi::bindgen_prelude::{FromNapiValue, Promise as JsPromise, ToNapiValue}; -use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction}; -use napi::{Env, JsObject, JsString}; +use napi::bindgen_prelude::{FromNapiValue, ToNapiValue}; +use napi::{JsObject, JsString}; use napi_derive::napi; use quaint::connector::{IsolationLevel, ResultSet as QuaintResultSet}; use quaint::Value as QuaintValue; @@ -18,13 +17,13 @@ use chrono::{NaiveDate, NaiveTime}; /// Proxy is a struct wrapping a javascript object that exhibits basic primitives for /// querying and executing SQL (i.e. a client connector). The Proxy uses NAPI ThreadSafeFunction to /// invoke the code within the node runtime that implements the client connector. -pub struct CommonProxy { +pub(crate) struct CommonProxy { /// Execute a query given as SQL, interpolating the given parameters. - query_raw: ThreadsafeFunction, + query_raw: AsyncJsFunction, /// Execute a query given as SQL, interpolating the given parameters and /// returning the number of affected rows. - execute_raw: ThreadsafeFunction, + execute_raw: AsyncJsFunction, /// Return the flavour for this driver. pub(crate) flavour: String, @@ -32,17 +31,17 @@ pub struct CommonProxy { /// This is a JS proxy for accessing the methods specific to top level /// JS driver objects -pub struct DriverProxy { - start_transaction: ThreadsafeFunction, ErrorStrategy::Fatal>, +pub(crate) struct DriverProxy { + start_transaction: AsyncJsFunction, JsTransaction>, } /// This a JS proxy for accessing the methods, specific /// to JS transaction objects -pub struct TransactionProxy { +pub(crate) struct TransactionProxy { /// commit transaction - commit: ThreadsafeFunction<(), ErrorStrategy::Fatal>, + commit: AsyncJsFunction<(), ()>, /// rollback transcation - rollback: ThreadsafeFunction<(), ErrorStrategy::Fatal>, + rollback: AsyncJsFunction<(), ()>, } /// This result set is more convenient to be manipulated from both Rust and NodeJS. @@ -307,91 +306,57 @@ impl From for QuaintResultSet { } impl CommonProxy { - pub fn new(object: &JsObject, env: &Env) -> napi::Result { - let query_raw = object.get_named_property("queryRaw")?; - let execute_raw = object.get_named_property("executeRaw")?; + pub fn new(object: &JsObject) -> napi::Result { let flavour: JsString = object.get_named_property("flavour")?; - let mut result = Self { - query_raw, - execute_raw, + Ok(Self { + query_raw: object.get_named_property("queryRaw")?, + execute_raw: object.get_named_property("executeRaw")?, flavour: flavour.into_utf8()?.as_str()?.to_owned(), - }; - - result.query_raw.unref(env)?; - result.execute_raw.unref(env)?; - - Ok(result) + }) } - pub async fn query_raw(&self, params: Query) -> napi::Result { - async_unwinding_panic(async { - let promise = self.query_raw.call_async::>(params).await?; - let value = promise.await?; - Ok(value) - }) - .await + pub async fn query_raw(&self, params: Query) -> quaint::Result { + self.query_raw.call(params).await } - pub async fn execute_raw(&self, params: Query) -> napi::Result { - async_unwinding_panic(async { - let promise = self.execute_raw.call_async::>(params).await?; - let value = promise.await?; - Ok(value) - }) - .await + pub async fn execute_raw(&self, params: Query) -> quaint::Result { + self.execute_raw.call(params).await } } impl DriverProxy { - pub fn new(js_connector: &JsObject, env: &Env) -> napi::Result { - let start_transaction = js_connector.get_named_property("startTransaction")?; - let mut result = Self { start_transaction }; - result.start_transaction.unref(env)?; - - Ok(result) + pub fn new(js_connector: &JsObject) -> napi::Result { + Ok(Self { + start_transaction: js_connector.get_named_property("startTransaction")?, + }) } - pub async fn start_transaction(&self, isolation_level: Option) -> napi::Result> { - async_unwinding_panic(async move { - let promise = self - .start_transaction - .call_async::>(isolation_level.map(|l| l.to_string())) - .await?; - - let tx = promise.await?; - Ok(Box::new(tx)) - }) - .await + pub async fn start_transaction( + &self, + isolation_level: Option, + ) -> quaint::Result> { + let tx = self + .start_transaction + .call(isolation_level.map(|l| l.to_string())) + .await?; + Ok(Box::new(tx)) } } impl TransactionProxy { - pub fn new(js_transaction: &JsObject, env: &Env) -> napi::Result { + pub fn new(js_transaction: &JsObject) -> napi::Result { let commit = js_transaction.get_named_property("commit")?; let rollback = js_transaction.get_named_property("rollback")?; - let mut result = Self { commit, rollback }; - - result.commit.unref(env)?; - result.rollback.unref(env)?; - - Ok(result) + Ok(Self { commit, rollback }) } - pub async fn commit(&self) -> napi::Result<()> { - async_unwinding_panic(async move { - let promise = self.commit.call_async::>(()).await?; - promise.await - }) - .await + pub async fn commit(&self) -> quaint::Result<()> { + self.commit.call(()).await } - pub async fn rollback(&self) -> napi::Result<()> { - async_unwinding_panic(async move { - let promise = self.rollback.call_async::>(()).await?; - promise.await - }) - .await + pub async fn rollback(&self) -> quaint::Result<()> { + self.rollback.call(()).await } } diff --git a/query-engine/js-connectors/src/queryable.rs b/query-engine/js-connectors/src/queryable.rs index 0a498b31e011..e75542099e01 100644 --- a/query-engine/js-connectors/src/queryable.rs +++ b/query-engine/js-connectors/src/queryable.rs @@ -1,9 +1,6 @@ -use crate::{ - error::into_quaint_error, - proxy::{CommonProxy, DriverProxy, Query}, -}; +use crate::proxy::{CommonProxy, DriverProxy, Query}; use async_trait::async_trait; -use napi::{Env, JsObject}; +use napi::JsObject; use psl::datamodel_connector::Flavour; use quaint::{ connector::{ @@ -30,13 +27,13 @@ use tracing::{info_span, Instrument}; /// into a `quaint::connector::result_set::ResultSet`. A quaint `ResultSet` is basically a vector /// of `quaint::Value` but said type is a tagged enum, with non-unit variants that cannot be converted to javascript as is. /// -pub struct JsBaseQueryable { +pub(crate) struct JsBaseQueryable { pub(crate) proxy: CommonProxy, pub flavour: Flavour, } impl JsBaseQueryable { - pub fn new(proxy: CommonProxy) -> Self { + pub(crate) fn new(proxy: CommonProxy) -> Self { let flavour: Flavour = proxy.flavour.to_owned().parse().unwrap(); Self { proxy, flavour } } @@ -137,12 +134,7 @@ impl JsBaseQueryable { let query = Self::build_query(sql, params).instrument(serialization_span).await; let sql_span = info_span!("js:query:sql", user_facing = true, "db.statement" = %sql); - let result_set = self - .proxy - .query_raw(query) - .instrument(sql_span) - .await - .map_err(into_quaint_error)?; + let result_set = self.proxy.query_raw(query).instrument(sql_span).await?; let len = result_set.len(); let _deserialization_span = info_span!("js:query:result", user_facing = true, "length" = %len).entered(); @@ -154,9 +146,8 @@ impl JsBaseQueryable { let serialization_span = info_span!("js:query:args", user_facing = true, "length" = %len); let query = Self::build_query(sql, params).instrument(serialization_span).await; - // Todo: convert napi::Error to quaint::error::Error. let sql_span = info_span!("js:query:sql", user_facing = true, "db.statement" = %sql); - let affected_rows = self.proxy.execute_raw(query).instrument(sql_span).await.unwrap(); + let affected_rows = self.proxy.execute_raw(query).instrument(sql_span).await?; Ok(affected_rows as u64) } @@ -245,19 +236,15 @@ impl TransactionCapable for JsQueryable { &'a self, isolation: Option, ) -> quaint::Result> { - let tx = self - .driver_proxy - .start_transaction(isolation) - .await - .map_err(into_quaint_error)?; + let tx = self.driver_proxy.start_transaction(isolation).await?; Ok(tx) } } -pub fn from_napi(napi_env: &Env, driver: JsObject) -> JsQueryable { - let common = CommonProxy::new(&driver, napi_env).unwrap(); - let driver_proxy = DriverProxy::new(&driver, napi_env).unwrap(); +pub fn from_napi(driver: JsObject) -> JsQueryable { + let common = CommonProxy::new(&driver).unwrap(); + let driver_proxy = DriverProxy::new(&driver).unwrap(); JsQueryable { inner: JsBaseQueryable::new(common), diff --git a/query-engine/js-connectors/src/result.rs b/query-engine/js-connectors/src/result.rs new file mode 100644 index 000000000000..c4aedd2b781d --- /dev/null +++ b/query-engine/js-connectors/src/result.rs @@ -0,0 +1,78 @@ +use napi::{bindgen_prelude::FromNapiValue, Env, JsUnknown, NapiValue}; +use quaint::error::Error as QuaintError; +use serde::Deserialize; + +#[derive(Deserialize, Debug)] +#[serde(tag = "kind")] +/// Wrapper for JS-side errors +/// See js-connectors/js-connector-utils/types file for example +pub(crate) enum JsConnectorError { + /// Unexpected JS exception + GenericJsError { id: i32 }, + // in the future, expected errors that map to known user errors with PXXX codes will also go here +} + +impl FromNapiValue for JsConnectorError { + unsafe fn from_napi_value(napi_env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> napi::Result { + let env = Env::from_raw(napi_env); + let value = JsUnknown::from_raw(napi_env, napi_val)?; + env.from_js_value(value) + } +} + +impl From for QuaintError { + fn from(value: JsConnectorError) -> Self { + match value { + JsConnectorError::GenericJsError { id } => QuaintError::external_error(id), + // in future, more error types would be added and we'll need to convert them to proper QuaintErrors here + } + } +} + +/// Wrapper for JS-side result type +/// See js-connectors/js-connector-utils/types file for example +pub(crate) enum JsResult +where + T: FromNapiValue, +{ + Ok(T), + Err(JsConnectorError), +} + +impl JsResult +where + T: FromNapiValue, +{ + fn from_js_unknown(unknown: JsUnknown) -> napi::Result { + let object = unknown.coerce_to_object()?; + let ok: bool = object.get_named_property("ok")?; + if ok { + let value: JsUnknown = object.get_named_property("value")?; + return Ok(Self::Ok(T::from_unknown(value)?)); + } + + let error = object.get_named_property("error")?; + Ok(Self::Err(error)) + } +} + +impl FromNapiValue for JsResult +where + T: FromNapiValue, +{ + unsafe fn from_napi_value(napi_env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> napi::Result { + Self::from_js_unknown(JsUnknown::from_raw(napi_env, napi_val)?) + } +} + +impl From> for quaint::Result +where + T: FromNapiValue, +{ + fn from(value: JsResult) -> Self { + match value { + JsResult::Ok(result) => Ok(result), + JsResult::Err(error) => Err(error.into()), + } + } +} diff --git a/query-engine/js-connectors/src/transaction.rs b/query-engine/js-connectors/src/transaction.rs index b7c456467cc5..df1745d12e78 100644 --- a/query-engine/js-connectors/src/transaction.rs +++ b/query-engine/js-connectors/src/transaction.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use napi::{bindgen_prelude::FromNapiValue, Env, JsObject}; +use napi::{bindgen_prelude::FromNapiValue, JsObject}; use quaint::{ connector::{IsolationLevel, Transaction as QuaintTransaction}, prelude::{Query as QuaintQuery, Queryable, ResultSet}, @@ -7,7 +7,6 @@ use quaint::{ }; use crate::{ - error::into_quaint_error, proxy::{CommonProxy, TransactionProxy}, queryable::JsBaseQueryable, }; @@ -15,13 +14,13 @@ use crate::{ // Wrapper around JS transaction objects that implements Queryable // and quaint::Transaction. Can be used in place of quaint transaction, // but delegates most operations to JS -pub struct JsTransaction { +pub(crate) struct JsTransaction { tx_proxy: TransactionProxy, inner: JsBaseQueryable, } impl JsTransaction { - pub fn new(inner: JsBaseQueryable, tx_proxy: TransactionProxy) -> Self { + pub(crate) fn new(inner: JsBaseQueryable, tx_proxy: TransactionProxy) -> Self { Self { inner, tx_proxy } } } @@ -29,11 +28,11 @@ impl JsTransaction { #[async_trait] impl QuaintTransaction for JsTransaction { async fn commit(&self) -> quaint::Result<()> { - self.tx_proxy.commit().await.map_err(into_quaint_error) + self.tx_proxy.commit().await } async fn rollback(&self) -> quaint::Result<()> { - self.tx_proxy.rollback().await.map_err(into_quaint_error) + self.tx_proxy.rollback().await } fn as_queryable(&self) -> &dyn Queryable { @@ -95,9 +94,8 @@ impl Queryable for JsTransaction { impl FromNapiValue for JsTransaction { unsafe fn from_napi_value(env: napi::sys::napi_env, napi_val: napi::sys::napi_value) -> napi::Result { let object = JsObject::from_napi_value(env, napi_val)?; - let env_safe = Env::from_raw(env); - let common_proxy = CommonProxy::new(&object, &env_safe)?; - let tx_proxy = TransactionProxy::new(&object, &env_safe)?; + let common_proxy = CommonProxy::new(&object)?; + let tx_proxy = TransactionProxy::new(&object)?; Ok(Self::new(JsBaseQueryable::new(common_proxy), tx_proxy)) } diff --git a/query-engine/query-engine-node-api/src/engine.rs b/query-engine/query-engine-node-api/src/engine.rs index f3ec435ec308..e535da371063 100644 --- a/query-engine/query-engine-node-api/src/engine.rs +++ b/query-engine/query-engine-node-api/src/engine.rs @@ -173,7 +173,7 @@ impl QueryEngine { #[cfg(feature = "js-connectors")] if let Some(driver) = maybe_driver { - let js_queryable = js_connectors::from_napi(&napi_env, driver); + let js_queryable = js_connectors::from_napi(driver); let provider_name = schema.connector.provider_name(); match sql_connector::register_js_connector(provider_name, Arc::new(js_queryable)) {