From 435109fef09c95d99ce0739f7b181e87302b42a4 Mon Sep 17 00:00:00 2001 From: Marcus Schiesser Date: Thu, 5 Sep 2024 12:13:39 +0700 Subject: [PATCH] feat: add multi-agents template based on workflows (#271) --------- Co-authored-by: Thuc Pham <51660321+thucpn@users.noreply.github.com> --- .changeset/tall-ties-own.md | 5 + e2e/multiagent_template.spec.ts | 85 +++++ helpers/run-app.ts | 2 +- helpers/typescript.ts | 3 +- questions.ts | 44 ++- .../multiagent/fastapi/README-template.md | 47 ++- .../fastapi/app/agents/dummy/agent.py | 33 -- .../multiagent/fastapi/app/agents/multi.py | 83 +++++ .../multiagent/fastapi/app/agents/planner.py | 328 ++++++++++++++++++ .../fastapi/app/agents/query_engine/agent.py | 55 --- .../multiagent/fastapi/app/agents/single.py | 245 +++++++++++++ .../multiagent/fastapi/app/api/__init__.py | 0 .../fastapi/app/api/routers/__init__.py | 0 .../fastapi/app/api/routers/chat.py | 43 +++ .../fastapi/app/api/routers/chat_config.py | 48 +++ .../fastapi/app/api/routers/models.py | 227 ++++++++++++ .../fastapi/app/api/routers/upload.py | 29 ++ .../app/api/routers/vercel_response.py | 100 ++++++ .../fastapi/app/api/services/file.py | 119 +++++++ .../fastapi/app/api/services/suggestion.py | 60 ++++ .../types/multiagent/fastapi/app/config.py | 1 + .../fastapi/app/core/control_plane.py | 19 - .../fastapi/app/core/message_queue.py | 12 - .../fastapi/app/core/task_result.py | 88 ----- .../fastapi/app/examples/choreography.py | 25 ++ .../fastapi/app/examples/factory.py | 29 ++ .../fastapi/app/examples/orchestrator.py | 27 ++ .../fastapi/app/examples/researcher.py | 39 +++ .../fastapi/app/examples/workflow.py | 139 ++++++++ .../multiagent/fastapi/app/observability.py | 2 + templates/types/multiagent/fastapi/gitignore | 4 + templates/types/multiagent/fastapi/main.py | 84 +++-- .../types/multiagent/fastapi/pyproject.toml | 17 +- .../chat/chat-message/chat-agent-events.tsx | 152 ++++++++ .../components/ui/chat/chat-message/index.tsx | 16 + .../nextjs/app/components/ui/chat/index.ts | 8 +- .../types/streaming/nextjs/app/globals.css | 14 + 37 files changed, 1960 insertions(+), 272 deletions(-) create mode 100644 .changeset/tall-ties-own.md create mode 100644 e2e/multiagent_template.spec.ts delete mode 100644 templates/types/multiagent/fastapi/app/agents/dummy/agent.py create mode 100644 templates/types/multiagent/fastapi/app/agents/multi.py create mode 100644 templates/types/multiagent/fastapi/app/agents/planner.py delete mode 100644 templates/types/multiagent/fastapi/app/agents/query_engine/agent.py create mode 100644 templates/types/multiagent/fastapi/app/agents/single.py create mode 100644 templates/types/multiagent/fastapi/app/api/__init__.py create mode 100644 templates/types/multiagent/fastapi/app/api/routers/__init__.py create mode 100644 templates/types/multiagent/fastapi/app/api/routers/chat.py create mode 100644 templates/types/multiagent/fastapi/app/api/routers/chat_config.py create mode 100644 templates/types/multiagent/fastapi/app/api/routers/models.py create mode 100644 templates/types/multiagent/fastapi/app/api/routers/upload.py create mode 100644 templates/types/multiagent/fastapi/app/api/routers/vercel_response.py create mode 100644 templates/types/multiagent/fastapi/app/api/services/file.py create mode 100644 templates/types/multiagent/fastapi/app/api/services/suggestion.py create mode 100644 templates/types/multiagent/fastapi/app/config.py delete mode 100644 templates/types/multiagent/fastapi/app/core/control_plane.py delete mode 100644 templates/types/multiagent/fastapi/app/core/message_queue.py delete mode 100644 templates/types/multiagent/fastapi/app/core/task_result.py create mode 100644 templates/types/multiagent/fastapi/app/examples/choreography.py create mode 100644 templates/types/multiagent/fastapi/app/examples/factory.py create mode 100644 templates/types/multiagent/fastapi/app/examples/orchestrator.py create mode 100644 templates/types/multiagent/fastapi/app/examples/researcher.py create mode 100644 templates/types/multiagent/fastapi/app/examples/workflow.py create mode 100644 templates/types/multiagent/fastapi/app/observability.py create mode 100644 templates/types/multiagent/fastapi/gitignore create mode 100644 templates/types/streaming/nextjs/app/components/ui/chat/chat-message/chat-agent-events.tsx diff --git a/.changeset/tall-ties-own.md b/.changeset/tall-ties-own.md new file mode 100644 index 00000000..92fd62b1 --- /dev/null +++ b/.changeset/tall-ties-own.md @@ -0,0 +1,5 @@ +--- +"create-llama": patch +--- + +Add chat agent events UI diff --git a/e2e/multiagent_template.spec.ts b/e2e/multiagent_template.spec.ts new file mode 100644 index 00000000..c69e34c2 --- /dev/null +++ b/e2e/multiagent_template.spec.ts @@ -0,0 +1,85 @@ +/* eslint-disable turbo/no-undeclared-env-vars */ +import { expect, test } from "@playwright/test"; +import { ChildProcess } from "child_process"; +import fs from "fs"; +import path from "path"; +import type { + TemplateFramework, + TemplatePostInstallAction, + TemplateUI, +} from "../helpers"; +import { createTestDir, runCreateLlama, type AppType } from "./utils"; + +const templateFramework: TemplateFramework = "fastapi"; +const dataSource: string = "--example-file"; +const templateUI: TemplateUI = "shadcn"; +const templatePostInstallAction: TemplatePostInstallAction = "runApp"; +const appType: AppType = "--frontend"; +const userMessage = "Write a blog post about physical standards for letters"; + +test.describe(`Test multiagent template ${templateFramework} ${dataSource} ${templateUI} ${appType} ${templatePostInstallAction}`, async () => { + test.skip( + process.platform !== "linux" || + process.env.FRAMEWORK !== "fastapi" || + process.env.DATASOURCE === "--no-files", + "The multiagent template currently only works with FastAPI and files. We also only run on Linux to speed up tests.", + ); + let port: number; + let externalPort: number; + let cwd: string; + let name: string; + let appProcess: ChildProcess; + // Only test without using vector db for now + const vectorDb = "none"; + + test.beforeAll(async () => { + port = Math.floor(Math.random() * 10000) + 10000; + externalPort = port + 1; + cwd = await createTestDir(); + const result = await runCreateLlama( + cwd, + "multiagent", + templateFramework, + dataSource, + vectorDb, + port, + externalPort, + templatePostInstallAction, + templateUI, + appType, + ); + name = result.projectName; + appProcess = result.appProcess; + }); + + test("App folder should exist", async () => { + const dirExists = fs.existsSync(path.join(cwd, name)); + expect(dirExists).toBeTruthy(); + }); + + test("Frontend should have a title", async ({ page }) => { + await page.goto(`http://localhost:${port}`); + await expect(page.getByText("Built by LlamaIndex")).toBeVisible(); + }); + + test("Frontend should be able to submit a message and receive the start of a streamed response", async ({ + page, + }) => { + await page.goto(`http://localhost:${port}`); + await page.fill("form input", userMessage); + + const responsePromise = page.waitForResponse((res) => + res.url().includes("/api/chat"), + ); + + await page.click("form button[type=submit]"); + + const response = await responsePromise; + expect(response.ok()).toBeTruthy(); + }); + + // clean processes + test.afterAll(async () => { + appProcess?.kill(); + }); +}); diff --git a/helpers/run-app.ts b/helpers/run-app.ts index 3d55b568..2ec4e762 100644 --- a/helpers/run-app.ts +++ b/helpers/run-app.ts @@ -81,7 +81,7 @@ export async function runApp( if (template === "extractor") { processes.push(runReflexApp(appPath, port, externalPort)); } - if (template === "streaming") { + if (template === "streaming" || template === "multiagent") { if (framework === "fastapi" || framework === "express") { const backendRunner = framework === "fastapi" ? runFastAPIApp : runTSApp; if (frontend) { diff --git a/helpers/typescript.ts b/helpers/typescript.ts index 7becd8e7..f818a1d3 100644 --- a/helpers/typescript.ts +++ b/helpers/typescript.ts @@ -33,7 +33,8 @@ export const installTSTemplate = async ({ * Copy the template files to the target directory. */ console.log("\nInitializing project with template:", template, "\n"); - const templatePath = path.join(templatesDir, "types", template, framework); + const type = template === "multiagent" ? "streaming" : template; // use nextjs streaming template for multiagent + const templatePath = path.join(templatesDir, "types", type, framework); const copySource = ["**"]; await copy(copySource, root, { diff --git a/questions.ts b/questions.ts index 4b1f61f3..5e855cf9 100644 --- a/questions.ts +++ b/questions.ts @@ -287,27 +287,25 @@ export const askQuestions = async ( }, ]; - if (program.template !== "multiagent") { - const modelConfigured = - !program.llamapack && program.modelConfig.isConfigured(); - // If using LlamaParse, require LlamaCloud API key - const llamaCloudKeyConfigured = program.useLlamaParse - ? program.llamaCloudKey || process.env["LLAMA_CLOUD_API_KEY"] - : true; - const hasVectorDb = program.vectorDb && program.vectorDb !== "none"; - // Can run the app if all tools do not require configuration - if ( - !hasVectorDb && - modelConfigured && - llamaCloudKeyConfigured && - !toolsRequireConfig(program.tools) - ) { - actionChoices.push({ - title: - "Generate code, install dependencies, and run the app (~2 min)", - value: "runApp", - }); - } + const modelConfigured = + !program.llamapack && program.modelConfig.isConfigured(); + // If using LlamaParse, require LlamaCloud API key + const llamaCloudKeyConfigured = program.useLlamaParse + ? program.llamaCloudKey || process.env["LLAMA_CLOUD_API_KEY"] + : true; + const hasVectorDb = program.vectorDb && program.vectorDb !== "none"; + // Can run the app if all tools do not require configuration + if ( + !hasVectorDb && + modelConfigured && + llamaCloudKeyConfigured && + !toolsRequireConfig(program.tools) + ) { + actionChoices.push({ + title: + "Generate code, install dependencies, and run the app (~2 min)", + value: "runApp", + }); } const { action } = await prompts( @@ -341,7 +339,7 @@ export const askQuestions = async ( choices: [ { title: "Agentic RAG (e.g. chat with docs)", value: "streaming" }, { - title: "Multi-agent app (using llama-agents)", + title: "Multi-agent app (using workflows)", value: "multiagent", }, { title: "Structured Extractor", value: "extractor" }, @@ -448,7 +446,7 @@ export const askQuestions = async ( if ( (program.framework === "express" || program.framework === "fastapi") && - program.template === "streaming" + (program.template === "streaming" || program.template === "multiagent") ) { // if a backend-only framework is selected, ask whether we should create a frontend if (program.frontend === undefined) { diff --git a/templates/types/multiagent/fastapi/README-template.md b/templates/types/multiagent/fastapi/README-template.md index b75c3c6b..761f19a4 100644 --- a/templates/types/multiagent/fastapi/README-template.md +++ b/templates/types/multiagent/fastapi/README-template.md @@ -1,4 +1,18 @@ -This is a [LlamaIndex](https://www.llamaindex.ai/) project using [FastAPI](https://fastapi.tiangolo.com/) bootstrapped with [`create-llama`](https://github.com/run-llama/LlamaIndexTS/tree/main/packages/create-llama). +This is a [LlamaIndex](https://www.llamaindex.ai/) multi-agents project using [Workflows](https://docs.llamaindex.ai/en/stable/understanding/workflows/). + +## Overview + +This example is using three agents to generate a blog post: + +- a researcher that retrieves content via a RAG pipeline, +- a writer that specializes in writing blog posts and +- a reviewer that is reviewing the blog post. + +There are three different methods how the agents can interact to reach their goal: + +1. [Choreography](./app/examples/choreography.py) - the agents decide themselves to delegate a task to another agent +1. [Orchestrator](./app/examples/orchestrator.py) - a central orchestrator decides which agent should execute a task +1. [Explicit Workflow](./app/examples/workflow.py) - a pre-defined workflow specific for the task is used to execute the tasks ## Getting Started @@ -8,43 +22,48 @@ First, setup the environment with poetry: ```shell poetry install -poetry shell ``` Then check the parameters that have been pre-configured in the `.env` file in this directory. (E.g. you might need to configure an `OPENAI_API_KEY` if you're using OpenAI as model provider). -Second, generate the embeddings of the documents in the `./data` directory (if this folder exists - otherwise, skip this step): +Second, generate the embeddings of the documents in the `./data` directory: ```shell poetry run generate ``` -Third, run all the services in one command: +Third, run the development server: ```shell poetry run python main.py ``` -You can monitor and test the agent services with `llama-agents` monitor TUI: +Per default, the example is using the explicit workflow. You can change the example by setting the `EXAMPLE_TYPE` environment variable to `choreography` or `orchestrator`. -```shell -poetry run llama-agents monitor --control-plane-url http://127.0.0.1:8001 +The example provides one streaming API endpoint `/api/chat`. +You can test the endpoint with the following curl request: + +``` +curl --location 'localhost:8000/api/chat' \ +--header 'Content-Type: application/json' \ +--data '{ "messages": [{ "role": "user", "content": "Write a blog post about physical standards for letters" }] }' ``` -## Services: +You can start editing the API by modifying `app/api/routers/chat.py` or `app/examples/workflow.py`. The API auto-updates as you save the files. + +Open [http://localhost:8000/docs](http://localhost:8000/docs) with your browser to see the Swagger UI of the API. -- Message queue (port 8000): To exchange the message between services -- Control plane (port 8001): A gateway to manage the tasks and services. -- Human consumer (port 8002): To handle result when the task is completed. -- Agent service `query_engine` (port 8003): Agent that can query information from the configured LlamaIndex index. -- Agent service `dummy_agent` (port 8004): A dummy agent that does nothing. Good starting point to add more agents. +The API allows CORS for all origins to simplify development. You can change this behavior by setting the `ENVIRONMENT` environment variable to `prod`: -The ports listed above are set by default, but you can change them in the `.env` file. +``` +ENVIRONMENT=prod poetry run python main.py +``` ## Learn More To learn more about LlamaIndex, take a look at the following resources: - [LlamaIndex Documentation](https://docs.llamaindex.ai) - learn about LlamaIndex. +- [Workflows Introduction](https://docs.llamaindex.ai/en/stable/understanding/workflows/) - learn about LlamaIndex workflows. You can check out [the LlamaIndex GitHub repository](https://github.com/run-llama/llama_index) - your feedback and contributions are welcome! diff --git a/templates/types/multiagent/fastapi/app/agents/dummy/agent.py b/templates/types/multiagent/fastapi/app/agents/dummy/agent.py deleted file mode 100644 index dde7fa38..00000000 --- a/templates/types/multiagent/fastapi/app/agents/dummy/agent.py +++ /dev/null @@ -1,33 +0,0 @@ -from llama_agents import AgentService, SimpleMessageQueue -from llama_index.core.agent import FunctionCallingAgentWorker -from llama_index.core.tools import FunctionTool -from llama_index.core.settings import Settings -from app.utils import load_from_env - - -DEFAULT_DUMMY_AGENT_DESCRIPTION = "I'm a dummy agent which does nothing." - - -def dummy_function(): - """ - This function does nothing. - """ - return "" - - -def init_dummy_agent(message_queue: SimpleMessageQueue) -> AgentService: - agent = FunctionCallingAgentWorker( - tools=[FunctionTool.from_defaults(fn=dummy_function)], - llm=Settings.llm, - prefix_messages=[], - ).as_agent() - - return AgentService( - service_name="dummy_agent", - agent=agent, - message_queue=message_queue.client, - description=load_from_env("AGENT_DUMMY_DESCRIPTION", throw_error=False) - or DEFAULT_DUMMY_AGENT_DESCRIPTION, - host=load_from_env("AGENT_DUMMY_HOST", throw_error=False) or "127.0.0.1", - port=int(load_from_env("AGENT_DUMMY_PORT")), - ) diff --git a/templates/types/multiagent/fastapi/app/agents/multi.py b/templates/types/multiagent/fastapi/app/agents/multi.py new file mode 100644 index 00000000..adbc10f7 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/agents/multi.py @@ -0,0 +1,83 @@ +import asyncio +from typing import Any, List + +from llama_index.core.tools.types import ToolMetadata, ToolOutput +from llama_index.core.tools.utils import create_schema_from_function +from llama_index.core.workflow import Context, Workflow + +from app.agents.single import ( + AgentRunResult, + ContextAwareTool, + FunctionCallingAgent, +) +from app.agents.planner import StructuredPlannerAgent + + +class AgentCallTool(ContextAwareTool): + def __init__(self, agent: Workflow) -> None: + self.agent = agent + name = f"call_{agent.name}" + + async def schema_call(input: str) -> str: + pass + + # create the schema without the Context + fn_schema = create_schema_from_function(name, schema_call) + self._metadata = ToolMetadata( + name=name, + description=( + f"Use this tool to delegate a sub task to the {agent.name} agent." + + (f" The agent is an {agent.role}." if agent.role else "") + ), + fn_schema=fn_schema, + ) + + # overload the acall function with the ctx argument as it's needed for bubbling the events + async def acall(self, ctx: Context, input: str) -> ToolOutput: + task = asyncio.create_task(self.agent.run(input=input)) + # bubble all events while running the agent to the calling agent + async for ev in self.agent.stream_events(): + ctx.write_event_to_stream(ev) + ret: AgentRunResult = await task + response = ret.response.message.content + return ToolOutput( + content=str(response), + tool_name=self.metadata.name, + raw_input={"args": input, "kwargs": {}}, + raw_output=response, + ) + + +class AgentCallingAgent(FunctionCallingAgent): + def __init__( + self, + *args: Any, + name: str, + agents: List[FunctionCallingAgent] | None = None, + **kwargs: Any, + ) -> None: + agents = agents or [] + tools = [AgentCallTool(agent=agent) for agent in agents] + super().__init__(*args, name=name, tools=tools, **kwargs) + # call add_workflows so agents will get detected by llama agents automatically + self.add_workflows(**{agent.name: agent for agent in agents}) + + +class AgentOrchestrator(StructuredPlannerAgent): + def __init__( + self, + *args: Any, + name: str = "orchestrator", + agents: List[FunctionCallingAgent] | None = None, + **kwargs: Any, + ) -> None: + agents = agents or [] + tools = [AgentCallTool(agent=agent) for agent in agents] + super().__init__( + *args, + name=name, + tools=tools, + **kwargs, + ) + # call add_workflows so agents will get detected by llama agents automatically + self.add_workflows(**{agent.name: agent for agent in agents}) diff --git a/templates/types/multiagent/fastapi/app/agents/planner.py b/templates/types/multiagent/fastapi/app/agents/planner.py new file mode 100644 index 00000000..8a72def6 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/agents/planner.py @@ -0,0 +1,328 @@ +import asyncio +import uuid +from enum import Enum +from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union + +from llama_index.core.agent.runner.planner import ( + DEFAULT_INITIAL_PLAN_PROMPT, + DEFAULT_PLAN_REFINE_PROMPT, + Plan, + PlannerAgentState, + SubTask, +) +from llama_index.core.bridge.pydantic import ValidationError +from llama_index.core.llms.function_calling import FunctionCallingLLM +from llama_index.core.prompts import PromptTemplate +from llama_index.core.settings import Settings +from llama_index.core.tools import BaseTool +from llama_index.core.workflow import ( + Context, + Event, + StartEvent, + StopEvent, + Workflow, + step, +) + +from app.agents.single import AgentRunEvent, AgentRunResult, FunctionCallingAgent + + +class ExecutePlanEvent(Event): + pass + + +class SubTaskEvent(Event): + sub_task: SubTask + + +class SubTaskResultEvent(Event): + sub_task: SubTask + result: AgentRunResult | AsyncGenerator + + +class PlanEventType(Enum): + CREATED = "created" + REFINED = "refined" + + +class PlanEvent(AgentRunEvent): + event_type: PlanEventType + plan: Plan + + @property + def msg(self) -> str: + sub_task_names = ", ".join(task.name for task in self.plan.sub_tasks) + return f"Plan {self.event_type.value}: Let's do: {sub_task_names}" + + +class StructuredPlannerAgent(Workflow): + def __init__( + self, + *args: Any, + name: str, + llm: FunctionCallingLLM | None = None, + tools: List[BaseTool] | None = None, + timeout: float = 360.0, + refine_plan: bool = False, + **kwargs: Any, + ) -> None: + super().__init__(*args, timeout=timeout, **kwargs) + self.name = name + self.refine_plan = refine_plan + + self.tools = tools or [] + self.planner = Planner(llm=llm, tools=self.tools, verbose=self._verbose) + # The executor is keeping the memory of all tool calls and decides to call the right tool for the task + self.executor = FunctionCallingAgent( + name="executor", + llm=llm, + tools=self.tools, + write_events=False, + # it's important to instruct to just return the tool call, otherwise the executor will interpret and change the result + system_prompt="You are an expert in completing given tasks by calling the right tool for the task. Just return the result of the tool call. Don't add any information yourself", + ) + self.add_workflows(executor=self.executor) + + @step() + async def create_plan( + self, ctx: Context, ev: StartEvent + ) -> ExecutePlanEvent | StopEvent: + # set streaming + ctx.data["streaming"] = getattr(ev, "streaming", False) + ctx.data["task"] = ev.input + + plan_id, plan = await self.planner.create_plan(input=ev.input) + ctx.data["act_plan_id"] = plan_id + + # inform about the new plan + ctx.write_event_to_stream( + PlanEvent(name=self.name, event_type=PlanEventType.CREATED, plan=plan) + ) + if self._verbose: + print("=== Executing plan ===\n") + return ExecutePlanEvent() + + @step() + async def execute_plan(self, ctx: Context, ev: ExecutePlanEvent) -> SubTaskEvent: + upcoming_sub_tasks = self.planner.state.get_next_sub_tasks( + ctx.data["act_plan_id"] + ) + + ctx.data["num_sub_tasks"] = len(upcoming_sub_tasks) + # send an event per sub task + events = [SubTaskEvent(sub_task=sub_task) for sub_task in upcoming_sub_tasks] + for event in events: + ctx.send_event(event) + + return None + + @step() + async def execute_sub_task( + self, ctx: Context, ev: SubTaskEvent + ) -> SubTaskResultEvent: + if self._verbose: + print(f"=== Executing sub task: {ev.sub_task.name} ===") + is_last_tasks = ctx.data["num_sub_tasks"] == self.get_remaining_subtasks(ctx) + # TODO: streaming only works without plan refining + streaming = is_last_tasks and ctx.data["streaming"] and not self.refine_plan + task = asyncio.create_task( + self.executor.run( + input=ev.sub_task.input, + streaming=streaming, + ) + ) + # bubble all events while running the executor to the planner + async for event in self.executor.stream_events(): + ctx.write_event_to_stream(event) + result = await task + if self._verbose: + print("=== Done executing sub task ===\n") + self.planner.state.add_completed_sub_task(ctx.data["act_plan_id"], ev.sub_task) + return SubTaskResultEvent(sub_task=ev.sub_task, result=result) + + @step() + async def gather_results( + self, ctx: Context, ev: SubTaskResultEvent + ) -> ExecutePlanEvent | StopEvent: + # wait for all sub tasks to finish + num_sub_tasks = ctx.data["num_sub_tasks"] + results = ctx.collect_events(ev, [SubTaskResultEvent] * num_sub_tasks) + if results is None: + return None + + upcoming_sub_tasks = self.get_upcoming_sub_tasks(ctx) + # if no more tasks to do, stop workflow and send result of last step + if upcoming_sub_tasks == 0: + return StopEvent(result=results[-1].result) + + if self.refine_plan: + # store all results for refining the plan + ctx.data["results"] = ctx.data.get("results", {}) + for result in results: + ctx.data["results"][result.sub_task.name] = result.result + + new_plan = await self.planner.refine_plan( + ctx.data["task"], ctx.data["act_plan_id"], ctx.data["results"] + ) + # inform about the new plan + if new_plan is not None: + ctx.write_event_to_stream( + PlanEvent( + name=self.name, event_type=PlanEventType.REFINED, plan=new_plan + ) + ) + + # continue executing plan + return ExecutePlanEvent() + + def get_upcoming_sub_tasks(self, ctx: Context): + upcoming_sub_tasks = self.planner.state.get_next_sub_tasks( + ctx.data["act_plan_id"] + ) + return len(upcoming_sub_tasks) + + def get_remaining_subtasks(self, ctx: Context): + remaining_subtasks = self.planner.state.get_remaining_subtasks( + ctx.data["act_plan_id"] + ) + return len(remaining_subtasks) + + +# Concern dealing with creating and refining a plan, extracted from https://github.com/run-llama/llama_index/blob/main/llama-index-core/llama_index/core/agent/runner/planner.py#L138 +class Planner: + def __init__( + self, + llm: FunctionCallingLLM | None = None, + tools: List[BaseTool] | None = None, + initial_plan_prompt: Union[str, PromptTemplate] = DEFAULT_INITIAL_PLAN_PROMPT, + plan_refine_prompt: Union[str, PromptTemplate] = DEFAULT_PLAN_REFINE_PROMPT, + verbose: bool = True, + ) -> None: + if llm is None: + llm = Settings.llm + self.llm = llm + assert self.llm.metadata.is_function_calling_model + + self.tools = tools or [] + self.state = PlannerAgentState() + self.verbose = verbose + + if isinstance(initial_plan_prompt, str): + initial_plan_prompt = PromptTemplate(initial_plan_prompt) + self.initial_plan_prompt = initial_plan_prompt + + if isinstance(plan_refine_prompt, str): + plan_refine_prompt = PromptTemplate(plan_refine_prompt) + self.plan_refine_prompt = plan_refine_prompt + + async def create_plan(self, input: str) -> Tuple[str, Plan]: + tools = self.tools + tools_str = "" + for tool in tools: + tools_str += tool.metadata.name + ": " + tool.metadata.description + "\n" + + try: + plan = await self.llm.astructured_predict( + Plan, + self.initial_plan_prompt, + tools_str=tools_str, + task=input, + ) + except (ValueError, ValidationError): + if self.verbose: + print("No complex plan predicted. Defaulting to a single task plan.") + plan = Plan( + sub_tasks=[ + SubTask( + name="default", input=input, expected_output="", dependencies=[] + ) + ] + ) + + if self.verbose: + print("=== Initial plan ===") + for sub_task in plan.sub_tasks: + print( + f"{sub_task.name}:\n{sub_task.input} -> {sub_task.expected_output}\ndeps: {sub_task.dependencies}\n\n" + ) + + plan_id = str(uuid.uuid4()) + self.state.plan_dict[plan_id] = plan + + return plan_id, plan + + async def refine_plan( + self, + input: str, + plan_id: str, + completed_sub_tasks: Dict[str, str], + ) -> Optional[Plan]: + """Refine a plan.""" + prompt_args = self.get_refine_plan_prompt_kwargs( + plan_id, input, completed_sub_tasks + ) + + try: + new_plan = await self.llm.astructured_predict( + Plan, self.plan_refine_prompt, **prompt_args + ) + + self._update_plan(plan_id, new_plan) + + return new_plan + except (ValueError, ValidationError) as e: + # likely no new plan predicted + if self.verbose: + print(f"No new plan predicted: {e}") + return None + + def _update_plan(self, plan_id: str, new_plan: Plan) -> None: + """Update the plan.""" + # update state with new plan + self.state.plan_dict[plan_id] = new_plan + + if self.verbose: + print("=== Refined plan ===") + for sub_task in new_plan.sub_tasks: + print( + f"{sub_task.name}:\n{sub_task.input} -> {sub_task.expected_output}\ndeps: {sub_task.dependencies}\n\n" + ) + + def get_refine_plan_prompt_kwargs( + self, + plan_id: str, + task: str, + completed_sub_task: Dict[str, str], + ) -> dict: + """Get the refine plan prompt.""" + # gather completed sub-tasks and response pairs + completed_outputs_str = "" + for sub_task_name, task_output in completed_sub_task.items(): + task_str = f"{sub_task_name}:\n" f"\t{task_output!s}\n" + completed_outputs_str += task_str + + # get a string for the remaining sub-tasks + remaining_sub_tasks = self.state.get_remaining_subtasks(plan_id) + remaining_sub_tasks_str = "" if len(remaining_sub_tasks) != 0 else "None" + for sub_task in remaining_sub_tasks: + task_str = ( + f"SubTask(name='{sub_task.name}', " + f"input='{sub_task.input}', " + f"expected_output='{sub_task.expected_output}', " + f"dependencies='{sub_task.dependencies}')\n" + ) + remaining_sub_tasks_str += task_str + + # get the tools string + tools = self.tools + tools_str = "" + for tool in tools: + tools_str += tool.metadata.name + ": " + tool.metadata.description + "\n" + + # return the kwargs + return { + "tools_str": tools_str.strip(), + "task": task.strip(), + "completed_outputs": completed_outputs_str.strip(), + "remaining_sub_tasks": remaining_sub_tasks_str.strip(), + } diff --git a/templates/types/multiagent/fastapi/app/agents/query_engine/agent.py b/templates/types/multiagent/fastapi/app/agents/query_engine/agent.py deleted file mode 100644 index 4ed24e5e..00000000 --- a/templates/types/multiagent/fastapi/app/agents/query_engine/agent.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -from llama_agents import AgentService, SimpleMessageQueue -from llama_index.core.agent import FunctionCallingAgentWorker -from llama_index.core.tools import QueryEngineTool, ToolMetadata -from llama_index.core.settings import Settings -from app.engine.index import get_index -from app.utils import load_from_env - - -DEFAULT_QUERY_ENGINE_AGENT_DESCRIPTION = ( - "Used to answer the questions using the provided context data." -) - - -def get_query_engine_tool() -> QueryEngineTool: - """ - Provide an agent worker that can be used to query the index. - """ - index = get_index() - if index is None: - raise ValueError("Index not found. Please create an index first.") - top_k = int(os.getenv("TOP_K", 0)) - query_engine = index.as_query_engine( - **({"similarity_top_k": top_k} if top_k != 0 else {}) - ) - return QueryEngineTool( - query_engine=query_engine, - metadata=ToolMetadata( - name="context_data", - description=""" - Provide the provided context information. - Use a detailed plain text question as input to the tool. - """, - ), - ) - - -def init_query_engine_agent( - message_queue: SimpleMessageQueue, -) -> AgentService: - """ - Initialize the agent service. - """ - agent = FunctionCallingAgentWorker( - tools=[get_query_engine_tool()], llm=Settings.llm, prefix_messages=[] - ).as_agent() - return AgentService( - service_name="context_query_agent", - agent=agent, - message_queue=message_queue.client, - description=load_from_env("AGENT_QUERY_ENGINE_DESCRIPTION", throw_error=False) - or DEFAULT_QUERY_ENGINE_AGENT_DESCRIPTION, - host=load_from_env("AGENT_QUERY_ENGINE_HOST", throw_error=False) or "127.0.0.1", - port=int(load_from_env("AGENT_QUERY_ENGINE_PORT")), - ) diff --git a/templates/types/multiagent/fastapi/app/agents/single.py b/templates/types/multiagent/fastapi/app/agents/single.py new file mode 100644 index 00000000..b47662f8 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/agents/single.py @@ -0,0 +1,245 @@ +from abc import abstractmethod +from typing import Any, AsyncGenerator, List, Optional + +from llama_index.core.llms import ChatMessage, ChatResponse +from llama_index.core.llms.function_calling import FunctionCallingLLM +from llama_index.core.memory import ChatMemoryBuffer +from llama_index.core.settings import Settings +from llama_index.core.tools import ToolOutput, ToolSelection +from llama_index.core.tools.types import BaseTool +from llama_index.core.tools import FunctionTool + +from llama_index.core.workflow import ( + Context, + Event, + StartEvent, + StopEvent, + Workflow, + step, +) +from pydantic import BaseModel + + +class InputEvent(Event): + input: list[ChatMessage] + + +class ToolCallEvent(Event): + tool_calls: list[ToolSelection] + + +class AgentRunEvent(Event): + name: str + _msg: str + + @property + def msg(self): + return self._msg + + @msg.setter + def msg(self, value): + self._msg = value + + +class AgentRunResult(BaseModel): + response: ChatResponse + sources: list[ToolOutput] + + +class ContextAwareTool(FunctionTool): + @abstractmethod + async def acall(self, ctx: Context, input: Any) -> ToolOutput: + pass + + +class FunctionCallingAgent(Workflow): + def __init__( + self, + *args: Any, + llm: FunctionCallingLLM | None = None, + chat_history: Optional[List[ChatMessage]] = None, + tools: List[BaseTool] | None = None, + system_prompt: str | None = None, + verbose: bool = False, + timeout: float = 360.0, + name: str, + write_events: bool = True, + role: Optional[str] = None, + **kwargs: Any, + ) -> None: + super().__init__(*args, verbose=verbose, timeout=timeout, **kwargs) + self.tools = tools or [] + self.name = name + self.role = role + self.write_events = write_events + + if llm is None: + llm = Settings.llm + self.llm = llm + assert self.llm.metadata.is_function_calling_model + + self.system_prompt = system_prompt + + self.memory = ChatMemoryBuffer.from_defaults( + llm=self.llm, chat_history=chat_history + ) + self.sources = [] + + @step() + async def prepare_chat_history(self, ctx: Context, ev: StartEvent) -> InputEvent: + # clear sources + self.sources = [] + + # set system prompt + if self.system_prompt is not None: + system_msg = ChatMessage(role="system", content=self.system_prompt) + self.memory.put(system_msg) + + # set streaming + ctx.data["streaming"] = getattr(ev, "streaming", False) + + # get user input + user_input = ev.input + user_msg = ChatMessage(role="user", content=user_input) + self.memory.put(user_msg) + if self.write_events: + ctx.write_event_to_stream( + AgentRunEvent(name=self.name, msg=f"Start to work on: {user_input}") + ) + + # get chat history + chat_history = self.memory.get() + return InputEvent(input=chat_history) + + @step() + async def handle_llm_input( + self, ctx: Context, ev: InputEvent + ) -> ToolCallEvent | StopEvent: + if ctx.data["streaming"]: + return await self.handle_llm_input_stream(ctx, ev) + + chat_history = ev.input + + response = await self.llm.achat_with_tools( + self.tools, chat_history=chat_history + ) + self.memory.put(response.message) + + tool_calls = self.llm.get_tool_calls_from_response( + response, error_on_no_tool_call=False + ) + + if not tool_calls: + if self.write_events: + ctx.write_event_to_stream( + AgentRunEvent(name=self.name, msg="Finished task") + ) + return StopEvent( + result=AgentRunResult(response=response, sources=[*self.sources]) + ) + else: + return ToolCallEvent(tool_calls=tool_calls) + + async def handle_llm_input_stream( + self, ctx: Context, ev: InputEvent + ) -> ToolCallEvent | StopEvent: + chat_history = ev.input + + async def response_generator() -> AsyncGenerator: + response_stream = await self.llm.astream_chat_with_tools( + self.tools, chat_history=chat_history + ) + + full_response = None + yielded_indicator = False + async for chunk in response_stream: + if "tool_calls" not in chunk.message.additional_kwargs: + # Yield a boolean to indicate whether the response is a tool call + if not yielded_indicator: + yield False + yielded_indicator = True + + # if not a tool call, yield the chunks! + yield chunk + elif not yielded_indicator: + # Yield the indicator for a tool call + yield True + yielded_indicator = True + + full_response = chunk + + # Write the full response to memory + self.memory.put(full_response.message) + + # Yield the final response + yield full_response + + # Start the generator + generator = response_generator() + + # Check for immediate tool call + is_tool_call = await generator.__anext__() + if is_tool_call: + full_response = await generator.__anext__() + tool_calls = self.llm.get_tool_calls_from_response(full_response) + return ToolCallEvent(tool_calls=tool_calls) + + # If we've reached here, it's not an immediate tool call, so we return the generator + if self.write_events: + ctx.write_event_to_stream( + AgentRunEvent(name=self.name, msg="Finished task") + ) + return StopEvent(result=generator) + + @step() + async def handle_tool_calls(self, ctx: Context, ev: ToolCallEvent) -> InputEvent: + tool_calls = ev.tool_calls + tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools} + + tool_msgs = [] + + # call tools -- safely! + for tool_call in tool_calls: + tool = tools_by_name.get(tool_call.tool_name) + additional_kwargs = { + "tool_call_id": tool_call.tool_id, + "name": tool.metadata.get_name(), + } + if not tool: + tool_msgs.append( + ChatMessage( + role="tool", + content=f"Tool {tool_call.tool_name} does not exist", + additional_kwargs=additional_kwargs, + ) + ) + continue + + try: + if isinstance(tool, ContextAwareTool): + # inject context for calling an context aware tool + tool_output = await tool.acall(ctx=ctx, **tool_call.tool_kwargs) + else: + tool_output = await tool.acall(**tool_call.tool_kwargs) + self.sources.append(tool_output) + tool_msgs.append( + ChatMessage( + role="tool", + content=tool_output.content, + additional_kwargs=additional_kwargs, + ) + ) + except Exception as e: + tool_msgs.append( + ChatMessage( + role="tool", + content=f"Encountered error in tool call: {e}", + additional_kwargs=additional_kwargs, + ) + ) + + for msg in tool_msgs: + self.memory.put(msg) + + chat_history = self.memory.get() + return InputEvent(input=chat_history) diff --git a/templates/types/multiagent/fastapi/app/api/__init__.py b/templates/types/multiagent/fastapi/app/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/templates/types/multiagent/fastapi/app/api/routers/__init__.py b/templates/types/multiagent/fastapi/app/api/routers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/templates/types/multiagent/fastapi/app/api/routers/chat.py b/templates/types/multiagent/fastapi/app/api/routers/chat.py new file mode 100644 index 00000000..beeb724e --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/routers/chat.py @@ -0,0 +1,43 @@ +import asyncio +import logging + +from fastapi import APIRouter, HTTPException, Request, status +from llama_index.core.workflow import Workflow + +from app.examples.factory import create_agent +from app.api.routers.models import ( + ChatData, +) +from app.api.routers.vercel_response import VercelStreamResponse + +chat_router = r = APIRouter() + +logger = logging.getLogger("uvicorn") + + +@r.post("") +async def chat( + request: Request, + data: ChatData, +): + try: + last_message_content = data.get_last_message_content() + messages = data.get_history_messages() + # TODO: generate filters based on doc_ids + # for now just use all documents + # doc_ids = data.get_chat_document_ids() + # TODO: use params + # params = data.data or {} + + agent: Workflow = create_agent(chat_history=messages) + task = asyncio.create_task( + agent.run(input=last_message_content, streaming=True) + ) + + return VercelStreamResponse(request, task, agent.stream_events, data) + except Exception as e: + logger.exception("Error in agent", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error in agent: {e}", + ) from e diff --git a/templates/types/multiagent/fastapi/app/api/routers/chat_config.py b/templates/types/multiagent/fastapi/app/api/routers/chat_config.py new file mode 100644 index 00000000..8d926e50 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/routers/chat_config.py @@ -0,0 +1,48 @@ +import logging +import os + +from fastapi import APIRouter + +from app.api.routers.models import ChatConfig + + +config_router = r = APIRouter() + +logger = logging.getLogger("uvicorn") + + +@r.get("") +async def chat_config() -> ChatConfig: + starter_questions = None + conversation_starters = os.getenv("CONVERSATION_STARTERS") + if conversation_starters and conversation_starters.strip(): + starter_questions = conversation_starters.strip().split("\n") + return ChatConfig(starter_questions=starter_questions) + + +try: + from app.engine.service import LLamaCloudFileService + + logger.info("LlamaCloud is configured. Adding /config/llamacloud route.") + + @r.get("/llamacloud") + async def chat_llama_cloud_config(): + projects = LLamaCloudFileService.get_all_projects_with_pipelines() + pipeline = os.getenv("LLAMA_CLOUD_INDEX_NAME") + project = os.getenv("LLAMA_CLOUD_PROJECT_NAME") + pipeline_config = None + if pipeline and project: + pipeline_config = { + "pipeline": pipeline, + "project": project, + } + return { + "projects": projects, + "pipeline": pipeline_config, + } + +except ImportError: + logger.debug( + "LlamaCloud is not configured. Skipping adding /config/llamacloud route." + ) + pass diff --git a/templates/types/multiagent/fastapi/app/api/routers/models.py b/templates/types/multiagent/fastapi/app/api/routers/models.py new file mode 100644 index 00000000..29648608 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/routers/models.py @@ -0,0 +1,227 @@ +import logging +import os +from typing import Any, Dict, List, Literal, Optional + +from llama_index.core.llms import ChatMessage, MessageRole +from llama_index.core.schema import NodeWithScore +from pydantic import BaseModel, Field, validator +from pydantic.alias_generators import to_camel + +from app.config import DATA_DIR + +logger = logging.getLogger("uvicorn") + + +class FileContent(BaseModel): + type: Literal["text", "ref"] + # If the file is pure text then the value is be a string + # otherwise, it's a list of document IDs + value: str | List[str] + + +class File(BaseModel): + id: str + content: FileContent + filename: str + filesize: int + filetype: str + + +class AnnotationFileData(BaseModel): + files: List[File] = Field( + default=[], + description="List of files", + ) + + class Config: + json_schema_extra = { + "example": { + "csvFiles": [ + { + "content": "Name, Age\nAlice, 25\nBob, 30", + "filename": "example.csv", + "filesize": 123, + "id": "123", + "type": "text/csv", + } + ] + } + } + alias_generator = to_camel + + +class Annotation(BaseModel): + type: str + data: AnnotationFileData | List[str] + + def to_content(self) -> str | None: + if self.type == "document_file": + # We only support generating context content for CSV files for now + csv_files = [file for file in self.data.files if file.filetype == "csv"] + if len(csv_files) > 0: + return "Use data from following CSV raw content\n" + "\n".join( + [f"```csv\n{csv_file.content.value}\n```" for csv_file in csv_files] + ) + else: + logger.warning( + f"The annotation {self.type} is not supported for generating context content" + ) + return None + + +class Message(BaseModel): + role: MessageRole + content: str + annotations: List[Annotation] | None = None + + +class ChatData(BaseModel): + messages: List[Message] + data: Any = None + + class Config: + json_schema_extra = { + "example": { + "messages": [ + { + "role": "user", + "content": "What standards for letters exist?", + } + ] + } + } + + @validator("messages") + def messages_must_not_be_empty(cls, v): + if len(v) == 0: + raise ValueError("Messages must not be empty") + return v + + def get_last_message_content(self) -> str: + """ + Get the content of the last message along with the data content if available. + Fallback to use data content from previous messages + """ + if len(self.messages) == 0: + raise ValueError("There is not any message in the chat") + last_message = self.messages[-1] + message_content = last_message.content + for message in reversed(self.messages): + if message.role == MessageRole.USER and message.annotations is not None: + annotation_contents = filter( + None, + [annotation.to_content() for annotation in message.annotations], + ) + if not annotation_contents: + continue + annotation_text = "\n".join(annotation_contents) + message_content = f"{message_content}\n{annotation_text}" + break + return message_content + + def get_history_messages(self) -> List[ChatMessage]: + """ + Get the history messages + """ + return [ + ChatMessage(role=message.role, content=message.content) + for message in self.messages[:-1] + ] + + def is_last_message_from_user(self) -> bool: + return self.messages[-1].role == MessageRole.USER + + def get_chat_document_ids(self) -> List[str]: + """ + Get the document IDs from the chat messages + """ + document_ids: List[str] = [] + for message in self.messages: + if message.role == MessageRole.USER and message.annotations is not None: + for annotation in message.annotations: + if ( + annotation.type == "document_file" + and annotation.data.files is not None + ): + for fi in annotation.data.files: + if fi.content.type == "ref": + document_ids += fi.content.value + return list(set(document_ids)) + + +class SourceNodes(BaseModel): + id: str + metadata: Dict[str, Any] + score: Optional[float] + text: str + url: Optional[str] + + @classmethod + def from_source_node(cls, source_node: NodeWithScore): + metadata = source_node.node.metadata + url = cls.get_url_from_metadata(metadata) + + return cls( + id=source_node.node.node_id, + metadata=metadata, + score=source_node.score, + text=source_node.node.text, # type: ignore + url=url, + ) + + @classmethod + def get_url_from_metadata(cls, metadata: Dict[str, Any]) -> str: + url_prefix = os.getenv("FILESERVER_URL_PREFIX") + if not url_prefix: + logger.warning( + "Warning: FILESERVER_URL_PREFIX not set in environment variables. Can't use file server" + ) + file_name = metadata.get("file_name") + + if file_name and url_prefix: + # file_name exists and file server is configured + pipeline_id = metadata.get("pipeline_id") + if pipeline_id: + # file is from LlamaCloud + file_name = f"{pipeline_id}${file_name}" + return f"{url_prefix}/output/llamacloud/{file_name}" + is_private = metadata.get("private", "false") == "true" + if is_private: + # file is a private upload + return f"{url_prefix}/output/uploaded/{file_name}" + # file is from calling the 'generate' script + # Get the relative path of file_path to data_dir + file_path = metadata.get("file_path") + data_dir = os.path.abspath(DATA_DIR) + if file_path and data_dir: + relative_path = os.path.relpath(file_path, data_dir) + return f"{url_prefix}/data/{relative_path}" + # fallback to URL in metadata (e.g. for websites) + return metadata.get("URL") + + @classmethod + def from_source_nodes(cls, source_nodes: List[NodeWithScore]): + return [cls.from_source_node(node) for node in source_nodes] + + +class Result(BaseModel): + result: Message + nodes: List[SourceNodes] + + +class ChatConfig(BaseModel): + starter_questions: Optional[List[str]] = Field( + default=None, + description="List of starter questions", + serialization_alias="starterQuestions", + ) + + class Config: + json_schema_extra = { + "example": { + "starterQuestions": [ + "What standards for letters exist?", + "What are the requirements for a letter to be considered a letter?", + ] + } + } diff --git a/templates/types/multiagent/fastapi/app/api/routers/upload.py b/templates/types/multiagent/fastapi/app/api/routers/upload.py new file mode 100644 index 00000000..ccc03004 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/routers/upload.py @@ -0,0 +1,29 @@ +import logging +from typing import List, Any + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from app.api.services.file import PrivateFileService + +file_upload_router = r = APIRouter() + +logger = logging.getLogger("uvicorn") + + +class FileUploadRequest(BaseModel): + base64: str + filename: str + params: Any = None + + +@r.post("") +def upload_file(request: FileUploadRequest) -> List[str]: + try: + logger.info("Processing file") + return PrivateFileService.process_file( + request.filename, request.base64, request.params + ) + except Exception as e: + logger.error(f"Error processing file: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Error processing file") diff --git a/templates/types/multiagent/fastapi/app/api/routers/vercel_response.py b/templates/types/multiagent/fastapi/app/api/routers/vercel_response.py new file mode 100644 index 00000000..ec03fb6c --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/routers/vercel_response.py @@ -0,0 +1,100 @@ +from asyncio import Task +import json +import logging +from typing import AsyncGenerator + +from aiostream import stream +from fastapi import Request +from fastapi.responses import StreamingResponse + +from app.api.routers.models import ChatData +from app.agents.single import AgentRunEvent, AgentRunResult + +logger = logging.getLogger("uvicorn") + + +class VercelStreamResponse(StreamingResponse): + """ + Class to convert the response from the chat engine to the streaming format expected by Vercel + """ + + TEXT_PREFIX = "0:" + DATA_PREFIX = "8:" + + @classmethod + def convert_text(cls, token: str): + # Escape newlines and double quotes to avoid breaking the stream + token = json.dumps(token) + return f"{cls.TEXT_PREFIX}{token}\n" + + @classmethod + def convert_data(cls, data: dict): + data_str = json.dumps(data) + return f"{cls.DATA_PREFIX}[{data_str}]\n" + + def __init__( + self, + request: Request, + task: Task[AgentRunResult | AsyncGenerator], + events: AsyncGenerator[AgentRunEvent, None], + chat_data: ChatData, + verbose: bool = True, + ): + content = VercelStreamResponse.content_generator( + request, task, events, chat_data, verbose + ) + super().__init__(content=content) + + @classmethod + async def content_generator( + cls, + request: Request, + task: Task[AgentRunResult | AsyncGenerator], + events: AsyncGenerator[AgentRunEvent, None], + chat_data: ChatData, + verbose: bool = True, + ): + # Yield the text response + async def _chat_response_generator(): + result = await task + + if isinstance(result, AgentRunResult): + for token in result.response.message.content: + yield VercelStreamResponse.convert_text(token) + + if isinstance(result, AsyncGenerator): + async for token in result: + yield VercelStreamResponse.convert_text(token.delta) + + # TODO: stream NextQuestionSuggestion + # TODO: stream sources + + # Yield the events from the event handler + async def _event_generator(): + async for event in events(): + event_response = _event_to_response(event) + if verbose: + logger.debug(event_response) + if event_response is not None: + yield VercelStreamResponse.convert_data(event_response) + + combine = stream.merge(_chat_response_generator(), _event_generator()) + + is_stream_started = False + async with combine.stream() as streamer: + if not is_stream_started: + is_stream_started = True + # Stream a blank message to start the stream + yield VercelStreamResponse.convert_text("") + + async for output in streamer: + yield output + if await request.is_disconnected(): + break + + +def _event_to_response(event: AgentRunEvent) -> dict: + return { + "type": "agent", + "data": {"agent": event.name, "text": event.msg}, + } diff --git a/templates/types/multiagent/fastapi/app/api/services/file.py b/templates/types/multiagent/fastapi/app/api/services/file.py new file mode 100644 index 00000000..9441db6e --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/services/file.py @@ -0,0 +1,119 @@ +import base64 +import mimetypes +import os +from io import BytesIO +from pathlib import Path +from typing import Any, List, Tuple + +from app.engine.index import IndexConfig, get_index +from llama_index.core import VectorStoreIndex +from llama_index.core.ingestion import IngestionPipeline +from llama_index.core.readers.file.base import ( + _try_loading_included_file_formats as get_file_loaders_map, +) +from llama_index.core.schema import Document +from llama_index.indices.managed.llama_cloud.base import LlamaCloudIndex +from llama_index.readers.file import FlatReader + + +def get_llamaparse_parser(): + from app.engine.loaders import load_configs + from app.engine.loaders.file import FileLoaderConfig, llama_parse_parser + + config = load_configs() + file_loader_config = FileLoaderConfig(**config["file"]) + if file_loader_config.use_llama_parse: + return llama_parse_parser() + else: + return None + + +def default_file_loaders_map(): + default_loaders = get_file_loaders_map() + default_loaders[".txt"] = FlatReader + return default_loaders + + +class PrivateFileService: + PRIVATE_STORE_PATH = "output/uploaded" + + @staticmethod + def preprocess_base64_file(base64_content: str) -> Tuple[bytes, str | None]: + header, data = base64_content.split(",", 1) + mime_type = header.split(";")[0].split(":", 1)[1] + extension = mimetypes.guess_extension(mime_type) + # File data as bytes + return base64.b64decode(data), extension + + @staticmethod + def store_and_parse_file(file_name, file_data, extension) -> List[Document]: + # Store file to the private directory + os.makedirs(PrivateFileService.PRIVATE_STORE_PATH, exist_ok=True) + file_path = Path(os.path.join(PrivateFileService.PRIVATE_STORE_PATH, file_name)) + + # write file + with open(file_path, "wb") as f: + f.write(file_data) + + # Load file to documents + # If LlamaParse is enabled, use it to parse the file + # Otherwise, use the default file loaders + reader = get_llamaparse_parser() + if reader is None: + reader_cls = default_file_loaders_map().get(extension) + if reader_cls is None: + raise ValueError(f"File extension {extension} is not supported") + reader = reader_cls() + documents = reader.load_data(file_path) + # Add custom metadata + for doc in documents: + doc.metadata["file_name"] = file_name + doc.metadata["private"] = "true" + return documents + + @staticmethod + def process_file(file_name: str, base64_content: str, params: Any) -> List[str]: + file_data, extension = PrivateFileService.preprocess_base64_file(base64_content) + + # Add the nodes to the index and persist it + index_config = IndexConfig(**params) + current_index = get_index(index_config) + + # Insert the documents into the index + if isinstance(current_index, LlamaCloudIndex): + from app.engine.service import LLamaCloudFileService + + project_id = current_index._get_project_id() + pipeline_id = current_index._get_pipeline_id() + # LlamaCloudIndex is a managed index so we can directly use the files + upload_file = (file_name, BytesIO(file_data)) + return [ + LLamaCloudFileService.add_file_to_pipeline( + project_id, + pipeline_id, + upload_file, + custom_metadata={ + # Set private=true to mark the document as private user docs (required for filtering) + "private": "true", + }, + ) + ] + else: + # First process documents into nodes + documents = PrivateFileService.store_and_parse_file( + file_name, file_data, extension + ) + pipeline = IngestionPipeline() + nodes = pipeline.run(documents=documents) + + # Add the nodes to the index and persist it + if current_index is None: + current_index = VectorStoreIndex(nodes=nodes) + else: + current_index.insert_nodes(nodes=nodes) + current_index.storage_context.persist( + persist_dir=os.environ.get("STORAGE_DIR", "storage") + ) + + # Return the document ids + return [doc.doc_id for doc in documents] diff --git a/templates/types/multiagent/fastapi/app/api/services/suggestion.py b/templates/types/multiagent/fastapi/app/api/services/suggestion.py new file mode 100644 index 00000000..f881962e --- /dev/null +++ b/templates/types/multiagent/fastapi/app/api/services/suggestion.py @@ -0,0 +1,60 @@ +import logging +from typing import List + +from app.api.routers.models import Message +from llama_index.core.prompts import PromptTemplate +from llama_index.core.settings import Settings +from pydantic import BaseModel + +NEXT_QUESTIONS_SUGGESTION_PROMPT = PromptTemplate( + "You're a helpful assistant! Your task is to suggest the next question that user might ask. " + "\nHere is the conversation history" + "\n---------------------\n{conversation}\n---------------------" + "Given the conversation history, please give me {number_of_questions} questions that you might ask next!" +) +N_QUESTION_TO_GENERATE = 3 + + +logger = logging.getLogger("uvicorn") + + +class NextQuestions(BaseModel): + """A list of questions that user might ask next""" + + questions: List[str] + + +class NextQuestionSuggestion: + @staticmethod + async def suggest_next_questions( + messages: List[Message], + number_of_questions: int = N_QUESTION_TO_GENERATE, + ) -> List[str]: + """ + Suggest the next questions that user might ask based on the conversation history + Return as empty list if there is an error + """ + try: + # Reduce the cost by only using the last two messages + last_user_message = None + last_assistant_message = None + for message in reversed(messages): + if message.role == "user": + last_user_message = f"User: {message.content}" + elif message.role == "assistant": + last_assistant_message = f"Assistant: {message.content}" + if last_user_message and last_assistant_message: + break + conversation: str = f"{last_user_message}\n{last_assistant_message}" + + output: NextQuestions = await Settings.llm.astructured_predict( + NextQuestions, + prompt=NEXT_QUESTIONS_SUGGESTION_PROMPT, + conversation=conversation, + number_of_questions=number_of_questions, + ) + + return output.questions + except Exception as e: + logger.error(f"Error when generating next question: {e}") + return [] diff --git a/templates/types/multiagent/fastapi/app/config.py b/templates/types/multiagent/fastapi/app/config.py new file mode 100644 index 00000000..29fa8d9a --- /dev/null +++ b/templates/types/multiagent/fastapi/app/config.py @@ -0,0 +1 @@ +DATA_DIR = "data" diff --git a/templates/types/multiagent/fastapi/app/core/control_plane.py b/templates/types/multiagent/fastapi/app/core/control_plane.py deleted file mode 100644 index 2ac3a1e3..00000000 --- a/templates/types/multiagent/fastapi/app/core/control_plane.py +++ /dev/null @@ -1,19 +0,0 @@ -from llama_index.llms.openai import OpenAI -from llama_agents import AgentOrchestrator, ControlPlaneServer -from app.core.message_queue import message_queue -from app.utils import load_from_env - - -control_plane_host = ( - load_from_env("CONTROL_PLANE_HOST", throw_error=False) or "127.0.0.1" -) -control_plane_port = load_from_env("CONTROL_PLANE_PORT", throw_error=False) or "8001" - - -# setup control plane -control_plane = ControlPlaneServer( - message_queue=message_queue, - orchestrator=AgentOrchestrator(llm=OpenAI()), - host=control_plane_host, - port=int(control_plane_port) if control_plane_port else None, -) diff --git a/templates/types/multiagent/fastapi/app/core/message_queue.py b/templates/types/multiagent/fastapi/app/core/message_queue.py deleted file mode 100644 index cebca445..00000000 --- a/templates/types/multiagent/fastapi/app/core/message_queue.py +++ /dev/null @@ -1,12 +0,0 @@ -from llama_agents import SimpleMessageQueue -from app.utils import load_from_env - -message_queue_host = ( - load_from_env("MESSAGE_QUEUE_HOST", throw_error=False) or "127.0.0.1" -) -message_queue_port = load_from_env("MESSAGE_QUEUE_PORT", throw_error=False) or "8000" - -message_queue = SimpleMessageQueue( - host=message_queue_host, - port=int(message_queue_port) if message_queue_port else None, -) diff --git a/templates/types/multiagent/fastapi/app/core/task_result.py b/templates/types/multiagent/fastapi/app/core/task_result.py deleted file mode 100644 index 9b0737e2..00000000 --- a/templates/types/multiagent/fastapi/app/core/task_result.py +++ /dev/null @@ -1,88 +0,0 @@ -import json -from logging import getLogger -from pathlib import Path -from fastapi import FastAPI -from typing import Dict, Optional -from llama_agents import CallableMessageConsumer, QueueMessage -from llama_agents.message_queues.base import BaseMessageQueue -from llama_agents.message_consumers.base import BaseMessageQueueConsumer -from llama_agents.message_consumers.remote import RemoteMessageConsumer -from app.utils import load_from_env -from app.core.message_queue import message_queue - - -logger = getLogger(__name__) - - -class TaskResultService: - def __init__( - self, - message_queue: BaseMessageQueue, - name: str = "human", - host: str = "127.0.0.1", - port: Optional[int] = 8002, - ) -> None: - self.name = name - self.host = host - self.port = port - - self._message_queue = message_queue - - # app - self._app = FastAPI() - self._app.add_api_route( - "/", self.home, methods=["GET"], tags=["Human Consumer"] - ) - self._app.add_api_route( - "/process_message", - self.process_message, - methods=["POST"], - tags=["Human Consumer"], - ) - - @property - def message_queue(self) -> BaseMessageQueue: - return self._message_queue - - def as_consumer(self, remote: bool = False) -> BaseMessageQueueConsumer: - if remote: - return RemoteMessageConsumer( - url=( - f"http://{self.host}:{self.port}/process_message" - if self.port - else f"http://{self.host}/process_message" - ), - message_type=self.name, - ) - - return CallableMessageConsumer( - message_type=self.name, - handler=self.process_message, - ) - - async def process_message(self, message: QueueMessage) -> None: - Path("task_results").mkdir(exist_ok=True) - with open("task_results/task_results.json", "+a") as f: - json.dump(message.model_dump(), f) - f.write("\n") - - async def home(self) -> Dict[str, str]: - return {"message": "hello, human."} - - async def register_to_message_queue(self) -> None: - """Register to the message queue.""" - await self.message_queue.register_consumer(self.as_consumer(remote=True)) - - -human_consumer_host = ( - load_from_env("HUMAN_CONSUMER_HOST", throw_error=False) or "127.0.0.1" -) -human_consumer_port = load_from_env("HUMAN_CONSUMER_PORT", throw_error=False) or "8002" - - -human_consumer_server = TaskResultService( - message_queue=message_queue, - host=human_consumer_host, - port=int(human_consumer_port) if human_consumer_port else None, - name="human", -) diff --git a/templates/types/multiagent/fastapi/app/examples/choreography.py b/templates/types/multiagent/fastapi/app/examples/choreography.py new file mode 100644 index 00000000..aa7c197d --- /dev/null +++ b/templates/types/multiagent/fastapi/app/examples/choreography.py @@ -0,0 +1,25 @@ +from typing import List, Optional +from app.agents.single import FunctionCallingAgent +from app.agents.multi import AgentCallingAgent +from app.examples.researcher import create_researcher +from llama_index.core.chat_engine.types import ChatMessage + + +def create_choreography(chat_history: Optional[List[ChatMessage]] = None): + researcher = create_researcher(chat_history) + reviewer = FunctionCallingAgent( + name="reviewer", + role="expert in reviewing blog posts", + system_prompt="You are an expert in reviewing blog posts. You are given a task to review a blog post. Review the post for logical inconsistencies, ask critical questions, and provide suggestions for improvement. Furthermore, proofread the post for grammar and spelling errors. If the post is good, you can say 'The post is good.'", + chat_history=chat_history, + ) + return AgentCallingAgent( + name="writer", + agents=[researcher, reviewer], + role="expert in writing blog posts", + system_prompt="""You are an expert in writing blog posts. You are given a task to write a blog post. Before starting to write the post, consult the researcher agent to get the information you need. Don't make up any information yourself. + After creating a draft for the post, send it to the reviewer agent to receive some feedback and make sure to incorporate the feedback from the reviewer. + You can consult the reviewer and researcher maximal two times. Your output should just contain the blog post.""", + # TODO: add chat_history support to AgentCallingAgent + # chat_history=chat_history, + ) diff --git a/templates/types/multiagent/fastapi/app/examples/factory.py b/templates/types/multiagent/fastapi/app/examples/factory.py new file mode 100644 index 00000000..2a376a32 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/examples/factory.py @@ -0,0 +1,29 @@ +import logging +from typing import List, Optional +from app.examples.choreography import create_choreography +from app.examples.orchestrator import create_orchestrator +from app.examples.workflow import create_workflow + + +from llama_index.core.workflow import Workflow +from llama_index.core.chat_engine.types import ChatMessage + + +import os + +logger = logging.getLogger("uvicorn") + + +def create_agent(chat_history: Optional[List[ChatMessage]] = None) -> Workflow: + agent_type = os.getenv("EXAMPLE_TYPE", "").lower() + match agent_type: + case "choreography": + agent = create_choreography(chat_history) + case "orchestrator": + agent = create_orchestrator(chat_history) + case _: + agent = create_workflow(chat_history) + + logger.info(f"Using agent pattern: {agent_type}") + + return agent diff --git a/templates/types/multiagent/fastapi/app/examples/orchestrator.py b/templates/types/multiagent/fastapi/app/examples/orchestrator.py new file mode 100644 index 00000000..9f915124 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/examples/orchestrator.py @@ -0,0 +1,27 @@ +from typing import List, Optional +from app.agents.single import FunctionCallingAgent +from app.agents.multi import AgentOrchestrator +from app.examples.researcher import create_researcher + +from llama_index.core.chat_engine.types import ChatMessage + + +def create_orchestrator(chat_history: Optional[List[ChatMessage]] = None): + researcher = create_researcher(chat_history) + writer = FunctionCallingAgent( + name="writer", + role="expert in writing blog posts", + system_prompt="""You are an expert in writing blog posts. You are given a task to write a blog post. Don't make up any information yourself. If you don't have the necessary information to write a blog post, reply "I need information about the topic to write the blog post". If you have all the information needed, write the blog post.""", + chat_history=chat_history, + ) + reviewer = FunctionCallingAgent( + name="reviewer", + role="expert in reviewing blog posts", + system_prompt="""You are an expert in reviewing blog posts. You are given a task to review a blog post. Review the post and fix the issues found yourself. You must output a final blog post. + Especially check for logical inconsistencies and proofread the post for grammar and spelling errors.""", + chat_history=chat_history, + ) + return AgentOrchestrator( + agents=[writer, reviewer, researcher], + refine_plan=False, + ) diff --git a/templates/types/multiagent/fastapi/app/examples/researcher.py b/templates/types/multiagent/fastapi/app/examples/researcher.py new file mode 100644 index 00000000..ed6819d4 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/examples/researcher.py @@ -0,0 +1,39 @@ +import os +from typing import List +from llama_index.core.tools import QueryEngineTool, ToolMetadata +from app.agents.single import FunctionCallingAgent +from app.engine.index import get_index + +from llama_index.core.chat_engine.types import ChatMessage + + +def get_query_engine_tool() -> QueryEngineTool: + """ + Provide an agent worker that can be used to query the index. + """ + index = get_index() + if index is None: + raise ValueError("Index not found. Please create an index first.") + top_k = int(os.getenv("TOP_K", 0)) + query_engine = index.as_query_engine( + **({"similarity_top_k": top_k} if top_k != 0 else {}) + ) + return QueryEngineTool( + query_engine=query_engine, + metadata=ToolMetadata( + name="query_index", + description=""" + Use this tool to retrieve information about the text corpus from the index. + """, + ), + ) + + +def create_researcher(chat_history: List[ChatMessage]): + return FunctionCallingAgent( + name="researcher", + tools=[get_query_engine_tool()], + role="expert in retrieving any unknown content", + system_prompt="You are a researcher agent. You are given a researching task. You must use your tools to complete the research.", + chat_history=chat_history, + ) diff --git a/templates/types/multiagent/fastapi/app/examples/workflow.py b/templates/types/multiagent/fastapi/app/examples/workflow.py new file mode 100644 index 00000000..731b3033 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/examples/workflow.py @@ -0,0 +1,139 @@ +import asyncio +from typing import AsyncGenerator, List, Optional + + +from llama_index.core.workflow import ( + Context, + Event, + StartEvent, + StopEvent, + Workflow, + step, +) +from llama_index.core.chat_engine.types import ChatMessage +from app.agents.single import AgentRunEvent, AgentRunResult, FunctionCallingAgent +from app.examples.researcher import create_researcher + + +def create_workflow(chat_history: Optional[List[ChatMessage]] = None): + researcher = create_researcher( + chat_history=chat_history, + ) + writer = FunctionCallingAgent( + name="writer", + role="expert in writing blog posts", + system_prompt="""You are an expert in writing blog posts. You are given a task to write a blog post. Don't make up any information yourself.""", + chat_history=chat_history, + ) + reviewer = FunctionCallingAgent( + name="reviewer", + role="expert in reviewing blog posts", + system_prompt="You are an expert in reviewing blog posts. You are given a task to review a blog post. Review the post for logical inconsistencies, ask critical questions, and provide suggestions for improvement. Furthermore, proofread the post for grammar and spelling errors. Only if the post is good enough for publishing, then you MUST return 'The post is good.'. In all other cases return your review.", + chat_history=chat_history, + ) + workflow = BlogPostWorkflow(timeout=360) + workflow.add_workflows(researcher=researcher, writer=writer, reviewer=reviewer) + return workflow + + +class ResearchEvent(Event): + input: str + + +class WriteEvent(Event): + input: str + is_good: bool = False + + +class ReviewEvent(Event): + input: str + + +class BlogPostWorkflow(Workflow): + @step() + async def start(self, ctx: Context, ev: StartEvent) -> ResearchEvent: + # set streaming + ctx.data["streaming"] = getattr(ev, "streaming", False) + # start the workflow with researching about a topic + ctx.data["task"] = ev.input + return ResearchEvent(input=f"Research for this task: {ev.input}") + + @step() + async def research( + self, ctx: Context, ev: ResearchEvent, researcher: FunctionCallingAgent + ) -> WriteEvent: + result: AgentRunResult = await self.run_agent(ctx, researcher, ev.input) + content = result.response.message.content + return WriteEvent( + input=f"Write a blog post given this task: {ctx.data['task']} using this research content: {content}" + ) + + @step() + async def write( + self, ctx: Context, ev: WriteEvent, writer: FunctionCallingAgent + ) -> ReviewEvent | StopEvent: + MAX_ATTEMPTS = 2 + ctx.data["attempts"] = ctx.data.get("attempts", 0) + 1 + too_many_attempts = ctx.data["attempts"] > MAX_ATTEMPTS + if too_many_attempts: + ctx.write_event_to_stream( + AgentRunEvent( + name=writer.name, + msg=f"Too many attempts ({MAX_ATTEMPTS}) to write the blog post. Proceeding with the current version.", + ) + ) + if ev.is_good or too_many_attempts: + # too many attempts or the blog post is good - stream final response if requested + result = await self.run_agent( + ctx, writer, ev.input, streaming=ctx.data["streaming"] + ) + return StopEvent(result=result) + result: AgentRunResult = await self.run_agent(ctx, writer, ev.input) + ctx.data["result"] = result + return ReviewEvent(input=result.response.message.content) + + @step() + async def review( + self, ctx: Context, ev: ReviewEvent, reviewer: FunctionCallingAgent + ) -> WriteEvent: + result: AgentRunResult = await self.run_agent(ctx, reviewer, ev.input) + review = result.response.message.content + old_content = ctx.data["result"].response.message.content + post_is_good = "post is good" in review.lower() + ctx.write_event_to_stream( + AgentRunEvent( + name=reviewer.name, + msg=f"The post is {'not ' if not post_is_good else ''}good enough for publishing. Sending back to the writer{' for publication.' if post_is_good else '.'}", + ) + ) + if post_is_good: + return WriteEvent( + input=f"You're blog post is ready for publication. Please respond with just the blog post. Blog post: ```{old_content}```", + is_good=True, + ) + else: + return WriteEvent( + input=f"""Improve the writing of a given blog post by using a given review. +Blog post: +``` +{old_content} +``` + +Review: +``` +{review} +```""" + ) + + async def run_agent( + self, + ctx: Context, + agent: FunctionCallingAgent, + input: str, + streaming: bool = False, + ) -> AgentRunResult | AsyncGenerator: + task = asyncio.create_task(agent.run(input=input, streaming=streaming)) + # bubble all events while running the executor to the planner + async for event in agent.stream_events(): + ctx.write_event_to_stream(event) + return await task diff --git a/templates/types/multiagent/fastapi/app/observability.py b/templates/types/multiagent/fastapi/app/observability.py new file mode 100644 index 00000000..28019c37 --- /dev/null +++ b/templates/types/multiagent/fastapi/app/observability.py @@ -0,0 +1,2 @@ +def init_observability(): + pass diff --git a/templates/types/multiagent/fastapi/gitignore b/templates/types/multiagent/fastapi/gitignore new file mode 100644 index 00000000..ae22d348 --- /dev/null +++ b/templates/types/multiagent/fastapi/gitignore @@ -0,0 +1,4 @@ +__pycache__ +storage +.env +output diff --git a/templates/types/multiagent/fastapi/main.py b/templates/types/multiagent/fastapi/main.py index 03dc98a6..11395a07 100644 --- a/templates/types/multiagent/fastapi/main.py +++ b/templates/types/multiagent/fastapi/main.py @@ -1,28 +1,72 @@ # flake8: noqa: E402 +import os from dotenv import load_dotenv -from app.settings import init_settings + +from app.config import DATA_DIR load_dotenv() + +import logging + +import uvicorn +from app.api.routers.chat import chat_router +from app.api.routers.chat_config import config_router +from app.api.routers.upload import file_upload_router +from app.observability import init_observability +from app.settings import init_settings +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import RedirectResponse +from fastapi.staticfiles import StaticFiles + +app = FastAPI() + init_settings() +init_observability() + + +environment = os.getenv("ENVIRONMENT", "dev") # Default to 'development' if not set +logger = logging.getLogger("uvicorn") + +if environment == "dev": + logger.warning("Running in development mode - allowing CORS for all origins") + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + # Redirect to documentation page when accessing base URL + @app.get("/") + async def redirect_to_docs(): + return RedirectResponse(url="/docs") + + +def mount_static_files(directory, path): + if os.path.exists(directory): + logger.info(f"Mounting static files '{directory}' at '{path}'") + app.mount( + path, + StaticFiles(directory=directory, check_dir=False), + name=f"{directory}-static", + ) + + +# Mount the data files to serve the file viewer +mount_static_files(DATA_DIR, "/api/files/data") +# Mount the output files from tools +mount_static_files("output", "/api/files/output") + +app.include_router(chat_router, prefix="/api/chat") +app.include_router(config_router, prefix="/api/chat/config") +app.include_router(file_upload_router, prefix="/api/chat/upload") -from llama_agents import ServerLauncher -from app.core.message_queue import message_queue -from app.core.control_plane import control_plane -from app.core.task_result import human_consumer_server -from app.agents.query_engine.agent import init_query_engine_agent -from app.agents.dummy.agent import init_dummy_agent - -agents = [ - init_query_engine_agent(message_queue), - init_dummy_agent(message_queue), -] - -launcher = ServerLauncher( - agents, - control_plane, - message_queue, - additional_consumers=[human_consumer_server.as_consumer()], -) if __name__ == "__main__": - launcher.launch_servers() + app_host = os.getenv("APP_HOST", "0.0.0.0") + app_port = int(os.getenv("APP_PORT", "8000")) + reload = True if environment == "dev" else False + + uvicorn.run(app="main:app", host=app_host, port=app_port, reload=reload) diff --git a/templates/types/multiagent/fastapi/pyproject.toml b/templates/types/multiagent/fastapi/pyproject.toml index 72bfa476..804bd464 100644 --- a/templates/types/multiagent/fastapi/pyproject.toml +++ b/templates/types/multiagent/fastapi/pyproject.toml @@ -1,3 +1,4 @@ +[tool] [tool.poetry] name = "app" version = "0.1.0" @@ -10,11 +11,17 @@ generate = "app.engine.generate:generate_datasource" [tool.poetry.dependencies] python = "^3.11" -llama-agents = "^0.0.3" -llama-index-agent-openai = "^0.2.7" -llama-index-embeddings-openai = "^0.1.10" -llama-index-llms-openai = "^0.1.23" +llama-index-agent-openai = ">=0.3.0,<0.4.0" +llama-index = "^0.11.4" +fastapi = "^0.112.2" +python-dotenv = "^1.0.0" +uvicorn = { extras = ["standard"], version = "^0.23.2" } +cachetools = "^5.3.3" +aiostream = "^0.5.2" + +[tool.poetry.dependencies.docx2txt] +version = "^0.8" [build-system] requires = ["poetry-core"] -build-backend = "poetry.core.masonry.api" \ No newline at end of file +build-backend = "poetry.core.masonry.api" diff --git a/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/chat-agent-events.tsx b/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/chat-agent-events.tsx new file mode 100644 index 00000000..618dd064 --- /dev/null +++ b/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/chat-agent-events.tsx @@ -0,0 +1,152 @@ +import { icons, LucideIcon } from "lucide-react"; +import { useMemo } from "react"; +import { Button } from "../../button"; +import { + Drawer, + DrawerClose, + DrawerContent, + DrawerHeader, + DrawerTitle, + DrawerTrigger, +} from "../../drawer"; +import { AgentEventData } from "../index"; +import Markdown from "./markdown"; + +const AgentIcons: Record = { + bot: icons.Bot, + researcher: icons.ScanSearch, + writer: icons.PenLine, + reviewer: icons.MessageCircle, +}; + +type MergedEvent = { + agent: string; + texts: string[]; + icon: LucideIcon; +}; + +export function ChatAgentEvents({ + data, + isFinished, +}: { + data: AgentEventData[]; + isFinished: boolean; +}) { + const events = useMemo(() => mergeAdjacentEvents(data), [data]); + return ( +
+
+ {events.map((eventItem, index) => ( + + ))} +
+
+ ); +} + +const MAX_TEXT_LENGTH = 150; + +function AgentEventContent({ + event, + isLast, + isFinished, +}: { + event: MergedEvent; + isLast: boolean; + isFinished: boolean; +}) { + const { agent, texts } = event; + const AgentIcon = event.icon; + return ( +
+
+
+ {isLast && !isFinished && ( +
+ + + + +
+ )} + +
+ {agent} +
+
    + {texts.map((text, index) => ( +
  • + {text.length <= MAX_TEXT_LENGTH && {text}} + {text.length > MAX_TEXT_LENGTH && ( +
    + {text.slice(0, MAX_TEXT_LENGTH)}... + + + Show more + + +
    + )} +
  • + ))} +
+
+ ); +} + +type AgentEventDialogProps = { + title: string; + content: string; + children: React.ReactNode; +}; + +function AgentEventDialog(props: AgentEventDialogProps) { + return ( + + {props.children} + + +
+ {props.title} +
+ + + +
+
+ +
+
+
+ ); +} + +function mergeAdjacentEvents(events: AgentEventData[]): MergedEvent[] { + const mergedEvents: MergedEvent[] = []; + + for (const event of events) { + const lastMergedEvent = mergedEvents[mergedEvents.length - 1]; + + if (lastMergedEvent && lastMergedEvent.agent === event.agent) { + // If the last event in mergedEvents has the same non-null agent, add the title to it + lastMergedEvent.texts.push(event.text); + } else { + // Otherwise, create a new merged event + mergedEvents.push({ + agent: event.agent, + texts: [event.text], + icon: AgentIcons[event.agent] ?? icons.Bot, + }); + } + } + + return mergedEvents; +} diff --git a/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/index.tsx b/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/index.tsx index b0f7e4a8..375b1d4c 100644 --- a/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/index.tsx +++ b/templates/types/streaming/nextjs/app/components/ui/chat/chat-message/index.tsx @@ -5,6 +5,7 @@ import { Fragment } from "react"; import { Button } from "../../button"; import { useCopyToClipboard } from "../hooks/use-copy-to-clipboard"; import { + AgentEventData, ChatHandler, DocumentFileData, EventData, @@ -16,6 +17,7 @@ import { getAnnotationData, getSourceAnnotationData, } from "../index"; +import { ChatAgentEvents } from "./chat-agent-events"; import ChatAvatar from "./chat-avatar"; import { ChatEvents } from "./chat-events"; import { ChatFiles } from "./chat-files"; @@ -56,6 +58,10 @@ function ChatMessageContent({ annotations, MessageAnnotationType.EVENTS, ); + const agentEventData = getAnnotationData( + annotations, + MessageAnnotationType.AGENT_EVENTS, + ); const sourceData = getSourceAnnotationData(annotations); @@ -80,6 +86,16 @@ function ChatMessageContent({ ) : null, }, + { + order: -2, + component: + agentEventData.length > 0 ? ( + + ) : null, + }, { order: 2, component: contentFileData[0] ? ( diff --git a/templates/types/streaming/nextjs/app/components/ui/chat/index.ts b/templates/types/streaming/nextjs/app/components/ui/chat/index.ts index cacd5503..a9aba73e 100644 --- a/templates/types/streaming/nextjs/app/components/ui/chat/index.ts +++ b/templates/types/streaming/nextjs/app/components/ui/chat/index.ts @@ -12,6 +12,7 @@ export enum MessageAnnotationType { EVENTS = "events", TOOLS = "tools", SUGGESTED_QUESTIONS = "suggested_questions", + AGENT_EVENTS = "agent", } export type ImageData = { @@ -51,7 +52,11 @@ export type SourceData = { export type EventData = { title: string; - isCollapsed: boolean; +}; + +export type AgentEventData = { + agent: string; + text: string; }; export type ToolData = { @@ -75,6 +80,7 @@ export type AnnotationData = | DocumentFileData | SourceData | EventData + | AgentEventData | ToolData | SuggestedQuestionsData; diff --git a/templates/types/streaming/nextjs/app/globals.css b/templates/types/streaming/nextjs/app/globals.css index 0c2b9bdf..6fc9b1a4 100644 --- a/templates/types/streaming/nextjs/app/globals.css +++ b/templates/types/streaming/nextjs/app/globals.css @@ -94,4 +94,18 @@ radial-gradient(at 91% 36%, rgba(194, 213, 255, 0.68) 0, transparent 50%), radial-gradient(at 8% 40%, rgba(251, 218, 239, 0.46) 0, transparent 50%); } + + @keyframes fadeIn { + 0% { + opacity: 0; + } + 100% { + opacity: 1; + } + } + + .fadein-agent { + animation-name: fadeIn; + animation-duration: 1.5s; + } }