Skip to content

Commit

Permalink
[FLINK-34877][cdc] Support type cast conversion in pipeline transform
Browse files Browse the repository at this point in the history
This closes #3357.
  • Loading branch information
aiwenmo authored Jul 31, 2024
1 parent a39959f commit 1388cf9
Show file tree
Hide file tree
Showing 5 changed files with 841 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -510,4 +511,86 @@ public static Object coalesce(Object... objects) {
}
return null;
}

public static String castToString(Object object) {
if (object == null) {
return null;
}
return object.toString();
}

public static Byte castToByte(Object object) {
if (object == null) {
return null;
}
return Byte.valueOf(castObjectIntoString(object));
}

public static Boolean castToBoolean(Object object) {
if (object == null) {
return null;
}
if (object instanceof Byte
|| object instanceof Short
|| object instanceof Integer
|| object instanceof Long
|| object instanceof Float
|| object instanceof Double
|| object instanceof BigDecimal) {
return !object.equals(0);
}
return Boolean.valueOf(castToString(object));
}

public static Short castToShort(Object object) {
if (object == null) {
return null;
}
return Short.valueOf(castObjectIntoString(object));
}

public static Integer castToInteger(Object object) {
if (object == null) {
return null;
}
return Integer.valueOf(castObjectIntoString(object));
}

public static Long castToLong(Object object) {
if (object == null) {
return null;
}
return Long.valueOf(castObjectIntoString(object));
}

public static Float castToFloat(Object object) {
if (object == null) {
return null;
}
return Float.valueOf(castObjectIntoString(object));
}

public static Double castToDouble(Object object) {
if (object == null) {
return null;
}
return Double.valueOf(castObjectIntoString(object));
}

public static BigDecimal castToBigDecimal(Object object, int precision, int scale) {
if (object == null) {
return null;
}
BigDecimal bigDecimal =
new BigDecimal(castObjectIntoString(object), new MathContext(precision));
bigDecimal = bigDecimal.setScale(scale, BigDecimal.ROUND_HALF_UP);
return bigDecimal;
}

private static String castObjectIntoString(Object object) {
if (object instanceof Boolean) {
return Boolean.valueOf(castToString(object)) ? "1" : "0";
}
return String.valueOf(object);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.cdc.common.utils.StringUtils;

import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -228,6 +230,8 @@ private static Java.Rvalue sqlBasicCallToJaninoRvalue(
case LESS_THAN_OR_EQUAL:
case GREATER_THAN_OR_EQUAL:
return generateBinaryOperation(sqlBasicCall, atoms, sqlBasicCall.getKind().sql);
case CAST:
return generateCastOperation(sqlBasicCall, atoms);
case OTHER:
return generateOtherOperation(sqlBasicCall, atoms);
default:
Expand Down Expand Up @@ -256,6 +260,16 @@ private static Java.Rvalue generateEqualsOperation(
Location.NOWHERE, null, StringUtils.convertToCamelCase("VALUE_EQUALS"), atoms);
}

private static Java.Rvalue generateCastOperation(
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (atoms.length != 1) {
throw new ParseException("Unrecognized expression: " + sqlBasicCall.toString());
}
List<SqlNode> operandList = sqlBasicCall.getOperandList();
SqlDataTypeSpec sqlDataTypeSpec = (SqlDataTypeSpec) operandList.get(1);
return generateTypeConvertMethod(sqlDataTypeSpec, atoms);
}

private static Java.Rvalue generateOtherOperation(
SqlBasicCall sqlBasicCall, Java.Rvalue[] atoms) {
if (sqlBasicCall.getOperator().getName().equals("||")) {
Expand Down Expand Up @@ -298,4 +312,56 @@ private static Java.Rvalue generateNoOperandTimestampFunctionOperation(String op
StringUtils.convertToCamelCase(operationName),
timestampFunctionParam.toArray(new Java.Rvalue[0]));
}

private static Java.Rvalue generateTypeConvertMethod(
SqlDataTypeSpec sqlDataTypeSpec, Java.Rvalue[] atoms) {
switch (sqlDataTypeSpec.getTypeName().getSimple().toUpperCase()) {
case "BOOLEAN":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToBoolean", atoms);
case "TINYINT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToByte", atoms);
case "SMALLINT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToShort", atoms);
case "INTEGER":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToInteger", atoms);
case "BIGINT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToLong", atoms);
case "FLOAT":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToFloat", atoms);
case "DOUBLE":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToDouble", atoms);
case "DECIMAL":
int precision = 10;
int scale = 0;
if (sqlDataTypeSpec.getTypeNameSpec() instanceof SqlBasicTypeNameSpec) {
SqlBasicTypeNameSpec typeNameSpec =
(SqlBasicTypeNameSpec) sqlDataTypeSpec.getTypeNameSpec();
if (typeNameSpec.getPrecision() > -1) {
precision = typeNameSpec.getPrecision();
}
if (typeNameSpec.getScale() > -1) {
scale = typeNameSpec.getScale();
}
}
List<Java.Rvalue> newAtoms = new ArrayList<>(Arrays.asList(atoms));
newAtoms.add(
new Java.AmbiguousName(
Location.NOWHERE, new String[] {String.valueOf(precision)}));
newAtoms.add(
new Java.AmbiguousName(
Location.NOWHERE, new String[] {String.valueOf(scale)}));
return new Java.MethodInvocation(
Location.NOWHERE,
null,
"castToBigDecimal",
newAtoms.toArray(new Java.Rvalue[0]));
case "CHAR":
case "VARCHAR":
case "STRING":
return new Java.MethodInvocation(Location.NOWHERE, null, "castToString", atoms);
default:
throw new ParseException(
"Unsupported data type cast: " + sqlDataTypeSpec.toString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void writeBinary(int pos, byte[] bytes) {

@Override
public void writeDecimal(int pos, DecimalData value, int precision) {
assert value == null || (value.precision() == precision);
assert value == null || (value.precision() <= precision);

if (DecimalData.isCompact(precision)) {
assert value != null;
Expand Down
Loading

0 comments on commit 1388cf9

Please sign in to comment.