Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
HxpSerein committed Oct 29, 2024
1 parent 0296878 commit a7beaa5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,13 @@ sso:
# Optional, change by authentication client
# Please replace and fill in your client config below when enabled SSO

registry:
# default using jdbc as registry
type: jdbc
heartbeat-refresh-interval: 1s
session-timeout: 3s

zookeeper:
# zookeeper address
address: localhost:2181
high-availability:
enable: false # true
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
zookeeper.quorum: 192.168.100.128:2181,192.168.100.129:2181

network:
# network interface preferred like eth0, default: empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.console;

import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.config.SpringProperties;
import org.apache.streampark.console.core.service.RegistryService;

Expand Down Expand Up @@ -64,11 +65,17 @@ public static void main(String[] args) throws Exception {

@PostConstruct
public void init() {
registryService.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
registryService.close();
log.info("RegistryService close success.");
}));
if (enableHA()) {
registryService.startListening();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
registryService.deRegistry();
log.info("RegistryService close success.");
}));
}
}

public boolean enableHA() {
return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ public interface RegistryService {
/**
* Start the registry service.
*/
void start();
void startListening();

/**
* Close the registry service.
*/
void close();
void deRegistry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,12 @@ public class RegistryServiceImpl implements RegistryService {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

@PostConstruct
public void init() {
public void registry() {
if (!enableHA()) {
return;
}
try {
zk_address = SystemPropertyUtils.get("zookeeper.address", "localhost:2181");
zk_address = SystemPropertyUtils.get("high-availability.zookeeper.quorum", "localhost:2181");
zk = new ZooKeeper(zk_address, HEARTBEAT_TIMEOUT, watcher);

if (zk.exists(REGISTRY_PATH, false) == null) {
Expand All @@ -82,7 +85,8 @@ public void init() {

String ip = InetAddress.getLocalHost().getHostAddress();
String port = SystemPropertyUtils.get("server.port", "10000");
nodePath = zk.create(REGISTRY_PATH + "/" + ip + ":" + port, new byte[0],
String server_id = ip + ":" + port;
nodePath = zk.create(REGISTRY_PATH + "/" + server_id, new byte[0],
OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

currentNodes.add(nodePath);
Expand All @@ -92,7 +96,10 @@ public void init() {
}

@Override
public void start() {
public void startListening() {
if (!enableHA()) {
return;
}
try {
distributedTaskService.init(currentNodes, nodePath);
startHeartbeat();
Expand Down Expand Up @@ -193,7 +200,10 @@ private void reconnectAndRegister() {
}

@Override
public void close() {
public void deRegistry() {
if (!enableHA()) {
return;
}
try {
zk.close();
scheduler.shutdown();
Expand All @@ -204,4 +214,8 @@ public void close() {
}
}

public boolean enableHA() {
return SystemPropertyUtils.get("high-availability.enable", "false").equals("true");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.streampark.console.core.service;

import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.core.service.impl.RegistryServiceImpl;

import org.junit.jupiter.api.Assertions;
Expand All @@ -28,9 +29,10 @@ public class RegistryServiceTest {

@Test
public void testRegister() {
registryService.init();
SystemPropertyUtils.set("high-availability.enable", "true");
registryService.registry();
Assertions.assertEquals(1, registryService.getCurrentNodes().size());
registryService.close();
registryService.deRegistry();
}

}

0 comments on commit a7beaa5

Please sign in to comment.