Skip to content

Commit

Permalink
Add ability to cancel the event subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
seclerp committed Nov 14, 2023
1 parent 41a6a27 commit 70fc4a1
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 15 deletions.
5 changes: 4 additions & 1 deletion src/ChromeProtocol.Core/DomainEventHandler.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
namespace ChromeProtocol.Core;

public delegate Task DomainEventHandler<in TEvent>(TEvent @event)
public delegate void SyncDomainEventHandler<in TEvent>(TEvent @event)

Check warning on line 3 in src/ChromeProtocol.Core/DomainEventHandler.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'SyncDomainEventHandler<TEvent>'
where TEvent : IEvent;

public delegate Task AsyncDomainEventHandler<in TEvent>(TEvent @event)

Check warning on line 6 in src/ChromeProtocol.Core/DomainEventHandler.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'AsyncDomainEventHandler<TEvent>'
where TEvent : IEvent;
14 changes: 10 additions & 4 deletions src/ChromeProtocol.Runtime/Messaging/IProtocolClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,24 @@ namespace ChromeProtocol.Runtime.Messaging;
public interface IProtocolClient : IDisposable
{
event EventHandler OnConnected;

event EventHandler OnDisconnected;

event EventHandler<ProtocolRequest<ICommand>> OnRequestSent;
event EventHandler<ProtocolResponse<JObject>> OnResponseReceived;
event EventHandler<ProtocolEvent<JObject>> OnEventReceived;

Task ConnectAsync(CancellationToken token = default);

[Obsolete("Use Dispose of the ConnectAsync return object.")]
Task DisconnectAsync(CancellationToken token = default);

void ListenEvent<TEvent>(DomainEventHandler<TEvent> handler)
[Obsolete("Use SubscribeSync or SubscribeAsync instead.")]
void ListenEvent<TEvent>(AsyncDomainEventHandler<TEvent> handler)
where TEvent : IEvent;

IDisposable SubscribeAsync<TEvent>(AsyncDomainEventHandler<TEvent> handler)
where TEvent : IEvent;

IDisposable SubscribeSync<TEvent>(SyncDomainEventHandler<TEvent> handler)
where TEvent : IEvent;

Task<TResponse> SendCommandAsync<TResponse>(ICommand<TResponse> command, string? sessionId = default, CancellationToken? token = default)
Expand All @@ -26,4 +32,4 @@ Task<TResponse> SendCommandAsync<TResponse>(ICommand<TResponse> command, string?
Task FireCommandAsync(ICommand command, string? sessionId = default, CancellationToken token = default);

IScopedProtocolClient CreateScoped(string sessionId);
}
}
11 changes: 9 additions & 2 deletions src/ChromeProtocol.Runtime/Messaging/IScopedProtocolClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ public interface IScopedProtocolClient : IDisposable
{
public string SessionId { get; }

void ListenEvent<TEvent>(DomainEventHandler<TEvent> handler)
[Obsolete("Use SubscribeSync or SubscribeAsync instead.")]
void ListenEvent<TEvent>(AsyncDomainEventHandler<TEvent> handler)
where TEvent : IEvent;

IDisposable SubscribeAsync<TEvent>(AsyncDomainEventHandler<TEvent> handler)
where TEvent : IEvent;

IDisposable SubscribeSync<TEvent>(SyncDomainEventHandler<TEvent> handler)
where TEvent : IEvent;

Task<TResponse> SendCommandAsync<TResponse>(ICommand<TResponse> command, CancellationToken? token = default)
where TResponse : IType;

Task FireCommandAsync(ICommand command, CancellationToken token = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@ public ScopedProtocolClient(IProtocolClient mainClient, string sessionId)
}

// TODO: Support session-specific events
public void ListenEvent<TEvent>(DomainEventHandler<TEvent> handler) where TEvent : IEvent =>
_mainClient.ListenEvent(handler);
public void ListenEvent<TEvent>(AsyncDomainEventHandler<TEvent> handler) where TEvent : IEvent =>
SubscribeAsync(handler);

public IDisposable SubscribeAsync<TEvent>(AsyncDomainEventHandler<TEvent> handler) where TEvent : IEvent =>
_mainClient.SubscribeAsync(handler);

public IDisposable SubscribeSync<TEvent>(SyncDomainEventHandler<TEvent> handler) where TEvent : IEvent =>
_mainClient.SubscribeSync(handler);

public Task<TResponse> SendCommandAsync<TResponse>(ICommand<TResponse> command, CancellationToken? token = default)
where TResponse : IType => _mainClient.SendCommandAsync(command, SessionId, token);
Expand All @@ -26,4 +32,4 @@ public Task FireCommandAsync(ICommand command, CancellationToken token = default
public void Dispose()
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,33 @@ public async Task DisconnectAsync(CancellationToken token = default)
OnDisconnected?.Invoke(this, EventArgs.Empty);
}

public void ListenEvent<TEvent>(DomainEventHandler<TEvent> handler) where TEvent : IEvent
public void ListenEvent<TEvent>(AsyncDomainEventHandler<TEvent> handler) where TEvent : IEvent
{
Func<ProtocolEvent<JObject>, Task> HandleProtocolEvent(DomainEventHandler<TEvent> eventHandler) =>
SubscribeAsync(handler);
}

public IDisposable SubscribeAsync<TEvent>(AsyncDomainEventHandler<TEvent> handler) where TEvent : IEvent
{
Func<ProtocolEvent<JObject>, Task> HandleProtocolEvent(AsyncDomainEventHandler<TEvent> eventHandler) =>
async rawEvent =>
{
var eventItself = rawEvent.Params.ToObject<TEvent>();
await eventHandler(eventItself).ConfigureAwait(false);
};

_eventHandlers.AddOrUpdate(GetMethodName(typeof(TEvent)), HandleProtocolEvent(handler), (_, existing) => existing + HandleProtocolEvent(handler));
return SubscribeInternal<TEvent>(HandleProtocolEvent(handler));
}

public IDisposable SubscribeSync<TEvent>(SyncDomainEventHandler<TEvent> handler) where TEvent : IEvent
{
Func<ProtocolEvent<JObject>, Task> HandleProtocolEvent(SyncDomainEventHandler<TEvent> eventHandler) =>
rawEvent =>
{
var eventItself = rawEvent.Params.ToObject<TEvent>();
return Task.Run(() => eventHandler(eventItself));
};

return SubscribeInternal<TEvent>(HandleProtocolEvent(handler));
}

public async Task<TResponse> SendCommandAsync<TResponse>(ICommand<TResponse> command,
Expand Down Expand Up @@ -123,6 +140,14 @@ private async Task FireInternalAsync(int id, string methodName, ICommand command
}
}

private IDisposable SubscribeInternal<TEvent>(Func<ProtocolEvent<JObject>, Task> rawHandler) where TEvent : IEvent
{
var eventName = GetMethodName(typeof(TEvent));
var subscription = new ProtocolSubscription<TNativeClient>(eventName, rawHandler, this);
_eventHandlers.AddOrUpdate(GetMethodName(typeof(TEvent)), rawHandler, (_, existing) => existing + rawHandler);
return subscription;
}

private void StartOutgoingWorker(CancellationToken token)
{
new Thread(() =>
Expand Down Expand Up @@ -216,4 +241,35 @@ private async Task ProcessOutgoingRequest(ProtocolRequest<ICommand> request)
private static string GetMethodName(MemberInfo type) =>
type.GetCustomAttribute<MethodNameAttribute>()?.MethodName
?? throw new Exception($"{nameof(MethodNameAttribute)} is required on type {type.Name} but it is not presented.");
}

private class ProtocolSubscription<T> : IDisposable where T : WebSocket
{
private readonly string _eventName;
private readonly Func<ProtocolEvent<JObject>, Task>? _wrappedHandler;
private readonly WebSocketProtocolClient<T> _client;

public ProtocolSubscription(string eventName, Func<ProtocolEvent<JObject>,Task>? wrappedHandler, WebSocketProtocolClient<T> client)
{
_eventName = eventName;
_wrappedHandler = wrappedHandler;
_client = client;
}

public void Dispose()
{
if (_client._eventHandlers.TryGetValue(_eventName, out var aggregatedHandlers))
{
var updatedHandlers = aggregatedHandlers - _wrappedHandler;
if (updatedHandlers is null)
{
_client._eventHandlers.TryRemove(_eventName, out _);
}
else
{
_client._eventHandlers[_eventName] = updatedHandlers;
}
}
}
}
}

2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<!-- Global project and StyleCop Analyzers configuration -->
<PropertyGroup>
<Version>1.0.0-preview3</Version>
<Version>1.0.0-preview4</Version>
<Authors>seclerp</Authors>
<GenerateDocumentationFile>true</GenerateDocumentationFile>

Expand Down

0 comments on commit 70fc4a1

Please sign in to comment.