From d2c502e5df8ea39be03e5fccd5f2ef3565815618 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=91=88=E9=93=AD?= Date: Thu, 14 Mar 2024 11:27:23 +0800 Subject: [PATCH] support k8s registry --- .../kubernetes/KubernetesRegistry.java | 80 +++++++++++++------ .../kubernetes/KubernetesRegistryHelper.java | 32 +++++--- .../KubernetesRegistryProviderWatcher.java | 2 +- .../constant/KubernetesClientConstants.java | 2 - .../kubernetes/KubernetesRegistryTest.java | 72 +++++++++++------ .../rpc/registry/kubernetes/TestService2.java | 22 +++++ .../registry/kubernetes/TestServiceImpl2.java | 25 ++++++ 7 files changed, 167 insertions(+), 68 deletions(-) create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java create mode 100644 registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java index 8c36817c5..bf91a5152 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistry.java @@ -20,9 +20,11 @@ import com.alipay.sofa.rpc.client.ProviderInfo; import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; import com.alipay.sofa.rpc.common.json.JSON; +import com.alipay.sofa.rpc.common.utils.CommonUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.RegistryConfig; +import com.alipay.sofa.rpc.config.ServerConfig; import com.alipay.sofa.rpc.ext.Extension; import com.alipay.sofa.rpc.listener.ProviderInfoListener; import com.alipay.sofa.rpc.log.LogCodes; @@ -48,7 +50,6 @@ import java.util.concurrent.ConcurrentMap; import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; -import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY; @Extension("kubernetes") public class KubernetesRegistry extends Registry { @@ -66,7 +67,8 @@ public class KubernetesRegistry extends Registry { private KubernetesRegistryProviderWatcher kubernetesRegistryProviderWatcher; - private final ConcurrentMap providerInstances = new ConcurrentHashMap<>(64); + // {dataId:ProviderConfig} + private final ConcurrentHashMap providerInstances = new ConcurrentHashMap<>(64); private final ConcurrentMap> consumerListeners = new ConcurrentHashMap<>(64); @@ -87,6 +89,10 @@ public void init() { if (kubernetesClient == null) { this.kubernetesClient = KubernetesClientUtils.buildKubernetesClient(config); } + // init Watcher + if (kubernetesRegistryProviderWatcher == null) { + kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher(); + } this.currentHostname = System.getenv("HOSTNAME"); this.namespace = config.getNamespace(); } @@ -111,13 +117,18 @@ public void register(ProviderConfig config) { .inNamespace(namespace) .withName(currentHostname); - Pod curPod = podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() - // sava provider config to k8s pod - .addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(config)) - // add labels on service provider and have the service consumer subscribe based on the labels for selection - .addToLabels(LABEL_KEY, config.getInterfaceId()).endMetadata().build()); + List serverConfigs = config.getServer(); + + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + providerInstances.put(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()), config); + } - providerInstances.put(config, curPod); + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + // 将ProviderConfig存在Annotations上 + .addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true)) + .endMetadata().build()); + } } } @@ -132,16 +143,30 @@ public void unRegister(ProviderConfig config) { } if (config.isRegister()) { - providerInstances.remove(config); + List serverConfigs = config.getServer(); + if (CommonUtils.isNotEmpty(serverConfigs)) { + for (ServerConfig serverConfig : serverConfigs) { + providerInstances.remove(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol())); + } + } - kubernetesClient.pods() + PodResource podResource = kubernetesClient.pods() .inNamespace(namespace) - .withName(currentHostname) - .edit(pod -> new PodBuilder(pod).editOrNewMetadata() - .removeFromAnnotations(ANNOTATION_KEY) - .removeFromLabels(LABEL_KEY) - .endMetadata() - .build()); + .withName(currentHostname); + + if (CommonUtils.isEmpty(providerInstances)) { + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + .removeFromAnnotations(ANNOTATION_KEY) + .endMetadata() + .build()); + } else { + // 考虑可能会有一个pod上多个ProviderConfig的场景,所以先全部移除然后将最新的providerInstances存在Annotations上 + podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata() + .removeFromAnnotations(ANNOTATION_KEY) + .addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true)) + .endMetadata() + .build()); + } } } @@ -170,17 +195,12 @@ public List subscribe(ConsumerConfig config) { if (config.isSubscribe()) { - if (kubernetesRegistryProviderWatcher == null) { - kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher(); - } - ProviderInfoListener providerInfoListener = config.getProviderInfoListener(); kubernetesRegistryProviderWatcher.addProviderListener(config, providerInfoListener); FilterWatchListDeletable podPodListPodResourceFilterWatchListDeletable = kubernetesClient.pods() - .inNamespace(namespace) - .withLabel(LABEL_KEY); + .inNamespace(namespace); SharedIndexInformer inform = podPodListPodResourceFilterWatchListDeletable.inform(new ResourceEventHandler() { @Override @@ -204,7 +224,7 @@ public void onDelete(Pod pod, boolean b) { inform.start(); List pods = podPodListPodResourceFilterWatchListDeletable.list().getItems(); - List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods); + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods, config); List matchProviders = RegistryUtils.matchProviderInfos(config, providerInfos); return Collections.singletonList(new ProviderGroup().addAll(matchProviders)); @@ -240,19 +260,20 @@ public void batchUnSubscribe(List configs) { @Override public void destroy() { // unRegister provider - providerInstances.forEach((k, v) -> unRegister(k)); + providerInstances.forEach((k, v) -> unRegister(v)); // unRegister consumer consumerListeners.forEach((k, v) -> unSubscribe(k)); // close kubernetes client - kubernetesClient.close(); + if (CommonUtils.isEmpty(providerInstances)) { + kubernetesClient.close(); + } } private List getPods() { return kubernetesClient.pods() .inNamespace(namespace) - .withLabel(LABEL_KEY) .list() .getItems(); } @@ -273,4 +294,11 @@ public ConcurrentMap> getConsumerListen return consumerListeners; } + /** + * UT used only + */ + @VisibleForTesting + public ConcurrentMap getProviderInstances() { + return providerInstances; + } } \ No newline at end of file diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java index 8afa0fab2..40678f448 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryHelper.java @@ -18,9 +18,13 @@ import com.alipay.sofa.rpc.client.ProviderHelper; import com.alipay.sofa.rpc.client.ProviderInfo; +import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.common.json.JSON; import com.alipay.sofa.rpc.common.utils.CommonUtils; import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.config.AbstractInterfaceConfig; +import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator; +import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; import com.alipay.sofa.rpc.log.Logger; @@ -33,20 +37,19 @@ import java.util.Map; import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; -import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY; public class KubernetesRegistryHelper extends RegistryUtils { private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistryHelper.class); - public static List convertPodsToProviders(List pods) { + public static List convertPodsToProviders(List pods, ConsumerConfig config) { List providerInfos = new ArrayList<>(); - if (CommonUtils.isEmpty(pods)) { + if (CommonUtils.isEmpty(pods) || null == config) { return providerInfos; } for (Pod pod : pods) { - ProviderConfig providerConfig = getProviderConfig(pod); + ProviderConfig providerConfig = getProviderConfig(pod, config); if (null == providerConfig || null == providerConfig.getServer()) { continue; } @@ -81,28 +84,31 @@ private static String convertInstanceToUrl(Pod pod, ServerConfig serverConfig, P return uri; } - private static ProviderConfig getProviderConfig(Pod pod) { + private static ProviderConfig getProviderConfig(Pod pod, ConsumerConfig config) { try { - String interfaceId = pod.getMetadata().getLabels().get(LABEL_KEY); - if (StringUtils.isBlank(interfaceId)) { - return null; - } - String providerConfigStr = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY); if (StringUtils.isBlank(providerConfigStr)) { return null; } - ProviderConfig providerConfig = JSON.parseObject(providerConfigStr, ProviderConfig.class); - if (null == providerConfig) { + Map map = JSON.parseObject(providerConfigStr, Map.class); + + if (null == map) { return null; } - return providerConfig; + return map.get(KubernetesRegistryHelper.buildDataId(config, config.getProtocol())); } catch (Exception e) { LOGGER.info("get provider config error with pod"); return null; } } + public static String buildDataId(AbstractInterfaceConfig config, String protocol) { + if (RpcConstants.PROTOCOL_TYPE_BOLT.equals(protocol) || RpcConstants.PROTOCOL_TYPE_TR.equals(protocol)) { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@DEFAULT"; + } else { + return ConfigUniqueNameGenerator.getUniqueName(config) + "@" + protocol; + } + } } diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java index 1843aa9be..50bf0d7e7 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryProviderWatcher.java @@ -66,7 +66,7 @@ public void removeProviderListener(ConsumerConfig consumerConfig) { public void updateProviders(ConsumerConfig config, List podList) { List providerInfoListeners = providerListenerMap.get(config); if (CommonUtils.isNotEmpty(providerInfoListeners)) { - List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList); + List providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList, config); List matchProviders = RegistryUtils.matchProviderInfos(config, providerInfos); for (ProviderInfoListener providerInfoListener : providerInfoListeners) { diff --git a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java index 49f251aa6..959400d7c 100644 --- a/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java +++ b/registry/registry-kubernetes/src/main/java/com/alipay/sofa/rpc/registry/kubernetes/constant/KubernetesClientConstants.java @@ -20,8 +20,6 @@ public class KubernetesClientConstants { public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc"; - public static final String LABEL_KEY = "com.sofa.rpc/label"; - public static final String ANNOTATION_KEY = "com.sofa.rpc/annotation"; public static final String TRUST_CERTS = "trustCerts"; diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java index 16ceabdee..e9f689d5d 100644 --- a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/KubernetesRegistryTest.java @@ -18,7 +18,7 @@ import com.alipay.sofa.rpc.client.ProviderGroup; import com.alipay.sofa.rpc.common.RpcConstants; -import com.alipay.sofa.rpc.common.utils.JSONUtils; +import com.alipay.sofa.rpc.common.json.JSON; import com.alipay.sofa.rpc.config.ApplicationConfig; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; @@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit; import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY; -import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -164,35 +163,58 @@ public void testAll() throws InterruptedException { ApplicationConfig applicationConfig = new ApplicationConfig() .setAppName(APP_NAME); - ServerConfig serverConfig = new ServerConfig() + ServerConfig serverConfig1 = new ServerConfig() .setProtocol("bolt") .setPort(12200) .setDaemon(false); - ProviderConfig providerConfig = new ProviderConfig() + ProviderConfig providerConfig1 = new ProviderConfig() .setApplication(applicationConfig) .setInterfaceId(TestService.class.getName()) .setRegistry(registryConfig) .setRegister(true) // .setUniqueId("standalone") .setRef(new TestServiceImpl()) - .setServer(serverConfig); + .setDelay(20) + .setServer(serverConfig1); + + // 注册第一个providerConfig1 + kubernetesRegistry.register(providerConfig1); + + ServerConfig serverConfig2 = new ServerConfig() + .setProtocol("h2c") + .setPort(12202) + .setDaemon(false); + + ProviderConfig providerConfig2 = new ProviderConfig() + .setApplication(applicationConfig) + .setInterfaceId(TestService2.class.getName()) + .setRegistry(registryConfig) + .setRegister(true) + // .setUniqueId("standalone") + .setRef(new TestServiceImpl2()) + .setDelay(20) + .setServer(serverConfig2); - // 注册 - kubernetesRegistry.register(providerConfig); + // 注册第二个providerConfig2 + kubernetesRegistry.register(providerConfig2); - List items = mockClient.pods().inNamespace(NAMESPACE).withLabel(LABEL_KEY).list().getItems(); + Assert.assertEquals(2, kubernetesRegistry.getProviderInstances().size()); + + List items = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); Assert.assertEquals(1, items.size()); Pod pod = items.get(0); - String label = pod.getMetadata().getLabels().get(LABEL_KEY); - Assert.assertNotNull(label); - String annotation = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY); Assert.assertNotNull(annotation); - ProviderConfig result = JSONUtils.parseObject(annotation, ProviderConfig.class); - RegistryConfig registryConfig = (RegistryConfig) result.getRegistry().get(0); - Assert.assertEquals(registryConfig.getProtocol(), "kubernetes"); + Map map = JSON.parseObject(annotation, Map.class); + ProviderConfig bolt = map.get(KubernetesRegistryHelper.buildDataId(providerConfig1, "bolt")); + List boltRegistry = bolt.getRegistry(); + Assert.assertEquals("kubernetes", boltRegistry.get(0).getProtocol()); + + ProviderConfig h2c = map.get(KubernetesRegistryHelper.buildDataId(providerConfig2, "h2c")); + List h2cRegistry = h2c.getRegistry(); + Assert.assertEquals("kubernetes", h2cRegistry.get(0).getProtocol()); // 订阅 consumer = new ConsumerConfig(); @@ -222,13 +244,13 @@ public void testAll() throws InterruptedException { // 一次发2个端口的再次注册 latch = new CountDownLatch(2); providerInfoListener.setCountDownLatch(latch); - ServerConfig serverConfig2 = new ServerConfig() + ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") .setHost("0.0.0.0") .setDaemon(false) .setPort(12201); - providerConfig.getServer().add(serverConfig2); - kubernetesRegistry.register(providerConfig); + providerConfig1.getServer().add(serverConfig); + kubernetesRegistry.register(providerConfig1); latch.await(5000 * 2, TimeUnit.MILLISECONDS); Assert.assertTrue(ps.size() > 0); Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP)); @@ -238,16 +260,14 @@ public void testAll() throws InterruptedException { kubernetesRegistry.unSubscribe(consumer); Assert.assertEquals(0, kubernetesRegistry.getConsumerListeners().size()); - // 反注册 - kubernetesRegistry.unRegister(providerConfig); - - List unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).withLabel(LABEL_KEY).list().getItems(); - Assert.assertEquals(0, unRegisterItems.size()); + // 反注册providerConfig1 + kubernetesRegistry.unRegister(providerConfig1); + // 反注册providerConfig2 + kubernetesRegistry.unRegister(providerConfig2); - List unRegisterItems1 = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); - String unRegisterLabel = unRegisterItems1.get(0).getMetadata().getLabels().get(LABEL_KEY); - Assert.assertNull(unRegisterLabel); - String unRegisterAnnotations = unRegisterItems1.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY); + List unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems(); + Assert.assertEquals(0, kubernetesRegistry.getProviderInstances().size()); + String unRegisterAnnotations = unRegisterItems.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY); Assert.assertNull(unRegisterAnnotations); } diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java new file mode 100644 index 000000000..30d7392e6 --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestService2.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public interface TestService2 { + + String sayHello(String str); +} \ No newline at end of file diff --git a/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java new file mode 100644 index 000000000..bc6c3aa1b --- /dev/null +++ b/registry/registry-kubernetes/src/test/java/com/alipay/sofa/rpc/registry/kubernetes/TestServiceImpl2.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.registry.kubernetes; + +public class TestServiceImpl2 implements TestService2 { + + @Override + public String sayHello(String str) { + return str; + } +} \ No newline at end of file