Skip to content

Commit

Permalink
[CELEBORN-1264] ConfigService supports TENANT_USER config level
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
 ConfigService support user level config

### Why are the changes needed?
Support more case of config, later can integrate with quota manager

### Does this PR introduce _any_ user-facing change?
With this pr, user's setting form config service will have three level

- User
- Tenant
- System

User identifier is construct by username and tenantId,
If there is no specify setting for username, will fallback to tenant level setting, if tenant level setting also not set, fallback to system setting

### How was this patch tested?
Added UT

Closes apache#2285 from AngersZhuuuu/CELEBORN-1264.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Angerszhuuuu <[email protected]>
  • Loading branch information
AngersZhuuuu committed Feb 18, 2024
1 parent 7a05b2f commit e4f7ea8
Show file tree
Hide file tree
Showing 14 changed files with 217 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,6 +40,9 @@ public abstract class BaseConfigServiceImpl implements ConfigService {
protected final AtomicReference<Map<String, TenantConfig>> tenantConfigAtomicReference =
new AtomicReference<>(new HashMap<>());

protected final AtomicReference<Map<Pair<String, String>, TenantConfig>>
tenantUserConfigAtomicReference = new AtomicReference<>(new HashMap<>());

private final ScheduledExecutorService configRefreshService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-config-refresher");

Expand Down Expand Up @@ -81,6 +85,11 @@ public TenantConfig getRawTenantConfigFromCache(String tenantId) {
return tenantConfigAtomicReference.get().get(tenantId);
}

@Override
public TenantConfig getRawTenantUserConfig(String tenantId, String userId) {
return tenantUserConfigAtomicReference.get().get(Pair.of(tenantId, userId));
}

@Override
public void shutdown() {
ThreadUtils.shutdown(configRefreshService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
public enum ConfigLevel {
SYSTEM,
TENANT,
TENANT_USER
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ default DynamicConfig getTenantConfigFromCache(String tenantId) {
}
}

TenantConfig getRawTenantUserConfig(String tenantId, String userId);

default DynamicConfig getTenantUserConfig(String tenantId, String userId) {
TenantConfig tenantConfig = getRawTenantUserConfig(tenantId, userId);
if (tenantConfig == null) {
return getTenantConfigFromCache(tenantId);
} else {
return tenantConfig;
}
}

void refreshAllCache() throws IOException;

void shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.server.common.service.store.IServiceManager;
import org.apache.celeborn.server.common.service.store.db.DbServiceManagerImpl;
Expand Down Expand Up @@ -50,5 +52,13 @@ public void refreshAllCache() throws IOException {
allTenantConfigs.stream()
.collect(Collectors.toMap(TenantConfig::getTenantId, Function.identity()));
tenantConfigAtomicReference.set(tenantConfigMap);
List<TenantConfig> allTenantUserConfigs = iServiceManager.getAllTenantUserConfigs();
Map<Pair<String, String>, TenantConfig> tenantUserConfigMap =
allTenantUserConfigs.stream()
.collect(
Collectors.toMap(
tenantConfig -> Pair.of(tenantConfig.getTenantId(), tenantConfig.getName()),
Function.identity()));
tenantUserConfigAtomicReference.set(tenantUserConfigMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
Expand All @@ -35,6 +36,8 @@
public class FsConfigServiceImpl extends BaseConfigServiceImpl implements ConfigService {
private static final Logger LOG = LoggerFactory.getLogger(FsConfigServiceImpl.class);
private static final String CONF_TENANT_ID = "tenantId";
private static final String CONF_TENANT_USERS = "users";
private static final String CONF_TENANT_NAME = "name";
private static final String CONF_LEVEL = "level";
private static final String CONF_CONFIG = "config";

Expand All @@ -51,34 +54,62 @@ public synchronized void refreshAllCache() {

SystemConfig systemConfig = null;
Map<String, TenantConfig> tenantConfs = new HashMap<>();
Map<Pair<String, String>, TenantConfig> tenantUserConfs = new HashMap<>();
try (FileInputStream fileInputStream = new FileInputStream(configurationFile)) {
Yaml yaml = new Yaml();
List<Map<String, Object>> dynamicConfigs = yaml.load(fileInputStream);
for (Map<String, Object> settings : dynamicConfigs) {
String tenantId = (String) settings.get(CONF_TENANT_ID);
String level = (String) settings.get(CONF_LEVEL);
Map<String, String> config =
((Map<String, Object>) settings.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
if (ConfigLevel.TENANT.name().equals(level)) {
TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config);
tenantConfs.put(tenantId, tenantConfig);
if (settings.containsKey(CONF_TENANT_ID)) {
String tenantId = (String) settings.get(CONF_TENANT_ID);
if (settings.containsKey(CONF_CONFIG)) {
Map<String, String> config = extractConfig(settings);
TenantConfig tenantConfig = new TenantConfig(this, tenantId, null, config);
tenantConfs.put(tenantId, tenantConfig);
}
if (settings.containsKey(CONF_TENANT_USERS)) {
List<Map<String, Object>> users =
(List<Map<String, Object>>) settings.get(CONF_TENANT_USERS);
for (Map<String, Object> userSetting : users) {
if (userSetting.containsKey(CONF_TENANT_NAME)
&& userSetting.containsKey(CONF_CONFIG)) {
String name = (String) userSetting.get(CONF_TENANT_NAME);
Map<String, String> userConfig = extractConfig(userSetting);
TenantConfig tenantUserConfig =
new TenantConfig(this, tenantId, name, userConfig);
tenantUserConfs.put(Pair.of(tenantId, name), tenantUserConfig);
}
}
}
}
} else {
systemConfig = new SystemConfig(celebornConf, config);
if (settings.containsKey(CONF_CONFIG)) {
Map<String, String> config = extractConfig(settings);
systemConfig = new SystemConfig(celebornConf, config);
}
}
}
} catch (Exception e) {
LOG.warn("Refresh dynamic config error: {}", e.getMessage(), e);
return;
}

tenantUserConfigAtomicReference.set(tenantUserConfs);
tenantConfigAtomicReference.set(tenantConfs);
if (systemConfig != null) {
systemConfigAtomicReference.set(systemConfig);
}
}

private Map<String, String> extractConfig(Map<String, Object> setting) {
Map<String, String> config =
((Map<String, Object>) setting.get(CONF_CONFIG))
.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, a -> a.getValue().toString()));
return config;
}

private File getConfigurationFile(Map<String, String> env) {
if (!this.celebornConf.quotaConfigurationPath().isEmpty()) {
return new File(this.celebornConf.quotaConfigurationPath().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public String getName() {

@Override
public DynamicConfig getParentLevelConfig() {
return configService.getSystemConfigFromCache();
if (name == null) {
return configService.getSystemConfigFromCache();
} else {
return configService.getTenantConfigFromCache(tenantId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ClusterTenantConfig {
private Integer clusterId;
private String tenantId;
private String level;
private String user;
private String name;
private String configKey;
private String configValue;
private String type;
Expand Down Expand Up @@ -67,12 +67,12 @@ public void setLevel(String level) {
this.level = level;
}

public String getUser() {
return StringUtils.isBlank(user) ? null : user;
public String getName() {
return StringUtils.isBlank(name) ? null : name;
}

public void setUser(String user) {
this.user = user;
public void setName(String name) {
this.name = name;
}

public String getConfigKey() {
Expand Down Expand Up @@ -116,7 +116,7 @@ public void setGmtModify(Date gmtModify) {
}

public Pair getTenantInfo() {
return Pair.of(tenantId, user);
return Pair.of(tenantId, name);
}

@Override
Expand All @@ -126,7 +126,7 @@ public String toString() {
sb.append(", clusterId=").append(clusterId);
sb.append(", tenantId='").append(tenantId).append('\'');
sb.append(", level='").append(level).append('\'');
sb.append(", user='").append(user).append('\'');
sb.append(", user='").append(name).append('\'');
sb.append(", configKey='").append(configKey).append('\'');
sb.append(", configValue='").append(configValue).append('\'');
sb.append(", type='").append(type).append('\'');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ public interface IServiceManager {

List<TenantConfig> getAllTenantConfigs();

List<TenantConfig> getAllTenantUserConfigs();

SystemConfig getSystemConfig();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -113,6 +114,33 @@ public List<TenantConfig> getAllTenantConfigs() {
}
}

@Override
public List<TenantConfig> getAllTenantUserConfigs() {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
ClusterTenantConfigMapper mapper = sqlSession.getMapper(ClusterTenantConfigMapper.class);
int totalNum = mapper.getClusterTenantConfigsNum(clusterId, ConfigLevel.TENANT_USER.name());
int offset = 0;
List<ClusterTenantConfig> clusterAllTenantConfigs = new ArrayList<>();
while (offset < totalNum) {
List<ClusterTenantConfig> clusterTenantConfigs =
mapper.getClusterTenantConfigs(
clusterId, ConfigLevel.TENANT_USER.name(), offset, pageSize);
clusterAllTenantConfigs.addAll(clusterTenantConfigs);
offset = offset + pageSize;
}

Map<Pair<String, String>, List<ClusterTenantConfig>> tenantConfigMaps =
clusterAllTenantConfigs.stream()
.collect(Collectors.groupingBy(ClusterTenantConfig::getTenantInfo));
return tenantConfigMaps.entrySet().stream()
.map(
t ->
new TenantConfig(
configService, t.getKey().getKey(), t.getKey().getValue(), t.getValue()))
.collect(Collectors.toList());
}
}

@Override
public SystemConfig getSystemConfig() {
try (SqlSession sqlSession = sqlSessionFactory.openSession()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public interface ClusterTenantConfigMapper {

@Select(
"SELECT id, cluster_id, tenant_id, level, user, config_key, config_value, type, gmt_create, gmt_modify "
"SELECT id, cluster_id, tenant_id, level, name, config_key, config_value, type, gmt_create, gmt_modify "
+ "FROM celeborn_cluster_tenant_config WHERE cluster_id = #{clusterId} AND level=#{level} LIMIT #{offset}, #{pageSize}")
List<ClusterTenantConfig> getClusterTenantConfigs(
@Param("clusterId") int clusterId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public void testDbConfig() throws IOException {
CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_DRIVER_CLASS_NAME(), "org.h2.Driver");
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_STORE_DB_HIKARI_MAXIMUM_POOL_SIZE(), "1");
configService = new DbConfigServiceImpl(celebornConf);
verifyConfig(configService);
verifySystemConfig(configService);
verifyTenantConfig(configService);
verifyTenantUserConfig(configService);

SqlSessionFactory sqlSessionFactory = DBSessionFactory.get(celebornConf);
try (SqlSession sqlSession = sqlSessionFactory.openSession(true)) {
Expand All @@ -68,7 +70,9 @@ public void testFsConfig() throws IOException {
celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
celebornConf.set(CelebornConf.DYNAMIC_CONFIG_REFRESH_INTERVAL(), 5L);
configService = new FsConfigServiceImpl(celebornConf);
verifyConfig(configService);
verifySystemConfig(configService);
verifyTenantConfig(configService);
verifyTenantUserConfig(configService);
// change -> refresh config
file = getClass().getResource("/dynamicConfig_2.yaml").getFile();
celebornConf.set(CelebornConf.QUOTA_CONFIGURATION_PATH(), file);
Expand All @@ -84,7 +88,7 @@ public void teardown() {
}
}

public void verifyConfig(ConfigService configService) {
public void verifySystemConfig(ConfigService configService) {
// ------------- Verify SystemConfig ----------------- //
SystemConfig systemConfig = configService.getSystemConfigFromCache();
// verify systemConfig's bytesConf -- use systemConfig
Expand Down Expand Up @@ -135,11 +139,13 @@ public void verifyConfig(ConfigService configService) {
Integer intConfValue =
systemConfig.getValue("celeborn.test.int.only", null, Integer.TYPE, ConfigType.STRING);
Assert.assertEquals(intConfValue.intValue(), 10);
}

public void verifyTenantConfig(ConfigService configService) {
// ------------- Verify TenantConfig ----------------- //
DynamicConfig tenantConfig = configService.getTenantConfigFromCache("tenant_id");
// verify tenantConfig's bytesConf -- use tenantConf
value =
Long value =
tenantConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Expand Down Expand Up @@ -201,6 +207,69 @@ public void verifyConfig(ConfigService configService) {
Assert.assertEquals(withDefaultValue.longValue(), 10);
}

public void verifyTenantUserConfig(ConfigService configService) {
// ------------- Verify UserConfig ----------------- //
DynamicConfig userConfig = configService.getTenantUserConfig("tenant_id1", "Jerry");
// verify userConfig's bytesConf -- use userConf
Long value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_INITIAL_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024);

// verify userConfig's bytesConf -- defer to tenantConf
value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY().key(),
CelebornConf.CLIENT_PUSH_QUEUE_CAPACITY(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024);

// verify userConfig's bytesConf -- defer to systemConf
value =
userConfig.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024000);

// verify userConfig's bytesConf -- defer to celebornConf
value =
userConfig.getValue(
CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD().key(),
CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1073741824);

// verify userConfig's bytesConf with none
value =
userConfig.getValue(
"celeborn.client.push.buffer.initial.size.only.none",
null,
Long.TYPE,
ConfigType.BYTES);
Assert.assertNull(value);

DynamicConfig userConfigNone = configService.getTenantUserConfig("tenant_id", "non_exist");
// verify userConfig's bytesConf -- defer to tenantConf
value =
userConfigNone.getValue(
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE().key(),
CelebornConf.CLIENT_PUSH_BUFFER_MAX_SIZE(),
Long.TYPE,
ConfigType.BYTES);
Assert.assertEquals(value.longValue(), 1024000);

Long withDefaultValue =
userConfigNone.getWithDefaultValue("none", 10L, Long.TYPE, ConfigType.STRING);
Assert.assertEquals(withDefaultValue.longValue(), 10);
}

public void verifyConfigChanged(ConfigService configService) {

SystemConfig systemConfig = configService.getSystemConfigFromCache();
Expand Down
Loading

0 comments on commit e4f7ea8

Please sign in to comment.