Skip to content

Commit

Permalink
* [Server] Added support for complex client IDs.
Browse files Browse the repository at this point in the history
* [Server] Fixed an issue with not correctly removed old client sessions.
* [Server] Several minor performance improvements.
* [Server] An existing client session is no longer closed if a new client connection is invalid.
* [Client] Added support for sending "CleanSession" flag.
  • Loading branch information
chkr1011 committed May 5, 2017
1 parent b3a6919 commit 105abeb
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 135 deletions.
1 change: 1 addition & 0 deletions MQTTnet.Core/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public async Task ConnectAsync(MqttApplicationMessage willApplicationMessage = n
ClientId = _options.ClientId,
Username = _options.UserName,
Password = _options.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};
Expand Down
2 changes: 2 additions & 0 deletions MQTTnet.Core/Client/MqttClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class MqttClientOptions

public string ClientId { get; set; } = Guid.NewGuid().ToString().Replace("-", string.Empty);

public bool CleanSession { get; set; } = true;

public TimeSpan KeepAlivePeriod { get; set; } = TimeSpan.FromSeconds(5);

public TimeSpan DefaultCommunicationTimeout { get; set; } = TimeSpan.FromSeconds(10);
Expand Down
6 changes: 3 additions & 3 deletions MQTTnet.Core/MQTTnet.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
<Product>MQTTnet</Product>
<Company>Christian Kratky</Company>
<Authors>Christian Kratky</Authors>
<Version>2.1.1.0</Version>
<Version>2.1.2.0</Version>
<PackageId>MQTTnet.Core</PackageId>
<Copyright>Copyright © Christian Kratky 2016-2017</Copyright>
<PackageProjectUrl>https://github.com/chkr1011/MQTTnet</PackageProjectUrl>
<PackageIconUrl>https://raw.githubusercontent.com/chkr1011/MQTTnet/master/Images/Logo_128x128.png</PackageIconUrl>
<RepositoryUrl>https://github.com/chkr1011/MQTTnet</RepositoryUrl>
<PackageTags>MQTT MQTTClient MQTTServer MQTTBroker Broker</PackageTags>
<FileVersion>2.1.1.0</FileVersion>
<AssemblyVersion>2.1.1.0</AssemblyVersion>
<FileVersion>2.1.2.0</FileVersion>
<AssemblyVersion>2.1.2.0</AssemblyVersion>
<PackageLicenseUrl>https://github.com/chkr1011/MQTTnet/blob/master/LICENSE</PackageLicenseUrl>
</PropertyGroup>

Expand Down
9 changes: 9 additions & 0 deletions MQTTnet.Core/Server/GetOrCreateClientSessionResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace MQTTnet.Core.Server
{
public class GetOrCreateClientSessionResult
{
public bool IsExistingSession { get; set; }

public MqttClientSession Session { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace MQTTnet.Core.Server
{
public class MqttOutgoingPublicationsManager
public class MqttClientMessageQueue
{
private readonly List<MqttClientPublishPacketContext> _pendingPublishPackets = new List<MqttClientPublishPacketContext>();
private readonly AsyncGate _gate = new AsyncGate();
Expand All @@ -20,7 +20,7 @@ public class MqttOutgoingPublicationsManager
private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;

public MqttOutgoingPublicationsManager(MqttServerOptions options)
public MqttClientMessageQueue(MqttServerOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
}
Expand All @@ -29,13 +29,13 @@ public void Start(IMqttCommunicationAdapter adapter)
{
if (_cancellationTokenSource != null)
{
throw new InvalidOperationException($"{nameof(MqttOutgoingPublicationsManager)} already started.");
throw new InvalidOperationException($"{nameof(MqttClientMessageQueue)} already started.");
}

_adapter = adapter ?? throw new ArgumentNullException(nameof(adapter));
_cancellationTokenSource = new CancellationTokenSource();

Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token)).Forget();
Task.Run(() => SendPendingPublishPacketsAsync(_cancellationTokenSource.Token));
}

public void Stop()
Expand Down Expand Up @@ -87,7 +87,7 @@ private async Task SendPendingPublishPacketsAsync(CancellationToken cancellation
}
catch (Exception e)
{
MqttTrace.Error(nameof(MqttOutgoingPublicationsManager), e, "Error while sending pending publish packets.");
MqttTrace.Error(nameof(MqttClientMessageQueue), e, "Error while sending pending publish packets.");
}
finally
{
Expand All @@ -112,11 +112,11 @@ private async Task TrySendPendingPublishPacketAsync(MqttClientPublishPacketConte
}
catch (MqttCommunicationException exception)
{
MqttTrace.Warning(nameof(MqttOutgoingPublicationsManager), exception, "Sending publish packet failed.");
MqttTrace.Warning(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
}
catch (Exception exception)
{
MqttTrace.Error(nameof(MqttOutgoingPublicationsManager), exception, "Sending publish packet failed.");
MqttTrace.Error(nameof(MqttClientMessageQueue), exception, "Sending publish packet failed.");
}
finally
{
Expand Down
52 changes: 28 additions & 24 deletions MQTTnet.Core/Server/MqttClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,30 @@

namespace MQTTnet.Core.Server
{
public class MqttClientSession
public sealed class MqttClientSession : IDisposable
{
private readonly ConcurrentDictionary<ushort, MqttPublishPacket> _pendingIncomingPublications = new ConcurrentDictionary<ushort, MqttPublishPacket>();

private readonly MqttClientSubscriptionsManager _subscriptionsManager = new MqttClientSubscriptionsManager();
private readonly MqttOutgoingPublicationsManager _outgoingPublicationsManager;
private readonly MqttClientMessageQueue _messageQueue;
private readonly Action<MqttClientSession, MqttPublishPacket> _publishPacketReceivedCallback;
private readonly MqttServerOptions _options;

private CancellationTokenSource _cancellationTokenSource;
private IMqttCommunicationAdapter _adapter;
private string _identifier;
private MqttApplicationMessage _willApplicationMessage;

public MqttClientSession(MqttServerOptions options, Action<MqttClientSession, MqttPublishPacket> publishPacketReceivedCallback)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_publishPacketReceivedCallback = publishPacketReceivedCallback ?? throw new ArgumentNullException(nameof(publishPacketReceivedCallback));
_outgoingPublicationsManager = new MqttOutgoingPublicationsManager(options);

_messageQueue = new MqttClientMessageQueue(options);
}

public bool IsConnected => _adapter != null;

public async Task RunAsync(string identifier, MqttApplicationMessage willApplicationMessage, IMqttCommunicationAdapter adapter)
{
if (adapter == null) throw new ArgumentNullException(nameof(adapter));
Expand All @@ -45,7 +47,7 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica
_adapter = adapter;
_cancellationTokenSource = new CancellationTokenSource();

_outgoingPublicationsManager.Start(adapter);
_messageQueue.Start(adapter);
while (!_cancellationTokenSource.IsCancellationRequested)
{
var packet = await adapter.ReceivePacketAsync(TimeSpan.Zero);
Expand All @@ -65,8 +67,8 @@ public async Task RunAsync(string identifier, MqttApplicationMessage willApplica
{
_publishPacketReceivedCallback(this, _willApplicationMessage.ToPublishPacket());
}
_outgoingPublicationsManager.Stop();

_messageQueue.Stop();
_cancellationTokenSource.Cancel();
_adapter = null;

Expand All @@ -84,66 +86,68 @@ public void EnqueuePublishPacket(MqttClientSession senderClientSession, MqttPubl
return;
}

_outgoingPublicationsManager.Enqueue(senderClientSession, publishPacket);
_messageQueue.Enqueue(senderClientSession, publishPacket);
MqttTrace.Verbose(nameof(MqttClientSession), $"Client '{_identifier}: Enqueued pending publish packet.");
}

private async Task HandleIncomingPacketAsync(MqttBasePacket packet)
public void Dispose()
{
_cancellationTokenSource?.Cancel();
_cancellationTokenSource?.Dispose();
}

private Task HandleIncomingPacketAsync(MqttBasePacket packet)
{
var subscribePacket = packet as MqttSubscribePacket;
if (subscribePacket != null)
{
await _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
return;
return _adapter.SendPacketAsync(_subscriptionsManager.Subscribe(subscribePacket), _options.DefaultCommunicationTimeout);
}

var unsubscribePacket = packet as MqttUnsubscribePacket;
if (unsubscribePacket != null)
{
await _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
return;
return _adapter.SendPacketAsync(_subscriptionsManager.Unsubscribe(unsubscribePacket), _options.DefaultCommunicationTimeout);
}

var publishPacket = packet as MqttPublishPacket;
if (publishPacket != null)
{
await HandleIncomingPublishPacketAsync(publishPacket);
return;
return HandleIncomingPublishPacketAsync(publishPacket);
}

var pubRelPacket = packet as MqttPubRelPacket;
if (pubRelPacket != null)
{
await HandleIncomingPubRelPacketAsync(pubRelPacket);
return;
return HandleIncomingPubRelPacketAsync(pubRelPacket);
}

var pubAckPacket = packet as MqttPubAckPacket;
if (pubAckPacket != null)
{
await HandleIncomingPubAckPacketAsync(pubAckPacket);
return;
return HandleIncomingPubAckPacketAsync(pubAckPacket);
}

if (packet is MqttPingReqPacket)
{
await _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
return;
return _adapter.SendPacketAsync(new MqttPingRespPacket(), _options.DefaultCommunicationTimeout);
}

if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{
_cancellationTokenSource.Cancel();
return;
return Task.FromResult((object)null);
}

MqttTrace.Warning(nameof(MqttClientSession), $"Client '{_identifier}': Received not supported packet ({packet}). Closing connection.");
_cancellationTokenSource.Cancel();

return Task.FromResult((object)null);
}

private async Task HandleIncomingPubAckPacketAsync(MqttPubAckPacket pubAckPacket)
{
await Task.FromResult(0);
await Task.FromResult((object)null);
}

private async Task HandleIncomingPublishPacketAsync(MqttPublishPacket publishPacket)
Expand Down
93 changes: 0 additions & 93 deletions MQTTnet.Core/Server/MqttClientSessionManager.cs

This file was deleted.

Loading

0 comments on commit 105abeb

Please sign in to comment.