
feat: support multi agent for ts #300

Merged
merged 32 commits into from Sep 26, 2024
Changes from 28 commits
32 commits
413593b
feat: update question to ask creating multiagent in express
thucpn Sep 18, 2024
622b84b
feat: add express simple multiagent
thucpn Sep 18, 2024
f464b40
fix: import from agent
thucpn Sep 18, 2024
0ebcb9f
Create yellow-jokes-protect.md
marcusschiesser Sep 19, 2024
f43f00a
create workflow with example agents
thucpn Sep 19, 2024
6c05872
remove unused files
thucpn Sep 19, 2024
2c7a538
update doc
thucpn Sep 19, 2024
5daf519
feat: streaming event
thucpn Sep 19, 2024
b875618
fix: streaming final result
thucpn Sep 19, 2024
b030a3d
fix: pipe final streaming result
thucpn Sep 19, 2024
33ce593
feat: functional calling agent
thucpn Sep 20, 2024
de5ba29
fix: let default max attempt 2
thucpn Sep 20, 2024
aff4f0c
fix lint
thucpn Sep 20, 2024
c4041e2
refactor: move workflow folder to src
thucpn Sep 20, 2024
f659721
refactor: share settings file for ts templates
thucpn Sep 20, 2024
54d74f8
fix: move settings.ts to setting folder
thucpn Sep 20, 2024
d69cd42
refactor: move workflow to components
thucpn Sep 20, 2024
054ee5b
Update templates/components/multiagent/typescript/workflow/index.ts
marcusschiesser Sep 23, 2024
7297edf
create ts multi agent from streaming template
thucpn Sep 23, 2024
3ebc3ec
remove copy express template
thucpn Sep 23, 2024
8cfabc5
enhance streaming and add handle tool call step
thucpn Sep 23, 2024
305296b
update changeset
thucpn Sep 23, 2024
ea3bbcf
refactor: code review
thucpn Sep 25, 2024
325c7ca
fix: coderabbit comment
thucpn Sep 25, 2024
45f7529
enable multiagent ts test
thucpn Sep 25, 2024
234b15e
fix: e2e apptype for nextjs
thucpn Sep 25, 2024
32c3d89
refactor: use context write event instead of append data annotation d…
thucpn Sep 25, 2024
7079b68
fix streaming
marcusschiesser Sep 25, 2024
6ecd5f8
Merge branch 'main' into feat/support-multi-agent-for-ts
marcusschiesser Sep 26, 2024
0679c37
fix: writer is just streaming
marcusschiesser Sep 26, 2024
fa45102
fix: clearly separate streaming events and content and use workflowEv…
marcusschiesser Sep 26, 2024
2fb502e
fix: add correct tool calls for tool messages
marcusschiesser Sep 26, 2024
5 changes: 5 additions & 0 deletions .changeset/yellow-jokes-protect.md
@@ -0,0 +1,5 @@
---
"create-llama": patch
---

Add multi agents template for Typescript
12 changes: 6 additions & 6 deletions e2e/multiagent_template.spec.ts
@@ -10,19 +10,19 @@ import type {
} from "../helpers";
import { createTestDir, runCreateLlama, type AppType } from "./utils";

-const templateFramework: TemplateFramework = "fastapi";
+const templateFramework: TemplateFramework = process.env.FRAMEWORK
+  ? (process.env.FRAMEWORK as TemplateFramework)
+  : "fastapi";
const dataSource: string = "--example-file";
const templateUI: TemplateUI = "shadcn";
const templatePostInstallAction: TemplatePostInstallAction = "runApp";
-const appType: AppType = "--frontend";
+const appType: AppType = templateFramework === "nextjs" ? "" : "--frontend";
const userMessage = "Write a blog post about physical standards for letters";

test.describe(`Test multiagent template ${templateFramework} ${dataSource} ${templateUI} ${appType} ${templatePostInstallAction}`, async () => {
test.skip(
-    process.platform !== "linux" ||
-      process.env.FRAMEWORK !== "fastapi" ||
-      process.env.DATASOURCE === "--no-files",
-    "The multiagent template currently only works with FastAPI and files. We also only run on Linux to speed up tests.",
+    process.platform !== "linux" || process.env.DATASOURCE === "--no-files",
+    "The multiagent template currently only works with files. We also only run on Linux to speed up tests.",
);
let port: number;
let externalPort: number;
32 changes: 30 additions & 2 deletions helpers/typescript.ts
@@ -33,8 +33,7 @@ export const installTSTemplate = async ({
* Copy the template files to the target directory.
*/
console.log("\nInitializing project with template:", template, "\n");
-  const type = template === "multiagent" ? "streaming" : template; // use nextjs streaming template for multiagent
-  const templatePath = path.join(templatesDir, "types", type, framework);
+  const templatePath = path.join(templatesDir, "types", "streaming", framework);
const copySource = ["**"];

await copy(copySource, root, {
@@ -124,6 +123,30 @@ export const installTSTemplate = async ({
cwd: path.join(compPath, "vectordbs", "typescript", vectorDb ?? "none"),
});

if (template === "multiagent") {
const multiagentPath = path.join(compPath, "multiagent", "typescript");

// copy workflow code for multiagent template
await copy("**", path.join(root, relativeEngineDestPath, "workflow"), {
parents: true,
cwd: path.join(multiagentPath, "workflow"),
});

if (framework === "nextjs") {
// patch route.ts file
await copy("**", path.join(root, relativeEngineDestPath), {
parents: true,
cwd: path.join(multiagentPath, "nextjs"),
});
} else if (framework === "express") {
// patch chat.controller.ts file
await copy("**", path.join(root, relativeEngineDestPath), {
parents: true,
cwd: path.join(multiagentPath, "express"),
});
}
}

// copy loader component (TS only supports llama_parse and file for now)
const loaderFolder = useLlamaParse ? "llama_parse" : "file";
await copy("**", enginePath, {
@@ -145,6 +168,11 @@ export const installTSTemplate = async ({
cwd: path.join(compPath, "engines", "typescript", engine),
});

// copy settings to engine folder
await copy("**", enginePath, {
cwd: path.join(compPath, "settings", "typescript"),
});

/**
* Copy the selected UI files to the target directory and reference it.
*/
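For orientation, the destination that the `template === "multiagent"` branch copies the workflow code into can be sketched as follows. The example `root` and `relativeEngineDestPath` values are assumptions for illustration; the real values are computed elsewhere in the helpers:

```typescript
import path from "node:path";

// Illustrative values only; relativeEngineDestPath depends on the framework
// (it differs between the Next.js and Express templates).
const root = "/tmp/my-app";
const relativeEngineDestPath = "app/api/chat";

// Destination used above when copying the multiagent workflow code:
const workflowDest = path.join(root, relativeEngineDestPath, "workflow");
console.log(workflowDest); // e.g. "/tmp/my-app/app/api/chat/workflow" on POSIX
```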
5 changes: 1 addition & 4 deletions questions.ts
@@ -410,10 +410,7 @@ export const askQuestions = async (
return; // early return - no further questions needed for llamapack projects
}

-  if (program.template === "multiagent") {
-    // TODO: multi-agents currently only supports FastAPI
-    program.framework = preferences.framework = "fastapi";
-  } else if (program.template === "extractor") {
+  if (program.template === "extractor") {
// Extractor template only supports FastAPI, empty data sources, and llamacloud
// So we just use example file for extractor template, this allows user to choose vector database later
program.dataSources = [EXAMPLE_FILE];
@@ -0,0 +1,34 @@
import { Message, StreamData, streamToResponse } from "ai";
import { Request, Response } from "express";
import { ChatMessage } from "llamaindex";
import { createStreamTimeout } from "./llamaindex/streaming/events";
import { createWorkflow } from "./workflow/factory";
import { toDataStream } from "./workflow/stream";

export const chat = async (req: Request, res: Response) => {
const vercelStreamData = new StreamData();
const streamTimeout = createStreamTimeout(vercelStreamData);
try {
const { messages }: { messages: Message[] } = req.body;
const userMessage = messages.pop();
if (!messages || !userMessage || userMessage.role !== "user") {
return res.status(400).json({
error:
"messages are required in the request body and the last message must be from the user",
});
}

const chatHistory = messages as ChatMessage[];
const agent = createWorkflow(chatHistory);
agent.run(userMessage.content);
const stream = toDataStream(agent.streamEvents(), vercelStreamData);
return streamToResponse(stream, res, {}, vercelStreamData);
} catch (error) {
console.error("[LlamaIndex]", error);
return res.status(500).json({
detail: (error as Error).message,
});
} finally {
clearTimeout(streamTimeout);
}
};
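The guard in the controller above rejects requests whose last message is not from the user. Isolated as a standalone sketch of that check (the helper name `isValidChatRequest` and the `SimpleMessage` shape are illustrative, not part of this PR):

```typescript
// Hypothetical helper mirroring the request validation in the controller above.
type SimpleMessage = { role: string; content: string };

function isValidChatRequest(messages: SimpleMessage[] | undefined): boolean {
  // The controller requires a non-empty messages array whose last entry
  // is a user message; that last message becomes the workflow input.
  if (!messages || messages.length === 0) return false;
  return messages[messages.length - 1].role === "user";
}
```

Note that the controller itself calls `messages.pop()` before checking, so the same condition is expressed there via the popped `userMessage` rather than the array length.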
75 changes: 75 additions & 0 deletions templates/components/multiagent/typescript/nextjs/route.ts
@@ -0,0 +1,75 @@
import { initObservability } from "@/app/observability";
import { StopEvent } from "@llamaindex/core/workflow";
import { Message, StreamData, StreamingTextResponse } from "ai";
import { ChatMessage, ChatResponseChunk } from "llamaindex";
import { NextRequest, NextResponse } from "next/server";
import { initSettings } from "./engine/settings";
import { createStreamTimeout } from "./llamaindex/streaming/events";
import { createWorkflow } from "./workflow/factory";
import { toDataStream } from "./workflow/stream";
import { AgentRunEvent } from "./workflow/type";

initObservability();
initSettings();

export const runtime = "nodejs";
export const dynamic = "force-dynamic";

export async function POST(request: NextRequest) {
// Init Vercel AI StreamData and timeout
const vercelStreamData = new StreamData();
const streamTimeout = createStreamTimeout(vercelStreamData);

try {
const body = await request.json();
const { messages, data }: { messages: Message[]; data?: any } = body;
const userMessage = messages.pop();
if (!messages || !userMessage || userMessage.role !== "user") {
return NextResponse.json(
{
error:
"messages are required in the request body and the last message must be from the user",
},
{ status: 400 },
);
}

const chatHistory = messages as ChatMessage[];
const agent = createWorkflow(chatHistory);
const result = agent.run<AsyncGenerator<ChatResponseChunk>>(
userMessage.content,
);
// start consuming the agent events in the background -> move to stream.ts
void (async () => {
for await (const event of agent.streamEvents()) {
// Add the event to vercelStreamData
if (event instanceof AgentRunEvent) {
const { name, msg } = event.data;
vercelStreamData.appendMessageAnnotation({
type: "agent",
data: { agent: name, text: msg },
});
}
}
})();
// TODO: fix type in agent.run in LITS
const stream = toDataStream(
result as unknown as Promise<
StopEvent<AsyncGenerator<ChatResponseChunk>>
>,
);
return new StreamingTextResponse(stream, {}, vercelStreamData);
} catch (error) {
console.error("[LlamaIndex]", error);
return NextResponse.json(
{
detail: (error as Error).message,
},
{
status: 500,
},
);
} finally {
clearTimeout(streamTimeout);
}
}
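The background consumer in the route above turns each `AgentRunEvent` into a Vercel AI message annotation. The payload mapping can be isolated as a pure function (the name `toAgentAnnotation` is illustrative; the PR inlines this logic):

```typescript
// Shape of an AgentRunEvent's payload as consumed in route.ts above.
type AgentRunEventData = { name: string; msg: string };

// Hypothetical helper: builds the object passed to
// vercelStreamData.appendMessageAnnotation in the route handler.
function toAgentAnnotation(event: AgentRunEventData) {
  return { type: "agent", data: { agent: event.name, text: event.msg } };
}
```

Keeping this mapping pure makes it easy for a frontend to discriminate agent-progress annotations (`type: "agent"`) from the streamed answer text.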
51 changes: 51 additions & 0 deletions templates/components/multiagent/typescript/workflow/agents.ts
@@ -0,0 +1,51 @@
import { ChatMessage, QueryEngineTool } from "llamaindex";
import { getDataSource } from "../engine";
import { FunctionCallingAgent } from "./single-agent";

const getQueryEngineTool = async () => {
const index = await getDataSource();
if (!index) {
throw new Error(
"StorageContext is empty - call 'npm run generate' to generate the storage first.",
);
}

const topK = process.env.TOP_K ? parseInt(process.env.TOP_K) : undefined;
return new QueryEngineTool({
queryEngine: index.asQueryEngine({
similarityTopK: topK,
}),
metadata: {
name: "query_index",
description: `Use this tool to retrieve information about the text corpus from the index.`,
},
});
};

export const createResearcher = async (chatHistory: ChatMessage[]) => {
return new FunctionCallingAgent({
name: "researcher",
tools: [await getQueryEngineTool()],
systemPrompt:
"You are a researcher agent. You are given a researching task. You must use your tools to complete the research.",
chatHistory,
});
};

export const createWriter = (chatHistory: ChatMessage[]) => {
return new FunctionCallingAgent({
name: "writer",
systemPrompt:
"You are an expert in writing blog posts. You are given a task to write a blog post. Don't make up any information yourself.",
chatHistory,
});
};

export const createReviewer = (chatHistory: ChatMessage[]) => {
return new FunctionCallingAgent({
name: "reviewer",
systemPrompt:
"You are an expert in reviewing blog posts. You are given a task to review a blog post. Review the post for logical inconsistencies, ask critical questions, and provide suggestions for improvement. Furthermore, proofread the post for grammar and spelling errors. Only if the post is good enough for publishing, then you MUST return 'The post is good.'. In all other cases return your review.",
chatHistory,
});
};
133 changes: 133 additions & 0 deletions templates/components/multiagent/typescript/workflow/factory.ts
@@ -0,0 +1,133 @@
import {
Context,
StartEvent,
StopEvent,
Workflow,
WorkflowEvent,
} from "@llamaindex/core/workflow";
import { ChatMessage, ChatResponseChunk } from "llamaindex";
import { createResearcher, createReviewer, createWriter } from "./agents";
import { AgentInput, AgentRunEvent } from "./type";

const TIMEOUT = 360 * 1000;
const MAX_ATTEMPTS = 2;

class ResearchEvent extends WorkflowEvent<{ input: string }> {}
class WriteEvent extends WorkflowEvent<{
input: string;
isGood: boolean;
}> {}
class ReviewEvent extends WorkflowEvent<{ input: string }> {}

export const createWorkflow = (chatHistory: ChatMessage[]) => {
const runAgent = async (
context: Context,
agent: Workflow,
input: AgentInput,
) => {
const run = agent.run(new StartEvent({ input }));
for await (const event of agent.streamEvents()) {
if (event.data instanceof AgentRunEvent) {
context.writeEventToStream(event.data);
}
}
return await run;
};

const start = async (context: Context, ev: StartEvent) => {
context.set("task", ev.data.input);
return new ResearchEvent({
input: `Research for this task: ${ev.data.input}`,
});
};

const research = async (context: Context, ev: ResearchEvent) => {
const researcher = await createResearcher(chatHistory);
const researchRes = await runAgent(context, researcher, {
message: ev.data.input,
});
const researchResult = researchRes.data.result;
return new WriteEvent({
input: `Write a blog post given this task: ${context.get("task")} using this research content: ${researchResult}`,
isGood: false,
});
};

const write = async (context: Context, ev: WriteEvent) => {
context.set("attempts", context.get("attempts", 0) + 1);
const tooManyAttempts = context.get("attempts") > MAX_ATTEMPTS;
if (tooManyAttempts) {
context.writeEventToStream(
new AgentRunEvent({
name: "writer",
msg: `Too many attempts (${MAX_ATTEMPTS}) to write the blog post. Proceeding with the current version.`,
}),
);
}

if (ev.data.isGood || tooManyAttempts) {
// TODO: the text is already written, so we don't need to run the writer again
const writer = createWriter(chatHistory);
const writeRes = (await runAgent(context, writer, {
message: ev.data.input,
streaming: true,
})) as unknown as StopEvent<AsyncGenerator<ChatResponseChunk>>;

return writeRes; // stop the workflow
}

const writer = createWriter(chatHistory);
const writeRes = await runAgent(context, writer, {
message: ev.data.input,
});
const writeResult = writeRes.data.result;
context.set("result", writeResult); // store the last result
return new ReviewEvent({ input: writeResult });
};

const review = async (context: Context, ev: ReviewEvent) => {
const reviewer = createReviewer(chatHistory);
const reviewRes = await reviewer.run(
new StartEvent<AgentInput>({ input: { message: ev.data.input } }),
);
const reviewResult = reviewRes.data.result;
const oldContent = context.get("result");
const postIsGood = reviewResult.toLowerCase().includes("post is good");
context.writeEventToStream(
new AgentRunEvent({
name: "reviewer",
msg: `The post is ${postIsGood ? "" : "not "}good enough for publishing. Sending back to the writer${
postIsGood ? " for publication." : "."
}`,
}),
);
if (postIsGood) {
return new WriteEvent({
input: `Your blog post is ready for publication. Please respond with just the blog post. Blog post: \`\`\`${oldContent}\`\`\``,
isGood: true,
});
}

return new WriteEvent({
input: `Improve the writing of a given blog post by using a given review.
Blog post:
\`\`\`
${oldContent}
\`\`\`

Review:
\`\`\`
${reviewResult}
\`\`\``,
isGood: false,
});
};

const workflow = new Workflow({ timeout: TIMEOUT, validate: true });
workflow.addStep(StartEvent, start, { outputs: ResearchEvent });
workflow.addStep(ResearchEvent, research, { outputs: WriteEvent });
workflow.addStep(WriteEvent, write, { outputs: [ReviewEvent, StopEvent] });
workflow.addStep(ReviewEvent, review, { outputs: WriteEvent });

return workflow;
};
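The step wiring above forms a small loop: start → research → write → (review ↔ write) until the reviewer approves or `MAX_ATTEMPTS` is exceeded. A dependency-free sketch of that control flow (the `runLoop` helper and its stub verdict list are illustrative; the real code drives this via `WriteEvent`, `ReviewEvent`, and `StopEvent` from `@llamaindex/core/workflow`):

```typescript
// Dependency-free sketch of the write/review loop in factory.ts above.
const MAX_ATTEMPTS = 2;

function runLoop(reviewVerdicts: boolean[]): { attempts: number; stopped: boolean } {
  let attempts = 0;
  let isGood = false;
  while (true) {
    attempts++; // the write step increments "attempts" in the workflow context
    const tooManyAttempts = attempts > MAX_ATTEMPTS;
    if (isGood || tooManyAttempts) {
      // Corresponds to StopEvent: the writer streams the final post
      return { attempts, stopped: true };
    }
    // Corresponds to ReviewEvent: the reviewer decides if the post is good
    isGood = reviewVerdicts[attempts - 1] ?? false;
  }
}

console.log(runLoop([false, true])); // reviewer approves on the second pass
```

This mirrors why the write step runs one extra time after approval: the PR's own TODO notes the text is already written at that point, so the final pass exists only to stream it.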