Make kafka output store offset for successfully delivered events #516
base: main
Conversation
Codecov Report

@@ Coverage Diff @@
##             main     #516      +/-   ##
==========================================
+ Coverage   91.56%   91.65%   +0.09%
==========================================
  Files         130      130
  Lines        9496     9551      +55
==========================================
+ Hits         8695     8754      +59
+ Misses        801      797       -4

☔ View full report in Codecov by Sentry.
The idea is clear to me, but the mechanic is not, and I think the problem is spread across the output AND the input.
The possibility to add meta fields to the event is pretty cool, as we need this in the http_input_connector too.
The guarantee of delivery is not optional: there is no way to opt out and have a fire-and-forget kafka output as before. Please consider making the whole mechanic configurable.
As implemented now, I do not think this will do the job, and I have big doubts about the performance of this solution. Please have a look at my remarks.
for meta_field in ("last_partition", "last_offset"):
    try:
        del event_dict["_metadata"][meta_field]
    except (TypeError, KeyError):
        pass
-for meta_field in ("last_partition", "last_offset"):
-    try:
-        del event_dict["_metadata"][meta_field]
-    except (TypeError, KeyError):
-        pass
+metadata.pop("last_partition", None)
+metadata.pop("last_offset", None)
Easier to read. (You will have to adjust the tests, because they pass string data where a dict is expected.)
I have changed it, but I kept the try/except to handle the case where `_metadata` is not a dict, since this field might already exist in the event and be of any type. I did not check for dict explicitly, since that case is unlikely, and it is more performant this way.
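A minimal sketch of that trade-off (the helper name is hypothetical; the except clause is widened to cover non-dict values):

```python
def _remove_offset_metadata(event_dict: dict) -> None:
    """Hypothetical helper: EAFP-style removal of the offset meta fields."""
    try:
        metadata = event_dict["_metadata"]
        metadata.pop("last_partition", None)
        metadata.pop("last_offset", None)
    except (KeyError, AttributeError, TypeError):
        # "_metadata" is absent, or not a dict (e.g. a string or a list)
        pass
```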
if metadata is None:
    raise FatalInputError(self, "Metadata for setting offsets can't be 'None'")
-if metadata is None:
-    raise FatalInputError(self, "Metadata for setting offsets can't be 'None'")
If it can't be `None`, we should ensure it is not `None`, to reflect the non-optional type hint. But this seems to be the wrong place to check it; we should check earlier to fail faster. My suggestion is to set `metadata` to an empty dict in `batch_finished_callback` if it is `None`.
Thanks, I changed it to be set in `batch_finished_callback`.
Should be called by output connectors if they are finished processing a batch of records.
"""
 Should be called by output connectors if they are finished processing a batch of records.
 """
+metadata = {} if metadata is None else metadata
This way we ensure it can't be `None` in further processing. Please adjust the type hints accordingly.
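A possible shape of the adjusted signature, as a sketch under the assumption that the callback lives on the input connector class:

```python
from typing import Optional

class InputSketch:
    def batch_finished_callback(self, metadata: Optional[dict] = None) -> None:
        """Should be called by output connectors if they are finished
        processing a batch of records."""
        metadata = {} if metadata is None else metadata
        # from here on, metadata is guaranteed to be a dict
```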
Thanks, I've added that suggestion.
@Metric.measure_time()
def _write_backlog(self):
    self._producer.flush(self._config.flush_timeout)
The `flush_timeout` in opensearch and elasticsearch is the time to guarantee message delivery, so this confuses me.
`flush_timeout` was already used for `flush` in `shut_down` and in case of a `BufferError`. `flush` internally calls `poll` until the internal buffer is empty, ensuring that all messages get sent. We could rename it, but calling it `flush_timeout` for the `flush` method makes sense in my opinion.
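For reference, a minimal sketch of that flush/poll behavior with confluent_kafka (broker address and topic are placeholders):

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("events", b"payload")

# flush() polls the producer until the internal buffer is empty or the
# timeout elapses, and returns the number of messages still queued.
remaining = producer.flush(5.0)  # corresponds to flush_timeout
if remaining:
    print(f"{remaining} messages were not delivered within the timeout")
```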
Yes, it makes sense. But the other option is to rename the parameters in elasticsearch and opensearch so that the term `flush_timeout` means the same thing globally in logprep. I would prefer to change it here and now, to get rid of this inconsistency and not raise another pull request to change it anywhere else.
if error:
    raise FatalOutputError(output=self, message=error)
self.metrics.number_of_successfully_delivered_events += 1
self.input_connector.batch_finished_callback(metadata=partition_offset)
As I understand it, `batch_finished_callback` is called on every successfully delivered message, right? If so, this would decrease performance drastically, because on every successful delivery the GIL is held in this method. Also, the called method is named BATCH_finished_callback, but now it is called on every single message?

Consider using the equivalent mechanic as in the `opensearch_output`: write all successful deliveries into a list (you should use a deque for this); when it is full, get the last committable offset for all the partitions and call `batch_finished_callback` with it, as sketched below.
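A rough sketch of that mechanic, with all names hypothetical (note that taking the highest offset per partition is naive; see the gap example that follows):

```python
from collections import deque

class KafkaOutputSketch:
    """Batches delivery confirmations before committing offsets."""

    def __init__(self, input_connector, batch_size: int = 500):
        self.input_connector = input_connector
        self._batch_size = batch_size
        self._delivered = deque(maxlen=batch_size)

    def _kafka_delivery_callback(self, error, message):
        if error:
            raise RuntimeError(error)
        self._delivered.append((message.partition(), message.offset()))
        if len(self._delivered) >= self._batch_size:
            # Naive: commit the highest delivered offset per partition.
            offsets: dict[int, int] = {}
            for partition, offset in self._delivered:
                offsets[partition] = max(offset, offsets.get(partition, -1))
            self.input_connector.batch_finished_callback(metadata=offsets)
            self._delivered.clear()
```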
Furthermore, here it is possible that you commit offsets for messages that were not delivered. Example (kafka topic partition offsets):

- current: 0, committed: 0
- you consume a message: current: 1, committed: 0
- you consume a further message: current: 2, committed: 0
- you deliver message 2 and the callback is called: current: 2, committed: 2

What about the first message, which was not delivered? Kafka now thinks it is delivered, but it actually is not.
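One way to guard against that, as a hedged sketch (the helper is hypothetical): track delivered offsets per partition and only advance the committed offset across a contiguous run of deliveries.

```python
def committable_offset(delivered: set, last_committed: int) -> int:
    """Advance only while the next offset has actually been delivered."""
    offset = last_committed
    while offset + 1 in delivered:
        offset += 1
    return offset

# Mirrors the example above: message 2 is delivered but message 1 is not,
# so nothing new may be committed yet.
assert committable_offset({2}, 0) == 0
# Once message 1 is also delivered, offset 2 becomes committable.
assert committable_offset({1, 2}, 0) == 2
```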
name="number_of_successfully_delivered_events",
    )
)
"""Number of events that were successfully delivered to Kafka"""
Yes, OK, it is not part of your issue now, but could you please add documentation for the other kafka_output config parameters?
I have added some documentation now.