
Poison handling with RabbitMQ Quorum queues #114

Open
zlepper opened this issue Dec 15, 2023 · 4 comments

@zlepper
Contributor

zlepper commented Dec 15, 2023

Greetings :D

I created this a long time ago for the main Rebus repo (rebus-org/Rebus#1044), where we agreed that there was no way Rebus itself could solve this.

However, now that Quorum queues are well supported in RabbitMQ, there is actually something we can do. According to the docs here: https://www.rabbitmq.com/quorum-queues.html#poison-message-handling it is possible to check the x-delivery-count header. I have tacked together some code to do it, but I'm not sure whether it is something that should be integrated into Rebus.RabbitMq directly:

// Usings omitted for brevity: Microsoft.Extensions.DependencyInjection, Microsoft.Extensions.Hosting
// and the relevant Rebus namespaces (Rebus.Bus, Rebus.Config, Rebus.Handlers, Rebus.Messages, Rebus.Pipeline, Rebus.Retry.*, Rebus.Transport)
await using var services = new ServiceCollection()
    .AddRebus(r =>
    {
        // Declare the input queue as a quorum queue so RabbitMQ tracks x-delivery-count
        return r.Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost:5672", "deliver-test").InputQueueOptions(
            o => { o.AddArgument("x-queue-type", "quorum"); })).Options(o =>
        {
            // Inject the rejector step right after Rebus' built-in retry step
            o.Decorate<IPipeline>(c =>
            {
                var pipeline = c.Get<IPipeline>();

                var injector = new PipelineStepInjector(pipeline);

                injector.OnReceive(new TooManyDeliveryAttemptsRejector(), PipelineRelativePosition.After,
                    typeof(DefaultRetryStep));

                return injector;
            });
            
            // Wrap the transport so outgoing messages have the delivery count header stripped
            o.Decorate<ITransport>(c =>
            {
                var transport = c.Get<ITransport>();

                return new ClearDeliveryHeaderTransport(transport);
            });
            
            o.LogPipeline();
        });
    })
    .AddRebusHandler<MyHandler>()
    .BuildServiceProvider(true);

foreach (var hostedService in services.GetServices<IHostedService>())
{
    await hostedService.StartAsync(default);
}

var bus = services.GetRequiredService<IBus>();

// Uncomment to put a message on the queue:
// await bus.SendLocal("a message");

await Task.Delay(100_000_000);


class MyHandler : IHandleMessages<string>
{
    public Task Handle(string message)
    {
        // Deliberately blow the stack so the process crashes before Rebus' normal error handling can run
        Span<byte> lotsOfData = stackalloc byte[100_000_000];
        Console.WriteLine($"Received message: {message}, {lotsOfData.Length}");
        return Task.CompletedTask;
    }
}

[StepDocumentation("""
                   Rejects messages that have been delivered too many times.

                   If Rebus has not handled this using the normal error strategies, it might be because the message is
                   killing the process, preventing Rebus from handling it.
                   """)]
class TooManyDeliveryAttemptsRejector : IIncomingStep
{
    private const int MaxDeliveryAttempts = 5;

    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        var message = context.Load<TransportMessage>();

        if (message == null)
        {
            await next();
            return;
        }

        var headers = message.Headers;

        if (headers.TryGetValue(MoreRabbitMqHeader.DeliveryCount, out var deliveryCount) &&
            int.TryParse(deliveryCount, out var count))
        {
            if (count > MaxDeliveryAttempts)
            {
                throw new MaxDeliveryAttemptsExceededException(
                    $"Max delivery attempts of {MaxDeliveryAttempts} exceeded. Delivery attempt count is {count}.");
            }
        }

        await next();
    }
}

class ClearDeliveryHeaderTransport : ITransport
{
    private readonly ITransport _transportImplementation;

    public ClearDeliveryHeaderTransport(ITransport transportImplementation)
    {
        _transportImplementation = transportImplementation;
    }

    public void CreateQueue(string address)
    {
        _transportImplementation.CreateQueue(address);
    }

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
    {
        // Strip the delivery count so forwarded messages (e.g. those moved to the error queue) start with a fresh count
        message.Headers.Remove(MoreRabbitMqHeader.DeliveryCount);

        return _transportImplementation.Send(destinationAddress, message, context);
    }

    public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
    {
        return _transportImplementation.Receive(context, cancellationToken);
    }

    public string Address => _transportImplementation.Address;
}

public static class MoreRabbitMqHeader
{
    /// <summary>
    /// How many times RabbitMQ has attempted to deliver this message.
    /// This is only supported when working with Quorum queues.
    /// </summary>
    public const string DeliveryCount = "x-delivery-count";
}

public class MaxDeliveryAttemptsExceededException : Exception, IFailFastException
{
    public MaxDeliveryAttemptsExceededException(string? message) : base(message)
    {
    }
}

This has to be tested in a separate process rather than a unit test, since the handler has to take down the entire process for RabbitMQ to increment the delivery count.

I couldn't find a way to mutate headers when handling error messages except by wrapping the transport itself and having it clear the header. Outgoing pipeline decorations seem to be ignored when processing error messages.

It should probably also read the max attempts from the retry strategy, but I tried to keep it as simple as possible.
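
For reference, a rough sketch of what reading the limit from the retry configuration could look like. Whether RetryStrategySettings (and its MaxDeliveryAttempts property) is the right thing to resolve from the decoration context is an assumption here, and the class name is just illustrative:

// Same usings as the POC above; RetryStrategySettings is assumed to be resolvable from the context.
class ConfiguredDeliveryAttemptsRejector : IIncomingStep
{
    private readonly int _maxDeliveryAttempts;

    public ConfiguredDeliveryAttemptsRejector(int maxDeliveryAttempts)
    {
        _maxDeliveryAttempts = maxDeliveryAttempts;
    }

    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        var message = context.Load<TransportMessage>();

        if (message != null
            && message.Headers.TryGetValue(MoreRabbitMqHeader.DeliveryCount, out var deliveryCount)
            && int.TryParse(deliveryCount, out var count)
            && count > _maxDeliveryAttempts)
        {
            throw new MaxDeliveryAttemptsExceededException(
                $"Max delivery attempts of {_maxDeliveryAttempts} exceeded. Delivery attempt count is {count}.");
        }

        await next();
    }
}

// Registration, inside the Options(o => ...) block from the POC above:
o.Decorate<IPipeline>(c =>
{
    var pipeline = c.Get<IPipeline>();
    var maxDeliveryAttempts = c.Get<RetryStrategySettings>().MaxDeliveryAttempts;

    return new PipelineStepInjector(pipeline)
        .OnReceive(new ConfiguredDeliveryAttemptsRejector(maxDeliveryAttempts),
            PipelineRelativePosition.After, typeof(DefaultRetryStep));
});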

@mookid8000
Member

Hi there, sorry for not replying sooner 😅 It is definitely an interesting approach, but I'm thinking that maybe it could be "upgraded" a bit and formalized as a new, optional rbs2-delivery-count header, which the transport could provide when possible.

I've published Rebus 8.2.0-alpha01 to NuGet.org just now where I've added the header under Headers.DeliveryCount and added the necessary logic to DefaultRetryStep to act on it.

Only thing left (I think) is to make transports that support it set the header when they build the TransportMessage object. Unfortunately I don't have time to implement it in Rebus.RabbitMq right now, so feel free to do it 😉 or wait for me to come around again (which I'll probably do some time this week)
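
To illustrate that idea (this is not the actual Rebus.RabbitMq code), the mapping a transport could do when building the incoming TransportMessage might look roughly like this. The helper shape and the raw AMQP header dictionary are assumptions; Headers.DeliveryCount is the rbs2-delivery-count header mentioned above:

using System;
using System.Collections.Generic;
using System.Globalization;
using Rebus.Messages; // Headers.DeliveryCount

static class DeliveryCountMapping
{
    // Hypothetical helper: copies RabbitMQ's native x-delivery-count into the new optional Rebus header
    // when the broker (i.e. a quorum queue) provides it.
    public static void AddDeliveryCountIfPresent(
        IDictionary<string, object> amqpHeaders,
        Dictionary<string, string> rebusHeaders)
    {
        if (amqpHeaders == null) return;

        if (amqpHeaders.TryGetValue("x-delivery-count", out var value) && value != null)
        {
            // Quorum queues deliver the count as a numeric AMQP value; forward it as a string header
            rebusHeaders[Headers.DeliveryCount] = Convert.ToString(value, CultureInfo.InvariantCulture);
        }
    }
}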

@zlepper
Contributor Author

zlepper commented Jan 2, 2024

No problem, Christmas vacation is important!

I mostly put it here to get the ideas down and to act as a POC. I was very sure the exact implementation was going to differ, since you know the code much better than I do :D

@mookid8000
Member

Ok I just had 10 minutes to try 🙂 I've added a test here: https://github.com/rebus-org/Rebus.RabbitMq/blob/master/Rebus.RabbitMq.Tests/RabbitMqNativeDeliveryCount.cs

and the implementation just provides the value from the x-delivery-count header when it is there.

Haven't had a quorum queue to test on yet, though... feel free to take it for a spin if you like.

It's out as Rebus.RabbitMq 9.1.0-alpha01 on NuGet.org now.

@zlepper
Contributor Author

zlepper commented Jan 2, 2024

Just took a quick look at the implementation. It probably needs to clear the header when passing the message to the error queue; otherwise, simply retrying by using the RabbitMQ management interface to move messages from the error queue back to the work queue will cause the messages to be considered failed immediately. At least, that was the result from my testing originally :)
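
A sketch of that suggestion, reusing the transport-wrapping approach from the original POC (same usings, plus Rebus.Messages for Headers) but clearing both the native RabbitMQ header and the new Rebus header before sending; the class name is illustrative and this is not the actual Rebus.RabbitMq change:

class ClearDeliveryCountHeadersTransport : ITransport
{
    private readonly ITransport _inner;

    public ClearDeliveryCountHeadersTransport(ITransport inner) => _inner = inner;

    public void CreateQueue(string address) => _inner.CreateQueue(address);

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context)
    {
        // Reset both counters so a message moved back from the error queue is not
        // immediately considered poisoned again
        message.Headers.Remove(MoreRabbitMqHeader.DeliveryCount); // x-delivery-count
        message.Headers.Remove(Headers.DeliveryCount);            // rbs2-delivery-count (Rebus 8.2.0-alpha01)
        return _inner.Send(destinationAddress, message, context);
    }

    public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
        => _inner.Receive(context, cancellationToken);

    public string Address => _inner.Address;
}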

But this is going to be lovely, far fewer dead processes when BAD things happen :D :D
