Skip to content

Commit

Permalink
Add AdministrativeCommands (#899)
Browse files Browse the repository at this point in the history
Motivation:

While mirroring migration job is implemeted #880, read-only mode is necessary to safely migrate the legacy `mirrors.json` and `credentials.json` the HEAD revision. As the current read-only mode rejects all commands, the migration job is also unable to push commits.

A new command type is required to rejects only users requests but allows administrative cases. I propose to introduce two new command types for the smooth migration job.

Modifications:

- Add `UpdateServerStatusCommand` so as to toggle the read-only mode for the cluster.
- Add `ForcePush` so as to wrap a push `Command` to be executed even in the read-only mode.

Result:

`UpdateServerStatusCommand` and `ForcePush` have been added as new commands to update the cluster status and forcibly push commits in the read-only mode.
  • Loading branch information
ikhoon authored Feb 2, 2024
1 parent 406de29 commit 8f66975
Show file tree
Hide file tree
Showing 11 changed files with 389 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.util.Exceptions;
Expand All @@ -36,6 +39,8 @@
*/
public abstract class AbstractCommandExecutor implements CommandExecutor {

private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutor.class);

@Nullable
private final Consumer<CommandExecutor> onTakeLeadership;
@Nullable
Expand Down Expand Up @@ -69,7 +74,12 @@ protected final boolean isStopping() {

@Override
public final CompletableFuture<Void> start() {
return startStop.start(false).thenRun(() -> started = true);
return startStop.start(false).thenRun(() -> {
started = true;
if (!writable) {
logger.warn("Started a command executor with read-only mode.");
}
});
}

protected abstract void doStart(@Nullable Runnable onTakeLeadership,
Expand Down Expand Up @@ -97,8 +107,11 @@ public final void setWritable(boolean writable) {
@Override
public final <T> CompletableFuture<T> execute(Command<T> command) {
requireNonNull(command, "command");
if (!isWritable()) {
throw new IllegalStateException("running in read-only mode");
if (!isWritable() && !(command instanceof AdministrativeCommand)) {
// Reject all commands except for AdministrativeCommand when the replica is in read-only mode.
// AdministrativeCommand is allowed because it is used to change the read-only mode or migrate
// metadata under maintenance mode.
throw new IllegalStateException("running in read-only mode. command: " + command);
}

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.command;

import javax.annotation.Nullable;

import com.linecorp.centraldogma.common.Author;

abstract class AdministrativeCommand<T> extends RootCommand<T> {
AdministrativeCommand(CommandType commandType, @Nullable Long timestamp,
@Nullable Author author) {
super(commandType, timestamp, author);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.centraldogma.server.command;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -52,6 +53,8 @@
@Type(value = PushAsIsCommand.class, name = "PUSH"),
@Type(value = CreateSessionCommand.class, name = "CREATE_SESSIONS"),
@Type(value = RemoveSessionCommand.class, name = "REMOVE_SESSIONS"),
@Type(value = UpdateServerStatusCommand.class, name = "UPDATE_SERVER_STATUS"),
@Type(value = ForcePushCommand.class, name = "FORCE_PUSH_COMMAND"),
})
public interface Command<T> {

Expand Down Expand Up @@ -355,6 +358,27 @@ static Command<Void> removeSession(String sessionId) {
return new RemoveSessionCommand(null, null, sessionId);
}

/**
* Returns a new {@link Command} which is used to update the status of the server.
*/
static Command<Void> updateServerStatus(boolean writable) {
return new UpdateServerStatusCommand(null, null, writable);
}

/**
* Returns a new {@link Command} which is used to force-push {@link Command} even the server is in
* read-only mode. This command is useful for migrating the repository content during maintenance mode.
*
* <p>Note that {@link CommandType#NORMALIZING_PUSH} and {@link CommandType#PUSH} are allowed as the
* delegate.
*/
static <T> Command<T> forcePush(Command<T> delegate) {
requireNonNull(delegate, "delegate");
checkArgument(delegate.type() == CommandType.NORMALIZING_PUSH || delegate.type() == CommandType.PUSH,
"delegate: %s (expected: NORMALIZING_PUSH or PUSH)", delegate);
return new ForcePushCommand<>(delegate);
}

/**
* Returns the {@link CommandType} of the command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public enum CommandType {
CREATE_SESSION(Void.class),
REMOVE_SESSION(Void.class),
PURGE_PROJECT(Void.class),
PURGE_REPOSITORY(Void.class);
PURGE_REPOSITORY(Void.class),
UPDATE_SERVER_STATUS(Void.class),
// The result type of FORCE_PUSH is Object because it can be any type.
FORCE_PUSH(Object.class);

/**
* The type of an object which is returned as a result after executing the command.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.command;

import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects.ToStringHelper;

/**
* A {@link Command} which is used to force-push {@code delegate} even the server is in read-only mode.
* This command is useful for migrating the repository content during maintenance mode.
*/
public final class ForcePushCommand<T> extends AdministrativeCommand<T> {

private final Command<T> delegate;

@JsonCreator
ForcePushCommand(@JsonProperty("delegate") Command<T> delegate) {
super(CommandType.FORCE_PUSH, requireNonNull(delegate, "delegate").timestamp(), delegate.author());
this.delegate = delegate;
}

/**
* Returns the {@link Command} to be force-pushed.
*/
@JsonProperty("delegate")
public Command<T> delegate() {
return delegate;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ForcePushCommand)) {
return false;
}
final ForcePushCommand<?> that = (ForcePushCommand<?>) o;
return super.equals(that) && delegate.equals(that.delegate);
}

@Override
public int hashCode() {
return super.hashCode() * 31 + delegate.hashCode();
}

@Override
ToStringHelper toStringHelper() {
return super.toStringHelper().add("delegate", delegate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exceptio
return (CompletableFuture<T>) removeSession((RemoveSessionCommand) command);
}

if (command instanceof UpdateServerStatusCommand) {
return (CompletableFuture<T>) updateServerStatus((UpdateServerStatusCommand) command);
}

if (command instanceof ForcePushCommand) {
//noinspection TailRecursion
return doExecute(((ForcePushCommand<T>) command).delegate());
}

throw new UnsupportedOperationException(command.toString());
}

Expand Down Expand Up @@ -350,4 +359,9 @@ private CompletableFuture<Void> removeSession(RemoveSessionCommand c) {
return null;
});
}

private CompletableFuture<Void> updateServerStatus(UpdateServerStatusCommand c) {
setWritable(c.writable());
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.centraldogma.server.command;

import java.util.Objects;

import javax.annotation.Nullable;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.MoreObjects.ToStringHelper;

import com.linecorp.centraldogma.common.Author;

/**
* A {@link Command} which is used to update the status of all servers in the cluster.
*/
public final class UpdateServerStatusCommand extends AdministrativeCommand<Void> {

private final boolean writable;

@JsonCreator
UpdateServerStatusCommand(@JsonProperty("timestamp") @Nullable Long timestamp,
@JsonProperty("author") @Nullable Author author,
@JsonProperty("writable") boolean writable) {
super(CommandType.UPDATE_SERVER_STATUS, timestamp, author);
this.writable = writable;
}

/**
* Returns whether the cluster is writable.
*/
@JsonProperty("writable")
public boolean writable() {
return writable;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof UpdateServerStatusCommand)) {
return false;
}
final UpdateServerStatusCommand that = (UpdateServerStatusCommand) o;

return super.equals(that) && writable == that.writable;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), writable);
}

@Override
ToStringHelper toStringHelper() {
return super.toStringHelper()
.add("writable", writable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.centraldogma.server.internal.replication;

import static com.linecorp.centraldogma.internal.Util.unsafeCast;
import static java.util.Objects.requireNonNull;

import java.util.Objects;
Expand All @@ -28,8 +29,9 @@
import com.fasterxml.jackson.databind.JsonNode;

import com.linecorp.centraldogma.internal.Jackson;
import com.linecorp.centraldogma.internal.Util;
import com.linecorp.centraldogma.server.command.Command;
import com.linecorp.centraldogma.server.command.CommandType;
import com.linecorp.centraldogma.server.command.ForcePushCommand;
import com.linecorp.centraldogma.server.command.NormalizingPushCommand;

public final class ReplicationLog<T> {
Expand Down Expand Up @@ -58,7 +60,12 @@ private static <T> T deserializeResult(
result = null;
}

final Class<T> resultType = Util.unsafeCast(command.type().resultType());
final Class<T> resultType;
if (command.type() == CommandType.FORCE_PUSH) {
resultType = unsafeCast(((ForcePushCommand<?>) command).delegate().type().resultType());
} else {
resultType = unsafeCast(command.type().resultType());
}
if (resultType == Void.class) {
if (result != null) {
rejectIncompatibleResult(result, Void.class);
Expand All @@ -77,7 +84,12 @@ private static <T> T deserializeResult(
: NormalizingPushCommand.class.getSimpleName() + " cannot be replicated.";
this.command = requireNonNull(command, "command");

final Class<?> resultType = command.type().resultType();
final Class<?> resultType;
if (command.type() == CommandType.FORCE_PUSH) {
resultType = ((ForcePushCommand<?>) command).delegate().type().resultType();
} else {
resultType = command.type().resultType();
}
if (resultType == Void.class) {
if (result != null) {
rejectIncompatibleResult(result, Void.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@
import com.linecorp.centraldogma.server.command.CommandExecutor;
import com.linecorp.centraldogma.server.command.CommandType;
import com.linecorp.centraldogma.server.command.CommitResult;
import com.linecorp.centraldogma.server.command.ForcePushCommand;
import com.linecorp.centraldogma.server.command.NormalizingPushCommand;
import com.linecorp.centraldogma.server.command.RemoveRepositoryCommand;
import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand;
import com.linecorp.centraldogma.server.metadata.MetadataService;
import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
import com.linecorp.centraldogma.server.storage.project.Project;
Expand Down Expand Up @@ -1103,7 +1105,28 @@ private <T> T blockingExecute(Command<T> command) throws Exception {
final CommitResult commitResult = (CommitResult) result;
final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult);
log = new ReplicationLog<>(replicaId(), pushAsIsCommand, commitResult.revision());
} else if (command.type() == CommandType.FORCE_PUSH &&
((ForcePushCommand<?>) command).delegate().type() == CommandType.NORMALIZING_PUSH) {
final NormalizingPushCommand delegated =
(NormalizingPushCommand) ((ForcePushCommand<?>) command).delegate();
final CommitResult commitResult = (CommitResult) result;
final Command<Revision> command0 = Command.forcePush(delegated.asIs(commitResult));
log = new ReplicationLog<>(replicaId(), command0, commitResult.revision());
} else {
if (command.type() == CommandType.UPDATE_SERVER_STATUS) {
final UpdateServerStatusCommand command0 = (UpdateServerStatusCommand) command;
final boolean writable = command0.writable();
final boolean wasWritable = isWritable();
setWritable(writable);
if (writable != wasWritable) {
if (writable) {
logger.warn("Left read-only mode.");
} else {
logger.warn("Entered read-only mode.");
}
}
}

log = new ReplicationLog<>(replicaId(), command, result);
}

Expand Down
Loading

0 comments on commit 8f66975

Please sign in to comment.