Skip to content

Commit

Permalink
support k8s registry
Browse files Browse the repository at this point in the history
  • Loading branch information
呈铭 committed Mar 14, 2024
1 parent 5aa05d2 commit d2c502e
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.json.JSON;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.RegistryConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.listener.ProviderInfoListener;
import com.alipay.sofa.rpc.log.LogCodes;
Expand All @@ -48,7 +50,6 @@
import java.util.concurrent.ConcurrentMap;

import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY;
import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY;

@Extension("kubernetes")
public class KubernetesRegistry extends Registry {
Expand All @@ -66,7 +67,8 @@ public class KubernetesRegistry extends Registry {

private KubernetesRegistryProviderWatcher kubernetesRegistryProviderWatcher;

private final ConcurrentMap<ProviderConfig, Pod> providerInstances = new ConcurrentHashMap<>(64);
// {dataId:ProviderConfig}
private final ConcurrentHashMap<String, ProviderConfig> providerInstances = new ConcurrentHashMap<>(64);

private final ConcurrentMap<ConsumerConfig, SharedIndexInformer<Pod>> consumerListeners = new ConcurrentHashMap<>(64);

Expand All @@ -87,6 +89,10 @@ public void init() {
if (kubernetesClient == null) {
this.kubernetesClient = KubernetesClientUtils.buildKubernetesClient(config);
}
// init Watcher
if (kubernetesRegistryProviderWatcher == null) {
kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher();
}
this.currentHostname = System.getenv("HOSTNAME");
this.namespace = config.getNamespace();
}
Expand All @@ -111,13 +117,18 @@ public void register(ProviderConfig config) {
.inNamespace(namespace)
.withName(currentHostname);

Pod curPod = podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
// sava provider config to k8s pod
.addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(config))
// add labels on service provider and have the service consumer subscribe based on the labels for selection
.addToLabels(LABEL_KEY, config.getInterfaceId()).endMetadata().build());
List<ServerConfig> serverConfigs = config.getServer();

if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
providerInstances.put(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()), config);
}

providerInstances.put(config, curPod);
podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
// 将ProviderConfig存在Annotations上
.addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true))
.endMetadata().build());
}
}
}

Expand All @@ -132,16 +143,30 @@ public void unRegister(ProviderConfig config) {
}

if (config.isRegister()) {
providerInstances.remove(config);
List<ServerConfig> serverConfigs = config.getServer();
if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
providerInstances.remove(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()));
}
}

kubernetesClient.pods()
PodResource podResource = kubernetesClient.pods()
.inNamespace(namespace)
.withName(currentHostname)
.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(ANNOTATION_KEY)
.removeFromLabels(LABEL_KEY)
.endMetadata()
.build());
.withName(currentHostname);

if (CommonUtils.isEmpty(providerInstances)) {
podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(ANNOTATION_KEY)
.endMetadata()
.build());
} else {
// 考虑可能会有一个pod上多个ProviderConfig的场景,所以先全部移除然后将最新的providerInstances存在Annotations上
podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(ANNOTATION_KEY)
.addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true))
.endMetadata()
.build());
}
}
}

Expand Down Expand Up @@ -170,17 +195,12 @@ public List<ProviderGroup> subscribe(ConsumerConfig config) {

if (config.isSubscribe()) {

if (kubernetesRegistryProviderWatcher == null) {
kubernetesRegistryProviderWatcher = new KubernetesRegistryProviderWatcher();
}

ProviderInfoListener providerInfoListener = config.getProviderInfoListener();
kubernetesRegistryProviderWatcher.addProviderListener(config, providerInfoListener);

FilterWatchListDeletable<Pod, PodList, PodResource> podPodListPodResourceFilterWatchListDeletable =
kubernetesClient.pods()
.inNamespace(namespace)
.withLabel(LABEL_KEY);
.inNamespace(namespace);

SharedIndexInformer<Pod> inform = podPodListPodResourceFilterWatchListDeletable.inform(new ResourceEventHandler<Pod>() {
@Override
Expand All @@ -204,7 +224,7 @@ public void onDelete(Pod pod, boolean b) {
inform.start();

List<Pod> pods = podPodListPodResourceFilterWatchListDeletable.list().getItems();
List<ProviderInfo> providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods);
List<ProviderInfo> providerInfos = KubernetesRegistryHelper.convertPodsToProviders(pods, config);
List<ProviderInfo> matchProviders = RegistryUtils.matchProviderInfos(config, providerInfos);

return Collections.singletonList(new ProviderGroup().addAll(matchProviders));
Expand Down Expand Up @@ -240,19 +260,20 @@ public void batchUnSubscribe(List<ConsumerConfig> configs) {
@Override
public void destroy() {
// unRegister provider
providerInstances.forEach((k, v) -> unRegister(k));
providerInstances.forEach((k, v) -> unRegister(v));

// unRegister consumer
consumerListeners.forEach((k, v) -> unSubscribe(k));

// close kubernetes client
kubernetesClient.close();
if (CommonUtils.isEmpty(providerInstances)) {
kubernetesClient.close();
}
}

private List<Pod> getPods() {
return kubernetesClient.pods()
.inNamespace(namespace)
.withLabel(LABEL_KEY)
.list()
.getItems();
}
Expand All @@ -273,4 +294,11 @@ public ConcurrentMap<ConsumerConfig, SharedIndexInformer<Pod>> getConsumerListen
return consumerListeners;
}

/**
* UT used only
*/
@VisibleForTesting
public ConcurrentMap<String, ProviderConfig> getProviderInstances() {
return providerInstances;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.json.JSON;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.log.Logger;
Expand All @@ -33,20 +37,19 @@
import java.util.Map;

import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY;
import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY;

public class KubernetesRegistryHelper extends RegistryUtils {

private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistryHelper.class);

public static List<ProviderInfo> convertPodsToProviders(List<Pod> pods) {
public static List<ProviderInfo> convertPodsToProviders(List<Pod> pods, ConsumerConfig config) {
List<ProviderInfo> providerInfos = new ArrayList<>();
if (CommonUtils.isEmpty(pods)) {
if (CommonUtils.isEmpty(pods) || null == config) {
return providerInfos;
}

for (Pod pod : pods) {
ProviderConfig providerConfig = getProviderConfig(pod);
ProviderConfig providerConfig = getProviderConfig(pod, config);
if (null == providerConfig || null == providerConfig.getServer()) {
continue;
}
Expand Down Expand Up @@ -81,28 +84,31 @@ private static String convertInstanceToUrl(Pod pod, ServerConfig serverConfig, P
return uri;
}

private static ProviderConfig getProviderConfig(Pod pod) {
private static ProviderConfig getProviderConfig(Pod pod, ConsumerConfig config) {
try {
String interfaceId = pod.getMetadata().getLabels().get(LABEL_KEY);
if (StringUtils.isBlank(interfaceId)) {
return null;
}

String providerConfigStr = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY);
if (StringUtils.isBlank(providerConfigStr)) {
return null;
}

ProviderConfig providerConfig = JSON.parseObject(providerConfigStr, ProviderConfig.class);
if (null == providerConfig) {
Map<String, ProviderConfig> map = JSON.parseObject(providerConfigStr, Map.class);

if (null == map) {
return null;
}

return providerConfig;
return map.get(KubernetesRegistryHelper.buildDataId(config, config.getProtocol()));
} catch (Exception e) {
LOGGER.info("get provider config error with pod");
return null;
}
}

public static String buildDataId(AbstractInterfaceConfig config, String protocol) {
if (RpcConstants.PROTOCOL_TYPE_BOLT.equals(protocol) || RpcConstants.PROTOCOL_TYPE_TR.equals(protocol)) {
return ConfigUniqueNameGenerator.getUniqueName(config) + "@DEFAULT";
} else {
return ConfigUniqueNameGenerator.getUniqueName(config) + "@" + protocol;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void removeProviderListener(ConsumerConfig consumerConfig) {
public void updateProviders(ConsumerConfig config, List<Pod> podList) {
List<ProviderInfoListener> providerInfoListeners = providerListenerMap.get(config);
if (CommonUtils.isNotEmpty(providerInfoListeners)) {
List<ProviderInfo> providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList);
List<ProviderInfo> providerInfos = KubernetesRegistryHelper.convertPodsToProviders(podList, config);
List<ProviderInfo> matchProviders = RegistryUtils.matchProviderInfos(config, providerInfos);

for (ProviderInfoListener providerInfoListener : providerInfoListeners) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ public class KubernetesClientConstants {

public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc";

public static final String LABEL_KEY = "com.sofa.rpc/label";

public static final String ANNOTATION_KEY = "com.sofa.rpc/annotation";

public static final String TRUST_CERTS = "trustCerts";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.utils.JSONUtils;
import com.alipay.sofa.rpc.common.json.JSON;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand Down Expand Up @@ -52,7 +52,6 @@
import java.util.concurrent.TimeUnit;

import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY;
import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.LABEL_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

Expand Down Expand Up @@ -164,35 +163,58 @@ public void testAll() throws InterruptedException {
ApplicationConfig applicationConfig = new ApplicationConfig()
.setAppName(APP_NAME);

ServerConfig serverConfig = new ServerConfig()
ServerConfig serverConfig1 = new ServerConfig()
.setProtocol("bolt")
.setPort(12200)
.setDaemon(false);

ProviderConfig<TestService> providerConfig = new ProviderConfig<TestService>()
ProviderConfig<TestService> providerConfig1 = new ProviderConfig<TestService>()
.setApplication(applicationConfig)
.setInterfaceId(TestService.class.getName())
.setRegistry(registryConfig)
.setRegister(true)
// .setUniqueId("standalone")
.setRef(new TestServiceImpl())
.setServer(serverConfig);
.setDelay(20)
.setServer(serverConfig1);

// 注册第一个providerConfig1
kubernetesRegistry.register(providerConfig1);

ServerConfig serverConfig2 = new ServerConfig()
.setProtocol("h2c")
.setPort(12202)
.setDaemon(false);

ProviderConfig<TestService2> providerConfig2 = new ProviderConfig<TestService2>()
.setApplication(applicationConfig)
.setInterfaceId(TestService2.class.getName())
.setRegistry(registryConfig)
.setRegister(true)
// .setUniqueId("standalone")
.setRef(new TestServiceImpl2())
.setDelay(20)
.setServer(serverConfig2);

// 注册
kubernetesRegistry.register(providerConfig);
// 注册第二个providerConfig2
kubernetesRegistry.register(providerConfig2);

List<Pod> items = mockClient.pods().inNamespace(NAMESPACE).withLabel(LABEL_KEY).list().getItems();
Assert.assertEquals(2, kubernetesRegistry.getProviderInstances().size());

List<Pod> items = mockClient.pods().inNamespace(NAMESPACE).list().getItems();

Assert.assertEquals(1, items.size());
Pod pod = items.get(0);
String label = pod.getMetadata().getLabels().get(LABEL_KEY);
Assert.assertNotNull(label);

String annotation = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY);
Assert.assertNotNull(annotation);
ProviderConfig result = JSONUtils.parseObject(annotation, ProviderConfig.class);
RegistryConfig registryConfig = (RegistryConfig) result.getRegistry().get(0);
Assert.assertEquals(registryConfig.getProtocol(), "kubernetes");
Map<String, ProviderConfig> map = JSON.parseObject(annotation, Map.class);
ProviderConfig bolt = map.get(KubernetesRegistryHelper.buildDataId(providerConfig1, "bolt"));
List<RegistryConfig> boltRegistry = bolt.getRegistry();
Assert.assertEquals("kubernetes", boltRegistry.get(0).getProtocol());

ProviderConfig h2c = map.get(KubernetesRegistryHelper.buildDataId(providerConfig2, "h2c"));
List<RegistryConfig> h2cRegistry = h2c.getRegistry();
Assert.assertEquals("kubernetes", h2cRegistry.get(0).getProtocol());

// 订阅
consumer = new ConsumerConfig();
Expand Down Expand Up @@ -222,13 +244,13 @@ public void testAll() throws InterruptedException {
// 一次发2个端口的再次注册
latch = new CountDownLatch(2);
providerInfoListener.setCountDownLatch(latch);
ServerConfig serverConfig2 = new ServerConfig()
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt")
.setHost("0.0.0.0")
.setDaemon(false)
.setPort(12201);
providerConfig.getServer().add(serverConfig2);
kubernetesRegistry.register(providerConfig);
providerConfig1.getServer().add(serverConfig);
kubernetesRegistry.register(providerConfig1);
latch.await(5000 * 2, TimeUnit.MILLISECONDS);
Assert.assertTrue(ps.size() > 0);
Assert.assertNotNull(ps.get(RpcConstants.ADDRESS_DEFAULT_GROUP));
Expand All @@ -238,16 +260,14 @@ public void testAll() throws InterruptedException {
kubernetesRegistry.unSubscribe(consumer);
Assert.assertEquals(0, kubernetesRegistry.getConsumerListeners().size());

// 反注册
kubernetesRegistry.unRegister(providerConfig);

List<Pod> unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).withLabel(LABEL_KEY).list().getItems();
Assert.assertEquals(0, unRegisterItems.size());
// 反注册providerConfig1
kubernetesRegistry.unRegister(providerConfig1);
// 反注册providerConfig2
kubernetesRegistry.unRegister(providerConfig2);

List<Pod> unRegisterItems1 = mockClient.pods().inNamespace(NAMESPACE).list().getItems();
String unRegisterLabel = unRegisterItems1.get(0).getMetadata().getLabels().get(LABEL_KEY);
Assert.assertNull(unRegisterLabel);
String unRegisterAnnotations = unRegisterItems1.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY);
List<Pod> unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems();
Assert.assertEquals(0, kubernetesRegistry.getProviderInstances().size());
String unRegisterAnnotations = unRegisterItems.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY);
Assert.assertNull(unRegisterAnnotations);

}
Expand Down
Loading

0 comments on commit d2c502e

Please sign in to comment.