Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AzureServiceBus ensure client reuse #1974

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
60be56e
chor: add Microsoft.Bcl.AsyncInterfaces to be able to use ClientCache
TomMalow Jul 11, 2023
af2c725
refactor: use ClientCatch instead of internal concurrent dictionaries
TomMalow Jul 11, 2023
6bcbd79
refactor: Change ConnectionKey to point to ServiceBus connectionStrin…
TomMalow Jul 11, 2023
b012102
feat: add Substitutable ServiceBusClientProvider to allow unit testing
TomMalow Jul 11, 2023
c36ff09
feat: add unit test for AzureServiceBusQueueHealthCheck
TomMalow Jul 11, 2023
1ca452a
feat: add unit test for AzureServiceBusSubscriptionHealthCheck
TomMalow Jul 11, 2023
66a9864
feat: add unit test for AzureServiceBusTopicHealthCheck
TomMalow Jul 31, 2023
bed2127
feat: add unit test for AzureServiceBusQueueMessageCountThresholdHeal…
TomMalow Jul 31, 2023
2b4f6ce
Revert "chor: add Microsoft.Bcl.AsyncInterfaces to be able to use Cli…
TomMalow Jul 31, 2023
c57e94c
refactor: fix format
TomMalow Jul 31, 2023
f05cae5
refactor!: add ctor with 1 argument to allow backward compatability
TomMalow Aug 1, 2023
f96d7cf
refactor!: Change Prefix field to obsolete and use ConnectionKey dire…
TomMalow Aug 1, 2023
1e45b37
Update src/HealthChecks.AzureServiceBus/AzureServiceBusHealthCheck.cs
sungam3r Aug 2, 2023
0c5a64e
Update test/HealthChecks.AzureServiceBus.Tests/HealthChecks.AzureServ…
sungam3r Aug 2, 2023
d197498
refactor: move queueKey to field
TomMalow Aug 3, 2023
bb982a3
fix: incorrect setup of test with across service busses
TomMalow Aug 3, 2023
a2ac5a3
refactor: allign naming and ordering of asserts
TomMalow Aug 3, 2023
7bb2bab
Update test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHe…
sungam3r Aug 6, 2023
9c97238
Update test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHe…
sungam3r Aug 6, 2023
38cf220
Update test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHe…
sungam3r Aug 6, 2023
85afba7
refactor: extract HealthCheck setup in test
TomMalow Aug 9, 2023
a1c0565
refactor: extend ServiceBus test with additional assert
TomMalow Aug 9, 2023
9491eaf
refactor: simplify HealthCheck calls in service bus test
TomMalow Aug 9, 2023
ea6e21c
refactor: alling service bus test format
TomMalow Aug 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions src/HealthChecks.AzureServiceBus/AzureServiceBusHealthCheck.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Collections.Concurrent;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using HealthChecks.AzureServiceBus.Configuration;
Expand All @@ -7,21 +6,19 @@ namespace HealthChecks.AzureServiceBus;

public abstract class AzureServiceBusHealthCheck<TOptions> where TOptions : AzureServiceBusHealthCheckOptions
{
protected static readonly ConcurrentDictionary<string, ServiceBusClient> ClientConnections = new();

protected static readonly ConcurrentDictionary<string, ServiceBusAdministrationClient> ManagementClientConnections = new();

protected static readonly ConcurrentDictionary<string, ServiceBusReceiver> ServiceBusReceivers = new();

protected TOptions Options { get; }

private readonly ServiceBusClientProvider _clientProvider;

[Obsolete("Please, override ConnectionKey property instead.")]
protected string Prefix => Options.ConnectionString ?? Options.FullyQualifiedNamespace!;

protected abstract string ConnectionKey { get; }
protected virtual string ConnectionKey => Options.ConnectionString ?? Options.FullyQualifiedNamespace!;

protected AzureServiceBusHealthCheck(TOptions options)
protected AzureServiceBusHealthCheck(TOptions options, ServiceBusClientProvider clientProvider)
{
Options = options;
_clientProvider = clientProvider;

if (!string.IsNullOrWhiteSpace(options.ConnectionString))
return;
Expand All @@ -37,11 +34,11 @@ protected AzureServiceBusHealthCheck(TOptions options)

protected ServiceBusClient CreateClient() =>
Options.Credential is null
? new ServiceBusClient(Options.ConnectionString)
: new ServiceBusClient(Options.FullyQualifiedNamespace, Options.Credential);
? _clientProvider.CreateClient(Options.ConnectionString)
: _clientProvider.CreateClient(Options.FullyQualifiedNamespace, Options.Credential);

protected ServiceBusAdministrationClient CreateManagementClient() =>
Options.Credential is null
? new ServiceBusAdministrationClient(Options.ConnectionString)
: new ServiceBusAdministrationClient(Options.FullyQualifiedNamespace, Options.Credential);
? _clientProvider.CreateManagementClient(Options.ConnectionString)
: _clientProvider.CreateManagementClient(Options.FullyQualifiedNamespace, Options.Credential);
}
27 changes: 15 additions & 12 deletions src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@ namespace HealthChecks.AzureServiceBus;

public class AzureServiceBusQueueHealthCheck : AzureServiceBusHealthCheck<AzureServiceBusQueueHealthCheckOptions>, IHealthCheck
{
private string? _connectionKey;
private string queueKey => $"{ConnectionKey}_{Options.QueueName}";

protected override string ConnectionKey => _connectionKey ??= $"{Prefix}_{Options.QueueName}";

public AzureServiceBusQueueHealthCheck(AzureServiceBusQueueHealthCheckOptions options)
: base(options)
public AzureServiceBusQueueHealthCheck(AzureServiceBusQueueHealthCheckOptions options, ServiceBusClientProvider clientProvider)
: base(options, clientProvider)
{
Guard.ThrowIfNull(options.QueueName, true);
}

public AzureServiceBusQueueHealthCheck(AzureServiceBusQueueHealthCheckOptions options)
: this(options, new ServiceBusClientProvider())
{ }

/// <inheritdoc />
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
Expand All @@ -32,19 +34,20 @@ public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context
return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
}

Task CheckWithReceiver()
async Task CheckWithReceiver()
{
var client = ClientConnections.GetOrAdd(ConnectionKey, _ => CreateClient());
var receiver = ServiceBusReceivers.GetOrAdd(
$"{nameof(AzureServiceBusQueueHealthCheck)}_{ConnectionKey}",
client.CreateReceiver(Options.QueueName));
var client = await ClientCache.GetOrAddAsyncDisposableAsync(ConnectionKey, _ => CreateClient()).ConfigureAwait(false);
var receiver = await ClientCache.GetOrAddAsyncDisposableAsync(
$"{nameof(AzureServiceBusQueueHealthCheck)}_{queueKey}",
TomMalow marked this conversation as resolved.
Show resolved Hide resolved
_ => client.CreateReceiver(Options.QueueName))
.ConfigureAwait(false);

return receiver.PeekMessageAsync(cancellationToken: cancellationToken);
await receiver.PeekMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

Task CheckWithManagement()
{
var managementClient = ManagementClientConnections.GetOrAdd(ConnectionKey, _ => CreateManagementClient());
var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient());

return managementClient.GetQueueRuntimePropertiesAsync(Options.QueueName, cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,24 @@ public class AzureServiceBusQueueMessageCountThresholdHealthCheck : AzureService
private readonly AzureServiceBusQueueMessagesCountThreshold? _activeMessagesThreshold;
private readonly AzureServiceBusQueueMessagesCountThreshold? _deadLetterMessagesThreshold;

public AzureServiceBusQueueMessageCountThresholdHealthCheck(AzureServiceBusQueueMessagesCountThresholdHealthCheckOptions options)
: base(options)
public AzureServiceBusQueueMessageCountThresholdHealthCheck(AzureServiceBusQueueMessagesCountThresholdHealthCheckOptions options, ServiceBusClientProvider clientProvider)
: base(options, clientProvider)
{
_queueName = Guard.ThrowIfNull(options.QueueName);
_activeMessagesThreshold = options.ActiveMessages;
_deadLetterMessagesThreshold = options.DeadLetterMessages;
}

protected override string ConnectionKey => $"{Prefix}_{_queueName}";
public AzureServiceBusQueueMessageCountThresholdHealthCheck(AzureServiceBusQueueMessagesCountThresholdHealthCheckOptions options)
: this(options, new ServiceBusClientProvider())
{ }

/// <inheritdoc />
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
try
{
var managementClient = ManagementClientConnections.GetOrAdd(ConnectionKey, CreateManagementClient());
var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient());

var properties = await managementClient.GetQueueRuntimePropertiesAsync(_queueName, cancellationToken).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ namespace HealthChecks.AzureServiceBus;

public class AzureServiceBusSubscriptionHealthCheck : AzureServiceBusHealthCheck<AzureServiceBusSubscriptionHealthCheckHealthCheckOptions>, IHealthCheck
{
private string? _connectionKey;
private string subscriptionKey => $"{ConnectionKey}_{Options.TopicName}_{Options.SubscriptionName}";

protected override string ConnectionKey =>
_connectionKey ??= $"{Prefix}_{Options.TopicName}_{Options.SubscriptionName}";

public AzureServiceBusSubscriptionHealthCheck(AzureServiceBusSubscriptionHealthCheckHealthCheckOptions options)
: base(options)
public AzureServiceBusSubscriptionHealthCheck(AzureServiceBusSubscriptionHealthCheckHealthCheckOptions options, ServiceBusClientProvider clientProvider)
: base(options, clientProvider)
{
Guard.ThrowIfNull(options.TopicName, true);
Guard.ThrowIfNull(options.SubscriptionName, true);
}

public AzureServiceBusSubscriptionHealthCheck(AzureServiceBusSubscriptionHealthCheckHealthCheckOptions options)
: this(options, new ServiceBusClientProvider())
{ }

/// <inheritdoc />
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
Expand All @@ -34,19 +35,20 @@ public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context
return new HealthCheckResult(context.Registration.FailureStatus, exception: ex);
}

Task CheckWithReceiver()
async Task CheckWithReceiver()
{
var client = ClientConnections.GetOrAdd(ConnectionKey, _ => CreateClient());
var receiver = ServiceBusReceivers.GetOrAdd(
$"{nameof(AzureServiceBusSubscriptionHealthCheck)}_{ConnectionKey}",
client.CreateReceiver(Options.TopicName, Options.SubscriptionName));
var client = await ClientCache.GetOrAddAsyncDisposableAsync(ConnectionKey, _ => CreateClient()).ConfigureAwait(false);
var receiver = await ClientCache.GetOrAddAsyncDisposableAsync(
$"{nameof(AzureServiceBusSubscriptionHealthCheck)}_{subscriptionKey}",
TomMalow marked this conversation as resolved.
Show resolved Hide resolved
_ => client.CreateReceiver(Options.TopicName, Options.SubscriptionName))
.ConfigureAwait(false);

return receiver.PeekMessageAsync(cancellationToken: cancellationToken);
await receiver.PeekMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
}

Task CheckWithManagement()
{
var managementClient = ManagementClientConnections.GetOrAdd(ConnectionKey, _ => CreateManagementClient());
var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient());

return managementClient.GetSubscriptionRuntimePropertiesAsync(
Options.TopicName, Options.SubscriptionName, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ namespace HealthChecks.AzureServiceBus;

public class AzureServiceBusTopicHealthCheck : AzureServiceBusHealthCheck<AzureServiceBusTopicHealthCheckOptions>, IHealthCheck
{
private string? _connectionKey;

protected override string ConnectionKey => _connectionKey ??= $"{Prefix}_{Options.TopicName}";

public AzureServiceBusTopicHealthCheck(AzureServiceBusTopicHealthCheckOptions options)
: base(options)
public AzureServiceBusTopicHealthCheck(AzureServiceBusTopicHealthCheckOptions options, ServiceBusClientProvider clientProvider)
: base(options, clientProvider)
{
Guard.ThrowIfNull(options.TopicName, true);
}

public AzureServiceBusTopicHealthCheck(AzureServiceBusTopicHealthCheckOptions options)
: this(options, new ServiceBusClientProvider())
{ }

/// <inheritdoc />
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
var managementClient = ManagementClientConnections.GetOrAdd(ConnectionKey, _ => CreateManagementClient());
var managementClient = ClientCache.GetOrAdd(ConnectionKey, _ => CreateManagementClient());

_ = await managementClient.GetTopicRuntimePropertiesAsync(Options.TopicName, cancellationToken).ConfigureAwait(false);

Expand Down
20 changes: 20 additions & 0 deletions src/HealthChecks.AzureServiceBus/ServiceBusClientProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Azure.Core;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

namespace HealthChecks.AzureServiceBus;

public class ServiceBusClientProvider
{
public virtual ServiceBusClient CreateClient(string? connectionString)
=> new ServiceBusClient(connectionString);

public virtual ServiceBusClient CreateClient(string? fullyQualifiedName, TokenCredential credential)
=> new ServiceBusClient(fullyQualifiedName, credential);

public virtual ServiceBusAdministrationClient CreateManagementClient(string? connectionString)
=> new ServiceBusAdministrationClient(connectionString);

public virtual ServiceBusAdministrationClient CreateManagementClient(string? fullyQualifiedName, TokenCredential credential)
=> new ServiceBusAdministrationClient(fullyQualifiedName, credential);
}
Loading
Loading