Skip to content

Commit

Permalink
Protocol: add support for BecomeMonitor. (#217)
Browse files Browse the repository at this point in the history
  • Loading branch information
tmds authored Dec 26, 2023
1 parent a0ca30c commit c018ff9
Show file tree
Hide file tree
Showing 17 changed files with 297 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Install .NET
uses: actions/setup-dotnet@v3
with:
dotnet-version: "7.0.x"
dotnet-version: "8.0.x"

- name: Install reportgenerator tool
run: dotnet tool install --global dotnet-reportgenerator-globaltool
Expand Down
14 changes: 14 additions & 0 deletions samples/Monitor/Monitor.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<ProjectReference Include="..\..\src\Tmds.DBus.Protocol\Tmds.DBus.Protocol.csproj" />
</ItemGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
11 changes: 11 additions & 0 deletions samples/Monitor/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Tmds.DBus.Protocol;

string address = Address.Session ?? throw new ArgumentNullException(nameof(address));

await foreach (DisposableMessage dmsg in Connection.MonitorBusAsync(address))
{
using var _ = dmsg;
Message msg = dmsg.Message;

Console.WriteLine($"{msg.MessageType} {msg.SenderAsString} -> {msg.DestinationAsString}");
}
2 changes: 2 additions & 0 deletions src/Tmds.DBus.Protocol/ClientConnectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ protected ClientConnectionOptions()

public bool AutoConnect { get; set; }

internal bool IsShared { get; set; }

protected internal virtual ValueTask<ClientSetupResult> SetupAsync(CancellationToken cancellationToken)
{
return new ValueTask<ClientSetupResult>(
Expand Down
74 changes: 74 additions & 0 deletions src/Tmds.DBus.Protocol/Connection.DBus.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using System.Threading.Channels;

namespace Tmds.DBus.Protocol;

public partial class Connection
Expand All @@ -20,4 +22,76 @@ MessageBuffer CreateMessage()
return writer.CreateMessage();
}
}

public async Task BecomeMonitorAsync(Action<Exception?, DisposableMessage> handler, IEnumerable<MatchRule>? rules = null)
{
if (_connectionOptions.IsShared)
{
throw new InvalidOperationException("Cannot become monitor on a shared connection.");
}

DBusConnection connection = await ConnectCoreAsync().ConfigureAwait(false);
await connection.BecomeMonitorAsync(handler, rules).ConfigureAwait(false);
}

public static async IAsyncEnumerable<DisposableMessage> MonitorBusAsync(string address, IEnumerable<MatchRule>? rules = null, [EnumeratorCancellation]CancellationToken ct = default)
{
ct.ThrowIfCancellationRequested();

var channel = Channel.CreateUnbounded<DisposableMessage>(
new UnboundedChannelOptions()
{
AllowSynchronousContinuations = true,
SingleReader = true,
SingleWriter = true,
}
);

using var connection = new Connection(address);
using CancellationTokenRegistration ctr =
#if NETCOREAPP3_1_OR_GREATER
ct.UnsafeRegister(c => ((Connection)c!).Dispose(), connection);
#else
ct.Register(c => ((Connection)c!).Dispose(), connection);
#endif
try
{
await connection.ConnectAsync().ConfigureAwait(false);

await connection.BecomeMonitorAsync(
(Exception? ex, DisposableMessage message) =>
{
if (ex is not null)
{
if (ct.IsCancellationRequested)
{
ex = new OperationCanceledException(ct);
}
channel.Writer.TryComplete(ex);
return;
}
if (!channel.Writer.TryWrite(message))
{
message.Dispose();
}
},
rules
).ConfigureAwait(false);
}
catch
{
ct.ThrowIfCancellationRequested();

throw;
}

while (await channel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
if (channel.Reader.TryRead(out DisposableMessage msg))
{
yield return msg;
}
}
}
}
2 changes: 1 addition & 1 deletion src/Tmds.DBus.Protocol/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ private static Connection CreateConnection(ref Connection? field, string? addres
{
return connection;
}
var newConnection = new Connection(new ClientConnectionOptions(address) { AutoConnect = true });
var newConnection = new Connection(new ClientConnectionOptions(address) { AutoConnect = true, IsShared = true });
connection = Interlocked.CompareExchange(ref field, newConnection, null);
if (connection != null)
{
Expand Down
Loading

0 comments on commit c018ff9

Please sign in to comment.