diff --git a/backend/app/api/runs.py b/backend/app/api/runs.py index b35cf5d7..78022fbf 100644 --- a/backend/app/api/runs.py +++ b/backend/app/api/runs.py @@ -98,9 +98,10 @@ async def consume_astream() -> None: await streamer.send_stream.send(chunk) # hack: function messages aren't generated by chat model # so the callback handler doesn't know about them - message = chunk["messages"][-1] - if isinstance(message, FunctionMessage): - streamer.output[uuid4()] = ChatGeneration(message=message) + if chunk["messages"]: + message = chunk["messages"][-1] + if isinstance(message, FunctionMessage): + streamer.output[uuid4()] = ChatGeneration(message=message) except Exception as e: await streamer.send_stream.send(e) finally: diff --git a/backend/packages/agent-executor/agent_executor/permchain.py b/backend/packages/agent-executor/agent_executor/permchain.py index a472ac32..4ff7c83a 100644 --- a/backend/packages/agent-executor/agent_executor/permchain.py +++ b/backend/packages/agent-executor/agent_executor/permchain.py @@ -111,6 +111,10 @@ def get_agent_executor( agent_chain = agent | _create_agent_message | Channel.write_to("messages") def route_last_message(input: dict[str, bool | Sequence[AnyMessage]]) -> Runnable: + if not input["messages"]: + # no messages, do nothing + return RunnablePassthrough() + message: AnyMessage = input["messages"][-1] if isinstance(message.additional_kwargs.get("agent"), AgentFinish): # finished, do nothing @@ -137,7 +141,7 @@ def route_last_message(input: dict[str, bool | Sequence[AnyMessage]]) -> Runnabl return agent_chain executor = ( - Channel.subscribe_to(["messages", ReservedChannels.is_last_step]) + Channel.subscribe_to(["messages"]).join([ReservedChannels.is_last_step]) | route_last_message ) diff --git a/frontend/src/hooks/useStreamState.tsx b/frontend/src/hooks/useStreamState.tsx index 690b52f4..43a3186a 100644 --- a/frontend/src/hooks/useStreamState.tsx +++ b/frontend/src/hooks/useStreamState.tsx @@ -4,7 +4,7 @@ import { Message } from "./useChatList"; export interface StreamState { status: "inflight" | "error" | "done"; - messages: Message[]; + messages?: Message[]; run_id?: string; merge?: boolean; } @@ -50,13 +50,13 @@ export function useStreamState(): StreamStateProps { const { run_id } = JSON.parse(msg.data); setCurrent((current) => ({ status: "inflight", - messages: current?.messages ?? [], + messages: current?.messages, run_id: run_id, })); } else if (msg.event === "error") { setCurrent((current) => ({ status: "error", - messages: current?.messages ?? [], + messages: current?.messages, run_id: current?.run_id, })); } @@ -64,16 +64,18 @@ export function useStreamState(): StreamStateProps { onclose() { setCurrent((current) => ({ status: current?.status === "error" ? current.status : "done", - messages: current?.messages ?? [], + messages: current?.messages, run_id: current?.run_id, + merge: current?.merge, })); setController(null); }, onerror(error) { setCurrent((current) => ({ status: "error", - messages: current?.messages ?? [], + messages: current?.messages, run_id: current?.run_id, + merge: current?.merge, })); setController(null); throw error;