diff --git a/.gitignore b/.gitignore index f1e3d20e0..3a2238d6b 100644 --- a/.gitignore +++ b/.gitignore @@ -15,12 +15,12 @@ [Dd]ebugPublic/ [Rr]elease/ [Rr]eleases/ -x64/ -x86/ +[Xx]64/ +[Xx]86/ +[Bb]uild/ bld/ [Bb]in/ [Oo]bj/ -[Ll]og/ # Visual Studio 2015 cache/options directory .vs/ @@ -81,7 +81,6 @@ ipch/ *.sdf *.cachefile *.VC.db -*.VC.VC.opendb # Visual Studio profiler *.psess @@ -140,15 +139,12 @@ publish/ # Publish Web Output *.[Pp]ublish.xml *.azurePubxml -# TODO: Comment the next line if you want to checkin your web deploy settings -# but database connection strings (with potential passwords) will be unencrypted -*.pubxml -*.publishproj -# Microsoft Azure Web App publish settings. Comment the next line if you want to -# checkin your Azure Web App publish settings, but sensitive information contained -# in these scripts will be unencrypted -PublishScripts/ +# TODO: Un-comment the next line if you do not want to checkin +# your web deploy settings because they may include unencrypted +# passwords +#*.pubxml +*.publishproj # NuGet Packages *.nupkg @@ -170,11 +166,12 @@ csx/ ecf/ rcf/ -# Windows Store app package directories and files +# Microsoft Azure ApplicationInsights config file +ApplicationInsights.config + +# Windows Store app package directory AppPackages/ BundleArtifacts/ -Package.StoreAssociation.xml -_pkginfo.txt # Visual Studio cache files # files ending in .cache can be ignored @@ -184,6 +181,7 @@ _pkginfo.txt # Others ClientBin/ +[Ss]tyle[Cc]op.* ~$* *~ *.dbmdl @@ -193,10 +191,6 @@ ClientBin/ node_modules/ orleans.codegen.cs -# Since there are multiple workflows, uncomment next line to ignore bower_components -# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622) -#bower_components/ - # RIA/Silverlight projects Generated_Code/ @@ -240,13 +234,12 @@ FakesAssemblies/ **/*.Server/ModelManifest.xml _Pvt_Extensions +# LightSwitch generated files +GeneratedArtifacts/ +ModelManifest.xml + # Paket dependency manager .paket/paket.exe -paket-files/ # FAKE - F# Make -.fake/ - -# JetBrains Rider -.idea/ -*.sln.iml +.fake/ \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetCore/MQTTnet.NetCore.csproj b/Frameworks/MQTTnet.NetCore/MQTTnet.NetCore.csproj new file mode 100644 index 000000000..8f20ee424 --- /dev/null +++ b/Frameworks/MQTTnet.NetCore/MQTTnet.NetCore.csproj @@ -0,0 +1,23 @@ + + + + netcoreapp1.1 + Christian Kratky + Christian Kratky + MQTTnet + MQTTnet for .NET Core + Copyright © Christian Kratky 2016-2017 + 2.0.4.0 + 2.0.4.0 + 2.0.4.0 + True + MQTTnet + MQTTnet + MQTTnet + + + + + + + \ No newline at end of file diff --git a/MQTTnet.Universal/MqttClientFactory.cs b/Frameworks/MQTTnet.NetCore/MqttClientFactory.cs similarity index 67% rename from MQTTnet.Universal/MqttClientFactory.cs rename to Frameworks/MQTTnet.NetCore/MqttClientFactory.cs index 36b86438d..4da24a174 100644 --- a/MQTTnet.Universal/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetCore/MqttClientFactory.cs @@ -3,7 +3,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; -namespace MQTTnet.Universal +namespace MQTTnet { public class MqttClientFactory { @@ -11,7 +11,7 @@ public MqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.NetCore/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetCore/MqttServerAdapter.cs new file mode 100644 index 000000000..1d3db70d1 --- /dev/null +++ b/Frameworks/MQTTnet.NetCore/MqttServerAdapter.cs @@ -0,0 +1,56 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Serializer; +using MQTTnet.Core.Server; + +namespace MQTTnet +{ + public sealed class MqttServerAdapter : IMqttServerAdapter, IDisposable + { + private CancellationTokenSource _cancellationTokenSource; + private Socket _socket; + + public event EventHandler ClientConnected; + + public void Start(MqttServerOptions options) + { + if (_socket != null) throw new InvalidOperationException("Server is already started."); + + _cancellationTokenSource = new CancellationTokenSource(); + + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _socket.Bind(new IPEndPoint(IPAddress.Any, options.Port)); + _socket.Listen(options.ConnectionBacklog); + + Task.Run(async () => await AcceptConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); + } + + public void Stop() + { + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + + _socket?.Dispose(); + _socket = null; + } + + public void Dispose() + { + Stop(); + } + + private async Task AcceptConnectionsAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + var clientSocket = await _socket.AcceptAsync(); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket), new DefaultMqttV311PacketSerializer()); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetCore/MqttServerFactory.cs b/Frameworks/MQTTnet.NetCore/MqttServerFactory.cs new file mode 100644 index 000000000..a72542659 --- /dev/null +++ b/Frameworks/MQTTnet.NetCore/MqttServerFactory.cs @@ -0,0 +1,15 @@ +using System; +using MQTTnet.Core.Server; + +namespace MQTTnet +{ + public class MqttServerFactory + { + public MqttServer CreateMqttServer(MqttServerOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttServer(options, new MqttServerAdapter()); + } + } +} diff --git a/Frameworks/MQTTnet.NetCore/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetCore/MqttTcpChannel.cs new file mode 100644 index 000000000..139037c89 --- /dev/null +++ b/Frameworks/MQTTnet.NetCore/MqttTcpChannel.cs @@ -0,0 +1,81 @@ +using System; +using System.Net.Sockets; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; + +namespace MQTTnet +{ + public class MqttTcpChannel : IMqttCommunicationChannel, IDisposable + { + private readonly Socket _socket; + + public MqttTcpChannel() + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + + public MqttTcpChannel(Socket socket) + { + _socket = socket ?? throw new ArgumentNullException(nameof(socket)); + } + + public async Task ConnectAsync(MqttClientOptions options) + { + try + { + await _socket.ConnectAsync(options.Server, options.Port); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task DisconnectAsync() + { + try + { + _socket.Dispose(); + await Task.FromResult(0); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + try + { + await _socket.SendAsync(new ArraySegment(buffer), SocketFlags.None); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task ReadAsync(byte[] buffer) + { + try + { + var buffer2 = new ArraySegment(buffer); + await _socket.ReceiveAsync(buffer2, SocketFlags.None); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public void Dispose() + { + _socket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/MQTTnet.NET/MQTTnet.NETFramework.csproj b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj similarity index 84% rename from MQTTnet.NET/MQTTnet.NETFramework.csproj rename to Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj index 54d0160f4..dd62f3c2f 100644 --- a/MQTTnet.NET/MQTTnet.NETFramework.csproj +++ b/Frameworks/MQTTnet.NetFramework/MQTTnet.NetFramework.csproj @@ -7,10 +7,11 @@ {A480EF90-0EAA-4D9A-B271-47A9C47F6F7D} Library Properties - MQTTnet.NETFramework - MQTTnet.NETFramework - v4.6.1 + MQTTnet + MQTTnet + v4.5 512 + true @@ -20,6 +21,7 @@ DEBUG;TRACE prompt 4 + false pdbonly @@ -28,19 +30,21 @@ TRACE prompt 4 + false - + + - + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE} MQTTnet.Core diff --git a/MQTTnet.NET/MqttClientFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs similarity index 67% rename from MQTTnet.NET/MqttClientFactory.cs rename to Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs index 2a1600b3e..4da24a174 100644 --- a/MQTTnet.NET/MqttClientFactory.cs +++ b/Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs @@ -3,7 +3,7 @@ using MQTTnet.Core.Client; using MQTTnet.Core.Serializer; -namespace MQTTnet.NETFramework +namespace MQTTnet { public class MqttClientFactory { @@ -11,7 +11,7 @@ public MqttClient CreateMqttClient(MqttClientOptions options) { if (options == null) throw new ArgumentNullException(nameof(options)); - return new MqttClient(options, new MqttChannelAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); } } } diff --git a/Frameworks/MQTTnet.NetFramework/MqttServerAdapter.cs b/Frameworks/MQTTnet.NetFramework/MqttServerAdapter.cs new file mode 100644 index 000000000..b7b1b30a9 --- /dev/null +++ b/Frameworks/MQTTnet.NetFramework/MqttServerAdapter.cs @@ -0,0 +1,56 @@ +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Serializer; +using MQTTnet.Core.Server; + +namespace MQTTnet +{ + public sealed class MqttServerAdapter : IMqttServerAdapter, IDisposable + { + private CancellationTokenSource _cancellationTokenSource; + private Socket _socket; + + public event EventHandler ClientConnected; + + public void Start(MqttServerOptions options) + { + if (_socket != null) throw new InvalidOperationException("Server is already started."); + + _cancellationTokenSource = new CancellationTokenSource(); + + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + _socket.Bind(new IPEndPoint(IPAddress.Any, options.Port)); + _socket.Listen(options.ConnectionBacklog); + + Task.Run(async () => await AcceptConnectionsAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token); + } + + public void Stop() + { + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + + _socket?.Dispose(); + _socket = null; + } + + public void Dispose() + { + Stop(); + } + + private async Task AcceptConnectionsAsync(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + var clientSocket = await Task.Factory.FromAsync(_socket.BeginAccept, _socket.EndAccept, null); + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket), new DefaultMqttV311PacketSerializer()); + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter)); + } + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs new file mode 100644 index 000000000..a72542659 --- /dev/null +++ b/Frameworks/MQTTnet.NetFramework/MqttServerFactory.cs @@ -0,0 +1,15 @@ +using System; +using MQTTnet.Core.Server; + +namespace MQTTnet +{ + public class MqttServerFactory + { + public MqttServer CreateMqttServer(MqttServerOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttServer(options, new MqttServerAdapter()); + } + } +} diff --git a/Frameworks/MQTTnet.NetFramework/MqttTcpChannel.cs b/Frameworks/MQTTnet.NetFramework/MqttTcpChannel.cs new file mode 100644 index 000000000..c690bcb1d --- /dev/null +++ b/Frameworks/MQTTnet.NetFramework/MqttTcpChannel.cs @@ -0,0 +1,85 @@ +using System; +using System.Net.Sockets; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Core.Client; +using MQTTnet.Core.Exceptions; + +namespace MQTTnet +{ + public class MqttTcpChannel : IMqttCommunicationChannel, IDisposable + { + private readonly Socket _socket; + + public MqttTcpChannel() + { + _socket = new Socket(SocketType.Stream, ProtocolType.Tcp); + } + + public MqttTcpChannel(Socket socket) + { + _socket = socket ?? throw new ArgumentNullException(nameof(socket)); + } + + public async Task ConnectAsync(MqttClientOptions options) + { + try + { + await Task.Factory.FromAsync(_socket.BeginConnect, _socket.EndConnect, options.Server, options.Port, null); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task DisconnectAsync() + { + try + { + await Task.Factory.FromAsync(_socket.BeginDisconnect, _socket.EndDisconnect, true, null); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task WriteAsync(byte[] buffer) + { + if (buffer == null) throw new ArgumentNullException(nameof(buffer)); + + try + { + await Task.Factory.FromAsync( + // ReSharper disable once AssignNullToNotNullAttribute + _socket.BeginSend(buffer, 0, buffer.Length, SocketFlags.None, null, null), + _socket.EndSend); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public async Task ReadAsync(byte[] buffer) + { + try + { + await Task.Factory.FromAsync( + // ReSharper disable once AssignNullToNotNullAttribute + _socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, null, null), + _socket.EndReceive); + } + catch (SocketException exception) + { + throw new MqttCommunicationException(exception); + } + } + + public void Dispose() + { + _socket?.Dispose(); + } + } +} \ No newline at end of file diff --git a/MQTTnet.NET/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs similarity index 83% rename from MQTTnet.NET/Properties/AssemblyInfo.cs rename to Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs index a61a3e6a2..16d71a4bc 100644 --- a/MQTTnet.NET/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs @@ -1,8 +1,8 @@ using System.Reflection; using System.Runtime.InteropServices; -[assembly: AssemblyTitle("MQTTnet.NETFramework")] -[assembly: AssemblyDescription("")] +[assembly: AssemblyTitle("MQTTnet")] +[assembly: AssemblyDescription("MQTTnet for .NET Framework")] [assembly: AssemblyConfiguration("")] [assembly: AssemblyCompany("Christian Kratky")] [assembly: AssemblyProduct("MQTTnet")] diff --git a/MQTTnet.Universal/MQTTnet.Universal.csproj b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj similarity index 94% rename from MQTTnet.Universal/MQTTnet.Universal.csproj rename to Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj index b6612887b..f1c2f9727 100644 --- a/MQTTnet.Universal/MQTTnet.Universal.csproj +++ b/Frameworks/MQTTnet.UniversalWindows/MQTTnet.UniversalWindows.csproj @@ -7,12 +7,12 @@ {BD60C727-D8E8-40C3-B8E3-C95A864AE611} Library Properties - MQTTnet.Universal - MQTTnet.Universal + MQTTnet + MQTTnet en-US UAP 10.0.14393.0 - 10.0.10586.0 + 10.0.10240.0 14 512 {A5A43C5B-DE2A-4C0C-9213-0A381AF9435A};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} @@ -108,12 +108,14 @@ + + - + {99C884F3-B4B9-417D-AA92-DC7DD1C4CFEE} MQTTnet.Core diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs new file mode 100644 index 000000000..4da24a174 --- /dev/null +++ b/Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs @@ -0,0 +1,17 @@ +using System; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Client; +using MQTTnet.Core.Serializer; + +namespace MQTTnet +{ + public class MqttClientFactory + { + public MqttClient CreateMqttClient(MqttClientOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer())); + } + } +} diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttServerAdapter.cs b/Frameworks/MQTTnet.UniversalWindows/MqttServerAdapter.cs new file mode 100644 index 000000000..b9eeecd4b --- /dev/null +++ b/Frameworks/MQTTnet.UniversalWindows/MqttServerAdapter.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading; +using Windows.Networking.Sockets; +using MQTTnet.Core.Adapter; +using MQTTnet.Core.Serializer; +using MQTTnet.Core.Server; + +namespace MQTTnet +{ + public sealed class MqttServerAdapter : IMqttServerAdapter, IDisposable + { + private CancellationTokenSource _cancellationTokenSource; + private StreamSocketListener _socket; + + public event EventHandler ClientConnected; + + public void Start(MqttServerOptions options) + { + if (_socket != null) throw new InvalidOperationException("Server is already started."); + + _cancellationTokenSource = new CancellationTokenSource(); + + _socket = new StreamSocketListener(); + _socket.BindServiceNameAsync(options.Port.ToString()).AsTask().Wait(); + _socket.ConnectionReceived += ConnectionReceived; + } + + private void ConnectionReceived(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args) + { + var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer()); + + var identifier = $"{args.Socket.Information.RemoteAddress}:{args.Socket.Information.RemotePort}"; + ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(identifier, clientAdapter)); + } + + public void Stop() + { + _cancellationTokenSource?.Dispose(); + + if (_socket != null) + { + _socket.ConnectionReceived -= ConnectionReceived; + } + + _socket?.Dispose(); + _socket = null; + } + + public void Dispose() + { + Stop(); + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs new file mode 100644 index 000000000..a72542659 --- /dev/null +++ b/Frameworks/MQTTnet.UniversalWindows/MqttServerFactory.cs @@ -0,0 +1,15 @@ +using System; +using MQTTnet.Core.Server; + +namespace MQTTnet +{ + public class MqttServerFactory + { + public MqttServer CreateMqttServer(MqttServerOptions options) + { + if (options == null) throw new ArgumentNullException(nameof(options)); + + return new MqttServer(options, new MqttServerAdapter()); + } + } +} diff --git a/MQTTnet.Universal/MqttTcpChannel.cs b/Frameworks/MQTTnet.UniversalWindows/MqttTcpChannel.cs similarity index 78% rename from MQTTnet.Universal/MqttTcpChannel.cs rename to Frameworks/MQTTnet.UniversalWindows/MqttTcpChannel.cs index 350c4b2ac..5763df423 100644 --- a/MQTTnet.Universal/MqttTcpChannel.cs +++ b/Frameworks/MQTTnet.UniversalWindows/MqttTcpChannel.cs @@ -8,11 +8,21 @@ using MQTTnet.Core.Client; using Buffer = Windows.Storage.Streams.Buffer; -namespace MQTTnet.Universal +namespace MQTTnet { - public sealed class MqttTcpChannel : IMqttTransportChannel, IDisposable + public sealed class MqttTcpChannel : IMqttCommunicationChannel, IDisposable { - private readonly StreamSocket _socket = new StreamSocket(); + private readonly StreamSocket _socket; + + public MqttTcpChannel() + { + _socket = new StreamSocket(); + } + + public MqttTcpChannel(StreamSocket socket) + { + _socket = socket ?? throw new ArgumentNullException(nameof(socket)); + } public async Task ConnectAsync(MqttClientOptions options) { diff --git a/MQTTnet.Universal/Properties/AssemblyInfo.cs b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs similarity index 81% rename from MQTTnet.Universal/Properties/AssemblyInfo.cs rename to Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs index 3b16f2b96..1e2685caf 100644 --- a/MQTTnet.Universal/Properties/AssemblyInfo.cs +++ b/Frameworks/MQTTnet.UniversalWindows/Properties/AssemblyInfo.cs @@ -1,8 +1,8 @@ using System.Reflection; using System.Runtime.InteropServices; -[assembly: AssemblyTitle("MQTTnet.Universal")] -[assembly: AssemblyDescription("")] +[assembly: AssemblyTitle("MQTTnet")] +[assembly: AssemblyDescription("MQTTnet for Universal Windows")] [assembly: AssemblyConfiguration("")] [assembly: AssemblyCompany("Christian Kratky")] [assembly: AssemblyProduct("MQTTnet")] diff --git a/MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml b/Frameworks/MQTTnet.UniversalWindows/Properties/MQTTnet.Universal.rd.xml similarity index 100% rename from MQTTnet.Universal/Properties/MQTTnet.Universal.rd.xml rename to Frameworks/MQTTnet.UniversalWindows/Properties/MQTTnet.Universal.rd.xml diff --git a/MQTTnet.Universal/project.json b/Frameworks/MQTTnet.UniversalWindows/project.json similarity index 100% rename from MQTTnet.Universal/project.json rename to Frameworks/MQTTnet.UniversalWindows/project.json diff --git a/MQTT.NET.Core/Client/MqttClientStatistics.cs b/MQTT.NET.Core/Client/MqttClientStatistics.cs deleted file mode 100644 index c284e84d9..000000000 --- a/MQTT.NET.Core/Client/MqttClientStatistics.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace MQTTnet.Core.Client -{ - public class MqttClientStatistics - { - public int SentPackets { get; set; } - } -} diff --git a/MQTT.NET.Core/Client/MqttPacketAwaiter.cs b/MQTT.NET.Core/Client/MqttPacketAwaiter.cs deleted file mode 100644 index 4ee415f39..000000000 --- a/MQTT.NET.Core/Client/MqttPacketAwaiter.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using System.Threading.Tasks; -using MQTTnet.Core.Packets; - -namespace MQTTnet.Core.Client -{ - public class MqttPacketAwaiter - { - private readonly TaskCompletionSource _taskCompletionSource = new TaskCompletionSource(); - private readonly Func _packetSelector; - - public MqttPacketAwaiter(Func packetSelector) - { - if (packetSelector == null) throw new ArgumentNullException(nameof(packetSelector)); - - _packetSelector = packetSelector; - } - - public Task Task => _taskCompletionSource.Task; - - public bool CheckPacket(MqttBasePacket packet) - { - if (!_packetSelector(packet)) - { - return false; - } - - _taskCompletionSource.SetResult(packet); - return true; - } - } -} \ No newline at end of file diff --git a/MQTT.NET.Core/Diagnostics/MqttTrace.cs b/MQTT.NET.Core/Diagnostics/MqttTrace.cs deleted file mode 100644 index ba72b367b..000000000 --- a/MQTT.NET.Core/Diagnostics/MqttTrace.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System; - -namespace MQTTnet.Core.Diagnostics -{ - public static class MqttTrace - { - public static event EventHandler TraceMessagePublished; - - public static void Verbose(string source, string message) - { - TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Verbose, message, null)); - } - - public static void Information(string source, string message) - { - TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Information, message, null)); - } - - public static void Warning(string source, string message) - { - TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Warning, message, null)); - } - - public static void Warning(string source, Exception exception, string message) - { - TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Warning, message, exception)); - } - - public static void Error(string source, string message) - { - TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Error, message, null)); - } - - public static void Error(string source, Exception exception, string message) - { - TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, MqttTraceLevel.Error, message, exception)); - } - } -} diff --git a/MQTT.NET.Core/DictionaryExtensions.cs b/MQTT.NET.Core/DictionaryExtensions.cs deleted file mode 100644 index da405aef3..000000000 --- a/MQTT.NET.Core/DictionaryExtensions.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace MQTTnet.Core -{ - public static class DictionaryExtensions - { - public static TValue Take(this IDictionary dictionary, TKey key) - { - if (dictionary == null) throw new ArgumentNullException(nameof(dictionary)); - - TValue value; - if (dictionary.TryGetValue(key, out value)) - { - dictionary.Remove(key); - } - - return value; - } - } -} diff --git a/MQTT.NET.Core/Packets/MqttBasePacket.cs b/MQTT.NET.Core/Packets/MqttBasePacket.cs deleted file mode 100644 index 41901e524..000000000 --- a/MQTT.NET.Core/Packets/MqttBasePacket.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace MQTTnet.Core.Packets -{ - public abstract class MqttBasePacket - { - } -} diff --git a/MQTT.NET.Core/Packets/MqttPubAckPacket.cs b/MQTT.NET.Core/Packets/MqttPubAckPacket.cs deleted file mode 100644 index da61768a4..000000000 --- a/MQTT.NET.Core/Packets/MqttPubAckPacket.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace MQTTnet.Core.Packets -{ - public class MqttPubAckPacket : MqttBasePacket - { - public ushort PacketIdentifier { get; set; } - } -} diff --git a/MQTT.NET.Core/Packets/MqttPubCompPacket.cs b/MQTT.NET.Core/Packets/MqttPubCompPacket.cs deleted file mode 100644 index 9912ae80d..000000000 --- a/MQTT.NET.Core/Packets/MqttPubCompPacket.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace MQTTnet.Core.Packets -{ - public class MqttPubCompPacket : MqttBasePacket - { - public ushort PacketIdentifier { get; set; } - } -} diff --git a/MQTT.NET.Core/Packets/MqttPubRelPacket.cs b/MQTT.NET.Core/Packets/MqttPubRelPacket.cs deleted file mode 100644 index 787be0a3b..000000000 --- a/MQTT.NET.Core/Packets/MqttPubRelPacket.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace MQTTnet.Core.Packets -{ - public class MqttPubRelPacket : MqttBasePacket - { - public ushort PacketIdentifier { get; set; } - } -} diff --git a/MQTT.NET.Core/Packets/MqttPublishPacket.cs b/MQTT.NET.Core/Packets/MqttPublishPacket.cs deleted file mode 100644 index 912e1a8c5..000000000 --- a/MQTT.NET.Core/Packets/MqttPublishPacket.cs +++ /dev/null @@ -1,19 +0,0 @@ -using MQTTnet.Core.Protocol; - -namespace MQTTnet.Core.Packets -{ - public class MqttPublishPacket : MqttBasePacket - { - public ushort? PacketIdentifier { get; set; } - - public bool Retain { get; set; } - - public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; } - - public bool Dup { get; set; } - - public string Topic { get; set; } - - public byte[] Payload { get; set; } - } -} diff --git a/MQTT.NET.Core/Packets/MqttSubAckPacket.cs b/MQTT.NET.Core/Packets/MqttSubAckPacket.cs deleted file mode 100644 index 785af90e4..000000000 --- a/MQTT.NET.Core/Packets/MqttSubAckPacket.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Collections.Generic; -using MQTTnet.Core.Protocol; - -namespace MQTTnet.Core.Packets -{ - public class MqttSubAckPacket : MqttBasePacket - { - public ushort PacketIdentifier { get; set; } - - public List SubscribeReturnCodes { get; set; } = new List(); - } -} diff --git a/MQTT.NET.Core/Packets/MqttSubscribePacket.cs b/MQTT.NET.Core/Packets/MqttSubscribePacket.cs deleted file mode 100644 index 615715657..000000000 --- a/MQTT.NET.Core/Packets/MqttSubscribePacket.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Collections.Generic; - -namespace MQTTnet.Core.Packets -{ - public class MqttSubscribePacket : MqttBasePacket - { - public ushort PacketIdentifier { get; set; } - - public IList TopicFilters { get; set; } = new List(); - } -} diff --git a/MQTT.NET.Core/Adapter/IMqttAdapter.cs b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs similarity index 74% rename from MQTT.NET.Core/Adapter/IMqttAdapter.cs rename to MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs index d51e8afe2..6651e0515 100644 --- a/MQTT.NET.Core/Adapter/IMqttAdapter.cs +++ b/MQTTnet.Core/Adapter/IMqttCommunicationAdapter.cs @@ -5,7 +5,7 @@ namespace MQTTnet.Core.Adapter { - public interface IMqttAdapter + public interface IMqttCommunicationAdapter { Task ConnectAsync(MqttClientOptions options, TimeSpan timeout); @@ -13,6 +13,6 @@ public interface IMqttAdapter Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout); - Task ReceivePacket(); + Task ReceivePacketAsync(TimeSpan timeout); } } diff --git a/MQTTnet.Core/Adapter/IMqttServerAdapter.cs b/MQTTnet.Core/Adapter/IMqttServerAdapter.cs new file mode 100644 index 000000000..98d51bbbe --- /dev/null +++ b/MQTTnet.Core/Adapter/IMqttServerAdapter.cs @@ -0,0 +1,14 @@ +using System; +using MQTTnet.Core.Server; + +namespace MQTTnet.Core.Adapter +{ + public interface IMqttServerAdapter + { + event EventHandler ClientConnected; + + void Start(MqttServerOptions options); + + void Stop(); + } +} diff --git a/MQTT.NET.Core/Adapter/MqttChannelAdapter.cs b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs similarity index 51% rename from MQTT.NET.Core/Adapter/MqttChannelAdapter.cs rename to MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs index ef6e59f12..ae90984f1 100644 --- a/MQTT.NET.Core/Adapter/MqttChannelAdapter.cs +++ b/MQTTnet.Core/Adapter/MqttChannelCommunicationAdapter.cs @@ -9,18 +9,15 @@ namespace MQTTnet.Core.Adapter { - public class MqttChannelAdapter : IMqttAdapter + public class MqttChannelCommunicationAdapter : IMqttCommunicationAdapter { private readonly IMqttPacketSerializer _serializer; - private readonly IMqttTransportChannel _channel; + private readonly IMqttCommunicationChannel _channel; - public MqttChannelAdapter(IMqttTransportChannel channel, IMqttPacketSerializer serializer) + public MqttChannelCommunicationAdapter(IMqttCommunicationChannel channel, IMqttPacketSerializer serializer) { - if (channel == null) throw new ArgumentNullException(nameof(channel)); - if (serializer == null) throw new ArgumentNullException(nameof(serializer)); - - _channel = channel; - _serializer = serializer; + _channel = channel ?? throw new ArgumentNullException(nameof(channel)); + _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer)); } public async Task ConnectAsync(MqttClientOptions options, TimeSpan timeout) @@ -39,7 +36,7 @@ public async Task DisconnectAsync() public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) { - MqttTrace.Information(nameof(MqttChannelAdapter), $"Sending with timeout {timeout} >>> {packet}"); + MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"TX >>> {packet} [Timeout={timeout}]"); bool hasTimeout; try @@ -58,15 +55,34 @@ public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout) } } - public async Task ReceivePacket() + public async Task ReceivePacketAsync(TimeSpan timeout) { - var mqttPacket = await _serializer.DeserializeAsync(_channel); - if (mqttPacket == null) + MqttBasePacket packet; + if (timeout > TimeSpan.Zero) + { + var workerTask = _serializer.DeserializeAsync(_channel); + var timeoutTask = Task.Delay(timeout); + var hasTimeout = Task.WhenAny(timeoutTask, workerTask) == timeoutTask; + + if (hasTimeout) + { + throw new MqttCommunicationTimedOutException(); + } + + packet = workerTask.Result; + } + else + { + packet = await _serializer.DeserializeAsync(_channel); + } + + if (packet == null) { throw new MqttProtocolViolationException("Received malformed packet."); } - return mqttPacket; + MqttTrace.Information(nameof(MqttChannelCommunicationAdapter), $"RX <<< {packet}"); + return packet; } } } \ No newline at end of file diff --git a/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs b/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs new file mode 100644 index 000000000..fc0fd8b73 --- /dev/null +++ b/MQTTnet.Core/Adapter/MqttClientConnectedEventArgs.cs @@ -0,0 +1,17 @@ +using System; + +namespace MQTTnet.Core.Adapter +{ + public class MqttClientConnectedEventArgs : EventArgs + { + public MqttClientConnectedEventArgs(string identifier, IMqttCommunicationAdapter clientAdapter) + { + Identifier = identifier ?? throw new ArgumentNullException(nameof(identifier)); + ClientAdapter = clientAdapter ?? throw new ArgumentNullException(nameof(clientAdapter)); + } + + public string Identifier { get; } + + public IMqttCommunicationAdapter ClientAdapter { get; } + } +} diff --git a/MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs b/MQTTnet.Core/Adapter/MqttConnectingFailedException.cs similarity index 100% rename from MQTT.NET.Core/Adapter/MqttConnectingFailedException.cs rename to MQTTnet.Core/Adapter/MqttConnectingFailedException.cs diff --git a/MQTT.NET.Core/Channel/IMqttTransportChannel.cs b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs similarity index 85% rename from MQTT.NET.Core/Channel/IMqttTransportChannel.cs rename to MQTTnet.Core/Channel/IMqttCommunicationChannel.cs index 218c2e495..a1ec890ed 100644 --- a/MQTT.NET.Core/Channel/IMqttTransportChannel.cs +++ b/MQTTnet.Core/Channel/IMqttCommunicationChannel.cs @@ -3,7 +3,7 @@ namespace MQTTnet.Core.Channel { - public interface IMqttTransportChannel + public interface IMqttCommunicationChannel { Task ConnectAsync(MqttClientOptions options); diff --git a/MQTT.NET.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs similarity index 65% rename from MQTT.NET.Core/Client/MqttClient.cs rename to MQTTnet.Core/Client/MqttClient.cs index 1b0ad5084..db946fe69 100644 --- a/MQTT.NET.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -6,6 +7,7 @@ using MQTTnet.Core.Adapter; using MQTTnet.Core.Diagnostics; using MQTTnet.Core.Exceptions; +using MQTTnet.Core.Internal; using MQTTnet.Core.Packets; using MQTTnet.Core.Protocol; @@ -13,22 +15,20 @@ namespace MQTTnet.Core.Client { public class MqttClient { - private readonly Dictionary _pendingExactlyOncePublishPackets = new Dictionary(); + private readonly ConcurrentDictionary _pendingExactlyOncePublishPackets = new ConcurrentDictionary(); private readonly HashSet _processedPublishPackets = new HashSet(); + private readonly MqttPacketDispatcher _packetDispatcher = new MqttPacketDispatcher(); private readonly MqttClientOptions _options; - private readonly IMqttAdapter _adapter; + private readonly IMqttCommunicationAdapter _adapter; private int _latestPacketIdentifier; private CancellationTokenSource _cancellationTokenSource; - public MqttClient(MqttClientOptions options, IMqttAdapter adapter) + public MqttClient(MqttClientOptions options, IMqttCommunicationAdapter adapter) { - if (options == null) throw new ArgumentNullException(nameof(options)); - if (adapter == null) throw new ArgumentNullException(nameof(adapter)); - - _options = options; - _adapter = adapter; + _options = options ?? throw new ArgumentNullException(nameof(options)); + _adapter = adapter ?? throw new ArgumentNullException(nameof(adapter)); } public event EventHandler Connected; @@ -56,7 +56,7 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, WillMessage = willApplicationMessage }; - + await _adapter.ConnectAsync(_options, _options.DefaultCommunicationTimeout); MqttTrace.Verbose(nameof(MqttClient), "Connection with server established."); @@ -66,10 +66,9 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n _packetDispatcher.Reset(); IsConnected = true; - Task.Factory.StartNew(async () => await ReceivePackets( - _cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Forget(); + Task.Run(async () => await ReceivePackets(_cancellationTokenSource.Token), _cancellationTokenSource.Token).Forget(); - var response = await SendAndReceiveAsync(connectPacket, p => true); + var response = await SendAndReceiveAsync(connectPacket); if (response.ConnectReturnCode != MqttConnectReturnCode.ConnectionAccepted) { throw new MqttConnectingFailedException(response.ConnectReturnCode); @@ -77,8 +76,7 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n if (_options.KeepAlivePeriod != TimeSpan.Zero) { - Task.Factory.StartNew(async () => await SendKeepAliveMessagesAsync( - _cancellationTokenSource.Token), _cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Forget(); + Task.Run(async () => await SendKeepAliveMessagesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token).Forget(); } Connected?.Invoke(this, EventArgs.Empty); @@ -90,12 +88,11 @@ public async Task DisconnectAsync() await DisconnectInternalAsync(); } - private void ThrowIfNotConnected() + public async Task> SubscribeAsync(params TopicFilter[] topicFilters) { - if (!IsConnected) - { - throw new MqttCommunicationException("The client is not connected."); - } + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + return await SubscribeAsync(topicFilters.ToList()); } public async Task> SubscribeAsync(IList topicFilters) @@ -110,8 +107,7 @@ public async Task> SubscribeAsync(IList TopicFilters = topicFilters }; - Func packetSelector = p => p.PacketIdentifier == subscribePacket.PacketIdentifier; - var response = await SendAndReceiveAsync(subscribePacket, packetSelector); + var response = await SendAndReceiveAsync(subscribePacket); if (response.SubscribeReturnCodes.Count != topicFilters.Count) { @@ -127,6 +123,13 @@ public async Task> SubscribeAsync(IList return result; } + public async Task Unsubscribe(params string[] topicFilters) + { + if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); + + await Unsubscribe(topicFilters.ToList()); + } + public async Task Unsubscribe(IList topicFilters) { if (topicFilters == null) throw new ArgumentNullException(nameof(topicFilters)); @@ -139,8 +142,7 @@ public async Task Unsubscribe(IList topicFilters) TopicFilters = topicFilters }; - Func packetSelector = p => p.PacketIdentifier == unsubscribePacket.PacketIdentifier; - await SendAndReceiveAsync(unsubscribePacket, packetSelector); + await SendAndReceiveAsync(unsubscribePacket); } public async Task PublishAsync(MqttApplicationMessage applicationMessage) @@ -148,37 +150,30 @@ public async Task PublishAsync(MqttApplicationMessage applicationMessage) if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage)); ThrowIfNotConnected(); - var publishPacket = new MqttPublishPacket - { - Topic = applicationMessage.Topic, - Payload = applicationMessage.Payload, - QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel, - Retain = applicationMessage.Retain, - Dup = false - }; + var publishPacket = applicationMessage.ToPublishPacket(); - if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce) + if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAsync(publishPacket); } - - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) + else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { - if (!publishPacket.PacketIdentifier.HasValue) throw new InvalidOperationException(); - - Func packageSelector = p => p.PacketIdentifier == publishPacket.PacketIdentifier.Value; - await SendAndReceiveAsync(publishPacket, packageSelector); + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAndReceiveAsync(publishPacket); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - if (!publishPacket.PacketIdentifier.HasValue) throw new InvalidOperationException(); - - Func packageSelector = p => p.PacketIdentifier == publishPacket.PacketIdentifier.Value; - await SendAndReceiveAsync(publishPacket, packageSelector); - await SendAsync(new MqttPubCompPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value }); + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAndReceiveAsync(publishPacket); + await SendAsync(publishPacket.CreateResponse()); } } + private void ThrowIfNotConnected() + { + if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); + } + private async Task DisconnectInternalAsync() { try @@ -196,38 +191,49 @@ private async Task DisconnectInternalAsync() } } - private async void ProcessIncomingPacket(MqttBasePacket mqttPacket) + private async void ProcessReceivedPacket(MqttBasePacket mqttPacket) { - var publishPacket = mqttPacket as MqttPublishPacket; - if (publishPacket != null) + try { - await ProcessReceivedPublishPacket(publishPacket); - return; - } + if (mqttPacket is MqttPingReqPacket) + { + await SendAsync(new MqttPingRespPacket()); + return; + } - var pingReqPacket = mqttPacket as MqttPingReqPacket; - if (pingReqPacket != null) - { - await SendAsync(new MqttPingRespPacket()); - return; - } + if (mqttPacket is MqttDisconnectPacket) + { + await DisconnectAsync(); + return; + } + + var publishPacket = mqttPacket as MqttPublishPacket; + if (publishPacket != null) + { + await ProcessReceivedPublishPacket(publishPacket); + return; + } + + var pubRelPacket = mqttPacket as MqttPubRelPacket; + if (pubRelPacket != null) + { + await ProcessReceivedPubRelPacket(pubRelPacket); + return; + } - var pubRelPacket = mqttPacket as MqttPubRelPacket; - if (pubRelPacket != null) + _packetDispatcher.Dispatch(mqttPacket); + } + catch (Exception exception) { - await ProcessReceivedPubRelPacket(pubRelPacket); - return; + MqttTrace.Error(nameof(MqttClient), exception, "Error while processing received packet."); } - - _packetDispatcher.Dispatch(mqttPacket); } private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket) { if (publishPacket.QualityOfServiceLevel != MqttQualityOfServiceLevel.AtMostOnce) { - if (publishPacket.PacketIdentifier == null) throw new InvalidOperationException(); - _processedPublishPackets.Add(publishPacket.PacketIdentifier.Value); + _processedPublishPackets.Add(publishPacket.PacketIdentifier); } var applicationMessage = new MqttApplicationMessage( @@ -240,15 +246,6 @@ private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(applicationMessage)); } - private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) - { - var originalPublishPacket = _pendingExactlyOncePublishPackets.Take(pubRelPacket.PacketIdentifier); - if (originalPublishPacket == null) throw new MqttCommunicationException(); - await SendAsync(new MqttPubCompPacket { PacketIdentifier = pubRelPacket.PacketIdentifier }); - - FireApplicationMessageReceivedEvent(originalPublishPacket); - } - private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) { if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce) @@ -257,43 +254,63 @@ private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) } else { - if (!publishPacket.PacketIdentifier.HasValue) { throw new InvalidOperationException(); } - if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value }); + await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - _pendingExactlyOncePublishPackets.Add(publishPacket.PacketIdentifier.Value, publishPacket); - await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier.Value }); + _pendingExactlyOncePublishPackets[publishPacket.PacketIdentifier] = publishPacket; + await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); } } } - + + private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket) + { + MqttPublishPacket originalPublishPacket; + if (!_pendingExactlyOncePublishPackets.TryRemove(pubRelPacket.PacketIdentifier, out originalPublishPacket)) + { + throw new MqttCommunicationException(); + } + + await SendAsync(originalPublishPacket.CreateResponse()); + + FireApplicationMessageReceivedEvent(originalPublishPacket); + } + private async Task SendAsync(MqttBasePacket packet) { await _adapter.SendPacketAsync(packet, _options.DefaultCommunicationTimeout); } - private async Task SendAndReceiveAsync( - MqttBasePacket requestPacket, Func responsePacketSelector) where TResponsePacket : MqttBasePacket + private async Task SendAndReceiveAsync(MqttBasePacket requestPacket) where TResponsePacket : MqttBasePacket { - Func selector = p => + Func responsePacketSelector = p => { var p1 = p as TResponsePacket; - return p1 != null && responsePacketSelector(p1); - }; + if (p1 == null) + { + return false; + } - return (TResponsePacket)await SendAndReceiveAsync(requestPacket, selector); - } + var pi1 = requestPacket as IPacketWithIdentifier; + var pi2 = p as IPacketWithIdentifier; + + if (pi1 != null && pi2 != null) + { + if (pi1.PacketIdentifier != pi2.PacketIdentifier) + { + return false; + } + } + + return true; + }; - private async Task SendAndReceiveAsync(MqttBasePacket requestPacket, Func responsePacketSelector) - { - var waitTask = _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout); await _adapter.SendPacketAsync(requestPacket, _options.DefaultCommunicationTimeout); - return await waitTask; + return (TResponsePacket)await _packetDispatcher.WaitForPacketAsync(responsePacketSelector, _options.DefaultCommunicationTimeout); } private ushort GetNewPacketIdentifier() @@ -310,16 +327,20 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke while (!cancellationToken.IsCancellationRequested) { await Task.Delay(_options.KeepAlivePeriod, cancellationToken); - await SendAndReceiveAsync(new MqttPingReqPacket(), p => true); + await SendAndReceiveAsync(new MqttPingReqPacket()); } } + catch (MqttCommunicationException) + { + } catch (Exception exception) { - MqttTrace.Error(nameof(MqttClient), exception, "Error while sending keep alive packets."); + MqttTrace.Warning(nameof(MqttClient), exception, "Error while sending/receiving keep alive packets."); } finally { MqttTrace.Information(nameof(MqttClient), "Stopped sending keep alive packets."); + await DisconnectInternalAsync(); } } @@ -330,20 +351,23 @@ private async Task ReceivePackets(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { - var mqttPacket = await _adapter.ReceivePacket(); - MqttTrace.Information(nameof(MqttChannelAdapter), $"Received <<< {mqttPacket}"); + var mqttPacket = await _adapter.ReceivePacketAsync(TimeSpan.Zero); + MqttTrace.Information(nameof(MqttClient), $"Received <<< {mqttPacket}"); - Task.Run(() => ProcessIncomingPacket(mqttPacket), cancellationToken).Forget(); + Task.Run(() => ProcessReceivedPacket(mqttPacket), cancellationToken).Forget(); } } + catch (MqttCommunicationException) + { + } catch (Exception exception) { MqttTrace.Error(nameof(MqttClient), exception, "Error while receiving packets."); - await DisconnectInternalAsync(); } finally { MqttTrace.Information(nameof(MqttClient), "Stopped receiving packets."); + await DisconnectInternalAsync(); } } } diff --git a/MQTT.NET.Core/Client/MqttClientOptions.cs b/MQTTnet.Core/Client/MqttClientOptions.cs similarity index 100% rename from MQTT.NET.Core/Client/MqttClientOptions.cs rename to MQTTnet.Core/Client/MqttClientOptions.cs diff --git a/MQTTnet.Core/Client/MqttPacketAwaiter.cs b/MQTTnet.Core/Client/MqttPacketAwaiter.cs new file mode 100644 index 000000000..b7a655564 --- /dev/null +++ b/MQTTnet.Core/Client/MqttPacketAwaiter.cs @@ -0,0 +1,16 @@ +using System; +using System.Threading.Tasks; +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Client +{ + public class MqttPacketAwaiter : TaskCompletionSource + { + public MqttPacketAwaiter(Func packetSelector) + { + PacketSelector = packetSelector ?? throw new ArgumentNullException(nameof(packetSelector)); + } + + public Func PacketSelector { get; } + } +} \ No newline at end of file diff --git a/MQTT.NET.Core/Client/MqttPacketDispatcher.cs b/MQTTnet.Core/Client/MqttPacketDispatcher.cs similarity index 54% rename from MQTT.NET.Core/Client/MqttPacketDispatcher.cs rename to MQTTnet.Core/Client/MqttPacketDispatcher.cs index 830aee458..b613f06c5 100644 --- a/MQTT.NET.Core/Client/MqttPacketDispatcher.cs +++ b/MQTTnet.Core/Client/MqttPacketDispatcher.cs @@ -9,25 +9,27 @@ namespace MQTTnet.Core.Client { public class MqttPacketDispatcher { + private readonly object _syncRoot = new object(); + private readonly List _receivedPackets = new List(); private readonly List _packetAwaiters = new List(); public async Task WaitForPacketAsync(Func selector, TimeSpan timeout) { if (selector == null) throw new ArgumentNullException(nameof(selector)); - var waitHandle = new MqttPacketAwaiter(selector); - AddPacketAwaiter(waitHandle); + var packetAwaiter = AddPacketAwaiter(selector); + DispatchPendingPackets(); - var hasTimeout = await Task.WhenAny(Task.Delay(timeout), waitHandle.Task) != waitHandle.Task; - RemovePacketAwaiter(waitHandle); + var hasTimeout = await Task.WhenAny(Task.Delay(timeout), packetAwaiter.Task) != packetAwaiter.Task; + RemovePacketAwaiter(packetAwaiter); if (hasTimeout) { - MqttTrace.Error(nameof(MqttPacketDispatcher), $"Timeout while waiting for packet."); + MqttTrace.Warning(nameof(MqttPacketDispatcher), "Timeout while waiting for packet."); throw new MqttCommunicationTimedOutException(); } - return waitHandle.Task.Result; + return packetAwaiter.Task.Result; } public void Dispatch(MqttBasePacket packet) @@ -37,15 +39,33 @@ public void Dispatch(MqttBasePacket packet) var packetDispatched = false; foreach (var packetAwaiter in GetPacketAwaiters()) { - if (packetAwaiter.CheckPacket(packet)) + if (packetAwaiter.PacketSelector(packet)) { + packetAwaiter.SetResult(packet); packetDispatched = true; + break; } } - if (!packetDispatched) + lock (_syncRoot) { - MqttTrace.Warning(nameof(MqttPacketDispatcher), $"Received packet '{packet}' not dispatched."); + if (!packetDispatched) + { + _receivedPackets.Add(packet); + } + else + { + _receivedPackets.Remove(packet); + } + } + } + + public void Reset() + { + lock (_syncRoot) + { + _packetAwaiters.Clear(); + _receivedPackets.Clear(); } } @@ -57,27 +77,35 @@ private List GetPacketAwaiters() } } - private void AddPacketAwaiter(MqttPacketAwaiter packetAwaiter) + private MqttPacketAwaiter AddPacketAwaiter(Func selector) { - lock (_packetAwaiters) + lock (_syncRoot) { + var packetAwaiter = new MqttPacketAwaiter(selector); _packetAwaiters.Add(packetAwaiter); + return packetAwaiter; } } private void RemovePacketAwaiter(MqttPacketAwaiter packetAwaiter) { - lock (_packetAwaiters) + lock (_syncRoot) { _packetAwaiters.Remove(packetAwaiter); } } - public void Reset() + private void DispatchPendingPackets() { - lock (_packetAwaiters) + List receivedPackets; + lock (_syncRoot) { - _packetAwaiters.Clear(); + receivedPackets = new List(_receivedPackets); + } + + foreach (var pendingPacket in receivedPackets) + { + Dispatch(pendingPacket); } } } diff --git a/MQTT.NET.Core/Client/MqttSubscribeResult.cs b/MQTTnet.Core/Client/MqttSubscribeResult.cs similarity index 100% rename from MQTT.NET.Core/Client/MqttSubscribeResult.cs rename to MQTTnet.Core/Client/MqttSubscribeResult.cs diff --git a/MQTTnet.Core/Diagnostics/MqttTrace.cs b/MQTTnet.Core/Diagnostics/MqttTrace.cs new file mode 100644 index 000000000..86f5b45a6 --- /dev/null +++ b/MQTTnet.Core/Diagnostics/MqttTrace.cs @@ -0,0 +1,44 @@ +using System; + +namespace MQTTnet.Core.Diagnostics +{ + public static class MqttTrace + { + public static event EventHandler TraceMessagePublished; + + public static void Verbose(string source, string message) + { + Publish(source, MqttTraceLevel.Verbose, null, message); + } + + public static void Information(string source, string message) + { + Publish(source, MqttTraceLevel.Information, null, message); + } + + public static void Warning(string source, string message) + { + Publish(source, MqttTraceLevel.Warning, null, message); + } + + public static void Warning(string source, Exception exception, string message) + { + Publish(source, MqttTraceLevel.Warning, exception, message); + } + + public static void Error(string source, string message) + { + Publish(source, MqttTraceLevel.Error, null, message); + } + + public static void Error(string source, Exception exception, string message) + { + Publish(source, MqttTraceLevel.Error, exception, message); + } + + private static void Publish(string source, MqttTraceLevel traceLevel, Exception exception, string message) + { + TraceMessagePublished?.Invoke(null, new MqttTraceMessagePublishedEventArgs(Environment.CurrentManagedThreadId, source, traceLevel, message, exception)); + } + } +} diff --git a/MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs b/MQTTnet.Core/Diagnostics/MqttTraceLevel.cs similarity index 100% rename from MQTT.NET.Core/Diagnostics/MqttTraceLevel.cs rename to MQTTnet.Core/Diagnostics/MqttTraceLevel.cs diff --git a/MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs b/MQTTnet.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs similarity index 100% rename from MQTT.NET.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs rename to MQTTnet.Core/Diagnostics/MqttTraceMessagePublishedEventArgs.cs diff --git a/MQTT.NET.Core/Exceptions/MqttCommunicationException.cs b/MQTTnet.Core/Exceptions/MqttCommunicationException.cs similarity index 100% rename from MQTT.NET.Core/Exceptions/MqttCommunicationException.cs rename to MQTTnet.Core/Exceptions/MqttCommunicationException.cs diff --git a/MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs b/MQTTnet.Core/Exceptions/MqttCommunicationTimedOutException.cs similarity index 100% rename from MQTT.NET.Core/Exceptions/MqttCommunicationTimedOutException.cs rename to MQTTnet.Core/Exceptions/MqttCommunicationTimedOutException.cs diff --git a/MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs b/MQTTnet.Core/Exceptions/MqttProtocolViolationException.cs similarity index 61% rename from MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs rename to MQTTnet.Core/Exceptions/MqttProtocolViolationException.cs index 153e86789..95d72b8c6 100644 --- a/MQTT.NET.Core/Exceptions/MqttProtocolViolationException.cs +++ b/MQTTnet.Core/Exceptions/MqttProtocolViolationException.cs @@ -4,9 +4,9 @@ namespace MQTTnet.Core.Exceptions { public class MqttProtocolViolationException : Exception { - public MqttProtocolViolationException(string message) : base(message) + public MqttProtocolViolationException(string message) + : base(message) { - } } } diff --git a/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs b/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs new file mode 100644 index 000000000..71cdb7f90 --- /dev/null +++ b/MQTTnet.Core/Internal/MqttApplicationMessageExtensions.cs @@ -0,0 +1,24 @@ +using MQTTnet.Core.Packets; + +namespace MQTTnet.Core.Internal +{ + internal static class MqttApplicationMessageExtensions + { + public static MqttPublishPacket ToPublishPacket(this MqttApplicationMessage applicationMessage) + { + if (applicationMessage == null) + { + return null; + } + + return new MqttPublishPacket + { + Topic = applicationMessage.Topic, + Payload = applicationMessage.Payload, + QualityOfServiceLevel = applicationMessage.QualityOfServiceLevel, + Retain = applicationMessage.Retain, + Dup = false + }; + } + } +} diff --git a/MQTT.NET.Core/TaskExtensions.cs b/MQTTnet.Core/Internal/TaskExtensions.cs similarity index 62% rename from MQTT.NET.Core/TaskExtensions.cs rename to MQTTnet.Core/Internal/TaskExtensions.cs index d04213a03..418deb094 100644 --- a/MQTT.NET.Core/TaskExtensions.cs +++ b/MQTTnet.Core/Internal/TaskExtensions.cs @@ -1,8 +1,8 @@ using System.Threading.Tasks; -namespace MQTTnet.Core +namespace MQTTnet.Core.Internal { - public static class TaskExtensions + internal static class TaskExtensions { public static void Forget(this Task task) { diff --git a/MQTT.NET.Core/MQTTnet.Core.csproj b/MQTTnet.Core/MQTTnet.Core.csproj similarity index 82% rename from MQTT.NET.Core/MQTTnet.Core.csproj rename to MQTTnet.Core/MQTTnet.Core.csproj index 826d38b33..cacc8d58a 100644 --- a/MQTT.NET.Core/MQTTnet.Core.csproj +++ b/MQTTnet.Core/MQTTnet.Core.csproj @@ -37,22 +37,24 @@ - + + + - - - + + - + + @@ -62,6 +64,7 @@ + @@ -83,7 +86,15 @@ - + + + + + + + + +