From 5c32ca54053bcd83eb7c24336740442623d6b574 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Sep 2024 16:56:44 +0530 Subject: [PATCH] Add jitter for remote download calls Signed-off-by: Arpit Bandejiya --- .../UnsafeBootstrapClusterManagerCommand.java | 2 +- .../InternalRemoteRoutingTableService.java | 19 ++- .../RemoteRoutingTableServiceFactory.java | 11 +- .../AbstractRemoteWritableEntityManager.java | 19 ++- .../remote/RemoteWritableEntityStore.java | 2 + .../RemoteWriteableEntityBlobStore.java | 21 ++- .../common/settings/ClusterSettings.java | 15 +- .../gateway/remote/DefaultRandomObject.java | 17 +++ .../remote/RemoteClusterStateService.java | 138 ++++------------- .../remote/RemoteClusterStateSettings.java | 144 ++++++++++++++++++ .../remote/RemoteClusterStateUtils.java | 6 + .../remote/RemoteGlobalMetadataManager.java | 6 + .../remotestore/RemoteStoreNodeAttribute.java | 6 +- 13 files changed, 285 insertions(+), 121 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java create mode 100644 server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java b/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java index 168ae5212888f..c65184faa0be5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/UnsafeBootstrapClusterManagerCommand.java @@ -53,7 +53,7 @@ import java.util.Locale; import java.util.Objects; -import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; /** * Tool to run an unsafe bootstrap diff --git 
a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java index 220093b428989..530a6764d32ee 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java @@ -27,6 +27,8 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.compress.Compressor; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.DefaultRandomObject; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.gateway.remote.RemoteStateTransferException; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; @@ -45,10 +47,12 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Random; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.toPositiveLongAtMost; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; /** @@ -68,11 +72,14 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen private BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; private final String clusterName; + private RemoteClusterStateSettings remoteClusterStateSettings; + private Random random; public InternalRemoteRoutingTableService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, ThreadPool threadpool, String clusterName ) { @@ -82,6 +89,8 @@ public InternalRemoteRoutingTableService( this.threadPool = 
threadpool; this.clusterName = clusterName; this.clusterSettings = clusterSettings; + this.remoteClusterStateSettings = remoteClusterStateSettings; + this.random = DefaultRandomObject.INSTANCE; } public List getIndicesRouting(RoutingTable routingTable) { @@ -193,7 +202,10 @@ public void getAsyncIndexRoutingReadAction( RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor); - remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener); + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + final long delayInMillis = toPositiveLongAtMost(random.nextLong(), maxDelayInMillis); + remoteIndexRoutingTableStore.readAsyncWithDelay(delayInMillis, remoteIndexRoutingTable, actionListener); + } @Override @@ -208,7 +220,10 @@ public void getAsyncIndexRoutingTableDiffReadAction( ); RemoteRoutingTableDiff remoteRoutingTableDiff = new RemoteRoutingTableDiff(uploadedFilename, clusterUUID, compressor); - remoteRoutingTableDiffStore.readAsync(remoteRoutingTableDiff, actionListener); + + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + final long delayInMillis = toPositiveLongAtMost(random.nextLong(), maxDelayInMillis); + remoteRoutingTableDiffStore.readAsyncWithDelay(delayInMillis, remoteRoutingTableDiff, actionListener); } @Override diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java index 56dfa03215a64..8c6f69ccbcb04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceFactory.java @@ -10,6 +10,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import 
org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.repositories.RepositoriesService; import org.opensearch.threadpool.ThreadPool; @@ -34,11 +35,19 @@ public static RemoteRoutingTableService getService( Supplier repositoriesService, Settings settings, ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, ThreadPool threadPool, String clusterName ) { if (isRemoteRoutingTableEnabled(settings)) { - return new InternalRemoteRoutingTableService(repositoriesService, settings, clusterSettings, threadPool, clusterName); + return new InternalRemoteRoutingTableService( + repositoriesService, + settings, + clusterSettings, + remoteClusterStateSettings, + threadPool, + clusterName + ); } return new NoopRemoteRoutingTableService(); } diff --git a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java index 8e2de1580a49f..457c63ea32ef8 100644 --- a/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java +++ b/server/src/main/java/org/opensearch/common/remote/AbstractRemoteWritableEntityManager.java @@ -8,12 +8,19 @@ package org.opensearch.common.remote; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.DefaultRandomObject; +import org.opensearch.gateway.remote.RemoteClusterStateSettings; import org.opensearch.gateway.remote.model.RemoteReadResult; import java.util.HashMap; import java.util.Map; +import java.util.Random; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.toPositiveLongAtMost; /** * An abstract class that provides a base implementation for managing remote entities in the remote store. 
@@ -24,6 +31,13 @@ public abstract class AbstractRemoteWritableEntityManager implements RemoteWrita */ protected final Map remoteWritableEntityStores = new HashMap<>(); + protected RemoteClusterStateSettings remoteClusterStateSettings = new RemoteClusterStateSettings( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + protected final Random random = DefaultRandomObject.INSTANCE; + /** * Retrieves the remote writable entity store for the given entity. * @@ -79,6 +93,9 @@ public void writeAsync( @Override public void readAsync(String component, AbstractClusterMetadataWriteableBlobEntity entity, ActionListener listener) { - getStore(entity).readAsync(entity, getWrappedReadListener(component, entity, listener)); + long maxDelayInMillis = remoteClusterStateSettings.getRemoteStateReadMaxJitter().getMillis(); + final long delayInMillis = toPositiveLongAtMost(random.nextLong(), maxDelayInMillis); + getStore(entity).readAsyncWithDelay(delayInMillis, entity, getWrappedReadListener(component, entity, listener)); } + } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java index 385c6f20ba58d..2b1edfbf4b987 100644 --- a/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWritableEntityStore.java @@ -27,4 +27,6 @@ public interface RemoteWritableEntityStore public T read(U entity) throws IOException; public void readAsync(U entity, ActionListener listener); + + public void readAsyncWithDelay(long delayInMillis, U entity, ActionListener listener); } diff --git a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java index b5e074874dd38..0ee7665aa8561 100644 --- 
a/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java +++ b/server/src/main/java/org/opensearch/common/remote/RemoteWriteableEntityBlobStore.java @@ -8,8 +8,11 @@ package org.opensearch.common.remote; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -29,11 +32,15 @@ */ public class RemoteWriteableEntityBlobStore> implements RemoteWritableEntityStore { + private static final Logger logger = LogManager.getLogger(RemoteWriteableEntityBlobStore.class); + private final BlobStoreTransferService transferService; private final BlobStoreRepository blobStoreRepository; private final String clusterName; + private final String executor; private final ExecutorService executorService; private final String pathToken; + private final ThreadPool threadPool; public RemoteWriteableEntityBlobStore( final BlobStoreTransferService blobStoreTransferService, @@ -46,6 +53,8 @@ public RemoteWriteableEntityBlobStore( this.transferService = blobStoreTransferService; this.blobStoreRepository = blobStoreRepository; this.clusterName = clusterName; + this.threadPool = threadPool; + this.executor = executor; this.executorService = threadPool.executor(executor); this.pathToken = pathToken; } @@ -89,6 +98,17 @@ public void readAsync(final U entity, final ActionListener listener) { }); } + public void readAsyncWithDelay(final long delayMillis, final U entity, final ActionListener listener) { + Runnable runnable = () -> { + try { + listener.onResponse(read(entity)); + } catch (Exception e) { + listener.onFailure(e); + } + }; + 
threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), executor, runnable); + } + public String getClusterName() { return clusterName; } @@ -121,5 +141,4 @@ public BlobPath getBlobPathForDownload(final RemoteWriteableBlobEntity obj) { private static String encodeString(String content) { return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8)); } - } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index c6a3ee6df609b..6587ca643b470 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -107,7 +107,6 @@ import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.gateway.ShardsBatchGatewayAllocator; import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteIndexMetadataManager; import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore; import org.opensearch.http.HttpTransportSettings; @@ -186,6 +185,11 @@ import java.util.Set; import java.util.function.Predicate; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_STATE_READ_MAX_JITTER; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_STATE_READ_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING; 
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING; import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING; @@ -732,12 +736,12 @@ public void apply(Settings value, Settings current, Settings previous) { // Remote cluster state settings RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING, - RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING, + REMOTE_CLUSTER_STATE_ENABLED_SETTING, INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, - RemoteClusterStateService.REMOTE_STATE_READ_TIMEOUT_SETTING, - RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX, + REMOTE_STATE_READ_TIMEOUT_SETTING, + CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX, RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_TYPE_SETTING, RemoteIndexMetadataManager.REMOTE_INDEX_METADATA_PATH_HASH_ALGO_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, @@ -746,7 +750,8 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, - RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, + REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, + REMOTE_STATE_READ_MAX_JITTER, RemoteRoutingTableBlobStore.CLUSTER_REMOTE_STORE_ROUTING_TABLE_PATH_PREFIX, // Admission Control Settings diff --git a/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java b/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java new file mode 100644 index 0000000000000..8e8c72b949cd4 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/DefaultRandomObject.java @@ -0,0 +1,17 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote; + +import org.opensearch.common.Randomness; + +import java.util.Random; + +public class DefaultRandomObject { + public static final Random INSTANCE = new Random(Randomness.get().nextLong()); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index fe34b68702c41..1c5cd9b340e73 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -33,11 +33,10 @@ import org.opensearch.cluster.routing.remote.RemoteRoutingTableServiceFactory; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; +import org.opensearch.common.Randomness; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; @@ -47,6 +46,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateSettings.RemoteClusterStateValidationMode; import org.opensearch.gateway.remote.model.RemoteClusterBlocks; import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms; import 
org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo; @@ -71,7 +71,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -79,6 +78,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -123,79 +123,6 @@ public class RemoteClusterStateService implements Closeable { private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - /** - * Used to specify if cluster state metadata should be published to remote store - */ - public static final Setting REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting( - "cluster.remote_store.state.enabled", - false, - Property.NodeScope, - Property.Final - ); - - public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000); - - public static final Setting REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting( - "cluster.remote_store.state.read_timeout", - REMOTE_STATE_READ_TIMEOUT_DEFAULT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public static final Setting REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>( - "cluster.remote_store.state.checksum_validation.mode", - RemoteClusterStateValidationMode.NONE.name(), - RemoteClusterStateValidationMode::parseString, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Controls the fixed prefix for the cluster state path on remote store. - */ - public static final Setting CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString( - "cluster.remote_store.state.path.prefix", - "", - Property.NodeScope, - Property.Final - ); - - /** - * Validation mode for cluster state checksum. - * None: Validation will be disabled. 
- * Debug: Validation enabled but only matches checksum and logs failing entities. - * Trace: Matches checksum and downloads full cluster state to find diff in failing entities. Only logs failures. - * Failure: Throws exception on failing validation. - */ - public enum RemoteClusterStateValidationMode { - DEBUG("debug"), - TRACE("trace"), - FAILURE("failure"), - NONE("none"); - - public final String mode; - - RemoteClusterStateValidationMode(String mode) { - this.mode = mode; - } - - public static RemoteClusterStateValidationMode parseString(String mode) { - try { - return RemoteClusterStateValidationMode.valueOf(mode.toUpperCase(Locale.ROOT)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - "[" - + mode - + "] mode is not supported. " - + "supported modes are [" - + Arrays.toString(RemoteClusterStateValidationMode.values()) - + "]" - ); - } - } - } - - private TimeValue remoteStateReadTimeout; private final String nodeId; private final Supplier repositoriesService; private final Settings settings; @@ -206,7 +133,6 @@ public static RemoteClusterStateValidationMode parseString(String mode) { private BlobStoreTransferService blobStoreTransferService; private RemoteRoutingTableService remoteRoutingTableService; private volatile TimeValue slowWriteLoggingThreshold; - private RemoteClusterStateValidationMode remoteClusterStateValidationMode; private final RemotePersistenceStats remoteStateStats; private RemoteClusterStateCleanupManager remoteClusterStateCleanupManager; @@ -221,7 +147,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) { + "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata " + "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]"; private final boolean isPublicationEnabled; - private final String remotePathPrefix; + private RemoteClusterStateSettings remoteClusterStateSettings; // ToXContent Params with gateway mode. 
// We are using gateway context mode to persist all custom metadata. @@ -252,22 +178,18 @@ public RemoteClusterStateService( clusterSettings = clusterService.getClusterSettings(); this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD); clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold); - this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING); - clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout); - this.remoteClusterStateValidationMode = REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.get(settings); - clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, this::setChecksumValidationMode); - this.remoteStateStats = new RemotePersistenceStats(); this.namedWriteableRegistry = namedWriteableRegistry; this.indexMetadataUploadListeners = indexMetadataUploadListeners; + this.remoteClusterStateSettings = new RemoteClusterStateSettings(settings, clusterSettings); this.isPublicationEnabled = FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings) && RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings); - this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings); this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService( repositoriesService, settings, clusterSettings, + remoteClusterStateSettings, threadpool, ClusterName.CLUSTER_NAME_SETTING.get(settings).value() ); @@ -318,7 +240,8 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat uploadedMetadataResults, previousClusterUUID, clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? 
new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode() + .equals(RemoteClusterStateSettings.RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, false, codecVersion ); @@ -518,7 +441,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata( uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), clusterStateDiffManifest, - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + ? new ClusterStateChecksum(clusterState) + : null, false, previousManifest.getCodecVersion() ); @@ -743,7 +668,7 @@ UploadedMetadataResults writeMetadataInParallel( blobStoreRepository.getNamedXContentRegistry(), remoteIndexMetadataManager.getPathTypeSetting(), remoteIndexMetadataManager.getPathHashAlgoSetting(), - remotePathPrefix + remoteClusterStateSettings.getRemotePathPrefix() ), listener ); @@ -966,7 +891,9 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(ClusterState clus uploadedMetadataResults, previousManifest.getPreviousClusterUUID(), previousManifest.getDiffManifest(), - !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) ? new ClusterStateChecksum(clusterState) : null, + !remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + ? 
new ClusterStateChecksum(clusterState) + : null, true, previousManifest.getCodecVersion() ); @@ -1012,9 +939,11 @@ public void start() { blobStoreRepository = (BlobStoreRepository) repository; String clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value(); blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool); - + Random random = new Random(Randomness.get().nextLong()); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, + remoteClusterStateSettings, + random, clusterName, blobStoreRepository, blobStoreTransferService, @@ -1052,10 +981,6 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) { this.slowWriteLoggingThreshold = slowWriteLoggingThreshold; } - private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteClusterStateValidationMode) { - this.remoteClusterStateValidationMode = remoteClusterStateValidationMode; - } - // Package private for unit test RemoteRoutingTableService getRemoteRoutingTableService() { return this.remoteRoutingTableService; @@ -1302,9 +1227,10 @@ ClusterState readClusterStateInParallel( } try { - if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { + if (latch.await(remoteClusterStateSettings.getRemoteStateReadTimeout().getMillis(), TimeUnit.MILLISECONDS) == false) { RemoteStateTransferException exception = new RemoteStateTransferException( - "Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout + "Timed out waiting to read cluster state from remote within timeout " + + remoteClusterStateSettings.getRemoteStateReadTimeout().getMillis() ); exceptionList.forEach(exception::addSuppressed); throw exception; @@ -1426,7 +1352,7 @@ public ClusterState getClusterStateForManifest( ); if (includeEphemeral - && !remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) + && 
!remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) { validateClusterStateFromChecksum(manifest, clusterState, clusterName, localNodeId, true); } @@ -1458,7 +1384,7 @@ public ClusterState getClusterStateForManifest( final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateFullDownloadSucceeded(); remoteStateStats.stateFullDownloadTook(durationMillis); - + logger.info("The remote state download took [{}]ms", durationMillis); return clusterState; } @@ -1549,13 +1475,14 @@ public ClusterState getClusterStateUsingDiff(ClusterMetadataManifest manifest, C .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables)) .build(); - if (!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE) && manifest.getClusterStateChecksum() != null) { + if (!remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.NONE) + && manifest.getClusterStateChecksum() != null) { validateClusterStateFromChecksum(manifest, clusterState, previousState.getClusterName().value(), localNodeId, false); } final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); remoteStateStats.stateDiffDownloadSucceeded(); remoteStateStats.stateDiffDownloadTook(durationMillis); - + logger.info("The remote state diff download took [{}]ms", durationMillis); return clusterState; } @@ -1579,13 +1506,14 @@ void validateClusterStateFromChecksum( failedValidation ) ); - if (isFullStateDownload && remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { + if (isFullStateDownload + && remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during full 
state read. Validation failed for " + failedValidation ); } - if (remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE) - || remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.TRACE)) { + if (remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE) + || remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.TRACE)) { // download full cluster state and match against state created for the failing entities ClusterState fullClusterState = readClusterStateInParallel( ClusterState.builder(new ClusterName(clusterName)).build(), @@ -1719,7 +1647,7 @@ void validateClusterStateFromChecksum( } } } - if (remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.FAILURE)) { + if (remoteClusterStateSettings.getRemoteClusterStateValidationMode().equals(RemoteClusterStateValidationMode.FAILURE)) { throw new IllegalStateException( "Cluster state checksums do not match during diff read. 
Validation failed for " + failedValidation ); @@ -1752,10 +1680,6 @@ public String getLastKnownUUIDFromRemote(String clusterName) { } } - public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) { - this.remoteStateReadTimeout = remoteStateReadTimeout; - } - private BlobStoreTransferService getBlobStoreTransferService() { if (blobStoreTransferService == null) { blobStoreTransferService = new BlobStoreTransferService(getBlobStore(), threadpool);
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java
new file mode 100644
index 0000000000000..e7b0c166e23f9
--- /dev/null
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateSettings.java
@@ -0,0 +1,164 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.gateway.remote;
+
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+
+import java.util.Arrays;
+import java.util.Locale;
+
+/**
+ * Setting definitions for remote cluster state, together with the current values of the
+ * read timeout, checksum validation mode, remote path prefix and maximum read jitter.
+ */
+public class RemoteClusterStateSettings {
+
+    /**
+     * Used to specify if cluster state metadata should be published to remote store
+     */
+    public static final Setting<Boolean> REMOTE_CLUSTER_STATE_ENABLED_SETTING = Setting.boolSetting(
+        "cluster.remote_store.state.enabled",
+        false,
+        Setting.Property.NodeScope,
+        Setting.Property.Final
+    );
+
+    public static final TimeValue REMOTE_STATE_READ_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);
+
+    public static final Setting<TimeValue> REMOTE_STATE_READ_TIMEOUT_SETTING = Setting.timeSetting(
+        "cluster.remote_store.state.read_timeout",
+        REMOTE_STATE_READ_TIMEOUT_DEFAULT,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    public static final Setting<RemoteClusterStateValidationMode> REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING = new Setting<>(
+        "cluster.remote_store.state.checksum_validation.mode",
+        RemoteClusterStateValidationMode.NONE.name(),
+        RemoteClusterStateValidationMode::parseString,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * Controls the fixed prefix for the cluster state path on remote store.
+     */
+    public static final Setting<String> CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX = Setting.simpleString(
+        "cluster.remote_store.state.path.prefix",
+        "",
+        Setting.Property.NodeScope,
+        Setting.Property.Final
+    );
+
+    public static final TimeValue REMOTE_STATE_READ_MAX_JITTER_DEFAULT = TimeValue.timeValueMillis(500);
+
+    public static final Setting<TimeValue> REMOTE_STATE_READ_MAX_JITTER = Setting.timeSetting(
+        "cluster.remote_store.state.read.max_jitter",
+        REMOTE_STATE_READ_MAX_JITTER_DEFAULT,
+        Setting.Property.Dynamic,
+        Setting.Property.NodeScope
+    );
+
+    // The non-final fields below are reassigned by the update consumers registered in the
+    // constructor; volatile makes a newly applied value visible to any thread calling the getters.
+    private volatile TimeValue remoteStateReadTimeout;
+    private volatile RemoteClusterStateValidationMode remoteClusterStateValidationMode;
+    private final String remotePathPrefix;
+    private volatile TimeValue remoteStateReadMaxJitter;
+
+    /**
+     * Reads the initial values and registers update consumers for the dynamic settings.
+     *
+     * @param settings        source of the initial validation mode, path prefix and max jitter
+     * @param clusterSettings source of the initial read timeout; also receives the update consumers
+     */
+    public RemoteClusterStateSettings(Settings settings, ClusterSettings clusterSettings) {
+        this.remoteStateReadTimeout = clusterSettings.get(REMOTE_STATE_READ_TIMEOUT_SETTING);
+        clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_TIMEOUT_SETTING, this::setRemoteStateReadTimeout);
+        this.remoteClusterStateValidationMode = REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING, this::setChecksumValidationMode);
+        this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
+        this.remoteStateReadMaxJitter = REMOTE_STATE_READ_MAX_JITTER.get(settings);
+        clusterSettings.addSettingsUpdateConsumer(REMOTE_STATE_READ_MAX_JITTER, this::setRemoteStateReadMaxJitter);
+    }
+
+    public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {
+        this.remoteStateReadTimeout = remoteStateReadTimeout;
+    }
+
+    private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteClusterStateValidationMode) {
+        this.remoteClusterStateValidationMode = remoteClusterStateValidationMode;
+    }
+
+    private void setRemoteStateReadMaxJitter(TimeValue remoteStateReadMaxJitter) {
+        this.remoteStateReadMaxJitter = remoteStateReadMaxJitter;
+    }
+
+    public TimeValue getRemoteStateReadTimeout() {
+        return remoteStateReadTimeout;
+    }
+
+    public RemoteClusterStateValidationMode getRemoteClusterStateValidationMode() {
+        return remoteClusterStateValidationMode;
+    }
+
+    public String getRemotePathPrefix() {
+        return remotePathPrefix;
+    }
+
+    public TimeValue getRemoteStateReadMaxJitter() {
+        return remoteStateReadMaxJitter;
+    }
+
+    /**
+     * Validation mode for cluster state checksum.
+     * None: Validation will be disabled.
+     * Debug: Validation enabled but only matches checksum and logs failing entities.
+     * Trace: Matches checksum and downloads full cluster state to find diff in failing entities. Only logs failures.
+     * Failure: Throws exception on failing validation.
+     */
+    public enum RemoteClusterStateValidationMode {
+        DEBUG("debug"),
+        TRACE("trace"),
+        FAILURE("failure"),
+        NONE("none");
+
+        public final String mode;
+
+        RemoteClusterStateValidationMode(String mode) {
+            this.mode = mode;
+        }
+
+        /**
+         * Resolves a mode name, ignoring case, to the matching constant.
+         *
+         * @param mode name of the mode
+         * @return the constant whose name equals {@code mode} upper-cased with {@link Locale#ROOT}
+         * @throws IllegalArgumentException if no constant matches; the original lookup failure is the cause
+         */
+        public static RemoteClusterStateValidationMode parseString(String mode) {
+            try {
+                return RemoteClusterStateValidationMode.valueOf(mode.toUpperCase(Locale.ROOT));
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException(
+                    "["
+                        + mode
+                        + "] mode is not supported. "
+                        + "supported modes are ["
+                        + Arrays.toString(RemoteClusterStateValidationMode.values())
+                        + "]",
+                    e
+                );
+            }
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java
index 74cb838286961..757fc476a5327 100644
--- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java
+++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateUtils.java
@@ -73,6 +73,21 @@ static BlobContainer clusterUUIDContainer(BlobStoreRepository blobStoreRepositor
         );
     }
 
+    /**
+     * Maps an arbitrary long onto the range [1, upperBound].
+     *
+     * @param randomNumber any long value
+     * @param upperBound   inclusive upper bound of the result; must be positive (checked by an assertion only)
+     * @return a value between 1 and {@code upperBound}, both inclusive
+     */
+    public static long toPositiveLongAtMost(long randomNumber, long upperBound) {
+        assert 0 < upperBound : upperBound;
+        // Computed inline instead of statically importing ElectionSchedulerFactory.nonNegative, so this class
+        // does not depend on another package's helper. Long.MIN_VALUE has no positive counterpart and maps to 0.
+        final long magnitude = randomNumber == Long.MIN_VALUE ? 0 : (randomNumber < 0 ? -randomNumber : randomNumber);
+        return magnitude % upperBound + 1;
+    }
+
     /**
      * Container class to keep metadata of all uploaded attributes
      */
diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 763a8e3ff4951..693775e4300b9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -44,6 +44,7 @@ import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import java.util.stream.Collectors; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -70,9 +71,12 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan private final Compressor compressor; private final NamedXContentRegistry namedXContentRegistry; private final NamedWriteableRegistry
namedWriteableRegistry; + private Random random; RemoteGlobalMetadataManager( ClusterSettings clusterSettings, + RemoteClusterStateSettings remoteClusterStateSettings, + Random random, String clusterName, BlobStoreRepository blobStoreRepository, BlobStoreTransferService blobStoreTransferService, @@ -83,6 +87,8 @@ public class RemoteGlobalMetadataManager extends AbstractRemoteWritableEntityMan this.compressor = blobStoreRepository.getCompressor(); this.namedXContentRegistry = blobStoreRepository.getNamedXContentRegistry(); this.namedWriteableRegistry = namedWriteableRegistry; + this.remoteClusterStateSettings = remoteClusterStateSettings; + this.random = random; this.remoteWritableEntityStores.put( RemoteGlobalMetadata.GLOBAL_METADATA, new RemoteWriteableEntityBlobStore<>( diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index 55971398634c5..3e60de66cec55 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -13,7 +13,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Settings; -import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.Node; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -28,6 +27,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.gateway.remote.RemoteClusterStateSettings.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + /** * This is an abstraction for validating and storing information specific to remote backed storage nodes. 
*
@@ -194,8 +195,15 @@ public static String getRemoteStoreTranslogRepo(Settings settings) {
     }
 
+    /**
+     * Returns {@code true} only when {@code REMOTE_CLUSTER_STATE_ENABLED_SETTING} resolves to true for
+     * {@code settings} and {@link #isRemoteClusterStateAttributePresent(Settings)} also returns true.
+     * The attribute check is not evaluated when the setting is false.
+     */
     public static boolean isRemoteStoreClusterStateEnabled(Settings settings) {
-        return RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings)
-            && isRemoteClusterStateAttributePresent(settings);
+        if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == false) {
+            return false;
+        }
+        return isRemoteClusterStateAttributePresent(settings);
     }
 
     private static boolean isRemoteRoutingTableAttributePresent(Settings settings) {