Purpose of this repository is to build a messaging client that encapuslates a consistent way of using the Azure.Messaging.EventHubs.Processor and Azure.Messaging.EventHubs.Producer.
The client implements a standardized Envelope, while hiding the details of communicating, leaving the developer to focus on the business logic.
Both message producer and consumer use Managed Identity authentication when connecting to Azure EventHub.
{
"eventhub-id": ""
"eventhub-storage-container": ""
}
By default Messaging Client will look for these values in the IConfiguration provider at setup time, but both producer and consumer have constructor overloads that allow you to provide these values directly.
As always with managed identity you avoid keeping secrets in configuration and access is entirely controlled by the identity of the machines running the instance.
Setup IMessageConsumer and IMessageProducer using your favorite DI tool. In order to use both the consumer and producer your application needs to know how to initialize it, the following example uses Lamar IOC framework.
using Messaging.Client;
public class MessagingClientRegistry : ServiceRegistry
{
public MessagingClientRegistry()
{
For<IMessageProducer>().Use(x => new MessageProducer(x.GetInstance<IConfiguration>())).Singleton();
For<IMessageConsumer>()
.Use(x => new MessageConsumer(x.GetInstance<IConfiguration>(), x.GetInstance<ILogger<MessageConsumer>>()))
.Singleton();
}
}
Both IMessageProducer and IMessageConsumer are meant to be long lived singleton instances, It is not recommended to instansiate multiple consumers in the same application, as they will default based on the applications identity to using the same consumer group, and cause problems with offsets.
For more advanced scenarios it is recommended to implement a custom consumer pattern using the Azure.Messaging.EventHubs.MessageProcessor library directly.
Implement a message handler using the generic interface IMessageHandler where T is you message type, messages are deserialized using System.Text.Json, and should use either fields or properties:
public interface ITestHandler : IMessageHandler<TestMessage>
{
}
class TestHandler : ITestHandler
{
public void Handle(TestMessage message, Guid messageId)
{
Console.WriteLine($"test message {message.MyDate}");
}
}
Using a dedicated interface for your handler is optional but genrally recommended as it plays well with most DI's.
In you applications persistent logic (such as the main running thread of a hosted service) inject the IMessageConsumer interface and any relevant handler interfaces.
The wire up the consumer with the handlers in its own running thread:
var consumer = Task.Run(() =>
{
var tokenSource = new CancellationTokenSource();
_consumer
.RegisterHandler(_handleTestMessage);
consumer.ConsumeWithManagedIdentity(tokenSource.Token);
}).ContinueWith(HandleTaskCancellation, cancellationToken);
_task.Add(consumer);
Note that its important to add the consumer task to the list of tasks so that they can be waited before shutdown.