Commit 4af2359
[FLINK-36474] Support merging timestamp columns when routing
yuxiqian committed Oct 25, 2024
1 parent f24399c
Showing 13 changed files with 379 additions and 68 deletions.
SchemaUtils.java:

@@ -33,6 +33,9 @@
 import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
 
 import javax.annotation.Nullable;
 
@@ -176,6 +179,24 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         if (lType.equals(rType)) {
             // identical type
             mergedType = rType;
+        } else if (lType instanceof TimestampType && rType instanceof TimestampType) {
+            return DataTypes.TIMESTAMP(
+                    Math.max(
+                            ((TimestampType) lType).getPrecision(),
+                            ((TimestampType) rType).getPrecision()));
+        } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
+            return DataTypes.TIMESTAMP_TZ(
+                    Math.max(
+                            ((ZonedTimestampType) lType).getPrecision(),
+                            ((ZonedTimestampType) rType).getPrecision()));
+        } else if (lType instanceof LocalZonedTimestampType
+                && rType instanceof LocalZonedTimestampType) {
+            return DataTypes.TIMESTAMP_LTZ(
+                    Math.max(
+                            ((LocalZonedTimestampType) lType).getPrecision(),
+                            ((LocalZonedTimestampType) rType).getPrecision()));
+        } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
+            return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
         } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
                 && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
             mergedType = DataTypes.BIGINT();
@@ -185,7 +206,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
                 && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             mergedType = DataTypes.DOUBLE();
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
+        } else if (lType instanceof DecimalType && rType instanceof DecimalType) {
             // Merge two decimal types
             DecimalType lhsDecimal = (DecimalType) lType;
             DecimalType rhsDecimal = (DecimalType) rType;
@@ -195,7 +216,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType lhsDecimal = (DecimalType) lType;
             mergedType =
@@ -204,7 +225,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                                     lhsDecimal.getPrecision(),
                                     lhsDecimal.getScale() + getNumericPrecision(rType)),
                             lhsDecimal.getScale());
-        } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType rhsDecimal = (DecimalType) rType;
             mergedType =
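In short, the new branches widen timestamps in two steps: when both sides share the same temporal root (TIMESTAMP, TIMESTAMP_TZ, or TIMESTAMP_LTZ), the root is kept and the larger precision wins; when the roots differ but both belong to the TIMESTAMP family, the merge falls back to a zone-less TIMESTAMP(9), i.e. TimestampType.MAX_PRECISION. A minimal sketch of the resulting behavior (assuming SchemaUtils lives in org.apache.flink.cdc.common.utils; the expected values mirror the tests below):

    import org.apache.flink.cdc.common.types.DataType;
    import org.apache.flink.cdc.common.types.DataTypes;
    import org.apache.flink.cdc.common.utils.SchemaUtils;

    public class TimestampWideningSketch {
        public static void main(String[] args) {
            // Same root: keep TIMESTAMP, take the larger precision.
            DataType same =
                    SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(6));
            System.out.println(same.equals(DataTypes.TIMESTAMP(6))); // true

            // Mixed roots within the TIMESTAMP family: widen to TIMESTAMP(9).
            DataType mixed =
                    SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ());
            System.out.println(mixed.equals(DataTypes.TIMESTAMP(9))); // true
        }
    }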
SchemaUtilsTest.java:

@@ -291,6 +291,35 @@ public void testInferWiderType() {
                                 DataTypes.INT().nullable(), DataTypes.INT().nullable()))
                 .isEqualTo(DataTypes.INT().nullable());
 
+        // Test merging temporal types
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6)))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(7));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1)))
+                .isEqualTo(DataTypes.TIMESTAMP_LTZ(2));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
         // incompatible type merges test
         Assertions.assertThatThrownBy(
                 () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
FlinkPipelineComposer.java:

@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -88,17 +89,19 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking
 
     @Override
     public PipelineExecution compose(PipelineDef pipelineDef) {
-        int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
+        Configuration pipelineDefConfig = pipelineDef.getConfig();
+
+        int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
         env.getConfig().setParallelism(parallelism);
 
         SchemaChangeBehavior schemaChangeBehavior =
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
 
         // Build Source Operator
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
                 sourceTranslator.translate(
-                        pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
+                        pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
 
         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,10 +113,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         SchemaOperatorTranslator schemaOperatorTranslator =
                 new SchemaOperatorTranslator(
                         schemaChangeBehavior,
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
-                        pipelineDef
-                                .getConfig()
-                                .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
 
@@ -122,13 +124,13 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                 transformTranslator.translatePostTransform(
                         stream,
                         pipelineDef.getTransforms(),
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                         pipelineDef.getUdfs());
 
         // Build DataSink in advance as schema operator requires MetadataApplier
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();
         DataSink dataSink =
-                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
+                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
 
         stream =
                 schemaOperatorTranslator.translate(
@@ -157,7 +159,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         addFrameworkJars();
 
         return new FlinkPipelineExecution(
-                env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
+                env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
     }
 
     private void addFrameworkJars() {
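Besides hoisting the repeated pipelineDef.getConfig() calls into the pipelineDefConfig local, compose() now threads the pipeline's local time zone into the schema operator. A sketch of the new constructor shape (a fragment with hypothetical stand-in values; the translator's package is assumed to be org.apache.flink.cdc.composer.flink.translator):

    import java.time.Duration;

    import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
    import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;

    // Hypothetical values; in compose() each argument is read from pipelineDefConfig.
    SchemaOperatorTranslator translator =
            new SchemaOperatorTranslator(
                    SchemaChangeBehavior.EVOLVE,  // PIPELINE_SCHEMA_CHANGE_BEHAVIOR
                    "schema-operator-uid",        // PIPELINE_SCHEMA_OPERATOR_UID
                    Duration.ofSeconds(30),       // PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT
                    "UTC");                       // PIPELINE_LOCAL_TIME_ZONE (the new argument)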
SchemaOperatorTranslator.java:

@@ -19,7 +19,6 @@
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,24 +38,27 @@
 public class SchemaOperatorTranslator {
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final String schemaOperatorUid;
 
     private final Duration rpcTimeOut;
+    private final String timezone;
 
     public SchemaOperatorTranslator(
             SchemaChangeBehavior schemaChangeBehavior,
             String schemaOperatorUid,
-            Duration rpcTimeOut) {
+            Duration rpcTimeOut,
+            String timezone) {
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.schemaOperatorUid = schemaOperatorUid;
         this.rpcTimeOut = rpcTimeOut;
+        this.timezone = timezone;
     }
 
     public DataStream<Event> translate(
             DataStream<Event> input,
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior);
+        return addSchemaOperator(
+                input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
     }
 
     public String getSchemaOperatorUid() {
@@ -68,7 +70,8 @@ private DataStream<Event> addSchemaOperator(
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
-            SchemaChangeBehavior schemaChangeBehavior) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
         List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
             routingRules.add(
@@ -82,27 +85,12 @@ private DataStream<Event> addSchemaOperator(
                         "SchemaOperator",
                         new EventTypeInfo(),
                         new SchemaOperatorFactory(
-                                metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
+                                metadataApplier,
+                                routingRules,
+                                rpcTimeOut,
+                                schemaChangeBehavior,
+                                timezone));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
-
-    private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
-        return input.filter(event -> !(event instanceof SchemaChangeEvent))
-                .setParallelism(parallelism);
-    }
-
-    private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
-        return input.map(
-                event -> {
-                    if (event instanceof SchemaChangeEvent) {
-                        throw new RuntimeException(
-                                String.format(
-                                        "Aborting execution as the pipeline encountered a schema change event: %s",
-                                        event));
-                    }
-                    return event;
-                })
-                .setParallelism(parallelism);
-    }
 }
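This hunk also drops the private dropSchemaChangeEvent and exceptionOnSchemaChange helpers and passes the time zone through to SchemaOperatorFactory. Presumably the zone is what makes widening a TIMESTAMP_LTZ (instant-based) column into the merged, zone-less TIMESTAMP(9) well-defined; a minimal java.time sketch of why such a conversion needs a zone (not the operator's actual code):

    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;

    public class ZoneSensitiveCastSketch {
        public static void main(String[] args) {
            Instant instant = Instant.parse("2024-10-25T00:00:00Z");
            // The same instant yields different local date-times depending on the zone:
            System.out.println(LocalDateTime.ofInstant(instant, ZoneId.of("UTC")));           // 2024-10-25T00:00
            System.out.println(LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"))); // 2024-10-25T08:00
        }
    }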
(9 of the 13 changed files are not shown.)