KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both common/uncommon cases of share fetch #17825

Open · wants to merge 7 commits into trunk
Changes from 3 commits
19 changes: 11 additions & 8 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -158,7 +158,7 @@ public boolean tryComplete() {
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
partitionsAcquired = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
@@ -232,21 +232,22 @@ Map<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions() {
}

private Map<TopicIdPartition, LogReadResult> maybeReadFromLog(Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
Map<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
Map<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
if (sharePartition.fetchOffsetMetadata().isEmpty()) {
partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
}
});
if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
return Collections.emptyMap();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
return readFromLog(partitionsMissingFetchOffsetMetadata);
return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
}

private void maybeUpdateFetchOffsetMetadata(
Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
Map<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -257,7 +258,9 @@ private void maybeUpdateFetchOffsetMetadata(
replicaManagerLogReadResult, topicIdPartition);
continue;
}
sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
sharePartition.updateFetchOffsetMetadata(
topicPartitionData.get(topicIdPartition).fetchOffset,
replicaManagerLogReadResult.info().fetchOffsetMetadata);
}
}

@@ -283,7 +286,7 @@ private boolean isMinBytesSatisfied(Map<TopicIdPartition, FetchRequest.Partition

SharePartition sharePartition = sharePartitions.get(topicIdPartition);

Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata();
Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
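To make the change above easier to follow: the cached LogOffsetMetadata in a share partition is now keyed by the fetch offset it was captured for, so maybeReadFromLog only queries the replica manager for partitions whose cached metadata does not match the offset about to be fetched, and maybeUpdateFetchOffsetMetadata stores the fresh metadata against that offset so isMinBytesSatisfied can estimate available bytes without another log read. The sketch below models that flow in isolation; OffsetMeta, PartitionState, and the simplified maybeReadFromLog here are stand-ins for illustration, not the actual Kafka classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class FetchOffsetMetadataFlowSketch {

    /** Simplified stand-in for LogOffsetMetadata: an offset plus its physical file position. */
    record OffsetMeta(long messageOffset, long filePosition) {}

    /** Simplified stand-in for SharePartition: caches metadata for exactly one fetch offset. */
    static class PartitionState {
        private long cachedOffset = -1;  // -1 means "nothing cached yet"
        private OffsetMeta cachedMeta = null;

        Optional<OffsetMeta> fetchOffsetMetadata(long fetchOffset) {
            // Only return the cache when it was captured for this exact fetch offset.
            if (cachedMeta == null || cachedOffset != fetchOffset) return Optional.empty();
            return Optional.of(cachedMeta);
        }

        void updateFetchOffsetMetadata(long fetchOffset, OffsetMeta meta) {
            cachedOffset = fetchOffset;
            cachedMeta = meta;
        }
    }

    /** Mirrors maybeReadFromLog: read only the partitions whose cache misses for the requested offset. */
    static Map<String, OffsetMeta> maybeReadFromLog(Map<String, Long> fetchOffsets,
                                                    Map<String, PartitionState> partitions) {
        Map<String, OffsetMeta> readResults = new LinkedHashMap<>();
        fetchOffsets.forEach((tp, offset) -> {
            if (partitions.get(tp).fetchOffsetMetadata(offset).isEmpty()) {
                // Pretend to read from the log; the file position is fabricated for the sketch.
                readResults.put(tp, new OffsetMeta(offset, offset * 100));
            }
        });
        return readResults;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> partitions = new HashMap<>();
        partitions.put("tp0", new PartitionState());

        Map<String, Long> fetchOffsets = Map.of("tp0", 5L);

        // First pass: cache miss, so the "log" is read and the cache is populated for offset 5.
        Map<String, OffsetMeta> readResults = maybeReadFromLog(fetchOffsets, partitions);
        readResults.forEach((tp, meta) ->
            partitions.get(tp).updateFetchOffsetMetadata(fetchOffsets.get(tp), meta));

        // Second pass at the same offset: cache hit, nothing is read again.
        System.out.println(maybeReadFromLog(fetchOffsets, partitions).isEmpty()); // true

        // A different fetch offset invalidates the cached entry, forcing a fresh read.
        System.out.println(maybeReadFromLog(Map.of("tp0", 6L), partitions).isEmpty()); // false
    }
}
```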
57 changes: 38 additions & 19 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -280,9 +280,9 @@ public static RecordState forId(byte id) {
private long endOffset;

/**
* We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently.
* We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently.
*/
private Optional<LogOffsetMetadata> fetchOffsetMetadata;
private final OffsetMetadata fetchOffsetMetadata;

/**
* The state epoch is used to track the version of the state of the share partition.
@@ -347,6 +347,7 @@ public static RecordState forId(byte id) {
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
this.fetchOffsetMetadata = new OffsetMetadata();
}

/**
@@ -451,12 +452,12 @@ public CompletableFuture<Void> maybeInitialize() {
// If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
// in the cached state are not missed
findNextFetchOffset.set(true);
updateEndOffsetAndResetFetchOffsetMetadata(cachedState.lastEntry().getValue().lastOffset());
endOffset = cachedState.lastEntry().getValue().lastOffset();
// In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
} else {
updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
endOffset = startOffset;
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
@@ -943,7 +944,7 @@ void updateCacheAndOffsets(long logStartOffset) {
// If the cached state is empty, then the start and end offset will be the new log start offset.
// This can occur during the initialization of share partition if LSO has moved.
startOffset = logStartOffset;
updateEndOffsetAndResetFetchOffsetMetadata(logStartOffset);
endOffset = logStartOffset;
return;
}

@@ -961,7 +962,7 @@ void updateCacheAndOffsets(long logStartOffset) {
// This case means that the cached state is completely fresh now.
// Example scenario - batch of 0-10 in acquired state in cached state, then LSO moves to 15,
// then endOffset should be 15 as well.
updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
endOffset = startOffset;
}

// Note -
@@ -1192,7 +1193,7 @@ private AcquiredRecords acquireNewBatchRecords(
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
updateEndOffsetAndResetFetchOffsetMetadata(lastAcquiredOffset);
endOffset = lastAcquiredOffset;
return new AcquiredRecords()
.setFirstOffset(firstAcquiredOffset)
.setLastOffset(lastAcquiredOffset)
@@ -1592,27 +1593,21 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}

// The caller of this function is expected to hold lock.writeLock() when calling this method.
protected void updateEndOffsetAndResetFetchOffsetMetadata(long updatedEndOffset) {
endOffset = updatedEndOffset;
fetchOffsetMetadata = Optional.empty();
}

protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) {
protected void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata logOffsetMetadata) {
lock.writeLock().lock();
try {
this.fetchOffsetMetadata = fetchOffsetMetadata;
fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset, logOffsetMetadata);
} finally {
lock.writeLock().unlock();
}
}

protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) {
lock.readLock().lock();
try {
if (findNextFetchOffset.get())
if (fetchOffsetMetadata.offsetMetadata() == null || fetchOffsetMetadata.offset() != nextFetchOffset)
return Optional.empty();
return fetchOffsetMetadata;
return Optional.of(fetchOffsetMetadata.offsetMetadata());
} finally {
lock.readLock().unlock();
}
@@ -1696,7 +1691,7 @@ private void maybeUpdateCachedStateAndOffsets() {
long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset();
if (lastOffsetAcknowledged == lastCachedOffset) {
startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition
updateEndOffsetAndResetFetchOffsetMetadata(lastCachedOffset + 1);
endOffset = lastCachedOffset + 1;
cachedState.clear();
// Nothing further to do.
return;
@@ -2426,4 +2421,28 @@ public String toString() {
")";
}
}

/**
* FetchOffsetMetadata class is used to cache offset and its log metadata.
*/
static final class OffsetMetadata {
private long offset;
Review comment (Contributor): It's useful to add a comment that this offset could be different from offsetMetadata.messageOffset if it's in the middle of a batch.

private LogOffsetMetadata offsetMetadata;

OffsetMetadata() {
}
Review thread:
Contributor: Should we initialize the fields?
Contributor Author: I am not sure initializing the fields has any benefit. The only initialization inside the constructor that makes sense is offset = -1, i.e. an invalid value; we don't strictly need it, because offsetMetadata() == null already tells us whether fetchOffsetMetadata has been cached yet.
Contributor: Yes, since 0 is a valid offset, it's better to initialize offset to -1.
Contributor Author (@adixitconfluent, Nov 15, 2024): I've pushed the change in my latest commit. Thanks!


long offset() {
return offset;
}

LogOffsetMetadata offsetMetadata() {
return offsetMetadata;
}

void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) {
this.offset = offset;
this.offsetMetadata = offsetMetadata;
}
}
}
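The review thread above converges on two follow-ups: initialize offset to -1 (because 0 is a valid offset) and document that the cached offset can differ from offsetMetadata.messageOffset when the fetch position falls in the middle of a batch. The sketch below shows roughly how the holder could look with that feedback applied; it is illustrative only, with a stubbed LogOffsetMetadata rather than Kafka's real class, and is not the code from the later commits.

```java
import java.util.Optional;

public class OffsetMetadataSketch {

    /** Stand-in for Kafka's LogOffsetMetadata (field names assumed for illustration). */
    record LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {}

    /** Caches the fetch offset together with the log metadata captured for it. */
    static final class OffsetMetadata {
        // The fetch offset this metadata was captured for. It can differ from
        // offsetMetadata.messageOffset when the fetch offset falls in the middle of a batch.
        private long offset;
        private LogOffsetMetadata offsetMetadata;

        OffsetMetadata() {
            offset = -1; // -1 marks "nothing cached yet"; 0 is a valid fetch offset.
        }

        long offset() {
            return offset;
        }

        LogOffsetMetadata offsetMetadata() {
            return offsetMetadata;
        }

        void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) {
            this.offset = offset;
            this.offsetMetadata = offsetMetadata;
        }

        /** Returns the cached metadata only if it was captured for nextFetchOffset. */
        Optional<LogOffsetMetadata> forOffset(long nextFetchOffset) {
            if (offsetMetadata == null || offset != nextFetchOffset) return Optional.empty();
            return Optional.of(offsetMetadata);
        }
    }

    public static void main(String[] args) {
        OffsetMetadata cache = new OffsetMetadata();
        System.out.println(cache.forOffset(0L).isPresent());   // false, nothing cached yet
        cache.updateOffsetMetadata(5L, new LogOffsetMetadata(5L, 0L, 512));
        System.out.println(cache.forOffset(5L).isPresent());   // true, cache hit for offset 5
        System.out.println(cache.forOffset(6L).isPresent());   // false, stale for any other offset
    }
}
```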
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -65,6 +65,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -163,7 +164,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
// We are testing the case when the share partition is getting fetched for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
// populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2).
when(sp0.fetchOffsetMetadata())
when(sp0.fetchOffsetMetadata(anyLong()))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
@@ -219,7 +220,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class);
when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1);
when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);

DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
@@ -265,7 +266,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
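The test changes are mechanical: every stub of fetchOffsetMetadata() gains an anyLong() matcher for the new fetch-offset parameter, and the first-fetch test keeps its consecutive-return stubbing (empty on the first call, populated afterwards). A minimal Mockito example of that pattern, written against a hypothetical MetadataCache interface rather than the real SharePartition, might look like this:

```java
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Optional;

public class ConsecutiveStubbingExample {

    /** Hypothetical interface standing in for SharePartition's fetchOffsetMetadata(long). */
    interface MetadataCache {
        Optional<String> fetchOffsetMetadata(long fetchOffset);
    }

    public static void main(String[] args) {
        MetadataCache cache = mock(MetadataCache.class);

        // The first call returns empty (cache not populated yet); every later call returns a
        // value, regardless of the long argument, thanks to the anyLong() matcher.
        when(cache.fetchOffsetMetadata(anyLong()))
            .thenReturn(Optional.empty())
            .thenReturn(Optional.of("metadata"));

        System.out.println(cache.fetchOffsetMetadata(0L));  // Optional.empty
        System.out.println(cache.fetchOffsetMetadata(0L));  // Optional[metadata]
        System.out.println(cache.fetchOffsetMetadata(42L)); // Optional[metadata]
    }
}
```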
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -123,6 +123,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
@@ -1677,7 +1678,7 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() {
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 2);

// Initially you cannot acquire records for both sp1 and sp2.
@@ -1873,7 +1874,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() {
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 1);

// Initially you cannot acquire records for both sp1 and sp2.
@@ -2355,7 +2356,7 @@ public void testSharePartitionPartialInitializationFailure() throws Exception {
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());