diff --git a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml index a12277c625..91e843a2b9 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml +++ b/streampark-console/streampark-console-service/src/main/assembly/conf/config.yaml @@ -100,15 +100,13 @@ sso: # Optional, change by authentication client # Please replace and fill in your client config below when enabled SSO -registry: - # default using jdbc as registry - type: jdbc - heartbeat-refresh-interval: 1s - session-timeout: 3s - -zookeeper: - # zookeeper address - address: localhost:2181 +high-availability: + enable: false # true + # The list of ZooKeeper quorum peers that coordinate the high-availability + # setup. This must be a list of the form: + # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) + # + zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181 network: # network interface preferred like eth0, default: empty diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index fda4c7d827..1d3492b681 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -17,6 +17,7 @@ package org.apache.streampark.console; +import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.console.base.config.SpringProperties; import org.apache.streampark.console.core.service.RegistryService; @@ -64,11 +65,17 @@ public static void main(String[] args) throws Exception { @PostConstruct public void init() { - registryService.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - registryService.close(); - log.info("RegistryService close success."); - })); + if (enableHA()) { + registryService.startListening(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + registryService.deRegistry(); + log.info("RegistryService close success."); + })); + } + } + + public boolean enableHA() { + return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java index 1e362ad1bb..d6a0cfb196 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/RegistryService.java @@ -22,10 +22,10 @@ public interface RegistryService { /** * Start the registry service. */ - void start(); + void startListening(); /** * Close the registry service. */ - void close(); + void deRegistry(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java index 3fbc182345..cab5afadf5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/RegistryServiceImpl.java @@ -71,9 +71,12 @@ public class RegistryServiceImpl implements RegistryService { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); @PostConstruct - public void init() { + public void registry() { + if (!enableHA()) { + return; + } try { - zk_address = SystemPropertyUtils.get("zookeeper.address", "localhost:2181"); + zk_address = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181"); zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher); if (zk.exists(REGISTRY_PATH, false) == null) { @@ -82,7 +85,8 @@ public void init() { String ip = InetAddress.getLocalHost().getHostAddress(); String port = SystemPropertyUtils.get("server.port", "10000"); - nodePath = zk.create(REGISTRY_PATH + "/" + ip + ":" + port, new byte[0], + String server_id = ip + ":" + port; + nodePath = zk.create(REGISTRY_PATH + "/" + server_id, new byte[0], OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); currentNodes.add(nodePath); @@ -92,7 +96,10 @@ public void init() { } @Override - public void start() { + public void startListening() { + if (!enableHA()) { + return; + } try { distributedTaskService.init(currentNodes, nodePath); startHeartbeat(); @@ -193,7 +200,10 @@ private void reconnectAndRegister() { } @Override - public void close() { + public void deRegistry() { + if (!enableHA()) { + return; + } try { zk.close(); scheduler.shutdown(); @@ -204,4 +214,8 @@ public void close() { } } + public boolean enableHA() { + return SystemPropertyUtils.get("high-availability.enable", "false").equals("true"); + } + } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java index 756c0911db..e701b2a614 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/RegistryServiceTest.java @@ -17,6 +17,7 @@ package org.apache.streampark.console.core.service; +import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.streampark.console.core.service.impl.RegistryServiceImpl; import org.junit.jupiter.api.Assertions; @@ -28,9 +29,10 @@ public class RegistryServiceTest { @Test public void testRegister() { - registryService.init(); + SystemPropertyUtils.set("high-availability.enable", "true"); + registryService.registry(); Assertions.assertEquals(1, registryService.getCurrentNodes().size()); - registryService.close(); + registryService.deRegistry(); } }