
Commit 4387345

Merge branch 'main' of github.com:opensearch-project/OpenSearch into timestamp-support

bharath-techie committed Sep 3, 2024
2 parents: a52b29c + 70ba5a1
Showing 68 changed files with 1,681 additions and 245 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
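The prefix-support entry above (#15557) is the feature this merge carries forward: shard-level blob paths built for the hashed prefix and hashed infix path types can now begin with a cluster-configured fixed prefix, and the test and production changes below thread that prefix through. A minimal sketch of supplying such prefixes through node settings, assuming the RemoteStoreSettings constants introduced in this change and the standard Setting#getKey() accessor; the prefix values themselves are illustrative placeholders:

    // Hypothetical node-settings sketch; "seg-prefix" and "tlog-prefix" are placeholder values.
    Settings nodeSettings = Settings.builder()
        .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.getKey(), "seg-prefix")
        .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.getKey(), "tlog-prefix")
        .build();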

Large diffs are not rendered by default.

@@ -19,6 +19,7 @@
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -67,14 +68,16 @@ public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
assertBusy(() -> {
String shardPath = getShardLevelBlobPath(
client(),
indexName,
new BlobPath(),
String.valueOf(shardRouting.getId()),
SEGMENTS,
DATA
DATA,
segmentsPathFixedPrefix
).buildAsString();
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
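This is the first of many test hunks with the same shape: read the fixed segments (or translog) prefix from the node settings and pass it as the new trailing argument of getShardLevelBlobPath. An annotated sketch of that call pattern as it appears inside these integration tests (a fragment of the test class, not a standalone program; the comments reflect how the arguments are used at the call sites in this diff):

    // Fixed prefix for segment paths, read from the node-level cluster setting.
    String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
    String shardPath = getShardLevelBlobPath(
        client(),                 // client used to resolve the index and shard metadata
        indexName,                // index whose shard-level blob path is being built
        new BlobPath(),           // repository base path (BlobPath.cleanPath() in other tests)
        "0",                      // shard id
        SEGMENTS,                 // category: SEGMENTS or TRANSLOG
        DATA,                     // type: DATA, METADATA or LOCK_FILES
        segmentsPathFixedPrefix   // new trailing argument: the fixed path prefix
    ).buildAsString();
    Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);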
@@ -443,13 +443,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
Client client = client();
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix).buildAsString();
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix).buildAsString();
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA, segmentsPathFixedPrefix).buildAsString();
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix).buildAsString();
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);

try (
@@ -205,7 +205,16 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
@@ -236,7 +245,16 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
@@ -247,11 +265,19 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
internalCluster().startNode(settings);

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
@@ -271,7 +297,16 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
int actualFileCount = getFileCount(indexPath);
@@ -604,8 +639,10 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
// 3. Delete data from remote segment store
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
.buildAsString();
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(segmentDataPath)) {
@@ -844,7 +881,16 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString();
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(translogMetaDataPath)) {
@@ -13,6 +13,7 @@
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
@@ -50,7 +51,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {

String indexName = response.getShards()[0].getShardRouting().index().getName();
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
.buildAsString();
Path segmentDataRepoPath = location.resolve(shardPath);
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

@@ -285,6 +285,7 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME).length == 0);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9208")
public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
@@ -323,13 +324,15 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {

final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
client(),
remoteStoreEnabledIndexName,
remoteStoreRepository.basePath(),
"0",
SEGMENTS,
LOCK_FILES
LOCK_FILES,
segmentsPathFixedPrefix
);
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
String[] lockFiles;
@@ -53,6 +53,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
@@ -109,7 +110,8 @@ public TransportCleanupRepositoryAction(
SnapshotsService snapshotsService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStoreSettings remoteStoreSettings
) {
super(
CleanupRepositoryAction.NAME,
@@ -122,7 +124,10 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(
() -> repositoriesService,
remoteStoreSettings.getSegmentsPathFixedPrefix()
);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
@@ -81,14 +81,17 @@
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.compositeindex.CompositeIndexSettings;
import org.opensearch.index.compositeindex.CompositeIndexValidator;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeIndexSettings;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.MapperService.MergeReason;
@@ -153,6 +156,7 @@
import static org.opensearch.cluster.metadata.Metadata.DEFAULT_REPLICA_COUNT_SETTING;
import static org.opensearch.cluster.metadata.MetadataIndexTemplateService.findContextTemplateName;
import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING;
import static org.opensearch.index.IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
@@ -1072,6 +1076,7 @@ static Settings aggregateIndexSettings(
validateTranslogRetentionSettings(indexSettings);
validateStoreTypeSettings(indexSettings);
validateRefreshIntervalSettings(request.settings(), clusterSettings);
validateTranslogFlushIntervalSettingsForCompositeIndex(request.settings(), clusterSettings);
validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings);
return indexSettings;
}
@@ -1740,6 +1745,71 @@ public static void validateTranslogRetentionSettings(Settings indexSettings) {
}
}

/**
* Validates that {@code index.translog.flush_threshold_size} is equal to or below
* {@code indices.composite_index.translog.max_flush_threshold_size} for composite indices,
* based on {@code index.composite_index}.
*
* @param requestSettings settings passed in during the index create/update request
* @param clusterSettings cluster settings
*/
public static void validateTranslogFlushIntervalSettingsForCompositeIndex(Settings requestSettings, ClusterSettings clusterSettings) {
if (StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.exists(requestSettings) == false
|| requestSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey()) == null) {
return;
}
ByteSizeValue translogFlushSize = INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.get(requestSettings);
ByteSizeValue compositeIndexMaxFlushSize = clusterSettings.get(
CompositeIndexSettings.COMPOSITE_INDEX_MAX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING
);
if (translogFlushSize.compareTo(compositeIndexMaxFlushSize) > 0) {
throw new IllegalArgumentException(
String.format(
Locale.ROOT,
"You can configure '%s' with upto '%s' for composite index",
INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
compositeIndexMaxFlushSize
)
);
}
}
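A short usage sketch of the create-time check above, assuming it is called from within the same service class (as aggregateIndexSettings does earlier in this diff); clusterSettings is an existing ClusterSettings instance, the 'index.composite_index' key is taken from the Javadoc, and the concrete values are illustrative:

    // Hypothetical request settings for a composite (star-tree) index whose translog flush
    // threshold is assumed to exceed the cluster-wide composite-index maximum.
    Settings requestSettings = Settings.builder()
        .put("index.composite_index", true)
        .put("index.translog.flush_threshold_size", "1gb")
        .build();
    try {
        validateTranslogFlushIntervalSettingsForCompositeIndex(requestSettings, clusterSettings);
    } catch (IllegalArgumentException e) {
        // Rejected: for composite indices the flush threshold may only go up to the configured maximum.
    }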

/**
* Validates that {@code index.translog.flush_threshold_size} is equal to or below
* {@code indices.composite_index.translog.max_flush_threshold_size} for composite indices,
* based on {@code index.composite_index}.
* This variant is used during the update-index-settings flow.
*
* @param requestSettings settings passed in during the index update request
* @param clusterSettings cluster settings
* @param indexSettings index settings
*/
public static Optional<String> validateTranslogFlushIntervalSettingsForCompositeIndex(
Settings requestSettings,
ClusterSettings clusterSettings,
Settings indexSettings
) {
if (INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.exists(requestSettings) == false
|| requestSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey()) == null
|| StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.exists(indexSettings) == false
|| indexSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING.getKey()) == null) {
return Optional.empty();
}
ByteSizeValue translogFlushSize = INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.get(requestSettings);
ByteSizeValue compositeIndexMaxFlushSize = clusterSettings.get(
CompositeIndexSettings.COMPOSITE_INDEX_MAX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING
);
if (translogFlushSize.compareTo(compositeIndexMaxFlushSize) > 0) {
return Optional.of(
String.format(
Locale.ROOT,
"You can configure '%s' with upto '%s' for composite index",
INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
compositeIndexMaxFlushSize
)
);
}
return Optional.empty();
}
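The Optional-returning overload above is meant for the update-settings flow, where violations are accumulated rather than thrown immediately. A hedged sketch of how a caller might consume it; the surrounding error-list handling is illustrative, not taken from this diff:

    // Hypothetical caller in the update-settings flow: collect the message instead of failing fast.
    List<String> validationErrors = new ArrayList<>();
    validateTranslogFlushIntervalSettingsForCompositeIndex(requestSettings, clusterSettings, indexSettings)
        .ifPresent(validationErrors::add);
    if (validationErrors.isEmpty() == false) {
        // Surface the accumulated errors to the caller, e.g. as part of an invalid-settings exception.
    }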

/**
* Validates that {@code index.refresh_interval} is equal to or below the {@code cluster.minimum.index.refresh_interval}.
*

