Skip to content

Commit

Permalink
[Improvement] Optimize the AbstractHAServer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Nov 15, 2024
1 parent 9a64534 commit 29f8c4c
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
package org.apache.dolphinscheduler.alert;

import org.apache.dolphinscheduler.alert.metrics.AlertServerMetrics;
import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;
import org.apache.dolphinscheduler.alert.service.AlertBootstrapService;
import org.apache.dolphinscheduler.alert.service.AlertHAServer;
import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
import org.apache.dolphinscheduler.registry.api.ha.AbstractServerStatusChangeListener;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -44,6 +49,18 @@
@SpringBootApplication
public class AlertServer {

@Autowired
private AlertRpcServer alertRpcServer;

@Autowired
private AlertPluginManager alertPluginManager;

@Autowired
private AlertRegistryClient alertRegistryClient;

@Autowired
private AlertHAServer alertHAServer;

@Autowired
private AlertBootstrapService alertBootstrapService;

Expand All @@ -58,7 +75,25 @@ public static void main(String[] args) {
public void run() {
    // Starts the always-on components first, then hands control of the alert processing
    // pipeline (AlertBootstrapService) to the HA election: it is started only when this
    // instance becomes ACTIVE, never unconditionally.
    ServerLifeCycleManager.toRunning();
    log.info("AlertServer is starting ...");
    alertPluginManager.start();
    alertRpcServer.start();
    alertRegistryClient.start();

    // Register the listener before starting the HA server: AbstractHAServer.start() runs an
    // election immediately and may fire the ACTIVE transition from inside start().
    alertHAServer.addServerStatusChangeListener(new AbstractServerStatusChangeListener() {

        @Override
        public void changeToActive() {
            alertBootstrapService.start();
        }

        @Override
        public void changeToStandBy() {
            // NOTE(review): unless the listener base class declares its own close(), this
            // resolves to the enclosing AlertServer#close() and stops the whole server when
            // leadership is lost - confirm that this is intended.
            close();
        }
    });

    alertHAServer.start();

    log.info("AlertServer is started ...");
}

Expand All @@ -73,7 +108,12 @@ public void close() {
return;
}
log.info("AlertServer is stopping, cause: {}", cause);
alertBootstrapService.close();
try (
final AlertRpcServer ignore = alertRpcServer;
final AlertRegistryClient ignore1 = alertRegistryClient;
final AlertHAServer ignore2 = alertHAServer;
final AlertBootstrapService ignore3 = alertBootstrapService;) {
}
// sleep for 3 seconds so that the threads can stop quietly
ThreadUtils.sleep(Constants.SERVER_CLOSE_WAIT_TIME.toMillis());
log.info("AlertServer stopped, cause: {}", cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.alert.service;

import org.apache.dolphinscheduler.alert.plugin.AlertPluginManager;
import org.apache.dolphinscheduler.alert.registry.AlertRegistryClient;
import org.apache.dolphinscheduler.alert.rpc.AlertRpcServer;

Expand All @@ -32,39 +31,21 @@
@Service
public final class AlertBootstrapService implements AutoCloseable {

// The only two components whose lifecycle this service manages; the RPC server, registry
// client, plugin manager and HA server are started and closed by AlertServer itself.
private final AlertEventFetcher alertEventFetcher;

private final AlertEventLoop alertEventLoop;

/**
 * Creates the bootstrap service.
 *
 * @param alertEventFetcher fetcher started by {@code start()} and shut down by {@code close()}
 * @param alertEventLoop    event loop started by {@code start()} and shut down by {@code close()}
 */
public AlertBootstrapService(AlertEventFetcher alertEventFetcher,
                             AlertEventLoop alertEventLoop) {
    this.alertEventFetcher = alertEventFetcher;
    this.alertEventLoop = alertEventLoop;
}

public void start() {
log.info("AlertBootstrapService starting...");
alertPluginManager.start();
alertRpcServer.start();
alertRegistryClient.start();
alertHAServer.start();

alertEventFetcher.start();
alertEventLoop.start();
log.info("AlertBootstrapService started...");
Expand All @@ -73,15 +54,8 @@ public void start() {
/**
 * Stops the alert event fetcher and the alert event loop.
 *
 * <p>Each component is shut down exactly once. The RPC server, registry client and HA server
 * are not touched here; AlertServer#close() closes them.
 */
@Override
public void close() {
    log.info("AlertBootstrapService stopping...");
    try {
        alertEventFetcher.shutdown();
    } finally {
        // Always stop the event loop, even if stopping the fetcher failed.
        alertEventLoop.shutdown();
    }
    log.info("AlertBootstrapService stopped...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.alert.service;

import org.apache.dolphinscheduler.alert.config.AlertConfig;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.registry.api.ha.AbstractHAServer;
Expand All @@ -29,8 +30,18 @@
@Component
public class AlertHAServer extends AbstractHAServer {

public AlertHAServer(Registry registry) {
super(registry, RegistryNodeType.ALERT_LOCK.getRegistryPath());
public AlertHAServer(final Registry registry, final AlertConfig alertConfig) {
super(registry, RegistryNodeType.ALERT_HA_LEADER.getRegistryPath(), alertConfig.getAlertServerAddress());
}

@Override
public void start() {
super.start();
log.info("AlertHAServer started...");
}

@Override
public void close() {
log.info("AlertHAServer shutdown...");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
@AllArgsConstructor
public enum RegistryNodeType {

ALL_SERVERS("nodes", "/nodes"),
MASTER("Master", "/nodes/master"),
MASTER_NODE_LOCK("MasterNodeLock", "/lock/master-node"),
MASTER_FAILOVER_LOCK("MasterFailoverLock", "/lock/master-failover"),
MASTER_TASK_GROUP_COORDINATOR_LOCK("TaskGroupCoordinatorLock", "/lock/master-task-group-coordinator"),
MASTER_SERIAL_COORDINATOR_LOCK("SerialWorkflowCoordinator", "/lock/master-serial-workflow-coordinator"),

WORKER("Worker", "/nodes/worker"),

ALERT_SERVER("AlertServer", "/nodes/alert-server"),
ALERT_LOCK("AlertNodeLock", "/lock/alert");
ALERT_HA_LEADER("AlertHALeader", "/nodes/alert-server-ha-leader");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.dolphinscheduler.registry.api.ha;

import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import static com.google.common.base.Preconditions.checkNotNull;

import org.apache.dolphinscheduler.registry.api.Event;
import org.apache.dolphinscheduler.registry.api.Registry;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -34,38 +33,41 @@ public abstract class AbstractHAServer implements HAServer {

private final Registry registry;

// Registry path of the node that stores the identity of the elected server.
private final String selectorPath;

// Identity this server writes to selectorPath when it wins the election.
private final String serverIdentify;

// Written inside the statusChange() lock but returned by getServerStatus() without it,
// so it is volatile to make the latest status visible to readers on other threads.
private volatile ServerStatus serverStatus;

private final List<ServerStatusChangeListener> serverStatusChangeListeners;

/**
 * Creates an HA server that starts in {@link ServerStatus#STAND_BY} with a
 * {@link DefaultServerStatusChangeListener} already registered.
 *
 * @param registry       registry used for subscription, locking and the leader node; must not be null
 * @param selectorPath   registry path of the leader node; must not be null
 * @param serverIdentify identity of this server in the election; must not be null
 * @throws NullPointerException if any argument is null
 */
public AbstractHAServer(final Registry registry, final String selectorPath, final String serverIdentify) {
    this.registry = checkNotNull(registry);
    this.selectorPath = checkNotNull(selectorPath);
    this.serverIdentify = checkNotNull(serverIdentify);
    this.serverStatus = ServerStatus.STAND_BY;
    this.serverStatusChangeListeners = Lists.newArrayList(new DefaultServerStatusChangeListener());
}

@Override
public void start() {
registry.subscribe(serverPath, event -> {
registry.subscribe(selectorPath, event -> {
if (Event.Type.REMOVE.equals(event.type())) {
if (isActive() && !participateElection()) {
if (serverIdentify.equals(event.data())) {
statusChange(ServerStatus.STAND_BY);
} else {
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}
}
});
ScheduledExecutorService electionSelectionThread =
ThreadUtils.newSingleDaemonScheduledExecutorService("election-selection-thread");
electionSelectionThread.schedule(() -> {
if (isActive()) {
return;
}
if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
}
}, 10, TimeUnit.SECONDS);

if (participateElection()) {
statusChange(ServerStatus.ACTIVE);
} else {
log.info("Server {} is standby", serverIdentify);
}
}

@Override
Expand All @@ -75,7 +77,22 @@ public boolean isActive() {

/**
 * Tries to make this server the leader.
 *
 * <p>While holding the election lock ({@code selectorPath + "-lock"}): if no leader node
 * exists, this server's identity is written to {@code selectorPath} and {@code true} is
 * returned; otherwise the result is whether the existing node already holds this server's
 * identity.
 *
 * @return {@code true} if this server is the leader after the call; {@code false} if another
 *         server is, if the lock could not be acquired, or if an exception was thrown
 */
@Override
public boolean participateElection() {
    final String electionLock = selectorPath + "-lock";
    boolean lockAcquired = false;
    try {
        lockAcquired = registry.acquireLock(electionLock);
        if (!lockAcquired) {
            return false;
        }
        if (!registry.exists(selectorPath)) {
            // NOTE(review): the third argument presumably marks the node as ephemeral - confirm
            // against Registry#put.
            registry.put(selectorPath, serverIdentify, true);
            return true;
        }
        return serverIdentify.equals(registry.get(selectorPath));
    } catch (Exception e) {
        log.error("participate election error", e);
        return false;
    } finally {
        // Release only a lock we actually hold; acquireLock may have returned false or thrown.
        if (lockAcquired) {
            registry.releaseLock(electionLock);
        }
    }
}

@Override
Expand All @@ -88,18 +105,15 @@ public ServerStatus getServerStatus() {
return serverStatus;
}

@Override
public void shutdown() {
if (isActive()) {
registry.releaseLock(serverPath);
}
}

/**
 * Switches this server to {@code targetStatus} and notifies every registered listener.
 *
 * <p>The switch and the notifications run under this object's monitor, so concurrent status
 * changes are applied one at a time. A listener that throws is logged and does not prevent
 * the remaining listeners from being notified.
 *
 * @param targetStatus the status to switch to
 */
private void statusChange(ServerStatus targetStatus) {
    synchronized (this) {
        final ServerStatus originStatus = serverStatus;
        serverStatus = targetStatus;
        for (ServerStatusChangeListener listener : serverStatusChangeListeners) {
            try {
                listener.change(originStatus, targetStatus);
            } catch (Exception ex) {
                log.error("Trigger ServerStatusChangeListener from {} -> {} error", originStatus, targetStatus, ex);
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public abstract class AbstractServerStatusChangeListener implements ServerStatus

@Override
public void change(HAServer.ServerStatus originStatus, HAServer.ServerStatus currentStatus) {
log.info("The status change from {} to {}.", originStatus, currentStatus);
if (originStatus == HAServer.ServerStatus.ACTIVE) {
if (currentStatus == HAServer.ServerStatus.STAND_BY) {
changeToStandBy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* Interface for an HA server, used to select an active server from multiple servers.
* In HA mode, there are multiple servers, only one server is active, others are standby.
*/
public interface HAServer {
public interface HAServer extends AutoCloseable {

/**
* Start the server.
Expand Down Expand Up @@ -57,7 +57,8 @@ public interface HAServer {
/**
* Shut down the server and release its resources.
*/
void shutdown();
@Override
void close();

enum ServerStatus {
ACTIVE,
Expand Down

0 comments on commit 29f8c4c

Please sign in to comment.