From 49349b2e52ef5b657a121c772ca13fcc6cbeda60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=88=E9=93=AD?= Date: Wed, 24 Jan 2024 20:07:34 +0800 Subject: [PATCH] rebase support sofa registry kubernetes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes fix support sofa registry kubernetes fix KubernetesRegistryHelper support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support k8s registry support k8s registry update rpc version to 5.13.0-SNAPSHOT (#1396) Co-authored-by: liujianjun.ljj optimize UserThreadPoolManager (#1390) Co-authored-by: 呈铭 fix #1380, create NacosRegistryProviderObserver when init method is executed (#1401) * fix https://github.com/sofastack/sofa-rpc/issues/1380 * fix https://github.com/sofastack/sofa-rpc/issues/1380 --------- Co-authored-by: 呈铭 support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes support sofa registry kubernetes --- all/pom.xml | 8 +- bom/pom.xml | 32 +- .../com/alipay/sofa/rpc/common/Version.java | 6 +- .../sofa/rpc/config/RegistryConfig.java | 30 ++ .../rpc/config/UserThreadPoolManager.java | 4 +- pom.xml | 2 +- registry/registry-kubernetes/pom.xml | 72 ---- .../kubernetes/KubernetesConstant.java | 46 --- .../kubernetes/KubernetesRegistry.java | 211 +++++++---- .../kubernetes/KubernetesRegistryHelper.java | 101 +++++ .../KubernetesRegistryProviderWatcher.java | 77 ++++ .../constant/KubernetesClientConstants.java | 57 ++- .../util/KubernetesConfigUtils.java | 77 ---- .../KubernetesClientUtils.java | 2 +- .../utils/KubernetesConfigUtils.java | 104 ++++++ .../kubernetes/KubernetesRegistryTest.java | 345 ++++++++++++++++++ .../rpc/registry/kubernetes/TestService.java | 22 ++ .../rpc/registry/kubernetes/TestService2.java | 22 ++ .../registry/kubernetes/TestServiceImpl.java | 25 ++ .../registry/kubernetes/TestServiceImpl2.java | 25 ++ .../org.mockito.plugins.MockMaker | 1 + .../rpc/registry/nacos/NacosRegistry.java | 41 ++- .../rpc/registry/nacos/NacosRegistryTest.java | 37 +- 23 files changed, 1009 insertions(+), 338 deletions(-) delete mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesConstant.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java delete mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java rename registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/{util => utils}/KubernetesClientUtils.java (95%) create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java create mode 100644 registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/all/pom.xml b/all/pom.xml index 20e592155..df0690331 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -6,7 +6,7 @@ com.alipay.sofa sofa-rpc-all - 5.12.0 + 5.13.0-SNAPSHOT ${project.groupId}:${project.artifactId} @@ -244,6 +244,11 @@ sofa-rpc-registry-polaris ${project.version} + + com.alipay.sofa + sofa-rpc-registry-kubernetes + ${project.version} + com.alipay.sofa sofa-rpc-remoting-bolt @@ -545,6 +550,7 @@ com.alipay.sofa:sofa-rpc-registry-multicast com.alipay.sofa:sofa-rpc-registry-sofa com.alipay.sofa:sofa-rpc-registry-polaris + com.alipay.sofa:sofa-rpc-registry-kubernetes com.alipay.sofa:sofa-rpc-remoting-bolt com.alipay.sofa:sofa-rpc-remoting-http com.alipay.sofa:sofa-rpc-remoting-resteasy diff --git a/bom/pom.xml b/bom/pom.xml index a29df3225..d43a6183d 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -10,7 +10,7 @@ pom - 5.12.0 + 5.13.0-SNAPSHOT 3.29.2-GA 1.9.8 4.1.77.Final @@ -31,8 +31,6 @@ 7.0 32.0.0-jre 0.16.0 - - 6.9.2 3.5.2 0.9.2 @@ -62,6 +60,8 @@ true true + + 6.9.2 @@ -516,6 +516,19 @@ ${grpc.version} + + + io.fabric8 + kubernetes-client + ${fabric8_kubernetes_version} + + + io.fabric8 + kubernetes-server-mock + test + ${fabric8_kubernetes_version} + + org.apache.curator @@ -600,19 +613,6 @@ 0.16.0 test - - - - io.fabric8 - kubernetes-client - ${fabric8_kubernetes_version} - - - io.fabric8 - kubernetes-server-mock - test - ${fabric8_kubernetes_version} - diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java b/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java index 9fb2b736b..0860ef500 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/common/Version.java @@ -27,16 +27,16 @@ public final class Version { /** * 当前RPC版本,例如:5.6.7 */ - public static final String VERSION = "5.12.0"; + public static final String VERSION = "5.13.0"; /** * 当前RPC版本,例如: 5.6.7 对应 50607 */ - public static final int RPC_VERSION = 51200; + public static final int RPC_VERSION = 51300; /** * 当前Build版本,每次发布修改 */ - public static final String BUILD_VERSION = "5.12.0_20240122111527"; + public static final String BUILD_VERSION = "5.13.0_20240222103719"; } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java index b11533728..674a89fa6 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java @@ -398,6 +398,36 @@ public String getParameter(String key) { return parameters == null ? null : parameters.get(key); } + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public String getParameter(String key, String defaultValue) { + return getParameter(key) == null ? defaultValue : getParameter(key); + } + + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public int getParameter(String key, int defaultValue) { + return getParameter(key) == null ? defaultValue : Integer.parseInt(parameters.get(key)); + } + + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public boolean getParameter(String key, boolean defaultValue) { + return getParameter(key) == null ? defaultValue : Boolean.parseBoolean(parameters.get(key)); + } + @Override public String toString() { return "RegistryConfig{" + diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java index 6f044f986..0f667ca0f 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/UserThreadPoolManager.java @@ -81,9 +81,7 @@ public static UserThreadPool getUserThread(String service) { public static Set getUserThreadPoolSet() { Set userThreadPoolSet = new HashSet<>(); if (hasUserThread()) { - for (UserThreadPool userThreadPool : userThreadMap.values()) { - userThreadPoolSet.add(userThreadPool); - } + userThreadPoolSet.addAll(userThreadMap.values()); } return userThreadPoolSet; } diff --git a/pom.xml b/pom.xml index 3c8ab4cd2..ad0a95221 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ - 5.12.0 + 5.13.0-SNAPSHOT 1.33 true true diff --git a/registry/registry-kubernetes/pom.xml b/registry/registry-kubernetes/pom.xml index 7c762aba5..b3bb820ff 100644 --- a/registry/registry-kubernetes/pom.xml +++ b/registry/registry-kubernetes/pom.xml @@ -34,78 +34,6 @@ kubernetes-server-mock test - - org.slf4j - slf4j-log4j12 - test - - - junit - junit - test - - - src/main/java - - - src/main/resources - false - - **/** - - - - src/test/java - - - src/test/resources - false - - **/** - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - ${maven.compiler.source} - ${maven.compiler.target} - ${project.build.sourceEncoding} - - - - org.apache.maven.plugins - maven-install-plugin - - ${module.install.skip} - - - - org.apache.maven.plugins - maven-deploy-plugin - - ${module.deploy.skip} - - - - org.apache.maven.plugins - maven-surefire-plugin - - ${skipTests} - - - **/*Test.java - - - once - - - - - diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesConstant.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesConstant.java deleted file mode 100644 index 1dc12733a..000000000 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesConstant.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.rpc.registry.kubernetes; - -import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; - -public class KubernetesConstant { - - public static CustomResourceDefinitionContext getVsDefinition() { - - return new CustomResourceDefinitionContext.Builder() - .withGroup("service.com.alipay.sofa.rpc") - .withVersion("v1alpha1") - .withScope("Namespaced") - .withName("virtualservices.service.com.alipay.sofa.rpc") - .withPlural("virtualservices") - .withKind("VirtualService") - .build(); - } - - public static CustomResourceDefinitionContext getDrDefinition() { - - return new CustomResourceDefinitionContext.Builder() - .withGroup("service.com.alipay.sofa.rpc") - .withVersion("v1alpha1") - .withScope("Namespaced") - .withName("destinationrules.service.com.alipay.sofa.rpc") - .withPlural("destinationrules") - .withKind("DestinationRule") - .build(); - } -} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java index 2d093eebe..7f4e616a4 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java @@ -17,63 +17,79 @@ package com.alipay.sofa.rpc.registry.kubernetes; import com.alipay.sofa.rpc.client.ProviderGroup; -import com.alipay.sofa.rpc.common.json.JSON; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; +import com.alipay.sofa.rpc.common.utils.CommonUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; import com.alipay.sofa.rpc.log.LogCodes; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; import com.alipay.sofa.rpc.registry.Registry; -import com.alipay.sofa.rpc.registry.kubernetes.util.KubernetesClientUtils; -import com.alipay.sofa.rpc.registry.kubernetes.util.KubernetesConfigUtils; -import io.fabric8.kubernetes.api.model.Endpoints; +import com.alipay.sofa.rpc.registry.kubernetes.utils.KubernetesClientUtils; +import com.alipay.sofa.rpc.registry.kubernetes.utils.KubernetesConfigUtils; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; -import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; @Extension("kubernetes") public class KubernetesRegistry extends Registry { - public static final String EXT_NAME = "KubernetesRegistry"; - /** * slf4j Logger for this class */ private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistry.class); - private static final String kubernetesPropertiesKey = "io.sofa.rpc/metadata"; - private KubernetesClient kubernetesClient; private String currentHostname; private String namespace; - private static final ConcurrentHashMap> serviceInformer = new ConcurrentHashMap<>(64); + private KubernetesRegistryProviderWatcher kubernetesRegistryProviderWatcher; - private static final ConcurrentHashMap> podsInformer = new ConcurrentHashMap<>(64); - - private static final ConcurrentHashMap> endpointsInformer = new ConcurrentHashMap<>(64); + private final ConcurrentMap> consumerListeners = new ConcurrentHashMap<>(64); /** * Instantiates a new kubernetes registry. * * @param registryConfig */ - protected KubernetesRegistry(RegistryConfig registryConfig) { + public KubernetesRegistry(RegistryConfig registryConfig) { super(registryConfig); } + @Override + public synchronized void init() { + // init kubernetes config + Config config = KubernetesConfigUtils.buildKubernetesConfig(registryConfig); + // init kubernetes client + if (kubernetesClient == null) { + this.kubernetesClient = KubernetesClientUtils.buildKubernetesClient(config); + } + // init Watcher + if (kubernetesRegistryProviderWatcher == null) { + kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher(); + } + this.currentHostname = System.getenv("HOSTNAME"); + this.namespace = config.getNamespace(); + } + @Override public boolean start() { return true; @@ -82,45 +98,65 @@ public boolean start() { @Override public void register(ProviderConfig config) { String appName = config.getAppName(); - if (!config.isRegister()) { + if (!registryConfig.isRegister()) { if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); } return; } - kubernetesClient - .pods() - .inNamespace(namespace) - .withName(currentHostname) - .edit(pod -> new PodBuilder(pod) - .editOrNewMetadata() - .addToAnnotations( - kubernetesPropertiesKey, JSON.toJSONString(config.getApplication().getInsId())) - .endMetadata() - .build()); + if (config.isRegister()) { + PodResource podResource = kubernetesClient.pods() + .inNamespace(namespace) + .withName(currentHostname); + + List serverConfigs = config.getServer(); + + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()); + // 对外提供服务的URL + String url = KubernetesRegistryHelper.convertToUrl(podResource.get(), serverConfig, config); + + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + // 将ProviderConfig存在Annotations上 + .addToAnnotations(dataId, url) + // 为了过滤pod、其实value是用不到的 + .addToLabels(dataId, "") + .endMetadata().build()); + } + } + } } @Override public void unRegister(ProviderConfig config) { String appName = config.getAppName(); - if (!config.isRegister()) { + if (!registryConfig.isRegister()) { if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); } return; } - kubernetesClient - .pods() - .inNamespace(namespace) - .withName(currentHostname) - .edit(pod -> new PodBuilder(pod) - .editOrNewMetadata() - .removeFromAnnotations(kubernetesPropertiesKey) - .endMetadata() - .build()); + if (config.isRegister()) { + PodResource podResource = kubernetesClient.pods() + .inNamespace(namespace) + .withName(currentHostname); + + List serverConfigs = config.getServer(); + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()); + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + .removeFromAnnotations(dataId) + .removeFromLabels(dataId) + .endMetadata() + .build()); + } + } + } } @Override @@ -137,13 +173,65 @@ public void batchUnRegister(List configs) { @Override public List subscribe(ConsumerConfig config) { - config.getProviderInfoListener() + String appName = config.getAppName(); + if (!registryConfig.isSubscribe()) { + // registry ignored + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return null; + } + + if (config.isSubscribe()) { + + ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); + kubernetesRegistryProviderWatcher.addProviderListener(config, providerInfoListener); + + String dataId = KubernetesRegistryHelper.buildDataId(config, config.getProtocol()); + FilterWatchListDeletable podPodListPodResourceFilterWatchListDeletable = + kubernetesClient.pods() + .inNamespace(namespace) + .withLabel(dataId); + + SharedIndexInformer inform = podPodListPodResourceFilterWatchListDeletable.inform(new ResourceEventHandler() { + @Override + public void onAdd(Pod pod) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + + @Override + public void onUpdate(Pod pod, Pod t1) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + + @Override + public void onDelete(Pod pod, boolean b) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + }); + + consumerListeners.put(config, inform); + + inform.start(); + + List pods = podPodListPodResourceFilterWatchListDeletable.list().getItems(); + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods, config); + return Collections.singletonList(new ProviderGroup().addAll(providerInfos)); + } + return null; } @Override public void unSubscribe(ConsumerConfig config) { + if (config.isSubscribe()) { + SharedIndexInformer informer = consumerListeners.remove(config); + if (null != informer) { + informer.stop(); + } + } + kubernetesRegistryProviderWatcher.removeProviderListener(config); } @Override @@ -160,42 +248,33 @@ public void batchUnSubscribe(List configs) { @Override public void destroy() { - // stop service - serviceInformer.forEach((k, v) -> v.close()); - serviceInformer.clear(); - - // stop pod - podsInformer.forEach((k, v) -> v.close()); - podsInformer.clear(); - - // stop endpoints - endpointsInformer.forEach((k, v) -> v.close()); - endpointsInformer.clear(); + // unRegister consumer + consumerListeners.forEach((k, v) -> unSubscribe(k)); // close kubernetes client kubernetesClient.close(); } - @Override - public void init() { - // init kubernetes config - Config config = KubernetesConfigUtils.buildKubernetesConfig(registryConfig); - // init kubernetes client - this.kubernetesClient = KubernetesClientUtils.buildKubernetesClient(config); - this.currentHostname = System.getenv("HOSTNAME"); - this.namespace = config.getNamespace(); + private List getPods() { + return kubernetesClient.pods() + .inNamespace(namespace) + .list() + .getItems(); } - private Map getServiceSelector(String serviceName) { - Service service = kubernetesClient - .services() - .inNamespace(namespace) - .withName(serviceName) - .get(); - if (service == null) { - return null; - } - return service.getSpec().getSelector(); + /** + * UT used only + */ + @VisibleForTesting + public void setCurrentHostname(String currentHostname) { + this.currentHostname = currentHostname; } + /** + * UT used only + */ + @VisibleForTesting + public ConcurrentMap> getConsumerListeners() { + return consumerListeners; + } } \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java new file mode 100644 index 000000000..ca61fe851 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderHelper; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.AbstractInterfaceConfig; +import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class KubernetesRegistryHelper extends RegistryUtils { + + private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistryHelper.class); + + public static List convertPodsToProviders(List pods, ConsumerConfig config) { + List providerInfos = new ArrayList<>(); + if (CommonUtils.isEmpty(pods) || null == config) { + return providerInfos; + } + + for (Pod pod : pods) { + ProviderInfo providerInfo = getProviderInfo(pod, config); + if (null == providerInfo) { + continue; + } + providerInfos.add(providerInfo); + } + + return providerInfos; + } + + public static String convertToUrl(Pod pod, ServerConfig serverConfig, ProviderConfig providerConfig) { + String uri = ""; + String protocol = serverConfig.getProtocol(); + if (StringUtils.isNotEmpty(protocol)) { + uri = protocol + "://"; + } + uri += pod.getStatus().getPodIP() + ":" + serverConfig.getPort(); + + Map metaData = RegistryUtils.convertProviderToMap(providerConfig, serverConfig); + + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : metaData.entrySet()) { + sb.append("&").append(entry.getKey()).append("=").append(entry.getValue()); + } + if (sb.length() > 0) { + uri += sb.replace(0, 1, "?").toString(); + } + return uri; + } + + private static ProviderInfo getProviderInfo(Pod pod, ConsumerConfig config) { + try { + String dataId = buildDataId(config, config.getProtocol()); + String providerUrlString = pod.getMetadata().getAnnotations().get(dataId); + + if (StringUtils.isBlank(providerUrlString)) { + return null; + } + return ProviderHelper.toProviderInfo(providerUrlString); + } catch (Exception e) { + LOGGER.info("get provider config error with pod"); + return null; + } + } + + public static String buildDataId(AbstractInterfaceConfig config, String protocol) { + if (RpcConstants.PROTOCOL_TYPE_BOLT.equals(protocol) || RpcConstants.PROTOCOL_TYPE_TR.equals(protocol)) { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@DEFAULT"; + } else { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@" + protocol; + } + } +} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java new file mode 100644 index 000000000..8c026427b --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class KubernetesRegistryProviderWatcher { + + /** + * The Provider add listener map. + */ + private final ConcurrentMap> providerListenerMap = new ConcurrentHashMap<>(); + + /** + * Add provider listener. + * + * @param consumerConfig the consumer config + * @param listener the listener + */ + public void addProviderListener(ConsumerConfig consumerConfig, ProviderInfoListener listener) { + if (listener != null) { + RegistryUtils.initOrAddList(providerListenerMap, consumerConfig, listener); + } + } + + /** + * Remove provider listener. + * + * @param consumerConfig the consumer config + */ + public void removeProviderListener(ConsumerConfig consumerConfig) { + providerListenerMap.remove(consumerConfig); + } + + /** + * Update providers. + * + * @param config the config + * @param podList the pod list + */ + public void updateProviders(ConsumerConfig config, List podList) { + List providerInfoListeners = providerListenerMap.get(config); + if (CommonUtils.isNotEmpty(providerInfoListeners)) { + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList, config); + + for (ProviderInfoListener providerInfoListener : providerInfoListeners) { + providerInfoListener.updateAllProviders(Collections.singletonList(new ProviderGroup().addAll(providerInfos))); + } + } + } + +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java index ceb68a4a4..4bd508296 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java @@ -18,63 +18,56 @@ public class KubernetesClientConstants { - public static final String DEFAULT_MASTER_PLACEHOLDER = "DEFAULT_MASTER_HOST"; + public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc"; - public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc"; + public static final String TRUST_CERTS = "trustCerts"; - public static final String ENABLE_REGISTER = "enableRegister"; + public static final String USE_HTTPS = "useHttps"; - public static final String TRUST_CERTS = "trustCerts"; + public static final String HTTP2_DISABLE = "http2Disable"; - public static final String USE_HTTPS = "useHttps"; + public static final String NAMESPACE = "namespace"; - public static final String HTTP2_DISABLE = "http2Disable"; + public static final String API_VERSION = "apiVersion"; - public static final String NAMESPACE = "namespace"; + public static final String CA_CERT_FILE = "caCertFile"; - public static final String API_VERSION = "apiVersion"; + public static final String CA_CERT_DATA = "caCertData"; - public static final String CA_CERT_FILE = "caCertFile"; + public static final String CLIENT_CERT_FILE = "clientCertFile"; - public static final String CA_CERT_DATA = "caCertData"; + public static final String CLIENT_CERT_DATA = "clientCertData"; - public static final String CLIENT_CERT_FILE = "clientCertFile"; + public static final String CLIENT_KEY_FILE = "clientKeyFile"; - public static final String CLIENT_CERT_DATA = "clientCertData"; + public static final String CLIENT_KEY_DATA = "clientKeyData"; - public static final String CLIENT_KEY_FILE = "clientKeyFile"; + public static final String CLIENT_KEY_ALGO = "clientKeyAlgo"; - public static final String CLIENT_KEY_DATA = "clientKeyData"; + public static final String CLIENT_KEY_PASSPHRASE = "clientKeyPassphrase"; - public static final String CLIENT_KEY_ALGO = "clientKeyAlgo"; + public static final String OAUTH_TOKEN = "oauthToken"; - public static final String CLIENT_KEY_PASSPHRASE = "clientKeyPassphrase"; + public static final String USERNAME = "username"; - public static final String OAUTH_TOKEN = "oauthToken"; - - public static final String USERNAME = "username"; - - public static final String PASSWORD = "password"; + public static final String PASSWORD = "password"; public static final String WATCH_RECONNECT_INTERVAL = "watchReconnectInterval"; - public static final String WATCH_RECONNECT_LIMIT = "watchReconnectLimit"; - - public static final String CONNECTION_TIMEOUT = "connectionTimeout"; + public static final String WATCH_RECONNECT_LIMIT = "watchReconnectLimit"; - public static final String REQUEST_TIMEOUT = "requestTimeout"; + public static final String CONNECTION_TIMEOUT = "connectionTimeout"; - public static final String ROLLING_TIMEOUT = "rollingTimeout"; + public static final String REQUEST_TIMEOUT = "requestTimeout"; - public static final String LOGGING_INTERVAL = "loggingInterval"; + public static final String LOGGING_INTERVAL = "loggingInterval"; - public static final String HTTP_PROXY = "httpProxy"; + public static final String HTTP_PROXY = "httpProxy"; - public static final String HTTPS_PROXY = "httpsProxy"; + public static final String HTTPS_PROXY = "httpsProxy"; - public static final String PROXY_USERNAME = "proxyUsername"; + public static final String PROXY_USERNAME = "proxyUsername"; - public static final String PROXY_PASSWORD = "proxyPassword"; + public static final String PROXY_PASSWORD = "proxyPassword"; - public static final String NO_PROXY = "noProxy"; } diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java deleted file mode 100644 index 32002fc41..000000000 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesConfigUtils.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alipay.sofa.rpc.registry.kubernetes.util; - -import com.alipay.sofa.rpc.common.utils.StringUtils; -import com.alipay.sofa.rpc.config.RegistryConfig; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; - -import java.util.Base64; - -import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.DEFAULT_MASTER_URL; - -public class KubernetesConfigUtils { - - public static Config buildKubernetesConfig(RegistryConfig registryConfig) { - - // Init default config - Config base = Config.autoConfigure(null); - - // TODO should be set by user config - return new ConfigBuilder(base) - .withMasterUrl(buildMasterUrl(registryConfig)) - .withApiVersion(base.getApiVersion()) - .withNamespace(base.getNamespace()) - .withUsername(base.getUsername()) - .withPassword(base.getPassword()) - .withOauthToken(base.getOauthToken()) - .withCaCertFile(base.getCaCertFile()) - .withCaCertData(decodeBase64(base.getCaCertData())) - .withClientKeyFile(base.getClientKeyFile()) - .withClientKeyData(decodeBase64(base.getClientKeyData())) - .withClientCertFile(base.getClientCertFile()) - .withClientCertData(decodeBase64(base.getClientCertData())) - .withClientKeyAlgo(base.getClientKeyAlgo()) - .withClientKeyPassphrase(base.getClientKeyPassphrase()) - .withConnectionTimeout(base.getConnectionTimeout()) - .withRequestTimeout(base.getRequestTimeout()) - .withWatchReconnectInterval(base.getWatchReconnectInterval()) - .withWatchReconnectLimit(base.getWatchReconnectLimit()) - .withLoggingInterval(base.getLoggingInterval()) - .withTrustCerts(base.isTrustCerts()) - .withHttp2Disable(base.isHttp2Disable()) - .withHttpProxy(base.getHttpProxy()) - .withHttpsProxy(base.getHttpsProxy()) - .withProxyUsername(base.getProxyUsername()) - .withProxyPassword(base.getProxyPassword()) - .withNoProxy(base.getNoProxy()) - .build(); - } - - private static String buildMasterUrl(RegistryConfig registryConfig) { - String address = registryConfig.getAddress(); - if (StringUtils.isNotBlank(address)) { - return "http://" + address; - } - return DEFAULT_MASTER_URL; - } - - private static String decodeBase64(String str) { - return StringUtils.isNotEmpty(str) ? new String(Base64.getDecoder().decode(str)) : null; - } -} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesClientUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java similarity index 95% rename from registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesClientUtils.java rename to registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java index 49f8246ce..274d40a99 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/util/KubernetesClientUtils.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alipay.sofa.rpc.registry.kubernetes.util; +package com.alipay.sofa.rpc.registry.kubernetes.utils; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java new file mode 100644 index 000000000..f7aea188f --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.utils; + +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.RegistryConfig; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; + +import java.util.Base64; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.API_VERSION; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CA_CERT_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CA_CERT_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_CERT_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_CERT_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_ALGO; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_PASSPHRASE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CONNECTION_TIMEOUT; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.DEFAULT_MASTER_URL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTP2_DISABLE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTPS_PROXY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTP_PROXY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LOGGING_INTERVAL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.NAMESPACE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.OAUTH_TOKEN; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PASSWORD; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PROXY_PASSWORD; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PROXY_USERNAME; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.REQUEST_TIMEOUT; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.TRUST_CERTS; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.USERNAME; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.USE_HTTPS; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.WATCH_RECONNECT_INTERVAL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.WATCH_RECONNECT_LIMIT; + +public class KubernetesConfigUtils { + + public static Config buildKubernetesConfig(RegistryConfig registryConfig) { + + // Init default config + Config base = Config.autoConfigure(null); + + return new ConfigBuilder(base) + .withMasterUrl(buildMasterUrl(registryConfig)) + .withApiVersion(registryConfig.getParameter(API_VERSION, base.getApiVersion())) + .withNamespace(registryConfig.getParameter(NAMESPACE, base.getNamespace())) + .withUsername(registryConfig.getParameter(USERNAME, base.getUsername())) + .withPassword(registryConfig.getParameter(PASSWORD, base.getPassword())) + .withOauthToken(registryConfig.getParameter(OAUTH_TOKEN, base.getOauthToken())) + .withCaCertFile(registryConfig.getParameter(CA_CERT_FILE, base.getCaCertFile())) + .withCaCertData(registryConfig.getParameter(CA_CERT_DATA, decodeBase64(base.getCaCertData()))) + .withClientKeyFile(registryConfig.getParameter(CLIENT_KEY_FILE, base.getClientKeyFile())) + .withClientKeyData(registryConfig.getParameter(CLIENT_KEY_DATA, decodeBase64(base.getClientKeyData()))) + .withClientCertFile(registryConfig.getParameter(CLIENT_CERT_FILE, base.getClientCertFile())) + .withClientCertData(registryConfig.getParameter(CLIENT_CERT_DATA, decodeBase64(base.getClientCertData()))) + .withClientKeyAlgo(registryConfig.getParameter(CLIENT_KEY_ALGO, base.getClientKeyAlgo())) + .withClientKeyPassphrase(registryConfig.getParameter(CLIENT_KEY_PASSPHRASE, base.getClientKeyPassphrase())) + .withConnectionTimeout(registryConfig.getParameter(CONNECTION_TIMEOUT, base.getConnectionTimeout())) + .withRequestTimeout(registryConfig.getParameter(REQUEST_TIMEOUT, base.getRequestTimeout())) + .withWatchReconnectInterval( + registryConfig.getParameter(WATCH_RECONNECT_INTERVAL, base.getWatchReconnectInterval())) + .withWatchReconnectLimit(registryConfig.getParameter(WATCH_RECONNECT_LIMIT, base.getWatchReconnectLimit())) + .withLoggingInterval(registryConfig.getParameter(LOGGING_INTERVAL, base.getLoggingInterval())) + .withTrustCerts(registryConfig.getParameter(TRUST_CERTS, base.isTrustCerts())) + .withHttp2Disable(registryConfig.getParameter(HTTP2_DISABLE, base.isHttp2Disable())) + .withHttpProxy(registryConfig.getParameter(HTTP_PROXY, base.getHttpProxy())) + .withHttpsProxy(registryConfig.getParameter(HTTPS_PROXY, base.getHttpsProxy())) + .withProxyUsername(registryConfig.getParameter(PROXY_USERNAME, base.getProxyUsername())) + .withProxyPassword(registryConfig.getParameter(PROXY_PASSWORD, base.getProxyPassword())) + .build(); + } + + private static String buildMasterUrl(RegistryConfig registryConfig) { + String address = registryConfig.getAddress(); + if (StringUtils.isBlank(address)) { + return DEFAULT_MASTER_URL; + } + if (address.startsWith("http")) { + return address; + } + return registryConfig.getParameter(USE_HTTPS, true) ? "https://" + address : "http://" + address; + } + + private static String decodeBase64(String str) { + return StringUtils.isNotEmpty(str) ? new String(Base64.getDecoder().decode(str)) : null; + } +} diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java new file mode 100644 index 000000000..7f543df82 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static com.alipay.sofa.rpc.registry.kubernetes.KubernetesRegistryHelper.buildDataId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class KubernetesRegistryTest { + + private static final String NAMESPACE = "TestNameSpace"; + private static final String POD_NAME = "TestPodName"; + private static final String APP_NAME = "TestAppName"; + private static final String SERVICE_NAME = "TestService"; + + public KubernetesServer mockServer; + + private NamespacedKubernetesClient mockClient; + + private static KubernetesRegistry kubernetesRegistry; + + private static RegistryConfig registryConfig; + + private static ConsumerConfig consumer; + + /** + * Ad before class. + */ + @BeforeClass + public static void adBeforeClass() { + RpcRunningState.setUnitTestMode(true); + } + + /** + * Ad after class. + */ + @AfterClass + public static void adAfterClass() { + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + } + + @Before + public void setup() { + mockServer = new KubernetesServer(false, true); + mockServer.before(); + mockClient = mockServer.getClient().inNamespace(NAMESPACE); + + registryConfig = new RegistryConfig(); + registryConfig.setProtocol("kubernetes"); + registryConfig.setAddress(mockClient.getConfiguration().getMasterUrl()); + // registryConfig.setParameter("trustCerts", "true"); + registryConfig.setParameter("namespace", NAMESPACE); + registryConfig.setParameter("useHttps", "false"); + registryConfig.setParameter("http2Disable", "true"); + + kubernetesRegistry = new KubernetesRegistry(registryConfig); + kubernetesRegistry.init(); + kubernetesRegistry.setCurrentHostname(POD_NAME); + + System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false"); + System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false"); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .endMetadata() + .withNewStatus() + .withPodIP("192.168.1.100") + .endStatus() + .build(); + + Service service = new ServiceBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .withNewSpec() + .endSpec() + .build(); + + Endpoints endPoints = new EndpointsBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .addNewSubset() + .addNewAddress() + .withIp("ip1") + .withNewTargetRef() + .withUid("uid1") + .withName(POD_NAME) + .endTargetRef() + .endAddress() + .addNewPort("Test", "Test", 12345, "TCP") + .endSubset() + .build(); + + mockClient.pods().inNamespace(NAMESPACE).create(pod); + mockClient.services().inNamespace(NAMESPACE).create(service); + mockClient.endpoints().inNamespace(NAMESPACE).create(endPoints); + + Assert.assertTrue(kubernetesRegistry.start()); + } + + @After + public void cleanup() { + kubernetesRegistry.destroy(); + mockClient.close(); + mockServer.after(); + } + + @Test + public void testAll() throws InterruptedException { + ApplicationConfig applicationConfig = new ApplicationConfig() + .setAppName(APP_NAME); + + ServerConfig serverConfig1 = new ServerConfig() + .setProtocol("bolt") + .setPort(12200) + .setDaemon(false); + + ProviderConfig providerConfig1 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl()) + .setDelay(20) + .setServer(serverConfig1); + + // 注册第一个providerConfig1 + kubernetesRegistry.register(providerConfig1); + + ServerConfig serverConfig2 = new ServerConfig() + .setProtocol("h2c") + .setPort(12202) + .setDaemon(false); + + ProviderConfig providerConfig2 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService2.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl2()) + .setDelay(20) + .setServer(serverConfig2); + + // 注册第二个providerConfig2 + kubernetesRegistry.register(providerConfig2); + + List items = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + + Assert.assertEquals(1, items.size()); + Pod pod = items.get(0); + String annotationBolt = pod.getMetadata().getAnnotations().get(buildDataId(providerConfig1, "bolt")); + Assert.assertNotNull(annotationBolt); + String annotationH2c = pod.getMetadata().getAnnotations().get(buildDataId(providerConfig2, "h2c")); + Assert.assertNotNull(annotationH2c); + + // 订阅 + consumer = new ConsumerConfig(); + consumer.setInterfaceId("com.alipay.sofa.rpc.registry.kubernetes.TestService") + .setApplication(applicationConfig) + .setProxy("javassist") + .setSubscribe(true) + .setSerialization("java") + .setInvokeType("sync") + .setTimeout(4444); + + CountDownLatch latch = new CountDownLatch(1); + MockProviderInfoListener providerInfoListener = new MockProviderInfoListener(); + providerInfoListener.setCountDownLatch(latch); + consumer.setProviderInfoListener(providerInfoListener); + List all = kubernetesRegistry.subscribe(consumer); + providerInfoListener.updateAllProviders(all); + latch.await(5000, TimeUnit.MILLISECONDS); + Map ps = providerInfoListener.getData(); + + Assert.assertEquals(1, kubernetesRegistry.getConsumerListeners().size()); + Assert.assertTrue(ps.size() > 0); + Assert.assertEquals(1, ps.size()); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertTrue(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size() > 0); + + // 一次发2个端口的再次注册 + latch = new CountDownLatch(2); + providerInfoListener.setCountDownLatch(latch); + ServerConfig serverConfig = new ServerConfig() + .setProtocol("bolt") + .setHost("0.0.0.0") + .setDaemon(false) + .setPort(12201); + providerConfig1.getServer().add(serverConfig); + kubernetesRegistry.register(providerConfig1); + latch.await(5000 * 2, TimeUnit.MILLISECONDS); + Assert.assertTrue(ps.size() > 0); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertEquals(1, ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size()); + + // 反订阅 + kubernetesRegistry.unSubscribe(consumer); + Assert.assertEquals(0, kubernetesRegistry.getConsumerListeners().size()); + + // 反注册providerConfig1 + kubernetesRegistry.unRegister(providerConfig1); + // 反注册providerConfig2 + kubernetesRegistry.unRegister(providerConfig2); + + List unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + Assert.assertEquals(0, unRegisterItems.get(0).getMetadata().getAnnotations().size()); + } + + private static class MockProviderInfoListener implements ProviderInfoListener { + + Map providerGroupMap = new HashMap<>(); + + private CountDownLatch countDownLatch; + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void addProvider(ProviderGroup providerGroup) { + + } + + @Override + public void removeProvider(ProviderGroup providerGroup) { + + } + + @Override + public void updateProviders(ProviderGroup providerGroup) { + + providerGroupMap.put(providerGroup.getName(), providerGroup); + if (countDownLatch != null) { + countDownLatch.countDown(); + countDownLatch = null; + } + } + + @Override + public void updateAllProviders(List providerGroups) { + providerGroupMap.clear(); + + if (providerGroups == null || providerGroups.size() == 0) { + } else { + for (ProviderGroup p : providerGroups) { + providerGroupMap.put(p.getName(), p); + } + + } + } + + public Map getData() { + return providerGroupMap; + } + } + + @Test + public void testUpdatePodAnnotations() { + + // 创建一个新的 Pod + String podName = "test-pod"; + String namespace = "test-namespace"; + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(podName) + .withNamespace(namespace) + .endMetadata() + .build(); + + // 在模拟环境中创建 Pod + pod = mockClient.pods().inNamespace(namespace).create(pod); + assertNotNull(pod); + + // 准备要更新的 annotations + Map annotations = new HashMap<>(); + annotations.put("example.com/annotation", "value"); + + // 更新 Pod 的 annotations + pod = new PodBuilder(pod) + .editMetadata() + .addToAnnotations(annotations) + .endMetadata() + .build(); + + // 在模拟环境中更新 Pod + pod = mockClient.pods().inNamespace(namespace).withName(podName).replace(pod); + + // 获取并验证 annotations 是否已更新 + assertEquals("value", pod.getMetadata().getAnnotations().get("example.com/annotation")); + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java new file mode 100644 index 000000000..8cb791154 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java new file mode 100644 index 000000000..30d7392e6 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService2 { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java new file mode 100644 index 000000000..25b7cc7c3 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl implements TestService { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java new file mode 100644 index 000000000..bc6c3aa1b --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl2 implements TestService2 { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java b/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java index 7161a8702..98dec21bc 100644 --- a/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java +++ b/registry/registry-nacos/src/main/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistry.java @@ -20,12 +20,12 @@ import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; -import com.alibaba.nacos.api.naming.listener.Event; import com.alibaba.nacos.api.naming.listener.EventListener; import com.alibaba.nacos.api.naming.listener.NamingEvent; import com.alibaba.nacos.api.naming.pojo.Instance; import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; import com.alipay.sofa.rpc.common.utils.CommonUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; @@ -146,6 +146,10 @@ public synchronized void init() { nacosConfig.putAll(parameters); } + if (providerObserver == null) { + providerObserver = new NacosRegistryProviderObserver(); + } + try { namingService = NamingFactory.createNamingService(nacosConfig); } catch (NacosException e) { @@ -272,26 +276,19 @@ public List subscribe(final ConsumerConfig config) { } try { - if (providerObserver == null) { - providerObserver = new NacosRegistryProviderObserver(); - } - ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); providerObserver.addProviderListener(config, providerInfoListener); - EventListener eventListener = new EventListener() { - @Override - public void onEvent(Event event) { - if (event instanceof NamingEvent) { - NamingEvent namingEvent = (NamingEvent) event; - List instances = namingEvent.getInstances(); - // avoid npe - if (null == instances) { - instances = new ArrayList(); - } - instances.removeIf(i -> !i.isEnabled()); - providerObserver.updateProviders(config, instances); + EventListener eventListener = event -> { + if (event instanceof NamingEvent) { + NamingEvent namingEvent = (NamingEvent) event; + List instances = namingEvent.getInstances(); + // avoid npe + if (null == instances) { + instances = new ArrayList(); } + instances.removeIf(i -> !i.isEnabled()); + providerObserver.updateProviders(config, instances); } }; namingService.subscribe(serviceName, defaultCluster, eventListener); @@ -359,4 +356,14 @@ public void destroy() { public Properties getNacosConfig() { return nacosConfig; } + + /** + * UT only + * + * @return + */ + @VisibleForTesting + public NacosRegistryProviderObserver getProviderObserver() { + return providerObserver; + } } diff --git a/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java b/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java index 373af0eb6..dc6e7f114 100644 --- a/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java +++ b/registry/registry-nacos/src/test/java/com/alipay/sofa/rpc/registry/nacos/NacosRegistryTest.java @@ -18,6 +18,7 @@ import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.struct.ConcurrentHashSet; import com.alipay.sofa.rpc.config.ApplicationConfig; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; @@ -36,9 +37,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -66,8 +70,6 @@ public void setUp() { .setRegister(true); registry = (NacosRegistry) RegistryFactory.getRegistry(registryConfig); - registry.init(); - Assert.assertTrue(registry.start()); } /** @@ -77,7 +79,30 @@ public void setUp() { public void tearDown() { registry.destroy(); registry = null; - serverConfig.destroy(); + } + + @Test + public void testMuiltInit() throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(10); + final CountDownLatch latch = new CountDownLatch(10); + Set sets = new ConcurrentHashSet<>(); + + for (int i = 0; i < 10; i++) { + executorService.submit(() -> { + try { + registry.init(); + NacosRegistryProviderObserver providerObserver = registry.getProviderObserver(); + sets.add(providerObserver); + } finally { + latch.countDown(); + } + }); + } + + latch.await(); + executorService.shutdown(); + + Assert.assertEquals(1, sets.size()); } /** @@ -87,6 +112,8 @@ public void tearDown() { */ @Test public void testProviderObserver() throws Exception { + registry.init(); + Assert.assertTrue(registry.start()); int timeoutPerSub = 2000; //wait nacos startup ok @@ -227,6 +254,7 @@ public void testProviderObserver() throws Exception { List consumerConfigList = new ArrayList<>(); consumerConfigList.add(consumer2); registry.batchUnSubscribe(consumerConfigList); + serverConfig.destroy(); } /** @@ -236,6 +264,8 @@ public void testProviderObserver() throws Exception { */ @Test public void testVirtualHostAndVirtualPort() throws Exception { + registry.init(); + Assert.assertTrue(registry.start()); //wait nacos startup ok TimeUnit.SECONDS.sleep(10); // 模拟的场景 client -> proxy:127.7.7.7:8888 -> netty:0.0.0.0:12200 @@ -297,6 +327,7 @@ public void testVirtualHostAndVirtualPort() throws Exception { virtualHost + ":" + virtualPort); Assert.assertEquals("The provider's host should be virtualHost", virtualHost, pri.getHost()); Assert.assertEquals("The provider's port should be virtualPort", virtualPort, pri.getPort()); + serverConfig.destroy(); } private static class MockProviderInfoListener implements ProviderInfoListener {