Adds the ability to add record interceptors instead of overriding them #3542

Open
Nyamiou opened this issue Oct 8, 2024 · 5 comments

@Nyamiou

Nyamiou commented Oct 8, 2024

Expected Behavior

I'm using a Spring Cloud Stream ListenerContainerCustomizer to customize an AbstractMessageListenerContainer in order to add a record interceptor. I would expect to be able to add a record interceptor in the last position, or to be able to get the current record interceptor, wrap it together with mine in a CompositeRecordInterceptor, and then set that (either is fine).

Current Behavior

The issue I have is that only setRecordInterceptor(...) is public; getRecordInterceptor() is not, and there is no addRecordInterceptor(...) either. I would like to keep the existing record interceptors and just add mine in the last position, but there seems to be no easy way to do this.

Context

I have code similar to this:

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> listenerContainerCustomizer() {
	return (container, destinationName, group) -> container.setRecordInterceptor(new MyRecordInterceptor());
}

I want to use this in a small internal company library, but I don't want to take away other projects' ability to use their own record interceptors.

I tried to use a Spring Cloud Stream ChannelInterceptor (with GlobalChannelInterceptor) to achieve what I wanted, but its afterSendCompletion(...) is called after each attempt, while the record interceptor's afterRecord(...) is called once after all attempts. Channel interceptors also do not allow modifying headers.
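
To make the timing difference concrete, here is a toy Java model of the behavior described above. It is not Spring code: `RetryCallbackModel`, `deliver`, and the event strings are hypothetical names, and the retry loop is an assumption that only mirrors the report (one completion callback per attempt, one afterRecord(...) call per record).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Toy model only: none of these types are Spring or Kafka classes.
class RetryCallbackModel {

    // Tries the handler up to maxAttempts times and records which hook fires when.
    static List<String> deliver(String record, int maxAttempts, IntPredicate succeedsOnAttempt) {
        List<String> events = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            boolean succeeded = succeedsOnAttempt.test(attempt);
            // Per-attempt hook: the role afterSendCompletion(...) plays in the report.
            events.add("afterSendCompletion(attempt " + attempt + ")");
            if (succeeded) {
                break;
            }
        }
        // Per-record hook: the role afterRecord(...) plays, fired once at the end.
        events.add("afterRecord(" + record + ")");
        return events;
    }
}
```

In this model, a record that fails once and then succeeds produces two per-attempt events followed by a single per-record event, which is why a per-record hook is the better fit for logic that should run once per record.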

@artembilan
Member

Thanks for the report!
We don't see this as an easy fix, since there would be too many moving parts with those add/remove([index]) operations in the CompositeRecordInterceptor and AbstractMessageListenerContainer.
So, unless you see how to fix it quickly, we are going to look into that for the next version.

@Nyamiou
Author

Nyamiou commented Oct 8, 2024

No problem, this is not urgent. I just wanted this to hopefully be fixed in the future. Thank you for answering so quickly.

@chickenchickenlove
Contributor

chickenchickenlove commented Oct 9, 2024

@Nyamiou
I have an idea for you. Maybe it could be a short-term solution.
How about using this class for your case?

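import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;

// Chains several interceptors behind a single RecordInterceptor.
public class TestRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

    private final List<RecordInterceptor<K, V>> children = new ArrayList<>();

    public void addRecordInterceptor(RecordInterceptor<K, V> interceptor) {
        this.children.add(interceptor);
    }

    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
        ConsumerRecord<K, V> invokedRecord = record;
        for (RecordInterceptor<K, V> child : children) {
            invokedRecord = child.intercept(invokedRecord, consumer);
            if (invokedRecord == null) {
                // A null result means "skip this record", so stop the chain here.
                break;
            }
        }
        return invokedRecord;
    }

}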

@artembilan
Member

Right, @chickenchickenlove, that will work for some use cases.
But I believe @Nyamiou's request is more about AbstractMessageListenerContainer internals, where an addRecordInterceptor() API would be exposed to deal with a CompositeRecordInterceptor behind the scenes.
That's why this is a feature request rather than a workaround in the target project.
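
A rough sketch of what such an API could look like, written against simplified stand-in types so it runs without Spring Kafka on the classpath. Everything here is an assumption for illustration: `SketchInterceptor`, `SketchComposite`, `SketchContainer`, and `dispatch` are hypothetical names, records are plain strings, and addRecordInterceptor() is the proposed method, not one the container currently offers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a record interceptor (records are Strings here).
interface SketchInterceptor {
    String intercept(String record); // returning null means "skip this record"
}

// Stand-in composite: runs delegates in order and stops at the first null result.
class SketchComposite implements SketchInterceptor {
    private final List<SketchInterceptor> delegates = new ArrayList<>();

    void add(SketchInterceptor delegate) {
        this.delegates.add(delegate);
    }

    @Override
    public String intercept(String record) {
        String current = record;
        for (SketchInterceptor delegate : this.delegates) {
            current = delegate.intercept(current);
            if (current == null) {
                break;
            }
        }
        return current;
    }
}

// Stand-in container exposing the proposed (not existing) addRecordInterceptor().
class SketchContainer {
    private SketchInterceptor recordInterceptor;

    // Mirrors today's behavior: replaces whatever was configured before.
    void setRecordInterceptor(SketchInterceptor interceptor) {
        this.recordInterceptor = interceptor;
    }

    // Proposed behavior: keep the existing interceptor and append the new one last.
    void addRecordInterceptor(SketchInterceptor interceptor) {
        if (this.recordInterceptor == null) {
            this.recordInterceptor = interceptor;
        } else if (this.recordInterceptor instanceof SketchComposite) {
            ((SketchComposite) this.recordInterceptor).add(interceptor);
        } else {
            SketchComposite composite = new SketchComposite();
            composite.add(this.recordInterceptor);
            composite.add(interceptor);
            this.recordInterceptor = composite;
        }
    }

    // Applies the configured interceptor chain to one record.
    String dispatch(String record) {
        return this.recordInterceptor == null ? record : this.recordInterceptor.intercept(record);
    }
}
```

With this shape, an application can call setRecordInterceptor(...) first and a shared library can call addRecordInterceptor(...) afterwards without discarding the application's interceptor. The real change would also have to forward the other interceptor callbacks and handle removal and ordering, which is where the "moving parts" mentioned earlier come in.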

@chickenchickenlove
Contributor

I see!
Thank you for the explanation 🙇‍♂️
