Skip to content

Commit

Permalink
[Remote Store] Use Index permits during segments upload
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Jul 10, 2023
1 parent a15f0ed commit ac97a43
Showing 1 changed file with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
Expand All @@ -31,6 +32,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.RunUnderPrimaryPermit;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -112,6 +114,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres

private final FileUploader fileUploader;

private final CancellableThreads cancellableThreads = new CancellableThreads();

public RemoteStoreRefreshListener(
IndexShard indexShard,
SegmentReplicationCheckpointPublisher checkpointPublisher,
Expand Down Expand Up @@ -174,7 +178,18 @@ public void afterRefresh(boolean didRefresh) {
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
updateLocalRefreshTimeAndSeqNo();
try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
indexShard.getThreadPool()
.executor(ThreadPool.Names.REMOTE_REFRESH)
.submit(
() -> RunUnderPrimaryPermit.run(
() -> syncSegments(false),
"Remote Store Sync",
indexShard,
cancellableThreads,
logger
)
)
.get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
}
Expand Down Expand Up @@ -344,7 +359,11 @@ private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// refresh not occurring until write happens.
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) {
scheduledCancellableRetry = indexShard.getThreadPool()
.schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
.schedule(
() -> RunUnderPrimaryPermit.run(() -> syncSegments(true), "Remote Store Sync", indexShard, cancellableThreads, logger),
backoffDelayIterator.next(),
ThreadPool.Names.REMOTE_REFRESH
);
}
}

Expand Down

0 comments on commit ac97a43

Please sign in to comment.