From cc14a3eef9d8c3ec9620dda9d93d992fc70b3500 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 16 Jul 2024 19:08:36 +0800 Subject: [PATCH] Rename `__op_type__` to more generic `__data_event_type__` --- docs/content.zh/docs/core-concept/transform.md | 12 ++++++------ docs/content/docs/core-concept/transform.md | 12 ++++++------ .../composer/flink/FlinkPipelineComposerITCase.java | 4 ++-- .../transform/ProjectionColumnProcessor.java | 8 ++++---- .../transform/TransformFilterProcessor.java | 8 ++++---- .../flink/cdc/runtime/parser/TransformParser.java | 11 ++++++----- 6 files changed, 28 insertions(+), 27 deletions(-) diff --git a/docs/content.zh/docs/core-concept/transform.md b/docs/content.zh/docs/core-concept/transform.md index a0cc93b916..dd7555cf61 100644 --- a/docs/content.zh/docs/core-concept/transform.md +++ b/docs/content.zh/docs/core-concept/transform.md @@ -47,12 +47,12 @@ Multiple rules can be declared in one single pipeline YAML file. ## Fields definition There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules. -| Field | Data Type | Description | -|--------------------|-----------|----------------------------------------------| -| __namespace_name__ | String | Name of the namespace that contains the row. | -| __schema_name__ | String | Name of the schema that contains the row. | -| __table_name__ | String | Name of the table that contains the row. | -| __row_kind__ | String | Operation type of data change event. | +| Field | Data Type | Description | +|---------------------|-----------|----------------------------------------------| +| __namespace_name__ | String | Name of the namespace that contains the row. | +| __schema_name__ | String | Name of the schema that contains the row. | +| __table_name__ | String | Name of the table that contains the row. | +| __data_event_type__ | String | Operation type of data change event. | ## Metadata relationship diff --git a/docs/content/docs/core-concept/transform.md b/docs/content/docs/core-concept/transform.md index a0cc93b916..dd7555cf61 100644 --- a/docs/content/docs/core-concept/transform.md +++ b/docs/content/docs/core-concept/transform.md @@ -47,12 +47,12 @@ Multiple rules can be declared in one single pipeline YAML file. ## Fields definition There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules. -| Field | Data Type | Description | -|--------------------|-----------|----------------------------------------------| -| __namespace_name__ | String | Name of the namespace that contains the row. | -| __schema_name__ | String | Name of the schema that contains the row. | -| __table_name__ | String | Name of the table that contains the row. | -| __row_kind__ | String | Operation type of data change event. | +| Field | Data Type | Description | +|---------------------|-----------|----------------------------------------------| +| __namespace_name__ | String | Name of the namespace that contains the row. | +| __schema_name__ | String | Name of the schema that contains the row. | +| __table_name__ | String | Name of the table that contains the row. | +| __data_event_type__ | String | Operation type of data change event. | ## Metadata relationship diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index 42a55a49cd..034f143b46 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -358,7 +358,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { TransformDef transformDef = new TransformDef( "default_namespace.default_schema.table1", - "*,concat(col1,'0') as col12,__row_kind__ as rk", + "*,concat(col1,'0') as col12,__data_event_type__ as rk", "col1 <> '3'", "col1", "col12", @@ -384,7 +384,7 @@ void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { String[] outputEvents = outCaptor.toString().trim().split("\n"); assertThat(outputEvents) .containsExactly( - "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`__row_kind__` STRING NOT NULL,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", + "CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`__data_event_type__` STRING NOT NULL,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, +I, 10, +I], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, +I, 20, +I], op=INSERT, meta=()}", "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 74495ecafe..9e14899141 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -90,7 +90,7 @@ private Object[] generateParams(BinaryRecordData data, long epochTime, String op params.add(tableInfo.getTableName()); continue; } - if (originalColumnName.equals(TransformParser.DEFAULT_ROW_KIND)) { + if (originalColumnName.equals(TransformParser.DEFAULT_DATA_EVENT_TYPE)) { params.add(opType); continue; } @@ -143,9 +143,9 @@ private TransformExpressionKey generateTransformExpressionKey() { paramTypes.add(String.class); } - if (scriptExpression.contains(TransformParser.DEFAULT_ROW_KIND) - && !argumentNames.contains(TransformParser.DEFAULT_ROW_KIND)) { - argumentNames.add(TransformParser.DEFAULT_ROW_KIND); + if (scriptExpression.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE) + && !argumentNames.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE)) { + argumentNames.add(TransformParser.DEFAULT_DATA_EVENT_TYPE); paramTypes.add(String.class); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 8a9b43c7a1..c5ae869432 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -85,7 +85,7 @@ private Object[] generateParams(BinaryRecordData data, long epochTime, String op params.add(tableInfo.getTableName()); continue; } - if (columnName.equals(TransformParser.DEFAULT_ROW_KIND)) { + if (columnName.equals(TransformParser.DEFAULT_DATA_EVENT_TYPE)) { params.add(opType); continue; } @@ -138,9 +138,9 @@ private TransformExpressionKey generateTransformExpressionKey() { paramTypes.add(String.class); } - if (scriptExpression.contains(TransformParser.DEFAULT_ROW_KIND) - && !argumentNames.contains(TransformParser.DEFAULT_ROW_KIND)) { - argumentNames.add(TransformParser.DEFAULT_ROW_KIND); + if (scriptExpression.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE) + && !argumentNames.contains(TransformParser.DEFAULT_DATA_EVENT_TYPE)) { + argumentNames.add(TransformParser.DEFAULT_DATA_EVENT_TYPE); paramTypes.add(String.class); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index ebfa517978..c7527cef3b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -77,7 +77,7 @@ public class TransformParser { public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__"; public static final String DEFAULT_SCHEMA_NAME = "__schema_name__"; public static final String DEFAULT_TABLE_NAME = "__table_name__"; - public static final String DEFAULT_ROW_KIND = "__row_kind__"; + public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__"; private static SqlParser getCalciteParser(String sql) { return SqlParser.create( @@ -368,9 +368,10 @@ private static List copyFillMetadataColumn( && !containsMetadataColumn(columnsWithMetadata, DEFAULT_TABLE_NAME)) { columnsWithMetadata.add(Column.physicalColumn(DEFAULT_TABLE_NAME, DataTypes.STRING())); } - if (transformStatement.contains(DEFAULT_ROW_KIND) - && !containsMetadataColumn(columnsWithMetadata, DEFAULT_ROW_KIND)) { - columnsWithMetadata.add(Column.physicalColumn(DEFAULT_ROW_KIND, DataTypes.STRING())); + if (transformStatement.contains(DEFAULT_DATA_EVENT_TYPE) + && !containsMetadataColumn(columnsWithMetadata, DEFAULT_DATA_EVENT_TYPE)) { + columnsWithMetadata.add( + Column.physicalColumn(DEFAULT_DATA_EVENT_TYPE, DataTypes.STRING())); } return columnsWithMetadata; } @@ -383,7 +384,7 @@ private static boolean isMetadataColumn(String columnName) { return DEFAULT_TABLE_NAME.equals(columnName) || DEFAULT_SCHEMA_NAME.equals(columnName) || DEFAULT_NAMESPACE_NAME.equals(columnName) - || DEFAULT_ROW_KIND.equals(columnName); + || DEFAULT_DATA_EVENT_TYPE.equals(columnName); } public static SqlSelect parseFilterExpression(String filterExpression) {