Skip to content

Commit

Permalink
Make changes to segment GC
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 16, 2024
1 parent 4128745 commit fb79c43
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
RemoteStorePinnedTimestampService.class,
clusterManagerName
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
Expand All @@ -234,7 +233,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, 5);
indexRandomDocs(remoteStoreEnabledIndexName, 25);

String indexUUID = client().admin()
.indices()
Expand All @@ -250,74 +249,41 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
.get();
SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo();

indexRandomDocs(remoteStoreEnabledIndexName, 25);

CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, "snap2")
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo2 = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2"));

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}
Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 2");
logger.info("--> delete snapshot 1");

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}

AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo2.snapshotId().getName())
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

// Delete is async. Give time for it
assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1));
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
} catch (NoSuchFileException e) {
fail();
}
}, 30, TimeUnit.SECONDS);

logger.info("--> delete snapshot 1");
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
// on snapshot deletion, remote store segment files should get cleaned up for deleted index - `remote-index-1`
deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

// Delete is async. Give time for it
// assertBusy(() -> {
// try {
// assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
// } catch (NoSuchFileException e) {
// fail();
// }
// }, 60, TimeUnit.SECONDS);
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
Expand All @@ -326,10 +292,9 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exceptio
fail();
}
}, 60, TimeUnit.SECONDS);

}

public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() throws Exception {
public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
Expand All @@ -347,6 +312,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() t
RemoteStorePinnedTimestampService.class,
clusterManagerName
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
Expand All @@ -355,7 +321,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() t
final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, 25);
indexRandomDocs(remoteStoreEnabledIndexName, 5);

String indexUUID = client().admin()
.indices()
Expand All @@ -371,18 +337,18 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() t
.get();
SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo();

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 1");

Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");
indexRandomDocs(remoteStoreEnabledIndexName, 25);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
CreateSnapshotResponse createSnapshotResponse2 = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, "snap2")
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo2 = createSnapshotResponse2.getSnapshotInfo();
assertThat(snapshotInfo2.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo2.successfulShards(), greaterThan(0));
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2"));

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
Expand All @@ -392,20 +358,53 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() t
maxWaitRetry -= 1;
}

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 2");

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

// Get total segments remote store directory file count for deleted index and shard 0
int segmentFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);
int translogFilesCountBeforeDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(translogPath);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo2.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1));
} catch (NoSuchFileException e) {
fail();
}
}, 30, TimeUnit.SECONDS);

logger.info("--> delete snapshot 1");
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
// on snapshot deletion, remote store segment files should get cleaned up for deleted index - `remote-index-1`
deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

// Delete is async. Give time for it
// assertBusy(() -> {
// try {
// assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
// } catch (NoSuchFileException e) {
// fail();
// }
// }, 60, TimeUnit.SECONDS);
assertBusy(() -> {
try {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
Expand All @@ -414,6 +413,7 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() t
fail();
}
}, 60, TimeUnit.SECONDS);

}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -994,7 +994,8 @@ public static void remoteDirectoryCleanup(
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
boolean forceClean
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
Expand All @@ -1003,8 +1004,12 @@ public static void remoteDirectoryCleanup(
shardId,
pathStrategy
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
if (forceClean) {
remoteSegmentStoreDirectory.delete();
} else {
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
}
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
Expand All @@ -1023,7 +1028,10 @@ private boolean deleteIfEmpty() throws IOException {
logger.info("Remote directory still has files, not deleting the path");
return false;
}
return delete();
}

private boolean delete() {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,8 @@ public static void remoteDirectoryCleanupAsync(
String indexUUID,
ShardId shardId,
String threadPoolName,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
boolean forceClean
) {
threadpool.executor(threadPoolName)
.execute(
Expand All @@ -1467,7 +1468,8 @@ public static void remoteDirectoryCleanupAsync(
remoteStoreRepoForIndex,
indexUUID,
shardId,
pathStrategy
pathStrategy,
forceClean
),
indexUUID,
shardId
Expand Down Expand Up @@ -1523,7 +1525,8 @@ protected void releaseRemoteStoreLockAndCleanup(
indexUUID,
new ShardId(Index.UNKNOWN_INDEX_NAME, indexUUID, Integer.parseInt(shardId)),
ThreadPool.Names.REMOTE_PURGE,
remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy()
remoteStoreShardShallowCopySnapshot.getRemoteStorePathStrategy(),
false
);
}
}
Expand Down Expand Up @@ -2206,7 +2209,8 @@ private void cleanRemoteStoreDirectoryIfNeeded(
prevIndexMetadata.getIndexUUID(),
shard,
ThreadPool.Names.REMOTE_PURGE,
remoteStorePathStrategy
remoteStorePathStrategy,
forceClean
);
remoteTranslogCleanupAsync(
remoteTranslogRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ public void testCleanupAsync() throws Exception {
repositoryName,
indexUUID,
shardId,
pathStrategy
pathStrategy,
false
);
verify(remoteSegmentStoreDirectoryFactory).newDirectory(repositoryName, indexUUID, shardId, pathStrategy);
verify(threadPool, times(0)).executor(ThreadPool.Names.REMOTE_PURGE);
Expand Down

0 comments on commit fb79c43

Please sign in to comment.