From 21d60b0d9cade7df449d3c90a538abfba1fef5be Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 17 Sep 2024 10:38:49 -0700 Subject: [PATCH] Add sync benchmarks --- libs/langgraph/bench/__main__.py | 39 ++++++++++++++++-- libs/langgraph/bench/fanout_to_subgraph.py | 48 ++++++++++++++++++++++ 2 files changed, 83 insertions(+), 4 deletions(-) diff --git a/libs/langgraph/bench/__main__.py b/libs/langgraph/bench/__main__.py index 37c8718b6..677e79440 100644 --- a/libs/langgraph/bench/__main__.py +++ b/libs/langgraph/bench/__main__.py @@ -5,14 +5,14 @@ from pyperf._runner import Runner from uvloop import new_event_loop -from bench.fanout_to_subgraph import fanout_to_subgraph +from bench.fanout_to_subgraph import fanout_to_subgraph, fanout_to_subgraph_sync from bench.react_agent import react_agent from bench.wide_state import wide_state from langgraph.checkpoint.memory import MemorySaver from langgraph.pregel import Pregel -async def run(graph: Pregel, input: dict): +async def arun(graph: Pregel, input: dict): len( [ c @@ -27,10 +27,26 @@ async def run(graph: Pregel, input: dict): ) +def run(graph: Pregel, input: dict): + len( + [ + c + for c in graph.stream( + input, + { + "configurable": {"thread_id": str(uuid4())}, + "recursion_limit": 1000000000, + }, + ) + ] + ) + + benchmarks = ( ( "fanout_to_subgraph_10x", fanout_to_subgraph().compile(checkpointer=None), + fanout_to_subgraph_sync().compile(checkpointer=None), { "subjects": [ random.choices("abcdefghijklmnopqrstuvwxyz", k=1000) for _ in range(10) @@ -40,6 +56,7 @@ async def run(graph: Pregel, input: dict): ( "fanout_to_subgraph_10x_checkpoint", fanout_to_subgraph().compile(checkpointer=MemorySaver()), + fanout_to_subgraph_sync().compile(checkpointer=MemorySaver()), { "subjects": [ random.choices("abcdefghijklmnopqrstuvwxyz", k=1000) for _ in range(10) @@ -49,6 +66,7 @@ async def run(graph: Pregel, input: dict): ( "fanout_to_subgraph_100x", fanout_to_subgraph().compile(checkpointer=None), + fanout_to_subgraph_sync().compile(checkpointer=None), { "subjects": [ random.choices("abcdefghijklmnopqrstuvwxyz", k=1000) for _ in range(100) @@ -58,6 +76,7 @@ async def run(graph: Pregel, input: dict): ( "fanout_to_subgraph_100x_checkpoint", fanout_to_subgraph().compile(checkpointer=MemorySaver()), + fanout_to_subgraph_sync().compile(checkpointer=MemorySaver()), { "subjects": [ random.choices("abcdefghijklmnopqrstuvwxyz", k=1000) for _ in range(100) @@ -67,26 +86,31 @@ async def run(graph: Pregel, input: dict): ( "react_agent_10x", react_agent(10, checkpointer=None), + react_agent(10, checkpointer=None), {"messages": [HumanMessage("hi?")]}, ), ( "react_agent_10x_checkpoint", react_agent(10, checkpointer=MemorySaver()), + react_agent(10, checkpointer=MemorySaver()), {"messages": [HumanMessage("hi?")]}, ), ( "react_agent_100x", react_agent(100, checkpointer=None), + react_agent(100, checkpointer=None), {"messages": [HumanMessage("hi?")]}, ), ( "react_agent_100x_checkpoint", react_agent(100, checkpointer=MemorySaver()), + react_agent(100, checkpointer=MemorySaver()), {"messages": [HumanMessage("hi?")]}, ), ( "wide_state_25x300", wide_state(300).compile(checkpointer=None), + wide_state(300).compile(checkpointer=None), { "messages": [ { @@ -102,6 +126,7 @@ async def run(graph: Pregel, input: dict): ( "wide_state_25x300_checkpoint", wide_state(300).compile(checkpointer=MemorySaver()), + wide_state(300).compile(checkpointer=MemorySaver()), { "messages": [ { @@ -117,6 +142,7 @@ async def run(graph: Pregel, input: dict): ( "wide_state_15x600", wide_state(600).compile(checkpointer=None), + wide_state(600).compile(checkpointer=None), { "messages": [ { @@ -132,6 +158,7 @@ async def run(graph: Pregel, input: dict): ( "wide_state_15x600_checkpoint", wide_state(600).compile(checkpointer=MemorySaver()), + wide_state(600).compile(checkpointer=MemorySaver()), { "messages": [ { @@ -147,6 +174,7 @@ async def run(graph: Pregel, input: dict): ( "wide_state_9x1200", wide_state(1200).compile(checkpointer=None), + wide_state(1200).compile(checkpointer=None), { "messages": [ { @@ -162,6 +190,7 @@ async def run(graph: Pregel, input: dict): ( "wide_state_9x1200_checkpoint", wide_state(1200).compile(checkpointer=MemorySaver()), + wide_state(1200).compile(checkpointer=MemorySaver()), { "messages": [ { @@ -179,5 +208,7 @@ async def run(graph: Pregel, input: dict): r = Runner() -for name, graph, input in benchmarks: - r.bench_async_func(name, run, graph, input, loop_factory=new_event_loop) +for name, agraph, graph, input in benchmarks: + r.bench_async_func(name, arun, agraph, input, loop_factory=new_event_loop) + if graph is not None: + r.bench_func(name + "_sync", run, graph, input) diff --git a/libs/langgraph/bench/fanout_to_subgraph.py b/libs/langgraph/bench/fanout_to_subgraph.py index 3c809a73b..6b0f52379 100644 --- a/libs/langgraph/bench/fanout_to_subgraph.py +++ b/libs/langgraph/bench/fanout_to_subgraph.py @@ -53,6 +53,54 @@ async def bump_loop(state: JokeOutput): return builder +def fanout_to_subgraph_sync() -> StateGraph: + class OverallState(TypedDict): + subjects: list[str] + jokes: Annotated[list[str], operator.add] + + def continue_to_jokes(state: OverallState): + return [Send("generate_joke", {"subject": s}) for s in state["subjects"]] + + class JokeInput(TypedDict): + subject: str + + class JokeOutput(TypedDict): + jokes: list[str] + + def bump(state: JokeOutput): + return {"jokes": [state["jokes"][0] + " a"]} + + def generate(state: JokeInput): + return {"jokes": [f"Joke about {state['subject']}"]} + + def edit(state: JokeInput): + subject = state["subject"] + return {"subject": f"{subject} - hohoho"} + + def bump_loop(state: JokeOutput): + return END if state["jokes"][0].endswith(" a" * 10) else "bump" + + # subgraph + subgraph = StateGraph(input=JokeInput, output=JokeOutput) + subgraph.add_node("edit", edit) + subgraph.add_node("generate", generate) + subgraph.add_node("bump", bump) + subgraph.set_entry_point("edit") + subgraph.add_edge("edit", "generate") + subgraph.add_edge("generate", "bump") + subgraph.add_conditional_edges("bump", bump_loop) + subgraph.set_finish_point("generate") + subgraphc = subgraph.compile() + + # parent graph + builder = StateGraph(OverallState) + builder.add_node("generate_joke", subgraphc) + builder.add_conditional_edges(START, continue_to_jokes) + builder.add_edge("generate_joke", END) + + return builder + + if __name__ == "__main__": import asyncio import random