diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index aaba74cd54341..c8b244b1e3314 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -23,6 +23,7 @@ import org.opensearch.common.CheckedFunction; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.InternalEngine; @@ -31,6 +32,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.threadpool.Scheduler; @@ -112,6 +114,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private final FileUploader fileUploader; + private final CancellableThreads cancellableThreads = new CancellableThreads(); + public RemoteStoreRefreshListener( IndexShard indexShard, SegmentReplicationCheckpointPublisher checkpointPublisher, @@ -174,7 +178,18 @@ public void afterRefresh(boolean didRefresh) { || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) { updateLocalRefreshTimeAndSeqNo(); try { - indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get(); + indexShard.getThreadPool() + .executor(ThreadPool.Names.REMOTE_REFRESH) + .submit( + () -> RunUnderPrimaryPermit.run( + () -> syncSegments(false), + "Remote Store Sync", + indexShard, + cancellableThreads, + logger + ) + ) + .get(); } catch (InterruptedException | ExecutionException e) { logger.info("Exception occurred while scheduling syncSegments", e); } @@ -344,7 +359,11 @@ private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { // refresh not occurring until write happens. if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) { scheduledCancellableRetry = indexShard.getThreadPool() - .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH); + .schedule( + () -> RunUnderPrimaryPermit.run(() -> syncSegments(true), "Remote Store Sync", indexShard, cancellableThreads, logger), + backoffDelayIterator.next(), + ThreadPool.Names.REMOTE_REFRESH + ); } }