Skip to content

Commit

Permalink
#118: Using queueMicrotask exclusively for asynciterator task schedul…
Browse files Browse the repository at this point in the history
  • Loading branch information
gsvarovsky committed Nov 21, 2022
1 parent 8b17dbc commit 515f8f0
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 28 deletions.
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
"mqtt-pattern": "^1.2.0",
"quadstore": "^11.0.3",
"quadstore-comunica": "^3.0.3",
"queue-microtask": "^1.2.3",
"rdf-data-factory": "^1.0.4",
"reflect-metadata": "^0.1.13",
"rx-flowable": "^0.1.0",
Expand Down
12 changes: 12 additions & 0 deletions src/engine/async.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/**
* Library override
*/

import * as async from 'asynciterator';

/** @see https://github.com/RubenVerborgh/AsyncIterator/issues/96 */
async.setTaskScheduler(require('queue-microtask'));

// This awkward export is to prevent WebStorm from flagging import
// simplification warnings that would bypass this module
export default async;
8 changes: 4 additions & 4 deletions src/engine/dataset/DatasetEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { AbstractMeld, comesAlive } from '../AbstractMeld';
import { RemoteOperations } from './RemoteOperations';
import { CloneEngine } from '../StateEngine';
import { MeldError, MeldErrorStatus } from '../MeldError';
import { AsyncIterator, TransformIterator, wrap } from 'asynciterator';
import async from '../async';
import { BaseStream } from '../../rdfjs-support';
import { Consumable } from 'rx-flowable';
import { MeldApp, MeldConfig } from '../../config';
Expand Down Expand Up @@ -523,11 +523,11 @@ export class DatasetEngine extends AbstractMeld implements CloneEngine, MeldLoca
}

private wrapStreamFn<P extends any[], T>(
fn: (...args: P) => BaseStream<T>): ((...args: P) => AsyncIterator<T>) {
fn: (...args: P) => BaseStream<T>): ((...args: P) => async.AsyncIterator<T>) {
return (...args) => {
return new TransformIterator<T>(this.closed ?
return new async.TransformIterator<T>(this.closed ?
Promise.reject(new MeldError('Clone has closed')) :
this.lock.share('live', 'sparql', async () => wrap(fn(...args))));
this.lock.share('live', 'sparql', async () => async.wrap(fn(...args))));
};
}

Expand Down
30 changes: 15 additions & 15 deletions src/engine/dataset/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { Algebra } from 'sparqlalgebrajs';
import { Engine } from 'quadstore-comunica';
import { DataFactory as RdfDataFactory } from 'rdf-data-factory';
import { JRQL, M_LD, RDF, XS } from '../../ns';
import { AsyncIterator, empty, EmptyIterator, SimpleTransformIterator } from 'asynciterator';
import async from '../async';
import { MutableOperation } from '../ops';
import { MeldError } from '../MeldError';
import { BaseStream, Binding, CountableRdf, QueryableRdfSource } from '../../rdfjs-support';
Expand Down Expand Up @@ -125,11 +125,11 @@ export interface Graph extends RdfFactory, QueryableRdfSource {
readonly name: GraphName;
readonly lock: LockManager<'state'>;

query(...args: Parameters<QuadSource['match']>): AsyncIterator<Quad>;
query(query: Algebra.Construct): AsyncIterator<Quad>;
query(query: Algebra.Describe): AsyncIterator<Quad>;
query(query: Algebra.Project): AsyncIterator<Binding>;
query(query: Algebra.Distinct): AsyncIterator<Binding>;
query(...args: Parameters<QuadSource['match']>): async.AsyncIterator<Quad>;
query(query: Algebra.Construct): async.AsyncIterator<Quad>;
query(query: Algebra.Describe): async.AsyncIterator<Quad>;
query(query: Algebra.Project): async.AsyncIterator<Binding>;
query(query: Algebra.Distinct): async.AsyncIterator<Binding>;

ask(query: Algebra.Ask): Promise<boolean>;
}
Expand Down Expand Up @@ -413,7 +413,7 @@ class QuadStoreGraph implements Graph {

match: Graph['match'] = (subject, predicate, object, graph) => {
if (graph != null && !graph.equals(this.name))
return empty();
return async.empty();
else
// Must specify graph term due to optimised indexing
return (<QuadSource>this.dataset.store).match(subject, predicate, object, this.name);
Expand All @@ -427,14 +427,14 @@ class QuadStoreGraph implements Graph {
return (<CountableRdf>this.dataset.store).countQuads(subject, predicate, object, this.name);
};

query(...args: Parameters<QuadSource['match']>): AsyncIterator<Quad>;
query(query: Algebra.Construct): AsyncIterator<Quad>;
query(query: Algebra.Describe): AsyncIterator<Quad>;
query(query: Algebra.Project): AsyncIterator<Binding>;
query(query: Algebra.Distinct): AsyncIterator<Binding>;
query(...args: Parameters<QuadSource['match']>): async.AsyncIterator<Quad>;
query(query: Algebra.Construct): async.AsyncIterator<Quad>;
query(query: Algebra.Describe): async.AsyncIterator<Quad>;
query(query: Algebra.Project): async.AsyncIterator<Binding>;
query(query: Algebra.Distinct): async.AsyncIterator<Binding>;
query(
...args: Parameters<QuadSource['match']> | [Algebra.Operation]
): AsyncIterator<Binding | Quad> {
): async.AsyncIterator<Binding | Quad> {
const source: Promise<BaseStream<Quad | Bindings>> = (async () => {
try {
const [algebra] = args;
Expand All @@ -448,12 +448,12 @@ class QuadStoreGraph implements Graph {
} catch (err) {
// TODO: Comunica bug? Cannot read property 'close' of undefined, if stream empty
if (err instanceof TypeError)
return new EmptyIterator<Quad | Bindings>();
return new async.EmptyIterator<Quad | Bindings>();
throw err;
}
throw new Error('Expected bindings or quads');
})();
return new SimpleTransformIterator<Bindings | Quad, Binding | Quad>(
return new async.SimpleTransformIterator<Bindings | Quad, Binding | Quad>(
// A transform iterator is actually capable of taking a base stream, despite typings
this.dataset.lock.extend('state', 'query', <any>source), {
map: item => ('type' in item && item.type === 'bindings') ? toBinding(item) : <Quad>item
Expand Down
10 changes: 5 additions & 5 deletions src/orm/OrmDomain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { SubjectUpdater } from '../updates';
import { ReadLatchable } from '../engine/index';
import { BehaviorSubject, firstValueFrom } from 'rxjs';
import { filter } from 'rxjs/operators';
import { isPromise } from 'asynciterator';
import async from '../engine/async';
import { array } from '../util';
import { MeldApp, MeldConfig } from '../config';

Expand Down Expand Up @@ -171,7 +171,7 @@ export class OrmDomain {
if (update != null) {
const updater = new SubjectUpdater(update);
for (let subject of this.domain._cache.values()) {
if (isPromise(subject))
if (async.isPromise(subject))
await subject
.then(subject => this.updateSubject(updater, subject, deleted))
.catch(); // Error will have been reported by get()
Expand All @@ -190,9 +190,9 @@ export class OrmDomain {
// In the course of an update, the cache may mutate. Rely on Map order to
// ensure any new subjects are captured and updated.
for (let [id, subject] of this.domain._cache.entries()) {
if (isPromise(subject))
if (async.isPromise(subject))
this.domain._cache.set(id, subject = await subject);
if (isPromise(subject.updated))
if (async.isPromise(subject.updated))
await settled(subject.updated);
}
}
Expand Down Expand Up @@ -282,7 +282,7 @@ export class OrmDomain {
commit(): Update {
const update = { '@delete': [] as Subject[], '@insert': [] as Subject[] };
for (let subject of this._cache.values()) {
if (isPromise(subject))
if (async.isPromise(subject))
throw new TypeError('ORM domain has not finished updating');
const subjectUpdate = subject.commit();
update['@delete'].push(...array<Subject>(subjectUpdate['@delete']));
Expand Down
4 changes: 2 additions & 2 deletions src/orm/OrmSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
import { isNaturalNumber } from '../engine/util';
import { asValues, compareValues, minimiseValue } from '../engine/jsonld';
import { SubjectPropertyValues } from '../subjects';
import { isPromise } from 'asynciterator';
import async from '../engine/async';
import { ConstructOrmSubject, OrmUpdating } from './OrmDomain';
import { Iri } from '@m-ld/jsonld';
import 'reflect-metadata';
Expand Down Expand Up @@ -300,7 +300,7 @@ export abstract class OrmSubject {
*/
protected setUpdated(result: unknown | Promise<unknown>) {
// Don't make unnecessary promises
if (this._updated !== this || isPromise(result)) {
if (this._updated !== this || async.isPromise(result)) {
this._updated = Promise.all([this._updated, result]).then(() => this);
this._updated.catch(() => {}); // Prevents unhandled rejection
}
Expand Down
4 changes: 2 additions & 2 deletions test/StateEngine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { MeldUpdate } from '../src';
import { LockManager } from '../src/engine/locks';
import { CloneEngine, StateEngine } from '../src/engine/StateEngine';
import { SubjectGraph } from '../src/engine/SubjectGraph';
import { single } from 'asynciterator';
import async from '../src/engine/async';
import { DataFactory as RdfDataFactory, Quad } from 'rdf-data-factory';
import { drain } from 'rx-flowable';
import { consume } from 'rx-flowable/consume';
Expand All @@ -14,7 +14,7 @@ describe('State Engine', () => {
readonly lock = new LockManager<'state'>();
readonly dataUpdates = new Source<MeldUpdate>();
// noinspection JSUnusedGlobalSymbols
match: CloneEngine['match'] = () => single(
match: CloneEngine['match'] = () => async.single(
rdf.quad(rdf.namedNode('state'),
rdf.namedNode('tick'),
rdf.literal(this.tick.toString())));
Expand Down

0 comments on commit 515f8f0

Please sign in to comment.