Skip to content

Commit

Permalink
Add possibility to specify parameter to RPC invocation (#1798)
Browse files Browse the repository at this point in the history
* Add possibility to specify parameter to RPC invocation which will be accessible in TopicGenerationContext.

* Make parameters optional

* Update ReleaseNotes.md

---------

Co-authored-by: Christian <[email protected]>
  • Loading branch information
Temppus and chkr1011 authored Sep 1, 2023
1 parent fc31c30 commit 5965ded
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 9 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [Client] Added support for _RemoteCertificateValidationCallback_ for .NET 4.5.2, 4.6.1 and 4.8 (#1806, thanks to @troky).
* [Client] Fixed wrong logging of obsolete feature when connection was not successful (#1801, thanks to @ramonsmits).
* [Client] Fixed _NullReferenceException_ when performing several actions when not connected (#1800, thanks to @ramonsmits).
* [RpcClient] Added support for passing custom parameters to topic generation context (#1798, thanks to @Temppus).
* [Server] Fixed _NullReferenceException_ in retained messages management (#1762, thanks to @logicaloud).
* [Server] Exposed new option which allows disabling packet fragmentation (#1753).
* [Server] Expired sessions will no longer be used when a client connects (#1756).
Expand Down
5 changes: 3 additions & 2 deletions Source/MQTTnet.Extensions.Rpc/IMqttRpcClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Protocol;
Expand All @@ -7,8 +8,8 @@ namespace MQTTnet.Extensions.Rpc
{
public interface IMqttRpcClient : IDisposable
{
Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel);
Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary<string,object> parameters = null);

Task<byte[]> ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken = default);
Task<byte[]> ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary<string, object> parameters = null, CancellationToken cancellationToken = default);
}
}
9 changes: 5 additions & 4 deletions Source/MQTTnet.Extensions.Rpc/MqttRpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -42,13 +43,13 @@ public void Dispose()
_waitingCalls.Clear();
}

public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary<string, object> parameters = null)
{
using (var timeoutToken = new CancellationTokenSource(timeout))
{
try
{
return await ExecuteAsync(methodName, payload, qualityOfServiceLevel, timeoutToken.Token).ConfigureAwait(false);
return await ExecuteAsync(methodName, payload, qualityOfServiceLevel, parameters, timeoutToken.Token).ConfigureAwait(false);
}
catch (OperationCanceledException exception)
{
Expand All @@ -62,14 +63,14 @@ public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string methodName, byte
}
}

public async Task<byte[]> ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, CancellationToken cancellationToken = default)
public async Task<byte[]> ExecuteAsync(string methodName, byte[] payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary<string, object> parameters = null, CancellationToken cancellationToken = default)
{
if (methodName == null)
{
throw new ArgumentNullException(nameof(methodName));
}

var context = new TopicGenerationContext(_mqttClient, _options, methodName, qualityOfServiceLevel);
var context = new TopicGenerationContext(_mqttClient, _options, methodName, parameters, qualityOfServiceLevel);
var topicNames = _options.TopicGenerationStrategy.CreateRpcTopics(context);

var requestTopic = topicNames.RequestTopic;
Expand Down
5 changes: 3 additions & 2 deletions Source/MQTTnet.Extensions.Rpc/MqttRpcClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using MQTTnet.Protocol;
Expand All @@ -11,13 +12,13 @@ namespace MQTTnet.Extensions.Rpc
{
public static class MqttRpcClientExtensions
{
public static Task<byte[]> ExecuteAsync(this IMqttRpcClient client, TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
public static Task<byte[]> ExecuteAsync(this IMqttRpcClient client, TimeSpan timeout, string methodName, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel, IDictionary<string,object> parameters = null)
{
if (client == null) throw new ArgumentNullException(nameof(client));

var buffer = Encoding.UTF8.GetBytes(payload ?? string.Empty);

return client.ExecuteAsync(timeout, methodName, buffer, qualityOfServiceLevel);
return client.ExecuteAsync(timeout, methodName, buffer, qualityOfServiceLevel, parameters);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using MQTTnet.Client;
using MQTTnet.Protocol;

namespace MQTTnet.Extensions.Rpc
{
public sealed class TopicGenerationContext
{
public TopicGenerationContext(IMqttClient mqttClient, MqttRpcClientOptions options, string methodName, MqttQualityOfServiceLevel qualityOfServiceLevel)
public TopicGenerationContext(IMqttClient mqttClient, MqttRpcClientOptions options, string methodName,
IDictionary<string, object> parameters, MqttQualityOfServiceLevel qualityOfServiceLevel)
{
MethodName = methodName ?? throw new ArgumentNullException(nameof(methodName));
Parameters = parameters;
QualityOfServiceLevel = qualityOfServiceLevel;
MqttClient = mqttClient ?? throw new ArgumentNullException(nameof(mqttClient));
Options = options ?? throw new ArgumentNullException(nameof(options));
}

public string MethodName { get; }

public IDictionary<string, object> Parameters { get; }

public IMqttClient MqttClient { get; }

public MqttRpcClientOptions Options { get; }
Expand Down
56 changes: 56 additions & 0 deletions Source/MQTTnet.Tests/Extensions/Rpc_Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand Down Expand Up @@ -45,6 +46,34 @@ public async Task Execute_Success_MQTT_V5_Mixed_Clients()
}
}

[TestMethod]
public async Task Execute_Success_Parameters_Propagated_Correctly()
{
var paramValue = "123";
var parameters = new Dictionary<string, object>
{
{ TestParametersTopicGenerationStrategy.ExpectedParamName, "123" },
};

using (var testEnvironment = CreateTestEnvironment())
{
await testEnvironment.StartServer();

var responseSender = await testEnvironment.ConnectClient(new MqttClientOptionsBuilder());
await responseSender.SubscribeAsync($"MQTTnet.RPC/+/ping/{paramValue}");

responseSender.ApplicationMessageReceivedAsync += e => responseSender.PublishStringAsync(e.ApplicationMessage.Topic + "/response", "pong");

using (var rpcClient = await testEnvironment.ConnectRpcClient(new MqttRpcClientOptionsBuilder()
.WithTopicGenerationStrategy(new TestParametersTopicGenerationStrategy()).Build()))
{
var response = await rpcClient.ExecuteAsync(TimeSpan.FromSeconds(5), "ping", "", MqttQualityOfServiceLevel.AtMostOnce, parameters);

Assert.AreEqual("pong", Encoding.UTF8.GetString(response));
}
}
}

[TestMethod]
public Task Execute_Success_With_QoS_0()
{
Expand Down Expand Up @@ -222,5 +251,32 @@ public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)
};
}
}

class TestParametersTopicGenerationStrategy : IMqttRpcClientTopicGenerationStrategy
{
internal const string ExpectedParamName = "test_param_name";

public MqttRpcTopicPair CreateRpcTopics(TopicGenerationContext context)
{
if (context.Parameters == null)
{
throw new InvalidOperationException("Parameters dictionary expected to be not null");
}

if (!context.Parameters.TryGetValue(ExpectedParamName, out var paramValue))
{
throw new InvalidOperationException($"Parameter with name {ExpectedParamName} not present");
}

var requestTopic = $"MQTTnet.RPC/{Guid.NewGuid():N}/{context.MethodName}/{paramValue}";
var responseTopic = requestTopic + "/response";

return new MqttRpcTopicPair
{
RequestTopic = requestTopic,
ResponseTopic = responseTopic
};
}
}
}
}

0 comments on commit 5965ded

Please sign in to comment.