From 07820a03c7b06664890302767f7dbff87fb1e9f6 Mon Sep 17 00:00:00 2001 From: Saud <65331551+saudsami@users.noreply.github.com> Date: Sat, 28 Sep 2024 03:24:50 +0500 Subject: [PATCH] U270924 review updates (#1589) * step by step quickstart * updated imports, added parseargs * updated parseargs * updated codeblocks * updated code * updated testing * review updates --------- Co-authored-by: digitallysavvy --- shared/open-ai-integration/quickstart.mdx | 1247 ++++++++++++++------- 1 file changed, 866 insertions(+), 381 deletions(-) diff --git a/shared/open-ai-integration/quickstart.mdx b/shared/open-ai-integration/quickstart.mdx index 124e0f2d5..bbea06a86 100644 --- a/shared/open-ai-integration/quickstart.mdx +++ b/shared/open-ai-integration/quickstart.mdx @@ -25,417 +25,900 @@ This guide walks you through the core elements of the [Agora Conversational AI D 1. Create a new folder for the project: ``` - mkdir realtime-agent - cd realtime-agent/ + mkdir realtime_agent + cd realtime_agent/ ``` -1. Create the following structure for your project: +1. Create the base file structure: + + ``` + touch {__init__.py,.env,agent.py,parse_args.py,requirements.txt} + ``` + + Import the OpenAI Realtime API example code. + + + This project uses the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package. Download the project and unzip it into your `realtime_agent` folder. + + + The project structure should look like this: ``` /realtime_agent - ├── __init__.py - ├── .env - ├── agent.py - ├── requirements.txt - └── realtimeapi - ├── __init__.py - ├── client.py - ├── messages.py - └── util.py + ├── __init__.py + ├── .env + ├── agent.py + ├── parse_args.py + ├── requirements.txt + └── realtimeapi + ├── __init__.py + ├── client.py + ├── messages.py + └── util.py + ``` + +1. Add the following dependencies to the `requirements.txt` file: + + ``` + aiohttp[speedups] + annotated-types==0.7.0 + anyio==4.4.0 + attrs==23.2.0 + black==24.4.2 + certifi==2024.7.4 + click==8.1.7 + distro==1.9.0 + frozenlist==1.4.1 + h11==0.14.0 + httpcore==1.0.5 + httpx==0.27.0 + idna==3.7 + iniconfig==2.0.0 + multidict==6.0.5 + mypy==1.10.1 + mypy-extensions==1.0.0 + numpy>=1.21.0 + openai==1.37.1 + packaging==24.1 + pathspec==0.12.1 + platformdirs==4.2.2 + pluggy==1.5.0 + protobuf==5.27.2 + psutil==5.9.8 + pydantic==2.8.2 + pydantic_core==2.20.1 + pyaudio>=0.2.11 + pydub==0.25.1 + pyee==12.0.0 + PyJWT==2.8.0 + pytest==8.2.2 + python-dotenv==1.0.1 + ruff==0.5.2 + sniffio==1.3.1 + sounddevice>=0.4.6 + tqdm==4.66.4 + types-protobuf==4.25.0.20240417 + typing_extensions==4.12.2 + watchfiles==0.22.0 + yarl==1.9.4 + agora-python-server-sdk>=2.0.0 ``` - - This project uses the OpenAI [`realtimeapi-examples`](https://openai.com/api/) package. Download the project and unzip it into your `realtime-agent` folder. - +1. Open the `.env` file and fill in the values for the environment variables: + + ```python + # Agora RTC app ID and app certificate + AGORA_APP_ID= + AGORA_APP_CERT= + + # OpenAI API key for authentication + OPENAI_API_KEY= + OPENAI_MODEL=gpt-4o-realtime-preview-2024-10-1 + ``` + +1. Create a virtual environment and activate it: + + ```bash + python3 -m venv venv && source venv/bin/activate + ``` + +1. Install the required dependencies: + + ```bash + pip install -r requirements.txt + ``` + +1. Install Agora realtime API: + + ```bash + pip3 install agora-realtime-ai-api + ``` + +Overview of key files: + +- `agent.py`: The main script responsible for executing the `RealtimeKitAgent`. 
It integrates Agora's functionality from the `rtc.py` module and OpenAI's capabilities from the `realtimeapi` package. +- `rtc.py`: Part of the `agora-realtime-ai-api` package, this file is used in `agent.py` and contains an AI-specific implementation of Agora's server-side Python Voice SDK. +- `realtimeapi/`: Contains the classes and methods that interact with OpenAI's Realtime API. + +The [complete code](#complete-integration-code) for `agent.py` is provided at the bottom of this page. + +## Implementation + +Before diving into the implementation details, it is essential to establish a solid foundation. Start by copying the base `agent.py` code provided below to the file. This includes the core structure and necessary imports for your agent. From there, we will proceed step by step to implement each function. + +```python +import abc +import asyncio +import base64 +import json +import logging +import os +from builtins import anext +from typing import Any, Callable, assert_never + +from agora.rtc.rtc_connection import RTCConnection, RTCConnInfo +from attr import dataclass +from dotenv import load_dotenv +from pydantic import BaseModel + +from realtimeapi import messages +from realtimeapi.client import RealtimeApiClient +from realtimeapi.util import SAMPLE_RATE, CHANNELS + +from agora_realtime_ai_api.rtc import Channel, ChatMessage, RtcEngine, RtcOptions +from .parse_args import parse_args_realtimekit + +logger = logging.getLogger(__name__) + +async def wait_for_remote_user(channel: Channel) -> int: + """Waits for a remote user to join the channel. + - Implement logic to handle user joining events. + - Set the result when a user joins or handle errors appropriately. + """ + pass + +@dataclass(frozen=True, kw_only=True) +class InferenceConfig: + """Data class for inference configuration. + - Populate with the necessary parameters for the agent's inference. + - Configure turn detection, system message, and voice parameters. + """ + system_message: str | None = None + turn_detection: messages.TurnDetection | None = None + voice: messages.Voices | None = None + +@dataclass(frozen=True, kw_only=True) +class LocalFunctionToolDeclaration: + """Declaration of a local tool that can be called by the model, and runs a function locally. + - Define the tool's name, description, parameters, and the function to be executed. + """ + name: str + description: str + parameters: dict[str, Any] + function: Callable[..., Any] + + def model_description(self) -> dict[str, Any]: + """Return the tool's model description for the agent.""" + pass + +@dataclass(frozen=True, kw_only=True) +class PassThroughFunctionToolDeclaration: + """Declaration of a tool that is called by the model, but is not executed locally. + - Define the tool's name, description, and parameters for pass-through tools. + """ + name: str + description: str + parameters: dict[str, Any] + + def model_description(self) -> dict[str, Any]: + """Return the model description for pass-through tools.""" + pass + +ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclaration + +@dataclass(frozen=True, kw_only=True) +class LocalToolCallExecuted: + """Represents the execution of a tool locally. + - Store and encode the output of the tool call as a JSON string. + """ + json_encoded_output: str + +@dataclass(frozen=True, kw_only=True) +class ShouldPassThroughToolCall: + """Handles a pass-through tool call. + - Define how the decoded function arguments are handled. 
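    - No local execution happens here; the decoded arguments are returned so the model or the client application can produce the result.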
+ """ + decoded_function_args: dict[str, Any] + +ExecuteToolCallResult = LocalToolCallExecuted | ShouldPassThroughToolCall + +class ToolContext(abc.ABC): + """Represents the tool context for registering and executing tools. + - Implement logic for registering both local and pass-through tools. + - Provide methods for executing tools and returning results. + """ + _tool_declarations: dict[str, ToolDeclaration] + + def __init__(self) -> None: + """Initialize tool context and declarations.""" + pass + + def register_function( + self, *, name: str, description: str = "", parameters: dict[str, Any], fn: Callable[..., Any] + ) -> None: + """Register a local function tool with the provided parameters.""" + pass + + def register_client_function( + self, *, name: str, description: str = "", parameters: dict[str, Any] + ) -> None: + """Register a client function tool.""" + pass + + async def execute_tool(self, tool_name: str, encoded_function_args: str) -> ExecuteToolCallResult | None: + """Execute the given tool and return the result. + - Implement logic to call the appropriate tool based on the name and arguments. + """ + pass + + def model_description(self) -> list[dict[str, Any]]: + """Return the model descriptions of all registered tools.""" + pass + +class ClientToolCallResponse(BaseModel): + """Model for the response of a client tool call. + - Define how tool call responses are stored and handled. + """ + tool_call_id: str + result: dict[str, Any] | str | float | int | bool | None = None + +@dataclass(frozen=True, kw_only=True) +class RTCConfigure: + """Configuration for RTC (Real-Time Communication). + - Populate with the Agora APP_ID and TOKEN to configure RTC communication. + """ + APP_ID: str + TOKEN: str = "" + +class RealtimeKitAgent: + """Represents the agent responsible for handling real-time communication and tool interactions.""" + engine: RtcEngine + channel: Channel + client: RealtimeApiClient + audio_queue: asyncio.Queue[bytes] = asyncio.Queue() + message_queue: asyncio.Queue[messages.ResponseAudioTranscriptDelta] = asyncio.Queue() + message_done_queue: asyncio.Queue[messages.ResponseAudioTranscriptDone] = asyncio.Queue() + tools: ToolContext | None = None + + _client_tool_futures: dict[str, asyncio.Future[ClientToolCallResponse]] + + @classmethod + async def setup_and_run_agent(cls, *, engine: RtcEngine, options: RtcOptions, inference_config: InferenceConfig, tools: ToolContext | None) -> None: + """Set up and run the agent. + - Initialize the RTC engine, connect to the channel, and configure the inference setup. + - Implement the setup and teardown logic for the agent. + """ + pass + + @classmethod + async def entry_point(cls, *, engine: RtcEngine, options: RtcOptions, inference_config: InferenceConfig, tools: ToolContext | None = None) -> None: + """Entry point for running the agent. + - This method should initialize and start the agent's main workflow. + """ + pass + + def __init__(self, *, client: RealtimeApiClient, tools: ToolContext | None, channel: Channel) -> None: + """Initialize the RealtimeKitAgent with the given client, tools, and channel. + - Store necessary attributes like client, tools, and channel for agent operation. + """ + pass + + async def run(self) -> None: + """Run the agent's main loop, handling audio streams and messages. + - Implement logic for processing audio input, handling model messages, and managing the user session. + """ + pass + + async def _stream_input_audio_to_model(self) -> None: + """Stream input audio to the model. 
+ - Implement logic to capture audio from the Agora channel and send it to the model. + """ + pass + + async def _stream_audio_queue_to_audio_output(self) -> None: + """Stream audio from the queue to the audio output. + - Implement logic to retrieve audio from the queue and push it to the Agora channel. + """ + pass + + async def _process_model_messages(self) -> None: + """Process incoming messages from the model. + - Implement logic to handle and respond to different message types from the model. + """ + pass + +async def shutdown(loop, signal=None): + """Gracefully shut down the application. + - Implement logic to cancel outstanding tasks and stop the asyncio event loop. + """ + pass + +if __name__ == "__main__": + load_dotenv() + options = parse_args_realtimekit() + + if not os.environ.get("AGORA_APP_ID"): + raise ValueError("Need to set environment variable AGORA_APP_ID") + + if not os.environ.get("AGORA_APP_CERT"): + raise ValueError("Need to set environment variable AGORA_APP_CERT") + + if not os.environ.get("OPENAI_API_KEY"): + raise ValueError("Need to set environment variable OPENAI_API_KEY") + + if not os.environ.get("OPENAI_MODEL"): + raise ValueError("Need to set environment variable OPENAI_MODEL") + + asyncio.run( + RealtimeKitAgent.entry_point( + engine=RtcEngine(appid=os.environ.get("AGORA_APP_ID"), appcert=os.environ.get("AGORA_APP_CERT")), + options=RtcOptions( + channel_name=options['channel_name'], + uid=options['uid'], + sample_rate=SAMPLE_RATE, + channels=CHANNELS, + ), + inference_config=InferenceConfig( + system_message="""You are a helpful assistant...""", + voice=messages.Voices.Alloy, + turn_detection=messages.ServerVAD(threshold=0.5, prefix_padding_ms=500, suffix_padding_ms=200), + ), + ) + ) +``` + +### Entry point + +The execution of `agent.py` begins with the `__main__` section, which initializes the Agora `RtcEngine`, loads environment variables, and parses configuration options. The real work begins when `RealtimeKitAgent.entry_point()` is called, which starts the agent by setting up communication between Agora and OpenAI. + +The following sections explain how the `RealtimeKitAgent` connects to Agora and OpenAI and manages the audio streams and AI interaction. + +### RealtimeKitAgent + +The `RealtimeKitAgent` class integrates Agora's real-time audio communication with OpenAI’s AI services. It handles streaming, communication with the OpenAI API, and AI responses, creating a seamless conversational experience. + +### Connect to Agora and OpenAI + +The `setup_and_run_agent` method connects to an Agora channel using `RtcEngine`, and sets up a session with OpenAI’s Realtime API. It configures the session parameters, such as system messages and voice settings, and uses asynchronous tasks to concurrently listen for the session to start and update the conversation configuration. In the base `agent.py` file, replace the placeholder with the following implementation. 
+ +```python + @classmethod + async def setup_and_run_agent( + cls, + *, + engine: RtcEngine, + options: RtcOptions, + inference_config: InferenceConfig, + tools: ToolContext | None, + ) -> None: + channel = engine.create_channel(options) + await channel.connect() + + try: + async with RealtimeApiClient( + base_uri="wss://api.openai.com", + api_key=os.getenv("OPENAI_API_KEY"), + verbose=False, + ) as client: + await client.send_message( + messages.SessionUpdate( + session=messages.SessionUpdateParams( + turn_detection=inference_config.turn_detection, + tools=tools.model_description() if tools else None, + tool_choice="auto", + instructions=inference_config.system_message, + ) + ) + ) + + [start_session_message, _] = await asyncio.gather( + anext(client.listen()), + client.send_message( + messages.UpdateConversationConfig( + system_message=inference_config.system_message, + output_audio_format=messages.AudioFormats.PCM16, + voice=inference_config.voice, + tools=tools.model_description() if tools else None, + transcribe_input=False, + ) + ) + ) + logger.info( + f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}" + ) + + agent = cls( + client=client, + tools=tools, + channel=channel, + ) + await agent.run() + finally: + engine.destroy() +``` + +### Initialize the RealtimeKitAgent + +The constructor for `RealtimeKitAgent` sets up the OpenAI client, optional tools, and Agora channel to manage real-time audio communication. In `agent.py`, add the following code after the `setup_and_run_agent` method: + +```python +@classmethod + async def entry_point( + cls, + *, + engine: RtcEngine, + options: RtcOptions, + inference_config: InferenceConfig, + tools: ToolContext | None = None, + ) -> None: + await cls.setup_and_run_agent( + engine=engine, options=options, inference_config=inference_config, tools=tools + ) + + def __init__( + self, + *, + client: RealtimeApiClient, + tools: ToolContext | None, + channel: Channel, + ) -> None: + self.client = client + self.tools = tools + self._client_tool_futures = {} + self.channel = channel + self.subscribe_user = None +``` + +### Launch the Agent + +The `run` method is the core of the `RealtimeKitAgent`. It manages the agent’s operations by handling audio streams, subscribing to remote users, and processing both incoming and outgoing messages. This method also ensures proper exception handling and graceful shutdown. Following are the key functions of this method: + +- **Waiting for Remote Users**: The agent waits for a remote user to join the Agora channel and subscribes to their audio stream. +- **Task Management**: The agent initiates tasks for audio input, audio output, and processing messages from OpenAI, ensuring that they run concurrently. +- **Connection State Handling**: It monitors changes in connection state and handles user disconnections, ensuring the agent shuts down gracefully. 
+ +After the `def __init__`, method, add in the following `run` method to `agent.py`: + +```python + async def run(self) -> None: + try: + def log_exception(t: asyncio.Task[Any]) -> None: + if not t.cancelled() and t.exception(): + logger.error( + "unhandled exception", + exc_info=t.exception(), + ) + + logger.info("Waiting for remote user to join") + self.subscribe_user = await wait_for_remote_user(self.channel) + logger.info(f"Subscribing to user {self.subscribe_user}") + await self.channel.subscribe_audio(self.subscribe_user) + + async def on_user_left(agora_rtc_conn: RTCConnection, user_id: int, reason: int): + logger.info(f"User left: {user_id}") + if self.subscribe_user == user_id: + self.subscribe_user = None + logger.info("Subscribed user left, disconnecting") + await self.channel.disconnect() + + self.channel.on("user_left", on_user_left) + + disconnected_future = asyncio.Future[None]() + + def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): + logger.info(f"Connection state changed: {conn_info.state}") + if conn_info.state == 1: + if not disconnected_future.done(): + disconnected_future.set_result(None) + + self.channel.on("connection_state_changed", callback) + + asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( + log_exception + ) + asyncio.create_task( + self._stream_audio_queue_to_audio_output() + ).add_done_callback(log_exception) + + asyncio.create_task(self._process_model_messages()).add_done_callback( + log_exception + ) + + await disconnected_future + logger.info("Agent finished running") + except asyncio.CancelledError: + logger.info("Agent cancelled") +``` + +### Audio Streaming to the AI model + +The `RealtimeKitAgent` is responsible for managing real-time audio communication between Agora and OpenAI’s AI model. It implements this through two core streaming methods: + +- `_stream_input_audio_to_model`: Captures audio frames from the Agora channel and streams them to OpenAI’s model for processing. +- `_stream_audio_queue_to_audio_output`: Handles the output by pushing audio responses from OpenAI’s model back to the Agora channel for playback. + +Additionally, the agent must process messages received from the OpenAI model. This is handled by the `_process_model_messages` method. + +#### Stream input audio to the AI model + +The `_stream_input_audio_to_model` method captures audio frames from the Agora channel and streams them to OpenAI’s model for processing. This method runs continuously, retrieving audio frames from the subscribed user and forwarding them through the OpenAI API. + +The code implements the following key features: + +- **Subscription check**: Ensures that a remote user is subscribed before capturing any audio frames. +- **Audio frame processing**: Sends each audio frame from the Agora channel to OpenAI’s model. +- **Error handling**: Logs any errors that occur during the audio streaming process. 
+ +Add the following `_stream_input_audio_to_model` method just below the the `run` method in `agent.py`: + +```python + async def _stream_input_audio_to_model(self) -> None: + # Wait until a remote user is subscribed before capturing audio + while self.subscribe_user is None: + await asyncio.sleep(0.1) + + # Capture audio frames from the Agora channel for the subscribed user + audio_frames = self.channel.get_audio_frames(self.subscribe_user) + + async for audio_frame in audio_frames: + try: + # Stream the audio frame to OpenAI’s model via the API client + await self.client.send_audio_data(audio_frame.data) + except Exception as e: + logger.error(f"Error sending audio data to model: {e}") +``` + +#### Stream audio queue to audio output + +The `_stream_audio_queue_to_audio_output` method streams audio generated by OpenAI’s model back to the Agora channel. It retrieves audio data from an internal queue and pushes it to Agora for real-time playback. + +The code implements the following key features: + +- **Audio queue management**: Retrieves audio data from an asynchronous queue filled with responses from OpenAI’s model. +- **Efficient task management**: After processing each audio frame, the method yields control to ensure other tasks can run concurrently. +- **Real-time playback**: Audio data is pushed to the Agora channel for immediate playback to the user. + +Add the following `_stream_audio_queue_to_audio_output` method after the `_stream_input_audio_to_model` method: + +```python + async def _stream_audio_queue_to_audio_output(self) -> None: + while True: + # Retrieve audio data from the queue and send it to the Agora channel + frame = await self.audio_queue.get() + await self.channel.push_audio_frame(frame) + + # Yield control to allow other tasks to execute efficiently + await asyncio.sleep(0) +``` + +#### Process model messages + +In addition to handling audio streaming, the agent must process messages received from the OpenAI model. The `_process_model_messages` method listens for these messages and takes appropriate actions based on the type of message, such as audio responses, transcripts, and various model-generated outputs. + +The code implements the following key features: + +- **Message handling**: The method listens for various message types, including audio data, text transcripts, and other outputs, and processes them accordingly. +- **Queue management**: For audio messages, the data is decoded and placed in the audio queue for playback. +- **Real-time response**: Text messages and other outputs are immediately sent back to the Agora chat. + +Add the following `_process_model_messages` method after the `_stream_audio_queue_to_audio_output` method: + +```python +async def _process_model_messages(self) -> None: + # Continuously listen for incoming messages from OpenAI + async for message in self.client.listen(): + match message: + # Handle different message types +``` + +### Audio and message flow + +The agent manages real-time audio and message flow between Agora and OpenAI as follows: + +- `_stream_input_audio_to_model`: Continuously captures audio from the Agora channel and streams it to OpenAI. +- `_stream_audio_queue_to_audio_output`: Retrieves audio responses from OpenAI and plays them back in real-time. +- `_process_model_messages`: Listens for and processes various message types, such as audio and transcripts and ensures timely delivery to the Agora channel. 
+ +### Message processing in `RealtimeKitAgent` + +The message processing logic is central to how the agent interacts with OpenAI’s model and the Agora channel. Messages received from the model can include audio data, text transcripts, or other responses, and the agent needs to process these accordingly to ensure smooth real-time communication. + +The `_process_model_messages` method listens for incoming messages and handles them according to their type, ensuring the appropriate action is taken, such as playing back audio, sending text transcripts, or invoking tools. + +The code implements the following key features: + +- **Listening for messages**: The agent continuously listens for incoming messages from OpenAI’s model. +- **Handling audio data**: If the message contains audio data, it is placed in a queue for playback to the Agora channel. +- **Handling transcripts**: If the message contains partial or final text transcripts, they are processed and sent to the Agora chat. +- **Handling other responses**: Additional message types, such as tool invocations and other outputs are processed as needed. + +#### Handling Text Transcripts + +The agent receives partial or completed text transcripts. These are identified and handled by their message types: + +- `ResponseAudioTranscriptDelta`: Represents partial transcripts. +- `ResponseAudioTranscriptDone`: Indicates a completed transcript. - Overview of key files: +For both types, the agent sends the transcript to the Agora chat as a message. - - `agent.py`: The primary script responsible for executing the `RealtimeKitAgent`. It integrates Agora's functionality from the `rtc.py` module and OpenAI's capabilities from the `realtimeapi` package. - - `rtc.py`: Contains an implementation of the server-side Agora Python Voice SDK. - - `realtimeapi/`: Contains the classes and methods that interact with OpenAI's Realtime API. +```python +case messages.ResponseAudioTranscriptDelta(): + # Handle partial text transcript, log it and send to Agora chat + logger.info(f"Received transcript delta: {message=}") + await self.channel.chat.send_message( + ChatMessage(message=message.model_dump_json(), msg_id=message.item_id) + ) - The [complete code](#complete-integration-code) for `agent.py` is provided at the bottom of this page. +case messages.ResponseAudioTranscriptDone(): + # Handle completed transcript and send final message to Agora chat + logger.info(f"Transcript completed: {message=}") + await self.channel.chat.send_message( + ChatMessage(message=message.model_dump_json(), msg_id=message.item_id) + ) +``` -1. 
Add the following dependencies to the `requirements.txt` file: +#### Handling Other Responses - ``` - aiohttp[speedups] - annotated-types==0.7.0 - anyio==4.4.0 - attrs==23.2.0 - black==24.4.2 - certifi==2024.7.4 - click==8.1.7 - distro==1.9.0 - frozenlist==1.4.1 - h11==0.14.0 - httpcore==1.0.5 - httpx==0.27.0 - idna==3.7 - iniconfig==2.0.0 - multidict==6.0.5 - mypy==1.10.1 - mypy-extensions==1.0.0 - numpy>=1.21.0 - openai==1.37.1 - packaging==24.1 - pathspec==0.12.1 - platformdirs==4.2.2 - pluggy==1.5.0 - protobuf==5.27.2 - psutil==5.9.8 - pydantic==2.8.2 - pydantic_core==2.20.1 - pyaudio>=0.2.11 - pydub==0.25.1 - pyee==12.0.0 - PyJWT==2.8.0 - pytest==8.2.2 - python-dotenv==1.0.1 - ruff==0.5.2 - sniffio==1.3.1 - sounddevice>=0.4.6 - tqdm==4.66.4 - types-protobuf==4.25.0.20240417 - typing_extensions==4.12.2 - watchfiles==0.22.0 - yarl==1.9.4 - agora-python-server-sdk>=2.0.0 - agora-realtime-ai-api==1.0.2 - ``` +The agent handles a variety of other message types from OpenAI’s model. These include tool calls, errors, or other output from the model. In the event of an unhandled message type, the agent logs a warning for further investigation. -1. Create the `.env` file and fill in the values for the environment variables: +```python +case _: + # Handle other types of messages or unrecognized types + logger.warning(f"Unhandled message type: {message=}") +``` - ```python - # Agora RTC app ID and app certificate - AGORA_APP_ID= - AGORA_APP_CERT= +Add the following complete `_process_model_messages` method to the `agent.py` file, after the `_stream_audio_queue_to_audio_output` method: - # OpenAI API key for authentication - OPENAI_API_KEY= - ``` +```python + async def _process_model_messages(self) -> None: + # Continuously listen for messages from OpenAI's model + async for message in self.client.listen(): + match message: + case messages.ResponseAudioDelta(): + # Handle audio response and add it to the audio queue for playback + await self.audio_queue.put(base64.b64decode(message.delta)) -1. Create a virtual environment and activate it: - ```bash - python3 -m venv venv && source venv/bin/activate - ``` + case messages.ResponseAudioTranscriptDelta(): + # Handle partial text transcript, log it and send to Agora chat + logger.info(f"Received transcript delta: {message=}") + await self.channel.chat.send_message( + ChatMessage(message=message.model_dump_json(), msg_id=message.item_id) + ) -1. Install the required dependencies: - ```bash - pip install -r requirements.txt - ``` + case messages.ResponseAudioTranscriptDone(): + # Handle completed transcript and send final message to Agora chat + logger.info(f"Transcript completed: {message=}") + await self.channel.chat.send_message( + ChatMessage(message=message.model_dump_json(), msg_id=message.item_id) + ) + # Other messages that can be handled + case messages.InputAudioBufferSpeechStarted() | messages.InputAudioBufferSpeechStopped() | messages.InputAudioBufferCommitted() | messages.ItemCreated() | messages.ResponseCreated() | messages.ResponseOutputItemAdded() | messages.ResponseContenPartAdded() | messages.ResponseAudioDone() | messages.ResponseContentPartDone() | messages.ResponseOutputItemDone(): + pass -1. 
Install Agora realtime API: + # Handle other types of messages or unrecognized message types + case _: + logger.warning(f"Unhandled message type: {message=}") +``` - ```bash - pip3 install agora-realtime-ai-api - ``` +Using these components, the agent handles audio, transcripts, and other messages in real-time, ensuring that it responds appropriately to OpenAI model’s output and maintain seamless communication with the Agora channel. +### Tool Management -## Implementation +The `RealtimeKitAgent`'s tool management system enables it to extend its functionality by allowing OpenAI’s model to invoke specific tools. These tools can either run locally or pass data back to the model for further processing. By registering tools and executing them based on incoming messages, the agent adds the capability and flexibility to handling a variety of tasks. -The `RealtimeKitAgent` class integrates Agora's audio communication capabilities with OpenAI's AI services. This class manages audio streams, handles communication with the OpenAI API, and processes AI-generated responses, providing a seamless conversational AI experience. +The agent implements the following key features: -### Connect to Agora and OpenAI +- **Tool registration**: The agent registers both local function tools and pass-through tools, making them available for execution. +- **Tool execution**: The agent executes tools in response to requests from the OpenAI model, running them locally or passing data back to the model. +- **Tool context**: The `ToolContext` class manages the tools, providing methods to register and execute them as needed. -The `setup_and_run_agent` method sets up the `RealtimeKitAgent` by connecting to an Agora channel using the provided `RtcEngine` and initializing a session with the OpenAI Realtime API client. It sends configuration messages to set up the session and define conversation parameters, such as the system message and output audio format, before starting the agent's operations. The method uses asynchronous execution to handle both listening for the session start and sending conversation configuration updates concurrently. It ensures that the connection is properly managed and cleaned up after use, even in cases of exceptions, early exits, or shutdowns. +#### Tool Registration - - UIDs in the Python SDK are set using a string value. Agora recommends using only numerical values for UID strings to ensure compatibility - with all Agora products and extensions. - +The agent allows tools to be registered under two categories: -```python -@classmethod -async def setup_and_run_agent( - cls, - *, - engine: RtcEngine, - options: RtcOptions, - inference_config: InferenceConfig, - tools: ToolContext | None, -) -> None: - # Create and connect to an Agora channel - channel = engine.create_channel(options) - await channel.connect() +- **Local function tools**: Executed directly by the agent on the local context. +- **Pass-through tools**: These tools send data back to OpenAI’s model without it being executed locally. 
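To make the two categories concrete, the following minimal sketch registers one tool of each kind. It assumes the `ToolContext` class implemented below; the `get_weather` coroutine, its parameter schema, and the `notify_user` client tool are hypothetical examples, not part of the quickstart code.

```python
class AgentTools(ToolContext):
    """Hypothetical tool set used only to illustrate registration."""

    def __init__(self) -> None:
        super().__init__()

        # Local function tool: executed by the agent, result returned to the model
        self.register_function(
            name="get_weather",
            description="Return the current weather for a city.",
            parameters={
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
            fn=self._get_weather,
        )

        # Pass-through tool: arguments are handed back for the client app to act on
        self.register_client_function(
            name="notify_user",
            description="Show a notification in the client application.",
            parameters={
                "type": "object",
                "properties": {"text": {"type": "string"}},
                "required": ["text"],
            },
        )

    async def _get_weather(self, city: str) -> dict[str, str]:
        # Replace with a real lookup; a canned response keeps the sketch self-contained
        return {"city": city, "forecast": "sunny", "temperature": "25°C"}
```

An instance of such a class can be passed as the `tools` argument to `RealtimeKitAgent.entry_point`. The `execute_tool` method shown later runs `get_weather` locally, while `notify_user` calls are passed through to the client.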
- try: - # Initialize the OpenAI Realtime API client - async with RealtimeApiClient( - base_uri="wss://api.openai.com", - api_key=os.getenv("OPENAI_API_KEY"), - verbose=False, - ) as client: - # Update the session configuration - await client.send_message( - messages.SessionUpdate( - session=messages.SessionUpdateParams( - turn_detection=inference_config.turn_detection, - tools=tools.model_description() if tools else None, - tool_choice="auto", - instructions=inference_config.system_message, - ) - ) - ) +The agent registers these tools during its setup process, making them available for the model to call. - # Concurrently wait for the session to start and update the conversation config - [start_session_message, _] = await asyncio.gather( - *[ - anext(client.listen()), - client.send_message( - messages.UpdateConversationConfig( - system_message=inference_config.system_message, - output_audio_format=messages.AudioFormats.PCM16, - voice=inference_config.voice, - tools=tools.model_description() if tools else None, - transcribe_input=False, - ) - ), - ] - ) - logger.info( - f"Session started: {start_session_message.session.id} model: {start_session_message.session.model}" - ) +```python +class ToolContext(abc.ABC): + """Represents the tool context for registering and executing tools. + - Implement logic for registering both local and pass-through tools. + - Provide methods for executing tools and returning results. + """ + _tool_declarations: dict[str, ToolDeclaration] - # Create and run the RealtimeKitAgent - agent = cls( - client=client, - tools=tools, - channel=channel, - ) - await agent.run() + def __init__(self) -> None: + # Initializes the tool context with an empty dictionary for storing tools + self._tool_declarations = {} + + def register_function( + self, *, name: str, description: str = "", parameters: dict[str, Any], fn: Callable[..., Any] + ) -> None: + """Register a local function tool.""" + self._tool_declarations[name] = LocalFunctionToolDeclaration( + name=name, description=description, parameters=parameters, function=fn + ) - finally: - # Ensure the Agora engine is destroyed, even if an exception occurs - engine.destroy() + def register_client_function( + self, *, name: str, description: str = "", parameters: dict[str, Any] + ) -> None: + """Register a pass-through tool that returns data to the model for further processing.""" + self._tool_declarations[name] = PassThroughFunctionToolDeclaration( + name=name, description=description, parameters=parameters + ) ``` -### Initialize the RealtimeKitAgent +#### Tool Execution -The `RealtimeKitAgent` class constructor accepts an OpenAI `RealtimeApiClient`, an optional `ToolContext` for function registration, and an Agora channel for managing audio communication. This setup initializes the agent to process audio streams, registers tools (if provided), and interacts with the AI model. +Once tools are registered, the agent can execute them in response to messages from OpenAI’s model. The agent listens for tool call requests and either executes the tool locally or passes data back to the model. + +The `execute_tool` method retrieves the tool by name and runs it with the provided arguments. If it is a local function tool, the agent executes the function and returns the result. If it is a pass-through tool, it simply returns the decoded arguments to the model for further processing. 
```python -def __init__( - self, - *, - client: RealtimeApiClient, - tools: ToolContext | None, - channel: Channel, -) -> None: - self.client = client # OpenAI Realtime API client - self.tools = tools # Optional tool context for function registration - self._client_tool_futures = {} # For managing asynchronous tool calls - self.channel = channel # Agora channel for audio communication - self.subscribe_user = None # Will store the user ID we're subscribing to + async def execute_tool(self, tool_name: str, encoded_function_args: str) -> ExecuteToolCallResult | None: + # Retrieve the tool by its name + tool = self._tool_declarations.get(tool_name) + if not tool: + return None + + # Decode the function arguments + args = json.loads(encoded_function_args) + assert isinstance(args, dict) + + # Check if the tool is a locally-executed function + if isinstance(tool, LocalFunctionToolDeclaration): + logger.info(f"Executing tool {tool_name} with args {args}") + result = await tool.function(**args) + logger.info(f"Tool {tool_name} executed with result {result}") + return LocalToolCallExecuted(json_encoded_output=json.dumps(result)) + + # If the tool is pass-through, return the decoded arguments to the model + if isinstance(tool, PassThroughFunctionToolDeclaration): + return ShouldPassThroughToolCall(decoded_function_args=args) + + # If the tool type is unrecognized, raise an assertion error + assert_never(tool) ``` -### Launch the Agent +#### Tool context -The `run` method orchestrates the main operations of the `RealtimeKitAgent`. It manages audio streaming, processes tasks related to audio input, output, and model messages, and ensures exception handling is in place. +The `ToolContext` class manages all available tools. It provides the logic for both registering tools and executing them when requested by the OpenAI model. 
-```python -async def run(self) -> None: - try: - # Helper function to log unhandled exceptions in tasks - def log_exception(t: asyncio.Task[Any]) -> None: - if not t.cancelled() and t.exception(): - logger.error( - "unhandled exception", - exc_info=t.exception(), - ) - - logger.info("Waiting for remote user to join") - # Wait for a remote user to join the channel - self.subscribe_user = await wait_for_remote_user(self.channel) - logger.info(f"Subscribing to user {self.subscribe_user}") - # Subscribe to the audio of the joined user - await self.channel.subscribe_audio(self.subscribe_user) - - # Handle user leaving the channel - async def on_user_left(agora_rtc_conn: RTCConnection, user_id: int, reason: int): - logger.info(f"User left: {user_id}") - if self.subscribe_user == user_id: - self.subscribe_user = None - logger.info("Subscribed user left, disconnecting") - await self.channel.disconnect() - - self.channel.on("user_left", on_user_left) - - # Set up a future to track when the agent should disconnect - disconnected_future = asyncio.Future[None]() - - # Handle connection state changes - def callback(agora_rtc_conn: RTCConnection, conn_info: RTCConnInfo, reason): - logger.info(f"Connection state changed: {conn_info.state}") - if conn_info.state == 1: # Disconnected state - if not disconnected_future.done(): - disconnected_future.set_result(None) - - self.channel.on("connection_state_changed", callback) - - # Start tasks for streaming audio and processing messages - asyncio.create_task(self._stream_input_audio_to_model()).add_done_callback( - log_exception - ) - asyncio.create_task( - self._stream_audio_queue_to_audio_output() - ).add_done_callback(log_exception) - asyncio.create_task(self._process_model_messages()).add_done_callback( - log_exception - ) +The `model_description` method generates a description of all registered tools, which is passed back to the model so it knows what tools are available for invocation. - # Wait until the agent is disconnected - await disconnected_future - logger.info("Agent finished running") - except asyncio.CancelledError: - logger.info("Agent cancelled") +```python + def model_description(self) -> list[dict[str, Any]]: + # Returns a description of all registered tools, making them available for the model + return [v.model_description() for v in self._tool_declarations.values()] ``` +### Tool invocation in message processing + +It is important to highlight how tools are invoked. During message processing, certain messages may trigger tool invocations, prompting the agent to execute the relevant tool. + +The following flow illustrates how this works: + +1. The OpenAI model sends a message that includes a tool call. +2. The `_process_model_messages` method identifies the tool call request. +3. The agent retrieves the relevant tool from the `ToolContext` and executes it, either locally or by passing data back to the model. + +This integration between **message processing** and **tool management** ensures that the agent can extend its capabilities dynamically, performing tasks or calculations in real-time based on incoming requests. + +With these pieces in place, the agent can effectively manage tool registration and execution, ensuring that it can handle a variety of tasks as directed by the OpenAI model. This structure allows the agent to either execute functions locally or pass them to the model for further handling. 
-### Stream input audio to the AI model +### Wait for a remote user -The `_stream_input_audio_to_model` method captures audio frames from the Agora channel and sends them to the OpenAI API client for real-time processing by the AI model. +The `wait_for_remote_user` function is a key component of the agent's functionality. It listens for an event where a remote user joins the Agora channel. This function will block until a user joins or until it times out after 60 seconds. + +The method implements the following: + +- **Event listener**: The function listens for the `user_joined` event from the Agora SDK. When a user joins the channel, it captures the user ID and signals that a user has joined using `remote_user_joined.set()`. +- **Timeout handling**: If no user joins within 60 seconds, a `TimeoutError` is raised, which is logged as an error. +- **Cleanup**: After successfully getting a user ID or timing out, the event listener is removed using `channel.off("user_joined", on_user_joined)`. + +In `agent.py`, replace the placeholder code with: ```python -async def _stream_input_audio_to_model(self) -> None: - # Wait until we have a subscribed user - while self.subscribe_user is None: - await asyncio.sleep(0.1) - # Get the audio frame stream for the subscribed user - audio_frames = self.channel.get_audio_frames(self.subscribe_user) - async for audio_frame in audio_frames: - try: - # Send the audio frame to the OpenAI model via the API client - await self.client.send_audio_data(audio_frame.data) - except Exception as e: - logger.error(f"Error sending audio data to model: {e}") +async def wait_for_remote_user(channel: Channel) -> int: + """Waits for a remote user to join the channel.""" + remote_users = list(channel.remote_users.keys()) + if len(remote_users) > 0: + return remote_users[0] + + future = asyncio.Future[int]() + + channel.once("user_joined", lambda conn, user_id: future.set_result(user_id)) + + try: + remote_user = await future + return remote_user + except Exception as e: + logger.error(f"Error waiting for remote user: {e}") + raise ``` +### Shutdown gracefully -### Stream audio from the AI model to the user +The `shutdown` function gracefully cancels running tasks and stopping the event loop. This prevents tasks from hanging and ensures resources are properly released. -The `_stream_audio_queue_to_audio_output` method handles the playback of processed audio data from the AI model. It retrieves audio frames from a queue and sends them to the Agora channel, allowing users to hear AI-generated responses in real-time. 
+When the agent receives a shutdown signal or encounters an error, the following method is executed: ```python -async def _stream_audio_queue_to_audio_output(self) -> None: - while True: - # Get the next audio frame from the queue (contains audio data from the model) - frame = await self.audio_queue.get() - # Send the frame to the Agora channel for playback to the user - await self.channel.push_audio_frame(frame) - await asyncio.sleep(0) # Allow other tasks to run +async def shutdown(loop, signal=None): + """Gracefully shut down the application.""" + if signal: + logger.info(f"Received exit signal {signal.name}...") + + # Retrieve all running tasks excluding the current task + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + + # Log the number of tasks being cancelled + logger.info(f"Cancelling {len(tasks)} outstanding tasks") + + for task in tasks: + task.cancel() + + # Wait for the tasks to be cancelled + await asyncio.gather(*tasks, return_exceptions=True) + + # Stop the event loop + loop.stop() ``` -### Process model messages +### Add model to RealtimeApiClient -The `_process_model_messages` method listens for messages from the OpenAI API client and processes them based on their type. It handles a variety of message types, including audio deltas, transcriptions, and errors. +Modify the `realtimeapi/client.py` file to include the model name as part of the `self.url`. This ensures the appropriate model is used when interacting with the OpenAI API. + +Update the `self.url` definition as follows: ```python -async def _process_model_messages(self) -> None: - # Listen for incoming messages from the OpenAI API client - async for message in self.client.listen(): - # Process each type of message received from the client - match message: - case messages.ResponseAudioDelta(): - # Process incoming audio data from the model - await self.audio_queue.put(base64.b64decode(message.delta)) - - case messages.ResponseAudioTranscriptDelta(): - # Handle incoming transcription updates - logger.info(f"Received text message {message=}") - await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id)) - - case messages.ResponseAudioTranscriptDone(): - # Handle completed transcriptions - logger.info(f"Text message done: {message=}") - await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id)) - - case messages.InputAudioBufferSpeechStarted(): - # Handle the start of speech in the input audio - pass - case messages.InputAudioBufferSpeechStopped(): - # Handle the end of speech in the input audio - pass - case messages.InputAudioBufferCommitted(): - # Handle when an input audio buffer is committed - pass - case messages.ItemCreated(): - # Handle when a new item is created in the conversation - pass - case messages.ResponseCreated(): - # Handle when a new response is created - pass - case messages.ResponseOutputItemAdded(): - # Handle when a new output item is added to the response - pass - case messages.ResponseContenPartAdded(): - # Handle when a new content part is added to the response - pass - case messages.ResponseAudioDone(): - # Handle when the audio response is complete - pass - case messages.ResponseContentPartDone(): - # Handle when a content part of the response is complete - pass - case messages.ResponseOutputItemDone(): - # Handle when an output item in the response is complete - pass - - case _: - # Log any unhandled or unknown message types - logger.warning(f"Unhandled message 
{message=}") +self.url = f"{base_uri}{path}?model={os.environ.get('OPENAI_MODEL')}" ``` -### Main entry point +This adjustment ensures that the model specified in your environment variables `OPENAI_MODEL` is included in the API requests made by the `RealtimeApiClient`. -The main entry point of the application sets up the Agora RTC engine, configures the options, and launches the RealtimeKitAgent. +### Parse arguments + +To parse the command-line arguments used to customize the channel name and user ID when running the `agent.py` script, add the following code to `parse_args.py`: ```python -if __name__ == "__main__": - # Load environment variables from .env file - load_dotenv() - - # Parse command line arguments - options = parse_args_realtimekit() - logger.info(f"app_id: channel_id: {options['channel_name']}, uid: {options['uid']}") - - # Ensure the Agora App ID is set - if not os.environ.get("AGORA_APP_ID"): - raise ValueError("Need to set environment variable AGORA_APP_ID") - - # Run the RealtimeKitAgent - asyncio.run( - RealtimeKitAgent.entry_point( - # Initialize the RtcEngine with Agora credentials - engine=RtcEngine(appid=os.environ.get("AGORA_APP_ID"), appcert=os.environ.get("AGORA_APP_CERT")), - # Configure RTC options - options=RtcOptions( - channel_name=options['channel_name'], - uid=options['uid'], - sample_rate=SAMPLE_RATE, - channels=CHANNELS - ), - # Configure inference settings - inference_config=InferenceConfig( - # Set up the AI assistant's behavior - system_message="""\ -You are a helpful assistant. If asked about the weather make sure to use the provided tool to get that information. \ -If you are asked a question that requires a tool, say something like "working on that" and dont provide a concrete response \ -until you have received the response to the tool call.\ -""", - voice=messages.Voices.Alloy, - # Configure voice activity detection - turn_detection=messages.ServerVAD( - threshold=0.5, - prefix_padding_ms=500, - suffix_padding_ms=200, - ), - ), - ) - ) +import argparse +import logging +from typing import TypedDict + +logger = logging.getLogger(__name__) + +class RealtimeKitOptions(TypedDict): + channel_name: str + uid: int + +def parse_args(): + parser = argparse.ArgumentParser(description="Agora SDK Example") + parser.add_argument("--channel_name", required=True, help="Channel Id / must") + parser.add_argument("--uid", default=0, help="User Id / default is 0") + return parser.parse_args() + +def parse_args_realtimekit() -> RealtimeKitOptions: + args = parse_args() + logger.info(f"Parsed arguments: {args}") + options: RealtimeKitOptions = {"channel_name": args.channel_name, "uid": args.uid} + + return options ``` -### Complete integration code +## Complete integration code -The `agent.py` imports key classes from `rtc.py`, which implements the server-side Agora Python Voice SDK, facilitating communication and managing audio streams. +The `agent.py` imports key classes from `rtc.py`, which implements the server-side Agora Python Voice SDK, facilitating communication and managing audio streams.
Complete code for `agent.py` @@ -454,11 +937,11 @@ from attr import dataclass from dotenv import load_dotenv from pydantic import BaseModel -from realtime_agent.realtimeapi import messages -from realtime_agent.realtimeapi.client import RealtimeApiClient -from realtime_agent.realtimeapi.util import SAMPLE_RATE,CHANNELS +from realtimeapi import messages +from realtimeapi.client import RealtimeApiClient +from realtimeapi.util import SAMPLE_RATE, CHANNELS -from .agora.rtc import Channel, ChatMessage, RtcEngine, RtcOptions +from agora_realtime_ai_api.rtc import Channel, ChatMessage, RtcEngine, RtcOptions from .parse_args import parse_args_realtimekit logger = logging.getLogger(__name__) @@ -467,11 +950,11 @@ async def wait_for_remote_user(channel: Channel) -> int: remote_users = list(channel.remote_users.keys()) if len(remote_users) > 0: return remote_users[0] - + future = asyncio.Future[int]() - + channel.once("user_joined", lambda conn, user_id: future.set_result(user_id)) - + try: remote_user = await future return remote_user @@ -485,7 +968,6 @@ class InferenceConfig: turn_detection: messages.TurnDetection | None = None # MARK: CHECK! voice: messages.Voices | None = None - @dataclass(frozen=True, kw_only=True) class LocalFunctionToolDeclaration: """Declaration of a tool that can be called by the model, and runs a function locally on the tool context.""" @@ -505,7 +987,6 @@ class LocalFunctionToolDeclaration: }, } - @dataclass(frozen=True, kw_only=True) class PassThroughFunctionToolDeclaration: """Declaration of a tool that can be called by the model.""" @@ -524,23 +1005,18 @@ class PassThroughFunctionToolDeclaration: }, } - ToolDeclaration = LocalFunctionToolDeclaration | PassThroughFunctionToolDeclaration - @dataclass(frozen=True, kw_only=True) class LocalToolCallExecuted: json_encoded_output: str - @dataclass(frozen=True, kw_only=True) class ShouldPassThroughToolCall: decoded_function_args: dict[str, Any] - ExecuteToolCallResult = LocalToolCallExecuted | ShouldPassThroughToolCall - class ToolContext(abc.ABC): _tool_declarations: dict[str, ToolDeclaration] @@ -595,7 +1071,6 @@ class ToolContext(abc.ABC): def model_description(self) -> list[dict[str, Any]]: return [v.model_description() for v in self._tool_declarations.values()] - class ClientToolCallResponse(BaseModel): tool_call_id: str result: dict[str, Any] | str | float | int | bool | None = None @@ -631,7 +1106,7 @@ class RealtimeKitAgent: try: async with RealtimeApiClient( - base_uri=os.getenv("REALTIME_API_BASE_URI", "wss://api.openai.com"), + base_uri="wss://api.openai.com", api_key=os.getenv("OPENAI_API_KEY"), verbose=False, ) as client: @@ -722,7 +1197,7 @@ class RealtimeKitAgent: self.subscribe_user = None logger.info("Subscribed user left, disconnecting") await self.channel.disconnect() - + self.channel.on("user_left", on_user_left) disconnected_future = asyncio.Future[None]() @@ -767,8 +1242,7 @@ class RealtimeKitAgent: # audio queue contains audio data from the model, send it the end-user via our local audio source frame = await self.audio_queue.get() await self.channel.push_audio_frame(frame) - await asyncio.sleep(0) # allow other tasks to run - + await asyncio.sleep(0) # allow other tasks to run async def _process_model_messages(self) -> None: async for message in self.client.listen(): @@ -786,7 +1260,7 @@ class RealtimeKitAgent: logger.info(f"Text message done: {message=}") await self.channel.chat.send_message(ChatMessage(message=message.model_dump_json(), msg_id=message.item_id)) - # case messages.MessageAdded(): + # case 
messages.MessageAdded(): # pass case messages.InputAudioBufferSpeechStarted(): pass @@ -803,11 +1277,9 @@ class RealtimeKitAgent: # ResponseCreated case messages.ResponseCreated(): pass - # ResponseOutputItemAdded case messages.ResponseOutputItemAdded(): pass - # ResponseContenPartAdded case messages.ResponseContenPartAdded(): pass @@ -829,7 +1301,7 @@ async def shutdown(loop, signal=None): logger.info(f"Received exit signal {signal.name}...") tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] - + logger.info(f"Cancelling {len(tasks)} outstanding tasks") for task in tasks: task.cancel() @@ -842,10 +1314,19 @@ if __name__ == "__main__": options = parse_args_realtimekit() logger.info(f"app_id: channel_id: {options['channel_name']}, uid: {options['uid']}") - - if not os.environ.get("AGORA_APP_ID") : + + if not os.environ.get("AGORA_APP_ID"): raise ValueError("Need to set environment variable AGORA_APP_ID") - + + if not os.environ.get("AGORA_APP_CERT"): + raise ValueError("Need to set environment variable AGORA_APP_CERT") + + if not os.environ.get("OPENAI_API_KEY"): + raise ValueError("Need to set environment variable OPENAI_API_KEY") + + if not os.environ.get("OPENAI_MODEL"): + raise ValueError("Need to set environment variable OPENAI_MODEL") + asyncio.run( RealtimeKitAgent.entry_point( engine=RtcEngine(appid=os.environ.get("AGORA_APP_ID"), appcert=os.environ.get("AGORA_APP_CERT")), @@ -853,7 +1334,7 @@ if __name__ == "__main__": channel_name=options['channel_name'], uid=options['uid'], sample_rate=SAMPLE_RATE, - channels=CHANNELS + channels=CHANNELS, ), inference_config=InferenceConfig( system_message="""\ @@ -874,23 +1355,27 @@ until you have received the response to the tool call.\\
-## Test the code +## Test the Code -1. **Update the values for** `AGORA_APP_ID`, `AGORA_APP_CERT`, **and** `OPENAI_API_KEY` **in the project's** `.env` **file**. +1. Update the values for `AGORA_APP_ID`, `AGORA_APP_CERT`, and `OPENAI_API_KEY` in the project's** `.env` file. - This step ensures that the necessary credentials for Agora and OpenAI are correctly configured in your project. + Ensure that the necessary credentials for Agora and OpenAI are correctly configured in your project’s environment file. -2. **Execute the following command to run the demo**: +2. Execute the following command to run the demo: ```bash python3 agent.py --channel_name=your_channel_name --uid=your_user_id ``` - This command launches the `agent.py` script, initializing the Agora channel and the OpenAI API connection. Replace `your_channel_name` with the desired channel name and `your_user_id` with a unique user ID. + This command launches the `agent.py` script, initializing the Agora channel and connecting to the OpenAI API. Replace `your_channel_name` with the desired channel name and `your_user_id` with a unique user ID. + +### Front-end for testing + +Use Agora's [Voice Call Demo](https://webdemo.agora.io/basicVoiceCall/index.html) for testing. Join with your AppID and generate a token from the project's settings page on the [Agora Console](https://console.agora.io/). ## Reference -This section contains additional information or links to relevant documentation that complements the current page or explains other aspects of the product. +Additional relevant documentation that complements the current page or explains other aspects of the product. - Checkout the [Demo project on GitHub](https://github.com/AgoraIO/agora-openai-converse) - [API reference for `rtc.py`](https://api-reference-git-python-voice-implementation-agora-gdxe.vercel.app/voice-sdk/python/rtc-py-api.html)