Skip to content

Commit

Permalink
Bugfix in RemoteFsTimestampAwareTranslog.trimUnreferencedReaders
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale committed Sep 23, 2024
1 parent fae9f37 commit 5a7f24c
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.remotestore;

import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
Expand All @@ -32,6 +33,7 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase {
Expand Down Expand Up @@ -288,6 +290,79 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception {
});
}

public void testLiveIndexWithPinnedTimestampsMultiplePrimaryTerms() throws Exception {
prepareCluster(1, 2, Settings.EMPTY);
Settings indexSettings = Settings.builder()
.put(remoteStoreIndexSettings(1, 1))
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
primaryNodeName(INDEX_NAME)
);

remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));

int numDocs = randomIntBetween(5, 10);
for (int i = 0; i < numDocs; i++) {
keepPinnedTimestampSchedulerUpdated();
indexSingleDoc(INDEX_NAME, true);
if (i == 2) {
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1));
remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener);
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
}
}

ingestDocs();

internalCluster().restartNode(primaryNodeName(INDEX_NAME));
ensureGreen(INDEX_NAME);

ingestDocs();

String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardDataPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
DATA,
translogPathFixedPrefix
).buildAsString();
Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1");

assertBusy(() -> {
List<Path> dataFiles = Files.list(translogDataPath).collect(Collectors.toList());
assertFalse(dataFiles.isEmpty());
});
}

private void ingestDocs() {
int numDocs = randomIntBetween(15, 20);
for (int i = 0; i < numDocs; i++) {
indexSingleDoc(INDEX_NAME, false);
}

assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
flushAndRefresh(INDEX_NAME);

int numDocsPostFailover = randomIntBetween(15, 20);
for (int i = 0; i < numDocsPostFailover; i++) {
indexSingleDoc(INDEX_NAME, false);
}

flushAndRefresh(INDEX_NAME);
assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get());
}

public void testIndexDeletionNoPinnedTimestamps() throws Exception {
prepareCluster(1, 1, Settings.EMPTY);
Settings indexSettings = Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -312,6 +313,107 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots(
// translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS);
}

public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
settings = Settings.builder()
.put(settings)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString())
.build();
String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNodes(3, settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(4);

RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance(
RemoteStorePinnedTimestampService.class,
clusterManagerName
);
remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1));
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath));

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = Settings.builder()
.put(getRemoteStoreBackedIndexSettings())
.put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 2)
.build();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
ensureGreen(remoteStoreEnabledIndexName);

// Create 2 snapshots for primary term 1
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap1");
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap2");

// Restart current primary to change the primary term
internalCluster().restartNode(primaryNodeName(remoteStoreEnabledIndexName));
ensureGreen(remoteStoreEnabledIndexName);

// Create 2 snapshots for primary term 2
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap3");
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);
createSnapshot(snapshotRepoName, "snap4");

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
Path shardPath = Path.of(String.valueOf(indexPath), "0");
Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1");

// Deleting snap1 will still keep files in primary term 1 due to snap2
deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap1");
assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);

// Deleting snap2 will not remove primary term 1 as we need to trigger trimUnreferencedReaders once
deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap2");
assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0);

// Index a doc to trigger trimUnreferencedReaders
RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO);
keepPinnedTimestampSchedulerUpdated();
indexRandomDocs(remoteStoreEnabledIndexName, 5);

assertBusy(() -> assertFalse(Files.exists(translogPath)), 30, TimeUnit.SECONDS);
}

private void createSnapshot(String repoName, String snapshotName) {
CreateSnapshotResponse createSnapshotResponse = client().admin()
.cluster()
.prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();

assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName));
}

private void deleteSnapshot(Client clusterManagerClient, String repoName, String snapshotName) {
AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(repoName, snapshotName)
.get();
assertAcked(deleteSnapshotResponse);
}

private Settings snapshotV2Settings(Path remoteStoreRepoPath) {
Settings settings = Settings.builder()
.put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void trimUnreferencedReaders() throws IOException {
protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) throws IOException {
if (trimLocal) {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();
super.trimUnreferencedReaders(true);
}

// Update file tracker to reflect local translog state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,9 +549,17 @@ protected Releasable drainSync() {

@Override
public void trimUnreferencedReaders() throws IOException {
trimUnreferencedReaders(false);
}

protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException {
// clean up local translog files and updates readers
super.trimUnreferencedReaders();

if (onlyTrimLocal) {
return;
}

// This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote
// store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
Expand Down

0 comments on commit 5a7f24c

Please sign in to comment.