Skip to content

Commit

Permalink
Fixed multicast based discoverer.
Browse files Browse the repository at this point in the history
  • Loading branch information
lextm committed Oct 19, 2024
1 parent c4dfca9 commit 3e670b3
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 61 deletions.
61 changes: 25 additions & 36 deletions SharpSnmpLib/Messaging/Discoverer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ namespace Lextm.SharpSnmpLib.Messaging
public sealed partial class Discoverer
{
private int _active;
private int _bufferSize;
private int _requestId;
private static readonly UserRegistry Empty = new();
private readonly IList<Variable> _defaultVariables = new List<Variable> { new(new ObjectIdentifier(new uint[] { 1, 3, 6, 1, 2, 1, 1, 1, 0 })) };
Expand Down Expand Up @@ -88,7 +87,7 @@ public void Discover(VersionCode version, IPEndPoint broadcastAddress, OctetStri
using var udp = new UdpClient(addressFamily);
if (addressFamily == AddressFamily.InterNetworkV6)
{
udp.JoinMulticastGroup(broadcastAddress.Address);
udp.MulticastLoopback = false;
}
else if (addressFamily == AddressFamily.InterNetwork)
{
Expand All @@ -108,12 +107,12 @@ public void Discover(VersionCode version, IPEndPoint broadcastAddress, OctetStri
return;
}

_bufferSize = udp.Client.ReceiveBufferSize = Messenger.MaxMessageSize;
udp.Client.ReceiveBufferSize = Messenger.MaxMessageSize;

#if ASYNC
Task.Factory.StartNew(() => AsyncBeginReceive(udp.Client));
Task.Factory.StartNew(() => AsyncBeginReceive(udp));
#else
Task.Factory.StartNew(() => AsyncReceive(udp.Client));
Task.Factory.StartNew(() => AsyncReceive(udp));
#endif

Thread.Sleep(interval);
Expand Down Expand Up @@ -198,7 +197,7 @@ private void AsyncEndReceive(IAsyncResult iar)
}
#else

private void AsyncReceive(Socket socket)
private void AsyncReceive(UdpClient client)
{
while (true)
{
Expand All @@ -210,10 +209,9 @@ private void AsyncReceive(Socket socket)

try
{
var buffer = new byte[_bufferSize];
EndPoint remote = new IPEndPoint(IPAddress.Any, 0);
var count = socket.ReceiveFrom(buffer, ref remote);
Task.Factory.StartNew(() => HandleMessage(buffer, count, (IPEndPoint)remote));
var remote = new IPEndPoint(IPAddress.Any, 0);
var buffer = client.Receive(ref remote);
Task.Factory.StartNew(() => HandleMessage(buffer, buffer.Length, remote));
}
catch (SocketException ex)
{
Expand Down Expand Up @@ -298,11 +296,6 @@ public async Task DiscoverAsync(VersionCode version, IPEndPoint broadcastAddress
}

var addressFamily = broadcastAddress.AddressFamily;
if (addressFamily == AddressFamily.InterNetworkV6)
{
throw new ArgumentException("IP v6 is not yet supported.", nameof(broadcastAddress));
}

byte[] bytes;
_requestId = Messenger.NextRequestId;
if (version == VersionCode.V3)
Expand All @@ -316,25 +309,32 @@ public async Task DiscoverAsync(VersionCode version, IPEndPoint broadcastAddress
bytes = message.ToBytes();
}

using var udp = new Socket(addressFamily, SocketType.Dgram, ProtocolType.Udp);
udp.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, true);
var buffer = new ArraySegment<byte>(bytes);
await udp.SendToAsync(buffer, SocketFlags.None, broadcastAddress);
using var udp = new UdpClient(addressFamily);
if (addressFamily == AddressFamily.InterNetworkV6)
{
udp.MulticastLoopback = false;
}
else if (addressFamily == AddressFamily.InterNetwork)
{
#if (!CF)
udp.EnableBroadcast = true;
#endif
}

await udp.SendAsync(bytes, bytes.Length, broadcastAddress);
var activeBefore = Interlocked.CompareExchange(ref _active, Active, Inactive);
if (activeBefore == Active)
{
// If already started, we've nothing to do.
return;
}

_bufferSize = udp.ReceiveBufferSize;
#if NET6_0_OR_GREATER
var source = new CancellationTokenSource();

Check warning on line 333 in SharpSnmpLib/Messaging/Discoverer.cs

View workflow job for this annotation

GitHub Actions / windows

Dispose 'source' when it is no longer needed. (https://rules.sonarsource.com/csharp/RSPEC-2930)

Check warning on line 333 in SharpSnmpLib/Messaging/Discoverer.cs

View workflow job for this annotation

GitHub Actions / windows

Dispose 'source' when it is no longer needed. (https://rules.sonarsource.com/csharp/RSPEC-2930)
source.CancelAfter(interval);
try
{
await ReceiveAsync(udp, source.Token);
await ReceiveAsync(udp.Client, source.Token);
}
catch (OperationCanceledException)
{
Expand All @@ -346,18 +346,10 @@ await Task.WhenAny(
Task.Delay(interval));
#endif
Interlocked.CompareExchange(ref _active, Inactive, Active);
try
{
udp.Shutdown(SocketShutdown.Both);
}
catch (SocketException)
{
// This exception is thrown in .NET Core <=2.1.4 on non-Windows systems.
// However, the shutdown call is necessary to release the socket binding.
}
udp.Close();
}
#if !NET6_0_OR_GREATER
private async Task ReceiveAsync(Socket socket)
private async Task ReceiveAsync(UdpClient client)
{
while (true)
{
Expand All @@ -369,11 +361,8 @@ private async Task ReceiveAsync(Socket socket)

try
{
EndPoint remote = new IPEndPoint(IPAddress.Any, 0);

var buffer = new byte[_bufferSize];
var result = await socket.ReceiveMessageFromAsync(new ArraySegment<byte>(buffer), SocketFlags.None, remote);
await Task.Factory.StartNew(() => HandleMessage(buffer, result.ReceivedBytes, (IPEndPoint) result.RemoteEndPoint))
var result = await client.ReceiveAsync();
await Task.Factory.StartNew(() => HandleMessage(result.Buffer, result.Buffer.Length, result.RemoteEndPoint))
.ConfigureAwait(false);
}
catch (SocketException ex)
Expand Down
42 changes: 17 additions & 25 deletions SharpSnmpLib/Messaging/Net6Discoverer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public void Discover(VersionCode version, IPEndPoint broadcastAddress, OctetStri
return;
}

_bufferSize = udp.Client.ReceiveBufferSize = Messenger.MaxMessageSize;
udp.Client.ReceiveBufferSize = Messenger.MaxMessageSize;

#if ASYNC
Task.Factory.StartNew(() => AsyncBeginReceive(udp.Client, token));
#else
Task.Factory.StartNew(() => AsyncReceive(udp.Client), token);
Task.Factory.StartNew(() => AsyncReceive(udp), token);
#endif
token.WaitHandle.WaitOne();
Interlocked.CompareExchange(ref _active, Inactive, Active);
Expand Down Expand Up @@ -154,11 +154,6 @@ public async Task DiscoverAsync(VersionCode version, IPEndPoint broadcastAddress
}

var addressFamily = broadcastAddress.AddressFamily;
if (addressFamily == AddressFamily.InterNetworkV6)
{
throw new ArgumentException("IP v6 is not yet supported.", nameof(broadcastAddress));
}

byte[] bytes;
_requestId = Messenger.NextRequestId;
if (version == VersionCode.V3)
Expand All @@ -172,31 +167,28 @@ public async Task DiscoverAsync(VersionCode version, IPEndPoint broadcastAddress
bytes = message.ToBytes();
}

using var udp = new Socket(addressFamily, SocketType.Dgram, ProtocolType.Udp);
udp.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Broadcast, true);
var buffer = new ArraySegment<byte>(bytes);
await udp.SendToAsync(buffer, SocketFlags.None, broadcastAddress, token);
using var udp = new UdpClient(addressFamily);
if (addressFamily == AddressFamily.InterNetworkV6)
{
udp.MulticastLoopback = false;
}
else if (addressFamily == AddressFamily.InterNetwork)
{
udp.EnableBroadcast = true;
}

await udp.Client.SendToAsync(bytes, SocketFlags.None, broadcastAddress, token);
var activeBefore = Interlocked.CompareExchange(ref _active, Active, Inactive);
if (activeBefore == Active)
{
// If already started, we've nothing to do.
return;
}

_bufferSize = udp.ReceiveBufferSize;
await ReceiveAsync(udp, token);
await ReceiveAsync(udp.Client, token);

Interlocked.CompareExchange(ref _active, Inactive, Active);
try
{
udp.Shutdown(SocketShutdown.Both);
}
catch (SocketException)
{
// This exception is thrown in .NET Core <=2.1.4 on non-Windows systems.
// However, the shutdown call is necessary to release the socket binding.
}
udp.Close();
}

private async Task ReceiveAsync(Socket socket, CancellationToken token)
Expand All @@ -216,10 +208,10 @@ private async Task ReceiveAsync(Socket socket, CancellationToken token)

try
{
EndPoint remote = new IPEndPoint(IPAddress.Any, 0);
EndPoint remote = new IPEndPoint(socket.AddressFamily == AddressFamily.InterNetwork ? IPAddress.Any : IPAddress.IPv6Any, 0);

var buffer = new byte[_bufferSize];
var result = await socket.ReceiveMessageFromAsync(new ArraySegment<byte>(buffer), SocketFlags.None, remote, token);
var buffer = new byte[socket.ReceiveBufferSize];
var result = await socket.ReceiveFromAsync(new ArraySegment<byte>(buffer), SocketFlags.None, remote, token);
await Task.Factory.StartNew(() => HandleMessage(buffer, result.ReceivedBytes, (IPEndPoint)result.RemoteEndPoint), token)
.ConfigureAwait(false);
}
Expand Down

0 comments on commit 3e670b3

Please sign in to comment.