Skip to content

Commit

Permalink
Fixed WebSocket client stream
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Nov 17, 2017
1 parent 5bcda12 commit acc269e
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 138 deletions.
4 changes: 3 additions & 1 deletion Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Core] Fixed library reference issues for .NET 4.6 (Thanks to @JanEggers).
<releaseNotes>* [Core] Fixed library reference issues for .NET 4.6 and netstandard 2.0 (Thanks to @JanEggers).
* [Core] Several COM exceptions are now wrapped properly resulting in less warnings in the trace.
* [Core] Removed application message payload from trace to reduce trace size and increase performance.
* [Client] Fixed WebSocket sub protocol negotiation for ASP.NET Core 2 servers (Thanks to @JanEggers).
* [Client] Fixed broken connection after 30 seconds then using WebSocket protocol (Thanks to @ChristianRiedl).
* [Server] Client connections are now closed when the server is stopped (Thanks to @zhudanfei).
* [Server] Published messages from the server are now retained (if set) (Thanks to @ChristianRiedl). BREAKING CHANGE!
</releaseNotes>
Expand Down
46 changes: 0 additions & 46 deletions Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Channel;
using MQTTnet.Core.Server;
using MQTTnet.Implementations;

namespace MQTTnet.AspNetCore
{
Expand Down Expand Up @@ -47,47 +43,5 @@ public void Dispose()
{
StopAsync();
}

private class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable
{
private readonly WebSocket _webSocket;

public MqttWebSocketServerChannel(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

RawReceiveStream = new WebSocketStream(_webSocket);
}

public Stream SendStream => RawReceiveStream;
public Stream ReceiveStream => RawReceiveStream;
public Stream RawReceiveStream { get; }

public Task ConnectAsync()
{
return Task.CompletedTask;
}

public Task DisconnectAsync()
{
RawReceiveStream?.Dispose();

if (_webSocket == null)
{
return Task.CompletedTask;
}

return _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}

public void Dispose()
{
RawReceiveStream?.Dispose();
SendStream?.Dispose();
ReceiveStream?.Dispose();

_webSocket?.Dispose();
}
}
}
}
60 changes: 60 additions & 0 deletions Frameworks/MQTTnet.AspnetCore/MqttWebSocketServerChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Core.Channel;
using MQTTnet.Implementations;

namespace MQTTnet.AspNetCore
{
public class MqttWebSocketServerChannel : IMqttCommunicationChannel, IDisposable
{
private WebSocket _webSocket;

public MqttWebSocketServerChannel(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));

SendStream = new WebSocketStream(_webSocket);
ReceiveStream = SendStream;
}

public Stream SendStream { get; private set; }
public Stream ReceiveStream { get; private set; }

public Task ConnectAsync()
{
return Task.CompletedTask;
}

public async Task DisconnectAsync()
{
if (_webSocket == null)
{
return;
}

try
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
}
finally
{
Dispose();
}
}

public void Dispose()
{
SendStream?.Dispose();
ReceiveStream?.Dispose();

_webSocket?.Dispose();

SendStream = null;
ReceiveStream = null;
_webSocket = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ namespace MQTTnet.Implementations
{
public sealed class MqttWebSocketChannel : IMqttCommunicationChannel, IDisposable
{
// ReSharper disable once MemberCanBePrivate.Global
// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
public static int BufferSize { get; set; } = 4096 * 20; // Can be changed for fine tuning by library user.

private readonly MqttClientWebSocketOptions _options;
private ClientWebSocket _webSocket;

Expand Down Expand Up @@ -80,7 +84,7 @@ public async Task DisconnectAsync()
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None).ConfigureAwait(false);
}

_webSocket = null;
Dispose();
}

public void Dispose()
Expand Down
85 changes: 64 additions & 21 deletions Frameworks/MQTTnet.NetStandard/Implementations/WebSocketStream.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -10,7 +12,9 @@ namespace MQTTnet.Implementations
public class WebSocketStream : Stream
{
private readonly WebSocket _webSocket;

private readonly byte[] _chunkBuffer = new byte[MqttWebSocketChannel.BufferSize];
private readonly Queue<byte> _buffer = new Queue<byte>(MqttWebSocketChannel.BufferSize);

public WebSocketStream(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
Expand All @@ -34,43 +38,66 @@ public override void Flush()
{
}

public override Task FlushAsync(CancellationToken cancellationToken)
{
return Task.FromResult(0);
}

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var currentOffset = offset;
var targetOffset = offset + count;
while (_webSocket.State == WebSocketState.Open && currentOffset < targetOffset)
var bytesRead = 0;

// Use existing date from buffer.
while (count > 0 && _buffer.Any())
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer, currentOffset, count), cancellationToken).ConfigureAwait(false);
currentOffset += response.Count;
count -= response.Count;

if (response.MessageType == WebSocketMessageType.Close)
buffer[offset++] = _buffer.Dequeue();
count--;
bytesRead++;
}

if (count == 0)
{
return bytesRead;
}

while (_webSocket.State == WebSocketState.Open)
{
await FetchChunkAsync(cancellationToken);

while (count > 0 && _buffer.Any())
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
buffer[offset++] = _buffer.Dequeue();
count--;
bytesRead++;
}

if (count == 0)
{
return bytesRead;
}
}

if (_webSocket.State == WebSocketState.Closed)
{
throw new MqttCommunicationException( "connection closed" );
throw new MqttCommunicationException("WebSocket connection closed.");
}

return currentOffset - offset;
return bytesRead;
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken);
}

public override int Read(byte[] buffer, int offset, int count)
public override void Write(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count).GetAwaiter().GetResult();
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
}

public override void Write(byte[] buffer, int offset, int count)
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
WriteAsync(buffer, offset, count).GetAwaiter().GetResult();
return _webSocket.SendAsync(new ArraySegment<byte>(buffer, offset, count), WebSocketMessageType.Binary, true, cancellationToken);
}

public override long Seek(long offset, SeekOrigin origin)
Expand All @@ -82,5 +109,21 @@ public override void SetLength(long value)
{
throw new NotSupportedException();
}

private async Task FetchChunkAsync(CancellationToken cancellationToken)
{
var response = await _webSocket.ReceiveAsync(new ArraySegment<byte>(_chunkBuffer, 0, _chunkBuffer.Length), cancellationToken).ConfigureAwait(false);

for (var i = 0; i < response.Count; i++)
{
var @byte = _chunkBuffer[i];
_buffer.Enqueue(@byte);
}

if (response.MessageType == WebSocketMessageType.Close)
{
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false);
}
}
}
}
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetStandard/MqttFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public MqttFactory()

public MqttFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
}

public ILoggerFactory GetLoggerFactory()
Expand Down
Loading

0 comments on commit acc269e

Please sign in to comment.