From 1388cf99061b36f398aaf593a634a12e7b784c7f Mon Sep 17 00:00:00 2001 From: Wink <32723967+aiwenmo@users.noreply.github.com> Date: Wed, 31 Jul 2024 23:32:39 +0800 Subject: [PATCH] [FLINK-34877][cdc] Support type cast conversion in pipeline transform This closes #3357. --- .../functions/SystemFunctionUtils.java | 83 +++ .../cdc/runtime/parser/JaninoCompiler.java | 66 ++ .../data/writer/AbstractBinaryWriter.java | 2 +- .../transform/TransformDataOperatorTest.java | 663 ++++++++++++++++++ .../runtime/parser/TransformParserTest.java | 28 + 5 files changed, 841 insertions(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java index f3dae8c1a3..34c299567a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; +import java.math.MathContext; import java.math.RoundingMode; import java.text.ParseException; import java.text.SimpleDateFormat; @@ -510,4 +511,86 @@ public static Object coalesce(Object... objects) { } return null; } + + public static String castToString(Object object) { + if (object == null) { + return null; + } + return object.toString(); + } + + public static Byte castToByte(Object object) { + if (object == null) { + return null; + } + return Byte.valueOf(castObjectIntoString(object)); + } + + public static Boolean castToBoolean(Object object) { + if (object == null) { + return null; + } + if (object instanceof Byte + || object instanceof Short + || object instanceof Integer + || object instanceof Long + || object instanceof Float + || object instanceof Double + || object instanceof BigDecimal) { + return !object.equals(0); + } + return Boolean.valueOf(castToString(object)); + } + + public static Short castToShort(Object object) { + if (object == null) { + return null; + } + return Short.valueOf(castObjectIntoString(object)); + } + + public static Integer castToInteger(Object object) { + if (object == null) { + return null; + } + return Integer.valueOf(castObjectIntoString(object)); + } + + public static Long castToLong(Object object) { + if (object == null) { + return null; + } + return Long.valueOf(castObjectIntoString(object)); + } + + public static Float castToFloat(Object object) { + if (object == null) { + return null; + } + return Float.valueOf(castObjectIntoString(object)); + } + + public static Double castToDouble(Object object) { + if (object == null) { + return null; + } + return Double.valueOf(castObjectIntoString(object)); + } + + public static BigDecimal castToBigDecimal(Object object, int precision, int scale) { + if (object == null) { + return null; + } + BigDecimal bigDecimal = + new BigDecimal(castObjectIntoString(object), new MathContext(precision)); + bigDecimal = bigDecimal.setScale(scale, BigDecimal.ROUND_HALF_UP); + return bigDecimal; + } + + private static String castObjectIntoString(Object object) { + if (object instanceof Boolean) { + return Boolean.valueOf(castToString(object)) ? "1" : "0"; + } + return String.valueOf(object); + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 5af9755edf..2dd1b8402e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -22,7 +22,9 @@ import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.calcite.sql.SqlBasicCall; +import org.apache.calcite.sql.SqlBasicTypeNameSpec; import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; @@ -228,6 +230,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue( case LESS_THAN_OR_EQUAL: case GREATER_THAN_OR_EQUAL: return generateBinaryOperation(sqlBasicCall, atoms, sqlBasicCall.getKind().sql); + case CAST: + return generateCastOperation(sqlBasicCall, atoms); case OTHER: return generateOtherOperation(sqlBasicCall, atoms); default: @@ -256,6 +260,16 @@ private static Java.Rvalue generateEqualsOperation( Location.NOWHERE, null, StringUtils.convertToCamelCase("VALUE_EQUALS"), atoms); } + private static Java.Rvalue generateCastOperation( + SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { + if (atoms.length != 1) { + throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString()); + } + List operandList = sqlBasicCall.getOperandList(); + SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1); + return generateTypeConvertMethod(sqlDataTypeSpec, atoms); + } + private static Java.Rvalue generateOtherOperation( SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) { if (sqlBasicCall.getOperator().getName().equals("||")) { @@ -298,4 +312,56 @@ private static Java.Rvalue generateNoOperandTimestampFunctionOperation(String op StringUtils.convertToCamelCase(operationName), timestampFunctionParam.toArray(new Java.Rvalue[0])); } + + private static Java.Rvalue generateTypeConvertMethod( + SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) { + switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) { + case "BOOLEAN": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToBoolean", atoms); + case "TINYINT": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToByte", atoms); + case "SMALLINT": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToShort", atoms); + case "INTEGER": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToInteger", atoms); + case "BIGINT": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToLong", atoms); + case "FLOAT": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToFloat", atoms); + case "DOUBLE": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToDouble", atoms); + case "DECIMAL": + int precision = 10; + int scale = 0; + if (sqlDataTypeSpec.getTypeNameSpec() instanceof SqlBasicTypeNameSpec) { + SqlBasicTypeNameSpec typeNameSpec = + (SqlBasicTypeNameSpec) sqlDataTypeSpec.getTypeNameSpec(); + if (typeNameSpec.getPrecision() > -1) { + precision = typeNameSpec.getPrecision(); + } + if (typeNameSpec.getScale() > -1) { + scale = typeNameSpec.getScale(); + } + } + List newAtoms = new ArrayList<>(Arrays.asList(atoms)); + newAtoms.add( + new Java.AmbiguousName( + Location.NOWHERE, new String[] {String.valueOf(precision)})); + newAtoms.add( + new Java.AmbiguousName( + Location.NOWHERE, new String[] {String.valueOf(scale)})); + return new Java.MethodInvocation( + Location.NOWHERE, + null, + "castToBigDecimal", + newAtoms.toArray(new Java.Rvalue[0])); + case "CHAR": + case "VARCHAR": + case "STRING": + return new Java.MethodInvocation(Location.NOWHERE, null, "castToString", atoms); + default: + throw new ParseException( + "Unsupported data type cast: " + sqlDataTypeSpec.toString()); + } + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java index acf3dac62e..1422935ea9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/AbstractBinaryWriter.java @@ -138,7 +138,7 @@ public void writeBinary(int pos, byte[] bytes) { @Override public void writeDecimal(int pos, DecimalData value, int precision) { - assert value == null || (value.precision() == precision); + assert value == null || (value.precision() <= precision); if (DecimalData.isCompact(precision)) { assert value != null; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java index 09d12e8230..2213127a4a 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperatorTest.java @@ -115,6 +115,43 @@ public class TransformDataOperatorTest { .primaryKey("col1") .build(); + private static final TableId NULL_TABLEID = + TableId.tableId("my_company", "my_branch", "data_null"); + private static final Schema NULL_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("colString", DataTypes.STRING()) + .physicalColumn("nullInt", DataTypes.INT()) + .physicalColumn("nullBoolean", DataTypes.BOOLEAN()) + .physicalColumn("nullTinyint", DataTypes.TINYINT()) + .physicalColumn("nullSmallint", DataTypes.SMALLINT()) + .physicalColumn("nullBigint", DataTypes.BIGINT()) + .physicalColumn("nullFloat", DataTypes.FLOAT()) + .physicalColumn("nullDouble", DataTypes.DOUBLE()) + .physicalColumn("nullChar", DataTypes.CHAR(1)) + .physicalColumn("nullVarchar", DataTypes.VARCHAR(1)) + .physicalColumn("nullDecimal", DataTypes.DECIMAL(4, 2)) + .primaryKey("col1") + .build(); + + private static final TableId CAST_TABLEID = + TableId.tableId("my_company", "my_branch", "data_cast"); + private static final Schema CAST_SCHEMA = + Schema.newBuilder() + .physicalColumn("col1", DataTypes.STRING()) + .physicalColumn("castInt", DataTypes.INT()) + .physicalColumn("castBoolean", DataTypes.BOOLEAN()) + .physicalColumn("castTinyint", DataTypes.TINYINT()) + .physicalColumn("castSmallint", DataTypes.SMALLINT()) + .physicalColumn("castBigint", DataTypes.BIGINT()) + .physicalColumn("castFloat", DataTypes.FLOAT()) + .physicalColumn("castDouble", DataTypes.DOUBLE()) + .physicalColumn("castChar", DataTypes.CHAR(1)) + .physicalColumn("castVarchar", DataTypes.VARCHAR(1)) + .physicalColumn("castDecimal", DataTypes.DECIMAL(4, 2)) + .primaryKey("col1") + .build(); + private static final TableId CONDITION_TABLEID = TableId.tableId("my_company", "my_branch", "condition_table"); private static final Schema CONDITION_SCHEMA = @@ -555,6 +592,612 @@ void testTimestampDiffTransform() throws Exception { .isEqualTo(new StreamRecord<>(insertEventExpect)); } + @Test + void testNullCastTransform() throws Exception { + TransformDataOperator transform = + TransformDataOperator.newBuilder() + .addTransform( + NULL_TABLEID.identifier(), + "col1" + + ",colString" + + ",cast(colString as int) as nullInt" + + ",cast(colString as boolean) as nullBoolean" + + ",cast(colString as tinyint) as nullTinyint" + + ",cast(colString as smallint) as nullSmallint" + + ",cast(colString as bigint) as nullBigint" + + ",cast(colString as float) as nullFloat" + + ",cast(colString as double) as nullDouble" + + ",cast(colString as char) as nullChar" + + ",cast(colString as varchar) as nullVarchar" + + ",cast(colString as DECIMAL(4,2)) as nullDecimal", + null) + .build(); + EventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = new CreateTableEvent(NULL_TABLEID, NULL_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) NULL_SCHEMA.toRowDataType())); + // Insert + DataChangeEvent insertEvent = + DataChangeEvent.insertEvent( + NULL_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(new CreateTableEvent(NULL_TABLEID, NULL_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEvent)); + } + + @Test + void testCastTransform() throws Exception { + TransformDataOperator transform = + TransformDataOperator.newBuilder() + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(col1 as int) as castInt" + + ",cast(col1 as boolean) as castBoolean" + + ",cast(col1 as tinyint) as castTinyint" + + ",cast(col1 as smallint) as castSmallint" + + ",cast(col1 as bigint) as castBigint" + + ",cast(col1 as float) as castFloat" + + ",cast(col1 as double) as castDouble" + + ",cast(col1 as char) as castChar" + + ",cast(col1 as varchar) as castVarchar" + + ",cast(col1 as DECIMAL(4,2)) as castDecimal", + "col1 = '1'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(castInt as int) as castInt" + + ",cast(castInt as boolean) as castBoolean" + + ",cast(castInt as tinyint) as castTinyint" + + ",cast(castInt as smallint) as castSmallint" + + ",cast(castInt as bigint) as castBigint" + + ",cast(castInt as float) as castFloat" + + ",cast(castInt as double) as castDouble" + + ",cast(castInt as char) as castChar" + + ",cast(castInt as varchar) as castVarchar" + + ",cast(castInt as DECIMAL(4,2)) as castDecimal", + "col1 = '2'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(castBoolean as int) as castInt" + + ",cast(castBoolean as boolean) as castBoolean" + + ",cast(castBoolean as tinyint) as castTinyint" + + ",cast(castBoolean as smallint) as castSmallint" + + ",cast(castBoolean as bigint) as castBigint" + + ",cast(castBoolean as float) as castFloat" + + ",cast(castBoolean as double) as castDouble" + + ",cast(castBoolean as char) as castChar" + + ",cast(castBoolean as varchar) as castVarchar" + + ",cast(castBoolean as DECIMAL(4,2)) as castDecimal", + "col1 = '3'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(castTinyint as int) as castInt" + + ",cast(castTinyint as boolean) as castBoolean" + + ",cast(castTinyint as tinyint) as castTinyint" + + ",cast(castTinyint as smallint) as castSmallint" + + ",cast(castTinyint as bigint) as castBigint" + + ",cast(castTinyint as float) as castFloat" + + ",cast(castTinyint as double) as castDouble" + + ",cast(castTinyint as char) as castChar" + + ",cast(castTinyint as varchar) as castVarchar" + + ",cast(castTinyint as DECIMAL(4,2)) as castDecimal", + "col1 = '4'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(castSmallint as int) as castInt" + + ",cast(castSmallint as boolean) as castBoolean" + + ",cast(castSmallint as tinyint) as castTinyint" + + ",cast(castSmallint as smallint) as castSmallint" + + ",cast(castSmallint as bigint) as castBigint" + + ",cast(castSmallint as float) as castFloat" + + ",cast(castSmallint as double) as castDouble" + + ",cast(castSmallint as char) as castChar" + + ",cast(castSmallint as varchar) as castVarchar" + + ",cast(castSmallint as DECIMAL(4,2)) as castDecimal", + "col1 = '5'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(castBigint as int) as castInt" + + ",cast(castBigint as boolean) as castBoolean" + + ",cast(castBigint as tinyint) as castTinyint" + + ",cast(castBigint as smallint) as castSmallint" + + ",cast(castBigint as bigint) as castBigint" + + ",cast(castBigint as float) as castFloat" + + ",cast(castBigint as double) as castDouble" + + ",cast(castBigint as char) as castChar" + + ",cast(castBigint as varchar) as castVarchar" + + ",cast(castBigint as DECIMAL(4,2)) as castDecimal", + "col1 = '6'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",castInt" + + ",cast(castFloat as boolean) as castBoolean" + + ",castTinyint" + + ",castSmallint" + + ",castBigint" + + ",cast(castFloat as float) as castFloat" + + ",cast(castFloat as double) as castDouble" + + ",cast(castFloat as char) as castChar" + + ",cast(castFloat as varchar) as castVarchar" + + ",cast(castFloat as DECIMAL(4,2)) as castDecimal", + "col1 = '7'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",castInt" + + ",cast(castDouble as boolean) as castBoolean" + + ",castTinyint" + + ",castSmallint" + + ",castBigint" + + ",cast(castDouble as float) as castFloat" + + ",cast(castDouble as double) as castDouble" + + ",cast(castDouble as char) as castChar" + + ",cast(castDouble as varchar) as castVarchar" + + ",cast(castDouble as DECIMAL(4,2)) as castDecimal", + "col1 = '8'") + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",castInt" + + ",cast(castDecimal as boolean) as castBoolean" + + ",castTinyint" + + ",castSmallint" + + ",castBigint" + + ",cast(castDecimal as float) as castFloat" + + ",cast(castDecimal as double) as castDouble" + + ",cast(castDecimal as char) as castChar" + + ",cast(castDecimal as varchar) as castVarchar" + + ",cast(castDecimal as DECIMAL(4,2)) as castDecimal", + "col1 = '9'") + .build(); + EventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = new CreateTableEvent(CAST_TABLEID, CAST_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) CAST_SCHEMA.toRowDataType())); + // Insert + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect1 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + new Integer(1), + new Boolean(false), + new Byte("1"), + new Short("1"), + new Long(1), + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1"), + new BinaryStringData("1"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(new CreateTableEvent(CAST_TABLEID, CAST_SCHEMA))); + transform.processElement(new StreamRecord<>(insertEvent1)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect1)); + DataChangeEvent insertEvent2 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new Integer(1), + null, + null, + null, + null, + null, + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect2 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("2"), + new Integer(1), + new Boolean(true), + new Byte("1"), + new Short("1"), + new Long(1), + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1"), + new BinaryStringData("1"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent2)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect2)); + DataChangeEvent insertEvent3 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + null, + new Boolean(true), + null, + null, + null, + null, + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect3 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("3"), + new Integer(1), + new Boolean(true), + new Byte("1"), + new Short("1"), + new Long(1), + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("true"), + new BinaryStringData("true"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent3)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect3)); + DataChangeEvent insertEvent4 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), + null, + null, + new Byte("1"), + null, + null, + null, + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect4 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("4"), + new Integer(1), + new Boolean(true), + new Byte("1"), + new Short("1"), + new Long(1), + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1"), + new BinaryStringData("1"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent4)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect4)); + DataChangeEvent insertEvent5 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("5"), + null, + null, + null, + new Short("1"), + null, + null, + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect5 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("5"), + new Integer(1), + new Boolean(true), + new Byte("1"), + new Short("1"), + new Long(1), + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1"), + new BinaryStringData("1"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent5)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect5)); + DataChangeEvent insertEvent6 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("6"), + null, + null, + null, + null, + new Long(1), + null, + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect6 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("6"), + new Integer(1), + new Boolean(true), + new Byte("1"), + new Short("1"), + new Long(1), + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1"), + new BinaryStringData("1"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent6)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect6)); + DataChangeEvent insertEvent7 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("7"), + null, + null, + null, + null, + null, + new Float(1.0f), + null, + null, + null, + null, + })); + DataChangeEvent insertEventExpect7 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("7"), + null, + new Boolean(true), + null, + null, + null, + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1.0"), + new BinaryStringData("1.0"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent7)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect7)); + DataChangeEvent insertEvent8 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("8"), + null, + null, + null, + null, + null, + null, + new Double(1.0d), + null, + null, + null, + })); + DataChangeEvent insertEventExpect8 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("8"), + null, + new Boolean(true), + null, + null, + null, + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1.0"), + new BinaryStringData("1.0"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent8)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect8)); + DataChangeEvent insertEvent9 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("9"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + DataChangeEvent insertEventExpect9 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("9"), + null, + new Boolean(true), + null, + null, + null, + new Float(1.0f), + new Double(1.0d), + new BinaryStringData("1.00"), + new BinaryStringData("1.00"), + DecimalData.fromBigDecimal(new BigDecimal(1.0), 4, 2), + })); + transform.processElement(new StreamRecord<>(insertEvent9)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(insertEventExpect9)); + } + + @Test + void testCastErrorTransform() throws Exception { + TransformDataOperator transform = + TransformDataOperator.newBuilder() + .addTransform( + CAST_TABLEID.identifier(), + "col1" + + ",cast(castFloat as int) as castInt" + + ",cast(castFloat as boolean) as castBoolean" + + ",cast(castFloat as tinyint) as castTinyint" + + ",cast(castFloat as smallint) as castSmallint" + + ",cast(castFloat as bigint) as castBigint" + + ",cast(castFloat as float) as castFloat" + + ",cast(castFloat as double) as castDouble" + + ",cast(castFloat as char) as castChar" + + ",cast(castFloat as varchar) as castVarchar" + + ",cast(castFloat as DECIMAL(4,2)) as castDecimal", + "col1 = '1'") + .build(); + EventOperatorTestHarness + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = new CreateTableEvent(CAST_TABLEID, CAST_SCHEMA); + BinaryRecordDataGenerator recordDataGenerator = + new BinaryRecordDataGenerator(((RowType) CAST_SCHEMA.toRowDataType())); + // Insert + DataChangeEvent insertEvent1 = + DataChangeEvent.insertEvent( + CAST_TABLEID, + recordDataGenerator.generate( + new Object[] { + new BinaryStringData("1"), + null, + null, + null, + null, + null, + new Float(1.0f), + null, + null, + null, + null, + })); + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo(new StreamRecord<>(new CreateTableEvent(CAST_TABLEID, CAST_SCHEMA))); + Assertions.assertThatThrownBy( + () -> { + transform.processElement(new StreamRecord<>(insertEvent1)); + }) + .isExactlyInstanceOf(RuntimeException.class) + .hasRootCauseInstanceOf(NumberFormatException.class) + .hasRootCauseMessage("For input string: \"1.0\""); + } + @Test void testBuildInFunctionTransform() throws Exception { testExpressionConditionTransform( @@ -592,6 +1235,26 @@ void testBuildInFunctionTransform() throws Exception { "case 1 when 1 then 'a' when 2 then 'b' else 'c' end = 'a'"); testExpressionConditionTransform("case col1 when '1' then true else false end"); testExpressionConditionTransform("case when col1 = '1' then true else false end"); + testExpressionConditionTransform("cast(col1 as int) = 1"); + testExpressionConditionTransform("cast('true' as boolean)"); + testExpressionConditionTransform("cast(col1 as tinyint) = cast(1 as tinyint)"); + testExpressionConditionTransform("cast(col1 as smallint) = cast(1 as smallint)"); + testExpressionConditionTransform("cast(col1 as bigint) = cast(1 as bigint)"); + testExpressionConditionTransform("cast(col1 as float) = cast(1 as float)"); + testExpressionConditionTransform("cast(col1 as double) = cast(1 as double)"); + testExpressionConditionTransform("cast('1' as char) = '1'"); + testExpressionConditionTransform("cast(col1 as varchar) = '1'"); + testExpressionConditionTransform("cast(col1 as DECIMAL(4,2)) = cast(1.0 as DECIMAL(4,2))"); + testExpressionConditionTransform("cast(null as int) is null"); + testExpressionConditionTransform("cast(null as boolean) is null"); + testExpressionConditionTransform("cast(null as tinyint) is null"); + testExpressionConditionTransform("cast(null as smallint) is null"); + testExpressionConditionTransform("cast(null as bigint) is null"); + testExpressionConditionTransform("cast(null as float) is null"); + testExpressionConditionTransform("cast(null as double) is null"); + testExpressionConditionTransform("cast(null as char) is null"); + testExpressionConditionTransform("cast(null as varchar) is null"); + testExpressionConditionTransform("cast(null as DECIMAL(4,2)) is null"); } private void testExpressionConditionTransform(String expression) throws Exception { diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 9dffeb84a1..f81c4a92c2 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -264,6 +264,34 @@ public void testTranslateFilterToJaninoExpression() { testFilterExpression( "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end", "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); + testFilterExpression( + "case id when 1 then 'a' when 2 then 'b' else 'c' end", + "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); + testFilterExpression( + "case when id = 1 then 'a' when id = 2 then 'b' else 'c' end", + "(valueEquals(id, 1) ? \"a\" : valueEquals(id, 2) ? \"b\" : \"c\")"); + testFilterExpression("cast(id||'0' as int)", "castToInteger(concat(id, \"0\"))"); + testFilterExpression("cast(1 as string)", "castToString(1)"); + testFilterExpression("cast(1 as boolean)", "castToBoolean(1)"); + testFilterExpression("cast(1 as tinyint)", "castToByte(1)"); + testFilterExpression("cast(1 as smallint)", "castToShort(1)"); + testFilterExpression("cast(1 as bigint)", "castToLong(1)"); + testFilterExpression("cast(1 as float)", "castToFloat(1)"); + testFilterExpression("cast(1 as double)", "castToDouble(1)"); + testFilterExpression("cast(1 as decimal)", "castToBigDecimal(1, 10, 0)"); + testFilterExpression("cast(1 as char)", "castToString(1)"); + testFilterExpression("cast(1 as varchar)", "castToString(1)"); + testFilterExpression("cast(null as int)", "castToInteger(null)"); + testFilterExpression("cast(null as string)", "castToString(null)"); + testFilterExpression("cast(null as boolean)", "castToBoolean(null)"); + testFilterExpression("cast(null as tinyint)", "castToByte(null)"); + testFilterExpression("cast(null as smallint)", "castToShort(null)"); + testFilterExpression("cast(null as bigint)", "castToLong(null)"); + testFilterExpression("cast(null as float)", "castToFloat(null)"); + testFilterExpression("cast(null as double)", "castToDouble(null)"); + testFilterExpression("cast(null as decimal)", "castToBigDecimal(null, 10, 0)"); + testFilterExpression("cast(null as char)", "castToString(null)"); + testFilterExpression("cast(null as varchar)", "castToString(null)"); } private void testFilterExpression(String expression, String expressionExpect) {