Skip to content

Commit

Permalink
Add UnionIterator.
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenVerborgh committed Jun 16, 2020
1 parent c9b2d5b commit f405a7d
Show file tree
Hide file tree
Showing 2 changed files with 420 additions and 8 deletions.
117 changes: 109 additions & 8 deletions asynciterator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ export class AsyncIterator<T> extends EventEmitter {
@param {Array|module:asynciterator.AsyncIterator} append Items to insert after this iterator's (remaining) items
@returns {module:asynciterator.AsyncIterator} A new iterator that appends and prepends items to this iterator
*/
surround(prepend: T[] | AsyncIterator<T>, append: T[] | AsyncIterator<T>): AsyncIterator<T> {
surround(prepend: AsyncIteratorOrArray<T>, append: AsyncIteratorOrArray<T>): AsyncIterator<T> {
return this.transform({ prepend, append });
}

Expand Down Expand Up @@ -958,8 +958,6 @@ export class BufferedIterator<T> extends AsyncIterator<T> {
}
}

type Source<S> = AsyncIterator<S> & { _destination: TransformIterator<any, any> };

/**
An iterator that generates items based on a source iterator.
This class serves as a base class for other iterators.
Expand Down Expand Up @@ -1140,13 +1138,13 @@ export class TransformIterator<S, D = S> extends BufferedIterator<D> {
}
}

function destinationEmitError(this: Source<any>, error: Error) {
function destinationEmitError<S>(this: Source<S>, error: Error) {
this._destination.emit('error', error);
}
function destinationCloseWhenDone(this: Source<any>) {
function destinationCloseWhenDone<S>(this: Source<S>) {
(this._destination as any)._closeWhenDone();
}
function destinationFillBuffer(this: Source<any>) {
function destinationFillBuffer<S>(this: Source<S>) {
(this._destination as any)._fillBuffer();
}

Expand Down Expand Up @@ -1377,6 +1375,98 @@ export class MultiTransformIterator<S, D = S> extends TransformIterator<S, D> {
}
}

/**
An iterator that generates items by reading from multiple other iterators.
@extends module:asynciterator.BufferedIterator
*/
export class UnionIterator<T> extends BufferedIterator<T> {
private _sources : Source<T>[] = [];
private _sourcesComplete = true;
private _currentSource = -1;

/**
Creates a new `UnionIterator`.
@param {module:asynciterator.AsyncIterator|Array} [sources] The sources to read from
@param {object} [options] Settings of the iterator
*/
constructor(sources: AsyncIteratorOrArray<AsyncIterator<T>>,
options?: BufferedIteratorOptions) {
super(options);

// Sources have been passed as a non-empty array
if (Array.isArray(sources) && sources.length > 0) {
for (const source of sources)
this._addSource(source as Source<T>);
}
// Sources have been passed as an open iterator
else if (isEventEmitter(sources) && !sources.done) {
this._sourcesComplete = false;
sources.on('data', source => {
this._addSource(source as Source<T>);
this._fillBufferAsync();
});
sources.on('end', () => {
this._sourcesComplete = true;
this._fillBuffer();
});
sources.on('error', error => this.emit('error', error));
}
// Sources are an empty list
else {
this.close();
}
}

// Adds the given source to the internal sources array
protected _addSource(source: Source<T>) {
if (!source.done) {
this._sources.push(source);
source._destination = this;
source.on('error', destinationEmitError);
source.on('readable', destinationFillBuffer);
source.on('end', destinationRemoveEmptySources);
}
}

// Removes sources that will no longer emit items
protected _removeEmptySources() {
this._sources = this._sources.filter((source, index) => {
// Adjust the index of the current source if needed
if (source.done && index <= this._currentSource)
this._currentSource--;
return !source.done;
});
this._fillBuffer();
}

// Reads items from the next sources
protected _read(count: number, done: () => void): void {
// Try to read `count` items
let lastCount = 0, item : T | null;
while (lastCount !== (lastCount = count)) {
// Try every source at least once
for (let i = 0; i < this._sources.length && count > 0; i++) {
// Pick the next source
this._currentSource = (this._currentSource + 1) % this._sources.length;
const source = this._sources[this._currentSource];
// Attempt to read an item from that source
if ((item = source.read()) !== null) {
count--;
this._push(item);
}
}
}
// Close this iterator if all of its sources have been read
if (this._sourcesComplete && this._sources.length === 0)
this.close();
done();
}
}

function destinationRemoveEmptySources<T>(this: Source<T>) {
(this._destination as any)._removeEmptySources();
}


/**
An iterator that copies items from another iterator.
Expand Down Expand Up @@ -1631,6 +1721,14 @@ export function fromArray<T>(items: T[]) {
return new ArrayIterator<T>(items);
}

/**
Creates an iterator containing all items from the given iterators.
@param {Array} items the items
*/
export function union<T>(sources: AsyncIteratorOrArray<AsyncIterator<T>>) {
return new UnionIterator<T>(sources);
}

/**
Creates an iterator of integers for the given numeric range.
@param {Array} items the items
Expand Down Expand Up @@ -1668,10 +1766,13 @@ export interface TransformIteratorOptions<S> extends BufferedIteratorOptions {
export interface TransformOptions<S, D> extends TransformIteratorOptions<S> {
offset?: number;
limit?: number;
prepend?: D[] | AsyncIterator<D>;
append?: D[] | AsyncIterator<D>;
prepend?: AsyncIteratorOrArray<D>;
append?: AsyncIteratorOrArray<D>;

filter?: (item: S) => boolean;
map?: (item: S) => D;
transform?: (item: S, done: () => void) => void;
}

type AsyncIteratorOrArray<T> = AsyncIterator<T> | T[];
type Source<T> = AsyncIterator<T> & { _destination: AsyncIterator<any> };
Loading

0 comments on commit f405a7d

Please sign in to comment.