Skip to content

Commit

Permalink
Introduce TranslogFactory for Local/Remote Translog support (opensear…
Browse files Browse the repository at this point in the history
…ch-project#4172)

* Introduce TranslogFactory for Local/Remote Translog support

Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar authored and gbbafna committed Jan 9, 2023
1 parent 6eca25f commit 111fc82
Show file tree
Hide file tree
Showing 17 changed files with 164 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogStats;
Expand Down Expand Up @@ -675,6 +676,7 @@ public static final IndexShard newIndexShard(
() -> {},
RetentionLeaseSyncer.EMPTY,
cbs,
new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null
);
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.indices.cluster.IndicesClusterStateService;
Expand Down Expand Up @@ -547,7 +548,9 @@ public synchronized IndexShard createShard(
() -> globalCheckpointSyncer.accept(shardId),
retentionLeaseSyncer,
circuitBreakerService,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
// TODO Replace with remote translog factory in the follow up PR
this.indexSettings.isRemoteTranslogStoreEnabled() ? null : new InternalTranslogFactory(),
this.indexSettings.isSegRepEnabled() && routing.primary() ? checkpointPublisher : null,
remoteStore
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
Expand Down
19 changes: 17 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/EngineConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -150,6 +152,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final TranslogConfig translogConfig;

private final TranslogFactory translogFactory;

public EngineConfig(
ShardId shardId,
ThreadPool threadPool,
Expand Down Expand Up @@ -253,7 +257,8 @@ public EngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
false,
new InternalTranslogFactory()
);
}

Expand Down Expand Up @@ -284,7 +289,8 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
Expand Down Expand Up @@ -328,6 +334,7 @@ public EngineConfig(
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
this.translogFactory = translogFactory;
}

/**
Expand Down Expand Up @@ -532,6 +539,14 @@ public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}

/**
* Returns the underlying translog factory
* @return the translog factory
*/
public TranslogFactory getTranslogFactory() {
return translogFactory;
}

/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.indices.breaker.CircuitBreakerService;
import org.opensearch.plugins.EnginePlugin;
import org.opensearch.plugins.PluginsService;
Expand Down Expand Up @@ -147,7 +148,8 @@ public EngineConfig newEngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
boolean isReadOnlyReplica,
TranslogFactory translogFactory
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
Expand Down Expand Up @@ -178,7 +180,8 @@ public EngineConfig newEngineConfig(
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
isReadOnlyReplica
isReadOnlyReplica,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ public void onFailure(String reason, Exception ex) {
() -> getLocalCheckpointTracker(),
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen
this::ensureOpen,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
this.softDeletesPolicy = newSoftDeletesPolicy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public void onAfterTranslogSync() {
}
}
},
this
this,
engineConfig.getTranslogFactory()
);
this.translogManager = translogManagerRef;
} catch (IOException e) {
Expand Down
17 changes: 9 additions & 8 deletions server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,15 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
final TranslogDeletionPolicy translogDeletionPolicy = new DefaultTranslogDeletionPolicy(-1, -1, 0);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
translogConfig,
translogUuid,
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
)
Translog translog = engineConfig.getTranslogFactory()
.newTranslog(
translogConfig,
translogUuid,
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
)
) {
translog.trimUnreferencedReaders();
// refresh the translog stats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,15 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (
Translog translog = new Translog(
translogConfig,
translogUuid,
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
)
Translog translog = config.getTranslogFactory()
.newTranslog(
translogConfig,
translogUuid,
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
)
) {
return translog.stats();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@
import org.opensearch.index.store.StoreStats;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogFactory;
import org.opensearch.index.translog.TranslogRecoveryRunner;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.warmer.ShardIndexWarmerService;
import org.opensearch.index.warmer.WarmerStats;
Expand Down Expand Up @@ -320,6 +322,7 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;

private final Store remoteStore;
private final TranslogFactory translogFactory;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -342,6 +345,7 @@ public IndexShard(
final Runnable globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final CircuitBreakerService circuitBreakerService,
final TranslogFactory translogFactory,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
) throws IOException {
Expand Down Expand Up @@ -428,6 +432,7 @@ public boolean shouldCache(Query query) {
this.refreshPendingLocationListener = new RefreshPendingLocationListener();
this.checkpointPublisher = checkpointPublisher;
this.remoteStore = remoteStore;
this.translogFactory = translogFactory;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3359,7 +3364,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false
indexSettings.isSegRepEnabled() && shardRouting.primary() == false,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
* Translog Factory for the local on-disk {@link Translog}
*
* @opensearch.internal
*/
public class InternalTranslogFactory implements TranslogFactory {

@Override
public Translog newTranslog(
TranslogConfig translogConfig,
String translogUUID,
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier,
LongConsumer persistedSequenceNumberConsumer
) throws IOException {

return new Translog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
primaryTermSupplier,
persistedSequenceNumberConsumer
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public InternalTranslogManager(
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifeCycleAware
LifecycleAware engineLifeCycleAware,
TranslogFactory translogFactory
) throws IOException {
this.shardId = shardId;
this.readLock = readLock;
Expand All @@ -71,7 +72,7 @@ public InternalTranslogManager(
if (tracker != null) {
tracker.markSeqNoAsPersisted(seqNo);
}
}, translogUUID);
}, translogUUID, translogFactory);
assert translog.getGeneration() != null;
this.translog = translog;
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -337,10 +338,11 @@ protected Translog openTranslog(
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer,
String translogUUID
String translogUUID,
TranslogFactory translogFactory
) throws IOException {

return new Translog(
return translogFactory.newTranslog(
translogConfig,
translogUUID,
translogDeletionPolicy,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.translog;

import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
* Translog Factory to enable creation of various local on-disk
* and remote store flavors of {@link Translog}
*
* @opensearch.internal
*/
@FunctionalInterface
public interface TranslogFactory {

Translog newTranslog(
final TranslogConfig config,
final String translogUUID,
final TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public WriteOnlyTranslogManager(
Supplier<LocalCheckpointTracker> localCheckpointTrackerSupplier,
String translogUUID,
TranslogEventListener translogEventListener,
LifecycleAware engineLifecycleAware
LifecycleAware engineLifecycleAware,
TranslogFactory translogFactory
) throws IOException {
super(
translogConfig,
Expand All @@ -47,7 +48,8 @@ public WriteOnlyTranslogManager(
localCheckpointTrackerSupplier,
translogUUID,
translogEventListener,
engineLifecycleAware
engineLifecycleAware,
translogFactory
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.codec.CodecServiceFactory;
import org.opensearch.index.seqno.RetentionLeases;
import org.opensearch.index.translog.InternalTranslogFactory;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogDeletionPolicyFactory;
import org.opensearch.index.translog.TranslogReader;
Expand Down Expand Up @@ -66,7 +67,8 @@ public void testCreateEngineConfigFromFactory() {
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null,
false
false,
new InternalTranslogFactory()
);

assertNotNull(config.getCodec());
Expand Down Expand Up @@ -143,7 +145,8 @@ public void testCreateCodecServiceFromFactory() {
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null,
false
false,
new InternalTranslogFactory()
);
assertNotNull(config.getCodec());
}
Expand Down
Loading

0 comments on commit 111fc82

Please sign in to comment.