KAFKA-17948: Potential issue during tryComplete and onComplete simultaneous calls to access global variables #17739

Merged 14 commits on Nov 15, 2024
88 changes: 50 additions & 38 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -37,11 +37,11 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;

import scala.Tuple2;
@@ -58,13 +58,12 @@ public class DelayedShareFetch extends DelayedOperation {

private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;

private Map<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private Map<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;
private final SharePartitionManager sharePartitionManager;
// The topic partitions that need to be completed for the share fetch request are given by sharePartitions.
// sharePartitions is a subset of shareFetchData. The order of insertion/deletion of entries in sharePartitions is important.
private final LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
private LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsAcquired;
private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched;

DelayedShareFetch(
ShareFetch shareFetch,
@@ -91,39 +90,47 @@ public void onExpiration() {
*/
@Override
public void onComplete() {
// We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
Contributor
These are instance variables, not global.

// partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
lock.lock();
Collaborator
So now, for share fetch, tryComplete and onComplete will both run under the lock. Seems fine, as the execution should be sequential anyway.
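To make the guarded pattern concrete, here is a minimal standalone sketch of the idea discussed above, using illustrative class and field names rather than Kafka's actual DelayedOperation API: both tryComplete() and onComplete() take the same lock before touching the shared instance state, so onComplete() never observes a partially updated map written by a concurrent tryComplete().

import java.util.LinkedHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only; class and field names are hypothetical.
public class GuardedDelayedOperationSketch {
    private final Lock lock = new ReentrantLock();
    // Shared instance state, analogous to partitionsAcquired / partitionsAlreadyFetched.
    private final LinkedHashMap<String, Integer> partitionsAcquired = new LinkedHashMap<>();

    public boolean tryComplete() {
        lock.lock();
        try {
            // Simulate acquiring a partition and recording it for onComplete to reuse.
            partitionsAcquired.put("topic-0", 42);
            return !partitionsAcquired.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    public void onComplete() {
        lock.lock();
        try {
            // Reads the state written by tryComplete without risking a dirty read.
            System.out.println("Completing with: " + partitionsAcquired);
        } finally {
            lock.unlock();
        }
    }
}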

log.trace("Completing the delayed share fetch request for group {}, member {}, "
+ "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
partitionsAcquired.keySet());

if (shareFetch.isCompleted())
return;
try {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
else
topicPartitionData = partitionsAcquired;

Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData;
// tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
if (partitionsAcquired.isEmpty())
topicPartitionData = acquirablePartitions();
// tryComplete invoked forceComplete, so we can use the data from tryComplete.
else
topicPartitionData = partitionsAcquired;
if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetch.maybeComplete(Collections.emptyMap());
return;
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());

if (topicPartitionData.isEmpty()) {
// No locks for share partitions could be acquired, so we complete the request with an empty response.
shareFetch.maybeComplete(Collections.emptyMap());
return;
completeShareFetchRequest(topicPartitionData);
} finally {
lock.unlock();
}
log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
}

private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
try {
Map<TopicIdPartition, LogReadResult> responseData;
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (partitionsAlreadyFetched.isEmpty())
responseData = readFromLog(topicPartitionData);
else
// There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting
// updated in a different tryComplete thread.
responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched);

Map<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, FetchPartitionData> fetchPartitionsData = new LinkedHashMap<>();
for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet())
fetchPartitionsData.put(entry.getKey(), entry.getValue().toFetchPartitionData(false));

@@ -150,14 +157,14 @@ public void onComplete() {
*/
@Override
public boolean tryComplete() {
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = acquirablePartitions();

try {
if (!topicPartitionData.isEmpty()) {
// In case, fetch offset metadata doesn't exist for one or more topic partitions, we do a
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
partitionsAcquired = topicPartitionData;
@@ -194,9 +201,9 @@ public boolean tryComplete() {
* Prepare fetch request structure for partitions in the share fetch request for which we can acquire records.
*/
// Visible for testing
Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
// Initialize the topic partitions for which the fetch should be attempted.
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();

sharePartitions.forEach((topicIdPartition, sharePartition) -> {
int partitionMaxBytes = shareFetch.partitionMaxBytes().getOrDefault(topicIdPartition, 0);
@@ -231,23 +238,23 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
return topicPartitionData;
}

private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
Map<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata().isEmpty()) {
partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
}
});
if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
return Collections.emptyMap();
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsMissingFetchOffsetMetadata);
}

private void maybeUpdateFetchOffsetMetadata(
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
@@ -262,7 +269,7 @@ private void maybeUpdateFetchOffsetMetadata(
}

// minBytes estimation currently assumes the common case where all fetched data is acquirable.
private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
long accumulatedSize = 0;
for (Map.Entry<TopicIdPartition, FetchRequest.PartitionData> entry : topicPartitionData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -324,11 +331,11 @@ else if (isolationType == FetchIsolation.HIGH_WATERMARK)

}

private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
private LinkedHashMap<TopicIdPartition, LogReadResult> readFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch = shareFetch.filterErroneousTopicPartitions(topicPartitionData.keySet());
if (partitionsToFetch.isEmpty()) {
return Collections.emptyMap();
return new LinkedHashMap<>();
}

Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
@@ -340,7 +347,7 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, F
QuotaFactory.UNBOUNDED_QUOTA,
true);

Map<TopicIdPartition, LogReadResult> responseData = new HashMap<>();
LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<>();
responseLogResult.foreach(tpLogResult -> {
responseData.put(tpLogResult._1(), tpLogResult._2());
return BoxedUnit.UNIT;
@@ -350,7 +357,7 @@ private Map<TopicIdPartition, LogReadResult> readFromLog(Map<TopicIdPartition, F
return responseData;
}

private boolean anyPartitionHasLogReadError(Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
private boolean anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse) {
return replicaManagerReadResponse.values().stream()
.anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
}
@@ -379,9 +386,9 @@ private void handleFetchException(
}

// Visible for testing.
Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
Map<TopicIdPartition, LogReadResult> existingFetchedData) {
Map<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
LinkedHashMap<TopicIdPartition, LogReadResult> combineLogReadResponse(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> existingFetchedData) {
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
@@ -390,7 +397,7 @@ Map<TopicIdPartition, LogReadResult> combineLogReadResponse(Map<TopicIdPartition
if (missingLogReadTopicPartitions.isEmpty()) {
return existingFetchedData;
}
Map<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
LinkedHashMap<TopicIdPartition, LogReadResult> missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
@@ -402,4 +409,9 @@ void releasePartitionLocks(Set<TopicIdPartition> topicIdPartitions) {
sharePartition.releaseFetchLock();
});
}

// Visible for testing.
Lock lock() {
return lock;
}
}
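Most of the remaining signature changes in this diff replace Map/HashMap with LinkedHashMap so that per-partition processing keeps the insertion order of sharePartitions. A minimal sketch of the behavioral difference, using hypothetical topic names:

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOrderingSketch {
    public static void main(String[] args) {
        // HashMap makes no guarantee about iteration order.
        Map<String, Integer> hashMap = new HashMap<>();
        // LinkedHashMap iterates in insertion order, which is what the share fetch
        // path relies on when walking partitions in the order they were acquired.
        Map<String, Integer> linkedMap = new LinkedHashMap<>();

        for (String topic : new String[]{"topic-2", "topic-0", "topic-1"}) {
            hashMap.put(topic, 0);
            linkedMap.put(topic, 0);
        }

        System.out.println("HashMap order:       " + hashMap.keySet());   // order is unspecified
        System.out.println("LinkedHashMap order: " + linkedMap.keySet()); // [topic-2, topic-0, topic-1]
    }
}

Declaring the fields and method signatures as LinkedHashMap rather than the Map interface makes that ordering guarantee explicit at the type level.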