split data and assistant stream processing (#3673)
lgrammel authored Nov 14, 2024
1 parent a6ff3a7 commit 571469d
Showing 26 changed files with 789 additions and 367 deletions.
4 changes: 2 additions & 2 deletions content/docs/06-advanced/04-caching.mdx
@@ -18,7 +18,7 @@ This example uses [Vercel KV](https://vercel.com/storage/kv) and Next.js to cach…

```tsx filename="app/api/chat/route.ts"
import { openai } from '@ai-sdk/openai';
-import { formatStreamPart, streamText } from 'ai';
+import { formatDataStreamPart, streamText } from 'ai';
import kv from '@vercel/kv';

// Allow streaming responses up to 30 seconds
@@ -36,7 +36,7 @@ export async function POST(req: Request) {
// Check if we have a cached response
const cached = await kv.get(key);
if (cached != null) {
-return new Response(formatStreamPart('text', cached), {
+return new Response(formatDataStreamPart('text', cached), {
status: 200,
headers: { 'Content-Type': 'text/plain' },
});
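The cached branch above works because `formatDataStreamPart` serializes a value into a single line of the data stream protocol, so a cached completion can be returned as plain text that the client still parses as a stream. A minimal sketch; the `0:` type code for text parts is an assumption about the wire format, not something this diff shows:

```ts
import { formatDataStreamPart } from 'ai';

// Serialize a cached completion as one text part.
const part = formatDataStreamPart('text', 'Hello, world!');

// Assumed wire format: `<type code>:<JSON value>\n`, e.g. 0:"Hello, world!"
console.log(JSON.stringify(part));
```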
@@ -159,6 +159,14 @@ The following methods have been removed from the `streamText` result:
- `pipeAIStreamToResponse`
- `toAIStreamResponse`

+### Renamed "formatStreamPart" to "formatDataStreamPart"
+
+The `formatStreamPart` function has been renamed to `formatDataStreamPart`.
+
+### Renamed "parseStreamPart" to "parseDataStreamPart"
+
+The `parseStreamPart` function has been renamed to `parseDataStreamPart`.
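For illustration, a minimal before/after sketch of these renames, assuming only that the v4 `ai` package re-exports both functions (the `packages/ai/streams/index.ts` hunk below confirms the exports):

```ts
// Before (AI SDK 3.x):
// import { formatStreamPart, parseStreamPart } from 'ai';

// After (AI SDK 4.0):
import { formatDataStreamPart, parseDataStreamPart } from 'ai';

const line = formatDataStreamPart('text', 'cached response');

// Assumed: parseDataStreamPart takes one protocol line and returns a
// { type, value } pair, mirroring the assistant stream handling below.
const { type, value } = parseDataStreamPart(line.trim());
console.log(type, value); // 'text' 'cached response'
```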

### Removed `TokenUsage`, `CompletionTokenUsage` and `EmbeddingTokenUsage` types

The `TokenUsage`, `CompletionTokenUsage` and `EmbeddingTokenUsage` types have been removed.
4 changes: 2 additions & 2 deletions examples/next-openai/app/api/use-chat-cache/route.ts
@@ -1,5 +1,5 @@
import { openai } from '@ai-sdk/openai';
-import { formatStreamPart, streamText } from 'ai';
+import { formatDataStreamPart, streamText } from 'ai';

// Allow streaming responses up to 30 seconds
export const maxDuration = 30;
@@ -16,7 +16,7 @@ export async function POST(req: Request) {
// Check if we have a cached response
const cached = cache.get(key);
if (cached != null) {
-return new Response(formatStreamPart('text', cached), {
+return new Response(formatDataStreamPart('text', cached), {
status: 200,
headers: { 'Content-Type': 'text/plain' },
});
21 changes: 9 additions & 12 deletions packages/ai/core/generate-text/stream-text.ts
@@ -1,6 +1,5 @@
import { createIdGenerator } from '@ai-sdk/provider-utils';
-import { formatStreamPart } from '@ai-sdk/ui-utils';
-import { Span } from '@opentelemetry/api';
+import { formatDataStreamPart } from '@ai-sdk/ui-utils';
import { ServerResponse } from 'node:http';
import { InvalidArgumentError } from '../../errors/invalid-argument-error';
import { StreamData } from '../../streams/stream-data';
@@ -21,13 +20,11 @@ import { selectTelemetryAttributes } from '../telemetry/select-telemetry-attribu…
import { TelemetrySettings } from '../telemetry/telemetry-settings';
import { CoreTool } from '../tool';
import {
CallWarning,
CoreToolChoice,
FinishReason,
LanguageModel,
LogProbs,
} from '../types/language-model';
import { LanguageModelRequestMetadata } from '../types/language-model-request-metadata';
import { ProviderMetadata } from '../types/provider-metadata';
import { LanguageModelUsage } from '../types/usage';
import {
@@ -1074,13 +1071,13 @@ However, the LLM results are expected to be small enough to not cause issues.
const chunkType = chunk.type;
switch (chunkType) {
case 'text-delta': {
-controller.enqueue(formatStreamPart('text', chunk.textDelta));
+controller.enqueue(formatDataStreamPart('text', chunk.textDelta));
break;
}

case 'tool-call-streaming-start': {
controller.enqueue(
-formatStreamPart('tool_call_streaming_start', {
+formatDataStreamPart('tool_call_streaming_start', {
toolCallId: chunk.toolCallId,
toolName: chunk.toolName,
}),
@@ -1090,7 +1087,7 @@

case 'tool-call-delta': {
controller.enqueue(
-formatStreamPart('tool_call_delta', {
+formatDataStreamPart('tool_call_delta', {
toolCallId: chunk.toolCallId,
argsTextDelta: chunk.argsTextDelta,
}),
@@ -1100,7 +1097,7 @@

case 'tool-call': {
controller.enqueue(
-formatStreamPart('tool_call', {
+formatDataStreamPart('tool_call', {
toolCallId: chunk.toolCallId,
toolName: chunk.toolName,
args: chunk.args,
@@ -1111,7 +1108,7 @@

case 'tool-result': {
controller.enqueue(
-formatStreamPart('tool_result', {
+formatDataStreamPart('tool_result', {
toolCallId: chunk.toolCallId,
result: chunk.result,
}),
@@ -1121,14 +1118,14 @@

case 'error': {
controller.enqueue(
-formatStreamPart('error', getErrorMessage(chunk.error)),
+formatDataStreamPart('error', getErrorMessage(chunk.error)),
);
break;
}

case 'step-finish': {
controller.enqueue(
-formatStreamPart('finish_step', {
+formatDataStreamPart('finish_step', {
finishReason: chunk.finishReason,
usage: sendUsage
? {
@@ -1144,7 +1141,7 @@

case 'finish': {
controller.enqueue(
-formatStreamPart('finish_message', {
+formatDataStreamPart('finish_message', {
finishReason: chunk.finishReason,
usage: sendUsage
? {
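On the consuming side, these parts are read back by `processDataStream` (re-exported from `packages/ai/streams/index.ts` below). A rough sketch; the per-part callback names such as `onTextPart` are assumptions, only the export itself is confirmed by this commit:

```ts
import { processDataStream } from 'ai';

async function readDataStream(response: Response) {
  if (!response.body) {
    throw new Error('The response body is empty.');
  }

  await processDataStream({
    stream: response.body,
    // Callback names below are assumptions, one per part type emitted above:
    onTextPart: text => process.stdout.write(text),
    onToolCallPart: ({ toolCallId, toolName, args }) =>
      console.log('tool call:', toolCallId, toolName, args),
    onFinishMessagePart: ({ finishReason, usage }) =>
      console.log('finished:', finishReason, usage),
  });
}
```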
18 changes: 11 additions & 7 deletions packages/ai/streams/assistant-response.ts
@@ -1,7 +1,7 @@
import {
AssistantMessage,
DataMessage,
-formatStreamPart,
+formatAssistantStreamPart,
} from '@ai-sdk/ui-utils';

/**
@@ -54,19 +54,23 @@ export function AssistantResponse(

const sendMessage = (message: AssistantMessage) => {
controller.enqueue(
-textEncoder.encode(formatStreamPart('assistant_message', message)),
+textEncoder.encode(
+formatAssistantStreamPart('assistant_message', message),
+),
);
};

const sendDataMessage = (message: DataMessage) => {
controller.enqueue(
-textEncoder.encode(formatStreamPart('data_message', message)),
+textEncoder.encode(
+formatAssistantStreamPart('data_message', message),
+),
);
};

const sendError = (errorMessage: string) => {
controller.enqueue(
-textEncoder.encode(formatStreamPart('error', errorMessage)),
+textEncoder.encode(formatAssistantStreamPart('error', errorMessage)),
);
};

@@ -78,7 +82,7 @@ export function AssistantResponse(
case 'thread.message.created': {
controller.enqueue(
textEncoder.encode(
-formatStreamPart('assistant_message', {
+formatAssistantStreamPart('assistant_message', {
id: value.data.id,
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],
@@ -94,7 +98,7 @@
if (content?.type === 'text' && content.text?.value != null) {
controller.enqueue(
textEncoder.encode(
-formatStreamPart('text', content.text.value),
+formatAssistantStreamPart('text', content.text.value),
),
);
}
@@ -116,7 +120,7 @@
// send the threadId and messageId as the first message:
controller.enqueue(
textEncoder.encode(
-formatStreamPart('assistant_control_data', {
+formatAssistantStreamPart('assistant_control_data', {
threadId,
messageId,
}),
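For orientation, a sketch of an endpoint built on `AssistantResponse`, assuming its `({ threadId, messageId }, process)` call signature with a `forwardStream` helper; the OpenAI client wiring is illustrative only:

```ts
import { AssistantResponse } from 'ai';
import OpenAI from 'openai';

const openai = new OpenAI();

export async function POST(req: Request) {
  const { threadId, message } = await req.json();

  // Add the user message to the thread before streaming the run.
  const createdMessage = await openai.beta.threads.messages.create(threadId, {
    role: 'user',
    content: message,
  });

  return AssistantResponse(
    { threadId, messageId: createdMessage.id },
    async ({ forwardStream }) => {
      // forwardStream consumes the run events (thread.message.created,
      // thread.message.delta, ...) handled by the switch above.
      await forwardStream(
        openai.beta.threads.runs.stream(threadId, {
          assistant_id: process.env.ASSISTANT_ID ?? '', // assumed env var
        }),
      );
    },
  );
}
```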
26 changes: 14 additions & 12 deletions packages/ai/streams/index.ts
@@ -1,26 +1,28 @@
// forwarding exports from ui-utils:
export {
-formatStreamPart,
-parseStreamPart,
+formatAssistantStreamPart,
+formatDataStreamPart,
+parseAssistantStreamPart,
+parseDataStreamPart,
+processDataProtocolResponse,
processDataStream,
processTextStream,
-processDataProtocolResponse,
} from '@ai-sdk/ui-utils';
export type {
-AssistantStatus,
-UseAssistantOptions,
-Message,
-CreateMessage,
-DataMessage,
AssistantMessage,
-JSONValue,
+AssistantStatus,
+Attachment,
ChatRequest,
ChatRequestOptions,
-ToolInvocation,
-StreamPart,
+CreateMessage,
+DataMessage,
+DataStreamPart,
IdGenerator,
+JSONValue,
+Message,
RequestOptions,
-Attachment,
+ToolInvocation,
+UseAssistantOptions,
} from '@ai-sdk/ui-utils';

export { generateId } from '@ai-sdk/provider-utils';
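The split now shows up directly at the import site; both helper families come from the same package:

```ts
// Data stream helpers (streamText/useChat protocol):
import { formatDataStreamPart, parseDataStreamPart, processDataStream } from 'ai';

// Assistant stream helpers (AssistantResponse/useAssistant protocol):
import { formatAssistantStreamPart, parseAssistantStreamPart } from 'ai';
```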
8 changes: 4 additions & 4 deletions packages/ai/streams/stream-data.ts
@@ -1,4 +1,4 @@
-import { JSONValue, formatStreamPart } from '@ai-sdk/ui-utils';
+import { JSONValue, formatDataStreamPart } from '@ai-sdk/ui-utils';
import { HANGING_STREAM_WARNING_TIME_MS } from '../util/constants';

/**
@@ -66,7 +66,7 @@ export class StreamData {
}

this.controller.enqueue(
-this.encoder.encode(formatStreamPart('data', [value])),
+this.encoder.encode(formatDataStreamPart('data', [value])),
);
}

@@ -80,7 +80,7 @@ export class StreamData {
}

this.controller.enqueue(
-this.encoder.encode(formatStreamPart('message_annotations', [value])),
+this.encoder.encode(formatDataStreamPart('message_annotations', [value])),
);
}
}
@@ -95,7 +95,7 @@ export function createStreamDataTransformer() {
return new TransformStream({
transform: async (chunk, controller) => {
const message = decoder.decode(chunk);
-controller.enqueue(encoder.encode(formatStreamPart('text', message)));
+controller.enqueue(encoder.encode(formatDataStreamPart('text', message)));
},
});
}
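A usage sketch for `StreamData`, assuming the usual `append`/`appendMessageAnnotation`/`close` lifecycle (`close` is not shown in this hunk and is assumed):

```ts
import { StreamData } from 'ai';

const data = new StreamData();

// Framed as a 'data' part: formatDataStreamPart('data', [value]).
data.append({ status: 'retrieving documents' });

// Framed as a 'message_annotations' part.
data.appendMessageAnnotation({ source: 'kv-cache' });

// Close when the model stream finishes; otherwise the hanging-stream
// warning (HANGING_STREAM_WARNING_TIME_MS) can fire. (assumed method)
data.close();
```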
4 changes: 2 additions & 2 deletions packages/react/src/use-assistant.ts
@@ -5,7 +5,7 @@ import {
Message,
UseAssistantOptions,
generateId,
-processDataStream,
+processAssistantStream,
} from '@ai-sdk/ui-utils';
import { useCallback, useRef, useState } from 'react';

@@ -176,7 +176,7 @@ export function useAssistant({
throw new Error('The response body is empty.');
}

-await processDataStream({
+await processAssistantStream({
stream: response.body,
onStreamPart: async ({ type, value }) => {
switch (type) {
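Condensed, the new call site drives `processAssistantStream` like this; the part types are the ones produced by `assistant-response.ts` above, and the `'text'` delta handling is an assumption consistent with that file:

```ts
import { processAssistantStream } from '@ai-sdk/ui-utils';

async function handleAssistantResponse(response: Response) {
  if (!response.body) {
    throw new Error('The response body is empty.');
  }

  await processAssistantStream({
    stream: response.body,
    onStreamPart: async ({ type, value }) => {
      switch (type) {
        case 'assistant_control_data':
          // First part sent: { threadId, messageId } (see assistant-response.ts).
          console.log('thread:', value.threadId, 'message:', value.messageId);
          break;
        case 'assistant_message':
          console.log('assistant message created:', value.id);
          break;
        case 'text':
          process.stdout.write(value); // streamed text delta
          break;
        case 'error':
          throw new Error(value);
      }
    },
  });
}
```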
22 changes: 11 additions & 11 deletions packages/react/src/use-assistant.ui.test.tsx
@@ -1,4 +1,4 @@
-import { formatStreamPart } from '@ai-sdk/ui-utils';
+import { formatAssistantStreamPart } from '@ai-sdk/ui-utils';
import {
mockFetchDataStream,
mockFetchDataStreamWithGenerator,
@@ -49,11 +49,11 @@ describe('stream data stream', () => {
const { requestBody } = mockFetchDataStream({
url: 'https://example.com/api/assistant',
chunks: [
-formatStreamPart('assistant_control_data', {
+formatAssistantStreamPart('assistant_control_data', {
threadId: 't0',
messageId: 'm0',
}),
-formatStreamPart('assistant_message', {
+formatAssistantStreamPart('assistant_message', {
id: 'm0',
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],
@@ -109,14 +109,14 @@ describe('stream data stream', () => {
const encoder = new TextEncoder();

yield encoder.encode(
-formatStreamPart('assistant_control_data', {
+formatAssistantStreamPart('assistant_control_data', {
threadId: 't0',
messageId: 'm1',
}),
);

yield encoder.encode(
-formatStreamPart('assistant_message', {
+formatAssistantStreamPart('assistant_message', {
id: 'm1',
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],
@@ -203,11 +203,11 @@ describe('thread management', () => {
const { requestBody } = mockFetchDataStream({
url: 'https://example.com/api/assistant',
chunks: [
-formatStreamPart('assistant_control_data', {
+formatAssistantStreamPart('assistant_control_data', {
threadId: 't0',
messageId: 'm0',
}),
-formatStreamPart('assistant_message', {
+formatAssistantStreamPart('assistant_message', {
id: 'm0',
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],
@@ -250,11 +250,11 @@ describe('thread management', () => {
const { requestBody } = mockFetchDataStream({
url: 'https://example.com/api/assistant',
chunks: [
-formatStreamPart('assistant_control_data', {
+formatAssistantStreamPart('assistant_control_data', {
threadId: 't1',
messageId: 'm0',
}),
-formatStreamPart('assistant_message', {
+formatAssistantStreamPart('assistant_message', {
id: 'm0',
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],
@@ -297,11 +297,11 @@ describe('thread management', () => {
const { requestBody } = mockFetchDataStream({
url: 'https://example.com/api/assistant',
chunks: [
-formatStreamPart('assistant_control_data', {
+formatAssistantStreamPart('assistant_control_data', {
threadId: 't3',
messageId: 'm0',
}),
-formatStreamPart('assistant_message', {
+formatAssistantStreamPart('assistant_message', {
id: 'm0',
role: 'assistant',
content: [{ type: 'text', text: { value: '' } }],