Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 19, 2024
1 parent 4b128d1 commit 22187b9
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,17 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception {
});
}

public void testLiveIndexNoPinnedTimestampsWithMetadataSkippedOnLastDeletionCheck() throws Exception {
public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder().put(remoteStoreIndexSettings(0, 1)).build();
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 10)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

// We don't set look-back interval to 0 as we want GC to skip based on last deletion check
// RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
Expand Down Expand Up @@ -171,6 +173,61 @@ public void testLiveIndexNoPinnedTimestampsWithMetadataSkippedOnLastDeletionChec
});
}

public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

int numDocs = 5;
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(4, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
}

public void testLiveIndexWithPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,10 +817,6 @@ Set<String> getMetadataFilesToFilterActiveSegments(
return metadataFilesToFilterActiveSegments;
}

public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
deleteStaleSegments(lastNMetadataFilesToKeep, Map.of());
}

/**
* Delete stale segment and metadata files
* One metadata file is kept per commit (refresh updates the same file). To read segments uploaded to remote store,
Expand All @@ -836,7 +832,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
* @param lastNMetadataFilesToKeep number of metadata files to keep
* @throws IOException in case of I/O error while reading from / writing to remote segment store
*/
private void deleteStaleSegments(int lastNMetadataFilesToKeep, Map<String, Long> pinnedTimestampsToSkip) throws IOException {
public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException {
if (lastNMetadataFilesToKeep == -1) {
logger.info(
"Stale segment deletion is disabled if cluster.remote_store.index.segment_metadata.retention.max_count is set to -1"
Expand All @@ -863,7 +859,7 @@ private void deleteStaleSegments(int lastNMetadataFilesToKeep, Map<String, Long>
return;
}

Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip);
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
sortedMetadataFileList,
Expand Down Expand Up @@ -999,8 +995,7 @@ public static void remoteDirectoryCleanup(
String indexUUID,
ShardId shardId,
RemoteStorePathStrategy pathStrategy,
boolean forceClean,
Map<String, Long> pinnedTimestampsToSkip
boolean forceClean
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
Expand All @@ -1012,7 +1007,7 @@ public static void remoteDirectoryCleanup(
if (forceClean) {
remoteSegmentStoreDirectory.delete();
} else {
remoteSegmentStoreDirectory.deleteStaleSegments(0, pinnedTimestampsToSkip);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
private long lastTimestampOfMetadataDeletionOnRemote = System.currentTimeMillis();
private long previousMinRemoteGenReferenced = -1;

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
Expand Down Expand Up @@ -148,11 +148,10 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This code block ensures parity with RemoteFsTranslog. Without this, we will end up making list translog metadata
// call in each invocation of trimUnreferencedReaders
if (indexDeleted == false
&& (System.currentTimeMillis() - lastTimestampOfMetadataDeletionOnRemote <= RemoteStoreSettings
.getPinnedTimestampsLookbackInterval()
.millis() * 2)) {
if (indexDeleted == false && previousMinRemoteGenReferenced == minRemoteGenReferenced) {
return;
} else if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
previousMinRemoteGenReferenced = minRemoteGenReferenced;
}

// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
Expand Down Expand Up @@ -204,7 +203,7 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted ? Long.MAX_VALUE : minRemoteGenReferenced
indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeep()
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
Expand All @@ -220,7 +219,6 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
lastTimestampOfMetadataDeletionOnRemote = System.currentTimeMillis();
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
Expand Down Expand Up @@ -248,11 +246,15 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

private long getMinGenerationToKeep() {
return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep();
}

// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
long minRemoteGenReferenced
long minGenerationToKeep
) throws IOException {
Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
for (String mdFile : metadataFilesToBeDeleted) {
Expand All @@ -267,36 +269,28 @@ protected Set<Long> getGenerationsToBeDeleted(
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
// Check if the generation is not referred by metadata file matching pinned timestamps
// The check with minRemoteGenReferenced is redundant but kept as to make sure we don't delete generations
// The check with minGenerationToKeep is redundant but kept as to make sure we don't delete generations
// that are not persisted in remote segment store yet.
if (generation < minRemoteGenReferenced && isGenerationPinned(generation, pinnedGenerations) == false) {
if (generation < minGenerationToKeep && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
}
}
return generationsToBeDeleted;
}

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
return getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
minRemoteGenReferenced,
Map.of(),
indexDeleted,
logger
);
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, getMinGenerationToKeep(), indexDeleted, logger);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
long minRemoteGenReferenced,
Map<String, Long> pinnedTimestampsToSkip,
long minGenerationToKeep,
boolean indexDeleted,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(pinnedTimestampsToSkip);
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

// Keep files since last successful run of scheduler
List<String> metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
Expand Down Expand Up @@ -330,18 +324,18 @@ protected static List<String> getMetadataFilesToBeDeleted(
);

if (indexDeleted == false) {
// Filter out metadata files based on minRemoteGenReferenced
List<String> metadataFilesContainingMinRemoteGenReferenced = metadataFilesToBeDeleted.stream().filter(md -> {
// Filter out metadata files based on minGenerationToKeep
List<String> metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> {
long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md);
return maxGeneration == -1 || maxGeneration > minRemoteGenReferenced;
return maxGeneration == -1 || maxGeneration > minGenerationToKeep;
}).collect(Collectors.toList());
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinRemoteGenReferenced);
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep);

logger.trace(
"metadataFilesContainingMinRemoteGenReferenced.size = {}, metadataFilesToBeDeleted based on minRemoteGenReferenced filtering = {}, minRemoteGenReferenced = {}",
metadataFilesContainingMinRemoteGenReferenced.size(),
"metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}",
metadataFilesContainingMinGenerationToKeep.size(),
metadataFilesToBeDeleted.size(),
minRemoteGenReferenced
minGenerationToKeep
);
}

Expand Down Expand Up @@ -505,11 +499,7 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
}
}

public static void cleanup(
TranslogTransferManager translogTransferManager,
boolean forceClean,
Map<String, Long> pinnedTimestampsToSkip
) throws IOException {
public static void cleanup(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException {
if (forceClean) {
translogTransferManager.delete();
} else {
Expand All @@ -527,7 +517,6 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFiles,
new HashMap<>(),
Long.MAX_VALUE,
pinnedTimestampsToSkip,
true,
staticLogger
);
Expand Down
Loading

0 comments on commit 22187b9

Please sign in to comment.