Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/sofa-registry-kubernetes' into s…
Browse files Browse the repository at this point in the history
…ofa-registry-kubernetes
  • Loading branch information
呈铭 committed Mar 14, 2024
2 parents d2c502e + af2a8c4 commit 0d3ca60
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 29 deletions.
2 changes: 1 addition & 1 deletion all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-all</artifactId>
<version>5.12.0</version>
<version>5.13.0-SNAPSHOT</version>


<name>${project.groupId}:${project.artifactId}</name>
Expand Down
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<packaging>pom</packaging>

<properties>
<revision>5.12.0</revision>
<revision>5.13.0-SNAPSHOT</revision>
<javassist.version>3.29.2-GA</javassist.version>
<bytebuddy.version>1.9.8</bytebuddy.version>
<netty.version>4.1.77.Final</netty.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ public final class Version {
/**
* 当前RPC版本,例如:5.6.7
*/
public static final String VERSION = "5.12.0";
public static final String VERSION = "5.13.0";

/**
* 当前RPC版本,例如: 5.6.7 对应 50607
*/
public static final int RPC_VERSION = 51200;
public static final int RPC_VERSION = 51300;

/**
* 当前Build版本,每次发布修改
*/
public static final String BUILD_VERSION = "5.12.0_20240122111527";
public static final String BUILD_VERSION = "5.13.0_20240222103719";

}
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ public static UserThreadPool getUserThread(String service) {
public static Set<UserThreadPool> getUserThreadPoolSet() {
Set<UserThreadPool> userThreadPoolSet = new HashSet<>();
if (hasUserThread()) {
for (UserThreadPool userThreadPool : userThreadMap.values()) {
userThreadPoolSet.add(userThreadPool);
}
userThreadPoolSet.addAll(userThreadMap.values());
}
return userThreadPoolSet;
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@

<properties>
<!-- Build args -->
<revision>5.12.0</revision>
<revision>5.13.0-SNAPSHOT</revision>
<jmh.version>1.33</jmh.version>
<module.install.skip>true</module.install.skip>
<module.deploy.skip>true</module.deploy.skip>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
Expand Down Expand Up @@ -146,6 +146,10 @@ public synchronized void init() {
nacosConfig.putAll(parameters);
}

if (providerObserver == null) {
providerObserver = new NacosRegistryProviderObserver();
}

try {
namingService = NamingFactory.createNamingService(nacosConfig);
} catch (NacosException e) {
Expand Down Expand Up @@ -272,26 +276,19 @@ public List<ProviderGroup> subscribe(final ConsumerConfig config) {
}

try {
if (providerObserver == null) {
providerObserver = new NacosRegistryProviderObserver();
}

ProviderInfoListener providerInfoListener = config.getProviderInfoListener();
providerObserver.addProviderListener(config, providerInfoListener);

EventListener eventListener = new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
// avoid npe
if (null == instances) {
instances = new ArrayList<Instance>();
}
instances.removeIf(i -> !i.isEnabled());
providerObserver.updateProviders(config, instances);
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
// avoid npe
if (null == instances) {
instances = new ArrayList<Instance>();
}
instances.removeIf(i -> !i.isEnabled());
providerObserver.updateProviders(config, instances);
}
};
namingService.subscribe(serviceName, defaultCluster, eventListener);
Expand Down Expand Up @@ -359,4 +356,14 @@ public void destroy() {
public Properties getNacosConfig() {
return nacosConfig;
}

/**
* UT only
*
* @return
*/
@VisibleForTesting
public NacosRegistryProviderObserver getProviderObserver() {
return providerObserver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.struct.ConcurrentHashSet;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand All @@ -36,9 +37,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -66,8 +70,6 @@ public void setUp() {
.setRegister(true);

registry = (NacosRegistry) RegistryFactory.getRegistry(registryConfig);
registry.init();
Assert.assertTrue(registry.start());
}

/**
Expand All @@ -77,7 +79,30 @@ public void setUp() {
public void tearDown() {
registry.destroy();
registry = null;
serverConfig.destroy();
}

@Test
public void testMuiltInit() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
final CountDownLatch latch = new CountDownLatch(10);
Set<NacosRegistryProviderObserver> sets = new ConcurrentHashSet<>();

for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
registry.init();
NacosRegistryProviderObserver providerObserver = registry.getProviderObserver();
sets.add(providerObserver);
} finally {
latch.countDown();
}
});
}

latch.await();
executorService.shutdown();

Assert.assertEquals(1, sets.size());
}

/**
Expand All @@ -87,6 +112,8 @@ public void tearDown() {
*/
@Test
public void testProviderObserver() throws Exception {
registry.init();
Assert.assertTrue(registry.start());
int timeoutPerSub = 2000;

//wait nacos startup ok
Expand Down Expand Up @@ -227,6 +254,7 @@ public void testProviderObserver() throws Exception {
List<ConsumerConfig> consumerConfigList = new ArrayList<>();
consumerConfigList.add(consumer2);
registry.batchUnSubscribe(consumerConfigList);
serverConfig.destroy();
}

/**
Expand All @@ -236,6 +264,8 @@ public void testProviderObserver() throws Exception {
*/
@Test
public void testVirtualHostAndVirtualPort() throws Exception {
registry.init();
Assert.assertTrue(registry.start());
//wait nacos startup ok
TimeUnit.SECONDS.sleep(10);
// 模拟的场景 client -> proxy:127.7.7.7:8888 -> netty:0.0.0.0:12200
Expand Down Expand Up @@ -297,6 +327,7 @@ public void testVirtualHostAndVirtualPort() throws Exception {
virtualHost + ":" + virtualPort);
Assert.assertEquals("The provider's host should be virtualHost", virtualHost, pri.getHost());
Assert.assertEquals("The provider's port should be virtualPort", virtualPort, pri.getPort());
serverConfig.destroy();
}

private static class MockProviderInfoListener implements ProviderInfoListener {
Expand Down

0 comments on commit 0d3ca60

Please sign in to comment.