Skip to content

Commit

Permalink
Fix kafka commit after rebalancing issue (#486)
Browse files Browse the repository at this point in the history
Connect the output connector to the input connector to ensure the message backlog of the Opensearch and Elasticsearch outputs is committed on rebalancing.

* add a test that connects the input with the output connector

---------

Co-authored-by: ekneg54 <[email protected]>
  • Loading branch information
dtrai2 and ekneg54 authored Dec 6, 2023
1 parent 199200c commit 6a35f99
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
### Bugfix

* fix the rule tree parsing some rules incorrectly, potentially resulting in more matches
* fix `confluent_kafka` commit issue after kafka rebalancing, which also fixes negative offsets

## v8.0.0
### Breaking
Expand Down
8 changes: 6 additions & 2 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from abc import abstractmethod
from functools import partial
from hmac import HMAC
from typing import Optional, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from attrs import define, field, validators

Expand All @@ -19,6 +19,9 @@
from logprep.util.time import UTC, TimeParser
from logprep.util.validators import dict_structure_validator

if TYPE_CHECKING: # pragma: no cover
from logprep.abc.output import Output


class InputError(Exception):
"""Base class for Input related exceptions."""
Expand Down Expand Up @@ -180,7 +183,8 @@ class Config(Connector.Config):
)

pipeline_index: int
__slots__ = ["pipeline_index"]
output_connector: Optional["Output"]
__slots__ = ["pipeline_index", "output_connector"]

@property
def _add_hmac(self):
Expand Down
15 changes: 3 additions & 12 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,18 +446,7 @@ def _handle_offsets(self, offset_handler: Callable) -> None:
try:
offset_handler(message=message)
except KafkaException as error:
topic = self._consumer.list_topics(topic=self._config.topic)
partition_keys = list(topic.topics[self._config.topic].partitions.keys())
partitions = [
TopicPartition(self._config.topic, partition) for partition in partition_keys
]
self._consumer.assign(partitions)
self._logger.warning(
f"{self._consumer.memberid()} was assigned to "
f"topic: {topic} | partition {partitions}, due to "
f"KafkaException: {error}"
)
offset_handler(message=message)
raise InputWarning(self, f"{error}, {message}") from error

def _assign_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions:
Expand All @@ -482,6 +471,8 @@ def _revoke_callback(self, consumer, topic_partitions):
f"topic: {topic_partition.topic} | "
f"partition {topic_partition.partition}"
)
self.output_connector._write_backlog()
self.batch_finished_callback()

def _lost_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions:
Expand Down
2 changes: 2 additions & 0 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ def _setup(self):
self.logger.debug("Creating connectors")
for _, output in self._output.items():
output.input_connector = self._input
if output.default:
self._input.output_connector = output
self.logger.debug(
f"Created connectors -> input: '{self._input.describe()}',"
f" output -> '{[output.describe() for _, output in self._output.items()]}'"
Expand Down
22 changes: 14 additions & 8 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, sett
],
)
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_batch_finished_callback_reassigns_partition_and_calls_again_on_kafka_exception(
def test_batch_finished_callback_raises_input_warning_on_kafka_exception(
self, _, settings, handler
):
input_config = deepcopy(self.CONFIG)
Expand All @@ -141,14 +141,8 @@ def raise_generator(return_sequence):

getattr(kafka_consumer, handler).side_effect = raise_generator(return_sequence)
kafka_input._last_valid_records = {0: "message"}
with pytest.raises(KafkaException):
with pytest.raises(InputWarning):
kafka_input.batch_finished_callback()
kafka_consumer.assign.assert_called()
getattr(kafka_consumer, handler).assert_called()
getattr(kafka_consumer, handler).assert_called_with(
message=kafka_input._last_valid_records.get(0)
)
assert getattr(kafka_consumer, handler).call_count == 2

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_get_next_raises_critical_input_error_if_not_a_dict(self, _):
Expand Down Expand Up @@ -338,8 +332,20 @@ def test_assign_callback_sets_offsets_and_logs_info(self, mock_consumer):
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_revoke_callback_logs_warning_and_counts(self, mock_consumer):
    """Revoking partitions emits a warning log and bumps the warning metric."""
    self.object.metrics.number_of_warnings = 0
    self.object.output_connector = mock.MagicMock()
    revoked_partitions = [mock.MagicMock()]
    with mock.patch("logging.Logger.warning") as warning_logger:
        self.object._revoke_callback(mock_consumer, revoked_partitions)
        warning_logger.assert_called()
    assert self.object.metrics.number_of_warnings == 1

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_revoke_callback_writes_output_backlog_and_calls_batch_finished_callback(
    self, mock_consumer
):
    """Revoking partitions must flush the output backlog and then commit offsets."""
    self.object.batch_finished_callback = mock.MagicMock()
    self.object.output_connector = mock.MagicMock()
    revoked_partitions = [mock.MagicMock()]
    self.object._revoke_callback(mock_consumer, revoked_partitions)
    self.object.output_connector._write_backlog.assert_called()
    self.object.batch_finished_callback.assert_called()
1 change: 1 addition & 0 deletions tests/unit/connector/test_real_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def setup_method(self):
},
}
self.kafka_input = Factory.create({"test input": input_config}, logger=logging.getLogger())
self.kafka_input.output_connector = mock.MagicMock()

def teardown_method(self):
self.kafka_input.shut_down()
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/framework/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ def test_setup_connects_output_with_input(self, _):
self.pipeline._setup()
assert self.pipeline._output["dummy"].input_connector == self.pipeline._input

def test_setup_connects_input_with_output(self, _):
    """After setup the input connector holds a reference to the default output."""
    self.pipeline._setup()
    default_output = self.pipeline._output["dummy"]
    assert self.pipeline._input.output_connector == default_output

def test_pipeline_does_not_call_batch_finished_callback_if_output_store_does_not_return_true(
self, _
):
Expand Down

0 comments on commit 6a35f99

Please sign in to comment.