KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables #17739
Merged
+91 −45
Changes from all commits (14 commits)
d3c02f9  Created branch  (adixitconfluent)
c23c085  Added lock from DelayedOperation to avoid dirty read of variables bet…  (adixitconfluent)
090c5f9  Used linkedhashmap instead of map corresponding to pending comments f…  (adixitconfluent)
b0f51ac  Made locking and unlocking more robust + minor refactor  (adixitconfluent)
8c0a69d  Removed depenedency on findNextFetchOffset while getting fetchOffsetM…  (adixitconfluent)
479c1a5  Trigger build  (adixitconfluent)
924a57d  Trigger build  (adixitconfluent)
3281e41  Addressed Apoorv's round 1 comments  (adixitconfluent)
dcbbb15  Trigger build  (adixitconfluent)
7db5966  Addressed Jun's comment around completed future  (adixitconfluent)
8050817  Trigger build  (adixitconfluent)
9b3a514  Merge remote-tracking branch 'origin/trunk' into kafka-17948  (adixitconfluent)
a27276a  Fixed issues due to the merge from trunk  (adixitconfluent)
c06e6d5  Trigger build  (adixitconfluent)
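As the PR title and the commit messages describe, the race being fixed is between tryComplete and onComplete both touching the partitionsAcquired and partitionsAlreadyFetched instance fields. Below is a minimal, self-contained sketch of the locking pattern the diff applies — the class, field, and key/value types here are simplified stand-ins, not the real Kafka classes, and in the real code the lock comes from DelayedOperation and tryComplete is already invoked under it by the purgatory machinery.

```java
import java.util.LinkedHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for the DelayedOperation/DelayedShareFetch pattern fixed in this PR:
// both callbacks guard the mutable instance state with the same lock, so onComplete never
// observes a half-written update made by a concurrent tryComplete.
class DelayedFetchSketch {
    // In Kafka the lock is inherited from DelayedOperation; it is inlined here.
    private final Lock lock = new ReentrantLock();
    // Instance (not global) state that both callbacks read and write.
    private LinkedHashMap<String, Long> partitionsAcquired = new LinkedHashMap<>();

    boolean tryComplete() {
        // In Kafka the purgatory machinery already calls tryComplete under the lock;
        // it is taken explicitly here to make the serialization visible.
        lock.lock();
        try {
            LinkedHashMap<String, Long> acquired = acquirePartitions();
            if (!acquired.isEmpty()) {
                partitionsAcquired = acquired; // published only while holding the lock
                return forceComplete();        // triggers onComplete()
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    void onComplete() {
        lock.lock();
        try {
            // Safe read: either tryComplete already populated the field (and completed),
            // or it is still empty and the partitions are acquired here instead.
            LinkedHashMap<String, Long> toFetch =
                partitionsAcquired.isEmpty() ? acquirePartitions() : partitionsAcquired;
            System.out.println("Completing fetch for " + toFetch.keySet());
        } finally {
            lock.unlock();
        }
    }

    private LinkedHashMap<String, Long> acquirePartitions() {
        return new LinkedHashMap<>(); // placeholder for the real acquisition logic
    }

    private boolean forceComplete() {
        onComplete(); // simplified: DelayedOperation guarantees this runs exactly once
        return true;
    }
}
```

The design point is simply that both paths serialize on the same reentrant lock, which also matches the reviewer observation embedded in the diff below that tryComplete and onComplete end up executing sequentially.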
DelayedShareFetch.java:

@@ -37,11 +37,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;

@@ -58,13 +58,12 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;
-    private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
-    private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
     private final SharePartitionManager sharePartitionManager;
     // The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
     // sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
     private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
+    private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
+    private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
 
     DelayedShareFetch(
         ShareFetch shareFetch,

@@ -91,39 +90,47 @@ public void onExpiration() {
      */
     @Override
     public void onComplete() {
+        // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        lock.lock();
[Review comment on lock.lock()] So now for share fetch tryComplete and onComplete will be under lock. Seems fine as anyways the execution should be sequential.
         log.trace("Completing the delayed share fetch request for group {}, member {}, "
             + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
             partitionsAcquired.keySet());
 
-        if (shareFetch.isCompleted())
-            return;
-
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
-        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-        if (partitionsAcquired.isEmpty())
-            topicPartitionData = acquirablePartitions();
-        // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-        else
-            topicPartitionData = partitionsAcquired;
-
-        if (topicPartitionData.isEmpty()) {
-            // No locks for share partitions could be acquired, so we complete the request with an empty response.
-            shareFetch.maybeComplete(Collections.emptyMap());
-            return;
-        }
-        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-            topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
-
         try {
-            Map<TopicIdPartition, LogReadResult> responseData;
+            LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
+            // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
+            if (partitionsAcquired.isEmpty())
+                topicPartitionData = acquirablePartitions();
+            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
+            else
+                topicPartitionData = partitionsAcquired;
+
+            if (topicPartitionData.isEmpty()) {
+                // No locks for share partitions could be acquired, so we complete the request with an empty response.
+                shareFetch.maybeComplete(Collections.emptyMap());
+                return;
+            }
+            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+                topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+            completeShareFetchRequest(topicPartitionData);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        try {
+            LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
             if (partitionsAlreadyFetched.isEmpty())
                 responseData = readFromLog(topicPartitionData);
             else
+                // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
+                // updated in a different tryComplete thread.
                 responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched);
 
-            Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
+            LinkedHashMap<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
             for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet())
                 fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));

@@ -150,14 +157,14 @@ public void onComplete() {
      */
     @Override
     public boolean tryComplete() {
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
 
         try {
             if (!topicPartitionData.isEmpty()) {
                 // In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
                 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                 // those topic partitions.
-                Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
                 maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
                     partitionsAcquired = topicPartitionData;

@@ -194,9 +201,9 @@ public boolean tryComplete() {
      * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
      */
     // Visible for testing
-    Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
+    LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         // Initialize the topic partitions for which the fetch should be attempted.
-        Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
 
         sharePartitions.forEach((topicIdPartition, sharePartition) -> {
             int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);

@@ -231,23 +238,23 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
         return topicPartitionData;
     }
 
-    private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
-        Map<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+    private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);
             if (sharePartition.fetchOffsetMetadata().isEmpty()) {
                 partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
             }
         });
         if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
-            return Collections.emptyMap();
+            return new LinkedHashMap<>();
         }
         // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
         return readFromLog(partitionsMissingFetchOffsetMetadata);
     }
 
     private void maybeUpdateFetchOffsetMetadata(
-        Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
+        LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
         for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();
             SharePartition sharePartition = sharePartitions.get(topicIdPartition);

@@ -262,7 +269,7 @@ private void maybeUpdateFetchOffsetMetadata(
     }
 
     // minByes estimation currently assumes the common case where all fetched data is acquirable.
-    private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
         long accumulatedSize = 0;
         for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
             TopicIdPartition topicIdPartition = entry.getKey();

@@ -324,11 +331,11 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)
 
     }
 
-    private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
+    private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
         // Filter if there already exists any erroneous topic partition.
         Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
         if (partitionsToFetch.isEmpty()) {
-            return Collections.emptyMap();
+            return new LinkedHashMap<>();
         }
 
         Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(

@@ -340,7 +347,7 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, F
             QuotaFactory.UNBOUNDED_QUOTA,
             true);
 
-        Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
+        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<>();
         responseLogResult.foreach(tpLogResult -> {
             responseData.put(tpLogResult._1(), tpLogResult._2());
             return BoxedUnit.UNIT;

@@ -350,7 +357,7 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, F
         return responseData;
     }
 
-    private boolean anyPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
+    private boolean anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
         return replicaManagerReadResponse.values().stream()
             .anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
     }

@@ -379,9 +386,9 @@ private void handleFetchException(
     }
 
     // Visible for testing.
-    Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
-                                                                Map<TopicIdPartition, LogReadResult> existingFetchedData) {
-        Map<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
+    LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
+                                                                          LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
+        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
         topicPartitionData.forEach((topicIdPartition, partitionData) -> {
             if (!existingFetchedData.containsKey(topicIdPartition)) {
                 missingLogReadTopicPartitions.put(topicIdPartition, partitionData);

@@ -390,7 +397,7 @@ Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition
         if (missingLogReadTopicPartitions.isEmpty()) {
             return existingFetchedData;
         }
-        Map<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+        LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
         missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
         return missingTopicPartitionsLogReadResponse;
     }

@@ -402,4 +409,9 @@ void releasePartitionLocks(Set<TopicIdPartition> topicIdPartitions) {
             sharePartition.releaseFetchLock();
         });
     }
+
+    // Visible for testing.
+    Lock lock() {
+        return lock;
+    }
 }
[Review comment] These are instance variables, not global.
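A side note on the second theme of the diff: every Map/HashMap along the fetch path becomes a LinkedHashMap, in line with the field comment that the insertion order of sharePartitions is significant. The snippet below is only an illustration of the relevant Java standard-library behaviour, not code from the PR; the class name and topic-partition strings are made up for the example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration of why LinkedHashMap is preferred here: it iterates in insertion order,
// while HashMap iteration order depends on hashing and gives no ordering guarantee.
public class MapOrderingDemo {
    public static void main(String[] args) {
        Map<String, Integer> hashMap = new HashMap<>();
        Map<String, Integer> linkedMap = new LinkedHashMap<>();
        for (String tp : new String[]{"topicA-2", "topicB-0", "topicA-0", "topicB-1"}) {
            hashMap.put(tp, 0);
            linkedMap.put(tp, 0);
        }
        System.out.println("HashMap order:       " + hashMap.keySet());   // arbitrary order
        System.out.println("LinkedHashMap order: " + linkedMap.keySet()); // insertion order
    }
}
```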