Skip to content

Commit

Permalink
Move transformSchemaChangeEvent to SchemaUtils
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 22, 2024
1 parent f328961 commit d6f3a9a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -348,4 +350,73 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche
});
return oldSchema.copy(columns);
}

public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
if (event instanceof AddColumnEvent) {
// Send add column events to downstream iff there's an asterisk
if (hasAsterisk) {
List<AddColumnEvent.ColumnWithPosition> addedColumns =
((AddColumnEvent) event)
.getAddedColumns().stream()
.map(
e -> {
if (AddColumnEvent.ColumnPosition.LAST.equals(
e.getPosition())) {
return new AddColumnEvent
.ColumnWithPosition(
e.getAddColumn(),
AddColumnEvent.ColumnPosition.AFTER,
referencedColumns.get(
referencedColumns.size()
- 1));
} else if (AddColumnEvent.ColumnPosition.FIRST
.equals(e.getPosition())) {
return new AddColumnEvent
.ColumnWithPosition(
e.getAddColumn(),
AddColumnEvent.ColumnPosition
.BEFORE,
referencedColumns.get(0));
} else {
return e;
}
})
.collect(Collectors.toList());
evolvedSchemaChangeEvent =
Optional.of(new AddColumnEvent(event.tableId(), addedColumns));
}
} else if (event instanceof AlterColumnTypeEvent) {
AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event;
if (hasAsterisk) {
// In wildcard mode, all alter column type events should be sent to downstream
evolvedSchemaChangeEvent = Optional.of(event);
} else {
// Or, we need to filter out those referenced columns and reconstruct
// SchemaChangeEvents
Map<String, DataType> newDataTypeMap =
alterColumnTypeEvent.getTypeMapping().entrySet().stream()
.filter(e -> referencedColumns.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!newDataTypeMap.isEmpty()) {
evolvedSchemaChangeEvent =
Optional.of(
new AlterColumnTypeEvent(
alterColumnTypeEvent.tableId(), newDataTypeMap));
}
}
} else if (event instanceof RenameColumnEvent) {
if (hasAsterisk) {
evolvedSchemaChangeEvent = Optional.of(event);
}
} else if (event instanceof DropColumnEvent) {
if (hasAsterisk) {
evolvedSchemaChangeEvent = Optional.of(event);
}
} else {
evolvedSchemaChangeEvent = Optional.of(event);
}
return evolvedSchemaChangeEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
if (event instanceof CreateTableEvent) {
return Optional.of(new CreateTableEvent(tableId, projectedSchema));
} else {
return TransformParser.transformSchemaChangeEvent(
return SchemaUtils.transformSchemaChangeEvent(
hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
Optional<SchemaChangeEvent> schemaChangeEvent =
TransformParser.transformSchemaChangeEvent(
SchemaUtils.transformSchemaChangeEvent(
hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
if (schemaChangeEvent.isPresent()) {
preTransformedSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@
package org.apache.flink.cdc.runtime.parser;

import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
Expand Down Expand Up @@ -612,73 +607,4 @@ private static boolean hasAsterisk(SqlNode sqlNode) {
return false;
}
}

public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
if (event instanceof AddColumnEvent) {
// Send add column events to downstream iff there's an asterisk
if (hasAsterisk) {
List<AddColumnEvent.ColumnWithPosition> addedColumns =
((AddColumnEvent) event)
.getAddedColumns().stream()
.map(
e -> {
if (AddColumnEvent.ColumnPosition.LAST.equals(
e.getPosition())) {
return new AddColumnEvent
.ColumnWithPosition(
e.getAddColumn(),
AddColumnEvent.ColumnPosition.AFTER,
referencedColumns.get(
referencedColumns.size()
- 1));
} else if (AddColumnEvent.ColumnPosition.FIRST
.equals(e.getPosition())) {
return new AddColumnEvent
.ColumnWithPosition(
e.getAddColumn(),
AddColumnEvent.ColumnPosition
.BEFORE,
referencedColumns.get(0));
} else {
return e;
}
})
.collect(Collectors.toList());
evolvedSchemaChangeEvent =
Optional.of(new AddColumnEvent(event.tableId(), addedColumns));
}
} else if (event instanceof AlterColumnTypeEvent) {
AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event;
if (hasAsterisk) {
// In wildcard mode, all alter column type events should be sent to downstream
evolvedSchemaChangeEvent = Optional.of(event);
} else {
// Or, we need to filter out those referenced columns and reconstruct
// SchemaChangeEvents
Map<String, DataType> newDataTypeMap =
alterColumnTypeEvent.getTypeMapping().entrySet().stream()
.filter(e -> referencedColumns.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (!newDataTypeMap.isEmpty()) {
evolvedSchemaChangeEvent =
Optional.of(
new AlterColumnTypeEvent(
alterColumnTypeEvent.tableId(), newDataTypeMap));
}
}
} else if (event instanceof RenameColumnEvent) {
if (hasAsterisk) {
evolvedSchemaChangeEvent = Optional.of(event);
}
} else if (event instanceof DropColumnEvent) {
if (hasAsterisk) {
evolvedSchemaChangeEvent = Optional.of(event);
}
} else {
evolvedSchemaChangeEvent = Optional.of(event);
}
return evolvedSchemaChangeEvent;
}
}

0 comments on commit d6f3a9a

Please sign in to comment.