Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34865][pipeline-connector]support sync table and column comments #3482

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ pipeline:
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。<br>
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>是否启用同步表、字段注释特性,默认关闭。注意:开启此特性将会对内存使用产生影响。</td>
</tr>
</tbody>
</table>
</div>
Expand Down
8 changes: 8 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,14 @@ pipeline:
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
</td>
</tr>
<tr>
<td>include-comments.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to include table and column comments. Defaults to false; if set to true, table and column comments will be sent.<br>
Note: Enabling this option will have an impact on memory usage.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public String toString() {
if (!partitionKeys.isEmpty()) {
sb.append(", partitionKeys=").append(String.join(";", partitionKeys));
}
sb.append(", comment=").append(comment);
sb.append(", options=").append(describeOptions());

return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
class FlinkPipelineComposerITCase {

private static final int MAX_PARALLELISM = 4;
private static final String LINE_SEPARATOR = System.lineSeparator();

// Always use parent-first classloader for CDC classes.
// The reason is that ValuesDatabase uses static field for holding data, we need to make sure
Expand Down Expand Up @@ -154,10 +155,10 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception
"default_namespace.default_schema.table1:col1=3;newCol3=");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
Expand Down Expand Up @@ -216,11 +217,11 @@ void testSingleSplitMultipleTables(ValuesDataSink.SinkApi sinkApi) throws Except
"default_namespace.default_schema.table2:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
Expand Down Expand Up @@ -323,10 +324,10 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, comment=null, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
Expand Down Expand Up @@ -390,10 +391,10 @@ void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, partitionKeys=col12, comment=null, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 11], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 22], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
Expand Down Expand Up @@ -453,11 +454,11 @@ void testOneToOneRouting() throws Exception {
"default_namespace.default_schema.routed2:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.routed2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.routed1, before=[], after=[3, 3], op=INSERT, meta=()}",
Expand Down Expand Up @@ -710,10 +711,10 @@ void testMergingWithRoute() throws Exception {
.physicalColumn("gender", DataTypes.STRING())
.primaryKey("id")
.build());
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.merged, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, comment=null, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
Expand Down Expand Up @@ -768,11 +769,11 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
String[] outputEvents = outCaptor.toString().trim().split(LINE_SEPARATOR);
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"CreateTableEvent{tableId=replaced_namespace.replaced_schema.table2, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, comment=null, options=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ limitations under the License.

<properties>
<doris.connector.version>1.6.2</doris.connector.version>
<testcontainers.version>1.18.3</testcontainers.version>
<mysql.connector.version>8.0.26</mysql.connector.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -84,13 +86,13 @@ limitations under the License.
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<version>1.18.3</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
<version>${mysql.connector.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.AUTO_REDIRECT;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.BENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.FENODES;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.JDBC_URL;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.PASSWORD;
Expand Down Expand Up @@ -143,6 +144,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(JDBC_URL);
options.add(PASSWORD);
options.add(AUTO_REDIRECT);
options.add(CHARSET_ENCODING);

options.add(SINK_CHECK_INTERVAL);
options.add(SINK_ENABLE_2PC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public class DorisDataSinkOptions {
.withDescription(
"Use automatic redirection of fe without explicitly obtaining the be list");

public static final ConfigOption<String> CHARSET_ENCODING =
ConfigOptions.key("charset-encoding")
.stringType()
.defaultValue("UTF-8")
.withDescription("Charset encoding for doris http client, default UTF-8.");

qg-lin marked this conversation as resolved.
Show resolved Hide resolved
// Streaming Sink options
public static final ConfigOption<Boolean> SINK_ENABLE_2PC =
ConfigOptions.key("sink.enable-2pc")
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this PR also enables the Doris connector to apply comments from upstream. Could you please also add a comment-sync E2E case in MySql2DorisE2eITCase?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My pleasure

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.CHARSET_ENCODING;
import static org.apache.flink.cdc.connectors.doris.sink.DorisDataSinkOptions.TABLE_CREATE_PROPERTIES_PREFIX;

/** Supports schema evolution for {@link DorisDataSink}. */
Expand All @@ -74,7 +75,8 @@ public class DorisMetadataApplier implements MetadataApplier {

public DorisMetadataApplier(DorisOptions dorisOptions, Configuration config) {
this.dorisOptions = dorisOptions;
this.schemaChangeManager = new SchemaChangeManager(dorisOptions);
this.schemaChangeManager =
new SchemaChangeManager(dorisOptions, config.get(CHARSET_ENCODING));
this.config = config;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
Expand Down Expand Up @@ -127,6 +129,7 @@ private void applyCreateTableEvent(CreateTableEvent event)
tableSchema.setDatabase(tableId.getSchemaName());
tableSchema.setFields(buildFields(schema));
tableSchema.setDistributeKeys(buildDistributeKeys(schema));
tableSchema.setTableComment(schema.comment());

if (CollectionUtil.isNullOrEmpty(schema.primaryKeys())) {
tableSchema.setModel(DataModel.DUPLICATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.cdc.debezium.table.DebeziumOptions;
import org.apache.flink.table.api.ValidationException;

import io.debezium.relational.RelationalDatabaseConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,6 +59,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CONNECT_TIMEOUT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
Expand Down Expand Up @@ -118,6 +121,7 @@ public DataSource createDataSource(Context context) {
double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);

boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
boolean includeComments = config.get(INCLUDE_COMMENTS_ENABLED);

Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL);
Duration connectTimeout = config.get(CONNECT_TIMEOUT);
Expand All @@ -134,6 +138,13 @@ public DataSource createDataSource(Context context) {

Map<String, String> configMap = config.toMap();
OptionUtils.printOptions(IDENTIFIER, config.toMap());
if (includeComments) {
// set debezium config 'include.schema.comments' to true
configMap.put(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
"true");
}

MySqlSourceConfigFactory configFactory =
new MySqlSourceConfigFactory()
Expand Down Expand Up @@ -220,6 +231,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(INCLUDE_COMMENTS_ENABLED);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;

import io.debezium.relational.RelationalDatabaseConnectorConfig;

/** A {@link DataSource} for mysql cdc connector. */
@Internal
public class MySqlDataSource implements DataSource {
Expand All @@ -43,9 +45,18 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) {

@Override
public EventSourceProvider getEventSourceProvider() {
boolean includeComments =
sourceConfig
.getDbzConfiguration()
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);

MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());
DebeziumChangelogMode.ALL,
sourceConfig.isIncludeSchemaChanges(),
includeComments);

MySqlSource<Event> source =
new MySqlSource<>(
Expand Down
Loading
Loading