diff --git a/CHANGELOG.md b/CHANGELOG.md index 83042f651..c10c20d69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ ### Bugfix * fix the rule tree parsing some rules incorrectly, potentially resulting in more matches +* fix `confluent_kafka` commit issue after kafka did some rebalancing, fixes also negative offsets ## v8.0.0 ### Breaking diff --git a/logprep/abc/input.py b/logprep/abc/input.py index e565c3ee8..706c6ec5d 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -9,7 +9,7 @@ from abc import abstractmethod from functools import partial from hmac import HMAC -from typing import Optional, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from attrs import define, field, validators @@ -19,6 +19,9 @@ from logprep.util.time import UTC, TimeParser from logprep.util.validators import dict_structure_validator +if TYPE_CHECKING: # pragma: no cover + from logprep.abc.output import Output + class InputError(Exception): """Base class for Input related exceptions.""" @@ -180,7 +183,8 @@ class Config(Connector.Config): ) pipeline_index: int - __slots__ = ["pipeline_index"] + output_connector: Optional["Output"] + __slots__ = ["pipeline_index", "output_connector"] @property def _add_hmac(self): diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 5600e6e0b..9565c6a8f 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -446,18 +446,7 @@ def _handle_offsets(self, offset_handler: Callable) -> None: try: offset_handler(message=message) except KafkaException as error: - topic = self._consumer.list_topics(topic=self._config.topic) - partition_keys = list(topic.topics[self._config.topic].partitions.keys()) - partitions = [ - TopicPartition(self._config.topic, partition) for partition in partition_keys - ] - self._consumer.assign(partitions) - self._logger.warning( - f"{self._consumer.memberid()} was assigned to " - f"topic: {topic} | partition {partitions}, due to " - f"KafkaException: {error}" - ) - offset_handler(message=message) + raise InputWarning(self, f"{error}, {message}") from error def _assign_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: @@ -482,6 +471,8 @@ def _revoke_callback(self, consumer, topic_partitions): f"topic: {topic_partition.topic} | " f"partition {topic_partition.partition}" ) + self.output_connector._write_backlog() + self.batch_finished_callback() def _lost_callback(self, consumer, topic_partitions): for topic_partition in topic_partitions: diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 25326f579..ab51c616a 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -186,6 +186,8 @@ def _setup(self): self.logger.debug("Creating connectors") for _, output in self._output.items(): output.input_connector = self._input + if output.default: + self._input.output_connector = output self.logger.debug( f"Created connectors -> input: '{self._input.describe()}'," f" output -> '{[output.describe() for _, output in self._output.items()]}'" diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 3a16d3f10..f777c7bbf 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -127,7 +127,7 @@ def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, sett ], ) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_batch_finished_callback_reassigns_partition_and_calls_again_on_kafka_exception( + def test_batch_finished_callback_raises_input_warning_on_kafka_exception( self, _, settings, handler ): input_config = deepcopy(self.CONFIG) @@ -141,14 +141,8 @@ def raise_generator(return_sequence): getattr(kafka_consumer, handler).side_effect = raise_generator(return_sequence) kafka_input._last_valid_records = {0: "message"} - with pytest.raises(KafkaException): + with pytest.raises(InputWarning): kafka_input.batch_finished_callback() - kafka_consumer.assign.assert_called() - getattr(kafka_consumer, handler).assert_called() - getattr(kafka_consumer, handler).assert_called_with( - message=kafka_input._last_valid_records.get(0) - ) - assert getattr(kafka_consumer, handler).call_count == 2 @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_get_next_raises_critical_input_error_if_not_a_dict(self, _): @@ -338,8 +332,20 @@ def test_assign_callback_sets_offsets_and_logs_info(self, mock_consumer): @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_revoke_callback_logs_warning_and_counts(self, mock_consumer): self.object.metrics.number_of_warnings = 0 + self.object.output_connector = mock.MagicMock() mock_partitions = [mock.MagicMock()] with mock.patch("logging.Logger.warning") as mock_warning: self.object._revoke_callback(mock_consumer, mock_partitions) mock_warning.assert_called() assert self.object.metrics.number_of_warnings == 1 + + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback( + self, mock_consumer + ): + self.object.output_connector = mock.MagicMock() + self.object.batch_finished_callback = mock.MagicMock() + mock_partitions = [mock.MagicMock()] + self.object._revoke_callback(mock_consumer, mock_partitions) + self.object.output_connector._write_backlog.assert_called() + self.object.batch_finished_callback.assert_called() diff --git a/tests/unit/connector/test_real_kafka.py b/tests/unit/connector/test_real_kafka.py index a742f2172..3f5237087 100644 --- a/tests/unit/connector/test_real_kafka.py +++ b/tests/unit/connector/test_real_kafka.py @@ -77,6 +77,7 @@ def setup_method(self): }, } self.kafka_input = Factory.create({"test input": input_config}, logger=logging.getLogger()) + self.kafka_input.output_connector = mock.MagicMock() def teardown_method(self): self.kafka_input.shut_down() diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 7592928f0..22526d4c3 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -444,6 +444,10 @@ def test_setup_connects_output_with_input(self, _): self.pipeline._setup() assert self.pipeline._output["dummy"].input_connector == self.pipeline._input + def test_setup_connects_input_with_output(self, _): + self.pipeline._setup() + assert self.pipeline._input.output_connector == self.pipeline._output["dummy"] + def test_pipeline_does_not_call_batch_finished_callback_if_output_store_does_not_return_true( self, _ ):