Pass pinning entity and timestamp while deleting snapshot of deleted index

Signed-off-by: Sachin Kale <[email protected]>
sachinpkale committed Sep 15, 2024
1 parent 913edd6 commit 4128745
Showing 5 changed files with 258 additions and 91 deletions.
@@ -23,6 +23,7 @@
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;

@@ -206,7 +207,6 @@ public void testDeleteShallowCopyV2MultipleSnapshots() throws Exception {

}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/15692")
public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
@@ -242,12 +242,6 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

String numShards = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_NUMBER_OF_SHARDS);

logger.info("--> create two remote index shallow snapshots");
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
@@ -269,6 +263,14 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
assertThat(snapshotInfo2.successfulShards(), equalTo(snapshotInfo2.totalShards()));
assertThat(snapshotInfo2.snapshotId().getName(), equalTo("snap2"));

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}
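// Wait (up to ~10s) for the pinned timestamp fetch task, rescheduled above to run every second, to report
// a state newer than currentTime, presumably so the snapshots' pinned timestamps are visible before the
// index and snapshot deletions below.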

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

@@ -291,14 +293,13 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
.get();
assertAcked(deleteSnapshotResponse);

Thread.sleep(5000);

assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath), lessThan(segmentFilesCountBeforeDeletingSnapshot1));
} catch (NoSuchFileException e) {
fail();
}
}, 30, TimeUnit.SECONDS);
int segmentFilesCountAfterDeletingSnapshot1 = RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath);

logger.info("--> delete snapshot 1");
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
@@ -310,18 +311,109 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2() throws Exception {
assertAcked(deleteSnapshotResponse);

// Delete is async. Give time for it
// assertBusy(() -> {
// try {
// assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
// } catch (NoSuchFileException e) {
// fail();
// }
// }, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);

}

public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2SingleSnapshot() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
settings = Settings.builder()
.put(settings)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
.build();
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNode(settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
clusterManagerName
);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, 25);

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

logger.info("--> create two remote index shallow snapshots");
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(snapshotRepoName, "snap1")
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo1 = createSnapshotResponse.getSnapshotInfo();

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 1");

Path segmentsPath = Path.of(String.valueOf(shardPath), "segments");
Path translogPath = Path.of(String.valueOf(shardPath), "translog");

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
long currentTime = System.currentTimeMillis();
long maxWaitRetry = 10;
while (maxWaitRetry >= 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxWaitRetry -= 1;
}

AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, snapshotInfo1.snapshotId().getName())
.get();
assertAcked(deleteSnapshotResponse);

// Delete is async. Give time for it
// assertBusy(() -> {
// try {
// assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(segmentsPath));
// } catch (NoSuchFileException e) {
// fail();
// }
// }, 60, TimeUnit.SECONDS);

assertBusy(() -> {
try {
assertEquals(0, RemoteStoreBaseIntegTestCase.getFileCount(translogPath));
} catch (NoSuchFileException e) {
fail();
}
}, 60, TimeUnit.SECONDS);

}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
@@ -266,17 +266,22 @@ protected Set<Long> getGenerationsToBeDeleted(List<String> metadataFilesNotToBeDeleted
}

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, minRemoteGenReferenced, logger);
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, minRemoteGenReferenced, null, null, logger);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
long minRemoteGenReferenced,
String pinningEntityToSkip,
Long pinnedTimestampToSkip,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps(
pinningEntityToSkip,
pinnedTimestampToSkip
);
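// The new pinningEntityToSkip / pinnedTimestampToSkip arguments let the caller exclude a single pin, such as
// the one held by the snapshot currently being deleted, from the pinned timestamp state used here, presumably
// so that pin no longer protects the snapshot's stale translog metadata files.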

// Keep files since last successful run of scheduler
List<String> metadataFilesToBeDeleted = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
@@ -483,55 +488,66 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
}
}

public static void cleanup(
    TranslogTransferManager translogTransferManager,
    boolean forceClean,
    String pinningEntity,
    Long pinnedTimestamp
) throws IOException {
    if (forceClean) {
        translogTransferManager.delete();
    } else {
        ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
            @Override
            public void onResponse(List<BlobMetadata> blobMetadata) {
                List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

                try {
                    if (metadataFiles.isEmpty()) {
                        staticLogger.debug("No stale translog metadata files found");
                        return;
                    }
                    List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
                        metadataFiles,
                        new HashMap<>(),
                        Long.MAX_VALUE,
                        pinningEntity,
                        pinnedTimestamp,
                        staticLogger
                    );
                    if (metadataFilesToBeDeleted.isEmpty()) {
                        staticLogger.debug("No metadata files to delete");
                        return;
                    }
                    staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

                    // For all the files that we are keeping, fetch min and max generations
                    List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
                    metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
                    staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

                    // Delete stale metadata files
                    translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

                    // Delete stale primary terms
                    deleteStaleRemotePrimaryTerms(
                        metadataFilesNotToBeDeleted,
                        translogTransferManager,
                        new HashMap<>(),
                        new AtomicLong(Long.MAX_VALUE),
                        staticLogger
                    );
                } catch (Exception e) {
                    staticLogger.error("Exception while cleaning up metadata and primary terms", e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                staticLogger.error("Exception while cleaning up metadata and primary terms", e);
            }
        };
        translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
    }
}
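For illustration, the following is a minimal, self-contained sketch of the idea behind the new pinningEntityToSkip / pinnedTimestampToSkip parameters. It is not the OpenSearch implementation; every class, method, and value in it is hypothetical. It models pins as a map from pinning entity to pinned timestamp and shows why the pin of the snapshot being deleted has to be skipped before deciding which timestamps still protect remote files.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model: each pinning entity (e.g. a snapshot) pins one timestamp; cleanup must not delete
// metadata files covered by a pin, except the pin of the entity that is being deleted right now.
public class PinnedTimestampSkipSketch {

    // Returns all pinned timestamps, optionally skipping one (entity, timestamp) pin.
    static Set<Long> pinnedTimestamps(Map<String, Long> pins, String entityToSkip, Long timestampToSkip) {
        return pins.entrySet()
            .stream()
            .filter(e -> !(e.getKey().equals(entityToSkip) && e.getValue().equals(timestampToSkip)))
            .map(Map.Entry::getValue)
            .collect(Collectors.toSet());
    }

    // A metadata file is deletable when the timestamp it covers is no longer pinned.
    static List<Long> deletableTimestamps(List<Long> fileTimestamps, Set<Long> pinned) {
        return fileTimestamps.stream().filter(ts -> !pinned.contains(ts)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> pins = new HashMap<>();
        pins.put("snap1", 100L); // snapshot being deleted
        pins.put("snap2", 200L); // still live

        List<Long> fileTimestamps = List.of(100L, 200L);

        // Without skipping, snap1's own pin keeps its file protected forever: prints []
        System.out.println(deletableTimestamps(fileTimestamps, pinnedTimestamps(pins, null, null)));
        // Skipping the deleted snapshot's pin lets its file (timestamp 100) be reclaimed: prints [100]
        System.out.println(deletableTimestamps(fileTimestamps, pinnedTimestamps(pins, "snap1", 100L)));
    }
}

In the actual change, RemoteStorePinnedTimestampService.getPinnedTimestamps(pinningEntityToSkip, pinnedTimestampToSkip) plays the role of pinnedTimestamps(...) above, and the static cleanup(...) shown in this diff threads the pinning entity and timestamp through from the snapshot-delete flow.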