Skip to content

Commit

Permalink
fix #1380, create NacosRegistryProviderObserver when init method is e…
Browse files Browse the repository at this point in the history
…xecuted (#1401)

* fix #1380

* fix #1380

---------

Co-authored-by: 呈铭 <[email protected]>
  • Loading branch information
wangchengming666 and 呈铭 committed Mar 12, 2024
1 parent df2dcae commit ac2a73e
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
Expand Down Expand Up @@ -146,6 +146,10 @@ public synchronized void init() {
nacosConfig.putAll(parameters);
}

if (providerObserver == null) {
providerObserver = new NacosRegistryProviderObserver();
}

try {
namingService = NamingFactory.createNamingService(nacosConfig);
} catch (NacosException e) {
Expand Down Expand Up @@ -272,26 +276,19 @@ public List<ProviderGroup> subscribe(final ConsumerConfig config) {
}

try {
if (providerObserver == null) {
providerObserver = new NacosRegistryProviderObserver();
}

ProviderInfoListener providerInfoListener = config.getProviderInfoListener();
providerObserver.addProviderListener(config, providerInfoListener);

EventListener eventListener = new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
// avoid npe
if (null == instances) {
instances = new ArrayList<Instance>();
}
instances.removeIf(i -> !i.isEnabled());
providerObserver.updateProviders(config, instances);
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
List<Instance> instances = namingEvent.getInstances();
// avoid npe
if (null == instances) {
instances = new ArrayList<Instance>();
}
instances.removeIf(i -> !i.isEnabled());
providerObserver.updateProviders(config, instances);
}
};
namingService.subscribe(serviceName, defaultCluster, eventListener);
Expand Down Expand Up @@ -359,4 +356,14 @@ public void destroy() {
public Properties getNacosConfig() {
return nacosConfig;
}

/**
* UT only
*
* @return
*/
@VisibleForTesting
public NacosRegistryProviderObserver getProviderObserver() {
return providerObserver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.struct.ConcurrentHashSet;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand All @@ -36,9 +37,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -66,8 +70,6 @@ public void setUp() {
.setRegister(true);

registry = (NacosRegistry) RegistryFactory.getRegistry(registryConfig);
registry.init();
Assert.assertTrue(registry.start());
}

/**
Expand All @@ -77,7 +79,30 @@ public void setUp() {
public void tearDown() {
registry.destroy();
registry = null;
serverConfig.destroy();
}

@Test
public void testMuiltInit() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
final CountDownLatch latch = new CountDownLatch(10);
Set<NacosRegistryProviderObserver> sets = new ConcurrentHashSet<>();

for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
try {
registry.init();
NacosRegistryProviderObserver providerObserver = registry.getProviderObserver();
sets.add(providerObserver);
} finally {
latch.countDown();
}
});
}

latch.await();
executorService.shutdown();

Assert.assertEquals(1, sets.size());
}

/**
Expand All @@ -87,6 +112,8 @@ public void tearDown() {
*/
@Test
public void testProviderObserver() throws Exception {
registry.init();
Assert.assertTrue(registry.start());
int timeoutPerSub = 2000;

//wait nacos startup ok
Expand Down Expand Up @@ -227,6 +254,7 @@ public void testProviderObserver() throws Exception {
List<ConsumerConfig> consumerConfigList = new ArrayList<>();
consumerConfigList.add(consumer2);
registry.batchUnSubscribe(consumerConfigList);
serverConfig.destroy();
}

/**
Expand All @@ -236,6 +264,8 @@ public void testProviderObserver() throws Exception {
*/
@Test
public void testVirtualHostAndVirtualPort() throws Exception {
registry.init();
Assert.assertTrue(registry.start());
//wait nacos startup ok
TimeUnit.SECONDS.sleep(10);
// 模拟的场景 client -> proxy:127.7.7.7:8888 -> netty:0.0.0.0:12200
Expand Down Expand Up @@ -297,6 +327,7 @@ public void testVirtualHostAndVirtualPort() throws Exception {
virtualHost + ":" + virtualPort);
Assert.assertEquals("The provider's host should be virtualHost", virtualHost, pri.getHost());
Assert.assertEquals("The provider's port should be virtualPort", virtualPort, pri.getPort());
serverConfig.destroy();
}

private static class MockProviderInfoListener implements ProviderInfoListener {
Expand Down

0 comments on commit ac2a73e

Please sign in to comment.