replicaManagerReadResponse) {
return replicaManagerReadResponse.values().stream()
.anyMatch(logReadResult -> logReadResult.error().code() != Errors.NONE.code());
}
+ /**
+ * The handleFetchException method is used to handle the exception that occurred while reading from log.
+ * The method will handle the exception for each topic-partition in the request. The share partition
+ * might get removed from the cache.
+ *
+ * The replica read request might error out for one share partition
+ * but as we cannot determine which share partition errored out, we might remove all the share partitions
+ * in the request.
+ *
+ * @param shareFetch The share fetch request.
+ * @param topicIdPartitions The topic-partitions in the replica read request.
+ * @param throwable The exception that occurred while fetching messages.
+ */
+ private void handleFetchException(
+ ShareFetch shareFetch,
+ Set topicIdPartitions,
+ Throwable throwable
+ ) {
+ topicIdPartitions.forEach(topicIdPartition -> sharePartitionManager.handleFencedSharePartitionException(
+ new SharePartitionKey(shareFetch.groupId(), topicIdPartition), throwable));
+ shareFetch.maybeCompleteWithException(topicIdPartitions, throwable);
+ }
+
// Visible for testing.
- Map combineLogReadResponse(Map topicPartitionData,
- Map existingFetchedData) {
- Map missingLogReadTopicPartitions = new LinkedHashMap<>();
+ LinkedHashMap combineLogReadResponse(LinkedHashMap topicPartitionData,
+ LinkedHashMap existingFetchedData) {
+ LinkedHashMap missingLogReadTopicPartitions = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
if (!existingFetchedData.containsKey(topicIdPartition)) {
missingLogReadTopicPartitions.put(topicIdPartition, partitionData);
@@ -351,7 +397,7 @@ Map combineLogReadResponse(Map missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
+ LinkedHashMap missingTopicPartitionsLogReadResponse = readFromLog(missingLogReadTopicPartitions);
missingTopicPartitionsLogReadResponse.putAll(existingFetchedData);
return missingTopicPartitionsLogReadResponse;
}
@@ -363,4 +409,9 @@ void releasePartitionLocks(Set topicIdPartitions) {
sharePartition.releaseFetchLock();
});
}
+
+ // Visible for testing.
+ Lock lock() {
+ return lock;
+ }
}
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index 3515362152b0..e3608128eb5f 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -29,7 +29,7 @@
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class ShareFetchUtils {
* by acquiring records from the share partition.
*/
static Map processFetchResponse(
- ShareFetchData shareFetchData,
+ ShareFetch shareFetch,
Map responseData,
LinkedHashMap sharePartitions,
ReplicaManager replicaManager
@@ -91,7 +91,7 @@ static Map processFetchR
partitionData.setErrorMessage(Errors.NONE.message());
}
} else {
- ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetchData.memberId(), shareFetchData.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
+ ShareAcquiredRecords shareAcquiredRecords = sharePartition.acquire(shareFetch.memberId(), shareFetch.maxFetchRecords() - acquiredRecordsCount, fetchPartitionData);
log.trace("Acquired records: {} for topicIdPartition: {}", shareAcquiredRecords, topicIdPartition);
// Maybe, in the future, check if no records are acquired, and we want to retry
// replica manager fetch. Depends on the share partition manager implementation,
@@ -151,11 +151,15 @@ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaM
}
static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
+ return partition(replicaManager, tp).getLeaderEpoch();
+ }
+
+ static Partition partition(ReplicaManager replicaManager, TopicPartition tp) {
Partition partition = replicaManager.getPartitionOrException(tp);
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition());
throw new NotLeaderOrFollowerException();
}
- return partition.getLeaderEpoch();
+ return partition;
}
}
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 71baea101744..632cb1e31691 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1082,8 +1082,8 @@ boolean canAcquireRecords() {
/**
* Prior to fetching records from the leader, the fetch lock is acquired to ensure that the same
- * share partition does not enter a fetch queue while another one is being fetched within the queue.
- * The fetch lock is released once the records are fetched from the leader.
+ * share partition is not fetched concurrently by multiple clients. The fetch lock is released once
+ * the records are fetched and acquired.
*
* @return A boolean which indicates whether the fetch lock is acquired.
*/
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 4288dd55703d..1c6c5492372f 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -49,7 +49,7 @@
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
-import org.apache.kafka.server.share.fetch.ShareFetchData;
+import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
@@ -71,10 +71,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
/**
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -250,7 +248,7 @@ public CompletableFuture