Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-16809] Optimize the AbstractHAServer implementation #16810

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
package org.apache.dolphinscheduler.alert;

import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.AlertHAServer;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -44,6 +49,18 @@
@SpringBootApplication
public class AlertServer {

@Autowired
private AlertRpcServer alertRpcServer;

@Autowired
private AlertPluginManager alertPluginManager;

@Autowired
private AlertRegistryClient alertRegistryClient;

@Autowired
private AlertHAServer alertHAServer;

@Autowired
private AlertBootstrapService alertBootstrapService;

Expand All @@ -58,7 +75,25 @@ public static void main(String[] args) {
public void run() {
ServerLifeCycleManager.toRunning();
log.info("AlertServer is staring ...");
alertBootstrapService.start();
alertPluginManager.start();
alertRpcServer.start();
alertRegistryClient.start();

alertHAServer.addServerStatusChangeListener(new AbstractServerStatusChangeListener() {

@Override
public void changeToActive() {
alertBootstrapService.start();
}

@Override
public void changeToStandBy() {
close();
}
});

alertHAServer.start();

log.info("AlertServer is started ...");
}

Expand All @@ -73,7 +108,12 @@ public void close() {
return;
}
log.info("AlertServer is stopping, cause: {}", cause);
alertBootstrapService.close();
try (
final AlertRpcServer ignore = alertRpcServer;
final AlertRegistryClient ignore1 = alertRegistryClient;
final AlertHAServer ignore2 = alertHAServer;
final AlertBootstrapService ignore3 = alertBootstrapService;) {
}
// sleep for 3 seconds so that threads can stop quietly
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
log.info("AlertServer stopped, cause: {}", cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.alert.service;

import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;

Expand All @@ -32,39 +31,21 @@
@Service
public final class AlertBootstrapService implements AutoCloseable {

private final AlertRpcServer alertRpcServer;

private final AlertRegistryClient alertRegistryClient;

private final AlertPluginManager alertPluginManager;

private final AlertHAServer alertHAServer;

private final AlertEventFetcher alertEventFetcher;

private final AlertEventLoop alertEventLoop;

public AlertBootstrapService(AlertRpcServer alertRpcServer,
AlertRegistryClient alertRegistryClient,
AlertPluginManager alertPluginManager,
AlertHAServer alertHAServer,
AlertEventFetcher alertEventFetcher,
AlertEventLoop alertEventLoop) {
this.alertRpcServer = alertRpcServer;
this.alertRegistryClient = alertRegistryClient;
this.alertPluginManager = alertPluginManager;
this.alertHAServer = alertHAServer;
this.alertEventFetcher = alertEventFetcher;
this.alertEventLoop = alertEventLoop;
}

public void start() {
log.info("AlertBootstrapService starting...");
alertPluginManager.start();
alertRpcServer.start();
alertRegistryClient.start();
alertHAServer.start();

alertEventFetcher.start();
alertEventLoop.start();
log.info("AlertBootstrapService started...");
Expand All @@ -73,15 +54,8 @@ public void start() {
@Override
public void close() {
log.info("AlertBootstrapService stopping...");
try (
AlertRpcServer closedAlertRpcServer = alertRpcServer;
AlertRegistryClient closedAlertRegistryClient = alertRegistryClient) {
// close resource
alertEventFetcher.shutdown();

alertEventLoop.shutdown();
alertHAServer.shutdown();
}
alertEventFetcher.shutdown();
alertEventLoop.shutdown();
log.info("AlertBootstrapService stopped...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.alert.service;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
Expand All @@ -29,8 +30,18 @@
@Component
public class AlertHAServer extends AbstractHAServer {

public AlertHAServer(Registry registry) {
super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
public AlertHAServer(final Registry registry, final AlertConfig alertConfig) {
super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(), alertConfig.getAlertServerAddress());
}

/**
 * Starts the alert HA server.
 *
 * Delegates to the parent {@code start()} implementation and then logs that the
 * AlertHAServer has started.
 */
@Override
public void start() {
super.start();
log.info("AlertHAServer started...");
}

/**
 * Closes the alert HA server.
 *
 * This override only writes a log message; it performs no other action itself.
 * NOTE(review): confirm whether any registry state held by this server
 * needs to be released here on close.
 */
@Override
public void close() {
log.info("AlertHAServer shutdown...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
@AllArgsConstructor
public enum RegistryNodeType {

ALL_SERVERS("nodes", "/nodes"),
MASTER("Master", "/nodes/master"),
MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", "/lock/master-serial-workflow-coordinator"),

WORKER("Worker", "/nodes/worker"),

ALERT_SERVER("AlertServer", "/nodes/alert-server"),
ALERT_LOCK("AlertNodeLock", "/lock/alert");
ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.dolphinscheduler.registry.api.ha;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer {

private final Registry registry;

private final String serverPath;
private final String selectorPath;

private final String serverIdentify;

private ServerStatus serverStatus;

private final List<ServerStatusChangeListener> serverStatusChangeListeners;

public AbstractHAServer(Registry registry, String serverPath) {
public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) {
this.registry = registry;
this.serverPath = serverPath;
this.selectorPath = checkNotNull(selectorPath);
this.serverIdentify = checkNotNull(serverIdentify);
this.serverStatus = ServerStatus.STAND_BY;
this.serverStatusChangeListeners = Lists.newArrayList(new DefaultServerStatusChangeListener());
}

@Override
public void start() {
registry.subscribe(serverPath, event -> {
registry.subscribe(selectorPath, event -> {
if (Event.Type.REMOVE.equals(event.type())) {
if (isActive() && !participateElection()) {
if (serverIdentify.equals(event.data())) {
statusChange(ServerStatus.STAND_BY);
} else {
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}
}
});
ScheduledExecutorService electionSelectionThread =
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
electionSelectionThread.schedule(() -> {
if (isActive()) {
return;
}
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}, 10, TimeUnit.SECONDS);

if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
} else {
log.info("Server {} is standby", serverIdentify);
}
}

@Override
Expand All @@ -75,7 +77,22 @@ public boolean isActive() {

@Override
public boolean participateElection() {
return registry.acquireLock(serverPath, 3_000);
final String electionLock = selectorPath + "-lock";
try {
if (registry.acquireLock(electionLock)) {
if (!registry.exists(selectorPath)) {
registry.put(selectorPath, serverIdentify, true);
return true;
}
return serverIdentify.equals(registry.get(selectorPath));
}
return false;
} catch (Exception e) {
log.error("participate election error", e);
return false;
} finally {
registry.releaseLock(electionLock);
}
}

@Override
Expand All @@ -88,18 +105,15 @@ public ServerStatus getServerStatus() {
return serverStatus;
}

@Override
public void shutdown() {
if (isActive()) {
registry.releaseLock(serverPath);
}
}

private void statusChange(ServerStatus targetStatus) {
final ServerStatus originStatus = serverStatus;
serverStatus = targetStatus;
synchronized (this) {
ServerStatus originStatus = serverStatus;
serverStatus = targetStatus;
serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus));
try {
serverStatusChangeListeners.forEach(listener -> listener.change(originStatus, serverStatus));
} catch (Exception ex) {
log.error("Trigger ServerStatusChangeListener from {} -> {} error", originStatus, targetStatus, ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener implements ServerStatus

@Override
public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) {
log.info("The status change from {} to {}.", originStatus, currentStatus);
if (originStatus == HAServer.ServerStatus.ACTIVE) {
if (currentStatus == HAServer.ServerStatus.STAND_BY) {
changeToStandBy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* Interface for HA server, used to select an active server from multiple servers.
* In HA mode, there are multiple servers, only one server is active, others are standby.
*/
public interface HAServer {
public interface HAServer extends AutoCloseable {

/**
* Start the server.
Expand Down Expand Up @@ -57,7 +57,8 @@ public interface HAServer {
/**
* Shutdown the server, release resources.
*/
void shutdown();
@Override
void close();
Fixed Show fixed Hide fixed

enum ServerStatus {
ACTIVE,
Expand Down
Loading