diff --git a/Build/MQTTnet.nuspec b/Build/MQTTnet.nuspec index c1cd37182..6b111fc01 100644 --- a/Build/MQTTnet.nuspec +++ b/Build/MQTTnet.nuspec @@ -10,9 +10,11 @@ https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png false MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker). - * [Core] Fixed library reference issues for .NET 4.6 (Thanks to @JanEggers). + * [Core] Fixed library reference issues for .NET 4.6 and netstandard 2.0 (Thanks to @JanEggers). * [Core] Several COM exceptions are now wrapped properly resulting in less warnings in the trace. +* [Core] Removed application message payload from trace to reduce trace size and increase performance. * [Client] Fixed WebSocket sub protocol negotiation for ASP.NET Core 2 servers (Thanks to @JanEggers). +* [Client] Fixed broken connection after 30 seconds then using WebSocket protocol (Thanks to @ChristianRiedl). * [Server] Client connections are now closed when the server is stopped (Thanks to @zhudanfei). * [Server] Published messages from the server are now retained (if set) (Thanks to @ChristianRiedl). BREAKING CHANGE! diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs index 45bf5457f..0e3a3d5fe 100644 --- a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs @@ -1,12 +1,8 @@ using System; -using System.IO; using System.Net.WebSockets; -using System.Threading; using System.Threading.Tasks; using MQTTnet.Core.Adapter; -using MQTTnet.Core.Channel; using MQTTnet.Core.Server; -using MQTTnet.Implementations; namespace MQTTnet.AspNetCore { @@ -47,47 +43,5 @@ public void Dispose() { StopAsync(); } - - private class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable - { - private readonly WebSocket _webSocket; - - public MqttWebSocketServerChannel(WebSocket webSocket) - { - _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); - - RawReceiveStream = new WebSocketStream(_webSocket); - } - - public Stream SendStream => RawReceiveStream; - public Stream ReceiveStream => RawReceiveStream; - public Stream RawReceiveStream { get; } - - public Task ConnectAsync() - { - return Task.CompletedTask; - } - - public Task DisconnectAsync() - { - RawReceiveStream?.Dispose(); - - if (_webSocket == null) - { - return Task.CompletedTask; - } - - return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); - } - - public void Dispose() - { - RawReceiveStream?.Dispose(); - SendStream?.Dispose(); - ReceiveStream?.Dispose(); - - _webSocket?.Dispose(); - } - } } } \ No newline at end of file diff --git a/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs new file mode 100644 index 000000000..54cfb0b9b --- /dev/null +++ b/Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs @@ -0,0 +1,60 @@ +using System; +using System.IO; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; +using MQTTnet.Core.Channel; +using MQTTnet.Implementations; + +namespace MQTTnet.AspNetCore +{ + public class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable + { + private WebSocket _webSocket; + + public MqttWebSocketServerChannel(WebSocket webSocket) + { + _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); + + SendStream = new WebSocketStream(_webSocket); + ReceiveStream = SendStream; + } + + public Stream SendStream { get; private set; } + public Stream ReceiveStream { get; private set; } + + public Task ConnectAsync() + { + return Task.CompletedTask; + } + + public async Task DisconnectAsync() + { + if (_webSocket == null) + { + return; + } + + try + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None); + } + finally + { + Dispose(); + } + } + + public void Dispose() + { + SendStream?.Dispose(); + ReceiveStream?.Dispose(); + + _webSocket?.Dispose(); + + SendStream = null; + ReceiveStream = null; + _webSocket = null; + } + } +} \ No newline at end of file diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs index c7cd3d024..8f56c9457 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/MqttWebSocketChannel.cs @@ -11,6 +11,10 @@ namespace MQTTnet.Implementations { public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable { + // ReSharper disable once MemberCanBePrivate.Global + // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global + public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user. + private readonly MqttClientWebSocketOptions _options; private ClientWebSocket _webSocket; @@ -80,7 +84,7 @@ public async Task DisconnectAsync() await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false); } - _webSocket = null; + Dispose(); } public void Dispose() diff --git a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs index fb2d81216..4ff11dadc 100644 --- a/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs +++ b/Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Generic; using System.IO; +using System.Linq; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -10,7 +12,9 @@ namespace MQTTnet.Implementations public class WebSocketStream : Stream { private readonly WebSocket _webSocket; - + private readonly byte[] _chunkBuffer = new byte[MqttWebSocketChannel.BufferSize]; + private readonly Queue _buffer = new Queue(MqttWebSocketChannel.BufferSize); + public WebSocketStream(WebSocket webSocket) { _webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket)); @@ -34,43 +38,66 @@ public override void Flush() { } + public override Task FlushAsync(CancellationToken cancellationToken) + { + return Task.FromResult(0); + } + + public override int Read(byte[] buffer, int offset, int count) + { + return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + } + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - var currentOffset = offset; - var targetOffset = offset + count; - while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset) + var bytesRead = 0; + + // Use existing date from buffer. + while (count > 0 && _buffer.Any()) { - var response = await _webSocket.ReceiveAsync(new ArraySegment(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false); - currentOffset += response.Count; - count -= response.Count; - - if (response.MessageType == WebSocketMessageType.Close) + buffer[offset++] = _buffer.Dequeue(); + count--; + bytesRead++; + } + + if (count == 0) + { + return bytesRead; + } + + while (_webSocket.State == WebSocketState.Open) + { + await FetchChunkAsync(cancellationToken); + + while (count > 0 && _buffer.Any()) { - await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + buffer[offset++] = _buffer.Dequeue(); + count--; + bytesRead++; + } + + if (count == 0) + { + return bytesRead; } } if (_webSocket.State == WebSocketState.Closed) { - throw new MqttCommunicationException( "connection closed" ); + throw new MqttCommunicationException("WebSocket connection closed."); } - return currentOffset - offset; + return bytesRead; } - public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - return _webSocket.SendAsync(new ArraySegment(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken); - } - - public override int Read(byte[] buffer, int offset, int count) + public override void Write(byte[] buffer, int offset, int count) { - return ReadAsync(buffer, offset, count).GetAwaiter().GetResult(); + WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); } - public override void Write(byte[] buffer, int offset, int count) + public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - WriteAsync(buffer, offset, count).GetAwaiter().GetResult(); + return _webSocket.SendAsync(new ArraySegment(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken); } public override long Seek(long offset, SeekOrigin origin) @@ -82,5 +109,21 @@ public override void SetLength(long value) { throw new NotSupportedException(); } + + private async Task FetchChunkAsync(CancellationToken cancellationToken) + { + var response = await _webSocket.ReceiveAsync(new ArraySegment(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false); + + for (var i = 0; i < response.Count; i++) + { + var @byte = _chunkBuffer[i]; + _buffer.Enqueue(@byte); + } + + if (response.MessageType == WebSocketMessageType.Close) + { + await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); + } + } } } diff --git a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs index c8b7968e3..6037929ec 100644 --- a/Frameworks/MQTTnet.NetStandard/MqttFactory.cs +++ b/Frameworks/MQTTnet.NetStandard/MqttFactory.cs @@ -23,7 +23,7 @@ public MqttFactory() public MqttFactory(IServiceProvider serviceProvider) { - _serviceProvider = serviceProvider; + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); } public ILoggerFactory GetLoggerFactory() diff --git a/MQTTnet.Core/Client/MqttClient.cs b/MQTTnet.Core/Client/MqttClient.cs index 202cbc581..fcab3fba2 100644 --- a/MQTTnet.Core/Client/MqttClient.cs +++ b/MQTTnet.Core/Client/MqttClient.cs @@ -60,14 +60,14 @@ public async Task ConnectAsync(IMqttClientOptions optio await _adapter.ConnectAsync(_options.CommunicationTimeout).ConfigureAwait(false); _logger.LogTrace("Connection with server established."); - await SetupIncomingPacketProcessingAsync().ConfigureAwait(false); + await StartReceivingPacketsAsync(_cancellationTokenSource.Token).ConfigureAwait(false); var connectResponse = await AuthenticateAsync(options.WillMessage).ConfigureAwait(false); _logger.LogTrace("MQTT connection with server established."); if (_options.KeepAlivePeriod != TimeSpan.Zero) { - StartSendKeepAliveMessages(_cancellationTokenSource.Token); + StartSendingKeepAliveMessages(_cancellationTokenSource.Token); } IsConnected = true; @@ -96,7 +96,6 @@ public async Task DisconnectAsync() finally { await DisconnectInternalAsync().ConfigureAwait(false); - _scopeHandle.Dispose(); } } @@ -149,36 +148,36 @@ public async Task PublishAsync(IEnumerable applicationMe switch (qosGroup.Key) { case MqttQualityOfServiceLevel.AtMostOnce: - { - // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] - await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosPackets).ConfigureAwait(false); - break; - } - case MqttQualityOfServiceLevel.AtLeastOnce: - { - foreach (var publishPacket in qosPackets) { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + // No packet identifier is used for QoS 0 [3.3.2.2 Packet Identifier] + await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, qosPackets).ConfigureAwait(false); + break; } + case MqttQualityOfServiceLevel.AtLeastOnce: + { + foreach (var publishPacket in qosPackets) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + } - break; - } + break; + } case MqttQualityOfServiceLevel.ExactlyOnce: - { - foreach (var publishPacket in qosPackets) { - publishPacket.PacketIdentifier = GetNewPacketIdentifier(); - var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); - await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + foreach (var publishPacket in qosPackets) + { + publishPacket.PacketIdentifier = GetNewPacketIdentifier(); + var pubRecPacket = await SendAndReceiveAsync(publishPacket).ConfigureAwait(false); + await SendAndReceiveAsync(pubRecPacket.CreateResponse()).ConfigureAwait(false); + } + + break; } - - break; - } default: - { - throw new InvalidOperationException(); - } + { + throw new InvalidOperationException(); + } } } } @@ -191,7 +190,7 @@ private async Task AuthenticateAsync(MqttApplicationMessage w Username = _options.Credentials?.Username, Password = _options.Credentials?.Password, CleanSession = _options.CleanSession, - KeepAlivePeriod = (ushort) _options.KeepAlivePeriod.TotalSeconds, + KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds, WillMessage = willApplicationMessage }; @@ -204,24 +203,6 @@ private async Task AuthenticateAsync(MqttApplicationMessage w return response; } - private async Task SetupIncomingPacketProcessingAsync() - { - _isReceivingPackets = false; - -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew( - () => ReceivePackets(_cancellationTokenSource.Token), - _cancellationTokenSource.Token, - TaskCreationOptions.LongRunning, - TaskScheduler.Default).ConfigureAwait(false); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - - while (!_isReceivingPackets && _cancellationTokenSource != null && !_cancellationTokenSource.IsCancellationRequested) - { - await Task.Delay(TimeSpan.FromMilliseconds(100)).ConfigureAwait(false); - } - } - private void ThrowIfNotConnected() { if (!IsConnected) throw new MqttCommunicationException("The client is not connected."); @@ -234,6 +215,8 @@ private void ThrowIfConnected(string message) private async Task DisconnectInternalAsync() { + _scopeHandle?.Dispose(); + var clientWasConnected = IsConnected; IsConnected = false; @@ -258,6 +241,7 @@ private async Task DisconnectInternalAsync() } finally { + _logger.LogInformation("Disconnected."); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected)); } } @@ -324,7 +308,7 @@ private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce) { FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubAckPacket {PacketIdentifier = publishPacket.PacketIdentifier}); + await SendAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }); return; } @@ -337,7 +321,7 @@ private async Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket) } FireApplicationMessageReceivedEvent(publishPacket); - await SendAsync(new MqttPubRecPacket {PacketIdentifier = publishPacket.PacketIdentifier}); + await SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }); return; } @@ -363,12 +347,12 @@ private async Task SendAndReceiveAsync(MqttBas { var packetAwaiter = _packetDispatcher.WaitForPacketAsync(requestPacket, typeof(TResponsePacket), _options.CommunicationTimeout); await _adapter.SendPacketsAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token, requestPacket).ConfigureAwait(false); - return (TResponsePacket) await packetAwaiter.ConfigureAwait(false); + return (TResponsePacket)await packetAwaiter.ConfigureAwait(false); } private ushort GetNewPacketIdentifier() { - return (ushort) Interlocked.Increment(ref _latestPacketIdentifier); + return (ushort)Interlocked.Increment(ref _latestPacketIdentifier); } private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToken) @@ -390,6 +374,12 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke } catch (OperationCanceledException) { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await DisconnectInternalAsync().ConfigureAwait(false); } catch (MqttCommunicationException exception) { @@ -412,7 +402,7 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke } } - private async Task ReceivePackets(CancellationToken cancellationToken) + private async Task ReceivePacketsAsync(CancellationToken cancellationToken) { _logger.LogInformation("Start receiving packets."); @@ -423,6 +413,7 @@ private async Task ReceivePackets(CancellationToken cancellationToken) _isReceivingPackets = true; var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); + if (cancellationToken.IsCancellationRequested) { return; @@ -433,6 +424,12 @@ private async Task ReceivePackets(CancellationToken cancellationToken) } catch (OperationCanceledException) { + if (cancellationToken.IsCancellationRequested) + { + return; + } + + await DisconnectInternalAsync().ConfigureAwait(false); } catch (MqttCommunicationException exception) { @@ -458,15 +455,34 @@ private async Task ReceivePackets(CancellationToken cancellationToken) private void StartProcessReceivedPacket(MqttBasePacket packet, CancellationToken cancellationToken) { #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Run(() => ProcessReceivedPacketAsync(packet), cancellationToken).ConfigureAwait(false); + Task.Run( + () => ProcessReceivedPacketAsync(packet), + cancellationToken).ConfigureAwait(false); +#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + } + + private async Task StartReceivingPacketsAsync(CancellationToken cancellationToken) + { + _isReceivingPackets = false; + +#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + Task.Run( + async () => await ReceivePacketsAsync(cancellationToken), + cancellationToken).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed + + while (!_isReceivingPackets && !cancellationToken.IsCancellationRequested) + { + await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken).ConfigureAwait(false); + } } - private void StartSendKeepAliveMessages(CancellationToken cancellationToken) + private void StartSendingKeepAliveMessages(CancellationToken cancellationToken) { #pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - Task.Factory.StartNew(() => SendKeepAliveMessagesAsync(cancellationToken), cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default) - .ConfigureAwait(false); + Task.Run( + async () => await SendKeepAliveMessagesAsync(cancellationToken), + cancellationToken).ConfigureAwait(false); #pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed } } diff --git a/MQTTnet.Core/Packets/MqttPublishPacket.cs b/MQTTnet.Core/Packets/MqttPublishPacket.cs index 447aaed7e..b7d10d118 100644 --- a/MQTTnet.Core/Packets/MqttPublishPacket.cs +++ b/MQTTnet.Core/Packets/MqttPublishPacket.cs @@ -1,5 +1,4 @@ -using System; -using MQTTnet.Core.Protocol; +using MQTTnet.Core.Protocol; namespace MQTTnet.Core.Packets { @@ -19,7 +18,7 @@ public override string ToString() { return nameof(MqttPublishPacket) + ": [Topic=" + Topic + "]" + - " [Payload=" + Convert.ToBase64String(Payload) + "]" + + " [PayloadLength=" + Payload?.Length + "]" + " [QoSLevel=" + QualityOfServiceLevel + "]" + " [Dup=" + Dup + "]" + " [Retain=" + Retain + "]" + diff --git a/MQTTnet.Core/Serializer/MqttPacketReader.cs b/MQTTnet.Core/Serializer/MqttPacketReader.cs index 97b6478cd..9fcae9e5f 100644 --- a/MQTTnet.Core/Serializer/MqttPacketReader.cs +++ b/MQTTnet.Core/Serializer/MqttPacketReader.cs @@ -91,7 +91,7 @@ private static int ReadBodyLengthFromSource(Stream stream, CancellationToken can if (buffer == -1) { - break; + throw new MqttCommunicationException("Connection closed while reading remaining length data."); } encodedByte = (byte)buffer; diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml index 4fdbd2e6b..43b26c7af 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml @@ -78,6 +78,7 @@ Received messages: + Add received messages to list diff --git a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs index 6ae711be9..5e1a1261b 100644 --- a/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs +++ b/Tests/MQTTnet.TestApp.UniversalWindows/MainPage.xaml.cs @@ -30,14 +30,14 @@ public MainPage() private async void OnTraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e) { - await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.High, () => + var text = $"[{e.TraceMessage.Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{e.TraceMessage.Level}] [{e.TraceMessage.Source}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Message}]{Environment.NewLine}"; + if (e.TraceMessage.Exception != null) + { + text += $"{e.TraceMessage.Exception}{Environment.NewLine}"; + } + + await Trace.Dispatcher.RunAsync(CoreDispatcherPriority.Low, () => { - var text = $"[{e.TraceMessage.Timestamp:yyyy-MM-dd HH:mm:ss.fff}] [{e.TraceMessage.Level}] [{e.TraceMessage.Source}] [{e.TraceMessage.ThreadId}] [{e.TraceMessage.Message}]{Environment.NewLine}"; - if (e.TraceMessage.Exception != null) - { - text += $"{e.TraceMessage.Exception}{Environment.NewLine}"; - } - Trace.Text += text; }); } @@ -109,11 +109,13 @@ private async void OnApplicationMessageReceived(object sender, MqttApplicationMe { var item = $"Timestamp: {DateTime.Now:O} | Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {Encoding.UTF8.GetString(eventArgs.ApplicationMessage.Payload)} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel}"; - await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () => + await Dispatcher.RunAsync(CoreDispatcherPriority.Low, () => { - ReceivedMessages.Items.Add(item); + if (AddReceivedMessagesToList.IsChecked == true) + { + ReceivedMessages.Items.Add(item); + } }); - } private async void Publish(object sender, RoutedEventArgs e)