
No-op replication for primary term validation with NRTSegRep #4127

Closed
wants to merge 8 commits

Conversation

@ashking94 (Member) commented Aug 4, 2022

Signed-off-by: Ashish Singh [email protected]

Description

As part of implementing #3706, this is the initial commit, which does the following -

  • Introduces an abstraction for building no-op replication (for primary term validation) on top of NRT segment replication.
  • Implements an Engine (NRTReplicationNoOpEngine) for the no-op replication use case, where calls to the replica do not persist any operation on the replica. The last sequence number seen is, however, kept in memory to support recovery. The translog manager used is NoOpTranslogManager, which performs no operations. (A sketch of this write path follows this list.)
  • The following things are working -
    • Primary term validation during index/delete/update/bulk calls.
    • Peer recovery of replicas. Currently, the replica is brought up to speed with the last successful commit on the primary.
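
A minimal sketch of the no-op replica write path described above (the class shape and member names are illustrative, not this PR's actual code):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only: a replica-side engine that persists nothing and
// merely remembers the highest sequence number it has seen, so that peer
// recovery can later report how far this replica got.
class NoOpReplicaWriteSketch {

    private final AtomicLong maxSeenSeqNo = new AtomicLong(-1);

    // Called for every index/delete/update/bulk operation routed to the
    // replica. No translog write, no Lucene write: primary term validation
    // happens earlier, at the transport layer, so we only track the seq no.
    void onReplicaOperation(long seqNo) {
        maxSeenSeqNo.accumulateAndGet(seqNo, Math::max);
    }

    long maxSeenSeqNo() {
        return maxSeenSeqNo.get();
    }
}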

The following items still have to be handled and will be addressed in follow-up PRs -

Issues Resolved

[List any issues this PR will resolve]

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.


Comment on lines 3527 to 3528
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
*
* @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint
Collaborator:

nit: Please remove all unintended formatting changes.

Member Author (@ashking94):

Ack!

@Bukhtawar (Collaborator) left a comment:

Do you think we could add tests to exercise the new code path?
I will review in detail once tests are added and conflicts are resolved. At this point my concern is that this change already hits the Engine; can we not return pre-emptively, shortly after the request hits the replica, once we have validated the primary term invariant?
Do you think the change below in TransportShardBulkAction might work?

@Override
protected void dispatchedShardOperationOnReplica(BulkShardRequest request, IndexShard replica, ActionListener<ReplicaResult> listener) {
    ActionListener.completeWith(listener, () -> {
        // Placeholder location: nothing is written to the translog on this path.
        Translog.Location location = new Translog.Location(0, 0, 0);
        if (replica.indexSettings().isRemoteStoreEnabled() && replica.indexSettings().isSegRepEnabled()) {
            // The primary term invariant has already been validated on the way
            // in; just verify that writes are allowed and return without
            // applying the operations on the replica.
            replica.ensureWriteAllowed(Engine.Operation.Origin.REPLICA);
        } else {
            // Default path: actually apply the operations on the replica.
            location = performOnReplica(request, replica);
        }
        return new WriteReplicaResult<>(request, location, null, replica, logger);
    });
}

@ashking94 (Member Author) commented Aug 5, 2022

> Do you think we could add tests to exercise the new code path? […] Do you think the below change in TransportShardBulkAction might work […]

This is something I have explored. Currently, when a shard recovery happens, one of the steps involves replaying the translog. For recovery to complete, the translog replay step must return the expected value, which is the highest sequence number seen during the replay. Once we refactor the recovery code to skip translog replay and jump directly to the finalize step, we can probably avoid the performOnReplica method entirely. That is the plan (as mentioned in the PR description). We also need to see later what changes would be required for primary-to-primary recovery, so we can make this change in recovery at that point.
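
As a rough illustration of the contract described above (the types and method names are simplified stand-ins, not the actual recovery code):

import java.util.List;

// Simplified model of the translog-replay step during peer recovery: the
// replica must report back the highest sequence number it saw, which is
// compared against the primary's expectation before the finalize step runs.
final class ReplaySketch {

    static final class Op {
        final long seqNo;
        Op(long seqNo) { this.seqNo = seqNo; }
    }

    static long replayOnReplica(List<Op> ops) {
        long maxSeenSeqNo = -1;
        for (Op op : ops) {
            // With the no-op engine, "applying" an operation only records its
            // sequence number; nothing is written to the translog or Lucene.
            maxSeenSeqNo = Math.max(maxSeenSeqNo, op.seqNo);
        }
        return maxSeenSeqNo;
    }
}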

@ashking94 (Member Author):

UTs and ITs will follow soon.

@ashking94 (Member Author):

cc @mch2 @dreamer-89 @sachinpkale

@ashking94 ashking94 force-pushed the 3706-1 branch 2 times, most recently from 7344aaa to b420b3d (August 5, 2022 15:42)

@codecov-commenter commented Aug 8, 2022

Codecov Report

Merging #4127 (5f93b80) into main (5f2e66b) will decrease coverage by 0.13%.
The diff coverage is 75.42%.

@@             Coverage Diff              @@
##               main    #4127      +/-   ##
============================================
- Coverage     70.78%   70.65%   -0.14%     
+ Complexity    57218    57104     -114     
============================================
  Files          4605     4607       +2     
  Lines        274695   274730      +35     
  Branches      40228    40228              
============================================
- Hits         194441   194098     -343     
- Misses        63955    64381     +426     
+ Partials      16299    16251      -48     
Impacted Files Coverage Δ
...main/java/org/opensearch/common/lucene/Lucene.java 66.02% <ø> (-1.28%) ⬇️
...index/codec/PerFieldMappingPostingFormatCodec.java 64.28% <ø> (ø)
...arch/index/engine/NRTReplicationReaderManager.java 86.95% <ø> (ø)
...s/replication/SegmentReplicationSourceHandler.java 87.71% <ø> (-0.81%) ⬇️
...va/org/opensearch/index/engine/EngineTestCase.java 86.46% <66.66%> (+0.52%) ⬆️
...arch/index/engine/NRTReplicationEngineFactory.java 77.77% <71.42%> (-22.23%) ⬇️
...nsearch/index/engine/NRTReplicationNoOpEngine.java 71.42% <71.42%> (ø)
...rch/index/engine/AbstractNRTReplicationEngine.java 73.50% <73.50%> (ø)
.../opensearch/index/engine/NRTReplicationEngine.java 80.95% <75.00%> (+5.75%) ⬆️
...in/java/org/opensearch/index/shard/IndexShard.java 68.92% <80.00%> (-0.40%) ⬇️
... and 516 more


Commit: Replica recovery is working along with the no-op replication call for primary term validation
Signed-off-by: Ashish Singh <[email protected]>

Commit: This reverts commit ebc57ca71a57ecac72845259b1f50dc2ef61f1a0.
Signed-off-by: Ashish Singh <[email protected]>
@ashking94 ashking94 marked this pull request as ready for review August 16, 2022 08:41
@ashking94 ashking94 requested a review from a team as a code owner August 16, 2022 08:41
@ashking94 ashking94 requested a review from reta as a code owner August 16, 2022 08:41

* @return index result.
*/
@Override
public IndexResult index(Index index) throws IOException {
Member:

If these methods (index, delete and noOp) are delegating to super, do we need to override them?

Member Author (@ashking94):

Currently we are delegating to the parent. In future, the plan is to throw exceptions from these 3 methods.

Member:

Why not override them only when we actually want to throw the exception? I understand it makes no functional difference, but it creates confusion.

Member Author (@ashking94):

Ack.

Collaborator:

Why throw exceptions from these? Can we rather add assertions in the existing Engine and avoid a new one?
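
For reference, sketches of the two options being discussed (illustrative fragments, neither is in this PR as-is; the assertion helper below is hypothetical):

// Option 1: once replica calls are short-circuited before the engine,
// fail loudly instead of delegating to super.
@Override
public IndexResult index(Index index) throws IOException {
    throw new UnsupportedOperationException("index is not supported on a no-op replication replica engine");
}

// Option 2 (assertion in the existing engine, no new engine class):
// assert isNoOpReplicationReplica() == false : "engine must not receive replica operations";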


@Override
public long getLastSyncedGlobalCheckpoint() {
return -1;
Member:

Why -1?

Member Author (@ashking94):

Generally, when no operations have been performed, we return -1. I have kept it returning -1 because this value is normally supplied by the translog manager, and here a no-op translog manager is hooked into this engine. If replica-to-primary promotion fails because of this, I will make the appropriate changes.

Member:

Can we add javadoc with the same description?

Member Author (@ashking94):

Ack.
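
A sketch of what that javadoc could look like (SequenceNumbers.NO_OPS_PERFORMED is OpenSearch's -1 sentinel; the wording here is illustrative):

/**
 * Returns -1 ({@link SequenceNumbers#NO_OPS_PERFORMED}) because this engine is
 * wired to a no-op translog manager: there is no translog on the replica, so
 * there is no synced global checkpoint to report. May need revisiting if
 * replica-to-primary promotion requires a real value.
 */
@Override
public long getLastSyncedGlobalCheckpoint() {
    return SequenceNumbers.NO_OPS_PERFORMED; // == -1
}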

/**
* This method tracks the maximum sequence number of the requests that have been handed to this replica for indexing.
* Currently, the recovery process involves the primary replaying translog operations on the replica. For the recovery
* step to finish, and the finalize step to kick in, this method must return the expected value; hence it is overridden.
Member:

Would the expected value always be equal to maxSeqNo?

Member Author (@ashking94):

What happens currently (in translog replay from the primary during recovery) is that applyIndexOperation is performed. In the document replication world this would sync to the translog before acking back, thereby updating the persisted sequence number in the local checkpoint tracker. AFAIK, this is expected. Since we are removing the translog on replicas, we need this interim solution for recovery to work; once we tweak recovery for the no-op replication use case, this method may become unnecessary in the replica's engine.
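
A sketch of the interim behaviour described above, assuming OpenSearch's LocalCheckpointTracker API (markSeqNoAsProcessed / markSeqNoAsPersisted); the helper method itself is hypothetical:

import org.opensearch.index.seqno.LocalCheckpointTracker;

// With no translog on the replica there is nothing to fsync, so both the
// processed and the persisted sequence numbers can advance as soon as an
// operation is seen; the recovery replay step then observes the expected
// checkpoints and can move on to finalize.
void markSeqNoSeen(LocalCheckpointTracker tracker, long seqNo) {
    tracker.markSeqNoAsProcessed(seqNo);
    // Normally this advances only after a translog sync; here it advances
    // immediately because nothing is persisted on the replica.
    tracker.markSeqNoAsPersisted(seqNo);
}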

@ashking94 (Member Author) left a comment:

Responded to @sachinpkale's comments.


@Bukhtawar (Collaborator) commented Aug 16, 2022

>> Do you think we could add tests to exercise the new code path? […] Do you think the below change in TransportShardBulkAction might work […]
>
> This is something that I have explored. […] When we refactor the recovery code and skip translog replay and directly jump to finalize step, we can probably totally avoid the performOnReplica method. […]

So if I understand this correctly, we can totally avoid the performOnReplica method; then we don't really need an NRTReplicationNoOpEngine, since any call to the engine will be short-circuited and the new engine changes would effectively be dead code.
I would prefer avoiding any engine changes and instead using assertions in the existing engine to ensure no calls reach the engine when the shard is a replica and remote translogs are enabled.
"Best code is no code" :)

The way I would approach this is to start by refactoring the recovery code, to see whether the eventual state can get rid of performOnReplica, and work backwards from there.
If that is not possible, I would consider a gating mechanism, such as a feature flag or even a feature branch, to avoid breaking existing feature sets and to develop no-op replication in isolation until we can integrate the eventual solution incrementally into mainline, rather than building an abstraction that would eventually be dead code. (A sketch of the feature-flag idea follows.)
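
A sketch of the feature-flag gating idea, using OpenSearch's Setting API (the setting name and the selection method are made up for illustration):

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;

// Hypothetical index-scoped flag, off by default, so no-op replication can be
// developed in isolation without touching existing feature sets.
public static final Setting<Boolean> NO_OP_REPLICATION_SETTING =
    Setting.boolSetting("index.replication.no_op_validation.enabled", false, Property.IndexScope);

// Hypothetical selection point: hand out the new engine only when the flag
// is explicitly enabled for the index.
Engine newReadWriteEngine(EngineConfig config) {
    if (NO_OP_REPLICATION_SETTING.get(config.getIndexSettings().getSettings())) {
        return new NRTReplicationNoOpEngine(config);
    }
    return new NRTReplicationEngine(config);
}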

return noOpResult;
}

protected abstract TranslogManager createTranslogManager(String translogUUID, SetOnce<TranslogManager> translogManager)
Member:

Can we delegate the creation of the TranslogManager to a factory and inject that rather than these 3 NRT engine types?

Member Author (@ashking94):

So, I had to create an abstract engine so that segment replication can still work for the newer engine. And since some other engine methods had to be overridden, there was a need to create a new engine altogether.
Regarding the creation of the TranslogManager, this should be doable. (One possible factory shape is sketched below.)
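
One possible shape for the factory being discussed (a hypothetical interface, not existing OpenSearch API), so that the translog manager choice is injected instead of each NRT engine subtype overriding createTranslogManager:

import java.io.IOException;
import org.apache.lucene.util.SetOnce;
import org.opensearch.index.translog.TranslogManager;

// Hypothetical factory: the abstract engine would receive one of these at
// construction time instead of forcing subclasses to override a method.
@FunctionalInterface
interface TranslogManagerFactory {
    TranslogManager create(String translogUUID, SetOnce<TranslogManager> holder) throws IOException;
}

// Usage sketch: the no-op variant would pass a factory that returns a
// NoOpTranslogManager, while the regular NRT engine passes the real one.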

@ashking94 (Member Author):

Revisiting the recovery code to allow the sync replication call to return before reaching the engine. cc @Bukhtawar @mch2


@ashking94 ashking94 closed this Jan 6, 2023