Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8717 #8

Merged
merged 17 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

package org.opensearch.index.remote;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.logging.Loggers;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
Expand All @@ -18,21 +22,26 @@
import org.opensearch.index.store.DirectoryFileTransferTracker;

import java.io.IOException;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.EXCLUDE_FILES;

/**
* Keeps track of remote refresh which happens in {@link org.opensearch.index.shard.RemoteStoreRefreshListener}. This consist of multiple critical metrics.
*
* @opensearch.internal
*/
public class RemoteSegmentTransferTracker {

private final Logger logger;

/**
* ShardId for which this instance tracks the remote segment upload metadata.
*/
Expand Down Expand Up @@ -124,14 +133,15 @@ public class RemoteSegmentTransferTracker {
private final Map<String, AtomicLong> rejectionCountMap = ConcurrentCollections.newConcurrentMap();

/**
* Map of name to size of the segment files created as part of the most recent refresh.
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
*/
private volatile Map<String, Long> latestLocalFileNameLengthMap;
private final Map<String, Long> latestLocalFileNameLengthMap = ConcurrentCollections.newConcurrentMap();

/**
* Set of names of segment files that were uploaded as part of the most recent remote refresh.
* This contains the files from the last successful remote refresh and ongoing uploads. This gets reset to just the
* last successful remote refresh state on successful remote refresh.
*/
private final Set<String> latestUploadedFiles = new HashSet<>();
private final Set<String> latestUploadedFiles = ConcurrentCollections.newConcurrentSet();

/**
* Keeps the bytes lag computed so that we do not compute it for every request.
Expand Down Expand Up @@ -182,6 +192,7 @@ public RemoteSegmentTransferTracker(
int uploadBytesPerSecMovingAverageWindowSize,
int uploadTimeMsMovingAverageWindowSize
) {
logger = Loggers.getLogger(getClass(), shardId);
this.shardId = shardId;
// Both the local refresh time and remote refresh time are set with current time to give consistent view of time lag when it arises.
long currentClockTimeMs = System.currentTimeMillis();
Expand All @@ -193,8 +204,6 @@ public RemoteSegmentTransferTracker(
uploadBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesMovingAverageWindowSize));
uploadBytesPerSecMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadBytesPerSecMovingAverageWindowSize));
uploadTimeMsMovingAverageReference = new AtomicReference<>(new MovingAverage(uploadTimeMsMovingAverageWindowSize));

latestLocalFileNameLengthMap = new HashMap<>();
this.directoryFileTransferTracker = directoryFileTransferTracker;
}

Expand All @@ -206,7 +215,8 @@ public long getLocalRefreshSeqNo() {
return localRefreshSeqNo;
}

public void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
// Visible for testing
void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo="
+ localRefreshSeqNo
+ " < "
Expand All @@ -224,7 +234,17 @@ public long getLocalRefreshClockTimeMs() {
return localRefreshClockTimeMs;
}

public void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
/**
* Updates the last refresh time and refresh seq no which is seen by local store.
*/
public void updateLocalRefreshTimeAndSeqNo() {
updateLocalRefreshClockTimeMs(System.currentTimeMillis());
updateLocalRefreshTimeMs(System.nanoTime() / 1_000_000L);
updateLocalRefreshSeqNo(getLocalRefreshSeqNo() + 1);
}

// Visible for testing
void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ " < "
Expand All @@ -234,7 +254,7 @@ public void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
computeTimeMsLag();
}

public void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
private void updateLocalRefreshClockTimeMs(long localRefreshClockTimeMs) {
this.localRefreshClockTimeMs = localRefreshClockTimeMs;
}

Expand Down Expand Up @@ -369,12 +389,36 @@ long getRejectionCount(String rejectionReason) {
return rejectionCountMap.get(rejectionReason).get();
}

Map<String, Long> getLatestLocalFileNameLengthMap() {
return latestLocalFileNameLengthMap;
public Map<String, Long> getLatestLocalFileNameLengthMap() {
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
/**
* Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files.
*
* @param segmentFiles list of local refreshed segment files
* @param fileSizeFunction function is used to determine the file size in bytes
*/
public void updateLatestLocalFileNameLengthMap(
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
// Update the map
segmentFiles.stream()
.filter(file -> EXCLUDE_FILES.contains(file) == false)
.filter(file -> latestLocalFileNameLengthMap.containsKey(file) == false || latestLocalFileNameLengthMap.get(file) == 0)
.forEach(file -> {
long fileSize = 0;
try {
fileSize = fileSizeFunction.apply(file);
} catch (IOException e) {
logger.warn(new ParameterizedMessage("Exception while reading the fileLength of file={}", file), e);
}
latestLocalFileNameLengthMap.put(file, fileSize);
});
Set<String> fileSet = new HashSet<>(segmentFiles);
// Remove keys from the fileSizeMap that do not exist in the latest segment files
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
computeBytesLag();
}

Expand All @@ -390,7 +434,7 @@ public void setLatestUploadedFiles(Set<String> files) {
}

private void computeBytesLag() {
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
if (latestLocalFileNameLengthMap.isEmpty()) {
return;
}
Set<String> filesNotYetUploaded = latestLocalFileNameLengthMap.keySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void beforeRefresh() throws IOException {
}

@Override
protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
Expand Down
Loading
Loading