Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 14, 2024
1 parent afac97d commit e6141a5
Showing 1 changed file with 110 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.TranslogTransferMetadata;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
Expand Down Expand Up @@ -53,6 +54,14 @@ private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException {
}
}

ActionListener<Void> noOpActionListener = new ActionListener<>() {
@Override
public void onResponse(Void unused) {}

@Override
public void onFailure(Exception e) {}
};

public void testLiveIndexNoPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
Expand Down Expand Up @@ -225,46 +234,107 @@ public void testIndexDeletionNoPinnedTimestamps() throws Exception {
}, 30, TimeUnit.SECONDS);
}

// public void testLiveIndexPinnedTimestamps() throws Exception {
// prepareCluster(1, 1, Settings.EMPTY);
// Settings indexSettings = Settings.builder()
// .put(remoteStoreIndexSettings(0, 1))
// .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0)
// .build();
// createIndex(INDEX_NAME, indexSettings);
// ensureYellowAndNoInitializingShards(INDEX_NAME);
// ensureGreen(INDEX_NAME);
//
// RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
//
// RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
// RemoteStorePinnedTimestampService.class,
// primaryNodeName(INDEX_NAME)
// );
//
// remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
//
// int numDocs = randomIntBetween(5, 10);
// for (int i = 0; i < numDocs; i++) {
// keepPinnedTimestampSchedulerUpdated();
// indexSingleDoc(INDEX_NAME, true);
// }
//
// String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
// String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
// .buildAsString();
// Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
// String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
// .buildAsString();
// Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);
//
// assertBusy(() -> {
// List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
// assertEquals(1, metadataFiles.size());
//
// verifyTranslogDataFileCount(metadataFiles, translogDataPath);
// });
// }
public void testLiveIndexWithPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

int numDocs = randomIntBetween(5, 10);
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
if (i == 2) {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
}
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(2, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
}

public void testIndexDeletionWithPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 0)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

int numDocs = randomIntBetween(5, 10);
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
if (i == 2) {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
}
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(2, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});

keepPinnedTimestampSchedulerUpdated();
client().admin().indices().prepareDelete(INDEX_NAME).get();

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(1, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
}

private void verifyTranslogDataFileCount(List<Path> metadataFiles, Path translogDataPath) throws IOException {
List<String> mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList());
Expand Down

0 comments on commit e6141a5

Please sign in to comment.