Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 15, 2024
1 parent e6141a5 commit 913edd6
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA.getKey(), false)
.build();
}

private void keepPinnedTimestampSchedulerUpdated() throws InterruptedException {
long currentTime = System.currentTimeMillis();
int maxRetry = 10;
while(maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
while (maxRetry > 0 && RemoteStorePinnedTimestampService.getPinnedTimestamps().v1() <= currentTime) {
Thread.sleep(1000);
maxRetry--;
}
Expand Down Expand Up @@ -88,11 +89,25 @@ public void testLiveIndexNoPinnedTimestamps() throws Exception {
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
Expand Down Expand Up @@ -122,18 +137,32 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSettingWithinLimit() thro

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

int numDocs = randomIntBetween(5, 10);
int numDocs = randomIntBetween(5, 9);
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
Expand All @@ -148,7 +177,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 4)
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
Expand All @@ -170,11 +199,25 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
Expand All @@ -185,7 +228,7 @@ public void testLiveIndexNoPinnedTimestampsWithExtraGenSetting() throws Exceptio
});
}

public void testIndexDeletionNoPinnedTimestamps() throws Exception {
public void testLiveIndexWithPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
Expand All @@ -208,33 +251,44 @@ public void testIndexDeletionNoPinnedTimestamps() throws Exception {
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
if (i == 2) {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
}
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(1, metadataFiles.size());
assertEquals(2, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});

keepPinnedTimestampSchedulerUpdated();
client().admin().indices().prepareDelete(INDEX_NAME).get();

assertBusy(() -> {
assertEquals(0, Files.list(translogMetadataPath).collect(Collectors.toList()).size());
assertEquals(0, Files.list(translogDataPath).collect(Collectors.toList()).size());
}, 30, TimeUnit.SECONDS);
}

public void testLiveIndexWithPinnedTimestamps() throws Exception {
public void testIndexDeletionNoPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(0, 1))
Expand All @@ -257,24 +311,43 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception {
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
if (i == 2) {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
}
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(2, metadataFiles.size());
assertEquals(1, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});

keepPinnedTimestampSchedulerUpdated();
client().admin().indices().prepareDelete(INDEX_NAME).get();

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(1, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
Expand Down Expand Up @@ -311,26 +384,40 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception {
}

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix)
.buildAsString();
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");
String shardMetadataPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix)
.buildAsString();
String shardMetadataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetadataPath = Path.of(translogRepoPath + "/" + shardMetadataPath);

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(2, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
}, 30, TimeUnit.SECONDS);

keepPinnedTimestampSchedulerUpdated();
client().admin().indices().prepareDelete(INDEX_NAME).get();

assertBusy(() -> {
List<Path> metadataFiles = Files.list(translogMetadataPath).collect(Collectors.toList());
assertEquals(1, metadataFiles.size());
assertEquals(2, metadataFiles.size());

verifyTranslogDataFileCount(metadataFiles, translogDataPath);
});
Expand All @@ -339,11 +426,10 @@ public void testIndexDeletionWithPinnedTimestamps() throws Exception {
private void verifyTranslogDataFileCount(List<Path> metadataFiles, Path translogDataPath) throws IOException {
List<String> mdFiles = metadataFiles.stream().map(p -> p.getFileName().toString()).collect(Collectors.toList());
Set<Long> generations = new HashSet<>();
for(String mdFile : mdFiles) {
for (String mdFile : mdFiles) {
Tuple<Long, Long> minMaxGen = TranslogTransferMetadata.getMinMaxTranslogGenerationFromFilename(mdFile);
generations.addAll(LongStream.rangeClosed(minMaxGen.v1(), minMaxGen.v2()).boxed().collect(Collectors.toList()));
}
assertEquals(generations.size() * 2, Files.list(translogDataPath).collect(Collectors.toList()).size());
}

}
Loading

0 comments on commit 913edd6

Please sign in to comment.