diff --git a/asynciterator.ts b/asynciterator.ts index 8c22f8d..651b406 100644 --- a/asynciterator.ts +++ b/asynciterator.ts @@ -449,7 +449,7 @@ export class AsyncIterator extends EventEmitter { @param {Array|module:asynciterator.AsyncIterator} append Items to insert after this iterator's (remaining) items @returns {module:asynciterator.AsyncIterator} A new iterator that appends and prepends items to this iterator */ - surround(prepend: T[] | AsyncIterator, append: T[] | AsyncIterator): AsyncIterator { + surround(prepend: AsyncIteratorOrArray, append: AsyncIteratorOrArray): AsyncIterator { return this.transform({ prepend, append }); } @@ -958,8 +958,6 @@ export class BufferedIterator extends AsyncIterator { } } -type Source = AsyncIterator & { _destination: TransformIterator }; - /** An iterator that generates items based on a source iterator. This class serves as a base class for other iterators. @@ -1140,13 +1138,13 @@ export class TransformIterator extends BufferedIterator { } } -function destinationEmitError(this: Source, error: Error) { +function destinationEmitError(this: Source, error: Error) { this._destination.emit('error', error); } -function destinationCloseWhenDone(this: Source) { +function destinationCloseWhenDone(this: Source) { (this._destination as any)._closeWhenDone(); } -function destinationFillBuffer(this: Source) { +function destinationFillBuffer(this: Source) { (this._destination as any)._fillBuffer(); } @@ -1377,6 +1375,98 @@ export class MultiTransformIterator extends TransformIterator { } } +/** + An iterator that generates items by reading from multiple other iterators. + @extends module:asynciterator.BufferedIterator +*/ +export class UnionIterator extends BufferedIterator { + private _sources : Source[] = []; + private _sourcesComplete = true; + private _currentSource = -1; + + /** + Creates a new `UnionIterator`. + @param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from + @param {object} [options] Settings of the iterator + */ + constructor(sources: AsyncIteratorOrArray>, + options?: BufferedIteratorOptions) { + super(options); + + // Sources have been passed as a non-empty array + if (Array.isArray(sources) && sources.length > 0) { + for (const source of sources) + this._addSource(source as Source); + } + // Sources have been passed as an open iterator + else if (isEventEmitter(sources) && !sources.done) { + this._sourcesComplete = false; + sources.on('data', source => { + this._addSource(source as Source); + this._fillBufferAsync(); + }); + sources.on('end', () => { + this._sourcesComplete = true; + this._fillBuffer(); + }); + sources.on('error', error => this.emit('error', error)); + } + // Sources are an empty list + else { + this.close(); + } + } + + // Adds the given source to the internal sources array + protected _addSource(source: Source) { + if (!source.done) { + this._sources.push(source); + source._destination = this; + source.on('error', destinationEmitError); + source.on('readable', destinationFillBuffer); + source.on('end', destinationRemoveEmptySources); + } + } + + // Removes sources that will no longer emit items + protected _removeEmptySources() { + this._sources = this._sources.filter((source, index) => { + // Adjust the index of the current source if needed + if (source.done && index <= this._currentSource) + this._currentSource--; + return !source.done; + }); + this._fillBuffer(); + } + + // Reads items from the next sources + protected _read(count: number, done: () => void): void { + // Try to read `count` items + let lastCount = 0, item : T | null; + while (lastCount !== (lastCount = count)) { + // Try every source at least once + for (let i = 0; i < this._sources.length && count > 0; i++) { + // Pick the next source + this._currentSource = (this._currentSource + 1) % this._sources.length; + const source = this._sources[this._currentSource]; + // Attempt to read an item from that source + if ((item = source.read()) !== null) { + count--; + this._push(item); + } + } + } + // Close this iterator if all of its sources have been read + if (this._sourcesComplete && this._sources.length === 0) + this.close(); + done(); + } +} + +function destinationRemoveEmptySources(this: Source) { + (this._destination as any)._removeEmptySources(); +} + /** An iterator that copies items from another iterator. @@ -1631,6 +1721,14 @@ export function fromArray(items: T[]) { return new ArrayIterator(items); } +/** + Creates an iterator containing all items from the given iterators. + @param {Array} items the items + */ +export function union(sources: AsyncIteratorOrArray>) { + return new UnionIterator(sources); +} + /** Creates an iterator of integers for the given numeric range. @param {Array} items the items @@ -1668,10 +1766,13 @@ export interface TransformIteratorOptions extends BufferedIteratorOptions { export interface TransformOptions extends TransformIteratorOptions { offset?: number; limit?: number; - prepend?: D[] | AsyncIterator; - append?: D[] | AsyncIterator; + prepend?: AsyncIteratorOrArray; + append?: AsyncIteratorOrArray; filter?: (item: S) => boolean; map?: (item: S) => D; transform?: (item: S, done: () => void) => void; } + +type AsyncIteratorOrArray = AsyncIterator | T[]; +type Source = AsyncIterator & { _destination: AsyncIterator }; diff --git a/test/UnionIterator-test.js b/test/UnionIterator-test.js new file mode 100644 index 0000000..56a4c16 --- /dev/null +++ b/test/UnionIterator-test.js @@ -0,0 +1,311 @@ +import { + AsyncIterator, + UnionIterator, + ArrayIterator, + BufferedIterator, + EmptyIterator, + union, + range, +} from '../asynciterator.mjs'; + +import { EventEmitter } from 'events'; +import queueMicrotask from 'queue-microtask'; + +describe('UnionIterator', () => { + describe('The UnionIterator function', () => { + describe('the result when called with `new`', () => { + let instance; + before(() => { instance = new UnionIterator(); }); + + it('should be a UnionIterator object', () => { + instance.should.be.an.instanceof(UnionIterator); + }); + + it('should be a AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + + describe('the result when called through `union`', () => { + let instance; + before(() => { instance = union(); }); + + it('should be an UnionIterator object', () => { + instance.should.be.an.instanceof(UnionIterator); + }); + + it('should be an AsyncIterator object', () => { + instance.should.be.an.instanceof(AsyncIterator); + }); + + it('should be an EventEmitter object', () => { + instance.should.be.an.instanceof(EventEmitter); + }); + }); + }); + + it('should include all data from 3 sources', async () => { + const iterator = new UnionIterator([ + range(0, 2), + range(3, 4), + range(5, 6), + ]); + (await toArray(iterator)).sort().should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + + it('should include all data from 1 non-empty and 4 empty sources', async () => { + const iterator = new UnionIterator([ + new EmptyIterator(), + new EmptyIterator(), + range(0, 2), + new EmptyIterator(), + new EmptyIterator(), + ]); + (await toArray(iterator)).sort().should.eql([0, 1, 2]); + }); + + describe('when constructed with an array of 0 sources', () => { + let iterator; + before(() => { + const sources = []; + iterator = new UnionIterator(sources); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + + describe('when constructed with an array of 1 source', () => { + let iterator; + before(() => { + const sources = [range(0, 2)]; + iterator = new UnionIterator(sources); + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + }); + + describe('when constructed with an array of 2 sources', () => { + let iterator; + before(() => { + const sources = [range(0, 2), range(3, 6)]; + iterator = new UnionIterator(sources); + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + }); + + describe('when constructed with an empty iterator', () => { + let iterator; + before(() => { + iterator = new UnionIterator(new EmptyIterator()); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + + describe('when constructed with an iterator of 0 sources', () => { + let iterator; + before(() => { + const sources = []; + iterator = new UnionIterator(new ArrayIterator(sources)); + }); + + it('should have ended', () => { + iterator.ended.should.be.true; + }); + }); + + describe('when constructed with an iterator of 1 source', () => { + let iterator; + before(() => { + const sources = [range(0, 2)]; + iterator = new UnionIterator(new ArrayIterator(sources)); + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + }); + + describe('when constructed with an iterator of 2 sources', () => { + let iterator; + before(() => { + const sources = [range(0, 2), range(3, 6)]; + iterator = new UnionIterator(new ArrayIterator(sources)); + }); + + it('should not have ended', () => { + iterator.ended.should.be.false; + }); + }); + + describe('when the source iterator emits an error', () => { + let callback, error; + before(() => { + const sources = new BufferedIterator(); + const iterator = new UnionIterator(sources); + iterator.on('error', callback = sinon.spy()); + sources.emit('error', error = new Error('error')); + }); + + it('should emit the error', () => { + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + }); + + describe('a UnionIterator with two sources', () => { + let iterator, sources; + + beforeEach(() => { + sources = [ + range(0, 2), + range(3, 6), + ]; + iterator = new UnionIterator(sources); + }); + + it('should emit an error when the first iterator emits an error', () => { + const error = new Error('error'); + const callback = sinon.spy(); + iterator.on('error', callback); + sources[0].emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + + it('should emit an error when the second iterator emits an error', () => { + const error = new Error('error'); + const callback = sinon.spy(); + iterator.on('error', callback); + sources[1].emit('error', error); + callback.should.have.been.calledOnce; + callback.should.have.been.calledWith(error); + }); + + it('should not emit an error when no iterators emit an error', async () => { + (await toArray(iterator)).should.be.instanceof(Array); + }); + + it('should allow the _read method to be called multiple times', () => { + iterator._read(1, noop); + iterator._read(1, noop); + }); + + it('should make a round-robin union of the data elements', async () => { + (await toArray(iterator)).sort().should.eql([0, 1, 2, 3, 4, 5, 6]); + }); + }); + + describe('a UnionIterator with sources that are added dynamically', () => { + let iterator, sources, sourceIterator; + before(() => { + sourceIterator = new BufferedIterator(); + iterator = new UnionIterator(sourceIterator); + sources = [ + range(0, 2), + range(3, 5), + range(6, 7), + ]; + }); + + describe('before sources are added', () => { + it('returns null on read', () => { + expect(iterator.read()).to.be.null; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + }); + + describe('after one source is added', () => { + before(() => { + sourceIterator._push(sources[0]); + }); + + it('should read the whole stream', () => { + expect(iterator.read()).to.equal(0); + expect(iterator.read()).to.equal(1); + expect(iterator.read()).to.equal(2); + expect(iterator.read()).to.be.null; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + }); + + + describe('after two streams have been added', () => { + before(() => { + sourceIterator._push(sources[1]); + sourceIterator._push(sources[2]); + }); + + it('should read 2 streams in round-robin order', async () => { + // Read 4 buffered items + expect(iterator.read()).to.equal(3); + expect(iterator.read()).to.equal(6); + expect(iterator.read()).to.equal(4); + expect(iterator.read()).to.equal(7); + + // Buffer + await new Promise(resolve => queueMicrotask(resolve)); + + // Read remaining items + expect(iterator.read()).to.equal(5); + expect(iterator.read()).to.be.null; + }); + + it('should not have ended', () => { + expect(iterator.ended).to.be.false; + }); + }); + + describe('after the source stream ends', () => { + before(() => { + sourceIterator._end(); + }); + + it('returns null on read', () => { + expect(iterator.read()).to.be.null; + }); + + it('should have ended', () => { + expect(iterator.ended).to.be.true; + }); + }); + }); + + it('should end when the end event of the source stream is delayed', async () => { + const delayed = new AsyncIterator(); + const iterator = new UnionIterator(delayed); + delayed.readable = true; + queueMicrotask(() => delayed.close()); + (await toArray(iterator)).should.eql([]); + }); +}); + +function toArray(stream) { + return new Promise((resolve, reject) => { + const array = []; + stream.on('data', data => array.push(data)); + stream.on('error', reject); + stream.on('end', () => resolve(array)); + }); +} + +function noop() { /* */ }