FF110 Using ReadableStream with async iteration (mdn#25284)
* Using Async iteration of ReadableStream

* Using readable streams - explain async iteration using await for of

* Apply suggestions from code review

Co-authored-by: Joshua Chen <[email protected]>

* Expand on ways of aborting

* Little refactors

---------

Co-authored-by: Joshua Chen <[email protected]>
hamishwillee and Josh-Cena authored Mar 27, 2023
1 parent 0376a43 commit 0f01eb2
284 changes: 283 additions & 1 deletion files/en-us/web/api/streams_api/using_readable_streams/index.md
@@ -145,7 +145,289 @@ This is the standard pattern you'll see when using stream readers:
3. If there is more stream to read, you process the current chunk then run the function again.
4. You keep chaining the `pump()` calls until there is no more stream to read, in which case step 2 is followed.

If we remove the code that actually performs a "pump", the pattern can be generalized to something like this:

```js
fetch("http://example.com/somefile.txt")
  // Retrieve its body as ReadableStream
  .then((response) => {
    const reader = response.body.getReader();
    // read() returns a promise that resolves when a value has been received
    return reader.read().then(function pump({ done, value }) {
      if (done) {
        // Do something with last chunk of data then exit reader
        return;
      }
      // Otherwise do something here to process current chunk

      // Read some more, and call this function again
      return reader.read().then(pump);
    });
  })
  .catch((err) => console.error(err));
```

> **Note:** The function looks as if `pump()` calls itself and leads to a potentially deep recursion.
> However, because `pump` is asynchronous and each `pump()` call is at the end of the promise handler, it's actually analogous to a chain of promise handlers.
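
To see why this works, here is a rough sketch of how the first few iterations unroll (illustrative only, reusing `reader` from the example above). Because each handler returns the next `read()` promise, the "recursion" is really an ordinary promise chain:

```js
// Hypothetical unrolling of the pump() recursion above.
// Each handler returns the next read() promise, so the chain extends
// one link per chunk instead of nesting, and the call stack unwinds
// between chunks.
reader
  .read()
  .then(({ done, value }) => reader.read()) // pump: process chunk 1, read again
  .then(({ done, value }) => reader.read()) // pump: process chunk 2, read again
  .then(({ done, value }) => reader.read()); // ...and so on, until done is true
```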

Reading the stream is even easier when written using async/await rather than promises:

```js
async function readData(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      // Do something with last chunk of data then exit reader
      return;
    }
    // Otherwise do something here to process current chunk
  }
}
```

## Consuming a fetch() using asynchronous iteration

There is another, even simpler way to consume a `fetch()`, which is to iterate the returned `response.body` using the [`for await...of`](/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of) syntax.
This works because `response.body` is a `ReadableStream`, which is an [async iterable](/en-US/docs/Web/API/ReadableStream#async_iteration).

Using this approach, the example code in the previous section can be rewritten as shown:

```js
async function readData(url) {
  const response = await fetch(url);
  for await (const chunk of response.body) {
    // Do something with each "chunk"
  }
  // Exit when done
}
```

If you want to stop iterating the stream, you can cancel the `fetch()` operation using an [`AbortController`](/en-US/docs/Web/API/AbortController) and its associated [`AbortSignal`](/en-US/docs/Web/API/AbortSignal):

```js
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });

async function logChunks(url, { signal }) {
  const response = await fetch(url, { signal });
  for await (const chunk of response.body) {
    // Do something with the chunk
  }
}
```
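
When the controller aborts, the pending `fetch()` or read rejects with the signal's abort reason, by default an `AbortError` `DOMException`, so in practice you would wrap the loop in a `try...catch`. A minimal sketch of that handling (the log messages are illustrative):

```js
async function logChunks(url, { signal }) {
  try {
    const response = await fetch(url, { signal });
    for await (const chunk of response.body) {
      // Do something with the chunk
    }
  } catch (e) {
    if (e.name === "AbortError") {
      // The stream was deliberately canceled via the AbortController
      console.log("Fetch aborted");
    } else {
      throw e; // Rethrow unexpected errors
    }
  }
}
```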

Alternatively, you can exit the loop using `break`, as shown in the code below.
Note that code in the loop is only run when the stream has new data to process, so there may be some delay between the signal being aborted and `break` being called.

```js
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });

async function logChunks(url, { signal }) {
  const response = await fetch(url);
  for await (const chunk of response.body) {
    if (signal.aborted) break; // just break out of loop
    // Do something with the chunk
  }
}
```

### Example async reader

<!-- most of the code below is deliberately hidden as it is not relevant to the example -->

```js hidden
// A mock push source.
// Used to simulate some random data arriving
class MockPushSource {
  // Total amount of data to stream from the push source
  static #maxData = 90;
  // Total data read so far (capped to maxData)
  #dataRead = 0;

  // Method returning promise when this push source is readable.
  dataRequest() {
    const result = {
      bytesRead: 8,
      data: "",
    };

    return new Promise((resolve) => {
      if (this.#dataRead >= MockPushSource.#maxData) {
        // Out of data
        result.bytesRead = 0;
        result.data = "";
        resolve(result);
        return;
      }

      // Emulate slow read of data
      setTimeout(() => {
        const numberBytesReceived = 8;
        this.#dataRead += numberBytesReceived;
        result.data = MockPushSource.#randomChars();
        resolve(result);
      }, 500);
    });
  }

  // Dummy close function
  close() {
    return;
  }

  // Return random character string
  static #randomChars(length = 8) {
    let string = "";
    const choices =
      "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";

    for (let i = 0; i < length; i++) {
      string += choices[Math.floor(Math.random() * choices.length)];
    }
    return string;
  }
}
```

<!-- The following HTML, CSS, and JS set up reporting. Hidden because it is not useful for readers -->

```css hidden
.input {
  float: left;
  width: 50%;
}
.output {
  float: right;
  width: 50%;
  overflow-wrap: break-word;
}
button {
  display: block;
}
```

```html hidden
<button>Cancel stream</button>
<div class="input">
  <h2>Underlying source</h2>
  <ul></ul>
</div>
<div class="output">
  <h2>Consumer</h2>
  <ul></ul>
</div>
```

```js hidden
// Store references to the lists and the button
const list1 = document.querySelector(".input ul");
const list2 = document.querySelector(".output ul");
const button = document.querySelector("button");

// Create empty string in which to store final result
let result = "";

// Function to log data from underlying source
function logSource(result) {
  const listItem = document.createElement("li");
  listItem.textContent = result;
  list1.appendChild(listItem);
}

// Function to log data from consumer
function logConsumer(result) {
  const listItem = document.createElement("li");
  listItem.textContent = result;
  list2.appendChild(listItem);
}
```

```js hidden
const stream = makePushSourceStream();

function makePushSourceStream() {
  const pushSource = new MockPushSource();

  return new ReadableStream({
    start(controller) {
      readRepeatedly().catch((e) => controller.error(e));
      function readRepeatedly() {
        return pushSource.dataRequest().then((result) => {
          if (result.data.length === 0) {
            logSource(`No data from source: closing`);
            controller.close();
            return;
          }

          logSource(`Enqueue data: ${result.data}`);
          controller.enqueue(result.data);
          return readRepeatedly();
        });
      }
    },

    cancel() {
      logSource(`cancel() called on underlying source`);
      pushSource.close();
    },
  });
}
```

```js hidden
// Monkey patch fetch() so it returns a response that is a mocked stream
window.fetch = async (...args) => {
  return { body: stream };
};
```

The code below shows a more complete example.
Here the fetch stream is consumed using the iterator inside a try/catch block.
On each iteration of the loop, the code simply logs and counts the characters received.
If there is an error, it logs the issue.
The `fetch()` operation can be canceled using an `AbortSignal`, which is also logged as an error.

```js
let bytes = 0;

const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });

async function logChunks(url, { signal }) {
  try {
    const response = await fetch(url, { signal });
    for await (const chunk of response.body) {
      if (signal.aborted) throw signal.reason;
      bytes += chunk.length;
      logConsumer(`Chunk: ${chunk}. Read ${bytes} characters.`);
    }
  } catch (e) {
    if (e instanceof TypeError) {
      console.log(e);
      logConsumer("TypeError: Browser may not support async iteration");
    } else {
      logConsumer(`Error in async iterator: ${e}.`);
    }
  }
}
```

The example log below shows the code running, or reports that your browser does not support async iteration of `ReadableStream`.
The right-hand side shows the received chunks; you can press the cancel button to stop the fetch.

> **Note:** This fetch operation is _mocked_ for the purpose of demonstration, and just returns a `ReadableStream` that generates random chunks of text.
> The "Underlying source" on the left below is the data being generated in the mocked source, while the column on the right is the log from the consumer.
> (The code for the mocked source is not displayed as it is not relevant to the example.)

{{EmbedLiveSample("Example async reader","100%","400px")}}

## Creating your own custom readable stream

