
Switch from batch to record-by-record processing in case an exception occurred. #2233

Open
rloeffel opened this issue Apr 13, 2022 · 15 comments

Comments

@rloeffel

rloeffel commented Apr 13, 2022

Current Behavior

When processing several records at the same time, it is sometimes impossible to say which record caused an exception, and therefore you cannot raise a BatchListenerFailedException. As a consequence, the listener will reprocess the whole batch, according to https://docs.spring.io/spring-kafka/reference/html/#retrying-batch-eh

Expected Behavior

It would be helpful if one could switch to record-by-record processing in case an exception occurred during batch processing. Then one could detect which record caused the exception.

Context
I also opened a question on Stack Overflow: https://stackoverflow.com/questions/71856309/spring-kafka-errorhandling-for-batchlisteners

@rloeffel rloeffel changed the title Switch from batch processing to record-by-record processing in case an exception occurred. Switch from batch to record-by-record processing in case an exception occurred. Apr 13, 2022
@garyrussell
Contributor

garyrussell commented Apr 13, 2022

Don't ask questions in multiple places; it is a waste of your time, and ours.

We cannot switch between batch and record listeners when an exception occurs.

See the documentation for how to handle this use case.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

Use

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, i);
        }
    }
}

or

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<ConsumerRecord<String, Thing>> records) {
    records.forEach(record -> {
        try {
            process(record);
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, record);
        }
    });
}

This will work (in 2.8.x) with the DefaultErrorHandler (which is the default). For earlier versions, it requires the RecoveringBatchErrorHandler.

@rloeffel
Author

Thank you for your fast reply, and sorry for asking the question in several places.

I read the manual several times and I also studied those code snippets. I do not understand why I should consume several records at once but process them separately. If I wanted to do that, I could just use the "normal" record listener, correct?

We process several records together because it allows for higher throughput: we can load additional external data more efficiently. Everything works as expected and performs well - except the error handling. That's why I opened this issue.

@tomazfernandes
Contributor

tomazfernandes commented Apr 13, 2022

I think maybe what you want is something like:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<ConsumerRecord<String, Thing>> records) {

    try {
         processBatch(records)
    } catch (Exception ex) {
      logger.warn("Error occurred when processing batch, switching to per record", ex);
      things.forEach(record ->
          try {
              process(record);
          }
          catch (Exception e) {
              throw new BatchListenerFailedException("Failed to process", e, record);
          }
      }
}

While I agree it might be interesting for the framework to provide such a solution OOTB, I fail to see what performance penalties you refer to, or what could be done to mitigate them - maybe you can elaborate on that?

@garyrussell
Contributor

garyrussell commented Apr 13, 2022

Or something like this:

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < records.size(); i++) {
        try {
            validate(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to validate", e, i);
        }
        try {
            processBatch(things);
        }
        catch (MyExceptionWithFailedIndex e) {
            throw new BatchListenerFailedException("Failed to process", e, e.getFailedIndex());
        }
    }
}

@rloeffel
Author

Thank you for your replies.

Regarding the first solution: this is exactly what I am looking for, but it is implemented in the listener, not in the framework. The drawback is that it needs to be implemented in each listener and cannot be solved globally. Regarding the performance penalties: suppose you need to load some dependent data from a database inside the processBatch() method. When you have all the records, you can load those dependent objects with a single query instead of many separate queries.
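For example, a minimal sketch of what I mean (Thing, Customer, and customerRepository are hypothetical names):

void processBatch(List<Thing> things) {
    // Collect the foreign keys for the whole batch.
    List<Long> ids = things.stream()
            .map(Thing::getCustomerId)
            .distinct()
            .collect(Collectors.toList());
    // One query (e.g. SELECT ... WHERE id IN (:ids)) instead of things.size() queries.
    Map<Long, Customer> customers = customerRepository.findAllById(ids).stream()
            .collect(Collectors.toMap(Customer::getId, Function.identity()));
    things.forEach(thing -> process(thing, customers.get(thing.getCustomerId())));
}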

Regarding the second solution: I see two drawbacks.

  1. You can validate the records only to a certain extent, and you cannot guarantee that there won't be another error while processing them. This brings us to the second drawback:
  2. You assume that you can determine which record caused the exception, but you have just wrapped it in another exception class - and that is actually my initial problem.

Obviously the framework supports a certain strategy when the listener fails without a BatchListenerFailedException, but this behaviour cannot be changed. What I would like to do is replace the batch error handling with your first solution. WDYT?

@garyrussell
Contributor

but this is actually my initial problem....

Any code that processes a batch will iterate over the list, so I see no problem with determining which record failed.

Such a change would require significant changes to the listener container, which is already quite complex.

We do have the concept of a BatchToRecordAdapter; this enables, for example, a whole batch to be processed in a single transaction while the listener is invoked one record at a time.

However, that is an all-or-nothing configuration; you can't switch between the two dynamically. Again, the method signature of a batch listener expects a list; you would need a different kind of adapter to present the records as a singleton list for each record.
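For reference, wiring that adapter in looks roughly like this (a sketch; Thing is a placeholder type, and the lambda is the recoverer invoked for a failed record):

@Bean
ConcurrentKafkaListenerContainerFactory<String, Thing> kafkaListenerContainerFactory(
        ConsumerFactory<String, Thing> consumerFactory) {

    ConcurrentKafkaListenerContainerFactory<String, Thing> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The container still polls and commits whole batches, but the adapter
    // invokes the record listener once per record - for every batch, not just on error.
    factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
        // handle a record that failed processing
    }));
    return factory;
}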

One way to separate your logic from your listener would be to add an Advice to the listener's advice chain, containing code similar to @tomazfernandes's example:

@SpringBootApplication
public class Kgh2233Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh2233Application.class, args);
	}

	@KafkaListener(id = "kgh2233", topics = "kgh2233")
	void listen(List<String> list) {
		System.out.println(list);
		list.forEach(in -> {
			System.out.println(in);
			if ("fail".equals(in)) {
				throw new RuntimeException("test");
			}
		});
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh2233").partitions(1).replicas(1).build();
	}

	@Bean
	ApplicationRunner runner(KafkaTemplate<String, String> template,
			ConcurrentKafkaListenerContainerFactory<?, ?> factory,
			Advice batchRetryer) {

		factory.getContainerProperties().setAdviceChain(batchRetryer);
		factory.setCommonErrorHandler(new DefaultErrorHandler((rec, ex) -> {
			System.out.println("Retries exhausted for " + KafkaUtils.format(rec));
		}, new FixedBackOff(3000, 3)));

		return args -> {
			template.send("kgh2233", "foo");
			template.send("kgh2233", "bar");
			template.send("kgh2233", "fail");
		};
	}

	@Bean
	MethodInterceptor batchRetryer() {
		return invocation -> {
			try {
				return invocation.proceed();
			}
			catch (Exception e) {
				// The batch failed; replay it one record at a time so the error
				// handler can tell exactly which record is bad.
				@SuppressWarnings("unchecked")
				List<ConsumerRecord<?, ?>> records = (List<ConsumerRecord<?, ?>>) invocation.getArguments()[0];
				records.forEach(record -> {
					invocation.getArguments()[0] = Collections.singletonList(record);
					try {
						invocation.proceed();
					}
					catch (Throwable e1) {
						throw new BatchListenerFailedException("Failed", e1, record);
					}
				});
				return null;
			}
		};
	}

}
[foo, bar, fail]
foo
bar
fail
[foo]
foo
[bar]
bar
[fail]
fail
...
[fail]
fail
[fail]
fail
...
[fail]
fail
[fail]
fail
...
[fail]
fail
[fail]
fail
Retries exhausted for kgh2233-0@38

@tomazfernandes
Contributor

Well, @garyrussell's solution is much more interesting than what I had in mind - I was thinking about implementing a custom MessageListenerAdapter by doing something similar to FilteringBatchMessageListenerAdapter, and adding this behavior there. But I do like this Advice solution better.

As for the strategy as a whole, depending on your actual requirements, you might want to look into splitting your batch in half recursively instead of moving to one-by-one at the first exception.

This way, assuming there's one poison pill in the second half of a batch, the first half will be processed as a whole, then the second half will fail and be split in two. If the new first half fails, it'll be split again, and so on, until you reach the poison pill and fail only that record. This way you might significantly reduce the number of records processed one-by-one, at the expense of reprocessing some records more than once. Again, it depends on your actual metrics and requirements.
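In code, the recursive split can be quite compact - a sketch, assuming the same hypothetical process(record) and processBatch(records) methods as above:

void processWithSplit(List<ConsumerRecord<String, Thing>> records) {
    if (records.size() == 1) {
        process(records.get(0)); // a single bad record fails here, and only here
        return;
    }
    try {
        processBatch(records);
    }
    catch (Exception e) {
        int middle = records.size() / 2;
        // Only the half containing the poison pill keeps splitting;
        // recursion depth is log2(batch size), so stack use is modest.
        processWithSplit(records.subList(0, middle));
        processWithSplit(records.subList(middle, records.size()));
    }
}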

@garyrussell
Contributor

Hopefully, errors are rare, so I would consider holding off on adding more complexity unless you find you need it - I try to avoid premature optimization.

@tomazfernandes
Contributor

Sure, it depends on the actual requirements. Maybe errors are indeed rare and this wouldn't make a difference, so it's totally not worth it. But maybe one error in a 1K batch is not that rare, and it might have a huge impact on overall performance if everything were then processed one by one. Even in a 250-record batch it might make a noticeable difference.

Implementing this pattern recursively doesn't add that much complexity to the code IMO, if the performance is worth it - just extract the logic to a separate method and call it with each half of the batch in the catch block. Of course, beware of the stack size to avoid a stack overflow, and log everything properly so you don't get lost when this happens.

I've actually implemented the pattern I described before, and it worked out great - that's what called my attention to this issue. Sorry for dropping in unannounced 😄

@garyrussell
Contributor

No apology needed; your opinion is valued.

@garyrussell
Contributor

One consideration to take into account with solutions like this is that it could cause a significant increase in the time taken to process a batch, without polling the consumer, so you might need to increase max.poll.interval.ms and/or decrease max.poll.records, to avoid a rebalance.

One way to mitigate that would be to keep some state in the advice and throw the first exception to the container, then do the split or binary chop when the container resubmits the same batch.
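A rough sketch of that stateful variant (hypothetical; keyed naively on the first record's offset - a real implementation should also track the topic/partition):

@Bean
MethodInterceptor statefulBatchRetryer() {
    return new MethodInterceptor() {

        private long lastFailedOffset = -1;

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            @SuppressWarnings("unchecked")
            List<ConsumerRecord<?, ?>> records = (List<ConsumerRecord<?, ?>>) invocation.getArguments()[0];
            long firstOffset = records.get(0).offset();
            if (firstOffset != this.lastFailedOffset) {
                try {
                    return invocation.proceed();
                }
                catch (Exception e) {
                    this.lastFailedOffset = firstOffset; // remember the batch...
                    throw e; // ...and let the container seek, back off, and re-poll
                }
            }
            // The failed batch came back: switch to record-by-record this time.
            this.lastFailedOffset = -1;
            for (ConsumerRecord<?, ?> record : records) {
                invocation.getArguments()[0] = Collections.singletonList(record);
                try {
                    invocation.proceed();
                }
                catch (Throwable t) {
                    throw new BatchListenerFailedException("Failed", t, record);
                }
            }
            return null;
        }
    };
}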

@rloeffel
Author

Thank you for all that valuable input.

I see there are some possible approaches to solve the problem. I will check them out and evaluate which one fits my purpose best.

One very last question regarding the default behaviour (https://docs.spring.io/spring-kafka/reference/html/#retrying-batch-eh):
The situation there is that a batch listener failed without throwing a BatchListenerFailedException. I assume this is the case when the listener is not able to detect which record caused the problem (or are there other cases?). So the retry will only solve temporary issues, not record-related issues. Furthermore, it calls the recoverer for each record after the retries are exhausted.
So my question: what was the design intention behind this solution? What's the targeted use case?

@garyrussell
Contributor

I can give you a bit of history, if that will help.

When we first added support for batch listeners, we had only one standard error handler, the SeekToCurrentBatchErrorHandler; when any exception occurred, seeks were performed on all the partitions in the batch so the records would be redelivered by the next poll.

The problem with that handler is it would retry forever, with no recovery, because there was no easy way to save state for the batch; there is no guarantee that the batch on the next poll will be the same; records from different partitions could arrive in a different order and/or the record count could be different.

The next iteration added the RetryingBatchErrorHandler; this solved the problem by sending the entire batch to the recoverer (e.g. to a dead letter topic) after retries are exhausted; since the framework has no idea which record in the batch failed, the whole batch is considered to be "bad".

We then added the RecoveringBatchErrorHandler, where the application can communicate which record failed via the BatchListenerFailedException. This allowed committing the offsets of the previous records, retrying the failed record, and finally just sending that one record to the recoverer before submitting the remainder to the listener in a new batch.

If some other exception was thrown, it fell back to the (never ending) SeekToCurrentBatchErrorHandler.

Finally in 2.8, the DefaultErrorHandler replaced the RecoveringBatchErrorHandler, but now falls back to the same functionality as the RetryingBatchErrorHandler (retry some number of times, then send the whole batch to the recoverer).

Hope that helps; if you have other ideas on how a batch error handler should behave (without assistance from the application), we'd be happy to listen.

In general, we feel that the framework is doing the best it can; for more sophistication, we recommend handling exceptions in batch listeners within the listener itself.

@rloeffel
Author

Thank you for this little lesson in spring-kafka-batch-error-handling-history :-)

Maybe I can share the experience I have gathered over the last few years. I am developing a kind of company-level framework on top of spring-kafka which simplifies, streamlines, and limits certain configuration. This makes it a lot easier for our application developers to get started quickly. So I am somewhat used to framework programming.

Regarding the batch error handling, it feels strange that things like BatchListenerFailedException leak into my business application code. That's a very technical concept which clutters the business logic. My preferred solution would be for the framework to try to identify the failing record by itself, with the bisection approach described by @tomazfernandes. This would also free the application code from that BatchListenerFailedException.

Just to be clear: I'm really grateful that you guys are delivering this awesome software. I can understand if such a concept does not fit into the architecture/strategy of your product.

Happy Easter, and thank you very much!

@garyrussell
Contributor

Thanks; I understand. I will reopen this, and put it on the backlog, with a view to the framework providing a standard advice that can be added to the container config by the user.

Contributions are welcome!

We can also consider any other contributions for features you feel are missing.
