feat: change animation duration to 100ms (DataLinkDC#2561)
* feat: change animation duration to 100ms

Signed-off-by: licho <[email protected]>

* fix: spotless

Signed-off-by: licho <[email protected]>

* fix: spotless

Signed-off-by: licho <[email protected]>

* fix: spotless

Signed-off-by: licho <[email protected]>

* chore: java spotless

* chore: remove empty tsx file

* refactor: spotless for ts* format regex (see the config sketch below)

Signed-off-by: licho <[email protected]>

* Spotless Apply

---------

Signed-off-by: licho <[email protected]>
Co-authored-by: leechor <[email protected]>
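One of the squashed commits above ("refactor: spotless for ts* format regex") points at Spotless also picking up TypeScript sources. Below is a minimal sketch of what such a Maven configuration typically looks like; the Prettier version pin and the exact include globs are assumptions for illustration, not values taken from this repository.

<!-- Hypothetical Spotless block for *.ts / *.tsx files formatted with Prettier. -->
<plugin>
    <groupId>com.diffplug.spotless</groupId>
    <artifactId>spotless-maven-plugin</artifactId>
    <configuration>
        <formats>
            <format>
                <!-- Glob patterns covering TypeScript sources; assumed, not copied from the repo. -->
                <includes>
                    <include>**/*.ts</include>
                    <include>**/*.tsx</include>
                </includes>
                <!-- Prettier runs through Spotless's npm integration; the version pin is illustrative. -->
                <prettier>
                    <devDependencies>
                        <prettier>2.8.8</prettier>
                    </devDependencies>
                </prettier>
            </format>
        </formats>
    </configuration>
</plugin>

Running mvn spotless:apply with a configuration like this rewrites matching files in place.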
leechor authored Nov 22, 2023
1 parent 16f634d commit b466597
Showing 285 changed files with 3,593 additions and 3,544 deletions.
dinky-cdc/dinky-cdc-plus/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -47,11 +47,11 @@
<build>
<resources>
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>**/*.*</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
</build>
@@ -19,7 +19,10 @@

package org.dinky.cdc.doris;

import com.google.common.base.Strings;
import org.dinky.cdc.SinkBuilder;
import org.dinky.data.model.Column;
import org.dinky.data.model.FlinkCDCConfig;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.data.RowData;
@@ -36,18 +39,18 @@
import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.RowKind;
import org.dinky.cdc.SinkBuilder;
import org.dinky.data.model.Column;
import org.dinky.data.model.FlinkCDCConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Strings;

public class DorisExtendSinkBuilder extends DorisSinkBuilder implements Serializable {

public static final String KEY_WORD = "datastream-doris-ext";
@@ -76,11 +79,7 @@ public SinkBuilder create(FlinkCDCConfig config) {

@SuppressWarnings("rawtypes")
@Override
protected Object buildRowDataValues(
Map value,
RowKind rowKind,
String columnName,
LogicalType columnType) {
protected Object buildRowDataValues(Map value, RowKind rowKind, String columnName, LogicalType columnType) {
if (additionalColumnConfigList == null) {
return convertValue(getOriginRowData(rowKind, value).get(columnName), columnType);
}
@@ -99,23 +98,20 @@ protected Object buildRowDataValues(
case "op_ts":
Object opVal = source.get("ts_ms");
if (opVal instanceof Integer) {
return TimestampData.fromLocalDateTime(
Instant.ofEpochMilli(((Integer) opVal).longValue())
.atZone(this.getSinkTimeZone())
.toLocalDateTime());
return TimestampData.fromLocalDateTime(Instant.ofEpochMilli(((Integer) opVal).longValue())
.atZone(this.getSinkTimeZone())
.toLocalDateTime());
}

if (opVal instanceof Long) {
return TimestampData.fromLocalDateTime(
Instant.ofEpochMilli((long) opVal)
.atZone(this.getSinkTimeZone())
.toLocalDateTime());
return TimestampData.fromLocalDateTime(Instant.ofEpochMilli((long) opVal)
.atZone(this.getSinkTimeZone())
.toLocalDateTime());
}

return TimestampData.fromLocalDateTime(
Instant.parse(value.toString())
.atZone(this.getSinkTimeZone())
.toLocalDateTime());
return TimestampData.fromLocalDateTime(Instant.parse(value.toString())
.atZone(this.getSinkTimeZone())
.toLocalDateTime());
case "database_name":
return convertValue(source.get("db"), columnType);
case "table_name":
@@ -136,23 +132,19 @@ protected DataStream<RowData> buildRowData(
List<LogicalType> columnTypeList,
String schemaTableName) {
logger.info("sinkTimeZone:{}", this.getSinkTimeZone());
return filterOperator.flatMap(
sinkRowDataFunction(columnNameList, columnTypeList, schemaTableName));
return filterOperator.flatMap(sinkRowDataFunction(columnNameList, columnTypeList, schemaTableName));
}


@Override
protected void buildColumn(
List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
protected void buildColumn(List<String> columnNameList, List<LogicalType> columnTypeList, List<Column> columns) {
for (Column column : columns) {
columnNameList.add(column.getName());
columnTypeList.add(getLogicalType(column));
}
if (this.additionalColumnConfigList != null && this.additionalColumnConfigList.size() > 0) {
logger.info("Start add additional column");
this.additionalColumnConfigList.forEach((key, value) -> {
logger.info(
"col: { name: {}, type:{}, val: {}}", key, value.getKey(), value.getValue());
logger.info("col: { name: {}, type:{}, val: {}}", key, value.getKey(), value.getValue());

switch (value.getKey()) {
case "META":
@@ -24,12 +24,11 @@

public class DorisExtendSinkOptions extends DorisSinkOptions {

public static final ConfigOption<String> AdditionalColumns =
ConfigOptions.key("additional-columns")
.stringType()
.noDefaultValue()
.withDescription(
"Additional columns for sink, support meta column and fix value column."
+ "(meta: op_ts,database_name,schema_name,table_name; "
+ "fix value column type:BOOLEAN,INT,TINYINT,BIGINT,DECIMAL,FLOAT,DATE,TIMESTAMP,CHAR,VARCHAR,STRING)");
public static final ConfigOption<String> AdditionalColumns = ConfigOptions.key("additional-columns")
.stringType()
.noDefaultValue()
.withDescription(
"Additional columns for sink, support meta column and fix value column."
+ "(meta: op_ts,database_name,schema_name,table_name; "
+ "fix value column type:BOOLEAN,INT,TINYINT,BIGINT,DECIMAL,FLOAT,DATE,TIMESTAMP,CHAR,VARCHAR,STRING)");
}
@@ -19,6 +19,15 @@

package org.dinky.cdc.doris;

import org.dinky.assertion.Asserts;
import org.dinky.cdc.AbstractSinkBuilder;
import org.dinky.cdc.CDCBuilder;
import org.dinky.cdc.SinkBuilder;
import org.dinky.data.model.FlinkCDCConfig;
import org.dinky.data.model.Schema;
import org.dinky.data.model.Table;
import org.dinky.executor.CustomTableEnvironment;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
@@ -30,14 +39,6 @@
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.dinky.assertion.Asserts;
import org.dinky.cdc.AbstractSinkBuilder;
import org.dinky.cdc.CDCBuilder;
import org.dinky.cdc.SinkBuilder;
import org.dinky.data.model.FlinkCDCConfig;
import org.dinky.data.model.Schema;
import org.dinky.data.model.Table;
import org.dinky.executor.CustomTableEnvironment;

import java.io.Serializable;
import java.util.HashMap;
@@ -82,7 +83,6 @@ public DataStreamSource<String> build(
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");


final List<Schema> schemaList = config.getSchemaList();
if (!Asserts.isNotNullCollection(schemaList)) {
return dataStreamSource;
@@ -102,106 +102,79 @@ public DataStreamSource<String> build(
}
}

SingleOutputStreamOperator<String> process =
mapOperator.process(
new ProcessFunction<Map, String>() {

@Override
public void processElement(
Map map, Context ctx, Collector<String> out)
throws Exception {
LinkedHashMap source = (LinkedHashMap) map.get("source");
String result = objectMapper.writeValueAsString(map);
try {
Table table =
tableMap.get(
source.get(schemaFieldName).toString()
+ "."
+ source.get("table").toString());
ctx.output(tagMap.get(table), result);
} catch (Exception e) {
out.collect(result);
}
}
});

tagMap.forEach(
(table, v) -> {
DorisOptions dorisOptions =
DorisOptions.builder()
.setFenodes(
config.getSink()
.get(DorisSinkOptions.FENODES.key()))
.setTableIdentifier(
getSinkSchemaName(table)
+ "."
+ getSinkTableName(table))
.setUsername(
config.getSink()
.get(DorisSinkOptions.USERNAME.key()))
.setPassword(
config.getSink()
.get(DorisSinkOptions.PASSWORD.key()))
.build();

DorisExecutionOptions.Builder executionBuilder =
DorisExecutionOptions.builder();

if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_COUNT.key())) {
executionBuilder.setBufferCount(
Integer.parseInt(
sink.get(DorisSinkOptions.SINK_BUFFER_COUNT.key())));
}

if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_SIZE.key())) {
executionBuilder.setBufferSize(
Integer.parseInt(
sink.get(DorisSinkOptions.SINK_BUFFER_SIZE.key())));
}

if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) {
executionBuilder.setDeletable(
Boolean.valueOf(
sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key())));
} else {
executionBuilder.setDeletable(true);
}

if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) {
executionBuilder.setLabelPrefix(
String.format("%s-%s_%s", sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key()),
getSinkSchemaName(table), getSinkTableName(table)));
} else {
executionBuilder.setLabelPrefix(
String.format("dinky-%s_%s%s", getSinkSchemaName(table), getSinkTableName(table),
UUID.randomUUID()));
}

if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
executionBuilder.setMaxRetries(
Integer.valueOf(
sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key())));
}

executionBuilder.setStreamLoadProp(properties).setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
SingleOutputStreamOperator<String> process = mapOperator.process(new ProcessFunction<Map, String>() {

@Override
public void processElement(Map map, Context ctx, Collector<String> out) throws Exception {
LinkedHashMap source = (LinkedHashMap) map.get("source");
String result = objectMapper.writeValueAsString(map);
try {
Table table = tableMap.get(source.get(schemaFieldName).toString()
+ "."
+ source.get("table").toString());
ctx.output(tagMap.get(table), result);
} catch (Exception e) {
out.collect(result);
}
}
});

tagMap.forEach((table, v) -> {
DorisOptions dorisOptions = DorisOptions.builder()
.setFenodes(config.getSink().get(DorisSinkOptions.FENODES.key()))
.setTableIdentifier(getSinkSchemaName(table) + "." + getSinkTableName(table))
.setUsername(config.getSink().get(DorisSinkOptions.USERNAME.key()))
.setPassword(config.getSink().get(DorisSinkOptions.PASSWORD.key()))
.build();

DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();

if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_COUNT.key())) {
executionBuilder.setBufferCount(Integer.parseInt(sink.get(DorisSinkOptions.SINK_BUFFER_COUNT.key())));
}

if (sink.containsKey(DorisSinkOptions.SINK_BUFFER_SIZE.key())) {
executionBuilder.setBufferSize(Integer.parseInt(sink.get(DorisSinkOptions.SINK_BUFFER_SIZE.key())));
}

if (sink.containsKey(DorisSinkOptions.SINK_ENABLE_DELETE.key())) {
executionBuilder.setDeletable(Boolean.valueOf(sink.get(DorisSinkOptions.SINK_ENABLE_DELETE.key())));
} else {
executionBuilder.setDeletable(true);
}

if (sink.containsKey(DorisSinkOptions.SINK_LABEL_PREFIX.key())) {
executionBuilder.setLabelPrefix(String.format(
"%s-%s_%s",
sink.get(DorisSinkOptions.SINK_LABEL_PREFIX.key()),
getSinkSchemaName(table),
getSinkTableName(table)));
} else {
executionBuilder.setLabelPrefix(String.format(
"dinky-%s_%s%s", getSinkSchemaName(table), getSinkTableName(table), UUID.randomUUID()));
}

if (sink.containsKey(DorisSinkOptions.SINK_MAX_RETRIES.key())) {
executionBuilder.setMaxRetries(Integer.valueOf(sink.get(DorisSinkOptions.SINK_MAX_RETRIES.key())));
}

executionBuilder.setStreamLoadProp(properties).setDeletable(true);

DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
.setDorisExecutionOptions(executionBuilder.build())
.setDorisOptions(dorisOptions)
.setSerializer(JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisOptions)
.setSerializer(
JsonDebeziumSchemaSerializer.builder()
.setDorisOptions(dorisOptions)
.build());

process.getSideOutput(v)
.rebalance()
.sinkTo(builder.build())
.name(
String.format("Doris Schema Evolution Sink(table=[%s.%s])",
getSinkSchemaName(table),
getSinkTableName(table)));
});
.build());

process.getSideOutput(v)
.rebalance()
.sinkTo(builder.build())
.name(String.format(
"Doris Schema Evolution Sink(table=[%s.%s])",
getSinkSchemaName(table), getSinkTableName(table)));
});
return dataStreamSource;
}

@@ -213,8 +186,7 @@ protected Properties getProperties() {
if (Asserts.isNotNullString(entry.getKey())
&& entry.getKey().startsWith("sink.properties")
&& Asserts.isNotNullString(entry.getValue())) {
properties.setProperty(
entry.getKey().replace("sink.properties.", ""), entry.getValue());
properties.setProperty(entry.getKey().replace("sink.properties.", ""), entry.getValue());
}
}
return properties;