Skip to content

Commit

Permalink
[Server] Add initial support for session expiration (#1776)
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 authored Jun 27, 2023
1 parent 18614d4 commit 9295626
Show file tree
Hide file tree
Showing 15 changed files with 224 additions and 240 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* [Client] Fixed _PlatformNotSupportedException_ when using Blazor (#1755, thanks to @Nickztar).
* [Server] Fixed _NullReferenceException_ in retained messages management (#1762, thanks to @logicaloud).
* [Server] Exposed new option which allows disabling packet fragmentation (#1753).
* [Server] Expired sessions will no longer be used when a client connects (#1756).
98 changes: 0 additions & 98 deletions Source/MQTTnet.Tests/Extension_Tests.cs

This file was deleted.

70 changes: 35 additions & 35 deletions Source/MQTTnet.Tests/Mockups/TestEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand All @@ -24,10 +25,9 @@ namespace MQTTnet.Tests.Mockups
public sealed class TestEnvironment : IDisposable
{
readonly List<string> _clientErrors = new List<string>();
readonly List<IMqttClient> _clients = new List<IMqttClient>();

readonly ConcurrentBag<IMqttClient> _clients = new ConcurrentBag<IMqttClient>();
readonly List<Exception> _exceptions = new List<Exception>();
readonly List<ILowLevelMqttClient> _lowLevelClients = new List<ILowLevelMqttClient>();
readonly ConcurrentBag<ILowLevelMqttClient> _lowLevelClients = new ConcurrentBag<ILowLevelMqttClient>();
readonly MqttProtocolVersion _protocolVersion;
readonly List<string> _serverErrors = new List<string>();

Expand Down Expand Up @@ -65,9 +65,12 @@ public TestEnvironment(TestContext testContext, MqttProtocolVersion protocolVers
if (e.LogMessage.Level == MqttNetLogLevel.Error)
{
lock (_clientErrors)
if (!IgnoreClientLogErrors)
{
_clientErrors.Add(e.LogMessage.ToString());
lock (_clientErrors)
{
_clientErrors.Add(e.LogMessage.ToString());
}
}
}
};
Expand Down Expand Up @@ -212,30 +215,27 @@ public TestApplicationMessageReceivedHandler CreateApplicationMessageHandler(IMa

public IMqttClient CreateClient()
{
lock (_clients)
var logger = EnableLogger ? (IMqttNetLogger)ClientLogger : MqttNetNullLogger.Instance;

var client = Factory.CreateMqttClient(logger);
client.ConnectingAsync += e =>
{
var logger = EnableLogger ? (IMqttNetLogger)ClientLogger : new MqttNetNullLogger();

var client = Factory.CreateMqttClient(logger);
_clients.Add(client);

client.ConnectingAsync += e =>
if (TestContext != null)
{
if (TestContext != null)
var clientOptions = e.ClientOptions;
var existingClientId = clientOptions.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
var clientOptions = e.ClientOptions;
var existingClientId = clientOptions.ClientId;
if (existingClientId != null && !existingClientId.StartsWith(TestContext.TestName))
{
clientOptions.ClientId = TestContext.TestName + "_" + existingClientId;
}
clientOptions.ClientId = TestContext.TestName + "_" + existingClientId;
}
}
return CompletedTask.Instance;
};

return client;
}
return CompletedTask.Instance;
};

_clients.Add(client);

return client;
}

public MqttClientOptions CreateDefaultClientOptions()
Expand All @@ -250,13 +250,10 @@ public MqttClientOptionsBuilder CreateDefaultClientOptionsBuilder()

public ILowLevelMqttClient CreateLowLevelClient()
{
lock (_clients)
{
var client = Factory.CreateLowLevelMqttClient(ClientLogger);
_lowLevelClients.Add(client);
var client = Factory.CreateLowLevelMqttClient(ClientLogger);
_lowLevelClients.Add(client);

return client;
}
return client;
}

public MqttServer CreateServer(MqttServerOptions options)
Expand Down Expand Up @@ -383,13 +380,16 @@ public void ThrowIfLogErrors()
}
}

lock (_clientErrors)
if (!IgnoreClientLogErrors)
{
if (!IgnoreClientLogErrors && _clientErrors.Count > 0)
lock (_clientErrors)
{
var message = $"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})";
Console.WriteLine(message);
throw new Exception(message);
if (_clientErrors.Count > 0)
{
var message = $"Client(s) had {_clientErrors.Count} errors (${string.Join(Environment.NewLine, _clientErrors)})";
Console.WriteLine(message);
throw new Exception(message);
}
}
}
}
Expand Down
21 changes: 9 additions & 12 deletions Source/MQTTnet.Tests/Server/General.cs
Original file line number Diff line number Diff line change
Expand Up @@ -598,19 +598,16 @@ public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
// c
var c1 = await testEnvironment.ConnectClient(clientOptionsBuilder);

await Task.Delay(500);

await LongTestDelay();
var flow = string.Join(string.Empty, events);
Assert.AreEqual("c", flow);

// dc
// Connect client with same client ID. Should disconnect existing client.
var c2 = await testEnvironment.ConnectClient(clientOptionsBuilder);

await Task.Delay(500);

await LongTestDelay();
flow = string.Join(string.Empty, events);

Assert.AreEqual("cdc", flow);

c2.ApplicationMessageReceivedAsync += e =>
Expand All @@ -628,8 +625,7 @@ public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
// r
await c2.PublishStringAsync("topic");

await Task.Delay(500);

await LongTestDelay();
flow = string.Join(string.Empty, events);
Assert.AreEqual("cdcr", flow);

Expand All @@ -639,16 +635,17 @@ public async Task Same_Client_Id_Connect_Disconnect_Event_Order()
await c1.TryDisconnectAsync();
Assert.AreEqual(false, c1.IsConnected);

await Task.Delay(500);
await LongTestDelay();

// d
Assert.AreEqual(true, c2.IsConnected);
await c2.DisconnectAsync();

await Task.Delay(500);
await LongTestDelay();

await server.StopAsync();

await LongTestDelay();
flow = string.Join(string.Empty, events);
Assert.AreEqual("cdcrd", flow);
}
Expand Down Expand Up @@ -852,11 +849,11 @@ public async Task Shutdown_Disconnects_Clients_Gracefully()
return CompletedTask.Instance;
};

await Task.Delay(100);

// Stopping the server should disconnect the connection with the client
// which, it turn, will fire the disconnected event at the client.
await server.StopAsync();

await Task.Delay(100);
await LongTestDelay();

Assert.AreEqual(1, disconnectCalled);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ public void TestInitialize()
var eventContainer = new MqttServerEventContainer();
var clientSessionManager = new MqttClientSessionsManager(options, retainedMessagesManager, eventContainer, logger);

var session = new MqttSession("", false, new ConcurrentDictionary<object, object>(), options, eventContainer, retainedMessagesManager, clientSessionManager);
var session = new MqttSession(new MqttConnectPacket
{
ClientId = ""
}, new ConcurrentDictionary<object, object>(), options, eventContainer, retainedMessagesManager, clientSessionManager);

_subscriptionsManager = new MqttClientSubscriptionsManager(session, new MqttServerEventContainer(), retainedMessagesManager, clientSessionManager);
}
Expand Down
40 changes: 29 additions & 11 deletions Source/MQTTnet.Tests/Server/Session_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,28 @@ public async Task Clean_Session_Persistence()
}
}

[TestMethod]
public async Task Do_Not_Use_Expired_Session()
{
using (var testEnvironment = CreateTestEnvironment(MqttProtocolVersion.V500))
{
await testEnvironment.StartServer(o => o.WithPersistentSessions());

var c1 = await testEnvironment.ConnectClient(o => o.WithClientId("Client1").WithCleanSession(false).WithSessionExpiryInterval(3));

// Kill the client connection and ensure that the session will stay there but is expired after 3 seconds!
c1.Dispose();

await Task.Delay(TimeSpan.FromSeconds(6));

c1 = testEnvironment.CreateClient();
var options = testEnvironment.CreateDefaultClientOptionsBuilder().WithClientId("Client1").WithCleanSession(false).WithSessionExpiryInterval(3).Build();

var connectResult = await c1.ConnectAsync(options);
Assert.AreEqual(false, connectResult.IsSessionPresent);
}
}

[TestMethod]
public async Task Fire_Deleted_Event()
{
Expand Down Expand Up @@ -121,7 +143,7 @@ public async Task Get_Session_Items_In_Status()
Assert.AreEqual(true, session.Items["can_subscribe_x"]);
}
}

[TestMethod]
[DataRow(MqttProtocolVersion.V310)]
[DataRow(MqttProtocolVersion.V311)]
Expand All @@ -134,16 +156,12 @@ public async Task Handle_Parallel_Connection_Attempts(MqttProtocolVersion protoc

await testEnvironment.StartServer();

var options = new MqttClientOptionsBuilder().WithClientId("1").WithTimeout(TimeSpan.FromSeconds(1)).WithProtocolVersion(protocolVersion);
var options = new MqttClientOptionsBuilder().WithClientId("1").WithTimeout(TimeSpan.FromSeconds(10)).WithProtocolVersion(protocolVersion);

var hasReceive = false;

void OnReceive()
{
if (!hasReceive)
{
hasReceive = true;
}
hasReceive = true;
}

// Try to connect 50 clients at the same time.
Expand All @@ -152,7 +170,7 @@ void OnReceive()
var connectedClients = clients.Where(c => c?.TryPingAsync().GetAwaiter().GetResult() == true).ToList();

await LongTestDelay();

Assert.AreEqual(1, connectedClients.Count);

var option2 = new MqttClientOptionsBuilder().WithClientId("2").WithKeepAlivePeriod(TimeSpan.FromSeconds(10));
Expand All @@ -164,7 +182,7 @@ void OnReceive()
Assert.AreEqual(true, hasReceive);
}
}

[DataTestMethod]
[DataRow(MqttQualityOfServiceLevel.ExactlyOnce)]
[DataRow(MqttQualityOfServiceLevel.AtLeastOnce)]
Expand Down Expand Up @@ -355,8 +373,8 @@ static async Task<IMqttClient> ConnectAndSubscribe(TestEnvironment testEnvironme
onReceive();
return CompletedTask.Instance;
};
using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)))

using (var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30)))
{
await client.SubscribeAsync("aaa", MqttQualityOfServiceLevel.AtMostOnce, timeout.Token).ConfigureAwait(false);
}
Expand Down
Loading

0 comments on commit 9295626

Please sign in to comment.