Skip to content

Commit

Permalink
1681 misleading signature of mqttclientconnectasync mqttclientconnect…
Browse files Browse the repository at this point in the history
…resultcode expected but throws exception (#1749)
  • Loading branch information
chkr1011 authored May 29, 2023
1 parent d8e5936 commit 8ad8692
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 78 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [Client] Exposed _UseDefaultCredentials_ and more for Web Socket options and proxy options (#1734, thanks to @impworks).
* [Client] Exposed more TLS options (#1729).
* [Client] Fixed wrong return code conversion (#1729).
* [Client] Added an option to avoid throwing an exception when the server returned a proper non success (but valid) response (#1681).
* [Server] Improved performance by changing internal locking strategy for subscriptions (#1716, thanks to @zeheng).
* [Server] Fixed exceptions when clients are connecting and disconnecting very fast while accessing the client status for connection validation (#1742).
* [Server] Exposed more properties in _ClientConnectedEventArgs_ (#1738).
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;
using MQTTnet.Exceptions;
using MQTTnet.Formatter;
using MQTTnet.Internal;
using MQTTnet.Packets;
using MQTTnet.Protocol;
using MQTTnet.Server;

Expand Down Expand Up @@ -128,7 +130,7 @@ public async Task Disconnect_Clean_With_Custom_Reason()
await client.DisconnectAsync(disconnectOptions);

await LongTestDelay();

Assert.IsNotNull(eventArgs);
Assert.AreEqual(MqttDisconnectReasonCode.MessageRateTooHigh, eventArgs.ReasonCode);
}
Expand Down Expand Up @@ -156,13 +158,43 @@ public async Task Disconnect_Clean_With_User_Properties()
await client.DisconnectAsync(disconnectOptions);

await LongTestDelay();

Assert.IsNotNull(eventArgs);
Assert.IsNotNull(eventArgs.UserProperties);
Assert.AreEqual(1, eventArgs.UserProperties.Count);
Assert.AreEqual("test_name", eventArgs.UserProperties[0].Name);
Assert.AreEqual("test_value", eventArgs.UserProperties[0].Value);
}
}

[TestMethod]
public async Task Return_Non_Success()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
var server = await testEnvironment.StartServer();

server.ValidatingConnectionAsync += args =>
{
args.ResponseUserProperties = new List<MqttUserProperty>
{
new MqttUserProperty("Property", "Value")
};
args.ReasonCode = MqttConnectReasonCode.QuotaExceeded;
return CompletedTask.Instance;
};

var client = testEnvironment.CreateClient();

var response = await client.ConnectAsync(testEnvironment.CreateDefaultClientOptionsBuilder().WithoutThrowOnNonSuccessfulConnectResponse().Build());

Assert.IsNotNull(response);
Assert.AreEqual(MqttClientConnectResultCode.QuotaExceeded, response.ResultCode);
Assert.AreEqual(response.UserProperties[0].Name, "Property");
Assert.AreEqual(response.UserProperties[0].Value, "Value");
}
}
}
}
1 change: 1 addition & 0 deletions Source/MQTTnet/Client/Internal/MqttClientResultFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace MQTTnet.Client.Internal
{
public static class MqttClientResultFactory
{
public static readonly MqttClientConnectResultFactory ConnectResult = new MqttClientConnectResultFactory();
public static readonly MqttClientPublishResultFactory PublishResult = new MqttClientPublishResultFactory();
public static readonly MqttClientSubscribeResultFactory SubscribeResult = new MqttClientSubscribeResultFactory();
public static readonly MqttClientUnsubscribeResultFactory UnsubscribeResult = new MqttClientUnsubscribeResultFactory();
Expand Down
148 changes: 82 additions & 66 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ public async Task<MqttClientConnectResult> ConnectAsync(MqttClientOptions option
}
}

if (connectResult.ResultCode != MqttClientConnectResultCode.Success)
{
_logger.Warning("Connecting failed: {0}", connectResult.ResultCode);
return connectResult;
}

_lastPacketSentTimestamp = DateTime.UtcNow;

var keepAliveInterval = Options.KeepAlivePeriod;
Expand Down Expand Up @@ -434,8 +440,7 @@ async Task<MqttClientConnectResult> Authenticate(IMqttChannelAdapter channelAdap

if (receivedPacket is MqttConnAckPacket connAckPacket)
{
var clientConnectResultFactory = new MqttClientConnectResultFactory();
result = clientConnectResultFactory.Create(connAckPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion);
result = MqttClientResultFactory.ConnectResult.Create(connAckPacket, _adapter.PacketFormatterAdapter.ProtocolVersion);
}
else
{
Expand All @@ -447,9 +452,18 @@ async Task<MqttClientConnectResult> Authenticate(IMqttChannelAdapter channelAdap
throw new MqttConnectingFailedException($"Error while authenticating. {exception.Message}", exception, null);
}

if (result.ResultCode != MqttClientConnectResultCode.Success)
// This is no feature. It is basically a backward compatibility option and should be removed in the future.
// The client should not throw any exception if the transport layer connection was successful and the server
// did send a proper ACK packet with a non success response.
if (options.ThrowOnNonSuccessfulConnectResponse)
{
throw new MqttConnectingFailedException($"Connecting with MQTT server failed ({result.ResultCode}).", null, result);
_logger.Warning(
"Client will now throw an _MqttConnectingFailedException_. This is obsolete and will be removed in the future. Consider setting _ThrowOnNonSuccessfulResponseFromServer=False_ in client options.");

if (result.ResultCode != MqttClientConnectResultCode.Success)
{
throw new MqttConnectingFailedException($"Connecting with MQTT server failed ({result.ResultCode}).", null, result);
}
}

_logger.Verbose("Authenticated MQTT connection with server established.");
Expand Down Expand Up @@ -498,9 +512,11 @@ async Task<MqttClientConnectResult> ConnectInternal(IMqttChannelAdapter channelA
_publishPacketReceiverQueue = new AsyncQueue<MqttPublishPacket>();

var connectResult = await Authenticate(channelAdapter, Options, effectiveCancellationToken.Token).ConfigureAwait(false);

_publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken);
_packetReceiverTask = Task.Run(() => ReceivePacketsLoop(backgroundCancellationToken), backgroundCancellationToken);
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
_publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken);
_packetReceiverTask = Task.Run(() => ReceivePacketsLoop(backgroundCancellationToken), backgroundCancellationToken);
}

return connectResult;
}
Expand Down Expand Up @@ -754,6 +770,65 @@ async Task<MqttPacket> Receive(CancellationToken cancellationToken)
return packet;
}

async Task ReceivePacketsLoop(CancellationToken cancellationToken)
{
try
{
_logger.Verbose("Start receiving packets.");

while (!cancellationToken.IsCancellationRequested)
{
var packet = await Receive(cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return;
}

if (packet == null)
{
await DisconnectInternal(_packetReceiverTask, null, null).ConfigureAwait(false);

return;
}

await TryProcessReceivedPacket(packet, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
{
if (_cleanDisconnectInitiated)
{
return;
}

if (exception is AggregateException aggregateException)
{
exception = aggregateException.GetBaseException();
}

if (exception is OperationCanceledException)
{
}
else if (exception is MqttCommunicationException)
{
_logger.Warning(exception, "Communication error while receiving packets.");
}
else
{
_logger.Error(exception, "Error while receiving packets.");
}

_packetDispatcher.FailAll(exception);

await DisconnectInternal(_packetReceiverTask, exception, null).ConfigureAwait(false);
}
finally
{
_logger.Verbose("Stopped receiving packets.");
}
}

async Task<TResponsePacket> Request<TResponsePacket>(MqttPacket requestPacket, CancellationToken cancellationToken) where TResponsePacket : MqttPacket
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -920,65 +995,6 @@ async Task TryProcessReceivedPacket(MqttPacket packet, CancellationToken cancell
}
}

async Task ReceivePacketsLoop(CancellationToken cancellationToken)
{
try
{
_logger.Verbose("Start receiving packets.");

while (!cancellationToken.IsCancellationRequested)
{
var packet = await Receive(cancellationToken).ConfigureAwait(false);

if (cancellationToken.IsCancellationRequested)
{
return;
}

if (packet == null)
{
await DisconnectInternal(_packetReceiverTask, null, null).ConfigureAwait(false);

return;
}

await TryProcessReceivedPacket(packet, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
{
if (_cleanDisconnectInitiated)
{
return;
}

if (exception is AggregateException aggregateException)
{
exception = aggregateException.GetBaseException();
}

if (exception is OperationCanceledException)
{
}
else if (exception is MqttCommunicationException)
{
_logger.Warning(exception, "Communication error while receiving packets.");
}
else
{
_logger.Error(exception, "Error while receiving packets.");
}

_packetDispatcher.FailAll(exception);

await DisconnectInternal(_packetReceiverTask, exception, null).ConfigureAwait(false);
}
finally
{
_logger.Verbose("Stopped receiving packets.");
}
}

async Task TrySendKeepAliveMessages(CancellationToken cancellationToken)
{
try
Expand Down
23 changes: 14 additions & 9 deletions Source/MQTTnet/Client/Options/MqttClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ namespace MQTTnet.Client
{
public sealed class MqttClientOptions
{
/// <summary>
/// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets
/// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and
/// will close the connection when receiving such packets. If such a service is used this flag must
/// be set to _false_.
/// </summary>
public bool AllowPacketFragmentation { get; set; } = true;

/// <summary>
/// Gets or sets the authentication data.
/// <remarks>MQTT 5.0.0+ feature.</remarks>
Expand All @@ -24,14 +32,6 @@ public sealed class MqttClientOptions
/// </summary>
public string AuthenticationMethod { get; set; }

/// <summary>
/// Usually the MQTT packets can be send partially. This is done by using multiple TCP packets
/// or WebSocket frames etc. Unfortunately not all brokers (like Amazon Web Services (AWS)) do support this feature and
/// will close the connection when receiving such packets. If such a service is used this flag must
/// be set to _false_.
/// </summary>
public bool AllowPacketFragmentation { get; set; } = true;

public IMqttClientChannelOptions ChannelOptions { get; set; }

/// <summary>
Expand Down Expand Up @@ -100,6 +100,11 @@ public sealed class MqttClientOptions
/// </summary>
public uint SessionExpiryInterval { get; set; }

/// <summary>
/// Gets or sets whether an exception should be thrown when the server has sent a non success ACK packet.
/// </summary>
public bool ThrowOnNonSuccessfulConnectResponse { get; set; } = true;

/// <summary>
/// Gets or sets the timeout which will be applied at socket level and internal operations.
/// The default value is the same as for sockets in .NET in general.
Expand Down Expand Up @@ -222,4 +227,4 @@ public sealed class MqttClientOptions
/// </summary>
public int WriterBufferSizeMax { get; set; } = 65535;
}
}
}
12 changes: 11 additions & 1 deletion Source/MQTTnet/Client/Options/MqttClientOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,17 @@ public MqttClientOptions Build()

return _options;
}


/// <summary>
/// The client will not throw an exception when the MQTT server responses with a non success ACK packet.
/// This will become the default behavior in future versions of the library.
/// </summary>
public MqttClientOptionsBuilder WithoutThrowOnNonSuccessfulConnectResponse()
{
_options.ThrowOnNonSuccessfulConnectResponse = false;
return this;
}

public MqttClientOptionsBuilder WithAuthentication(string method, byte[] data)
{
_options.AuthenticationMethod = method;
Expand Down

0 comments on commit 8ad8692

Please sign in to comment.