ConsumeFilter.Send is executed after IConsumer is instantiated #1

Open
alekbarszczewski opened this issue Apr 9, 2021 · 5 comments


@alekbarszczewski

I have an issue with a consume filter: it seems to be executed after the IConsumer instance is created.
In my case I want to take a message header ("User-Guid" in the example below), set it on a scoped service (IAuthContextContainer, which has a UserContext property), and then inject IAuthContextContainer into the consumer class.
The problem is that I also have some more complex DI around AuthContextContainer, which internally uses IServiceProvider.GetRequiredService<>() (in the consumer's constructor, not in any method). In this case it throws an error because the consume filter hasn't been executed yet (see output below).

Is this by design? Is there any workaround?

Consumer filter:

public class AuthContextConsumeFilter<T> :
    IFilter<ConsumeContext<T>>
    where T : class
{
    readonly IAuthContextContainer _authContextContainer;

    public AuthContextConsumeFilter(IAuthContextContainer authContextContainer)
    {
        _authContextContainer = authContextContainer;
    }

    public Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        var userGuidStr = context.Headers.Get<string>("User-Guid");

        if (!string.IsNullOrWhiteSpace(userGuidStr))
        {
            _authContextContainer.UserContext = new UserContext(Guid.Parse(userGuidStr));
        }

        Console.WriteLine("AuthContextConsumerFilter executed");

        return next.Send(context);
    }

    public void Probe(ProbeContext context)
    {
    }
}

Consumer:

public class CommandService
    : IConsumer<MyPayload>
{
    readonly IAuthContextContainer _authContextContainer;

    // Public so the DI container can construct the consumer.
    public CommandService(IAuthContextContainer authContextContainer)
    {
        _authContextContainer = authContextContainer;
        Console.WriteLine($"AuthContextConsumer::Ctor {_authContextContainer.UserContext}");
    }

    public Task Consume(ConsumeContext<MyPayload> consumeContext)
    {
        Console.WriteLine($"AuthContextConsumer::Consume {_authContextContainer.UserContext}");
        return Task.CompletedTask;
    }
}

Output:

AuthContextConsumer::Ctor // <= no UserContext here
AuthContextConsumerFilter executed // <= executed after consumer constructor
AuthContextConsumer::Consume Airstrike.Backend.Common.Application.Authentication.UserContext // user context is injected here
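
The ordering in this output can be reproduced without MassTransit. The following is only a minimal, dependency-free sketch: AuthContextContainer, Consumer and Program.Run are hypothetical stand-ins, and the "filter" step is simulated by a plain assignment. It assumes nothing about MassTransit internals beyond the order seen above (consumer constructed first, filter second, Consume last).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the scoped IAuthContextContainer from the post.
public class AuthContextContainer
{
    public string UserContext { get; set; }
}

// Hypothetical stand-in for the consumer; it reads the container in two places.
public class Consumer
{
    readonly AuthContextContainer _container;

    public string SeenInCtor { get; }

    public Consumer(AuthContextContainer container)
    {
        _container = container;
        // Anything read here is captured before the filter has run.
        SeenInCtor = container.UserContext ?? "(none)";
    }

    public string Consume() => _container.UserContext ?? "(none)";
}

public static class Program
{
    public static List<string> Run(string userGuidHeader)
    {
        var log = new List<string>();

        var container = new AuthContextContainer();   // one instance per "scope"
        var consumer = new Consumer(container);        // 1. consumer is constructed first
        log.Add($"ctor: {consumer.SeenInCtor}");

        container.UserContext = userGuidHeader;        // 2. the scoped filter runs afterwards
        log.Add("filter executed");

        log.Add($"consume: {consumer.Consume()}");     // 3. Consume sees the value
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run("3f2504e0-4f89-11d3-9a0c-0305e82c3301"))
            Console.WriteLine(line);
    }
}
```

Because the constructor and Consume share the same container instance, only code that reads UserContext at construction time is affected; reads deferred until Consume see the value.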

@phatboyg
Member

phatboyg commented Apr 9, 2021

Well, the consumer scope doesn't technically exist until the consumer is created. So that's how it is currently done, after the consumer is created.

Alternatively, I think you can UseMessageScope and then access IServiceProvider from the ConsumeContext payload (context.GetPayload<IServiceScope>().ServiceProvider) instead of using a scoped filter. That could be done prior to the consumer scope.

@alekbarszczewski
Author

Hmm, I'm not sure how to do it. I tried UseInlineFilter and ConnectConsumeObserver (both seem to be executed before the consumer is instantiated), but ConsumeContext.GetPayload<T>() always needs a type parameter T, so I don't know how to make it generic for all message types.

For now the only option I can see is to wrap each consumer in a generic abstract class that does what you said (context.GetPayload<T>().ServiceProvider) before executing the consume handler. But it would certainly be better to make it work "under the hood".
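
A rough sketch of what such a wrapper base class might look like follows. The names AuthContextConsumer<TMessage> and Handle are hypothetical, and the minimal UserContext and IAuthContextContainer definitions are placeholders so the sketch compiles. As a simplification, it takes the container through constructor injection rather than reading it from the context payload; the output in the first post suggests the consumer and the scoped filter share the same container instance. It also uses Guid.TryParse instead of Guid.Parse so that a malformed header is ignored rather than thrown.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical minimal versions of the thread's types, included only so the sketch compiles.
public class UserContext
{
    public UserContext(Guid userGuid) => UserGuid = userGuid;

    public Guid UserGuid { get; }
}

public interface IAuthContextContainer
{
    UserContext UserContext { get; set; }
}

// Hypothetical base class: fills the scoped container from the header, then calls the handler.
public abstract class AuthContextConsumer<TMessage> : IConsumer<TMessage>
    where TMessage : class
{
    readonly IAuthContextContainer _authContextContainer;

    protected AuthContextConsumer(IAuthContextContainer authContextContainer)
    {
        _authContextContainer = authContextContainer;
    }

    public Task Consume(ConsumeContext<TMessage> context)
    {
        var userGuidStr = context.Headers.Get<string>("User-Guid");

        // TryParse returns false for a missing or malformed header, leaving the container untouched.
        if (Guid.TryParse(userGuidStr, out var userGuid))
            _authContextContainer.UserContext = new UserContext(userGuid);

        return Handle(context);
    }

    // Derived consumers implement this instead of Consume.
    protected abstract Task Handle(ConsumeContext<TMessage> context);
}
```

Note that this still runs after the consumer has been constructed, so it only helps dependencies that read UserContext lazily; anything resolved in a constructor would see an empty container.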

@phatboyg
Member

phatboyg commented Apr 9, 2021

Comment updated, it wasn't showing the generic argument. And if you look at how UseMessageRetry uses the observer to add filters per message type, that would do it.

@alekbarszczewski
Author

I couldn't make it work with an observer (UseMessageRetry is pretty complicated), but I came up with the following idea, which seems to work for me, though it feels a little hacky and I am not sure it will work in every situation:

x.UsingRabbitMq((context, cfg) =>
{
    cfg.UseMessageScope(context);
    cfg.UseInlineFilter((consumeContext, pipe) =>
    {
        // this additional check was needed because it was throwing exception when
        // response was being received in RequestClient
        // R-FAULT rabbitmq://localhost/mbpalek_AirstrikeBackendGraphqlPublicGateway_bus_4wdoyyro5ycgm8m4bdcxzrhjnp?temporary=true d5070000-90d8-1865-c744-08d8fb938de6 00:00:00.2633040
        // GreenPipes.PayloadNotFoundException: The payload was not found: Microsoft.Extensions.DependencyInjection.IServiceScope
        var isResponse = consumeContext.ReceiveContext.InputAddress == consumeContext.DestinationAddress;
        if (!isResponse)
        {
            var authContextContainer = consumeContext.GetPayload<IServiceScope>().ServiceProvider
                .GetRequiredService<IAuthContextContainer>();

            var userGuidStr = consumeContext.Headers.Get<string>("User-Guid");

            if (!string.IsNullOrWhiteSpace(userGuidStr))
            {
                authContextContainer.UserContext = new UserContext(Guid.Parse(userGuidStr));
            }
        }

        return pipe.Send(consumeContext);
    });
    cfg.UseSendFilter(typeof(AuthContextSendFilter<>), context); // same as in your example
    cfg.UsePublishFilter(typeof(AuthContextPublishFilter<>), context); // same as in your example
    cfg.ConfigureEndpoints(context);
});

Does this make sense? Any idea whether it will keep working (it works for now)?
In particular, is var isResponse = consumeContext.ReceiveContext.InputAddress == consumeContext.DestinationAddress; legitimate? I couldn't find any other way to determine whether a ConsumeContext relates to a response or a request.

@phatboyg
Member

phatboyg commented Apr 9, 2021

You can tweak it for safety as shown:

if(consumeContext.TryGetPayload<IServiceScope>(out var scope))
    scope.ServiceProvider.GetRequiredService<T>()...

To avoid the payload not found exception.
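
Putting the suggestion together with the inline filter from the previous comment, a sketch could look like the one below. AuthContextPipeExtensions and UseAuthContextFromHeader are hypothetical names, and the minimal UserContext and IAuthContextContainer definitions are placeholders so the block compiles. It assumes a MassTransit version that still ships the GreenPipes namespace (as the exception text in this thread suggests) and that UseMessageScope has been configured so the IServiceScope payload exists for regular messages. Guid.TryParse is used here instead of Guid.Parse so a malformed header is skipped rather than thrown.

```csharp
using System;
using GreenPipes;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical minimal versions of the thread's types, included only so the sketch compiles.
public class UserContext
{
    public UserContext(Guid userGuid) => UserGuid = userGuid;

    public Guid UserGuid { get; }
}

public interface IAuthContextContainer
{
    UserContext UserContext { get; set; }
}

// Hypothetical extension wrapping the inline filter from this thread.
public static class AuthContextPipeExtensions
{
    public static void UseAuthContextFromHeader(this IPipeConfigurator<ConsumeContext> cfg)
    {
        cfg.UseInlineFilter((consumeContext, pipe) =>
        {
            // TryGetPayload replaces the isResponse address comparison: when no scope
            // payload is present (e.g. a RequestClient response), the block is skipped.
            if (consumeContext.TryGetPayload<IServiceScope>(out var scope))
            {
                var authContextContainer = scope.ServiceProvider
                    .GetRequiredService<IAuthContextContainer>();

                var userGuidStr = consumeContext.Headers.Get<string>("User-Guid");

                if (Guid.TryParse(userGuidStr, out var userGuid))
                    authContextContainer.UserContext = new UserContext(userGuid);
            }

            return pipe.Send(consumeContext);
        });
    }
}
```

Inside UsingRabbitMq this would be called as cfg.UseAuthContextFromHeader(); after cfg.UseMessageScope(context); so that the scope payload is already in place when the filter runs.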
