From 4b5a9f364dcd287ff8757ce497ea819838dd8e3a Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Fri, 18 Oct 2024 13:36:16 +0800
Subject: [PATCH] Code clean-up & add comments
Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
---
.../flink/cdc/common/utils/SchemaUtils.java | 19 +++++++++
.../transform/PreTransformOperator.java | 41 ++++---------------
2 files changed, 28 insertions(+), 32 deletions(-)
diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index feb5121bc0..d3073cee1b 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -351,6 +351,25 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche
return oldSchema.copy(columns);
}
+ /**
+ * This function determines if the given schema change event {@code event} should be sent to
+ * downstream based on if the given transform rule has asterisk, and what columns are
+ * referenced.
+ *
+ * <p>For example, if {@code hasAsterisk} is false, then all {@code AddColumnEvent} and {@code
+ * DropColumnEvent} should be ignored since asterisk-less transform should not emit schema
+ * change events that change number of downstream columns.
+ *
+ * <p>Also, {@code referencedColumns} will be used to determine if the schema change event
+ * affects any referenced columns, since if a column has been projected out of downstream, its
+ * corresponding schema change events should not be emitted, either.
+ *
+ * <p>For the case when {@code hasAsterisk} is true, things will be cleaner since we don't have
+ * to filter out any schema change events. All we need to do is to change {@code
+ * AddColumnEvent}'s inserting position, and replace `FIRST` / `LAST` with column-relative
+ * position indicators. This is necessary since extra calculated columns might be added, and
+ * `FIRST` / `LAST` position might differ.
+ */
public static Optional transformSchemaChangeEvent(
boolean hasAsterisk, List referencedColumns, SchemaChangeEvent event) {
Optional evolvedSchemaChangeEvent = Optional.empty();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 9f80c89b22..dd11a86aed 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -29,7 +29,6 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
-import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -52,7 +51,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -73,7 +71,6 @@ public class PreTransformOperator extends AbstractStreamOperator
private List udfDescriptors;
private Map preTransformProcessorMap;
private Map hasAsteriskMap;
- private Map> referencedColumnsMap;
public static PreTransformOperator.Builder newBuilder() {
return new PreTransformOperator.Builder();
@@ -165,7 +162,6 @@ public void setup(
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
- this.referencedColumnsMap = new ConcurrentHashMap<>();
}
@Override
@@ -188,8 +184,7 @@ public void initializeState(StateInitializationContext context) throws Exception
new CreateTableEvent(
stateTableChangeInfo.getTableId(),
stateTableChangeInfo.getPreTransformedSchema());
- // hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
- // from a checkpoint.
+ // hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
cacheTransformRuleInfo(restoredCreateTableEvent);
// Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -265,7 +260,6 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
private Optional cacheChangeSchema(SchemaChangeEvent event) {
TableId tableId = event.tableId();
PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
- List columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();
Schema originalSchema =
SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
@@ -279,11 +273,17 @@ private Optional cacheChangeSchema(SchemaChangeEvent event) {
// this TableId has not been captured by any transform rules, and should be regarded as
// asterisk-ful by default.
schemaChangeEvent =
- SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
+ SchemaUtils.transformSchemaChangeEvent(
+ true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
} else {
+ // Otherwise, use the pre-transformed columns to decide whether the given schema
+ // change event should be passed downstream: it is emitted only when the columns it
+ // affects are present in the pre-transformed schema.
schemaChangeEvent =
SchemaUtils.transformSchemaChangeEvent(
- false, referencedColumnsMap.get(tableId), event);
+ false,
+ tableChangeInfo.getPreTransformedSchema().getColumnNames(),
+ event);
}
if (schemaChangeEvent.isPresent()) {
preTransformedSchema =
@@ -297,26 +297,8 @@ private Optional cacheChangeSchema(SchemaChangeEvent event) {
private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
TableId tableId = createTableEvent.tableId();
- Set referencedColumnsSet =
- transforms.stream()
- .filter(t -> t.getSelectors().isMatch(tableId))
- .flatMap(
- rule ->
- TransformParser.generateReferencedColumns(
- rule.getProjection()
- .map(TransformProjection::getProjection)
- .orElse(null),
- rule.getFilter()
- .map(TransformFilter::getExpression)
- .orElse(null),
- createTableEvent.getSchema().getColumns())
- .stream())
- .map(Column::getName)
- .collect(Collectors.toSet());
-
boolean notTransformed =
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
-
if (notTransformed) {
// If this TableId isn't presented in any transform block, it should behave like a "*"
// projection and should be regarded as asterisk-ful.
@@ -334,11 +316,6 @@ private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
}
- referencedColumnsMap.put(
- createTableEvent.tableId(),
- createTableEvent.getSchema().getColumnNames().stream()
- .filter(referencedColumnsSet::contains)
- .collect(Collectors.toList()));
}
private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {