[Enhancement] Synchronize DDL schema change to StarRocks by parsing … #338

Closed · wants to merge 1 commit
5 changes: 3 additions & 2 deletions docs/content/Realtime synchronization from MySQL.md
@@ -84,7 +84,8 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d
--sink-conf password= Password1 \
--sink-conf jdbc-url=jdbc:mysql://ip:9030 \
--sink-conf sink.label-prefix=superman \
- --table-conf replication_num=1
+ --table-conf replication_num=1 \
+ --table-conf fast_schema_evolution=true
```

## Options
@@ -103,4 +104,4 @@ Flink has been deployed. If Flink has not been deployed, follow these steps to d
| --sink-conf password | Yes | NONE | The password of the StarRocks |
| --sink-conf sink.label-prefix | Yes | No | stream load label |
| --table-conf replication_num | Yes | 3 | table property |

+ | --table-conf fast_schema_evolution | Yes | TRUE | table property to enable fast schema evolution (add/drop column) |
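
For readers wiring the sync up in code rather than through the CLI, each `--table-conf key=value` pair becomes an entry in the `tableConfig` map that `DatabaseSync` consumes (a minimal sketch mirroring the example at the end of this diff; the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class TableConfSketch {
    public static void main(String[] args) {
        // Mirrors the --table-conf flags shown above.
        Map<String, String> tableConfig = new HashMap<>();
        tableConfig.put("replication_num", "1");
        tableConfig.put("fast_schema_evolution", "true");
        System.out.println(tableConfig);
    }
}
```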
DatabaseSync.java
@@ -51,6 +51,7 @@

public abstract class DatabaseSync {
private static final Logger LOG = LoggerFactory.getLogger(DatabaseSync.class);
+ private static final String FAST_SCHEMA_EVOLUTION = "fast_schema_evolution";
protected Configuration config;
protected String database;
protected TableNameConverter converter;
@@ -79,6 +80,10 @@ public void create(StreamExecutionEnvironment env, String database, Configuratio
this.excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables);
this.sinkConfig = sinkConfig;
this.tableConfig = tableConfig == null ? new HashMap<>() : tableConfig;

+ if (!this.tableConfig.containsKey(FAST_SCHEMA_EVOLUTION)) {
+     this.tableConfig.put(FAST_SCHEMA_EVOLUTION, "true");
+ }
}

public void build() throws Exception {
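A note on the defaulting logic added in `create()`: because `tableConfig` is guaranteed non-null on the preceding line, the `containsKey`/`put` pair behaves exactly like `Map.putIfAbsent`. A runnable sketch of that equivalence (class name illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultingSketch {
    public static void main(String[] args) {
        // Case 1: the user did not set the key, so the default lands.
        Map<String, String> config = new HashMap<>();
        config.putIfAbsent("fast_schema_evolution", "true");
        System.out.println(config); // {fast_schema_evolution=true}

        // Case 2: a user-supplied value is never overwritten.
        config.put("fast_schema_evolution", "false");
        config.putIfAbsent("fast_schema_evolution", "true");
        System.out.println(config); // {fast_schema_evolution=false}
    }
}
```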
StarRocksOptions.java
@@ -32,14 +32,18 @@ public class StarRocksOptions implements Serializable {
private String tableIdentifier;

public StarRocksOptions(String username, String password, String tableIdentifier, String jdbcUrl) {
- this.opts = new StarRocksJdbcConnectionOptions(username, password, jdbcUrl);
+ this.opts = new StarRocksJdbcConnectionOptions(jdbcUrl, username, password);
this.tableIdentifier = tableIdentifier;
}

public String getTableIdentifier() {
return tableIdentifier;
}

+ public StarRocksJdbcConnectionOptions getOpts() {
+     return opts;
+ }

public String save() throws IllegalArgumentException {
Properties copy = new Properties();
return IOUtils.propsToString(copy);
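The constructor change above is a genuine bug fix, not a refactor: the call previously passed (username, password, jdbcUrl) to a constructor that, judging by the corrected call site and the getDbURL()/getUsername()/getPassword() accessors used later in this diff, expects (jdbcUrl, username, password), so the URL and username were silently swapped. A usage sketch after the fix (all values are placeholders):

```java
import com.starrocks.connector.flink.cdc.StarRocksOptions;

public class OptionsSketch {
    public static void main(String[] args) {
        // Argument order: username, password, tableIdentifier ("db.tbl"), jdbcUrl.
        StarRocksOptions options = new StarRocksOptions(
                "root", "Password1", "mydb.mytbl", "jdbc:mysql://ip:9030");
        // The options now carry the JDBC URL in the right slot.
        var opts = options.getOpts();
        System.out.println(options.getTableIdentifier());
        System.out.println(opts);
    }
}
```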
DebeziumJsonSerializer.java
@@ -25,6 +25,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+ import com.starrocks.connector.flink.catalog.StarRocksCatalog;
+ import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.starrocks.connector.flink.cdc.StarRocksOptions;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
@@ -33,9 +35,12 @@

import java.io.IOException;
import java.io.Serializable;
+ import java.util.Arrays;
- import java.util.Objects;
import java.util.HashMap;
import java.util.Map;
+ import java.util.Objects;
+ import java.util.ArrayList;
+ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@@ -59,6 +64,7 @@ public class DebeziumJsonSerializer implements Serializable {
private String table;
//table name of the cdc upstream, format is db.tbl
private String sourceTableName;
+ private StarRocksCatalog starRocksCatalog;

public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern, String sourceTableName) {
this.starRocksOptions = starRocksOptions;
@@ -71,6 +77,9 @@ public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern
this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
this.objectMapper.setNodeFactory(jsonNodeFactory);
+ this.starRocksCatalog = new StarRocksCatalog(starRocksOptions.getOpts().getDbURL(),
+         starRocksOptions.getOpts().getUsername().get(), starRocksOptions.getOpts().getPassword().get());
+ this.starRocksCatalog.open();
}

public String process(String record) throws IOException {
@@ -79,8 +88,7 @@ public String process(String record) throws IOException {
String op = extractJsonNode(recordRoot, "op");
if (Objects.isNull(op)) {
// schema change ddl
- // The StarRocks shared-data (storage-compute separation) mode does not support schema change yet, so this call was commented out
- // schemaChange(recordRoot);
+ schemaChange(recordRoot);
return INVALID_RESULT;
}
Map<String, String> valueMap;
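
For context on the routing above: a record with no "op" field is a Debezium schema-change event, and its DDL sits inside a `historyRecord` field that is itself a JSON document encoded as a string, which is why the code parses twice. A self-contained sketch of that double parse (the record shape follows Debezium's MySQL schema-history format; the values are illustrative):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class HistoryRecordSketch {
    public static void main(String[] args) throws Exception {
        // Schema-change events carry no "op"; "historyRecord" is a
        // JSON document embedded as a plain string field.
        String record = "{\"historyRecord\":"
                + "\"{\\\"ddl\\\":\\\"alter table tbl1 add column age int\\\"}\"}";
        ObjectMapper mapper = new ObjectMapper();
        JsonNode root = mapper.readTree(record);
        String historyRecord = root.get("historyRecord").asText();
        String ddl = mapper.readTree(historyRecord).get("ddl").asText();
        System.out.println(ddl); // alter table tbl1 add column age int
    }
}
```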
@@ -107,22 +115,17 @@

@VisibleForTesting
public boolean schemaChange(JsonNode recordRoot) {
- boolean status = false;

try{
- if(!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)){
+ if (!StringUtils.isNullOrWhitespaceOnly(sourceTableName) && !checkTable(recordRoot)) {
return false;
}
- String ddl = extractDDL(recordRoot);
- if(StringUtils.isNullOrWhitespaceOnly(ddl)){
-     LOG.info("ddl can not do schema change:{}", recordRoot);
-     return false;
- }
- // TODO Exec schema change
- LOG.info("schema change status:{}", status);

+ extractDDLAndExecute(recordRoot);
}catch (Exception ex){
LOG.warn("schema change error :", ex);
}
- return status;
+ return true;
}

/**
@@ -174,27 +177,46 @@ private Map<String, String> extractRow(JsonNode recordRow) {
return recordMap != null ? recordMap : new HashMap<>();
}

- public String extractDDL(JsonNode record) throws JsonProcessingException {
+ private void extractDDLAndExecute(JsonNode record) throws JsonProcessingException {
String historyRecord = extractJsonNode(record, "historyRecord");
if (Objects.isNull(historyRecord)) {
- return null;
+ return;
}
String ddl = extractJsonNode(objectMapper.readTree(historyRecord), "ddl");
LOG.debug("received debezium ddl :{}", ddl);
if (!Objects.isNull(ddl)) {
//filter add/drop operation
Matcher matcher = addDropDDLPattern.matcher(ddl);
- if(matcher.find()){
+ if (matcher.find()) {
String op = matcher.group(1);
String col = matcher.group(3);

+ if (op.equalsIgnoreCase("drop")) {
+     execDropDDL(col);
+     return;
+ }

String type = matcher.group(5);
type = handleType(type);
- ddl = String.format(EXECUTE_DDL, starRocksOptions.getTableIdentifier(), op, col, type);
- LOG.info("parse ddl:{}", ddl);
- return ddl;
+ execAddDDL(col, type);
}
}
- return null;
}

+ private void execAddDDL(String col, String type) {
+     List<StarRocksColumn> toAddColumns = new ArrayList<>();
+     StarRocksColumn.Builder builder = new StarRocksColumn.Builder()
+             .setColumnName(col)
+             .setDataType(type);
+
+     toAddColumns.add(builder.build());
+
+     starRocksCatalog.alterAddColumns(database, table, toAddColumns, 30);
+ }
+
+ private void execDropDDL(String col) {
+     List<String> cols = Arrays.asList(col);
+     starRocksCatalog.alterDropColumns(database, table, cols, 30);
+ }
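
The matcher groups consumed above (group 1 = operation, group 3 = column name, group 5 = type) imply a pattern with a specific group layout. The real `addDropDDLPattern` is defined elsewhere in this class and is not shown in the diff, so the regex below is only an assumption that reproduces the same groups:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DdlPatternSketch {
    public static void main(String[] args) {
        // Assumed layout: group(1) = add|drop, group(3) = column name,
        // group(5) = column type (absent for DROP statements).
        Pattern addDrop = Pattern.compile(
                "ALTER\\s+TABLE\\s+\\S+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?(\\w+)(\\s+(\\w+))?",
                Pattern.CASE_INSENSITIVE);
        Matcher m = addDrop.matcher("alter table tbl1 add column age int");
        if (m.find()) {
            System.out.println(m.group(1)); // add
            System.out.println(m.group(3)); // age
            System.out.println(m.group(5)); // int
        }
    }
}
```

Note that `alterAddColumns` and `alterDropColumns` are invoked with a timeout of 30 (presumably seconds), and the column type forwarded to `execAddDDL` has already been normalized by `handleType`.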

public static DebeziumJsonSerializer.Builder builder() {
@@ -61,6 +61,7 @@ public static void main(String[] args) throws Exception{

Map<String,String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
tableConfig.put("fast_schema_evolution", "true");

String includingTables = "tbl1|tbl2|tbl3";
String excludingTables = "";
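
Worth noting: with the `DatabaseSync` change earlier in this diff, the explicit `fast_schema_evolution` entry in this example is now strictly optional, since `create()` fills in `"true"` whenever the key is absent; keeping it here still documents the default.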