Code clean-up & add comments
Signed-off-by: yuxiqian <[email protected]>
yuxiqian committed Oct 25, 2024
1 parent 6e0d03a commit 4b5a9f3
Showing 2 changed files with 28 additions and 32 deletions.
@@ -351,6 +351,25 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche
return oldSchema.copy(columns);
}

/**
* Determines whether the given schema change event {@code event} should be sent downstream,
* based on whether the given transform rule contains an asterisk and which columns are
* referenced.
*
* <p>For example, if {@code hasAsterisk} is false, all {@code AddColumnEvent} and {@code
* DropColumnEvent} should be ignored, since an asterisk-less transform must not emit schema
* change events that change the number of downstream columns.
*
* <p>Also, {@code referencedColumns} is used to determine whether the schema change event
* affects any referenced columns: if a column has been projected out of the downstream schema,
* its corresponding schema change events should not be emitted either.
*
* <p>When {@code hasAsterisk} is true, things are simpler since no schema change events need
* to be filtered out. All that is required is to adjust {@code AddColumnEvent}'s insertion
* position, replacing `FIRST` / `LAST` with column-relative position indicators. This is
* necessary because extra calculated columns might be added, so the `FIRST` / `LAST` positions
* might differ.
*/
public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
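To make the new Javadoc above concrete, here is a minimal, illustrative sketch of how the documented method could be called. Only the `SchemaUtils.transformSchemaChangeEvent` signature and the package imports are taken from the diff; the class name, the column list, and the idea of receiving an already-built `AddColumnEvent` are assumptions made purely for illustration.

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class TransformSchemaChangeEventSketch {

    /** Illustrates the two modes documented in the Javadoc above. */
    static void sketch(SchemaChangeEvent addColumnEvent) {
        // Hypothetical downstream column list; a real pipeline derives this from its schema.
        List<String> referencedColumns = Arrays.asList("id", "name");

        // Asterisk-less transform: events that change the downstream column count
        // (AddColumnEvent / DropColumnEvent) are expected to be filtered out, so this
        // should come back as Optional.empty() for an AddColumnEvent.
        Optional<SchemaChangeEvent> filtered =
                SchemaUtils.transformSchemaChangeEvent(false, referencedColumns, addColumnEvent);

        // Asterisk-ful transform: the event is kept, but FIRST / LAST insertion
        // positions may be rewritten into column-relative positions.
        Optional<SchemaChangeEvent> rewritten =
                SchemaUtils.transformSchemaChangeEvent(true, referencedColumns, addColumnEvent);
    }
}

An empty Optional is how the filtering case is signalled: the caller in the second file below only forwards the event when schemaChangeEvent.isPresent().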
@@ -29,7 +29,6 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -52,7 +51,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@@ -73,7 +71,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
private List<UserDefinedFunctionDescriptor> udfDescriptors;
private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
private Map<TableId, Boolean> hasAsteriskMap;
private Map<TableId, List<String>> referencedColumnsMap;

public static PreTransformOperator.Builder newBuilder() {
return new PreTransformOperator.Builder();
@@ -165,7 +162,6 @@ public void setup(
}
this.preTransformProcessorMap = new ConcurrentHashMap<>();
this.hasAsteriskMap = new ConcurrentHashMap<>();
this.referencedColumnsMap = new ConcurrentHashMap<>();
}

@Override
@@ -188,8 +184,7 @@ public void initializeState(StateInitializationContext context) throws Exception
new CreateTableEvent(
stateTableChangeInfo.getTableId(),
stateTableChangeInfo.getPreTransformedSchema());
// hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
// from a checkpoint.
// hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
cacheTransformRuleInfo(restoredCreateTableEvent);

// Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -265,7 +260,6 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
TableId tableId = event.tableId();
PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();

Schema originalSchema =
SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
@@ -279,11 +273,17 @@ private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
// this TableId has not been captured by any transform rules, and should be regarded as
// asterisk-ful by default.
schemaChangeEvent =
SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
SchemaUtils.transformSchemaChangeEvent(
true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
} else {
// Otherwise, the pre-transformed columns are used to decide whether the given schema
// change event should be passed downstream: it is forwarded only if the affected column
// is present in the pre-transformed schema.
schemaChangeEvent =
SchemaUtils.transformSchemaChangeEvent(
false, referencedColumnsMap.get(tableId), event);
false,
tableChangeInfo.getPreTransformedSchema().getColumnNames(),
event);
}
if (schemaChangeEvent.isPresent()) {
preTransformedSchema =
@@ -297,26 +297,8 @@ private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {

private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
TableId tableId = createTableEvent.tableId();
Set<String> referencedColumnsSet =
transforms.stream()
.filter(t -> t.getSelectors().isMatch(tableId))
.flatMap(
rule ->
TransformParser.generateReferencedColumns(
rule.getProjection()
.map(TransformProjection::getProjection)
.orElse(null),
rule.getFilter()
.map(TransformFilter::getExpression)
.orElse(null),
createTableEvent.getSchema().getColumns())
.stream())
.map(Column::getName)
.collect(Collectors.toSet());

boolean notTransformed =
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));

if (notTransformed) {
// If this TableId doesn't appear in any transform block, it should behave like a "*"
// projection and should be regarded as asterisk-ful.
@@ -334,11 +316,6 @@

hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
}
referencedColumnsMap.put(
createTableEvent.tableId(),
createTableEvent.getSchema().getColumnNames().stream()
.filter(referencedColumnsSet::contains)
.collect(Collectors.toList()));
}

private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
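The net effect of the second file's change is that cacheChangeSchema no longer consults a separately maintained referencedColumnsMap; it derives the column list straight from the cached PreTransformChangeInfo: source-schema columns for asterisk-ful tables, pre-transformed-schema columns otherwise. The condensed sketch below restates that branch outside the diff; the asteriskFul parameter is an assumption standing in for the real guard, which sits above the visible hunk.

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;

import java.util.Optional;

class PreTransformBranchSketch {

    /**
     * Condensed restatement of the cacheChangeSchema branching shown in the diff
     * (not the literal source); asteriskFul stands in for the real guard condition.
     */
    static Optional<SchemaChangeEvent> transform(
            boolean asteriskFul,
            Schema sourceSchema,
            Schema preTransformedSchema,
            SchemaChangeEvent event) {
        return asteriskFul
                // Asterisk-ful (or not captured by any transform rule): keep the event and
                // evaluate it against the full source schema's column names.
                ? SchemaUtils.transformSchemaChangeEvent(
                        true, sourceSchema.getColumnNames(), event)
                // Asterisk-less: only columns that survive pre-transform matter, so events
                // touching projected-out columns (or changing the column count) are dropped.
                : SchemaUtils.transformSchemaChangeEvent(
                        false, preTransformedSchema.getColumnNames(), event);
    }
}

Dropping referencedColumnsMap this way also removes one piece of derived state that had to be rebuilt on restore, which is why the updated comment in initializeState mentions only hasAsteriskMap.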
