Skip to content

Commit

Permalink
Merge pull request #3 from Bukhtawar/ashish-8345
Browse files Browse the repository at this point in the history
Await for in-flight segments to complete transfer before hand-off
  • Loading branch information
Bukhtawar committed Jul 11, 2023
2 parents ac97a43 + 0a14445 commit c12c142
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.Nullable;
import org.opensearch.common.metrics.MeanMetric;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Delegate listener that delegates the call iff the required permits are obtained. Once the listener is closed, no
* future calls to delgate should be allowed*
*/
public class GatedDelegateRefreshListener implements ReferenceManager.RefreshListener, Closeable {

static final int TOTAL_PERMITS = Integer.MAX_VALUE;
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
final AtomicBoolean closed = new AtomicBoolean();
private ReferenceManager.RefreshListener delegateListener;
@Nullable
private MeanMetric refreshListenerMetrics;

/**
* The ctor for gated delegate listener*
* @param delegateListener the delegate listener
* @param refreshListenerMetrics an optional refresh listener metrics
*/
GatedDelegateRefreshListener(ReferenceManager.RefreshListener delegateListener, @Nullable MeanMetric refreshListenerMetrics) {
this.delegateListener = delegateListener;
//TODO instrument metrics for listeners
this.refreshListenerMetrics = refreshListenerMetrics;
}

@Override
public void beforeRefresh() throws IOException {
assert Thread.holdsLock(this);
try {
if (closed.get() == false) {
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) {
try {
delegateListener.beforeRefresh();
} catch (IOException e) {
semaphore.release(1);
}
}
} else {
// this should never happen, if it does something is deeply wrong
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
try {
delegateListener.afterRefresh(didRefresh);
} finally {
semaphore.release(1);
}
}

@Override
public void close() throws IOException {
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, 30, TimeUnit.SECONDS)) {
boolean result = closed.compareAndSet(false, true);
assert result;
assert semaphore.availablePermits() == 0;
} else {
throw new IllegalStateException("timeout while blocking operations");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
26 changes: 18 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -294,6 +293,7 @@ Runnable getGlobalCheckpointSyncer() {

private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric refreshListenerMetric = new MeanMetric();
private final MeanMetric externalRefreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final CounterMetric periodicFlushMetric = new CounterMetric();
Expand Down Expand Up @@ -337,6 +337,7 @@ Runnable getGlobalCheckpointSyncer() {
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final List<ReferenceManager.RefreshListener> internalRefreshListeners;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -458,6 +459,7 @@ public boolean shouldCache(Query query) {
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.internalRefreshListeners = new ArrayList<>(2);
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -801,9 +803,17 @@ public void relocated(
final Runnable performSegRep
) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
//Force refreshes pending refresh listeners
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
//Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
//the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
//of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
performSegRep.run();
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
//Prevents new refresh listeners to be registered while all permits are acquired
forceRefreshes.close();
//Ensures all in-flight remote store operations drain, before we hand-off.
internalRefreshListeners.stream().filter(i -> i instanceof Closeable).map(i -> (Closeable)i).close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
Expand All @@ -816,7 +826,7 @@ public void relocated(
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";

performSegRep.run();

/*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
Expand Down Expand Up @@ -3659,21 +3669,21 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
internalRefreshListeners.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replciation via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
)
), refreshListenerMetric)
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new CheckpointRefreshListener(this, this.checkpointPublisher), null));
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
Expand All @@ -3699,7 +3709,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
internalRefreshListener,
internalRefreshListeners,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down

0 comments on commit c12c142

Please sign in to comment.