diff --git a/Source/MQTTnet/Client/MqttClient.cs b/Source/MQTTnet/Client/MqttClient.cs index 0719ed1fa..365a2f91c 100644 --- a/Source/MQTTnet/Client/MqttClient.cs +++ b/Source/MQTTnet/Client/MqttClient.cs @@ -121,11 +121,11 @@ public async Task ConnectAsync(MqttClientOptions option var mqttClientAliveToken = _mqttClientAlive.Token; var adapter = _adapterFactory.CreateClientAdapter(options, new MqttPacketInspector(_events.InspectPacketEvent, _rootLogger), _rootLogger); - _adapter = adapter; + _adapter = adapter ?? throw new InvalidOperationException("The adapter factory did not provide an adapter."); if (cancellationToken.CanBeCanceled) { - connectResult = await ConnectInternal(cancellationToken).ConfigureAwait(false); + connectResult = await ConnectInternal(adapter, cancellationToken).ConfigureAwait(false); } else { @@ -133,7 +133,7 @@ public async Task ConnectAsync(MqttClientOptions option // CancellationToken.None or similar. using (var timeout = new CancellationTokenSource(Options.Timeout)) { - connectResult = await ConnectInternal(timeout.Token).ConfigureAwait(false); + connectResult = await ConnectInternal(adapter, timeout.Token).ConfigureAwait(false); } } @@ -421,7 +421,7 @@ Task AcknowledgeReceivedPublishPacket(MqttApplicationMessageReceivedEventArgs ev return CompletedTask.Instance; } - async Task Authenticate(MqttClientOptions options, CancellationToken cancellationToken) + async Task Authenticate(IMqttChannelAdapter channelAdapter, MqttClientOptions options, CancellationToken cancellationToken) { MqttClientConnectResult result; @@ -435,7 +435,7 @@ async Task Authenticate(MqttClientOptions options, Canc if (receivedPacket is MqttConnAckPacket connAckPacket) { var clientConnectResultFactory = new MqttClientConnectResultFactory(); - result = clientConnectResultFactory.Create(connAckPacket, _adapter.PacketFormatterAdapter.ProtocolVersion); + result = clientConnectResultFactory.Create(connAckPacket, channelAdapter.PacketFormatterAdapter.ProtocolVersion); } else { @@ -484,7 +484,7 @@ MqttClientConnectionStatus CompareExchangeConnectionStatus(MqttClientConnectionS return (MqttClientConnectionStatus)Interlocked.CompareExchange(ref _connectionStatus, (int)value, (int)comparand); } - async Task ConnectInternal(CancellationToken cancellationToken) + async Task ConnectInternal(IMqttChannelAdapter channelAdapter, CancellationToken cancellationToken) { var backgroundCancellationToken = _mqttClientAlive.Token; @@ -497,7 +497,7 @@ async Task ConnectInternal(CancellationToken cancellati _publishPacketReceiverQueue?.Dispose(); _publishPacketReceiverQueue = new AsyncQueue(); - var connectResult = await Authenticate(Options, effectiveCancellationToken.Token).ConfigureAwait(false); + var connectResult = await Authenticate(channelAdapter, Options, effectiveCancellationToken.Token).ConfigureAwait(false); _publishPacketReceiverTask = Task.Run(() => ProcessReceivedPublishPackets(backgroundCancellationToken), backgroundCancellationToken); _packetReceiverTask = Task.Run(() => ReceivePacketsLoop(backgroundCancellationToken), backgroundCancellationToken);