Skip to content

Commit

Permalink
Make iterators AsyncIterable, Closes RubenVerborgh#89
Browse files Browse the repository at this point in the history
  • Loading branch information
rubensworks committed Jul 4, 2023
1 parent 9e80cc5 commit c330211
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
node: true,
},

globals: {
AsyncIterable: false,
},

parser: "@typescript-eslint/parser",

parserOptions: {
Expand Down
51 changes: 49 additions & 2 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,11 @@ export const ENDED = 1 << 4;
*/
export const DESTROYED = 1 << 5;


/**
An asynchronous iterator provides pull-based access to a stream of objects.
@extends module:asynciterator.EventEmitter
*/
export class AsyncIterator<T> extends EventEmitter {
export class AsyncIterator<T> extends EventEmitter implements AsyncIterable<T> {
protected _state: number;
private _readable = false;
protected _properties?: { [name: string]: any };
Expand Down Expand Up @@ -568,6 +567,47 @@ export class AsyncIterator<T> extends EventEmitter {
clone(): ClonedIterator<T> {
return new ClonedIterator<T>(this);
}

/**
* An AsyncIterator is async iterable.
* This allows iterators to be used via the for-await syntax.
* @returns {ESAsyncIterator<T>} An EcmaScript AsyncIterator
*/
[Symbol.asyncIterator](): ESAsyncIterator<T> {
const it = this;
// An EcmaScript AsyncIterator exposes the next() function that can be invoked repeatedly
return {
next(): Promise<IteratorResult<T>> {
return new Promise<IteratorResult<T>>((resolve, reject) => {
it.on('error', reject);
function tryResolve(): void {
const value = it.read();
if (value !== null) {
// Immediately resolve if we have read an element
it.removeListener('error', reject);
resolve({ done: false, value });
}
else if (it.done) {
// Close if we're done
it.removeListener('error', reject);
resolve({ done: true, value: undefined });
}
else {
// In all other cases, wait for the iterator to become readable or ended
const nextCallback = () => {
it.removeListener('readable', nextCallback);
it.removeListener('end', nextCallback);
tryResolve();
};
it.once('readable', nextCallback);
it.once('end', nextCallback);
}
}
tryResolve();
});
},
};
}
}

// Starts emitting `data` events when `data` listeners are added
Expand Down Expand Up @@ -2245,6 +2285,13 @@ export interface MultiTransformOptions<S, D> extends TransformOptions<S, D> {
multiTransform?: (item: S) => AsyncIterator<D>;
}

/**
* Copy of the EcmaScript AsyncIterator interface, which we can not use directly due to the name conflict.
*/
interface ESAsyncIterator<T> {
next(value?: any): Promise<IteratorResult<T>>;
}

type MaybePromise<T> =
T |
Promise<T>;
Expand Down
107 changes: 107 additions & 0 deletions test/AsyncIterator-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,113 @@ describe('AsyncIterator', () => {
});
});
});

describe('The AsyncIterator#[Symbol.asyncIterator] function', () => {
it('should be a function', () => {
expect(AsyncIterator.prototype[Symbol.asyncIterator]).to.be.a('function');
});

describe('called on an empty iterator', () => {
let iterator;
before(() => {
iterator = new AsyncIterator();
iterator.close();
});

it('should go through zero iterations', async () => {
let i = 0;
for await (const value of iterator) {
value.should.not.equal(undefined);
i++;
}
i.should.equal(0);
});
});

describe('called on an iterator with two items', () => {
let iterator;
before(() => {
let i = 0;
iterator = new AsyncIterator();
iterator.readable = true;
iterator.read = () => {
if (i++ < 2)
return i;
iterator.close();
return null;
};
});

it('should go through two iterations', async () => {
const values = [];
for await (const value of iterator)
values.push(value);
values.should.eql([1, 2]);
});
});

describe('called on an iterator with two slowly generated items', () => {
let iterator;
before(() => {
let i = 0;
let generate = false;
iterator = new AsyncIterator();
iterator.readable = false;
iterator.read = () => {
if (!generate) {
generate = true;
setImmediate(() => {
iterator.readable = true;
});
return null;
}
generate = false;
iterator.readable = false;

if (i++ < 2)
return i;
iterator.close();
return null;
};
});

it('should go through two iterations', async () => {
const values = [];
for await (const value of iterator)
values.push(value);
values.should.eql([1, 2]);
});
});

describe('called on an erroring iterator', () => {
let iterator;
before(() => {
let i = 0;
iterator = new AsyncIterator();
iterator.readable = true;
iterator.read = () => {
if (i++ < 2)
return i;
iterator.emit('error', new Error('AsyncIterator error'));
return null;
};
});

it('should go through two iterations and then throw', async () => {
const values = [];
let caughtError;
try {
for await (const value of iterator)
values.push(value);
}
catch (error) {
caughtError = error;
}
values.should.eql([1, 2]);
caughtError.message.should.eql('AsyncIterator error');
});
});
});
});

describe('Type-checking functions', () => {
Expand Down
17 changes: 17 additions & 0 deletions test/integration-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ describe('Integration tests', () => {
});
});

describe('A sequence of ArrayIterator, TransformIterator, and Unioniterator is AsyncIterable', () => {
let arrayIterator, transformIterator, unionIterator;

before(() => {
arrayIterator = new ArrayIterator([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], { autoStart: false });
transformIterator = new TransformIterator(arrayIterator, { autoStart: false });
unionIterator = new UnionIterator([transformIterator], { autoStart: false });
});

it('returns all values', async () => {
const values = [];
for await (const value of unionIterator)
values.push(value);
values.should.eql([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
});
});

describe('Cloning iterators', () => {
describe('A clone of an empty ArrayIterator without autoStart', () => {
let arrayIterator, clonedIterator;
Expand Down

0 comments on commit c330211

Please sign in to comment.