Skip to content

Commit

Permalink
Fix wrong QoS level handling for server.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Aug 7, 2017
1 parent 0172606 commit 59b0b79
Show file tree
Hide file tree
Showing 15 changed files with 31 additions and 32 deletions.
8 changes: 2 additions & 6 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,15 @@
<package >
<metadata>
<id>MQTTnet</id>
<version>2.1.5</version>
<version>2.1.5.1</version>
<authors>Christian Kratky</authors>
<owners>Christian Kratky</owners>
<licenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</licenseUrl>
<projectUrl>https://github.com/chkr1011/MQTTnet</projectUrl>
<iconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [MqttServer] Added support for publishing application messages
* [Core] Fixed QoS level 2 handling
* [Core] Performance optimizations
* [MqttClient/MqttServer] Errors while handline application messages are now catched and traced
* [MqttClient/MqttServer] Added interfaces
<releaseNotes>* [Server] Fixed wrong handling of QoS levels
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2017</copyright>
<tags>MQTT MQTTClient MQTTServer MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Queue Hardware</tags>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private async Task AcceptDefaultEndpointConnectionsAsync(CancellationToken cance
try
{
var clientSocket = await Task.Factory.FromAsync(_defaultEndpointSocket.BeginAccept, _defaultEndpointSocket.EndAccept, null);
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception) when (!(exception is ObjectDisposedException))
Expand All @@ -107,7 +107,7 @@ private async Task AcceptTlsEndpointConnectionsAsync(CancellationToken cancellat
var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);

var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception)
Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetFramework/MqttClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer()));
}
}
}
4 changes: 2 additions & 2 deletions Frameworks/MQTTnet.NetFramework/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]
[assembly: Guid("a480ef90-0eaa-4d9a-b271-47a9c47f6f7d")]
[assembly: AssemblyVersion("2.1.4.0")]
[assembly: AssemblyFileVersion("2.1.4.0")]
[assembly: AssemblyVersion("2.1.5.1")]
[assembly: AssemblyFileVersion("2.1.5.1")]
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private async Task AcceptDefaultEndpointConnectionsAsync(CancellationToken cance
try
{
var clientSocket = await _defaultEndpointSocket.AcceptAsync();
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, null), new MqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception)
Expand All @@ -105,7 +105,7 @@ private async Task AcceptTlsEndpointConnectionsAsync(CancellationToken cancellat
var sslStream = new SslStream(new NetworkStream(clientSocket));
await sslStream.AuthenticateAsServerAsync(_tlsCertificate, false, SslProtocols.Tls12, false);

var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(clientSocket, sslStream), new MqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(clientSocket.RemoteEndPoint.ToString(), clientAdapter));
}
catch (Exception exception)
Expand Down
4 changes: 2 additions & 2 deletions Frameworks/MQTTnet.NetStandard/MQTTnet.Netstandard.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<TargetFramework>netstandard1.3</TargetFramework>
<AssemblyName>MQTTnet</AssemblyName>
<RootNamespace>MQTTnet</RootNamespace>
<AssemblyVersion>2.1.4.0</AssemblyVersion>
<FileVersion>2.1.4.0</FileVersion>
<AssemblyVersion>2.1.5.1</AssemblyVersion>
<FileVersion>2.1.5.1</FileVersion>
<Version>0.0.0.0</Version>
<Company />
<Product />
Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.NetStandard/MqttClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private void AcceptDefaultEndpointConnectionsAsync(StreamSocketListener sender,
{
try
{
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new DefaultMqttV311PacketSerializer());
var clientAdapter = new MqttChannelCommunicationAdapter(new MqttTcpChannel(args.Socket), new MqttV311PacketSerializer());
ClientConnected?.Invoke(this, new MqttClientConnectedEventArgs(args.Socket.Information.RemoteAddress.ToString(), clientAdapter));
}
catch (Exception exception)
Expand Down
2 changes: 1 addition & 1 deletion Frameworks/MQTTnet.UniversalWindows/MqttClientFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public IMqttClient CreateMqttClient(MqttClientOptions options)
{
if (options == null) throw new ArgumentNullException(nameof(options));

return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new DefaultMqttV311PacketSerializer()));
return new MqttClient(options, new MqttChannelCommunicationAdapter(new MqttTcpChannel(), new MqttV311PacketSerializer()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
[assembly: ComVisible(false)]
[assembly: AssemblyVersion("2.1.4.0")]
[assembly: AssemblyFileVersion("2.1.4.0")]
[assembly: AssemblyVersion("2.1.5.1")]
[assembly: AssemblyFileVersion("2.1.5.1")]
2 changes: 1 addition & 1 deletion MQTTnet.Core/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private Task ProcessReceivedPublishPacket(MqttPublishPacket publishPacket)
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier });
}

throw new InvalidOperationException();
throw new MqttCommunicationException("Received a not supported QoS level.");
}

private async Task ProcessReceivedPubRelPacket(MqttPubRelPacket pubRelPacket)
Expand Down
4 changes: 2 additions & 2 deletions MQTTnet.Core/MQTTnet.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
<PackageIconUrl></PackageIconUrl>
<RepositoryUrl></RepositoryUrl>
<PackageTags></PackageTags>
<FileVersion>2.1.4.0</FileVersion>
<AssemblyVersion>2.1.4.0</AssemblyVersion>
<FileVersion>2.1.5.1</FileVersion>
<AssemblyVersion>2.1.5.1</AssemblyVersion>
<PackageLicenseUrl></PackageLicenseUrl>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@

namespace MQTTnet.Core.Serializer
{
public sealed class DefaultMqttV311PacketSerializer : IMqttPacketSerializer
public sealed class MqttV311PacketSerializer : IMqttPacketSerializer
{
private static readonly byte[] MqttV311Prefix = Encoding.UTF8.GetBytes("MQTT");

public Task SerializeAsync(MqttBasePacket packet, IMqttCommunicationChannel destination)
{
if (packet == null) throw new ArgumentNullException(nameof(packet));
Expand Down Expand Up @@ -351,8 +353,6 @@ private static void ValidatePublishPacket(MqttPublishPacket packet)
}
}

private static readonly byte[] MqttPrefix = Encoding.UTF8.GetBytes("MQTT");

private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationChannel destination)
{
ValidateConnectPacket(packet);
Expand All @@ -361,7 +361,7 @@ private static Task SerializeAsync(MqttConnectPacket packet, IMqttCommunicationC
{
// Write variable header
output.Write(0x00, 0x04); // 3.1.2.1 Protocol Name
output.Write(MqttPrefix);
output.Write(MqttV311Prefix);
output.Write(0x04); // 3.1.2.2 Protocol Level

var connectFlags = new ByteWriter(); // 3.1.2.3 Connect Flags
Expand Down
9 changes: 6 additions & 3 deletions MQTTnet.Core/Server/MqttClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,16 @@ private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtMostOnce)
{
_publishPacketReceivedCallback(this, publishPacket);
return Task.FromResult(0);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.AtLeastOnce)
{
_publishPacketReceivedCallback(this, publishPacket);
return _adapter.SendPacketAsync(new MqttPubAckPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
}
else if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
lock (_unacknowledgedPublishPackets)
Expand All @@ -172,7 +175,7 @@ private Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
return _adapter.SendPacketAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, _options.DefaultCommunicationTimeout);
}

throw new MqttCommunicationException("Received not supported QoS level.");
throw new MqttCommunicationException("Received a not supported QoS level.");
}

private Task HandleIncomingPubRelPacketAsync(MqttPubRelPacket pubRelPacket)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public byte[] ToArray()

private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
{
var serializer = new DefaultMqttV311PacketSerializer();
var serializer = new MqttV311PacketSerializer();
var channel = new TestChannel();
serializer.SerializeAsync(packet, channel).Wait();
var buffer = channel.ToArray();
Expand All @@ -415,7 +415,7 @@ private void SerializeAndCompare(MqttBasePacket packet, string expectedBase64Val

private void DeserializeAndCompare(MqttBasePacket packet, string expectedBase64Value)
{
var serializer = new DefaultMqttV311PacketSerializer();
var serializer = new MqttV311PacketSerializer();

var channel1 = new TestChannel();
serializer.SerializeAsync(packet, channel1).Wait();
Expand Down

0 comments on commit 59b0b79

Please sign in to comment.