Skip to content

Commit

Permalink
Fix up gated listener
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed Jul 11, 2023
1 parent 0a14445 commit dbc4f72
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -24,9 +25,9 @@
*/
public class GatedDelegateRefreshListener implements ReferenceManager.RefreshListener, Closeable {

static final int TOTAL_PERMITS = Integer.MAX_VALUE;
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
final AtomicBoolean closed = new AtomicBoolean();
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
private final AtomicBoolean closed = new AtomicBoolean();
private ReferenceManager.RefreshListener delegateListener;
@Nullable
private MeanMetric refreshListenerMetrics;
Expand All @@ -44,47 +45,58 @@ public class GatedDelegateRefreshListener implements ReferenceManager.RefreshLis

/**
 * Forwards the before-refresh notification to the delegate listener via {@code handleDelegate}.
 *
 * <p>{@code handleDelegate} takes a {@link Runnable}, which cannot throw checked exceptions, so an
 * {@link IOException} raised by the delegate is rethrown as an {@link UncheckedIOException}. That
 * type is still a {@link RuntimeException}, so handlers written against the previous wrapper keep
 * working, while the I/O origin of the failure stays visible in the exception type. The original
 * exception is preserved as the cause.
 *
 * @throws IOException declared by {@code handleDelegate}
 * @throws UncheckedIOException if the delegate's {@code beforeRefresh()} throws an {@link IOException}
 */
@Override
public void beforeRefresh() throws IOException {
    handleDelegate(() -> {
        try {
            delegateListener.beforeRefresh();
        } catch (IOException e) {
            // Runnable cannot propagate checked exceptions; keep the I/O cause attached.
            throw new UncheckedIOException("Failed to execute before refresh due to ", e);
        }
    });
}

/**
 * Forwards the after-refresh notification to the delegate listener via {@code handleDelegate}.
 *
 * <p>{@code handleDelegate} takes a {@link Runnable}, which cannot throw checked exceptions, so an
 * {@link IOException} raised by the delegate is rethrown as an {@link UncheckedIOException}. That
 * type is still a {@link RuntimeException}, so handlers written against the previous wrapper keep
 * working, while the I/O origin of the failure stays visible in the exception type. The original
 * exception is preserved as the cause.
 *
 * @param didRefresh passed through unchanged to the delegate's {@code afterRefresh(boolean)}
 * @throws IOException declared by {@code handleDelegate}
 * @throws UncheckedIOException if the delegate's {@code afterRefresh(boolean)} throws an {@link IOException}
 */
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
    handleDelegate(() -> {
        try {
            delegateListener.afterRefresh(didRefresh);
        } catch (IOException e) {
            // Runnable cannot propagate checked exceptions; keep the I/O cause attached.
            throw new UncheckedIOException("Failed to execute after refresh due to ", e);
        }
    });
}

private void handleDelegate(Runnable delegate) throws IOException {
assert Thread.holdsLock(this);
try {
if (closed.get() == false) {
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) {
try {
delegateListener.beforeRefresh();
} catch (IOException e) {
delegate.run();
} finally {
semaphore.release(1);
}
}
} else {
// This should never happen; if it does, something is deeply wrong.
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
throw new TimeoutException("failed to obtain permit but operations are not delayed");
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
try {
delegateListener.afterRefresh(didRefresh);
} finally {
semaphore.release(1);
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException("Failed to handle delegate due to ", e);
}
}

@Override
public void close() throws IOException {
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, 30, TimeUnit.SECONDS)) {
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
boolean result = closed.compareAndSet(false, true);
assert result;
assert semaphore.availablePermits() == 0;
} else {
throw new IllegalStateException("timeout while blocking operations");
throw new TimeoutException("timeout while blocking operations");
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InterruptedException | TimeoutException e) {
throw new RuntimeException("Failed to close the gated listener due to ", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -805,10 +805,6 @@ public void relocated(
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
//Force refreshes pending refresh listeners
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
//Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
//the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
//of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
performSegRep.run();
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
// Prevents new refresh listeners from being registered while all permits are acquired
forceRefreshes.close();
Expand All @@ -825,7 +821,10 @@ public void relocated(
// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";

//Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
//the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
//of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
performSegRep.run();

/*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.InternalEngine;
Expand All @@ -32,7 +31,6 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.RunUnderPrimaryPermit;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -114,8 +112,6 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres

private final FileUploader fileUploader;

private final CancellableThreads cancellableThreads = new CancellableThreads();

public RemoteStoreRefreshListener(
IndexShard indexShard,
SegmentReplicationCheckpointPublisher checkpointPublisher,
Expand Down Expand Up @@ -178,18 +174,7 @@ public void afterRefresh(boolean didRefresh) {
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
updateLocalRefreshTimeAndSeqNo();
try {
indexShard.getThreadPool()
.executor(ThreadPool.Names.REMOTE_REFRESH)
.submit(
() -> RunUnderPrimaryPermit.run(
() -> syncSegments(false),
"Remote Store Sync",
indexShard,
cancellableThreads,
logger
)
)
.get();
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
}
Expand Down Expand Up @@ -359,11 +344,7 @@ private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// refresh not occurring until write happens.
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) {
scheduledCancellableRetry = indexShard.getThreadPool()
.schedule(
() -> RunUnderPrimaryPermit.run(() -> syncSegments(true), "Remote Store Sync", indexShard, cancellableThreads, logger),
backoffDelayIterator.next(),
ThreadPool.Names.REMOTE_REFRESH
);
.schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
}
}

Expand Down

0 comments on commit dbc4f72

Please sign in to comment.