Skip to content

Commit

Permalink
support sofa registry kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
呈铭 committed Mar 18, 2024
1 parent 0d3ca60 commit 2da24a7
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,7 @@ public String getParameter(String key, String defaultValue) {
* @return the value
*/
public int getParameter(String key, int defaultValue) {
return (parameters == null || parameters.get(key) == null) ? defaultValue : Integer.parseInt(parameters
.get(key));
return getParameter(key) == null ? defaultValue : Integer.parseInt(parameters.get(key));
}

/**
Expand All @@ -426,8 +425,7 @@ public int getParameter(String key, int defaultValue) {
* @return the value
*/
public boolean getParameter(String key, boolean defaultValue) {
return (parameters == null || parameters.get(key) == null) ? defaultValue : Boolean.parseBoolean(parameters
.get(key));
return getParameter(key) == null ? defaultValue : Boolean.parseBoolean(parameters.get(key));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY;

@Extension("kubernetes")
public class KubernetesRegistry extends Registry {

Expand Down Expand Up @@ -121,13 +119,14 @@ public void register(ProviderConfig config) {

if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
providerInstances.put(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()), config);
}
String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol());
providerInstances.put(dataId, config);

podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
// 将ProviderConfig存在Annotations上
.addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true))
.endMetadata().build());
podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
// 将ProviderConfig存在Annotations上
.addToAnnotations(dataId, JSON.toJSONString(config, true))
.endMetadata().build());
}
}
}
}
Expand All @@ -143,29 +142,21 @@ public void unRegister(ProviderConfig config) {
}

if (config.isRegister()) {
List<ServerConfig> serverConfigs = config.getServer();
if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
providerInstances.remove(KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol()));
}
}

PodResource podResource = kubernetesClient.pods()
.inNamespace(namespace)
.withName(currentHostname);

if (CommonUtils.isEmpty(providerInstances)) {
podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(ANNOTATION_KEY)
.endMetadata()
.build());
} else {
// 考虑可能会有一个pod上多个ProviderConfig的场景,所以先全部移除然后将最新的providerInstances存在Annotations上
podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(ANNOTATION_KEY)
.addToAnnotations(ANNOTATION_KEY, JSON.toJSONString(providerInstances, true))
.endMetadata()
.build());
List<ServerConfig> serverConfigs = config.getServer();
if (CommonUtils.isNotEmpty(serverConfigs)) {
for (ServerConfig serverConfig : serverConfigs) {
String dataId = KubernetesRegistryHelper.buildDataId(config, serverConfig.getProtocol());
providerInstances.remove(dataId);

podResource.edit(pod -> new PodBuilder(pod).editOrNewMetadata()
.removeFromAnnotations(dataId)
.endMetadata()
.build());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import java.util.List;
import java.util.Map;

import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY;

public class KubernetesRegistryHelper extends RegistryUtils {

private final static Logger LOGGER = LoggerFactory.getLogger(KubernetesRegistryHelper.class);
Expand Down Expand Up @@ -86,18 +84,19 @@ private static String convertInstanceToUrl(Pod pod, ServerConfig serverConfig, P

private static ProviderConfig getProviderConfig(Pod pod, ConsumerConfig config) {
try {
String providerConfigStr = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY);
String dataId = buildDataId(config, config.getProtocol());
String providerConfigStr = pod.getMetadata().getAnnotations().get(dataId);
if (StringUtils.isBlank(providerConfigStr)) {
return null;
}

Map<String, ProviderConfig> map = JSON.parseObject(providerConfigStr, Map.class);
ProviderConfig providerConfig = JSON.parseObject(providerConfigStr, ProviderConfig.class);

if (null == map) {
if (null == providerConfig) {
return null;
}

return map.get(KubernetesRegistryHelper.buildDataId(config, config.getProtocol()));
return providerConfig;
} catch (Exception e) {
LOGGER.info("get provider config error with pod");
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ public class KubernetesClientConstants {

public static final String DEFAULT_MASTER_URL = "https://kubernetes.default.svc";

public static final String ANNOTATION_KEY = "com.sofa.rpc/annotation";

public static final String TRUST_CERTS = "trustCerts";

public static final String USE_HTTPS = "useHttps";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static com.alipay.sofa.rpc.registry.kubernetes.constant.KubernetesClientConstants.ANNOTATION_KEY;
import static com.alipay.sofa.rpc.registry.kubernetes.KubernetesRegistryHelper.buildDataId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

Expand Down Expand Up @@ -205,16 +205,10 @@ public void testAll() throws InterruptedException {

Assert.assertEquals(1, items.size());
Pod pod = items.get(0);
String annotation = pod.getMetadata().getAnnotations().get(ANNOTATION_KEY);
Assert.assertNotNull(annotation);
Map<String, ProviderConfig> map = JSON.parseObject(annotation, Map.class);
ProviderConfig bolt = map.get(KubernetesRegistryHelper.buildDataId(providerConfig1, "bolt"));
List<RegistryConfig> boltRegistry = bolt.getRegistry();
Assert.assertEquals("kubernetes", boltRegistry.get(0).getProtocol());

ProviderConfig h2c = map.get(KubernetesRegistryHelper.buildDataId(providerConfig2, "h2c"));
List<RegistryConfig> h2cRegistry = h2c.getRegistry();
Assert.assertEquals("kubernetes", h2cRegistry.get(0).getProtocol());
String annotationBolt = pod.getMetadata().getAnnotations().get(buildDataId(providerConfig1, "bolt"));
Assert.assertNotNull(annotationBolt);
String annotationH2c = pod.getMetadata().getAnnotations().get(buildDataId(providerConfig2, "h2c"));
Assert.assertNotNull(annotationH2c);

// 订阅
consumer = new ConsumerConfig();
Expand Down Expand Up @@ -267,9 +261,10 @@ public void testAll() throws InterruptedException {

List<Pod> unRegisterItems = mockClient.pods().inNamespace(NAMESPACE).list().getItems();
Assert.assertEquals(0, kubernetesRegistry.getProviderInstances().size());
String unRegisterAnnotations = unRegisterItems.get(0).getMetadata().getAnnotations().get(ANNOTATION_KEY);
Assert.assertNull(unRegisterAnnotations);

String unRegisterAnnotationsBolt = unRegisterItems.get(0).getMetadata().getAnnotations().get(buildDataId(providerConfig1, "bolt"));
Assert.assertNull(unRegisterAnnotationsBolt);
String unRegisterAnnotationsH2c = unRegisterItems.get(0).getMetadata().getAnnotations().get(buildDataId(providerConfig2, "h2c"));
Assert.assertNull(unRegisterAnnotationsH2c);
}

private static class MockProviderInfoListener implements ProviderInfoListener {
Expand Down

0 comments on commit 2da24a7

Please sign in to comment.