From 515f8f083973bec61f1cd20abc03712e40f11374 Mon Sep 17 00:00:00 2001 From: George Svarovsky Date: Mon, 21 Nov 2022 08:03:28 +0000 Subject: [PATCH] #118: Using queueMicrotask exclusively for asynciterator task scheduling due to RubenVerborgh/AsyncIterator#96 --- package-lock.json | 1 + package.json | 1 + src/engine/async.ts | 12 ++++++++++++ src/engine/dataset/DatasetEngine.ts | 8 ++++---- src/engine/dataset/index.ts | 30 ++++++++++++++--------------- src/orm/OrmDomain.ts | 10 +++++----- src/orm/OrmSubject.ts | 4 ++-- test/StateEngine.test.ts | 4 ++-- 8 files changed, 42 insertions(+), 28 deletions(-) create mode 100644 src/engine/async.ts diff --git a/package-lock.json b/package-lock.json index 811f5bfd..75348297 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,6 +26,7 @@ "mqtt-pattern": "^1.2.0", "quadstore": "^11.0.3", "quadstore-comunica": "^3.0.3", + "queue-microtask": "^1.2.3", "rdf-data-factory": "^1.0.4", "reflect-metadata": "^0.1.13", "rx-flowable": "^0.1.0", diff --git a/package.json b/package.json index 7cdfe6a8..10b21a1c 100644 --- a/package.json +++ b/package.json @@ -98,6 +98,7 @@ "mqtt-pattern": "^1.2.0", "quadstore": "^11.0.3", "quadstore-comunica": "^3.0.3", + "queue-microtask": "^1.2.3", "rdf-data-factory": "^1.0.4", "reflect-metadata": "^0.1.13", "rx-flowable": "^0.1.0", diff --git a/src/engine/async.ts b/src/engine/async.ts new file mode 100644 index 00000000..6977c9be --- /dev/null +++ b/src/engine/async.ts @@ -0,0 +1,12 @@ +/** + * Library override + */ + +import * as async from 'asynciterator'; + +/** @see https://github.com/RubenVerborgh/AsyncIterator/issues/96 */ +async.setTaskScheduler(require('queue-microtask')); + +// This awkward export is to prevent WebStorm from flagging import +// simplification warnings that would bypass this module +export default async; \ No newline at end of file diff --git a/src/engine/dataset/DatasetEngine.ts b/src/engine/dataset/DatasetEngine.ts index 38a80f51..6677b4ac 100644 --- a/src/engine/dataset/DatasetEngine.ts +++ b/src/engine/dataset/DatasetEngine.ts @@ -23,7 +23,7 @@ import { AbstractMeld, comesAlive } from '../AbstractMeld'; import { RemoteOperations } from './RemoteOperations'; import { CloneEngine } from '../StateEngine'; import { MeldError, MeldErrorStatus } from '../MeldError'; -import { AsyncIterator, TransformIterator, wrap } from 'asynciterator'; +import async from '../async'; import { BaseStream } from '../../rdfjs-support'; import { Consumable } from 'rx-flowable'; import { MeldApp, MeldConfig } from '../../config'; @@ -523,11 +523,11 @@ export class DatasetEngine extends AbstractMeld implements CloneEngine, MeldLoca } private wrapStreamFn

( - fn: (...args: P) => BaseStream): ((...args: P) => AsyncIterator) { + fn: (...args: P) => BaseStream): ((...args: P) => async.AsyncIterator) { return (...args) => { - return new TransformIterator(this.closed ? + return new async.TransformIterator(this.closed ? Promise.reject(new MeldError('Clone has closed')) : - this.lock.share('live', 'sparql', async () => wrap(fn(...args)))); + this.lock.share('live', 'sparql', async () => async.wrap(fn(...args)))); }; } diff --git a/src/engine/dataset/index.ts b/src/engine/dataset/index.ts index 2d6d3eb7..ea2922e9 100644 --- a/src/engine/dataset/index.ts +++ b/src/engine/dataset/index.ts @@ -14,7 +14,7 @@ import { Algebra } from 'sparqlalgebrajs'; import { Engine } from 'quadstore-comunica'; import { DataFactory as RdfDataFactory } from 'rdf-data-factory'; import { JRQL, M_LD, RDF, XS } from '../../ns'; -import { AsyncIterator, empty, EmptyIterator, SimpleTransformIterator } from 'asynciterator'; +import async from '../async'; import { MutableOperation } from '../ops'; import { MeldError } from '../MeldError'; import { BaseStream, Binding, CountableRdf, QueryableRdfSource } from '../../rdfjs-support'; @@ -125,11 +125,11 @@ export interface Graph extends RdfFactory, QueryableRdfSource { readonly name: GraphName; readonly lock: LockManager<'state'>; - query(...args: Parameters): AsyncIterator; - query(query: Algebra.Construct): AsyncIterator; - query(query: Algebra.Describe): AsyncIterator; - query(query: Algebra.Project): AsyncIterator; - query(query: Algebra.Distinct): AsyncIterator; + query(...args: Parameters): async.AsyncIterator; + query(query: Algebra.Construct): async.AsyncIterator; + query(query: Algebra.Describe): async.AsyncIterator; + query(query: Algebra.Project): async.AsyncIterator; + query(query: Algebra.Distinct): async.AsyncIterator; ask(query: Algebra.Ask): Promise; } @@ -413,7 +413,7 @@ class QuadStoreGraph implements Graph { match: Graph['match'] = (subject, predicate, object, graph) => { if (graph != null && !graph.equals(this.name)) - return empty(); + return async.empty(); else // Must specify graph term due to optimised indexing return (this.dataset.store).match(subject, predicate, object, this.name); @@ -427,14 +427,14 @@ class QuadStoreGraph implements Graph { return (this.dataset.store).countQuads(subject, predicate, object, this.name); }; - query(...args: Parameters): AsyncIterator; - query(query: Algebra.Construct): AsyncIterator; - query(query: Algebra.Describe): AsyncIterator; - query(query: Algebra.Project): AsyncIterator; - query(query: Algebra.Distinct): AsyncIterator; + query(...args: Parameters): async.AsyncIterator; + query(query: Algebra.Construct): async.AsyncIterator; + query(query: Algebra.Describe): async.AsyncIterator; + query(query: Algebra.Project): async.AsyncIterator; + query(query: Algebra.Distinct): async.AsyncIterator; query( ...args: Parameters | [Algebra.Operation] - ): AsyncIterator { + ): async.AsyncIterator { const source: Promise> = (async () => { try { const [algebra] = args; @@ -448,12 +448,12 @@ class QuadStoreGraph implements Graph { } catch (err) { // TODO: Comunica bug? Cannot read property 'close' of undefined, if stream empty if (err instanceof TypeError) - return new EmptyIterator(); + return new async.EmptyIterator(); throw err; } throw new Error('Expected bindings or quads'); })(); - return new SimpleTransformIterator( + return new async.SimpleTransformIterator( // A transform iterator is actually capable of taking a base stream, despite typings this.dataset.lock.extend('state', 'query', source), { map: item => ('type' in item && item.type === 'bindings') ? toBinding(item) : item diff --git a/src/orm/OrmDomain.ts b/src/orm/OrmDomain.ts index e39b0d75..4b2c0141 100644 --- a/src/orm/OrmDomain.ts +++ b/src/orm/OrmDomain.ts @@ -8,7 +8,7 @@ import { SubjectUpdater } from '../updates'; import { ReadLatchable } from '../engine/index'; import { BehaviorSubject, firstValueFrom } from 'rxjs'; import { filter } from 'rxjs/operators'; -import { isPromise } from 'asynciterator'; +import async from '../engine/async'; import { array } from '../util'; import { MeldApp, MeldConfig } from '../config'; @@ -171,7 +171,7 @@ export class OrmDomain { if (update != null) { const updater = new SubjectUpdater(update); for (let subject of this.domain._cache.values()) { - if (isPromise(subject)) + if (async.isPromise(subject)) await subject .then(subject => this.updateSubject(updater, subject, deleted)) .catch(); // Error will have been reported by get() @@ -190,9 +190,9 @@ export class OrmDomain { // In the course of an update, the cache may mutate. Rely on Map order to // ensure any new subjects are captured and updated. for (let [id, subject] of this.domain._cache.entries()) { - if (isPromise(subject)) + if (async.isPromise(subject)) this.domain._cache.set(id, subject = await subject); - if (isPromise(subject.updated)) + if (async.isPromise(subject.updated)) await settled(subject.updated); } } @@ -282,7 +282,7 @@ export class OrmDomain { commit(): Update { const update = { '@delete': [] as Subject[], '@insert': [] as Subject[] }; for (let subject of this._cache.values()) { - if (isPromise(subject)) + if (async.isPromise(subject)) throw new TypeError('ORM domain has not finished updating'); const subjectUpdate = subject.commit(); update['@delete'].push(...array(subjectUpdate['@delete'])); diff --git a/src/orm/OrmSubject.ts b/src/orm/OrmSubject.ts index 8b0ec109..ed3a70e7 100644 --- a/src/orm/OrmSubject.ts +++ b/src/orm/OrmSubject.ts @@ -8,7 +8,7 @@ import { import { isNaturalNumber } from '../engine/util'; import { asValues, compareValues, minimiseValue } from '../engine/jsonld'; import { SubjectPropertyValues } from '../subjects'; -import { isPromise } from 'asynciterator'; +import async from '../engine/async'; import { ConstructOrmSubject, OrmUpdating } from './OrmDomain'; import { Iri } from '@m-ld/jsonld'; import 'reflect-metadata'; @@ -300,7 +300,7 @@ export abstract class OrmSubject { */ protected setUpdated(result: unknown | Promise) { // Don't make unnecessary promises - if (this._updated !== this || isPromise(result)) { + if (this._updated !== this || async.isPromise(result)) { this._updated = Promise.all([this._updated, result]).then(() => this); this._updated.catch(() => {}); // Prevents unhandled rejection } diff --git a/test/StateEngine.test.ts b/test/StateEngine.test.ts index 06b585fb..7c2409cc 100644 --- a/test/StateEngine.test.ts +++ b/test/StateEngine.test.ts @@ -3,7 +3,7 @@ import { MeldUpdate } from '../src'; import { LockManager } from '../src/engine/locks'; import { CloneEngine, StateEngine } from '../src/engine/StateEngine'; import { SubjectGraph } from '../src/engine/SubjectGraph'; -import { single } from 'asynciterator'; +import async from '../src/engine/async'; import { DataFactory as RdfDataFactory, Quad } from 'rdf-data-factory'; import { drain } from 'rx-flowable'; import { consume } from 'rx-flowable/consume'; @@ -14,7 +14,7 @@ describe('State Engine', () => { readonly lock = new LockManager<'state'>(); readonly dataUpdates = new Source(); // noinspection JSUnusedGlobalSymbols - match: CloneEngine['match'] = () => single( + match: CloneEngine['match'] = () => async.single( rdf.quad(rdf.namedNode('state'), rdf.namedNode('tick'), rdf.literal(this.tick.toString())));