Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await for in-flight segments to complete transfer before hand-off #3

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.Nullable;
import org.opensearch.common.metrics.MeanMetric;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Refresh listener that forwards refresh callbacks to a delegate only while a permit can be obtained from an
 * internal gate. Every delegated refresh cycle holds exactly one permit from {@link #beforeRefresh()} until
 * {@link #afterRefresh(boolean)}. {@link #close()} acquires all permits, which makes it wait for in-flight
 * cycles to finish, and then closes the gate permanently: once closed, no further calls reach the delegate.
 */
public class GatedDelegateRefreshListener implements ReferenceManager.RefreshListener, Closeable {

    static final int TOTAL_PERMITS = Integer.MAX_VALUE;
    // Fair semaphore: a pending close() queues ahead of later beforeRefresh() attempts.
    final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
    final AtomicBoolean closed = new AtomicBoolean();
    private final ReferenceManager.RefreshListener delegateListener;
    @Nullable
    private final MeanMetric refreshListenerMetrics;
    /**
     * Records whether the refresh cycle running on the current thread holds a permit. This assumes that
     * beforeRefresh/afterRefresh of one cycle are invoked on the same thread (as Lucene's ReferenceManager does).
     */
    private final ThreadLocal<Boolean> permitHeld = ThreadLocal.withInitial(() -> Boolean.FALSE);

    /**
     * Creates a gated listener around the given delegate.
     *
     * @param delegateListener the listener that receives the callbacks while the gate is open
     * @param refreshListenerMetrics an optional refresh listener metrics, currently only stored
     */
    GatedDelegateRefreshListener(ReferenceManager.RefreshListener delegateListener, @Nullable MeanMetric refreshListenerMetrics) {
        this.delegateListener = delegateListener;
        //TODO instrument metrics for listeners
        this.refreshListenerMetrics = refreshListenerMetrics;
    }

    /**
     * Acquires one permit and forwards the call to the delegate. The call is skipped when the gate is closed or
     * when no permit is available (for example because {@link #close()} is waiting for the permits).
     *
     * @throws IOException if the delegate fails; the permit is given back before the exception propagates
     */
    @Override
    public void beforeRefresh() throws IOException {
        if (closed.get() || tryAcquirePermit() == false) {
            return;
        }
        boolean delegated = false;
        try {
            delegateListener.beforeRefresh();
            // Mark the permit as owned by this cycle so that afterRefresh releases exactly this permit.
            permitHeld.set(Boolean.TRUE);
            delegated = true;
        } finally {
            if (delegated == false) {
                // The delegate failed: give the permit back here, afterRefresh will not release it again.
                semaphore.release(1);
            }
        }
    }

    /**
     * Forwards the call to the delegate and releases the permit taken by {@link #beforeRefresh()}. Nothing
     * happens when the current cycle does not hold a permit, so the semaphore is never over-released.
     *
     * @param didRefresh passed through to the delegate unchanged
     * @throws IOException if the delegate fails; the permit is released regardless
     */
    @Override
    public void afterRefresh(boolean didRefresh) throws IOException {
        if (permitHeld.get() == false) {
            return;
        }
        permitHeld.remove();
        try {
            delegateListener.afterRefresh(didRefresh);
        } finally {
            semaphore.release(1);
        }
    }

    /**
     * Waits up to 30 seconds for all in-flight refresh cycles to give back their permits and then closes the
     * gate. The permits stay acquired afterwards. Calling this method on an already closed listener is a no-op.
     *
     * @throws IllegalStateException if the permits could not be acquired within the timeout
     * @throws IOException if the calling thread is interrupted while waiting; the interrupt flag is restored
     */
    @Override
    public void close() throws IOException {
        if (closed.get()) {
            return;
        }
        final boolean drained;
        try {
            drained = semaphore.tryAcquire(TOTAL_PERMITS, 30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while blocking operations", e);
        }
        if (drained == false) {
            if (closed.get()) {
                // Another thread closed the gate while this one was waiting; it holds all the permits.
                return;
            }
            throw new IllegalStateException("timeout while blocking operations");
        }
        final boolean result = closed.compareAndSet(false, true);
        assert result;
        assert semaphore.availablePermits() == 0;
    }

    /**
     * Tries to take a single permit without waiting.
     *
     * @return {@code true} if a permit was acquired, {@code false} otherwise (including on interruption)
     */
    private boolean tryAcquirePermit() {
        try {
            // The timed variant with a zero timeout honors the fairness setting, unlike the untimed tryAcquire().
            return semaphore.tryAcquire(1, 0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
26 changes: 18 additions & 8 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
Expand Down Expand Up @@ -294,6 +293,7 @@ Runnable getGlobalCheckpointSyncer() {

private final RecoveryStats recoveryStats = new RecoveryStats();
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric refreshListenerMetric = new MeanMetric();
private final MeanMetric externalRefreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final CounterMetric periodicFlushMetric = new CounterMetric();
Expand Down Expand Up @@ -337,6 +337,7 @@ Runnable getGlobalCheckpointSyncer() {
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
private final List<ReferenceManager.RefreshListener> internalRefreshListeners;

public IndexShard(
final ShardRouting shardRouting,
Expand Down Expand Up @@ -458,6 +459,7 @@ public boolean shouldCache(Query query) {
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
this.internalRefreshListeners = new ArrayList<>(2);
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -801,9 +803,17 @@ public void relocated(
final Runnable performSegRep
) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
//Force refreshes pending refresh listeners
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
//Run a round of segrep while we are waiting for the permits. We need to evaluate if this needs to be put inside
//the permit to synchronise segments. However since indexing operations are stalled, we need to justify the cost
//of blocking segrep round, which for remote store enabled nodes will require operations to drain on the remote store.
performSegRep.run();
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
//Prevents new refresh listeners from being registered while all permits are acquired
forceRefreshes.close();
//Ensures all in-flight remote store operations drain, before we hand-off.
internalRefreshListeners.stream().filter(i -> i instanceof Closeable).map(i -> (Closeable)i).close();

boolean syncTranslog = isRemoteTranslogEnabled() && Durability.ASYNC == indexSettings.getTranslogDurability();
// Since all the index permits are acquired at this point, the translog buffer will not change.
Expand All @@ -816,7 +826,7 @@ public void relocated(
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";

performSegRep.run();

/*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
* network operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations.
Expand Down Expand Up @@ -3659,21 +3669,21 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
internalRefreshListeners.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new RemoteStoreRefreshListener(
this,
// Add the checkpoint publisher if the Segment Replication via remote store is enabled.
indexSettings.isSegRepWithRemoteEnabled() ? this.checkpointPublisher : SegmentReplicationCheckpointPublisher.EMPTY,
remoteRefreshSegmentPressureService.getRemoteRefreshSegmentTracker(shardId())
)
), refreshListenerMetric)
);
}

if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
internalRefreshListeners.add(new GatedDelegateRefreshListener(
new CheckpointRefreshListener(this, this.checkpointPublisher), null));
}
/**
* With segment replication enabled for primary relocation, recover replica shard initially as read only and
Expand All @@ -3699,7 +3709,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
internalRefreshListener,
internalRefreshListeners,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down
Loading