diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 9cffc7051d756..69d1996a61510 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -53,7 +53,7 @@ import java.util.Set; import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java b/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java index 828db5864d28b..4db1656c1d799 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java @@ -156,7 +156,7 @@ public Releasable startElectionScheduler(TimeValue gracePeriod, Runnable schedul } @SuppressForbidden(reason = "Argument to Math.abs() is definitely not Long.MIN_VALUE") - private static long nonNegative(long n) { + public static long nonNegative(long n) { return n == Long.MIN_VALUE ? 0 : Math.abs(n); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java b/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java index 168ae5212888f..c65184faa0be5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java @@ -53,7 +53,7 @@ import java.util.Locale; import java.util.Objects; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; /** * Tool to run an unsafe bootstrap diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 220093b428989..530a6764d32ee 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -27,6 +27,8 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.DefaultRandomObject; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; @@ -45,10 +47,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.toPositiveLongAtMost; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; /** @@ -68,11 +72,14 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; private final String clusterName; + private RemoteClusterStateSettings remoteClusterStateSettings; + private Random random; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, ThreadPool threadpool, String clusterName ) { @@ -82,6 +89,8 @@ public InternalRemoteRoutingTableService( this.threadPool = threadpool; this.clusterName = clusterName; this.clusterSettings = clusterSettings; + this.remoteClusterStateSettings = remoteClusterStateSettings; + this.random = DefaultRandomObject.INSTANCE; } public List getIndicesRouting(RoutingTable routingTable) { @@ -193,7 +202,10 @@ public void getAsyncIndexRoutingReadAction( RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + final long delayInMillis = toPositiveLongAtMost(random.nextLong(), maxDelayInMillis); + remoteIndexRoutingTableStore.readAsyncWithDelay(delayInMillis, remoteIndexRoutingTable, actionListener); + } @Override @@ -208,7 +220,10 @@ public void getAsyncIndexRoutingTableDiffReadAction( ); RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor); - remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener); + + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + final long delayInMillis = toPositiveLongAtMost(random.nextLong(), maxDelayInMillis); + remoteRoutingTableDiffStore.readAsyncWithDelay(delayInMillis, remoteRoutingTableDiff, actionListener); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 56dfa03215a64..8c6f69ccbcb04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.ThreadPool; @@ -34,11 +35,19 @@ public static RemoteRoutingTableService getService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, ThreadPool threadPool, String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); + return new InternalRemoteRoutingTableService( + repositoriesService, + settings, + clusterSettings, + remoteClusterStateSettings, + threadPool, + clusterName + ); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index 8e2de1580a49f..457c63ea32ef8 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -8,12 +8,19 @@ package org.opensearch.common.remote; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.DefaultRandomObject; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.model.RemoteReadResult; import java.util.HashMap; import java.util.Map; +import java.util.Random; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.toPositiveLongAtMost; /** * An abstract class that provides a base implementation for managing remote entities in the remote store. @@ -24,6 +31,13 @@ public abstract class AbstractRemoteWritableEntityManager implements RemoteWrita */ protected final Map remoteWritableEntityStores = new HashMap<>(); + protected RemoteClusterStateSettings remoteClusterStateSettings = new RemoteClusterStateSettings( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + protected final Random random = DefaultRandomObject.INSTANCE; + /** * Retrieves the remote writable entity store for the given entity. * @@ -79,6 +93,9 @@ public void writeAsync( @Override public void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener listener) { - getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener)); + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + final long delayInMillis = toPositiveLongAtMost(random.nextLong(), maxDelayInMillis); + getStore(entity).readAsyncWithDelay(delayInMillis, entity, getWrappedReadListener(component, entity, listener)); } + } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java index 385c6f20ba58d..2b1edfbf4b987 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java @@ -27,4 +27,6 @@ public interface RemoteWritableEntityStore public T read(U entity) throws IOException; public void readAsync(U entity, ActionListener listener); + + public void readAsyncWithDelay(long delayInMillis, U entity, ActionListener listener); } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java index b5e074874dd38..3da718dbf958c 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java @@ -10,6 +10,7 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -32,8 +33,10 @@ public class RemoteWriteableEntityBlobStore listener) { }); } + public void readAsyncWithDelay(final long delayMillis, final U entity, final ActionListener listener) { + Runnable runnable = () -> { + try { + listener.onResponse(read(entity)); + } catch (Exception e) { + listener.onFailure(e); + } + }; + threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), executor, runnable); + } + public String getClusterName() { return clusterName; } @@ -121,5 +137,4 @@ public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity obj) { private static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } - } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 09832e2b41b6d..61d7fd94a6449 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -107,7 +107,6 @@ import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteIndexMetadataManager; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.http.HttpTransportSettings; @@ -187,6 +186,12 @@ import java.util.Set; import java.util.function.Predicate; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_PUBLICATION_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_STATE_READ_MAX_JITTER; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_STATE_READ_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING; @@ -733,13 +738,13 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, - RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, - RemoteClusterStateService.REMOTE_PUBLICATION_SETTING, + REMOTE_CLUSTER_STATE_ENABLED_SETTING, + REMOTE_PUBLICATION_SETTING, INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING, - RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX, + REMOTE_STATE_READ_TIMEOUT_SETTING, + CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX, RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, @@ -748,7 +753,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, - RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, + REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, + REMOTE_STATE_READ_MAX_JITTER, RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX, // Admission Control Settings diff --git a/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java b/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java new file mode 100644 index 0000000000000..8e8c72b949cd4 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java @@ -0,0 +1,17 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.Randomness; + +import java.util.Random; + +public class DefaultRandomObject { + public static final Random INSTANCE = new Random(Randomness.get().nextLong()); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 877e2585cb1eb..7f49f5e1006d0 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -39,12 +39,14 @@ public class RemoteClusterStateAttributesManager extends AbstractRemoteWritableE public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; RemoteClusterStateAttributesManager( + RemoteClusterStateSettings remoteClusterStateSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, NamedWriteableRegistry namedWriteableRegistry, ThreadPool threadpool ) { + this.remoteClusterStateSettings = remoteClusterStateSettings; this.remoteWritableEntityStores.put( RemoteDiscoveryNodes.DISCOVERY_NODES, new RemoteWriteableEntityBlobStore<>( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ece29180f9cf5..68498fa21c055 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -37,8 +37,6 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; @@ -47,6 +45,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateSettings.RemoteClusterStateValidationMode; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; @@ -71,7 +70,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -123,91 +121,6 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - /** - * Gates the functionality of remote publication. - */ - public static final String REMOTE_PUBLICATION_SETTING_KEY = "cluster.remote_store.publication.enabled"; - - public static final Setting REMOTE_PUBLICATION_SETTING = Setting.boolSetting( - REMOTE_PUBLICATION_SETTING_KEY, - false, - Property.NodeScope, - Property.Final - ); - - /** - * Used to specify if cluster state metadata should be published to remote store - */ - public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( - "cluster.remote_store.state.enabled", - false, - Property.NodeScope, - Property.Final - ); - - public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); - - public static final Setting REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting( - "cluster.remote_store.state.read_timeout", - REMOTE_STATE_READ_TIMEOUT_DEFAULT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( - "cluster.remote_store.state.checksum_validation.mode", - RemoteClusterStateValidationMode.NONE.name(), - RemoteClusterStateValidationMode::parseString, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Controls the fixed prefix for the cluster state path on remote store. - */ - public static final Setting CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString( - "cluster.remote_store.state.path.prefix", - "", - Property.NodeScope, - Property.Final - ); - - /** - * Validation mode for cluster state checksum. - * None: Validation will be disabled. - * Debug: Validation enabled but only matches checksum and logs failing entities. - * Trace: Matches checksum and downloads full cluster state to find diff in failing entities. Only logs failures. - * Failure: Throws exception on failing validation. - */ - public enum RemoteClusterStateValidationMode { - DEBUG("debug"), - TRACE("trace"), - FAILURE("failure"), - NONE("none"); - - public final String mode; - - RemoteClusterStateValidationMode(String mode) { - this.mode = mode; - } - - public static RemoteClusterStateValidationMode parseString(String mode) { - try { - return RemoteClusterStateValidationMode.valueOf(mode.toUpperCase(Locale.ROOT)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - "[" - + mode - + "] mode is not supported. " - + "supported modes are [" - + Arrays.toString(RemoteClusterStateValidationMode.values()) - + "]" - ); - } - } - } - - private TimeValue remoteStateReadTimeout; private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -218,7 +131,6 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private BlobStoreTransferService blobStoreTransferService; private RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; - private RemoteClusterStateValidationMode remoteClusterStateValidationMode; private final RemotePersistenceStats remoteStateStats; private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; @@ -233,7 +145,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; private final boolean isPublicationEnabled; - private final String remotePathPrefix; + private RemoteClusterStateSettings remoteClusterStateSettings; private final RemoteClusterStateCache remoteClusterStateCache; // ToXContent Params with gateway mode. @@ -265,22 +177,18 @@ public RemoteClusterStateService( clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); - this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); - clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); - this.remoteClusterStateValidationMode = REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, this::setChecksumValidationMode); - this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; - this.isPublicationEnabled = REMOTE_PUBLICATION_SETTING.get(settings) + this.remoteClusterStateSettings = new RemoteClusterStateSettings(settings, clusterSettings); + this.isPublicationEnabled = remoteClusterStateSettings.getRemotePublicationSetting() && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); - this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, clusterSettings, + remoteClusterStateSettings, threadpool, ClusterName.CLUSTER_NAME_SETTING.get(settings).value() ); @@ -332,7 +240,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode() + .equals(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, false, codecVersion ); @@ -539,7 +448,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState) + : null, false, previousManifest.getCodecVersion() ); @@ -764,7 +675,7 @@ UploadedMetadataResults writeMetadataInParallel( blobStoreRepository.getNamedXContentRegistry(), remoteIndexMetadataManager.getPathTypeSetting(), remoteIndexMetadataManager.getPathHashAlgoSetting(), - remotePathPrefix + remoteClusterStateSettings.getRemotePathPrefix() ), listener ); @@ -1010,7 +921,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState) + : null, true, previousManifest.getCodecVersion() ); @@ -1065,9 +978,9 @@ public void start() { blobStoreRepository = (BlobStoreRepository) repository; String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); - remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, + remoteClusterStateSettings, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1076,6 +989,7 @@ public void start() { ); remoteIndexMetadataManager = new RemoteIndexMetadataManager( clusterSettings, + remoteClusterStateSettings, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1090,6 +1004,7 @@ public void start() { threadpool ); remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager( + remoteClusterStateSettings, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1105,10 +1020,6 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } - private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteClusterStateValidationMode) { - this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; - } - // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; @@ -1355,9 +1266,10 @@ ClusterState readClusterStateInParallel( } try { - if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { + if (latch.await(remoteClusterStateSettings.getRemoteStateReadTimeout().getMillis(), TimeUnit.MILLISECONDS) == false) { RemoteStateTransferException exception = new RemoteStateTransferException( - "Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout + "Timed out waiting to read cluster state from remote within timeout " + + remoteClusterStateSettings.getRemoteStateReadTimeout().getMillis() ); exceptionList.forEach(exception::addSuppressed); throw exception; @@ -1484,7 +1396,7 @@ public ClusterState getClusterStateForManifest( ); if (includeEphemeral - && !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + && !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) { validateClusterStateFromChecksum(manifest, clusterState, clusterName, localNodeId, true); } @@ -1516,10 +1428,12 @@ public ClusterState getClusterStateForManifest( final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateFullDownloadSucceeded(); remoteStateStats.stateFullDownloadTook(durationMillis); + if (includeEphemeral) { // cache only if the entire cluster-state is present remoteClusterStateCache.putState(clusterState); } + return clusterState; } @@ -1611,7 +1525,8 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C .metadata(metadataBuilder) .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)) .build(); - if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) { + if (!remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + && manifest.getClusterStateChecksum() != null) { validateClusterStateFromChecksum(manifest, clusterState, previousState.getClusterName().value(), localNodeId, false); } final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); @@ -1650,13 +1565,14 @@ void validateClusterStateFromChecksum( remoteStateStats.stateDiffDownloadValidationFailed(); } - if (isFullStateDownload && remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { + if (isFullStateDownload + && remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during full state read. Validation failed for " + failedValidation ); } - if (remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE) - || remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.TRACE)) { + if (remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE) + || remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.TRACE)) { // download full cluster state and match against state created for the failing entities ClusterState fullClusterState = readClusterStateInParallel( ClusterState.builder(new ClusterName(clusterName)).build(), @@ -1790,7 +1706,7 @@ void validateClusterStateFromChecksum( } } } - if (remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { + if (remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during diff read. Validation failed for " + failedValidation ); @@ -1827,10 +1743,6 @@ public boolean isRemotePublicationEnabled() { return this.isPublicationEnabled; } - public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { - this.remoteStateReadTimeout = remoteStateReadTimeout; - } - private BlobStoreTransferService getBlobStoreTransferService() { if (blobStoreTransferService == null) { blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java new file mode 100644 index 0000000000000..a9e75582be6a8 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java @@ -0,0 +1,162 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +import java.util.Arrays; +import java.util.Locale; + +public class RemoteClusterStateSettings { + + /** + * Gates the functionality of remote publication. + */ + public static final String REMOTE_PUBLICATION_SETTING_KEY = "cluster.remote_store.publication.enabled"; + + public static final Setting REMOTE_PUBLICATION_SETTING = Setting.boolSetting( + REMOTE_PUBLICATION_SETTING_KEY, + false, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + /** + * Used to specify if cluster state metadata should be published to remote store + */ + public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.state.enabled", + false, + Setting.Property.NodeScope, + Setting.Property.Final + ); + + public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); + + public static final Setting REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.remote_store.state.read_timeout", + REMOTE_STATE_READ_TIMEOUT_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( + "cluster.remote_store.state.checksum_validation.mode", + RemoteClusterStateValidationMode.NONE.name(), + RemoteClusterStateValidationMode::parseString, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Controls the fixed prefix for the cluster state path on remote store. + */ + public static final Setting CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString( + "cluster.remote_store.state.path.prefix", + "", + Setting.Property.NodeScope, + Setting.Property.Final + ); + + public static final TimeValue REMOTE_STATE_READ_MAX_JITTER_DEFAULT = TimeValue.timeValueMillis(500); + + public static final Setting REMOTE_STATE_READ_MAX_JITTER = Setting.timeSetting( + "cluster.remote_store.state.read.max_jitter", + REMOTE_STATE_READ_MAX_JITTER_DEFAULT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private TimeValue remoteStateReadTimeout; + private RemoteClusterStateValidationMode remoteClusterStateValidationMode; + private final String remotePathPrefix; + private TimeValue remoteStateReadMaxJitter; + private boolean remotePublicationSetting; + + public RemoteClusterStateSettings(Settings settings, ClusterSettings clusterSettings) { + this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); + clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); + this.remoteClusterStateValidationMode = REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, this::setChecksumValidationMode); + this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); + this.remoteStateReadMaxJitter = REMOTE_STATE_READ_MAX_JITTER.get(settings); + clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_MAX_JITTER, this::setRemoteStateReadMaxJitter); + this.remotePublicationSetting = REMOTE_PUBLICATION_SETTING.get(settings); + } + + public boolean getRemotePublicationSetting() { + return remotePublicationSetting; + } + + public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { + this.remoteStateReadTimeout = remoteStateReadTimeout; + } + + private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteClusterStateValidationMode) { + this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; + } + + private void setRemoteStateReadMaxJitter(TimeValue remoteStateReadMaxJitter) { + this.remoteStateReadMaxJitter = remoteStateReadMaxJitter; + } + + public TimeValue getRemoteStateReadTimeout() { + return remoteStateReadTimeout; + } + + public RemoteClusterStateValidationMode getRemoteClusterStateValidationMode() { + return remoteClusterStateValidationMode; + } + + public String getRemotePathPrefix() { + return remotePathPrefix; + } + + public TimeValue getRemoteStateReadMaxJitter() { + return remoteStateReadMaxJitter; + } + + /** + * Validation mode for cluster state checksum. + * None: Validation will be disabled. + * Debug: Validation enabled but only matches checksum and logs failing entities. + * Trace: Matches checksum and downloads full cluster state to find diff in failing entities. Only logs failures. + * Failure: Throws exception on failing validation. + */ + public enum RemoteClusterStateValidationMode { + DEBUG("debug"), + TRACE("trace"), + FAILURE("failure"), + NONE("none"); + + public final String mode; + + RemoteClusterStateValidationMode(String mode) { + this.mode = mode; + } + + public static RemoteClusterStateValidationMode parseString(String mode) { + try { + return RemoteClusterStateValidationMode.valueOf(mode.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "[" + + mode + + "] mode is not supported. " + + "supported modes are [" + + Arrays.toString(RemoteClusterStateValidationMode.values()) + + "]" + ); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java index 74cb838286961..757fc476a5327 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java @@ -22,6 +22,7 @@ import java.util.Locale; import java.util.Map; +import static org.opensearch.cluster.coordination.ElectionSchedulerFactory.nonNegative; import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V1; /** @@ -73,6 +74,11 @@ static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepositor ); } + public static long toPositiveLongAtMost(long randomNumber, long upperBound) { + assert 0 < upperBound : upperBound; + return nonNegative(randomNumber) % upperBound + 1; + } + /** * Container class to keep metadata of all uploaded attributes */ diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 763a8e3ff4951..7b8ced7852519 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -73,6 +73,7 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan RemoteGlobalMetadataManager( ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, @@ -83,6 +84,7 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.namedWriteableRegistry = namedWriteableRegistry; + this.remoteClusterStateSettings = remoteClusterStateSettings; this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, new RemoteWriteableEntityBlobStore<>( diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index d8e8ffc68834d..570bb31cd7de9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -79,6 +79,7 @@ public class RemoteIndexMetadataManager extends AbstractRemoteWritableEntityMana public RemoteIndexMetadataManager( ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, @@ -100,6 +101,7 @@ public RemoteIndexMetadataManager( this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.pathType = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING); this.pathHashAlgo = clusterSettings.get(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING); + this.remoteClusterStateSettings = remoteClusterStateSettings; clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout); clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, this::setPathTypeSetting); clusterSettings.addSettingsUpdateConsumer(REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 55971398634c5..3e60de66cec55 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -28,6 +27,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + /** * This is an abstraction for validating and storing information specific to remote backed storage nodes. * @@ -194,8 +195,7 @@ public static String getRemoteStoreTranslogRepo(Settings settings) { } public static boolean isRemoteStoreClusterStateEnabled(Settings settings) { - return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) - && isRemoteClusterStateAttributePresent(settings); + return REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) && isRemoteClusterStateAttributePresent(settings); } private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {