[FLINK-34865][pipeline-connector] Support syncing table and column comments #3482

Open · wants to merge 10 commits into master
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/doris.md
@@ -119,6 +119,13 @@ pipeline:
<td>String</td>
<td> Whether to write through FE redirection, or connect directly to BE for writing </td>
</tr>
<tr>
<td>charset-encoding</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> Doris Http客户端字符集编码,默认UTF-8 </td>
</tr>
<tr>
<td>sink.enable.batch-mode</td>
<td>optional</td>
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -275,6 +275,13 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable the feature of scanning newly added tables; disabled by default. This option takes effect only when the job starts from a savepoint/checkpoint.</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable syncing of table and column comments; disabled by default. Note: enabling this feature affects memory usage.</td>
</tr>
</tbody>
</table>
</div>
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/doris.md
@@ -119,6 +119,13 @@ pipeline:
<td>String</td>
<td> Whether to write through FE redirection, or connect directly to BE for writing </td>
</tr>
<tr>
<td>charset-encoding</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td> Charset encoding for doris http client, default UTF-8 </td>
</tr>
<tr>
<td>sink.enable.batch-mode</td>
<td>optional</td>
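For illustration only, a minimal sketch of the `sink` block of a pipeline YAML with the new option set; the FE address and credentials below are placeholders rather than values from this PR:

```yaml
sink:
  type: doris
  fenodes: 127.0.0.1:8030   # placeholder FE address
  username: root            # placeholder credentials
  password: ""
  # Charset used by the Doris HTTP client. UTF-8 is already the default,
  # so setting it explicitly is only needed for a non-default encoding.
  charset-encoding: UTF-8
```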
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -282,6 +282,14 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable the feature of scanning newly added tables; false by default. This option takes effect only when the job starts from a savepoint/checkpoint.</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to include table and column comments; false by default. If set to true, table and column comments will be synchronized.<br>
Note: enabling this option affects memory usage.</td>
</tr>
</tbody>
</table>
</div>
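Likewise, a hedged sketch of turning on comment synchronization in the `source` block of a pipeline YAML (all connection settings are placeholders):

```yaml
source:
  type: mysql
  hostname: localhost       # placeholder connection settings
  port: 3306
  username: admin
  password: pass
  tables: app_db.\.*
  # Sync table and column comments downstream; off by default,
  # since retaining comments can increase memory usage.
  include-comments.enabled: true
```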
@@ -230,6 +230,7 @@ public String toString() {
if (!partitionKeys.isEmpty()) {
sb.append(", partitionKeys=").append(String.join(";", partitionKeys));
}
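// Include the table comment in the string form (renders as "comment=null" when absent).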
sb.append(", comment=").append(comment);
sb.append(", options=").append(describeOptions());

return sb.toString();
@@ -70,6 +70,7 @@
class FlinkPipelineComposerITCase {

private static final int MAX_PARALLELISM = 4;
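// Split captured output on the platform line separator so assertions also pass on Windows.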
private static final String LINE_SEPARATOR = System.lineSeparator();

// Always use parent-first classloader for CDC classes.
// The reason is that ValuesDatabase uses static field for holding data, we need to make sure
@@ -154,10 +155,10 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception
"default_namespace.default_schema.table1:col1=3;newCol3=");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
@@ -216,11 +217,11 @@ void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Exception
"default_namespace.default_schema.table2:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
@@ -323,10 +324,10 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, comment=null, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
@@ -390,10 +391,10 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, comment=null, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
@@ -453,11 +454,11 @@ void testOneToOneRouting() throws Exception {
"default_namespace.default_schema.routed2:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[3, 3], op=INSERT, meta=()}",
@@ -521,11 +522,11 @@ void testIdenticalOneToOneRouting() throws Exception {
"default_namespace.default_schema.table2:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
@@ -710,10 +711,10 @@ void testMergingWithRoute() throws Exception {
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
@@ -910,10 +911,10 @@ void testTransformMergingWithRoute() throws Exception {
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, primaryKeys=id, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18, last_name], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
@@ -967,11 +968,11 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
@@ -28,6 +28,8 @@ limitations under the License.

<properties>
<doris.connector.version>1.6.2</doris.connector.version>
<testcontainers.version>1.18.3</testcontainers.version>
<mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

<dependencies>
@@ -84,13 +86,13 @@ limitations under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<version>${mysql.connector.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -39,6 +39,7 @@

import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
@@ -143,6 +144,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JDBC_URL);
options.add(PASSWORD);
options.add(AUTO_REDIRECT);
options.add(CHARSET_ENCODING);

options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
@@ -61,6 +61,12 @@ public class DorisDataSinkOptions {
.withDescription(
"Use automatic redirection of fe without explicitly obtaining the be list");

public static final ConfigOption<String> CHARSET_ENCODING =
ConfigOptions.key("charset-encoding")
.stringType()
.defaultValue("UTF-8")
.withDescription("Charset encoding for doris http client, default UTF-8.");

// Streaming Sink options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")
@@ -62,6 +62,7 @@
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;

/** Supports {@link DorisDataSink} to schema evolution. */
@@ -74,7 +75,8 @@ public class DorisMetadataApplier implements MetadataApplier {

public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
this.dorisOptions = dorisOptions;
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
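// Pass the configured charset so the Doris HTTP client encodes schema-change requests consistently.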
this.schemaChangeManager =
new SchemaChangeManager(dorisOptions, config.get(CHARSET_ENCODING));
this.config = config;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@@ -127,6 +129,7 @@ private void applyCreateTableEvent(CreateTableEvent event)
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
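// Propagate the table comment from the CDC schema to the Doris table being created.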
tableSchema.setTableComment(schema.comment());

if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);