Skip to content

Commit

Permalink
feat: release 3.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
fetchadd committed Jun 28, 2024
1 parent 9b7fbc4 commit 8ea790d
Show file tree
Hide file tree
Showing 118 changed files with 15,741 additions and 12,896 deletions.
44 changes: 43 additions & 1 deletion mma-common/src/main/java/com/aliyun/odps/mma/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public abstract class Config {
protected List<ConfigItem> configItems;
protected Map<String, String> mem;

// Hook for subclasses: any config key returned here is skipped while the
// @ConfigItem fields are being registered (see initConfigItemMap), hiding
// that item from configItems/configKeys/configItemMap. Default: mask nothing.
protected List<String> itemMasks() {
return Collections.emptyList();
}

public Config() {
configItemMap = new HashMap<>();
configItems = new ArrayList<>();
Expand All @@ -28,17 +32,23 @@ public Config() {

protected void initConfigItemMap(Class<? extends Config> thisC) {
Field[] fields = thisC.getFields();
List<String> masks = this.itemMasks();

for (Field field: fields) {
if (!field.isAnnotationPresent(ConfigItem.class)) {
continue;
}

ConfigItem configItem = field.getAnnotation(ConfigItem.class);
configItems.add(configItem);

try {
String configKey = (String) field.get(this);

if (masks.contains(configKey)) {
continue;
}

configItems.add(configItem);
configKeys.add(configKey);
configItemMap.put(configKey, configItem);
} catch (Exception e) {
Expand Down Expand Up @@ -157,6 +167,15 @@ public Map<String, String> getMap(String name) {
return gson.fromJson(value, new TypeToken<Map<String, String>>() {}.getType());
}

/**
 * Reads the timer configuration stored under {@code name}.
 * An unset value yields a default {@link TimerConfig} instead of null.
 */
public TimerConfig getTimer(String name) {
    String raw = getConfig(name);
    if (raw == null) {
        return new TimerConfig();
    }
    return TimerConfig.parse(raw);
}

public void setList(String name, List<String> value) {
Gson gson = new Gson();
String json = gson.toJson(value);
Expand Down Expand Up @@ -198,6 +217,9 @@ public List<Map<String, Object>> toJsonObj() {
case "boolean":
configValue = this.getBoolean(configKey);
break;
case "timer":
configValue = this.getMap(configKey);
break;
default:
configValue = this.getConfig(configKey);
break;
Expand All @@ -212,6 +234,15 @@ public List<Map<String, Object>> toJsonObj() {
if (configItem.required()) {
item.put("required", true);
}

if (! configItem.editable()) {
if (Objects.isNull(configValue)) {
continue;
}

item.put("editable", false);
}

item.put("type", configItem.type());
item.put("desc", configItem.desc());
items.add(item);
Expand Down Expand Up @@ -311,6 +342,17 @@ public Map<String, String> addConfigItems(Map<String, Object> items) {
if (! (configValue instanceof Map)) {
errors.put(configKey, "is not a valid json of Map");
}
break;
case "timer":
if (! (configValue instanceof Map)) {
errors.put(configKey, "is not a valid timer value");
} else {
String error = TimerConfig.verifyConfig((Map<String, String>)configValue);
if (StringUtils.isNotBlank(error)) {
errors.put(configKey, error);
}
}

break;
case "boolean":
if ((!"true".equals(configStrVal)) && (!"false".equals(configStrVal))) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.aliyun.odps.mma.config;

import com.aliyun.odps.mma.service.ConfigService;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.ApplicationContext;
Expand All @@ -9,9 +12,7 @@
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import com.aliyun.odps.mma.service.ConfigService;

@Configuration
public class ConfigInitializer {
Expand All @@ -34,25 +35,23 @@ public MMAConfig getMMAConfig() {
return config;
}

@Bean
@Primary
@Bean(name="HIVE")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public HiveConfig getHiveConfig() {
HiveConfig hiveConfig = new HiveConfig();
initConfig(hiveConfig);
return hiveConfig;
}

@Bean
// @Primary
@Bean(name="HIVE_OSS")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public HiveOssConfig getOssConfig() {
HiveOssConfig ossConfig = new HiveOssConfig();
initConfig(ossConfig);
return ossConfig;
}

@Bean
@Bean(name="ODPS")
@Primary
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public OdpsConfig getOdpsConfig() {
Expand All @@ -61,26 +60,61 @@ public OdpsConfig getOdpsConfig() {
return odpsConfig;
}

// Prototype-scoped bean for the Databricks source config: each injection
// point receives a fresh instance wired with the ConfigService and the
// annotation-declared default values (see initConfig below).
// NOTE(review): @Primary looks redundant here — this is the only bean of
// type DatabricksConfig; presumably copied from the ODPS bean. Confirm.
@Bean(name="DATABRICKS")
@Primary
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public DatabricksConfig getDatabricksConfig() {
DatabricksConfig config = new DatabricksConfig();
initConfig(config);
return config;
}


// Prototype-scoped bean for the Hive-on-AWS-Glue source config: a fresh
// instance per injection point, wired with the ConfigService and the
// annotation-declared defaults (see initConfig below).
@Bean(name="HIVE_GLUE")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public HiveGlueConfig getHiveGlueConfig() {
    HiveGlueConfig glueConfig = new HiveGlueConfig();
    initConfig(glueConfig);
    return glueConfig;
}

// @Bean(name="ODPS_OSS")
// @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// public OdpsOssConfig getOdpsOssConfig() {
// OdpsOssConfig config = new OdpsOssConfig();
// initConfig(config);
//
// return config;
// }

private void initConfig(Config config) {
config.setService(configService);
Map<String, String> defaultValues = config.defaultValues;

try {
Class<?> c = config.getClass();
Field[] fields = c.getFields();

for (Field field: fields) {
if (! field.isAnnotationPresent(ConfigItem.class)) {
continue;
}
// 先parent class的filed, 在本class的field,让本class filed默认值覆盖parent class field的默认值
Field[][] fieldsArray = new Field[][] {
c.getSuperclass().getFields(),
c.getDeclaredFields()
};

for (Field[] fields: fieldsArray) {
for (Field field: fields) {
if (! field.isAnnotationPresent(ConfigItem.class)) {
continue;
}

field.setAccessible(true);

field.setAccessible(true);
ConfigItem configItem = field.getAnnotation(ConfigItem.class);
String defaultValue = configItem.defaultValue();
if (! Objects.equals("", defaultValue)) {
String configKey = (String) field.get(config);

ConfigItem configItem = field.getAnnotation(ConfigItem.class);
String defaultValue = configItem.defaultValue();
if (! Objects.equals("", defaultValue)) {
String configKey = (String) field.get(config);
defaultValues.put(configKey, configItem.defaultValue());
defaultValues.put(configKey, configItem.defaultValue());
}
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@
String defaultValue() default "";
String[] enums() default {};
boolean required() default false;

boolean editable() default true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package com.aliyun.odps.mma.config;

import com.aliyun.odps.mma.constant.SourceType;
import com.aliyun.odps.mma.constant.TaskType;
import com.aliyun.odps.mma.task.DatabricksPartitionGrouping;
import com.aliyun.odps.mma.task.PartitionGrouping;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Source configuration for migrating data out of Databricks.
 *
 * <p>Each {@code @ConfigItem} field holds the string key under which the
 * corresponding value is stored; typed accessors below read them back via
 * the inherited {@code getConfig}/{@code getLong}/{@code getInteger}.
 */
public class DatabricksConfig extends SourceConfig {
    // Compiled once: Pattern is immutable and thread-safe, and getJdbcUrl()
    // may be called repeatedly — re-compiling per call was pure waste.
    // Expected workspace URL shape: https://adb-<hostId>.<n>.<domain>.(net|com)
    private static final Pattern WORKSPACE_URL_PATTERN = Pattern.compile(
        "https://(?<host>adb-(?<hostId>\\d+)\\.(?:\\d+)\\.(?:\\w+)\\.(?:net|com))"
    );

    @ConfigItem(desc = "Workspace URL", required = true)
    public static final String DB_WORKSPACE_URL = "db.workspace.url";
    @ConfigItem(desc = "Id of cluster with databricks runtime version of Spark 3.3.2", required = true)
    public static final String DB_CLUSTER_ID = "db.cluster.id";
    @ConfigItem(desc = "Access Token", required = true)
    public static final String DB_ACCESS_TOKEN = "db.access.token";
    @ConfigItem(desc = "DBFS root", required = true, defaultValue = "/FileStore/jars/")
    public static final String DB_DBFS_ROOT = "db.dbfs.root";
    @ConfigItem(desc = "Notebook Job Name", required = true, defaultValue = "data_migration")
    public static final String DB_NOTEBOOK_JOB_NAME = "db.notebook.job.name";
    @ConfigItem(desc = "Notebook github url", required = true, defaultValue = "https://github.com/fetchadd/mma_databricks.git")
    public static final String DB_NOTEBOOK_GITHUB_URL = "db.notebook.github.url";
    @ConfigItem(desc = "Notebook github path", required = true, defaultValue = "notebooks/mma_task")
    public static final String DB_NOTEBOOK_GITHUB_PATH = "db.notebook.github.path";
    // NOTE(review): key ends in ".main" while the desc says "branch" — looks
    // like a typo for "db.notebook.github.branch", but renaming it would
    // orphan values already stored under the old key. Confirm before fixing.
    @ConfigItem(desc = "Notebook github branch", required = true, defaultValue = "main")
    public static final String DB_NOTEBOOK_GITHUB_BRANCH = "db.notebook.github.main";
    // Written back via setNotebookJobId() once the job exists; marked
    // non-editable so users cannot change it from the config UI.
    @ConfigItem(desc = "Notebook job id", required = false, editable = false)
    public static final String DB_NOTEBOOK_JOB_ID = "db.notebook.job.id";
    @ConfigItem(desc = "单个任务处理的最多分区数量", type = "int", defaultValue = "50")
    public static final String DB_TASK_PARTITION_MAX_NUM = "db.task.partition.max.num";
    @ConfigItem(desc = "单个任务处理的最大数量(单位G)", type = "int", defaultValue = "5")
    public static final String DB_TASK_PARTITION_MAX_SIZE = "db.task.partition.max.size";

    // Default blacklist covers Databricks information_schema/system views in
    // every catalog ("*." prefix), which must not be migrated as user tables.
    @ConfigItem(desc = "表黑名单, 格式为db.table", type="list", defaultValue = "[" +
            "\"*.catalog_privileges\",\"*.catalog_tags\",\"*.catalogs\",\"*.check_constraints\"," +
            "\"*.column_masks\",\"*.column_tags\",\"*.columns\",\"*.constraint_column_usage\"," +
            "\"*.constraint_table_usage\",\"*.information_schema_catalog_name\",\"*.key_column_usage\"," +
            "\"*.parameters\",\"*.referential_constraints\",\"*.routine_columns\",\"*.routine_privileges\"," +
            "\"*.routines\",\"*.row_filters\",\"*.schema_privileges\",\"*.schema_tags\",\"*.schemata\"," +
            "\"*.table_constraints\",\"*.table_privileges\",\"*.table_tags\",\"*.tables\",\"*.views\"," +
            "\"*.volume_privileges\",\"*.volume_tags\",\"*.volumes\",\"*.catalog_provider_share_usage\"," +
            "\"*.connections\",\"*.external_location_privileges\",\"*.external_locations\"," +
            "\"*.metastore_privileges\",\"*.metastores\",\"*.providers\",\"*.recipient_allowed_ip_ranges\"," +
            "\"*.recipient_tokens\",\"*.recipients\",\"*.share_recipient_privileges\",\"*.shares\"," +
            "\"*.storage_credential_privileges\",\"*.storage_credentials\",\"*.table_share_usage\"]"
    )
    public static String TABLE_BLACKLIST = "source.table.blacklist";

    public String getWorkspaceUrl() {
        return getConfig(DB_WORKSPACE_URL);
    }

    public String getClusterId() {
        return getConfig(DB_CLUSTER_ID);
    }

    /**
     * Builds the Databricks JDBC URL from the configured workspace URL and
     * cluster id.
     *
     * @return a jdbc:databricks:// URL using token-based auth (AuthMech=3)
     * @throws RuntimeException if the workspace URL does not match
     *         {@code https://adb-<id>.<n>.<domain>.(net|com)}
     */
    public String getJdbcUrl() {
        String workspaceURL = getConfig(DB_WORKSPACE_URL);
        Matcher matcher = WORKSPACE_URL_PATTERN.matcher(workspaceURL);

        if (!matcher.matches()) {
            throw new RuntimeException("invalid workspace url: " + workspaceURL);
        }

        String host = matcher.group("host");
        String hostId = matcher.group("hostId");
        String clusterId = getConfig(DB_CLUSTER_ID);

        return String.format(
            "jdbc:databricks://%s:443/default;transportMode=http;ssl=1;httpPath=sql/protocolv1/o/%s/%s;AuthMech=3;UID=token;",
            host,
            hostId,
            clusterId
        );
    }

    public String getNotebookJobName() {
        return getConfig(DB_NOTEBOOK_JOB_NAME);
    }

    public String getGithubURL() {
        return getConfig(DB_NOTEBOOK_GITHUB_URL);
    }

    public String getGithubBranch() {
        return getConfig(DB_NOTEBOOK_GITHUB_BRANCH);
    }

    public String getGithubPath() {
        return getConfig(DB_NOTEBOOK_GITHUB_PATH);
    }

    public String getAccessToken() {
        return getConfig(DB_ACCESS_TOKEN);
    }

    public String getDbfsRoot() {
        return getConfig(DB_DBFS_ROOT);
    }

    /** @return the stored notebook job id, or null-equivalent per getLong when unset */
    public Long getNotebookJobId() {
        return getLong(DB_NOTEBOOK_JOB_ID);
    }

    // Persists the job id created on the Databricks side; callers must pass
    // a non-null id (a null jobId would NPE here, as in the original).
    public void setNotebookJobId(Long jobId) {
        setConfig(DB_NOTEBOOK_JOB_ID, jobId.toString());
    }

    /** Groups partitions into tasks bounded by the configured count/size caps. */
    @Override
    public PartitionGrouping getPartitionGrouping() {
        return new DatabricksPartitionGrouping(
            getInteger(DB_TASK_PARTITION_MAX_NUM),
            getInteger(DB_TASK_PARTITION_MAX_SIZE)
        );
    }

    @Override
    public TaskType defaultTaskType() {
        return TaskType.DATABRICKS;
    }

    @Override
    public SourceType getSourceType() {
        return SourceType.DATABRICKS;
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.aliyun.odps.mma.config;

import java.util.Arrays;
import java.util.List;

import com.aliyun.odps.mma.constant.SourceType;
import com.aliyun.odps.mma.constant.TaskType;
import com.aliyun.odps.mma.task.HivePartitionGrouping;
Expand Down Expand Up @@ -48,6 +51,12 @@ public class HiveConfig extends SourceConfig {
public static String DATABASE_WHITELIST = "source.database.whitelist";
@ConfigItem(desc = "数据库黑名单", type="list", defaultValue = "[\"default\"]")
public static String DATABASE_BLACKLIST = "source.database.blacklist";
@ConfigItem(desc = "Hive UDTF 下载链接", defaultValue = "https://mma-v3.oss-cn-zhangjiakou.aliyuncs.com/udtf/hive-udtf.jar")
public static String HIVE_UDTF_JAR_OSS_URL = "hive.udtf.jar.oss.url";
@ConfigItem(desc = "Hive UDTF 名字", defaultValue = "default.odps_data_dump_multi")
public static String HIVE_UDTF_NAME = "hive.udtf.name";
@ConfigItem(desc = "Hive UDTF Class", defaultValue = "hive.com.aliyun.odps.mma.io.McDataTransmissionUDTF")
public static String HIVE_UDTF_CLASS = "hive.udtf.class";


public HiveConfig() {
Expand All @@ -71,4 +80,12 @@ public TaskType defaultTaskType() {
public SourceType getSourceType() {
return SourceType.HIVE;
}

@Override
protected List<String> itemMasks() {
    // Hide the UDTF name/class items from the registered config items
    // (see Config.initConfigItemMap, which skips masked keys).
    return Arrays.asList(HIVE_UDTF_NAME, HIVE_UDTF_CLASS);
}
}
Loading

0 comments on commit 8ea790d

Please sign in to comment.