From af2a8c4e5eb11a11e0a6863f7a00484adf58a7ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=88=E9=93=AD?= Date: Wed, 24 Jan 2024 14:21:38 +0800 Subject: [PATCH] support sofa registry kubernetes --- all/pom.xml | 6 + bom/pom.xml | 15 + .../sofa/rpc/config/RegistryConfig.java | 32 ++ registry/pom.xml | 1 + registry/registry-kubernetes/pom.xml | 39 ++ .../kubernetes/KubernetesRegistry.java | 304 +++++++++++++++ .../kubernetes/KubernetesRegistryHelper.java | 114 ++++++ .../KubernetesRegistryProviderWatcher.java | 78 ++++ .../constant/KubernetesClientConstants.java | 75 ++++ .../utils/KubernetesClientUtils.java | 28 ++ .../utils/KubernetesConfigUtils.java | 104 +++++ .../com.alipay.sofa.rpc.registry.Registry | 1 + .../kubernetes/KubernetesRegistryTest.java | 357 ++++++++++++++++++ .../rpc/registry/kubernetes/TestService.java | 22 ++ .../rpc/registry/kubernetes/TestService2.java | 22 ++ .../registry/kubernetes/TestServiceImpl.java | 25 ++ .../registry/kubernetes/TestServiceImpl2.java | 25 ++ .../src/test/resources/log4j.xml | 16 + .../org.mockito.plugins.MockMaker | 1 + .../test/resources/sofa-rpc/rpc-config.json | 4 + 20 files changed, 1269 insertions(+) create mode 100644 registry/registry-kubernetes/pom.xml create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java create mode 100644 registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java create mode 100644 registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java create mode 100755 registry/registry-kubernetes/src/test/resources/log4j.xml create mode 100644 registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker create mode 100644 registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json diff --git a/all/pom.xml b/all/pom.xml index 3daa4e511..df0690331 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -244,6 +244,11 @@ sofa-rpc-registry-polaris ${project.version} + + com.alipay.sofa + sofa-rpc-registry-kubernetes + ${project.version} + com.alipay.sofa sofa-rpc-remoting-bolt @@ -545,6 +550,7 @@ com.alipay.sofa:sofa-rpc-registry-multicast com.alipay.sofa:sofa-rpc-registry-sofa com.alipay.sofa:sofa-rpc-registry-polaris + com.alipay.sofa:sofa-rpc-registry-kubernetes com.alipay.sofa:sofa-rpc-remoting-bolt com.alipay.sofa:sofa-rpc-remoting-http com.alipay.sofa:sofa-rpc-remoting-resteasy diff --git a/bom/pom.xml b/bom/pom.xml index a97d8cc8f..d43a6183d 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -60,6 +60,8 @@ true true + + 6.9.2 @@ -514,6 +516,19 @@ ${grpc.version} + + + io.fabric8 + kubernetes-client + ${fabric8_kubernetes_version} + + + io.fabric8 + kubernetes-server-mock + test + ${fabric8_kubernetes_version} + + org.apache.curator diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java index b11533728..a3b865904 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/RegistryConfig.java @@ -398,6 +398,38 @@ public String getParameter(String key) { return parameters == null ? null : parameters.get(key); } + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public String getParameter(String key, String defaultValue) { + return getParameter(key) == null ? defaultValue : getParameter(key); + } + + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public int getParameter(String key, int defaultValue) { + return (parameters == null || parameters.get(key) == null) ? defaultValue : Integer.parseInt(parameters + .get(key)); + } + + /** + * Gets parameter or default. + * + * @param key the key + * @return the value + */ + public boolean getParameter(String key, boolean defaultValue) { + return (parameters == null || parameters.get(key) == null) ? defaultValue : Boolean.parseBoolean(parameters + .get(key)); + } + @Override public String toString() { return "RegistryConfig{" + diff --git a/registry/pom.xml b/registry/pom.xml index e65411b8e..4dc28e046 100644 --- a/registry/pom.xml +++ b/registry/pom.xml @@ -22,6 +22,7 @@ registry-multicast registry-sofa registry-polaris + registry-kubernetes diff --git a/registry/registry-kubernetes/pom.xml b/registry/registry-kubernetes/pom.xml new file mode 100644 index 000000000..b3bb820ff --- /dev/null +++ b/registry/registry-kubernetes/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-registry + ${revision} + + + sofa-rpc-registry-kubernetes + + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + com.alipay.sofa + sofa-rpc-codec-api + + + io.fabric8 + kubernetes-client + + + io.fabric8 + kubernetes-server-mock + test + + + + diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java new file mode 100644 index 000000000..bf91a5152 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; +import com.alipay.sofa.rpc.common.json.JSON; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import com.alipay.sofa.rpc.log.LogCodes; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.registry.Registry; +import com.alipay.sofa.rpc.registry.kubernetes.utils.KubernetesClientUtils; +import com.alipay.sofa.rpc.registry.kubernetes.utils.KubernetesConfigUtils; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable; +import io.fabric8.kubernetes.client.dsl.PodResource; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; + +@Extension("kubernetes") +public class KubernetesRegistry extends Registry { + + /** + * slf4j Logger for this class + */ + private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistry.class); + + private KubernetesClient kubernetesClient; + + private String currentHostname; + + private String namespace; + + private KubernetesRegistryProviderWatcher kubernetesRegistryProviderWatcher; + + // {dataId:ProviderConfig} + private final ConcurrentHashMap providerInstances = new ConcurrentHashMap<>(64); + + private final ConcurrentMap> consumerListeners = new ConcurrentHashMap<>(64); + + /** + * Instantiates a new kubernetes registry. + * + * @param registryConfig + */ + public KubernetesRegistry(RegistryConfig registryConfig) { + super(registryConfig); + } + + @Override + public void init() { + // init kubernetes config + Config config = KubernetesConfigUtils.buildKubernetesConfig(registryConfig); + // init kubernetes client + if (kubernetesClient == null) { + this.kubernetesClient = KubernetesClientUtils.buildKubernetesClient(config); + } + // init Watcher + if (kubernetesRegistryProviderWatcher == null) { + kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher(); + } + this.currentHostname = System.getenv("HOSTNAME"); + this.namespace = config.getNamespace(); + } + + @Override + public boolean start() { + return true; + } + + @Override + public void register(ProviderConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isRegister()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return; + } + + if (config.isRegister()) { + PodResource podResource = kubernetesClient.pods() + .inNamespace(namespace) + .withName(currentHostname); + + List serverConfigs = config.getServer(); + + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + providerInstances.put(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()), config); + } + + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + // 将ProviderConfig存在Annotations上 + .addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true)) + .endMetadata().build()); + } + } + } + + @Override + public void unRegister(ProviderConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isRegister()) { + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return; + } + + if (config.isRegister()) { + List serverConfigs = config.getServer(); + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + providerInstances.remove(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol())); + } + } + + PodResource podResource = kubernetesClient.pods() + .inNamespace(namespace) + .withName(currentHostname); + + if (CommonUtils.isEmpty(providerInstances)) { + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + .removeFromAnnotations(ANNOTATION_KEY) + .endMetadata() + .build()); + } else { + // 考虑可能会有一个pod上多个ProviderConfig的场景,所以先全部移除然后将最新的providerInstances存在Annotations上 + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + .removeFromAnnotations(ANNOTATION_KEY) + .addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true)) + .endMetadata() + .build()); + } + } + } + + @Override + public void batchUnRegister(List configs) { + // one by one + for (ProviderConfig config : configs) { + try { + this.unRegister(config); + } catch (Exception e) { + LOGGER.errorWithApp(config.getAppName(), "Batch unregister error", e); + } + } + } + + @Override + public List subscribe(ConsumerConfig config) { + String appName = config.getAppName(); + if (!registryConfig.isSubscribe()) { + // registry ignored + if (LOGGER.isInfoEnabled(appName)) { + LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE)); + } + return null; + } + + if (config.isSubscribe()) { + + ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); + kubernetesRegistryProviderWatcher.addProviderListener(config, providerInfoListener); + + FilterWatchListDeletable podPodListPodResourceFilterWatchListDeletable = + kubernetesClient.pods() + .inNamespace(namespace); + + SharedIndexInformer inform = podPodListPodResourceFilterWatchListDeletable.inform(new ResourceEventHandler() { + @Override + public void onAdd(Pod pod) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + + @Override + public void onUpdate(Pod pod, Pod t1) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + + @Override + public void onDelete(Pod pod, boolean b) { + kubernetesRegistryProviderWatcher.updateProviders(config, getPods()); + } + }); + + consumerListeners.put(config, inform); + + inform.start(); + + List pods = podPodListPodResourceFilterWatchListDeletable.list().getItems(); + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods, config); + List matchProviders = RegistryUtils.matchProviderInfos(config, providerInfos); + + return Collections.singletonList(new ProviderGroup().addAll(matchProviders)); + } + + return null; + } + + @Override + public void unSubscribe(ConsumerConfig config) { + if (config.isSubscribe()) { + SharedIndexInformer informer = consumerListeners.remove(config); + if (null != informer) { + informer.stop(); + } + } + + kubernetesRegistryProviderWatcher.removeProviderListener(config); + } + + @Override + public void batchUnSubscribe(List configs) { + // one by one + for (ConsumerConfig config : configs) { + try { + this.unSubscribe(config); + } catch (Exception e) { + LOGGER.errorWithApp(config.getAppName(), "Batch unSubscribe error", e); + } + } + } + + @Override + public void destroy() { + // unRegister provider + providerInstances.forEach((k, v) -> unRegister(v)); + + // unRegister consumer + consumerListeners.forEach((k, v) -> unSubscribe(k)); + + // close kubernetes client + if (CommonUtils.isEmpty(providerInstances)) { + kubernetesClient.close(); + } + } + + private List getPods() { + return kubernetesClient.pods() + .inNamespace(namespace) + .list() + .getItems(); + } + + /** + * UT used only + */ + @VisibleForTesting + public void setCurrentHostname(String currentHostname) { + this.currentHostname = currentHostname; + } + + /** + * UT used only + */ + @VisibleForTesting + public ConcurrentMap> getConsumerListeners() { + return consumerListeners; + } + + /** + * UT used only + */ + @VisibleForTesting + public ConcurrentMap getProviderInstances() { + return providerInstances; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java new file mode 100644 index 000000000..40678f448 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderHelper; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.json.JSON; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.AbstractInterfaceConfig; +import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; + +public class KubernetesRegistryHelper extends RegistryUtils { + + private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistryHelper.class); + + public static List convertPodsToProviders(List pods, ConsumerConfig config) { + List providerInfos = new ArrayList<>(); + if (CommonUtils.isEmpty(pods) || null == config) { + return providerInfos; + } + + for (Pod pod : pods) { + ProviderConfig providerConfig = getProviderConfig(pod, config); + if (null == providerConfig || null == providerConfig.getServer()) { + continue; + } + + providerConfig.getServer().forEach(serverConfig -> { + String url = convertInstanceToUrl(pod, (ServerConfig) serverConfig, providerConfig); + ProviderInfo providerInfo = ProviderHelper.toProviderInfo(url); + providerInfos.add(providerInfo); + }); + } + + return providerInfos; + } + + private static String convertInstanceToUrl(Pod pod, ServerConfig serverConfig, ProviderConfig providerConfig) { + String uri = ""; + String protocol = serverConfig.getProtocol(); + if (StringUtils.isNotEmpty(protocol)) { + uri = protocol + "://"; + } + uri += pod.getStatus().getPodIP() + ":" + serverConfig.getPort(); + + Map metaData = RegistryUtils.convertProviderToMap(providerConfig, serverConfig); + + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : metaData.entrySet()) { + sb.append("&").append(entry.getKey()).append("=").append(entry.getValue()); + } + if (sb.length() > 0) { + uri += sb.replace(0, 1, "?").toString(); + } + return uri; + } + + private static ProviderConfig getProviderConfig(Pod pod, ConsumerConfig config) { + try { + String providerConfigStr = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY); + if (StringUtils.isBlank(providerConfigStr)) { + return null; + } + + Map map = JSON.parseObject(providerConfigStr, Map.class); + + if (null == map) { + return null; + } + + return map.get(KubernetesRegistryHelper.buildDataId(config, config.getProtocol())); + } catch (Exception e) { + LOGGER.info("get provider config error with pod"); + return null; + } + } + + public static String buildDataId(AbstractInterfaceConfig config, String protocol) { + if (RpcConstants.PROTOCOL_TYPE_BOLT.equals(protocol) || RpcConstants.PROTOCOL_TYPE_TR.equals(protocol)) { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@DEFAULT"; + } else { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@" + protocol; + } + } +} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java new file mode 100644 index 000000000..50bf0d7e7 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import com.alipay.sofa.rpc.registry.utils.RegistryUtils; +import io.fabric8.kubernetes.api.model.Pod; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class KubernetesRegistryProviderWatcher { + + /** + * The Provider add listener map. + */ + private final ConcurrentMap> providerListenerMap = new ConcurrentHashMap<>(); + + /** + * Add provider listener. + * + * @param consumerConfig the consumer config + * @param listener the listener + */ + public void addProviderListener(ConsumerConfig consumerConfig, ProviderInfoListener listener) { + if (listener != null) { + RegistryUtils.initOrAddList(providerListenerMap, consumerConfig, listener); + } + } + + /** + * Remove provider listener. + * + * @param consumerConfig the consumer config + */ + public void removeProviderListener(ConsumerConfig consumerConfig) { + providerListenerMap.remove(consumerConfig); + } + + /** + * Update providers. + * + * @param config the config + * @param podList the pod list + */ + public void updateProviders(ConsumerConfig config, List podList) { + List providerInfoListeners = providerListenerMap.get(config); + if (CommonUtils.isNotEmpty(providerInfoListeners)) { + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList, config); + List matchProviders = RegistryUtils.matchProviderInfos(config, providerInfos); + + for (ProviderInfoListener providerInfoListener : providerInfoListeners) { + providerInfoListener.updateAllProviders(Collections.singletonList(new ProviderGroup().addAll(matchProviders))); + } + } + } + +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java new file mode 100644 index 000000000..959400d7c --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.constant; + +public class KubernetesClientConstants { + + public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc"; + + public static final String ANNOTATION_KEY = "com.sofa.rpc/annotation"; + + public static final String TRUST_CERTS = "trustCerts"; + + public static final String USE_HTTPS = "useHttps"; + + public static final String HTTP2_DISABLE = "http2Disable"; + + public static final String NAMESPACE = "namespace"; + + public static final String API_VERSION = "apiVersion"; + + public static final String CA_CERT_FILE = "caCertFile"; + + public static final String CA_CERT_DATA = "caCertData"; + + public static final String CLIENT_CERT_FILE = "clientCertFile"; + + public static final String CLIENT_CERT_DATA = "clientCertData"; + + public static final String CLIENT_KEY_FILE = "clientKeyFile"; + + public static final String CLIENT_KEY_DATA = "clientKeyData"; + + public static final String CLIENT_KEY_ALGO = "clientKeyAlgo"; + + public static final String CLIENT_KEY_PASSPHRASE = "clientKeyPassphrase"; + + public static final String OAUTH_TOKEN = "oauthToken"; + + public static final String USERNAME = "username"; + + public static final String PASSWORD = "password"; + + public static final String WATCH_RECONNECT_INTERVAL = "watchReconnectInterval"; + + public static final String WATCH_RECONNECT_LIMIT = "watchReconnectLimit"; + + public static final String CONNECTION_TIMEOUT = "connectionTimeout"; + + public static final String REQUEST_TIMEOUT = "requestTimeout"; + + public static final String LOGGING_INTERVAL = "loggingInterval"; + + public static final String HTTP_PROXY = "httpProxy"; + + public static final String HTTPS_PROXY = "httpsProxy"; + + public static final String PROXY_USERNAME = "proxyUsername"; + + public static final String PROXY_PASSWORD = "proxyPassword"; + +} diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java new file mode 100644 index 000000000..274d40a99 --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesClientUtils.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.utils; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; + +public class KubernetesClientUtils { + + public static KubernetesClient buildKubernetesClient(Config config) { + return new KubernetesClientBuilder().withConfig(config).build(); + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java new file mode 100644 index 000000000..f7aea188f --- /dev/null +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/utils/KubernetesConfigUtils.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes.utils; + +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.RegistryConfig; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; + +import java.util.Base64; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.API_VERSION; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CA_CERT_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CA_CERT_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_CERT_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_CERT_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_ALGO; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_DATA; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_FILE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CLIENT_KEY_PASSPHRASE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.CONNECTION_TIMEOUT; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.DEFAULT_MASTER_URL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTP2_DISABLE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTPS_PROXY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.HTTP_PROXY; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LOGGING_INTERVAL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.NAMESPACE; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.OAUTH_TOKEN; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PASSWORD; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PROXY_PASSWORD; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.PROXY_USERNAME; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.REQUEST_TIMEOUT; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.TRUST_CERTS; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.USERNAME; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.USE_HTTPS; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.WATCH_RECONNECT_INTERVAL; +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.WATCH_RECONNECT_LIMIT; + +public class KubernetesConfigUtils { + + public static Config buildKubernetesConfig(RegistryConfig registryConfig) { + + // Init default config + Config base = Config.autoConfigure(null); + + return new ConfigBuilder(base) + .withMasterUrl(buildMasterUrl(registryConfig)) + .withApiVersion(registryConfig.getParameter(API_VERSION, base.getApiVersion())) + .withNamespace(registryConfig.getParameter(NAMESPACE, base.getNamespace())) + .withUsername(registryConfig.getParameter(USERNAME, base.getUsername())) + .withPassword(registryConfig.getParameter(PASSWORD, base.getPassword())) + .withOauthToken(registryConfig.getParameter(OAUTH_TOKEN, base.getOauthToken())) + .withCaCertFile(registryConfig.getParameter(CA_CERT_FILE, base.getCaCertFile())) + .withCaCertData(registryConfig.getParameter(CA_CERT_DATA, decodeBase64(base.getCaCertData()))) + .withClientKeyFile(registryConfig.getParameter(CLIENT_KEY_FILE, base.getClientKeyFile())) + .withClientKeyData(registryConfig.getParameter(CLIENT_KEY_DATA, decodeBase64(base.getClientKeyData()))) + .withClientCertFile(registryConfig.getParameter(CLIENT_CERT_FILE, base.getClientCertFile())) + .withClientCertData(registryConfig.getParameter(CLIENT_CERT_DATA, decodeBase64(base.getClientCertData()))) + .withClientKeyAlgo(registryConfig.getParameter(CLIENT_KEY_ALGO, base.getClientKeyAlgo())) + .withClientKeyPassphrase(registryConfig.getParameter(CLIENT_KEY_PASSPHRASE, base.getClientKeyPassphrase())) + .withConnectionTimeout(registryConfig.getParameter(CONNECTION_TIMEOUT, base.getConnectionTimeout())) + .withRequestTimeout(registryConfig.getParameter(REQUEST_TIMEOUT, base.getRequestTimeout())) + .withWatchReconnectInterval( + registryConfig.getParameter(WATCH_RECONNECT_INTERVAL, base.getWatchReconnectInterval())) + .withWatchReconnectLimit(registryConfig.getParameter(WATCH_RECONNECT_LIMIT, base.getWatchReconnectLimit())) + .withLoggingInterval(registryConfig.getParameter(LOGGING_INTERVAL, base.getLoggingInterval())) + .withTrustCerts(registryConfig.getParameter(TRUST_CERTS, base.isTrustCerts())) + .withHttp2Disable(registryConfig.getParameter(HTTP2_DISABLE, base.isHttp2Disable())) + .withHttpProxy(registryConfig.getParameter(HTTP_PROXY, base.getHttpProxy())) + .withHttpsProxy(registryConfig.getParameter(HTTPS_PROXY, base.getHttpsProxy())) + .withProxyUsername(registryConfig.getParameter(PROXY_USERNAME, base.getProxyUsername())) + .withProxyPassword(registryConfig.getParameter(PROXY_PASSWORD, base.getProxyPassword())) + .build(); + } + + private static String buildMasterUrl(RegistryConfig registryConfig) { + String address = registryConfig.getAddress(); + if (StringUtils.isBlank(address)) { + return DEFAULT_MASTER_URL; + } + if (address.startsWith("http")) { + return address; + } + return registryConfig.getParameter(USE_HTTPS, true) ? "https://" + address : "http://" + address; + } + + private static String decodeBase64(String str) { + return StringUtils.isNotEmpty(str) ? new String(Base64.getDecoder().decode(str)) : null; + } +} diff --git a/registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry b/registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry new file mode 100644 index 000000000..3bf1f8e63 --- /dev/null +++ b/registry/registry-kubernetes/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.registry.Registry @@ -0,0 +1 @@ +kubernetes=com.alipay.sofa.rpc.registry.kubernetes.KubernetesRegistry \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java new file mode 100644 index 000000000..e9f689d5d --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.json.JSON; +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.config.ProviderConfig; +import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import com.alipay.sofa.rpc.listener.ProviderInfoListener; +import io.fabric8.kubernetes.api.model.Endpoints; +import io.fabric8.kubernetes.api.model.EndpointsBuilder; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.api.model.ServiceBuilder; +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.NamespacedKubernetesClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class KubernetesRegistryTest { + + private static final String NAMESPACE = "TestNameSpace"; + private static final String POD_NAME = "TestPodName"; + private static final String APP_NAME = "TestAppName"; + private static final String SERVICE_NAME = "TestService"; + + public KubernetesServer mockServer; + + private NamespacedKubernetesClient mockClient; + + private static KubernetesRegistry kubernetesRegistry; + + private static RegistryConfig registryConfig; + + private static ConsumerConfig consumer; + + /** + * Ad before class. + */ + @BeforeClass + public static void adBeforeClass() { + RpcRunningState.setUnitTestMode(true); + } + + /** + * Ad after class. + */ + @AfterClass + public static void adAfterClass() { + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + } + + @Before + public void setup() { + mockServer = new KubernetesServer(false, true); + mockServer.before(); + mockClient = mockServer.getClient().inNamespace(NAMESPACE); + + registryConfig = new RegistryConfig(); + registryConfig.setProtocol("kubernetes"); + registryConfig.setAddress(mockClient.getConfiguration().getMasterUrl()); + // registryConfig.setParameter("trustCerts", "true"); + registryConfig.setParameter("namespace", NAMESPACE); + registryConfig.setParameter("useHttps", "false"); + registryConfig.setParameter("http2Disable", "true"); + + kubernetesRegistry = new KubernetesRegistry(registryConfig); + kubernetesRegistry.init(); + kubernetesRegistry.setCurrentHostname(POD_NAME); + + System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false"); + System.setProperty(Config.KUBERNETES_AUTH_TRYSERVICEACCOUNT_SYSTEM_PROPERTY, "false"); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .endMetadata() + .withNewStatus() + .withPodIP("192.168.1.100") + .endStatus() + .build(); + + Service service = new ServiceBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .withNewSpec() + .endSpec() + .build(); + + Endpoints endPoints = new EndpointsBuilder() + .withNewMetadata() + .withName(SERVICE_NAME) + .endMetadata() + .addNewSubset() + .addNewAddress() + .withIp("ip1") + .withNewTargetRef() + .withUid("uid1") + .withName(POD_NAME) + .endTargetRef() + .endAddress() + .addNewPort("Test", "Test", 12345, "TCP") + .endSubset() + .build(); + + mockClient.pods().inNamespace(NAMESPACE).create(pod); + mockClient.services().inNamespace(NAMESPACE).create(service); + mockClient.endpoints().inNamespace(NAMESPACE).create(endPoints); + + Assert.assertTrue(kubernetesRegistry.start()); + } + + @After + public void cleanup() { + kubernetesRegistry.destroy(); + mockClient.close(); + mockServer.after(); + } + + @Test + public void testAll() throws InterruptedException { + ApplicationConfig applicationConfig = new ApplicationConfig() + .setAppName(APP_NAME); + + ServerConfig serverConfig1 = new ServerConfig() + .setProtocol("bolt") + .setPort(12200) + .setDaemon(false); + + ProviderConfig providerConfig1 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl()) + .setDelay(20) + .setServer(serverConfig1); + + // 注册第一个providerConfig1 + kubernetesRegistry.register(providerConfig1); + + ServerConfig serverConfig2 = new ServerConfig() + .setProtocol("h2c") + .setPort(12202) + .setDaemon(false); + + ProviderConfig providerConfig2 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService2.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl2()) + .setDelay(20) + .setServer(serverConfig2); + + // 注册第二个providerConfig2 + kubernetesRegistry.register(providerConfig2); + + Assert.assertEquals(2, kubernetesRegistry.getProviderInstances().size()); + + List items = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + + Assert.assertEquals(1, items.size()); + Pod pod = items.get(0); + String annotation = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY); + Assert.assertNotNull(annotation); + Map map = JSON.parseObject(annotation, Map.class); + ProviderConfig bolt = map.get(KubernetesRegistryHelper.buildDataId(providerConfig1, "bolt")); + List boltRegistry = bolt.getRegistry(); + Assert.assertEquals("kubernetes", boltRegistry.get(0).getProtocol()); + + ProviderConfig h2c = map.get(KubernetesRegistryHelper.buildDataId(providerConfig2, "h2c")); + List h2cRegistry = h2c.getRegistry(); + Assert.assertEquals("kubernetes", h2cRegistry.get(0).getProtocol()); + + // 订阅 + consumer = new ConsumerConfig(); + consumer.setInterfaceId("com.alipay.sofa.rpc.registry.kubernetes.TestService") + .setApplication(applicationConfig) + .setProxy("javassist") + .setSubscribe(true) + .setSerialization("java") + .setInvokeType("sync") + .setTimeout(4444); + + CountDownLatch latch = new CountDownLatch(1); + MockProviderInfoListener providerInfoListener = new MockProviderInfoListener(); + providerInfoListener.setCountDownLatch(latch); + consumer.setProviderInfoListener(providerInfoListener); + List all = kubernetesRegistry.subscribe(consumer); + providerInfoListener.updateAllProviders(all); + latch.await(5000, TimeUnit.MILLISECONDS); + Map ps = providerInfoListener.getData(); + + Assert.assertEquals(1, kubernetesRegistry.getConsumerListeners().size()); + Assert.assertTrue(ps.size() > 0); + Assert.assertEquals(1, ps.size()); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertTrue(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size() > 0); + + // 一次发2个端口的再次注册 + latch = new CountDownLatch(2); + providerInfoListener.setCountDownLatch(latch); + ServerConfig serverConfig = new ServerConfig() + .setProtocol("bolt") + .setHost("0.0.0.0") + .setDaemon(false) + .setPort(12201); + providerConfig1.getServer().add(serverConfig); + kubernetesRegistry.register(providerConfig1); + latch.await(5000 * 2, TimeUnit.MILLISECONDS); + Assert.assertTrue(ps.size() > 0); + Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); + Assert.assertEquals(2, ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP).size()); + + // 反订阅 + kubernetesRegistry.unSubscribe(consumer); + Assert.assertEquals(0, kubernetesRegistry.getConsumerListeners().size()); + + // 反注册providerConfig1 + kubernetesRegistry.unRegister(providerConfig1); + // 反注册providerConfig2 + kubernetesRegistry.unRegister(providerConfig2); + + List unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + Assert.assertEquals(0, kubernetesRegistry.getProviderInstances().size()); + String unRegisterAnnotations = unRegisterItems.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY); + Assert.assertNull(unRegisterAnnotations); + + } + + private static class MockProviderInfoListener implements ProviderInfoListener { + + Map providerGroupMap = new HashMap<>(); + + private CountDownLatch countDownLatch; + + public void setCountDownLatch(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void addProvider(ProviderGroup providerGroup) { + + } + + @Override + public void removeProvider(ProviderGroup providerGroup) { + + } + + @Override + public void updateProviders(ProviderGroup providerGroup) { + + providerGroupMap.put(providerGroup.getName(), providerGroup); + if (countDownLatch != null) { + countDownLatch.countDown(); + countDownLatch = null; + } + } + + @Override + public void updateAllProviders(List providerGroups) { + providerGroupMap.clear(); + + if (providerGroups == null || providerGroups.size() == 0) { + } else { + for (ProviderGroup p : providerGroups) { + providerGroupMap.put(p.getName(), p); + } + + } + } + + public Map getData() { + return providerGroupMap; + } + } + + @Test + public void testUpdatePodAnnotations() { + + // 创建一个新的 Pod + String podName = "test-pod"; + String namespace = "test-namespace"; + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(podName) + .withNamespace(namespace) + .endMetadata() + .build(); + + // 在模拟环境中创建 Pod + pod = mockClient.pods().inNamespace(namespace).create(pod); + assertNotNull(pod); + + // 准备要更新的 annotations + Map annotations = new HashMap<>(); + annotations.put("example.com/annotation", "value"); + + // 更新 Pod 的 annotations + pod = new PodBuilder(pod) + .editMetadata() + .addToAnnotations(annotations) + .endMetadata() + .build(); + + // 在模拟环境中更新 Pod + pod = mockClient.pods().inNamespace(namespace).withName(podName).replace(pod); + + // 获取并验证 annotations 是否已更新 + assertEquals("value", pod.getMetadata().getAnnotations().get("example.com/annotation")); + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java new file mode 100644 index 000000000..8cb791154 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java new file mode 100644 index 000000000..30d7392e6 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService2 { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java new file mode 100644 index 000000000..25b7cc7c3 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl implements TestService { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java new file mode 100644 index 000000000..bc6c3aa1b --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl2 implements TestService2 { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/log4j.xml b/registry/registry-kubernetes/src/test/resources/log4j.xml new file mode 100755 index 000000000..e95634f16 --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000..ca6ee9cea --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json b/registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json new file mode 100644 index 000000000..a555027fe --- /dev/null +++ b/registry/registry-kubernetes/src/test/resources/sofa-rpc/rpc-config.json @@ -0,0 +1,4 @@ +{ + "rpc.config.order": 999, // 加载顺序,越大越后加载 + "logger.impl" : "com.alipay.sofa.rpc.log.SLF4JLoggerImpl" +} \ No newline at end of file