Skip to content

Commit

Permalink
Handle errors that may happen in the web socket receiver thread (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
daxian-dbw authored Sep 28, 2024
1 parent 1db6ef4 commit d672195
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
20 changes: 17 additions & 3 deletions shell/agents/Microsoft.Azure.Agent/AzureCopilotReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Runtime.ExceptionServices;
using System.Text.Json;

namespace Microsoft.Azure.Agent;
Expand All @@ -26,7 +27,6 @@ private AzureCopilotReceiver(ClientWebSocket webSocket)
}

internal int Watermark { get; private set; }
internal BlockingCollection<CopilotActivity> ActivityQueue => _activityQueue;

internal static async Task<AzureCopilotReceiver> CreateAsync(string streamUrl)
{
Expand All @@ -52,6 +52,7 @@ private async Task ProcessActivities()
if (result.MessageType is WebSocketMessageType.Close)
{
closingMessage = "Close message received";
_activityQueue.Add(new CopilotActivity { Error = new ConnectionDroppedException("The server websocket is closing. Connection dropped.") });
}
}
catch (OperationCanceledException)
Expand All @@ -65,6 +66,7 @@ private async Task ProcessActivities()
{
// TODO: log the closing request.
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, closingMessage, CancellationToken.None);
_activityQueue.CompleteAdding();
break;
}

Expand Down Expand Up @@ -98,8 +100,20 @@ private async Task ProcessActivities()
}
}

// TODO: log the current state of the web socket
// TODO: handle error state, such as 'aborted'
// TODO: log the current state of the web socket.
_activityQueue.Add(new CopilotActivity { Error = new ConnectionDroppedException($"The websocket got in '{_webSocket.State}' state. Connection dropped.") });
_activityQueue.CompleteAdding();
}

internal CopilotActivity Take(CancellationToken cancellationToken)
{
CopilotActivity activity = _activityQueue.Take(cancellationToken);
if (activity.Error is not null)
{
ExceptionDispatchInfo.Capture(activity.Error).Throw();
}

return activity;
}

public void Dispose()
Expand Down
4 changes: 2 additions & 2 deletions shell/agents/Microsoft.Azure.Agent/ChatSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private async Task StartConversationAsync(IHost host, CancellationToken cancella

while (true)
{
CopilotActivity activity = _copilotReceiver.ActivityQueue.Take(cancellationToken);
CopilotActivity activity = _copilotReceiver.Take(cancellationToken);
if (activity.IsMessage && activity.IsFromCopilot && _copilotReceiver.Watermark is 0)
{
activity.ExtractMetadata(out _, out ConversationState conversationState);
Expand Down Expand Up @@ -259,7 +259,7 @@ internal async Task<CopilotResponse> GetChatResponseAsync(string input, IStatusC

while (true)
{
CopilotActivity activity = _copilotReceiver.ActivityQueue.Take(cancellationToken);
CopilotActivity activity = _copilotReceiver.Take(cancellationToken);

if (activity.ReplyToId != activityId)
{
Expand Down
2 changes: 1 addition & 1 deletion shell/agents/Microsoft.Azure.Agent/Schema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ internal CopilotActivity ReadChunk(CancellationToken cancellationToken)
return null;
}

CopilotActivity activity = _receiver.ActivityQueue.Take(cancellationToken);
CopilotActivity activity = _receiver.Take(cancellationToken);

if (!activity.IsMessageUpdate)
{
Expand Down

0 comments on commit d672195

Please sign in to comment.