Adapt s3 connector for kafka fix (#499)
* Make s3 connector use max_retries parameter
* Make number_of_warnings metric work in s3 connector
* Add number_of_successful_writes metric to s3 connector
* Make s3 connector support _write_backlog and refactor code
* Update changelog
* Optimize checking if s3 should be written and add tests
* Make s3 connector blocking and make it raise FatalOutputError instead of warnings (see the usage sketch below the changed-files summary)

---------

Co-authored-by: Jörg Zimmermann <[email protected]>
ppcad and ekneg54 authored Jan 8, 2024
1 parent cce0db5 commit 74623ce
Showing 3 changed files with 106 additions and 60 deletions.
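The net effect of these changes is that writing to s3 is now synchronous: `store()` fills a per-prefix message backlog and, once `message_backlog_size` events have accumulated, flushes them in the calling thread, raising `FatalOutputError` on connection problems instead of logging warnings. A minimal usage sketch follows; the configuration dict is abridged and its values are placeholders, while `Factory.create`, `store`, and `FatalOutputError` are taken from the diffs below.

```python
from logging import getLogger

from logprep.abc.output import FatalOutputError
from logprep.factory import Factory

logger = getLogger("s3-demo")  # hypothetical logger name

# Abridged, hypothetical configuration; a real deployment needs credentials and more keys.
s3_config = {
    "type": "s3_output",          # assumed connector type name
    "endpoint_url": "http://host:123",
    "bucket": "my-bucket",        # placeholder
    "default_prefix": "default",  # placeholder
    "message_backlog_size": 1,    # flush to s3 once this many events are backlogged
    "max_retries": 3,             # now actually forwarded to boto3 (see the bugfix entry below)
}

s3_output = Factory.create({"s3": s3_config}, logger)
try:
    # Blocks until the backlog has been written once message_backlog_size is reached.
    s3_output.store({"message": "my event message"})
except FatalOutputError as error:
    # Connection failures are no longer downgraded to warnings.
    logger.error("s3 output failed: %s", error)
```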
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -5,12 +5,20 @@

### Features


* add a `number_of_successful_writes` metric to the s3 connector, which counts how many events were successfully written to s3
* make the s3 connector work with the new `_write_backlog` method introduced by the `confluent_kafka` commit bugfix in v9.0.0
* add option to Opensearch Output Connector to use parallel bulk implementation (default is True)


### Improvements

* make the s3 connector raise `FatalOutputError` instead of warnings
* make the s3 connector blocking by removing threading

### Bugfix

* make the s3 connector actually use the `max_retries` parameter (see the boto3 retry sketch after this changelog)

## v9.0.3
### Breaking
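For the `max_retries` bugfix above, a condensed sketch of how the retry count is now wired into boto3, matching `_setup_s3_resource` in the diff below; the credential, region, timeout, and endpoint values here are placeholders, not part of the commit.

```python
import boto3

# Sketch of the fixed retry wiring (condensed from _setup_s3_resource below);
# previously max_attempts was hard-coded to 0, so the max_retries parameter had no effect.
session = boto3.Session(
    aws_access_key_id="placeholder-key",         # placeholder, read from the connector config
    aws_secret_access_key="placeholder-secret",  # placeholder
    region_name="eu-central-1",                  # placeholder
)
config = boto3.session.Config(
    connect_timeout=5,            # placeholder value for connect_timeout
    retries={"max_attempts": 3},  # now taken from the connector's max_retries parameter
)
s3_resource = session.resource("s3", endpoint_url="http://host:123", config=config)
```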
94 changes: 49 additions & 45 deletions logprep/connector/s3/output.py
@@ -4,9 +4,6 @@
This section contains the connection settings for the AWS s3 output connector.
This connector is non-blocking and may skip sending data if previous data has not finished sending.
It doesn't crash if a connection couldn't be established, but sends a warning.
The target bucket is defined by the :code:`bucket` configuration parameter.
The prefix is defined by the value in the field :code:`prefix_field` in the document.
@@ -42,9 +39,7 @@
"""
import json
import re
import threading
from collections import defaultdict
from copy import deepcopy
from functools import cached_property
from logging import Logger
from time import time
@@ -62,8 +57,8 @@
EndpointConnectionError,
)

from logprep.abc.output import Output
from logprep.metrics.metrics import Metric
from logprep.abc.output import Output, FatalOutputError
from logprep.metrics.metrics import Metric, CounterMetric
from logprep.util.helper import get_dotted_field_value
from logprep.util.time import TimeParser

@@ -117,13 +112,21 @@ class Config(Output.Config):
"""The input callback is called after the maximum backlog size has been reached
if this is set to True (optional)"""

__slots__ = ["_message_backlog", "_current_backlog_count", "_index_cache"]
@define(kw_only=True)
class Metrics(Output.Metrics):
"""Tracks statistics about this output"""

_message_backlog: DefaultDict
number_of_successful_writes: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were successfully written to s3",
name="number_of_successful_writes",
)
)
"""Number of events that were successfully written to s3"""

_current_backlog_count: int
__slots__ = ["_message_backlog", "_index_cache"]

_writing_thread: Optional[threading.Thread]
_message_backlog: DefaultDict

_s3_resource: Optional["boto3.resources.factory.s3.ServiceResource"]

@@ -134,7 +137,6 @@ class Config(Output.Config):
def __init__(self, name: str, configuration: "S3Output.Config", logger: Logger):
super().__init__(name, configuration, logger)
self._message_backlog = defaultdict(list)
self._current_backlog_count = 0
self._writing_thread = None
self._base_prefix = f"{self._config.base_prefix}/" if self._config.base_prefix else ""
self._s3_resource = None
@@ -147,7 +149,8 @@ def _setup_s3_resource(self):
region_name=self._config.region_name,
)
config = boto3.session.Config(
connect_timeout=self._config.connect_timeout, retries={"max_attempts": 0}
connect_timeout=self._config.connect_timeout,
retries={"max_attempts": self._config.max_retries},
)
self._s3_resource = session.resource(
"s3",
@@ -162,6 +165,10 @@ def s3_resource(self):
"""Return s3 resource"""
return self._s3_resource

@property
def _backlog_size(self):
return sum(map(len, self._message_backlog.values()))

@cached_property
def _replace_pattern(self):
return re.compile(r"%{\S+?}")
@@ -195,52 +202,51 @@ def _write_to_s3_resource(self, document: dict, prefix: str):
----------
document : dict
Document to store.
Returns
-------
Returns True to inform the pipeline to call the batch_finished_callback method in the
configured input
"""
prefix = self._add_dates(prefix)
prefix = f"{self._base_prefix}{prefix}"
self._message_backlog[prefix].append(document)

backlog_count = self._current_backlog_count + 1
if backlog_count == self._config.message_backlog_size:
if self._writing_thread is None or not self._writing_thread.is_alive():
message_backlog = deepcopy(self._message_backlog)
self._writing_thread = threading.Thread(
target=self._write_document_batches, args=(message_backlog,)
)
self._writing_thread.start()
return True
self._current_backlog_count = backlog_count
return False

def _write_document_batches(self, message_backlog):
self._logger.info(f"Writing {self._current_backlog_count + 1} documents to s3")
for prefix_mb, document_batch in message_backlog.items():
if self._backlog_size >= self._config.message_backlog_size:
self._write_backlog()

def _write_backlog(self):
"""Write to s3 if it is not already writing."""
if not self._message_backlog:
return

self._bulk()

def _bulk(self):
self._logger.info("Writing %s documents to s3", self._backlog_size)
for prefix_mb, document_batch in self._message_backlog.items():
self._write_document_batch(document_batch, f"{prefix_mb}/{time()}-{uuid4()}")
self._message_backlog.clear()
self._current_backlog_count = 0

if not self._config.call_input_callback:
return

if self.input_connector and hasattr(self.input_connector, "batch_finished_callback"):
self.input_connector.batch_finished_callback()

def _write_document_batch(self, document_batch: dict, identifier: str):
try:
self._write_to_s3(document_batch, identifier)
except EndpointConnectionError:
self._logger.warning(f"{self.describe()}: Could not connect to the endpoint URL")
except ConnectionClosedError:
self._logger.warning(
f"{self.describe()}: "
f"Connection was closed before we received a valid response from endpoint URL"
)
except EndpointConnectionError as error:
raise FatalOutputError(self, "Could not connect to the endpoint URL") from error
except ConnectionClosedError as error:
raise FatalOutputError(
self,
"Connection was closed before we received a valid response from endpoint URL",
) from error
except (BotoCoreError, ClientError) as error:
self._logger.warning(f"{self.describe()}: {error}")
raise FatalOutputError(self, str(error)) from error

def _write_to_s3(self, document_batch: dict, identifier: str):
self._logger.debug(f'Writing "{identifier}" to s3 bucket "{self._config.bucket}"')
s3_obj = self.s3_resource.Object(self._config.bucket, identifier)
s3_obj.put(Body=self._encoder.encode(document_batch), ContentType="application/json")
self.metrics.number_of_successful_writes += len(document_batch)

def store(self, document: dict):
"""Store a document into s3 bucket.
@@ -259,9 +265,7 @@ def store(self, document: dict):
)
prefix_value = self._config.default_prefix

batch_finished = self._write_to_s3_resource(document, prefix_value)
if self._config.call_input_callback and batch_finished and self.input_connector:
self.input_connector.batch_finished_callback()
self._write_to_s3_resource(document, prefix_value)

@staticmethod
def _build_no_prefix_document(message_document: dict, reason: str):
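Read together, the changed methods above form a strictly sequential write path. Below is a condensed sketch of that flow, abridged from the diff: logging, docstrings, and the `hasattr` guard are omitted, so it is not a drop-in replacement for the full class (which also needs the `time` and `uuid4` imports shown earlier).

```python
# Condensed sketch of the new, blocking write path in S3Output (abridged from the diff above).
@property
def _backlog_size(self):
    return sum(map(len, self._message_backlog.values()))

def _write_to_s3_resource(self, document: dict, prefix: str):
    prefix = f"{self._base_prefix}{self._add_dates(prefix)}"
    self._message_backlog[prefix].append(document)
    if self._backlog_size >= self._config.message_backlog_size:
        self._write_backlog()  # blocks the caller until the batch is written

def _write_backlog(self):
    if not self._message_backlog:
        return
    self._bulk()

def _bulk(self):
    for prefix, document_batch in self._message_backlog.items():
        # One s3 object per prefix and flush; failures surface as FatalOutputError.
        self._write_document_batch(document_batch, f"{prefix}/{time()}-{uuid4()}")
    self._message_backlog.clear()
    if self._config.call_input_callback and self.input_connector:
        self.input_connector.batch_finished_callback()
```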
64 changes: 49 additions & 15 deletions tests/unit/connector/test_s3_output.py
@@ -4,11 +4,9 @@
# pylint: disable=wrong-import-order
# pylint: disable=attribute-defined-outside-init
import logging
import re
from copy import deepcopy
from datetime import datetime
from math import isclose
from time import sleep
from unittest import mock

import pytest
@@ -19,6 +17,7 @@
EndpointConnectionError,
)

from logprep.abc.output import FatalOutputError
from logprep.factory import Factory
from logprep.util.time import TimeParser
from tests.unit.connector.base import BaseOutputTestCase
@@ -41,6 +40,15 @@ class TestS3Output(BaseOutputTestCase):
"message_backlog_size": 1,
}

expected_metrics = [
"logprep_processing_time_per_event",
"logprep_number_of_processed_events",
"logprep_number_of_failed_events",
"logprep_number_of_warnings",
"logprep_number_of_errors",
"logprep_number_of_successful_writes",
]

def test_describe_returns_s3_output(self):
assert (
self.object.describe() == "S3Output (Test Instance Name) - S3 Output: http://host:123"
@@ -173,18 +181,18 @@ def test_write_document_batch_calls_handles_errors(self, caplog, error, message)
"logprep.connector.s3.output.S3Output._write_to_s3",
side_effect=error,
):
self.object._write_document_batch({"dummy": "event"}, "dummy_identifier")
assert re.match(message, caplog.text)
with pytest.raises(FatalOutputError, match=message):
self.object._write_document_batch({"dummy": "event"}, "dummy_identifier")

def test_write_to_s3_resource_sets_current_backlog_count_and_below_max_backlog(self):
s3_config = deepcopy(self.CONFIG)
message_backlog_size = 5
s3_config.update({"message_backlog_size": message_backlog_size})
s3_output = Factory.create({"s3": s3_config}, self.logger)
assert s3_output._current_backlog_count == 0
assert self._calculate_backlog_size(s3_output) == 0
for idx in range(1, message_backlog_size):
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
assert s3_output._current_backlog_count == idx
assert self._calculate_backlog_size(s3_output) == idx

def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self):
s3_config = deepcopy(self.CONFIG)
@@ -198,36 +206,39 @@ def test_write_to_s3_resource_sets_current_backlog_count_and_is_max_backlog(self):
# Backlog not full
for idx in range(message_backlog_size - 1):
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
sleep(0.1) # nosemgrep
assert s3_output._current_backlog_count == idx + 1
self._wait_for_writing_thread(s3_output)
assert self._calculate_backlog_size(s3_output) == idx + 1
s3_output._write_document_batch.assert_not_called()

# Backlog full then cleared
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
sleep(0.1) # nosemgrep
self._wait_for_writing_thread(s3_output)
s3_output._write_document_batch.assert_called_once()
assert s3_output._current_backlog_count == 0
assert self._calculate_backlog_size(s3_output) == 0

# Backlog not full
for idx in range(message_backlog_size - 1):
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
sleep(0.1) # nosemgrep
assert s3_output._current_backlog_count == idx + 1
self._wait_for_writing_thread(s3_output)
assert self._calculate_backlog_size(s3_output) == idx + 1
s3_output._write_document_batch.assert_called_once()

# Backlog full then cleared
s3_output._write_to_s3_resource({"dummy": "event"}, "write_to_s3")
sleep(0.1) # nosemgrep
self._wait_for_writing_thread(s3_output)
assert s3_output._write_document_batch.call_count == 2
assert s3_output._current_backlog_count == 0
assert self._calculate_backlog_size(s3_output) == 0

def test_store_counts_processed_events(self):
self.object._s3_resource = mock.MagicMock()
super().test_store_counts_processed_events()

def test_store_calls_batch_finished_callback(self):
self.object._s3_resource = mock.MagicMock()
super().test_store_calls_batch_finished_callback()
self.object.input_connector = mock.MagicMock()
self.object.store({"message": "my event message"})
self._wait_for_writing_thread(self.object)
self.object.input_connector.batch_finished_callback.assert_called()

def test_store_does_not_call_batch_finished_callback_if_disabled(self):
s3_config = deepcopy(self.CONFIG)
@@ -240,7 +251,30 @@ def test_store_does_not_call_batch_finished_callback_if_disabled(self):

def test_write_to_s3_resource_replaces_dates(self):
expected_prefix = f'base_prefix/prefix-{TimeParser.now().strftime("%y:%m:%d")}'
self.object._write_backlog = mock.MagicMock()
self.object._write_to_s3_resource({"foo": "bar"}, "base_prefix/prefix-%{%y:%m:%d}")
resulting_prefix = next(iter(self.object._message_backlog.keys()))

assert expected_prefix == resulting_prefix

def test_message_backlog_is_not_written_if_message_backlog_size_not_reached(self):
self.object._config.message_backlog_size = 2
assert len(self.object._message_backlog) == 0
with mock.patch(
"logprep.connector.s3.output.S3Output._write_backlog"
) as mock_write_backlog:
self.object.store({"test": "event"})
mock_write_backlog.assert_not_called()

def test_store_failed_counts_failed_events(self):
self.object._write_backlog = mock.MagicMock()
super().test_store_failed_counts_failed_events()

@staticmethod
def _wait_for_writing_thread(s3_output):
if s3_output._writing_thread is not None:
s3_output._writing_thread.join()

@staticmethod
def _calculate_backlog_size(s3_output):
return sum(len(values) for values in s3_output._message_backlog.values())
