Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Parse persited query on subscription. #1012

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions lib/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ module.exports = async function (app, opts) {
notSupportedError
} = persistedQueryProvider || {}

async function executeQuery (query, variables, operationName, request, reply) {
async function executeQuery(query, variables, operationName, request, reply) {
// Validate a query is present
if (!query) {
return new MER_ERR_GQL_PERSISTED_QUERY_NOT_FOUND('Unknown query')
Expand All @@ -237,10 +237,8 @@ module.exports = async function (app, opts) {
return executeQuery(query, variables, operationName, request, reply)
}

async function executePersistedQuery (body, request, reply) {
async function getPersistedQuery (body) {
let { query } = body
const { operationName, variables } = body

// Verify if a query matches the persisted format
const persisted = isPersistedQuery(body)
if (persisted) {
Expand All @@ -266,23 +264,32 @@ module.exports = async function (app, opts) {
}
}

// Execute the query
const result = await executeQuery(query, variables, operationName, request, reply)

// Only save queries which are not yet persisted
if (!persisted && query) {
// If provided the getHashForQuery, saveQuery settings we save this query
const hash = getHashForQuery && getHashForQuery(query)
if (hash) {
try {
await saveQuery(hash, query)
} catch (err) {
request.log.warn({ err, hash, query }, 'Failed to persist query')
return {
query,
persisted,
async saveQuery () {
const hash = getHashForQuery && getHashForQuery(query)
if (hash) {
try {
await saveQuery(hash, query)
} catch (err) {
// FIMXE: memory leak vulnerability
// request.log.warn({ err, hash, query }, 'Failed to persist query')
}
}
}
}
}

async function executePersistedQuery (body, request, reply) {
const { operationName, variables } = body

const { query, saveQuery } = await getPersistedQuery(body)

const result = await executeQuery(query, variables, operationName, request, reply)

await saveQuery()

// Return the result
return result
}

Expand Down Expand Up @@ -326,7 +333,8 @@ module.exports = async function (app, opts) {
subscriptionContextFn,
keepAlive,
fullWsTransport,
errorFormatter
errorFormatter,
getPersistedQuery,
})
} else {
app.route(getOptions)
Expand Down
6 changes: 4 additions & 2 deletions lib/subscription-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ module.exports = class SubscriptionConnection {
resolveContext,
keepAlive,
fullWsTransport,
errorFormatter
errorFormatter,
getPersistedQuery
}) {
this.fastify = fastify
this.socket = socket
Expand All @@ -37,6 +38,7 @@ module.exports = class SubscriptionConnection {
this.fullWsTransport = fullWsTransport
this.errorFormatter = errorFormatter
this.headers = {}
this.getPersistedQuery = getPersistedQuery

this.protocolMessageTypes = getProtocolByName(socket.protocol)
this.socket.on('error', this.handleConnectionClose.bind(this))
Expand Down Expand Up @@ -208,7 +210,7 @@ module.exports = class SubscriptionConnection {
}
}

const document = typeof query !== 'string' ? query : parse(query)
const document = typeof query !== 'string' ? query : parse((await this.getPersistedQuery(payload)).query)

if (!document) {
throw new Error('Must provide document.')
Expand Down
10 changes: 6 additions & 4 deletions lib/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { kHooks } = require('./symbols')
const SubscriptionConnection = require('./subscription-connection')
const { getProtocolByName } = require('./subscription-protocol')

function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, errorFormatter }) {
function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, errorFormatter, getPersistedQuery }) {
return async (connection, request) => {
const { socket } = connection

Expand Down Expand Up @@ -48,7 +48,8 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
resolveContext,
keepAlive,
fullWsTransport,
errorFormatter
errorFormatter,
getPersistedQuery
})

/* istanbul ignore next */
Expand All @@ -62,7 +63,7 @@ function createConnectionHandler ({ subscriber, fastify, onConnect, onDisconnect
}

module.exports = async function (fastify, opts) {
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, errorFormatter } = opts
const { getOptions, subscriber, verifyClient, onConnect, onDisconnect, entityResolversFactory, subscriptionContextFn, keepAlive, fullWsTransport, errorFormatter, getPersistedQuery } = opts

// If `fastify.websocketServer` exists, it means `@fastify/websocket` already registered.
// Without this check, @fastify/websocket will be registered multiple times and raises FST_ERR_DEC_ALREADY_PRESENT.
Expand All @@ -86,7 +87,8 @@ module.exports = async function (fastify, opts) {
subscriptionContextFn,
keepAlive,
fullWsTransport,
errorFormatter
errorFormatter,
getPersistedQuery
})
})
}