Skip to content

Commit

Permalink
the IncrementalPublisher should handle response building
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jul 6, 2023
1 parent fae5da5 commit 7ef6ee4
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 215 deletions.
249 changes: 161 additions & 88 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,63 @@ import type {
GraphQLFormattedError,
} from '../error/GraphQLError.js';

/**
* The result of GraphQL execution.
*
* - `errors` is included when any errors occurred as a non-empty array.
* - `data` is the result of a successful execution of the query.
* - `hasNext` is true if a future payload is expected.
* - `extensions` is reserved for adding non-standard properties.
* - `incremental` is a list of the results from defer/stream directives.
*/
export interface ExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLError>;
data?: TData | null;
extensions?: TExtensions;
}

export interface FormattedExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLFormattedError>;
data?: TData | null;
extensions?: TExtensions;
}

export interface ExperimentalIncrementalExecutionResults<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
initialResult: InitialIncrementalExecutionResult<TData, TExtensions>;
subsequentResults: AsyncGenerator<
SubsequentIncrementalExecutionResult<TData, TExtensions>,
void,
void
>;
}

export interface InitialIncrementalExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> extends ExecutionResult<TData, TExtensions> {
hasNext: boolean;
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
extensions?: TExtensions;
}

export interface FormattedInitialIncrementalExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> extends FormattedExecutionResult<TData, TExtensions> {
hasNext: boolean;
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
extensions?: TExtensions;
}

export interface SubsequentIncrementalExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
Expand Down Expand Up @@ -113,86 +170,6 @@ export class IncrementalPublisher {
this._reset();
}

hasNext(): boolean {
return this._pending.size > 0;
}

subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
void
> {
let isDone = false;

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
// eslint-disable-next-line no-constant-condition
while (true) {
if (isDone) {
return { value: undefined, done: true };
}

for (const item of this._released) {
this._pending.delete(item);
}
const released = this._released;
this._released = new Set();

const result = this._getIncrementalResult(released);

if (!this.hasNext()) {
isDone = true;
}

if (result !== undefined) {
return { value: result, done: false };
}

// eslint-disable-next-line no-await-in-loop
await this._signalled;
}
};

const returnStreamIterators = async (): Promise<void> => {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
this._pending.forEach((incrementalDataRecord) => {
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
promises.push(incrementalDataRecord.asyncIterator.return());
}
});
await Promise.all(promises);
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
await returnStreamIterators();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
await returnStreamIterators();
return Promise.reject(error);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next: _next,
return: _return,
throw: _throw,
};
}

prepareInitialResultRecord(): InitialResultRecord {
return {
errors: [],
Expand Down Expand Up @@ -256,19 +233,38 @@ export class IncrementalPublisher {
incrementalDataRecord.errors.push(error);
}

publishInitial(initialResult: InitialResultRecord) {
for (const child of initialResult.children) {
buildDataResponse(
initialResultRecord: InitialResultRecord,
data: ObjMap<unknown> | null,
): ExecutionResult | ExperimentalIncrementalExecutionResults {
for (const child of initialResultRecord.children) {
if (child.filtered) {
continue;
}
this._publish(child);
}

const errors = initialResultRecord.errors;
const initialResult = errors.length === 0 ? { data } : { errors, data };
if (this._pending.size > 0) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: this._subscribe(),
};
}
return initialResult;
}

getInitialErrors(
initialResult: InitialResultRecord,
): ReadonlyArray<GraphQLError> {
return initialResult.errors;
buildErrorResponse(
initialResultRecord: InitialResultRecord,
error: GraphQLError,
): ExecutionResult {
const errors = initialResultRecord.errors;
errors.push(error);
return { data: null, errors };
}

filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) {
Expand Down Expand Up @@ -301,6 +297,82 @@ export class IncrementalPublisher {
});
}

private _subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
void
> {
let isDone = false;

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
// eslint-disable-next-line no-constant-condition
while (true) {
if (isDone) {
return { value: undefined, done: true };
}

for (const item of this._released) {
this._pending.delete(item);
}
const released = this._released;
this._released = new Set();

const result = this._getIncrementalResult(released);

if (this._pending.size === 0) {
isDone = true;
}

if (result !== undefined) {
return { value: result, done: false };
}

// eslint-disable-next-line no-await-in-loop
await this._signalled;
}
};

const returnStreamIterators = async (): Promise<void> => {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
this._pending.forEach((incrementalDataRecord) => {
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
promises.push(incrementalDataRecord.asyncIterator.return());
}
});
await Promise.all(promises);
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
await returnStreamIterators();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
await returnStreamIterators();
return Promise.reject(error);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next: _next,
return: _return,
throw: _throw,
};
}

private _trigger() {
this._resolve();
this._reset();
Expand Down Expand Up @@ -368,9 +440,10 @@ export class IncrementalPublisher {
incrementalResults.push(incrementalResult);
}

const hasNext = this._pending.size > 0;
return incrementalResults.length
? { incremental: incrementalResults, hasNext: this.hasNext() }
: encounteredCompletedAsyncIterator && !this.hasNext()
? { incremental: incrementalResults, hasNext }
: encounteredCompletedAsyncIterator && !hasNext
? { hasNext: false }
: undefined;
}
Expand Down
6 changes: 4 additions & 2 deletions src/execution/__tests__/defer-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import {
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
import { GraphQLSchema } from '../../type/schema.js';

import type { InitialIncrementalExecutionResult } from '../execute.js';
import { execute, experimentalExecuteIncrementally } from '../execute.js';
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
import type {
InitialIncrementalExecutionResult,
SubsequentIncrementalExecutionResult,
} from '../IncrementalPublisher.js';

const friendType = new GraphQLObjectType({
fields: {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/lists-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { GraphQLSchema } from '../../type/schema.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import type { ExecutionResult } from '../execute.js';
import { execute, executeSync } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

describe('Execute: Accepts any iterable as list value', () => {
function complete(rootValue: unknown) {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/nonnull-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import { GraphQLSchema } from '../../type/schema.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import type { ExecutionResult } from '../execute.js';
import { execute, executeSync } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

const syncError = new Error('sync');
const syncNonNullError = new Error('syncNonNull');
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/oneof-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { parse } from '../../language/parser.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import type { ExecutionResult } from '../execute.js';
import { execute } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

const schema = buildSchema(`
type Query {
Expand Down
6 changes: 4 additions & 2 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import {
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
import { GraphQLSchema } from '../../type/schema.js';

import type { InitialIncrementalExecutionResult } from '../execute.js';
import { experimentalExecuteIncrementally } from '../execute.js';
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
import type {
InitialIncrementalExecutionResult,
SubsequentIncrementalExecutionResult,
} from '../IncrementalPublisher.js';

const friendType = new GraphQLObjectType({
fields: {
Expand Down
3 changes: 2 additions & 1 deletion src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import {
} from '../../type/scalars.js';
import { GraphQLSchema } from '../../type/schema.js';

import type { ExecutionArgs, ExecutionResult } from '../execute.js';
import type { ExecutionArgs } from '../execute.js';
import { createSourceEventStream, subscribe } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

import { SimplePubSub } from './simplePubSub.js';

Expand Down
Loading

0 comments on commit 7ef6ee4

Please sign in to comment.