Skip to content

Commit

Permalink
stream: improve from perf
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50359
Reviewed-By: Vinícius Lourenço Claro Cardoso <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
  • Loading branch information
rluvaton authored Oct 26, 2023
1 parent 4ddb263 commit 10d51e8
Showing 1 changed file with 113 additions and 18 deletions.
131 changes: 113 additions & 18 deletions lib/internal/streams/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ function from(Readable, iterable, opts) {
throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
}


const readable = new Readable({
objectMode: true,
highWaterMark: 1,
Expand All @@ -46,11 +47,19 @@ function from(Readable, iterable, opts) {
// Flag to protect against _read
// being called before last iteration completion.
let reading = false;
let isAsyncValues = false;

readable._read = function() {
if (!reading) {
reading = true;
next();

if (isAsync) {
nextAsync();
} else if (isAsyncValues) {
nextSyncWithAsyncValues();
} else {
nextSyncWithSyncValues();
}
}
};

Expand Down Expand Up @@ -78,29 +87,115 @@ function from(Readable, iterable, opts) {
}
}

async function next() {
// There are a lot of duplication here, it's done on purpose for performance
// reasons - avoid await when not needed.

function nextSyncWithSyncValues() {
for (;;) {
try {
const { value, done } = iterator.next();

if (done) {
readable.push(null);
return;
}

if (value &&
typeof value.then === 'function') {
return changeToAsyncValues(value);
}

if (value === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(value)) {
continue;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
break;
}
}

async function changeToAsyncValues(value) {
isAsyncValues = true;

try {
const res = await value;

if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(res)) {
nextSyncWithAsyncValues();
return;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
}

async function nextSyncWithAsyncValues() {
for (;;) {
try {
const { value, done } = isAsync ?
await iterator.next() :
iterator.next();
const { value, done } = iterator.next();

if (done) {
readable.push(null);
} else {
const res = (value &&
typeof value.then === 'function') ?
await value :
value;
if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
} else if (readable.push(res)) {
continue;
} else {
reading = false;
}
return;
}

const res = (value &&
typeof value.then === 'function') ?
await value :
value;

if (res === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(res)) {
continue;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
break;
}
}

async function nextAsync() {
for (;;) {
try {
const { value, done } = await iterator.next();

if (done) {
readable.push(null);
return;
}

if (value === null) {
reading = false;
throw new ERR_STREAM_NULL_VALUES();
}

if (readable.push(value)) {
continue;
}

reading = false;
} catch (err) {
readable.destroy(err);
}
Expand Down

0 comments on commit 10d51e8

Please sign in to comment.