Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables #17739

Merged
merged 14 commits into from
Nov 15, 2024

Conversation

adixitconfluent
Copy link
Contributor

@adixitconfluent adixitconfluent commented Nov 10, 2024

About

This PR addresses the following issues -

  1. KAFKA-17984: Potential issue during tryComplete and onComplete simultaneous calls to access global variables
  2. Pending minor comments on AK PR KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 -
    a. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
    b. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
    c. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)

Testing

Testing has been done with the help of new/already present unit tests and already present integration tests.

@github-actions github-actions bot added core Kafka Broker KIP-932 Queues for Kafka labels Nov 10, 2024
@adixitconfluent adixitconfluent marked this pull request as ready for review November 10, 2024 14:41
@adixitconfluent adixitconfluent marked this pull request as draft November 10, 2024 17:30
@adixitconfluent adixitconfluent marked this pull request as ready for review November 10, 2024 18:24
Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the PR. Left a comment.

@@ -1602,8 +1602,6 @@ protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffset
protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
lock.readLock().lock();
try {
if (findNextFetchOffset.get())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we update fetchOffsetMetadata everytime we change endOffset, I don't think that we should have dependency on findNextFetchOffset while getting the value of fetchOffsetMetadata

Hmm, the issue is that nextFetchOffset doesn't return endOffset if findNextFetchOffset is true. Currently we only reset fetchOffsetMetadata when updating the endOffset. It's possible that findNextFetchOffset stays on for multiple fetches without changing endOffset. In that case, we will set fetchOffsetMetadata for the first fetch and keep reusing it for subsequent fetches, which will be incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junrao, so shall we do this, that when the call goes to fetchOffsetMetadata(), we check if findNextFetchOffset is true or not, in case it is true, we do a call to nextFetchOffset() which will correctly update the endOffset if it needs to be updated or not. Finally, we just return fetchOffsetMetadata. Do you think it will work?

Copy link
Contributor Author

@adixitconfluent adixitconfluent Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @junrao, now that I think more about it, IIUC, considering the common case when all fetched data is acquirable -

  1. acknowledgements/acquisition lock timeout/ release of records on session close are the only places where we set findNextFetchOffset to true
  2. In all the 3 scenarios mentioned above, if there is a change to the endOffset, we update the endOffset (thereby fetchOffsetMetadata is also updated automatically with our changes)
    Hence, I feel that the findNextFetchOffset shouldn't be considered when dealing with the common case.
    In the not common cases, when Log Start Offset is later than the fetch offset and we need to archive records, then we set findNextFetchOffset to True. But we have done the minBytes implementation only for the common cases right now, hence i feel the current change is correct. Please correct me if I am wrong.
    cc - @apoorvmittal10

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree findNextFetchOffset=true is the uncommon case. It might be useful to at least have some kind of consistent behavior for the uncommon case. Since the minByte estimation will be off anyway in this case, we could choose to consistent satisfy the request immediately or wait for the timeout. With the logic in this PR, because fetchOffsetMetadata can be outdated in this uncommon case, it's not clear when the request will be satisfied.

Copy link
Contributor Author

@adixitconfluent adixitconfluent Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao, so, should I do what I suggested #17739 (comment) here, that when the call goes to fetchOffsetMetadata(), we check if findNextFetchOffset is true or not, in case it is true, we do a call to nextFetchOffset() which will correctly update the endOffset if it needs to be updated or not. Finally, we just return fetchOffsetMetadata OR for the uncommon case, I update the fetchOffsetMetadata to Optional.empty() and remove any dependency on findNextFetchOffset

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that works as well @junrao, we can have such structure in SharePartition to accomodate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I wanted to confirm whether it's necessary

Yes it's an optimization over current implemetation where we might do unnecessary processing because of released records.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield, in theory, AdminClient.alterShareGroupOffsets() can initialize to an arbitrary offset right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, indeed. Great point.

Copy link
Contributor Author

@adixitconfluent adixitconfluent Nov 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi everyone, thanks for your inputs. @junrao , I think it is considerably different than the objective for this PR, hence I have created JIRA https://issues.apache.org/jira/browse/KAFKA-18022 to track this issue. I would like to address this in a new PR, if it is fine to you. Meanwhile I am reverting the code change for this code line and updating the PR description to reflect the same.

@@ -90,39 +90,50 @@ public void onExpiration() {
*/
@Override
public void onComplete() {
// We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
// partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
lock.lock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So now for share fetch trycomplete and oncomplete will be under lock. Seems fine as anyways the execution should be sequential.

Comment on lines 101 to 102
if (shareFetchData.future().isDone())
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we have this check here for share fetch future completion, so if there are locks acquired for share partitions but the share fetch future is already completed in line 101 then how will they be released? I don't think code handles that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I agree that's a super corner case scenario, but definitely possible. I have pushed a fix for it. Thanks for pointing it out.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, could shareFetchData.future().isDone() be true inside onComplete()? We complete the future only after DelayedOperation.completed is set to true. After that point, onComplete() is not expected to be called again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, if we have 2 different keys corresponding to a ShareFetch request, it could be a case that for one of those keys, we get a checkAndComplete call which could result in completing the share fetch request. Now when the purgatory entry corresponding to the other key could timeout/have checkAndComplete triggered, when the code reaches onComplete, the share fetch request's future was already complete, so it would hit shareFetchData.future().isDone() and return true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onComplete is always called through forceComplete, right? So, only one thread could ever call onComplete.

    public boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            // cancel the timeout timer
            cancel();
            onComplete();
            return true;
        } else {
            return false;
        }
    }

Copy link
Contributor Author

@adixitconfluent adixitconfluent Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @junrao , you're right. There was a gap in my understanding of purgatory operation where I thought the the copy of the operation goes to multiple watch keys used for that operation, but this line in documentation cleared it out.

Note that a delayed operation can be watched on multiple keys. 
It is possible that an operation is completed after it has been added to the watch list for some, but not all the keys. 
In this case, the operation is considered completed and won't be added to the watch list of the remaining keys. 
The expiration reaper thread will remove this operation from any watcher list in which the operation exists.

Hence, I've removed the mentioned condition from the code now. Thanks!

Copy link
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adixitconfluent : Thanks for the updated PR. LGTM. I have a minor comment and we can address that in your followup PR.

@@ -91,39 +90,47 @@ public void onExpiration() {
*/
@Override
public void onComplete() {
// We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are instance variables, not global.

@junrao junrao merged commit 77cc8ff into apache:trunk Nov 15, 2024
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants