KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables #17739
Conversation
…ween tryComplete and onComplete in DelayedShareFetch
…rom Jun on PR#17539
@adixitconfluent : Thanks for the PR. Left a comment.
@@ -1602,8 +1602,6 @@ protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffset
protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
    lock.readLock().lock();
    try {
        if (findNextFetchOffset.get())
Since we update fetchOffsetMetadata every time we change endOffset, I don't think we should have a dependency on findNextFetchOffset when getting the value of fetchOffsetMetadata.
Hmm, the issue is that nextFetchOffset doesn't return endOffset if findNextFetchOffset is true. Currently we only reset fetchOffsetMetadata when updating the endOffset. It's possible that findNextFetchOffset stays on for multiple fetches without changing endOffset. In that case, we will set fetchOffsetMetadata for the first fetch and keep reusing it for subsequent fetches, which will be incorrect.
Hi @junrao, shall we do this: when the call goes to fetchOffsetMetadata(), we check whether findNextFetchOffset is true. If it is, we call nextFetchOffset(), which will correctly update the endOffset if it needs updating. Finally, we just return fetchOffsetMetadata. Do you think that will work?
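A minimal sketch of that suggestion, with locking elided (in the real SharePartition, nextFetchOffset() would need the write lock, so it could not be called under the read lock as-is); the names follow the discussion above, and this is not the PR's actual code:

// Sketch only: refresh state before returning the cached metadata,
// so a pending findNextFetchOffset can't leave it stale.
protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
    if (findNextFetchOffset.get()) {
        // Recomputes the next fetch offset; when the endOffset moves,
        // fetchOffsetMetadata is reset along with it.
        nextFetchOffset();
    }
    return fetchOffsetMetadata;
}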
Hi @junrao, now that I think about it more, IIUC, considering the common case when all fetched data is acquirable:
- Acknowledgements, acquisition lock timeout, and release of records on session close are the only places where we set findNextFetchOffset to true.
- In all three scenarios above, if there is a change to the endOffset, we update the endOffset (and thereby fetchOffsetMetadata is also updated automatically with our changes).
Hence, I feel that findNextFetchOffset shouldn't be considered when dealing with the common case.
In the uncommon case, when the log start offset is later than the fetch offset and we need to archive records, we set findNextFetchOffset to true. But we have done the minBytes implementation only for the common case so far, hence I feel the current change is correct. Please correct me if I am wrong.
cc - @apoorvmittal10
Yes, I agree findNextFetchOffset=true is the uncommon case. It might be useful to at least have some kind of consistent behavior for the uncommon case. Since the minBytes estimation will be off in this case anyway, we could choose to consistently satisfy the request immediately or wait for the timeout. With the logic in this PR, because fetchOffsetMetadata can be outdated in this uncommon case, it's not clear when the request will be satisfied.
@junrao, so should I do what I suggested in #17739 (comment): when the call goes to fetchOffsetMetadata(), check whether findNextFetchOffset is true, and if it is, call nextFetchOffset() to correctly update the endOffset if needed, then return fetchOffsetMetadata? OR, for the uncommon case, should I update fetchOffsetMetadata to Optional.empty() and remove any dependency on findNextFetchOffset?
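For reference, a sketch of that second option, under the same simplified assumptions as the earlier sketch: report no metadata whenever findNextFetchOffset is set, so the uncommon case consistently falls back to waiting for the timeout instead of using a possibly stale cached value.

// Sketch of option two: never trust the cached value in the uncommon case.
protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
    if (findNextFetchOffset.get()) {
        return Optional.empty();
    }
    return fetchOffsetMetadata;
}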
Yeah, that works as well @junrao; we can have such a structure in SharePartition to accommodate it.
but I wanted to confirm whether it's necessary
Yes, it's an optimization over the current implementation, where we might do unnecessary processing because of released records.
@AndrewJSchofield, in theory, AdminClient.alterShareGroupOffsets() can initialize to an arbitrary offset, right?
Yes, indeed. Great point.
Hi everyone, thanks for your input. @junrao, I think this is considerably different from the objective of this PR, so I have created JIRA https://issues.apache.org/jira/browse/KAFKA-18022 to track it. I would like to address it in a new PR, if that is fine with you. Meanwhile, I am reverting the code change for this line and updating the PR description to reflect that.
@@ -90,39 +90,50 @@ public void onExpiration() {
 */
@Override
public void onComplete() {
    // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
    // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
    lock.lock();
So now, for share fetch, tryComplete and onComplete will both run under the lock. Seems fine, as the execution should be sequential anyway.
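For context, a simplified stand-in (field and method names are illustrative, not the actual DelayedShareFetch code) showing the pattern under review: both paths take the same ReentrantLock, so onComplete never sees a half-written update from a concurrent tryComplete thread, and tryComplete calling forceComplete → onComplete on its own thread still works because the lock is reentrant.

import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for DelayedShareFetch's locking pattern.
class DelayedShareFetchSketch {
    private final Lock lock = new ReentrantLock();
    // Shared state read and written by both paths.
    private Map<String, Long> partitionsAcquired = Map.of();

    boolean tryComplete() {
        lock.lock();
        try {
            partitionsAcquired = acquirablePartitions();
            return !partitionsAcquired.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    void onComplete() {
        lock.lock(); // reentrant: safe even when called from tryComplete's thread
        try {
            completeFetch(partitionsAcquired); // sees a consistent view
        } finally {
            lock.unlock();
        }
    }

    private Map<String, Long> acquirablePartitions() { return Map.of("topic-0", 0L); }
    private void completeFetch(Map<String, Long> acquired) { /* respond to the fetch */ }
}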
if (shareFetchData.future().isDone())
    return;
Given this check for share fetch future completion: if locks have been acquired for share partitions but the share fetch future was already completed at line 101, how will those locks be released? I don't think the code handles that.
Yeah, I agree that's a very corner-case scenario, but definitely possible. I have pushed a fix for it. Thanks for pointing it out.
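One plausible shape of such a fix (releasePartitionLocks is a hypothetical helper name; the actual patch may differ): release any acquired share-partition locks before the early return.

// Sketch: don't leak partition locks on the early-return path.
if (shareFetchData.future().isDone()) {
    // Another key's checkAndComplete already completed the future, but
    // this thread may still hold share-partition locks from tryComplete.
    releasePartitionLocks(partitionsAcquired.keySet());
    return;
}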
Hmm, could shareFetchData.future().isDone() be true inside onComplete()? We complete the future only after DelayedOperation.completed is set to true. After that point, onComplete() is not expected to be called again.
So, if we have two different keys corresponding to a ShareFetch request, it could be the case that for one of those keys we get a checkAndComplete call which completes the share fetch request. When the purgatory entry corresponding to the other key then times out or has checkAndComplete triggered, by the time the code reaches onComplete the share fetch request's future is already complete, so it would hit the shareFetchData.future().isDone() check and return early.
onComplete is always called through forceComplete, right? So, only one thread could ever call onComplete.
public boolean forceComplete() {
    if (completed.compareAndSet(false, true)) {
        // cancel the timeout timer
        cancel();
        onComplete();
        return true;
    } else {
        return false;
    }
}
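A toy, self-contained demonstration of that guarantee (class and field names are illustrative): the compareAndSet on completed admits exactly one winner, so onComplete runs at most once no matter how many watcher keys, or the expiration reaper, race to force completion.

import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of DelayedOperation.forceComplete's single-winner CAS.
class ToyDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            System.out.println("onComplete runs exactly once");
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyDelayedOperation op = new ToyDelayedOperation();
        Runnable racer = () -> System.out.println("won: " + op.forceComplete());
        Thread t1 = new Thread(racer);
        Thread t2 = new Thread(racer);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // Exactly one thread prints "won: true".
    }
}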
Hi @junrao, you're right. There was a gap in my understanding of purgatory operations: I thought a copy of the operation goes to each of the multiple watch keys used for that operation, but this line in the documentation cleared it up.
Note that a delayed operation can be watched on multiple keys.
It is possible that an operation is completed after it has been added to the watch list for some, but not all the keys.
In this case, the operation is considered completed and won't be added to the watch list of the remaining keys.
The expiration reaper thread will remove this operation from any watcher list in which the operation exists.
Hence, I've removed the mentioned condition from the code now. Thanks!
@adixitconfluent : Thanks for the updated PR. LGTM. I have a minor comment and we can address that in your followup PR.
@@ -91,39 +90,47 @@ public void onExpiration() {
 */
@Override
public void onComplete() {
    // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
These are instance variables, not global.
About
This PR addresses the following issues -
a. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
b. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
c. KAFKA-17743: Add minBytes implementation to DelayedShareFetch #17539 (comment)
Testing
Testing has been done with the help of new and existing unit tests and existing integration tests.