[BP-3.2][FLINK-35805][transform] Add __data_event_type__ metadata column #3536

Merged (1 commit, Aug 13, 2024)
9 changes: 5 additions & 4 deletions docs/content.zh/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.

| Field | Data Type | Description |
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String    | Name of the namespace that contains the row. |
-| __schema_name__    | String    | Name of the schema that contains the row.    |
-| __table_name__     | String    | Name of the table that contains the row.     |
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__  | String    | Name of the namespace that contains the row. |
+| __schema_name__     | String    | Name of the schema that contains the row.    |
+| __table_name__      | String    | Name of the table that contains the row.     |
+| __data_event_type__ | String    | Operation type of data change event.         |

## Metadata relationship

9 changes: 5 additions & 4 deletions docs/content/docs/core-concept/transform.md
@@ -48,10 +48,11 @@ Multiple rules can be declared in one single pipeline YAML file.
There are some hidden columns used to access metadata information. They will only take effect when explicitly referenced in the transform rules.

| Field | Data Type | Description |
-|--------------------|-----------|----------------------------------------------|
-| __namespace_name__ | String    | Name of the namespace that contains the row. |
-| __schema_name__    | String    | Name of the schema that contains the row.    |
-| __table_name__     | String    | Name of the table that contains the row.     |
+|---------------------|-----------|----------------------------------------------|
+| __namespace_name__  | String    | Name of the namespace that contains the row. |
+| __schema_name__     | String    | Name of the schema that contains the row.    |
+| __table_name__      | String    | Name of the table that contains the row.     |
+| __data_event_type__ | String    | Operation type of data change event.         |

## Metadata relationship

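As a usage illustration (not part of this PR's diff): a transform rule that projects the new metadata column follows the same pattern the E2E test below uses, for example this sketch, in which the table name `mydb.orders` and the alias `op_type` are made up:

```yaml
transform:
  - source-table: mydb.orders
    # \* keeps all physical columns; referencing __data_event_type__
    # materializes the row kind (+I, -U, +U, or -D) as a STRING column.
    projection: \*, __data_event_type__ AS op_type
```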
@@ -340,6 +340,66 @@ void testTransform(ValuesDataSink.SinkApi sinkApi) throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20], after=[2, x, 20], op=UPDATE, meta=()}");
}

@ParameterizedTest
@EnumSource
void testOpTypeMetadataColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup transform
TransformDef transformDef =
new TransformDef(
"default_namespace.default_schema.table1",
"*,concat(col1,'0') as col12,__data_event_type__ as rk",
"col1 <> '3'",
"col1",
"col12",
"key1=value1",
"");

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
new ArrayList<>(Collections.singletonList(transformDef)),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1, 10, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2, 20, +I], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 10, -D], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 20, -U], after=[2, x, 20, +U], op=UPDATE, meta=()}");
}

@ParameterizedTest
@EnumSource
void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
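Reading the positional TransformDef arguments in the new test back into pipeline YAML terms (a hedged mapping, assuming the constructor order is selector, projection, filter, primary keys, partition keys, table options, description), the test's transform corresponds roughly to:

```yaml
transform:
  - source-table: default_namespace.default_schema.table1
    projection: "*, concat(col1,'0') as col12, __data_event_type__ as rk"
    filter: col1 <> '3'
    primary-keys: col1
    partition-keys: col12
    table-options: key1=value1
```

The expected output then shows `rk` carrying `+I` for inserts, `-D` for the delete's before image, and `-U`/`+U` for the update's before and after images.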
@@ -442,9 +442,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ " - source-table: %s.TABLEBETA\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name\n"
+ " projection: \\*, __namespace_name__ || '.' || __schema_name__ || '.' || __table_name__ AS identifier_name, __data_event_type__ AS type\n"
+ "pipeline:\n"
+ " parallelism: 1",
INTER_CONTAINER_MYSQL_ALIAS,
@@ -462,25 +462,25 @@

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
"CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);

waitUntilSpecificEvent(
String.format(
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING}, primaryKeys=ID, options=()}",
"CreateTableEvent{tableId=%s.TABLEBETA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`CODENAMESBETA` VARCHAR(17),`AGEBETA` INT,`NAMEBETA` VARCHAR(128),`identifier_name` STRING,`type` STRING}, primaryKeys=ID, options=()}",
transformTestDatabase.getDatabaseName()),
60000L);

validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA], op=INSERT, meta=()}");
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Alice, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2012, 12, Monterey, 22, Fred, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2014, 14, Sonoma, 24, Henry, null.%s.TABLEBETA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[], after=[2013, 13, Ventura, 23, Gus, null.%s.TABLEBETA, +I], op=INSERT, meta=()}");

// generate binlogs
String mysqlJdbcUrl =
@@ -492,9 +492,9 @@ public void testWildcardWithMetadataColumnTransform() throws Exception {
insertBinlogEvents(mysqlJdbcUrl);

validateEvents(
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA], after=[], op=DELETE, meta=()}");
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, null.%s.TABLEALPHA, -U], after=[1009, 100, 0, 18, Bob, null.%s.TABLEALPHA, +U], op=UPDATE, meta=()}",
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, null.%s.TABLEALPHA, +I], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, null.%s.TABLEBETA, -D], after=[], op=DELETE, meta=()}");
}

private static void insertBinlogEvents(String mysqlJdbcUrl) throws SQLException {
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -389,13 +390,15 @@ private Optional<DataChangeEvent> processFilter(
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
// insert and update event only process afterData, delete only process beforeData
if (after != null) {
- if (transformFilterProcessor.process(after, epochTime)) {
+ if (transformFilterProcessor.process(
+         after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
}
} else if (before != null) {
- if (transformFilterProcessor.process(before, epochTime)) {
+ if (transformFilterProcessor.process(
+         before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'))) {
return Optional.of(dataChangeEvent);
} else {
return Optional.empty();
@@ -412,11 +415,14 @@ private Optional<DataChangeEvent> processProjection(
BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
if (before != null) {
BinaryRecordData projectedBefore =
- postTransformProcessor.processData(before, epochTime);
+ postTransformProcessor.processData(
+         before, epochTime, opTypeToRowKind(dataChangeEvent.op(), '-'));
dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
}
if (after != null) {
- BinaryRecordData projectedAfter = postTransformProcessor.processData(after, epochTime);
+ BinaryRecordData projectedAfter =
+         postTransformProcessor.processData(
+                 after, epochTime, opTypeToRowKind(dataChangeEvent.op(), '+'));
dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
}
return Optional.of(dataChangeEvent);
@@ -499,4 +505,8 @@ private void destroyUdf() {
}
});
}

+ private String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
+     return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
+ }
}
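The new opTypeToRowKind helper builds the metadata value from the event's operation type plus a '+' or '-' prefix for after/before images. A standalone sketch of the mapping follows; the enum here is a local stand-in for org.apache.flink.cdc.common.event.OperationType, which may carry more constants:

```java
public class RowKindTagDemo {
    // Stand-in for org.apache.flink.cdc.common.event.OperationType.
    enum OperationType { INSERT, UPDATE, DELETE }

    // Same logic as the helper added above: prefix char plus the
    // first letter of the operation name.
    static String opTypeToRowKind(OperationType opType, char beforeOrAfter) {
        return String.format("%c%c", beforeOrAfter, opType.name().charAt(0));
    }

    public static void main(String[] args) {
        System.out.println(opTypeToRowKind(OperationType.INSERT, '+')); // +I
        System.out.println(opTypeToRowKind(OperationType.UPDATE, '-')); // -U (before image)
        System.out.println(opTypeToRowKind(OperationType.UPDATE, '+')); // +U (after image)
        System.out.println(opTypeToRowKind(OperationType.DELETE, '-')); // -D
    }
}
```

This matches the processFilter and processProjection call sites above, which pass '-' when evaluating the before image and '+' for the after image.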
@@ -21,7 +21,7 @@
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.runtime.parser.JaninoCompiler;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
+import org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns;
import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;

import org.codehaus.janino.ExpressionEvaluator;
@@ -33,6 +33,8 @@
import java.util.LinkedHashSet;
import java.util.List;

+import static org.apache.flink.cdc.runtime.parser.metadata.MetadataColumns.METADATA_COLUMNS;

/**
* The processor of the projection column. It processes the data column and the user-defined
* computed columns.
@@ -79,9 +81,9 @@ public ProjectionColumn getProjectionColumn() {
return projectionColumn;
}

- public Object evaluate(BinaryRecordData after, long epochTime) {
+ public Object evaluate(BinaryRecordData record, long epochTime, String opType) {
try {
- return expressionEvaluator.evaluate(generateParams(after, epochTime));
+ return expressionEvaluator.evaluate(generateParams(record, epochTime, opType));
} catch (InvocationTargetException e) {
LOG.error(
"Table:{} column:{} projection:{} execute failed. {}",
@@ -93,7 +95,7 @@ public Object evaluate(BinaryRecordData after, long epochTime) {
}
}

- private Object[] generateParams(BinaryRecordData after, long epochTime) {
+ private Object[] generateParams(BinaryRecordData record, long epochTime, String opType) {
List<Object> params = new ArrayList<>();
List<Column> columns = tableInfo.getPreTransformedSchema().getColumns();

@@ -103,15 +105,18 @@ private Object[] generateParams(BinaryRecordData after, long epochTime) {
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
switch (originalColumnName) {
- case TransformParser.DEFAULT_NAMESPACE_NAME:
+ case MetadataColumns.DEFAULT_NAMESPACE_NAME:
params.add(tableInfo.getNamespace());
continue;
- case TransformParser.DEFAULT_SCHEMA_NAME:
+ case MetadataColumns.DEFAULT_SCHEMA_NAME:
params.add(tableInfo.getSchemaName());
continue;
- case TransformParser.DEFAULT_TABLE_NAME:
+ case MetadataColumns.DEFAULT_TABLE_NAME:
params.add(tableInfo.getTableName());
continue;
+ case MetadataColumns.DEFAULT_DATA_EVENT_TYPE:
+     params.add(opType);
+     continue;
}

boolean argumentFound = false;
@@ -120,7 +125,7 @@
if (column.getName().equals(originalColumnName)) {
params.add(
DataTypeConverter.convertToOriginal(
- fieldGetters[i].getFieldOrNull(after), column.getType()));
+ fieldGetters[i].getFieldOrNull(record), column.getType()));
argumentFound = true;
break;
}
@@ -158,20 +163,14 @@ private TransformExpressionKey generateTransformExpressionKey() {
}

for (String originalColumnName : originalColumnNames) {
- switch (originalColumnName) {
-     case TransformParser.DEFAULT_NAMESPACE_NAME:
-         argumentNames.add(TransformParser.DEFAULT_NAMESPACE_NAME);
-         paramTypes.add(String.class);
-         break;
-     case TransformParser.DEFAULT_SCHEMA_NAME:
-         argumentNames.add(TransformParser.DEFAULT_SCHEMA_NAME);
-         paramTypes.add(String.class);
-         break;
-     case TransformParser.DEFAULT_TABLE_NAME:
-         argumentNames.add(TransformParser.DEFAULT_TABLE_NAME);
-         paramTypes.add(String.class);
-         break;
- }
+ METADATA_COLUMNS.stream()
+         .filter(col -> col.f0.equals(originalColumnName))
+         .findFirst()
+         .ifPresent(
+                 col -> {
+                     argumentNames.add(col.f0);
+                     paramTypes.add(col.f2);
+                 });
}

argumentNames.add(JaninoCompiler.DEFAULT_TIME_ZONE);
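The generateTransformExpressionKey change swaps per-column switch arms for a lookup over a shared METADATA_COLUMNS registry, so adding a metadata column becomes a one-line registry entry. The actual definition in MetadataColumns is not shown in this diff; inferred from the col.f0/col.f2 accesses, a registry of (name, data type, Java parameter class) triples could look roughly like the sketch below, where Tuple3 is a local holder and the data-type field is simplified to a string:

```java
import java.util.Arrays;
import java.util.List;

public class MetadataColumnsSketch {
    // Minimal triple holder; the real code may use a Flink tuple type.
    static final class Tuple3<A, B, C> {
        final A f0; final B f1; final C f2;
        Tuple3(A f0, B f1, C f2) { this.f0 = f0; this.f1 = f1; this.f2 = f2; }
    }

    public static final String DEFAULT_NAMESPACE_NAME = "__namespace_name__";
    public static final String DEFAULT_SCHEMA_NAME = "__schema_name__";
    public static final String DEFAULT_TABLE_NAME = "__table_name__";
    public static final String DEFAULT_DATA_EVENT_TYPE = "__data_event_type__";

    // f0: hidden column name, f1: data type (all STRING per the docs
    // table above), f2: Java class used as the Janino argument type.
    public static final List<Tuple3<String, String, Class<?>>> METADATA_COLUMNS =
            Arrays.asList(
                    new Tuple3<>(DEFAULT_NAMESPACE_NAME, "STRING", String.class),
                    new Tuple3<>(DEFAULT_SCHEMA_NAME, "STRING", String.class),
                    new Tuple3<>(DEFAULT_TABLE_NAME, "STRING", String.class),
                    new Tuple3<>(DEFAULT_DATA_EVENT_TYPE, "STRING", String.class));
}
```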