Switch from batch to record-by-record processing in case an exception occurred. #2233
Comments
Don't ask questions in multiple places; it is a waste of your time, and ours. We cannot switch between batch and record listeners when an exception occurs. See the documentation for how to handle this use case: https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

Use

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, i);
        }
    }
}

or

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<ConsumerRecord<String, Thing>> records) {
    records.forEach(record -> {
        try {
            process(record);
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, record);
        }
    });
}

This will work (in 2.8.x) with the DefaultErrorHandler. |
Thank you for your fast reply, and sorry for asking the question in several places. I read the manual several times and I also studied those code snippets. I do not understand why I should consume several records at once but process them separately; if I wanted to do that, I could use the "normal" record listener, correct? We process several records together because it allows for higher throughput; we can load further external data more efficiently. Everything works as expected and in a performant manner - except the error handling. That's why I opened this issue. |
I think maybe what you want is something like:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<ConsumerRecord<String, Thing>> records) {
    try {
        processBatch(records);
    }
    catch (Exception ex) {
        logger.warn("Error occurred when processing batch, switching to per record", ex);
        records.forEach(record -> {
            try {
                process(record);
            }
            catch (Exception e) {
                throw new BatchListenerFailedException("Failed to process", e, record);
            }
        });
    }
}

While I agree it might perhaps be interesting for the framework to provide such a solution OOTB, I fail to see what performance penalties you refer to, and what could be done to mitigate them - maybe you can elaborate on that? |
Or something like this:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            validate(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to validate", e, i);
        }
    }
    try {
        processBatch(things);
    }
    catch (MyExceptionWithFailedIndex e) {
        throw new BatchListenerFailedException("Failed to process", e, e.getFailedIndex());
    }
} |
Thank you for the replies.

Regarding the first solution: this is exactly what I am looking for, but it is implemented in the listener, not in the framework. This has the drawback that it needs to be implemented on each listener and cannot be solved in a global manner.

Regarding the performance penalties: suppose you need to load some dependent data from a database inside the processBatch() method. When you have all the records, you are able to load those dependent objects with a single query instead of many separate queries (see the sketch below).

The second solution: I see two drawbacks. Obviously the framework supports a certain strategy when the listener fails without a BatchListenerFailedException, but this behaviour cannot be changed. What I would like to do is to replace the batch error handling with your first solution. wdyt? |
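To make the single-query point concrete, here is a minimal sketch, assuming Spring's JdbcTemplate; the CUSTOMER table and the loadNamesById() helper are illustrative, not part of the thread's code:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ResultSetExtractor;

public class BatchLookup {

    // one IN (...) query for the whole batch instead of one query per record
    static Map<String, String> loadNamesById(JdbcTemplate jdbcTemplate,
            List<ConsumerRecord<String, String>> records) {

        List<String> ids = records.stream()
                .map(ConsumerRecord::key)
                .distinct()
                .collect(Collectors.toList());
        // build "?, ?, ?" placeholders for the IN clause
        String placeholders = String.join(", ", Collections.nCopies(ids.size(), "?"));
        ResultSetExtractor<Map<String, String>> extractor = rs -> {
            Map<String, String> namesById = new HashMap<>();
            while (rs.next()) {
                namesById.put(rs.getString("ID"), rs.getString("NAME"));
            }
            return namesById;
        };
        return jdbcTemplate.query(
                "SELECT ID, NAME FROM CUSTOMER WHERE ID IN (" + placeholders + ")",
                extractor, ids.toArray());
    }
}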
Any code that processes a batch will iterate over the list, so I see no problem with determining which record failed. Such a change would require significant changes to the listener container, which is already quite complex.

We do have the concept of a BatchToRecordAdapter (a configuration sketch follows the example below). However, that is an all-or-nothing configuration; you can't switch between the two dynamically; again, the method signature of the batch listener method expects a list; you would need a different kind of adapter, to present the records as a singleton list for each record.

One way to separate your logic from your listener would be to add an advice (a MethodInterceptor) to the container's advice chain:

import java.util.Collections;
import java.util.List;

import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.BatchListenerFailedException;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.util.backoff.FixedBackOff;

@SpringBootApplication
public class Kgh2233Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh2233Application.class, args);
    }

    @KafkaListener(id = "kgh2233", topics = "kgh2233")
    void listen(List<String> list) {
        System.out.println(list);
        list.forEach(in -> {
            System.out.println(in);
            if ("fail".equals(in)) {
                throw new RuntimeException("test");
            }
        });
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("kgh2233").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentKafkaListenerContainerFactory<?, ?> factory,
            Advice batchRetryer) {

        // wrap the listener invocation with the advice; once the advice has
        // narrowed the failure down to one record, the DefaultErrorHandler
        // retries that record 3 times, 3 seconds apart, then calls the recoverer
        factory.getContainerProperties().setAdviceChain(batchRetryer);
        factory.setCommonErrorHandler(new DefaultErrorHandler((rec, ex) -> {
            System.out.println("Retries exhausted for " + KafkaUtils.format(rec));
        }, new FixedBackOff(3000, 3)));
        return args -> {
            template.send("kgh2233", "foo");
            template.send("kgh2233", "bar");
            template.send("kgh2233", "fail");
        };
    }
    @Bean
    MethodInterceptor batchRetryer() {
        return invocation -> {
            try {
                // first, try the whole batch in one listener call
                return invocation.proceed();
            }
            catch (Exception e) {
                // fall back to record-by-record: the listener still receives a
                // List, but now a singleton list per record, so the failing
                // record can be identified
                @SuppressWarnings("unchecked")
                List<ConsumerRecord<?, ?>> records = (List<ConsumerRecord<?, ?>>) invocation.getArguments()[0];
                records.forEach(record -> {
                    invocation.getArguments()[0] = Collections.singletonList(record);
                    try {
                        invocation.proceed();
                    }
                    catch (Throwable e1) {
                        // tell the error handler exactly which record failed
                        throw new BatchListenerFailedException("Failed", e1, record);
                    }
                });
                return null;
            }
        };
    }

}
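For completeness, here is a minimal configuration sketch of the all-or-nothing BatchToRecordAdapter mentioned above (the factory name, consumer factory, and recoverer lambda are illustrative; imports from org.springframework.kafka.core and org.springframework.kafka.listener.adapter are assumed). With this in place, a record listener receives the polled records one at a time for every batch; it cannot be turned on per batch at runtime:

@Bean
ConcurrentKafkaListenerContainerFactory<String, String> adaptedFactory(
        ConsumerFactory<String, String> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true); // the container still polls batches
    // each record is handed to a record listener individually; records whose
    // processing fails are passed to the recoverer
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>(
            (record, ex) -> System.out.println("Recovered: " + record.offset())));
    return factory;
}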
|
Well, @garyrussell's solution is much more interesting than what I had in mind - I was thinking about implementing a custom error handler. As for the strategy as a whole, depending on your actual requirements, you might want to look into splitting your batch in half recursively instead of moving to one-by-one at the first exception (see the sketch below). This way, assuming there's one poison pill in the second half of a batch, the first half will be processed as a whole, then the second half will fail and be split in two. Let's say the new first half fails; it'll be split again, and so on, until you reach the poison pill and fail only that. This way you might be able to significantly reduce the number of records processed one-by-one, at the expense of reprocessing some records more than once. Again, it depends on your actual metrics and requirements. |
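A minimal sketch of that recursive halving, reusing the thread's processBatch() and Thing (application-defined names); the error handler then only ever sees the one isolated record:

// imports assumed: java.util.List, org.apache.kafka.clients.consumer.ConsumerRecord,
// org.springframework.kafka.listener.BatchListenerFailedException
void processWithBisection(List<ConsumerRecord<String, Thing>> records) {
    if (records.isEmpty()) {
        return;
    }
    try {
        processBatch(records); // happy path: the whole (sub-)batch in one call
    }
    catch (Exception ex) {
        if (records.size() == 1) {
            // the poison pill is isolated; report exactly this record
            throw new BatchListenerFailedException("Failed to process", ex, records.get(0));
        }
        int mid = records.size() / 2;
        // a successful half is processed as a whole; a failing half is split
        // again (so some records may be reprocessed more than once)
        processWithBisection(records.subList(0, mid));
        processWithBisection(records.subList(mid, records.size()));
    }
}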
Hopefully, errors are rare, so I would consider holding off from adding more complexity unless you find you need it - I try to avoid premature optimization. |
Sure, it depends on the actual requirements. Maybe errors are indeed rare and this wouldn't make a difference, and so it's totally not worth it. But maybe one error in a 1K batch is not that rare, and might have a huge impact on overall performance if everything were then processed one by one. Even in a 250-record batch it might make a noticeable difference. Implementing this pattern recursively doesn't add that much complexity to the code IMO, if the performance is worth it - just extract the logic to a separate method and call it with each half of the batch in the catch block.

I've actually implemented this pattern I described before and it worked out great; that's what called my attention to this issue - sorry for dropping in unannounced 😄 |
No apology needed; your opinion is valued. |
One consideration to take into account with solutions like this is that they could cause a significant increase in the time taken to process a batch, without polling the consumer, so you might need to increase max.poll.interval.ms.

One way to mitigate that would be to keep some state in the advice and throw the first exception to the container, then do the split or binary chop when the container resubmits the same batch (rough sketch below). |
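A rough sketch of keeping that state in the advice, assuming a single listener container consuming a single partition (a real implementation would key the state by topic/partition; all names are illustrative):

// imports assumed: java.util.Collections, java.util.List,
// org.aopalliance.intercept.MethodInterceptor, org.aopalliance.intercept.MethodInvocation,
// org.apache.kafka.clients.consumer.ConsumerRecord,
// org.springframework.kafka.listener.BatchListenerFailedException
MethodInterceptor statefulBatchRetryer() {
    return new MethodInterceptor() {

        private volatile long failedBatchFirstOffset = -1;

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            @SuppressWarnings("unchecked")
            List<ConsumerRecord<?, ?>> records = (List<ConsumerRecord<?, ?>>) invocation.getArguments()[0];
            if (records.get(0).offset() != this.failedBatchFirstOffset) {
                try {
                    return invocation.proceed();
                }
                catch (Exception e) {
                    // remember the failed batch and rethrow, so the container
                    // (not this advice) owns the back off and keeps the consumer alive
                    this.failedBatchFirstOffset = records.get(0).offset();
                    throw e;
                }
            }
            // the same batch came back: now do the slow record-by-record pass
            this.failedBatchFirstOffset = -1;
            for (ConsumerRecord<?, ?> record : records) {
                invocation.getArguments()[0] = Collections.singletonList(record);
                try {
                    invocation.proceed();
                }
                catch (Throwable e1) {
                    throw new BatchListenerFailedException("Failed", e1, record);
                }
            }
            return null;
        }

    };
}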
Thank you for all that valuable input. I see there are some possible approaches to solve the problem. I will check them out and evaluate which one fits well for my purpose. One very last question regarding the default behaviour: https://docs.spring.io/spring-kafka/reference/html/#retrying-batch-eh |
I can give you a bit of history, if that will help.

When we first added support for batch listeners, we had only one standard error handler, the SeekToCurrentBatchErrorHandler. The problem with that handler is it would retry forever, with no recovery, because there was no easy way to save state for the batch; there is no guarantee that the batch on the next poll will be the same; records from different partitions could arrive in a different order and/or the record count could be different.

The next iteration added the RetryingBatchErrorHandler, which retries the whole batch according to a BackOff and, when retries are exhausted, passes the whole batch to the recoverer.

We then added the RecoveringBatchErrorHandler; when the listener throws a BatchListenerFailedException, it commits the offsets of the records before the failed one and retries from the failed record. If some other exception was thrown, it fell back to the (never ending) SeekToCurrentBatchErrorHandler.

Finally, in 2.8, the DefaultErrorHandler replaced those handlers; with a BatchListenerFailedException it behaves like the RecoveringBatchErrorHandler, and for any other exception it falls back to retrying the complete batch a bounded number of times before recovering it.

Hope that helps; if you have other ideas on how a batch error handler should behave (without assistance from the application), we'd be happy to listen. In general, we feel that the framework is doing the best it can; for more sophistication, we recommend handling exceptions in batch listeners within the listener itself. |
Thank you for this little lesson in spring-kafka batch-error-handling history :-)

Maybe I can share the experience I have made during the last few years. I am actually developing a kind of company-level framework over spring-kafka which simplifies, streamlines, and limits certain configuration. This makes it a lot easier for our application developers to get started quickly, so I am a bit used to framework programming. Regarding the batch error handling, it feels strange that things like BatchListenerFailedException leak into my business application code. That's a very technical concept which messes up the business logic. My preferred solution would be for the framework to try to identify the failing record by itself, with the bisection approach described by @tomazfernandes. This would also free the application code from that BatchListenerFailedException.

Just to be clear: I'm really grateful that you guys are delivering this awesome software. I can understand if such a concept would not fit into the architecture/strategy of your product. Happy Easter and thank you very much! |
Thanks; I understand. I will reopen this, and put it on the backlog, with a view to the framework providing a standard advice that can be added to the container config by the user. Contributions are welcome! We can also consider any other contributions for features you feel are missing. |
Current Behavior
When processing several records at the same time, it is sometimes impossible to say which record caused an exception, and therefore you cannot raise a BatchListenerFailedException. In consequence, the listener will reprocess the whole batch, according to https://docs.spring.io/spring-kafka/reference/html/#retrying-batch-eh
Expected Behavior
It would be helpful if one could switch to record-by-record processing in case an exception occurred during batch processing. Then one could detect which record caused the exception.
Context
I also opened a question on Stack Overflow: https://stackoverflow.com/questions/71856309/spring-kafka-errorhandling-for-batchlisteners