diff --git a/server/src/main/java/org/opensearch/index/shard/GatedDelegateRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/GatedDelegateRefreshListener.java
new file mode 100644
index 0000000000000..e67817587380e
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/shard/GatedDelegateRefreshListener.java
@@ -0,0 +1,130 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.shard;
+
+import org.apache.lucene.search.ReferenceManager;
+import org.opensearch.common.Nullable;
+import org.opensearch.common.metrics.MeanMetric;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Refresh listener that forwards each notification to a delegate only while it holds a permit from an internal
+ * semaphore. {@link #close()} takes every permit, so it returns only once no delegated refresh is in flight, and
+ * no call reaches the delegate afterwards.
+ */
+public class GatedDelegateRefreshListener implements ReferenceManager.RefreshListener, Closeable {
+
+    static final int TOTAL_PERMITS = Integer.MAX_VALUE;
+    final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
+    final AtomicBoolean closed = new AtomicBoolean();
+    private final ReferenceManager.RefreshListener delegateListener;
+    @Nullable
+    private final MeanMetric refreshListenerMetrics;
+    // Records that beforeRefresh() took a permit on this thread which afterRefresh() has to give back.
+    // NOTE(review): assumes both callbacks of one refresh run on the same thread - confirm against the caller.
+    private final ThreadLocal<Boolean> permitHeld = ThreadLocal.withInitial(() -> Boolean.FALSE);
+
+    /**
+     * Creates a gated listener around the given delegate.
+     *
+     * @param delegateListener the listener that receives the gated notifications
+     * @param refreshListenerMetrics optional metrics for the delegate; stored but not updated yet
+     */
+    GatedDelegateRefreshListener(ReferenceManager.RefreshListener delegateListener, @Nullable MeanMetric refreshListenerMetrics) {
+        this.delegateListener = delegateListener;
+        // TODO instrument metrics for listeners
+        this.refreshListenerMetrics = refreshListenerMetrics;
+    }
+
+    /**
+     * Forwards to the delegate if a permit is available without waiting; otherwise (a close is holding or
+     * waiting for the permits, or the thread is interrupted) the delegate is skipped for this refresh.
+     *
+     * @throws IOException if the delegate fails; the permit is released before the exception propagates
+     * @throws IllegalStateException if this listener has already been closed
+     */
+    @Override
+    public void beforeRefresh() throws IOException {
+        if (closed.get()) {
+            // NOTE(review): throwing is kept from the original design; confirm that failing the refresh of a closed
+            // listener is preferred over silently skipping the delegate.
+            throw new IllegalStateException("refresh listener is closed, delegate can no longer be invoked");
+        }
+        boolean acquired;
+        try {
+            // the timed variant honours the fairness setting, so a waiting close() is not overtaken
+            acquired = semaphore.tryAcquire(1, 0, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller and skip the delegate
+            Thread.currentThread().interrupt();
+            return;
+        }
+        if (acquired == false) {
+            return;
+        }
+        boolean success = false;
+        try {
+            delegateListener.beforeRefresh();
+            permitHeld.set(Boolean.TRUE);
+            success = true;
+        } finally {
+            if (success == false) {
+                // permitHeld stays unset, so afterRefresh() will not release a second time
+                semaphore.release(1);
+            }
+        }
+    }
+
+    /**
+     * Forwards to the delegate and returns the permit, but only if {@link #beforeRefresh()} took one on this
+     * thread; otherwise the delegate was skipped for this refresh and there is nothing to release.
+     */
+    @Override
+    public void afterRefresh(boolean didRefresh) throws IOException {
+        if (permitHeld.get() == false) {
+            return;
+        }
+        permitHeld.remove();
+        try {
+            delegateListener.afterRefresh(didRefresh);
+        } finally {
+            semaphore.release(1);
+        }
+    }
+
+    /**
+     * Waits up to 30 seconds for every permit and then marks the listener closed; the permits are kept.
+     * Closing an already closed listener has no effect.
+     *
+     * @throws IllegalStateException if the permits are not obtained in time or the wait is interrupted
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.get()) {
+            return;
+        }
+        try {
+            if (semaphore.tryAcquire(TOTAL_PERMITS, 30, TimeUnit.SECONDS)) {
+                boolean result = closed.compareAndSet(false, true);
+                assert result;
+                assert semaphore.availablePermits() == 0;
+            } else {
+                throw new IllegalStateException("timeout while blocking operations");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IllegalStateException("interrupted while blocking operations", e);
+        }
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index e7720e9343b80..0fb3892eb746f 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -44,7 +44,6 @@
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.ReferenceManager;
@@ -294,6 +293,7 @@ Runnable getGlobalCheckpointSyncer() {
 
     private final RecoveryStats recoveryStats = new RecoveryStats();
     private final MeanMetric refreshMetric = new MeanMetric();
+    private final MeanMetric refreshListenerMetric = new MeanMetric();
     private final MeanMetric externalRefreshMetric = new MeanMetric();
     private final MeanMetric flushMetric = new MeanMetric();
     private final CounterMetric periodicFlushMetric = new CounterMetric();
@@ -337,6 +337,8 @@ Runnable getGlobalCheckpointSyncer() {
     private final BiFunction translogFactorySupplier;
     private final boolean isTimeSeriesIndex;
     private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
+    // Listeners handed to the engine config; kept on the shard so relocated() can close the closeable ones.
+    private final List<ReferenceManager.RefreshListener> internalRefreshListeners;
 
     public IndexShard(
         final ShardRouting shardRouting,
@@ -458,6 +460,7 @@ public boolean shouldCache(Query query) {
             ? false
             : mapperService.documentMapper().mappers().containsTimeStampField();
         this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
+        this.internalRefreshListeners = new ArrayList<>(2);
     }
 
     public ThreadPool getThreadPool() {
@@ -801,9 +804,25 @@ public void relocated(
         final Runnable performSegRep
     ) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
         assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
+        // Force refreshes pending refresh listeners
         try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
+            // Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
+            // the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
+            // of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
+            performSegRep.run();
             indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
+                // Prevents new refresh listeners from being registered while all permits are acquired
                 forceRefreshes.close();
+                // Ensures all in-flight remote store operations drain before we hand off. Every closeable listener is
+                // closed individually; calling close() on a stream of them would only close the stream itself.
+                for (ReferenceManager.RefreshListener listener : internalRefreshListeners) {
+                    if (listener instanceof Closeable) {
+                        try {
+                            ((Closeable) listener).close();
+                        } catch (IOException ioException) {
+                            throw new IllegalStateException("failed to close refresh listener before primary hand-off", ioException);
+                        }
+                    }
+                }
 
                 boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
                 // Since all the index permits are acquired at this point, the translog buffer will not change.
@@ -816,7 +835,7 @@ public void relocated(
 
                 assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
                     : "in-flight operations in progress while moving shard state to relocated";
-                performSegRep.run();
+
                 /*
                  * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
                  * network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
@@ -3659,21 +3678,23 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
             }
         };
 
-        final List internalRefreshListener = new ArrayList<>();
-        internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
+        // Start from an empty list: this method appends to a shard-level field, so repeated calls would otherwise pile up duplicates.
+        internalRefreshListeners.clear();
+        internalRefreshListeners.add(new RefreshMetricUpdater(refreshMetric));
         if (isRemoteStoreEnabled()) {
-            internalRefreshListener.add(
+            internalRefreshListeners.add(new GatedDelegateRefreshListener(
                 new RemoteStoreRefreshListener(
                     this,
                     // Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
                     indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
                     remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
-                )
+                ), refreshListenerMetric)
             );
         }
 
         if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
-            internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
+            internalRefreshListeners.add(new GatedDelegateRefreshListener(
+                new CheckpointRefreshListener(this, this.checkpointPublisher), null));
         }
         /**
          * With segment replication enabled for primary relocation, recover replica shard initially as read only and
@@ -3699,7 +3720,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
             translogConfig,
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Arrays.asList(refreshListeners, refreshPendingLocationListener),
-            internalRefreshListener,
+            internalRefreshListeners,
             indexSort,
             circuitBreakerService,
             globalCheckpointSupplier,