-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both common/uncommon cases of share fetch #17825
base: trunk
Are you sure you want to change the base?
Changes from 3 commits
3ce9c87
60974e0
7c935b1
f7d1c89
5f343a5
1968ce9
aa6e8bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -280,9 +280,9 @@ public static RecordState forId(byte id) { | |
private long endOffset; | ||
|
||
/** | ||
* We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently. | ||
* We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently. | ||
*/ | ||
private Optional<LogOffsetMetadata> fetchOffsetMetadata; | ||
private final OffsetMetadata fetchOffsetMetadata; | ||
|
||
/** | ||
* The state epoch is used to track the version of the state of the share partition. | ||
|
@@ -347,6 +347,7 @@ public static RecordState forId(byte id) { | |
this.partitionState = sharePartitionState; | ||
this.replicaManager = replicaManager; | ||
this.groupConfigManager = groupConfigManager; | ||
this.fetchOffsetMetadata = new OffsetMetadata(); | ||
} | ||
|
||
/** | ||
|
@@ -451,12 +452,12 @@ public CompletableFuture<Void> maybeInitialize() { | |
// If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records | ||
// in the cached state are not missed | ||
findNextFetchOffset.set(true); | ||
updateEndOffsetAndResetFetchOffsetMetadata(cachedState.lastEntry().getValue().lastOffset()); | ||
endOffset = cachedState.lastEntry().getValue().lastOffset(); | ||
// In case the persister read state RPC result contains no AVAILABLE records, we can update cached state | ||
// and start/end offsets. | ||
maybeUpdateCachedStateAndOffsets(); | ||
} else { | ||
updateEndOffsetAndResetFetchOffsetMetadata(startOffset); | ||
endOffset = startOffset; | ||
} | ||
// Set the partition state to Active and complete the future. | ||
partitionState = SharePartitionState.ACTIVE; | ||
|
@@ -943,7 +944,7 @@ void updateCacheAndOffsets(long logStartOffset) { | |
// If the cached state is empty, then the start and end offset will be the new log start offset. | ||
// This can occur during the initialization of share partition if LSO has moved. | ||
startOffset = logStartOffset; | ||
updateEndOffsetAndResetFetchOffsetMetadata(logStartOffset); | ||
endOffset = logStartOffset; | ||
return; | ||
} | ||
|
||
|
@@ -961,7 +962,7 @@ void updateCacheAndOffsets(long logStartOffset) { | |
// This case means that the cached state is completely fresh now. | ||
// Example scenario - batch of 0-10 in acquired state in cached state, then LSO moves to 15, | ||
// then endOffset should be 15 as well. | ||
updateEndOffsetAndResetFetchOffsetMetadata(startOffset); | ||
endOffset = startOffset; | ||
} | ||
|
||
// Note - | ||
|
@@ -1192,7 +1193,7 @@ private AcquiredRecords acquireNewBatchRecords( | |
if (cachedState.firstKey() == firstAcquiredOffset) { | ||
startOffset = firstAcquiredOffset; | ||
} | ||
updateEndOffsetAndResetFetchOffsetMetadata(lastAcquiredOffset); | ||
endOffset = lastAcquiredOffset; | ||
return new AcquiredRecords() | ||
.setFirstOffset(firstAcquiredOffset) | ||
.setLastOffset(lastAcquiredOffset) | ||
|
@@ -1592,27 +1593,21 @@ private Optional<Throwable> acknowledgeCompleteBatch( | |
return Optional.empty(); | ||
} | ||
|
||
// The caller of this function is expected to hold lock.writeLock() when calling this method. | ||
protected void updateEndOffsetAndResetFetchOffsetMetadata(long updatedEndOffset) { | ||
endOffset = updatedEndOffset; | ||
fetchOffsetMetadata = Optional.empty(); | ||
} | ||
|
||
protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) { | ||
protected void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata logOffsetMetadata) { | ||
lock.writeLock().lock(); | ||
try { | ||
this.fetchOffsetMetadata = fetchOffsetMetadata; | ||
fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset, logOffsetMetadata); | ||
} finally { | ||
lock.writeLock().unlock(); | ||
} | ||
} | ||
|
||
protected Optional<LogOffsetMetadata> fetchOffsetMetadata() { | ||
protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) { | ||
lock.readLock().lock(); | ||
try { | ||
if (findNextFetchOffset.get()) | ||
if (fetchOffsetMetadata.offsetMetadata() == null || fetchOffsetMetadata.offset() != nextFetchOffset) | ||
return Optional.empty(); | ||
return fetchOffsetMetadata; | ||
return Optional.of(fetchOffsetMetadata.offsetMetadata()); | ||
} finally { | ||
lock.readLock().unlock(); | ||
} | ||
|
@@ -1696,7 +1691,7 @@ private void maybeUpdateCachedStateAndOffsets() { | |
long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset(); | ||
if (lastOffsetAcknowledged == lastCachedOffset) { | ||
startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition | ||
updateEndOffsetAndResetFetchOffsetMetadata(lastCachedOffset + 1); | ||
endOffset = lastCachedOffset + 1; | ||
cachedState.clear(); | ||
// Nothing further to do. | ||
return; | ||
|
@@ -2426,4 +2421,28 @@ public String toString() { | |
")"; | ||
} | ||
} | ||
|
||
/** | ||
* OffsetMetadata class is used to cache an offset and its log offset metadata. | ||
*/ | ||
static final class OffsetMetadata { | ||
private long offset; | ||
private LogOffsetMetadata offsetMetadata; | ||
|
||
OffsetMetadata() { | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should we initialize the fields? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I am not sure if initializing the fields has any benefits. The only value initialization inside the constructor that makes sense is `offset = -1`. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, since 0 is a valid offset, it's better to initialize offset to -1. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've pushed the change in my latest commit. Thanks! |
||
|
||
long offset() { | ||
return offset; | ||
} | ||
|
||
LogOffsetMetadata offsetMetadata() { | ||
return offsetMetadata; | ||
} | ||
|
||
void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) { | ||
this.offset = offset; | ||
this.offsetMetadata = offsetMetadata; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's useful to add a comment that this offset could be different from offsetMetadata.messageOffset if it's in the middle of a batch.