From 2798dc40aa99536a2270275a406070fcf5b12b00 Mon Sep 17 00:00:00 2001 From: minurajeeve Date: Tue, 14 Nov 2023 18:06:23 +0530 Subject: [PATCH] Add precision as an optional parameter in set-type --- .../io/cdap/directives/column/SetType.java | 74 ++++++++++++++++--- .../datamodel/DataModelMapColumn.java | 2 +- .../io/cdap/wrangler/parser/MigrateToV2.java | 5 +- .../cdap/wrangler/utils/ColumnConverter.java | 65 +++++++++++----- .../cdap/directives/column/SetTypeTest.java | 53 ++++++++++--- .../wrangler/parser/GrammarMigratorTest.java | 2 +- wrangler-docs/directives/set-type.md | 5 +- .../directive/AbstractDirectiveHandler.java | 2 +- 8 files changed, 164 insertions(+), 44 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java index 1ecc3b159..985a6719a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SetType.java @@ -20,12 +20,14 @@ import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.data.schema.Schema.LogicalType; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; import io.cdap.wrangler.api.DirectiveParseException; import io.cdap.wrangler.api.ExecutorContext; import io.cdap.wrangler.api.Optional; +import io.cdap.wrangler.api.Pair; import io.cdap.wrangler.api.Row; import io.cdap.wrangler.api.SchemaResolutionContext; import io.cdap.wrangler.api.annotations.Categories; @@ -40,19 +42,20 @@ import io.cdap.wrangler.utils.ColumnConverter; import java.math.RoundingMode; +import java.util.HashMap; import java.util.List; import java.util.stream.Collectors; /** * A Wrangler step for converting data type of column * Accepted types are: int, short, long, double, float, string, boolean and bytes - * When decimal type is selected, can also specify the scale and rounding mode + * When decimal type is selected, can also specify the scale, precision and rounding mode */ @Plugin(type = "directives") @Name(SetType.NAME) @Categories(categories = {"column"}) -@Description("Converting data type of a column. Optional arguments scale and rounding-mode " + - "are used only when type is decimal.") +@Description("Converting data type of a column. Optional arguments scale, precision and " + + "rounding-mode are used only when type is decimal.") public final class SetType implements Directive, Lineage { public static final String NAME = "set-type"; @@ -60,6 +63,7 @@ public final class SetType implements Directive, Lineage { private String type; private Integer scale; private RoundingMode roundingMode; + private Integer precision; @Override public UsageDefinition define() { @@ -68,6 +72,7 @@ public UsageDefinition define() { builder.define("type", TokenType.IDENTIFIER); builder.define("scale", TokenType.NUMERIC, Optional.TRUE); builder.define("rounding-mode", TokenType.TEXT, Optional.TRUE); + builder.define("precision", TokenType.PROPERTIES, "prop:{precision=}", Optional.TRUE); return builder.build(); } @@ -76,14 +81,19 @@ public void initialize(Arguments args) throws DirectiveParseException { col = ((ColumnName) args.value("column")).value(); type = ((Identifier) args.value("type")).value(); if (type.equalsIgnoreCase("decimal")) { + precision = args.contains("precision") ? (Integer) ((HashMap) args. + value("precision").value()).get("precision").value().intValue() : null; + if (precision != null && precision < 1) { + throw new DirectiveParseException("precision cannot be less than 1"); + } scale = args.contains("scale") ? ((Numeric) args.value("scale")).value().intValue() : null; - if (scale == null && args.contains("rounding-mode")) { - throw new DirectiveParseException("'rounding-mode' can only be specified when a 'scale' is set"); + if (scale == null && precision == null && args.contains("rounding-mode")) { + throw new DirectiveParseException("'rounding-mode' can only be specified when a 'scale' or 'precision' is set"); } try { roundingMode = args.contains("rounding-mode") ? RoundingMode.valueOf(((Text) args.value("rounding-mode")).value()) : - (scale == null ? RoundingMode.UNNECESSARY : RoundingMode.HALF_EVEN); + (scale == null && precision == null ? RoundingMode.UNNECESSARY : RoundingMode.HALF_EVEN); } catch (IllegalArgumentException e) { throw new DirectiveParseException(String.format( "Specified rounding-mode '%s' is not a valid Java rounding mode", args.value("rounding-mode").value()), e); @@ -99,7 +109,7 @@ public void destroy() { @Override public List execute(List rows, ExecutorContext context) throws DirectiveExecutionException { for (Row row : rows) { - ColumnConverter.convertType(NAME, row, col, type, scale, roundingMode); + ColumnConverter.convertType(NAME, row, col, type, scale, precision, roundingMode); } return rows; } @@ -121,8 +131,41 @@ public Schema getOutputSchema(SchemaResolutionContext context) { .map( field -> { try { - return field.getName().equals(col) ? - Schema.Field.of(col, ColumnConverter.getSchemaForType(type, scale)) : field; + if (field.getName().equals(col)) { + Integer outputScale = scale; + Integer outputPrecision = precision; + Schema fieldSchema = field.getSchema().getNonNullable(); + Pair scaleAndPrecision = getPrecisionAndScale(fieldSchema); + Integer inputSchemaScale = scaleAndPrecision.getSecond(); + Integer inputSchemaPrecision = scaleAndPrecision.getFirst(); + + if (scale == null && precision == null) { + outputScale = inputSchemaScale; + outputPrecision = inputSchemaPrecision; + } else if (scale == null && inputSchemaScale != null) { + if (precision - inputSchemaScale < 1) { + throw new DirectiveParseException(String.format( + "Cannot set scale as '%s' and precision as '%s' when " + + "given precision - scale is less than 1 ", inputSchemaScale, + precision)); + } + outputScale = inputSchemaScale; + outputPrecision = precision; + + } else if (precision == null && inputSchemaPrecision != null) { + if (inputSchemaPrecision - scale < 1) { + throw new DirectiveParseException(String.format( + "Cannot set scale as '%s' and precision as '%s' when " + + "given precision - scale is less than 1 ", scale, + inputSchemaPrecision)); + } + outputScale = scale; + outputPrecision = inputSchemaPrecision; + } + return Schema.Field.of(col, ColumnConverter.getSchemaForType(type, + outputScale, outputPrecision)); + } + return field; } catch (DirectiveParseException e) { throw new RuntimeException(e); } @@ -131,4 +174,17 @@ public Schema getOutputSchema(SchemaResolutionContext context) { .collect(Collectors.toList()) ); } + + /** + * extracts precision and scale from schema string + */ + public static Pair getPrecisionAndScale(Schema fieldSchema) { + Integer precision = null; + Integer scale = null; + if (fieldSchema.getLogicalType() == LogicalType.DECIMAL) { + precision = fieldSchema.getPrecision(); + scale = fieldSchema.getScale(); + } + return new Pair(precision, scale); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/datamodel/DataModelMapColumn.java b/wrangler-core/src/main/java/io/cdap/directives/datamodel/DataModelMapColumn.java index f68f09c2b..84b11b25d 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/datamodel/DataModelMapColumn.java +++ b/wrangler-core/src/main/java/io/cdap/directives/datamodel/DataModelMapColumn.java @@ -152,7 +152,7 @@ public void initialize(Arguments args) throws DirectiveParseException { @Override public List execute(List rows, ExecutorContext context) throws DirectiveExecutionException { for (Row row : rows) { - ColumnConverter.convertType(NAME, row, column, targetFieldTypeName, null, RoundingMode.UNNECESSARY); + ColumnConverter.convertType(NAME, row, column, targetFieldTypeName, null, null, RoundingMode.UNNECESSARY); ColumnConverter.rename(NAME, row, column, targetFieldName); } return rows; diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/parser/MigrateToV2.java b/wrangler-core/src/main/java/io/cdap/wrangler/parser/MigrateToV2.java index 1fcf030c5..6ef865232 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/parser/MigrateToV2.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/parser/MigrateToV2.java @@ -126,13 +126,14 @@ public String migrate() throws DirectiveParseException { } break; - //set-type [ ] + //set-type [ prop:{precision=}] case "set-type": { String col = getNextToken(tokenizer, command, "col", lineno); String type = getNextToken(tokenizer, command, "type", lineno); String scale = getNextToken(tokenizer, null, command, "scale", lineno, true); String roundingMode = getNextToken(tokenizer, null, command, "rounding-mode", lineno, true); - transformed.add(String.format("set-type %s %s %s %s;", col(col), type, scale, roundingMode)); + String precision = getNextToken(tokenizer, null, command, "precision", lineno, true); + transformed.add(String.format("set-type %s %s %s %s %s;", col(col), type, scale, roundingMode, precision)); } break; diff --git a/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java b/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java index 29d64a596..24db532e7 100644 --- a/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java +++ b/wrangler-core/src/main/java/io/cdap/wrangler/utils/ColumnConverter.java @@ -17,11 +17,13 @@ import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.data.schema.Schema.LogicalType; import io.cdap.wrangler.api.DirectiveExecutionException; import io.cdap.wrangler.api.DirectiveParseException; import io.cdap.wrangler.api.Row; import java.math.BigDecimal; +import java.math.MathContext; import java.math.RoundingMode; import java.util.Collections; import java.util.HashMap; @@ -45,7 +47,7 @@ private ColumnConverter() { * @throws DirectiveExecutionException when a column matching the target name already exists */ public static void rename(String directiveName, Row row, String column, String toName) - throws DirectiveExecutionException { + throws DirectiveExecutionException { int idx = row.find(column); int existingColumn = row.find(toName); if (idx == -1) { @@ -57,9 +59,9 @@ public static void rename(String directiveName, Row row, String column, String t row.setColumn(idx, toName); } else { throw new DirectiveExecutionException( - directiveName, String.format("Column '%s' already exists. Apply the 'drop %s' directive before " + - "renaming '%s' to '%s'.", - toName, toName, column, toName)); + directiveName, String.format("Column '%s' already exists. Apply the 'drop %s' directive before " + + "renaming '%s' to '%s'.", + toName, toName, column, toName)); } } @@ -73,8 +75,8 @@ public static void rename(String directiveName, Row row, String column, String t * @throws DirectiveExecutionException when an unsupported type is specified or the column can not be converted. */ public static void convertType(String directiveName, Row row, String column, String toType, - Integer scale, RoundingMode roundingMode) - throws DirectiveExecutionException { + Integer scale, Integer precision, RoundingMode roundingMode) + throws DirectiveExecutionException { int idx = row.find(column); if (idx != -1) { Object object = row.getValue(idx); @@ -84,7 +86,8 @@ public static void convertType(String directiveName, Row row, String column, Str try { Object converted = ColumnConverter.convertType(column, toType, object); if (toType.equalsIgnoreCase(ColumnTypeNames.DECIMAL)) { - row.setValue(idx, setDecimalScale((BigDecimal) converted, scale, roundingMode)); + row.setValue(idx, setDecimalScaleAndPrecision((BigDecimal) converted, scale, + precision, roundingMode)); } else { row.setValue(idx, converted); } @@ -92,13 +95,13 @@ public static void convertType(String directiveName, Row row, String column, Str throw e; } catch (Exception e) { throw new DirectiveExecutionException( - directiveName, String.format("Column '%s' cannot be converted to a '%s'.", column, toType), e); + directiveName, String.format("Column '%s' cannot be converted to a '%s'.", column, toType), e); } } } private static Object convertType(String col, String toType, Object object) - throws Exception { + throws Exception { toType = toType.toUpperCase(); switch (toType) { case ColumnTypeNames.INTEGER: @@ -291,38 +294,62 @@ private static Object convertType(String col, String toType, Object object) default: throw new DirectiveExecutionException(String.format( - "Column '%s' is of unsupported type '%s'. Supported types are: " + - "int, short, long, double, decimal, boolean, string, bytes", col, toType)); + "Column '%s' is of unsupported type '%s'. Supported types are: " + + "int, short, long, double, decimal, boolean, string, bytes", col, toType)); } throw new DirectiveExecutionException( String.format("Column '%s' has value of type '%s' and cannot be converted to a '%s'.", col, object.getClass().getSimpleName(), toType)); } - private static BigDecimal setDecimalScale(BigDecimal decimal, Integer scale, RoundingMode roundingMode) - throws DirectiveExecutionException { - if (scale == null) { + private static BigDecimal setDecimalScaleAndPrecision(BigDecimal decimal, Integer scale, + Integer precision, RoundingMode roundingMode) + throws DirectiveExecutionException { + if (scale == null && precision == null) { return decimal; } try { - return decimal.setScale(scale, roundingMode); + if (precision == null) { + return decimal.setScale(scale, roundingMode); + } else if (scale == null) { + return decimal.round(new MathContext(precision, roundingMode)); + } else { + BigDecimal result; + if (validateScaleAndPrecision(scale, precision, decimal)) { + result = decimal.setScale(scale, roundingMode); + result = result.round(new MathContext(precision, roundingMode)); + } else { + throw new DirectiveExecutionException(String.format( + "Cannot set scale as '%s' and precision as '%s' for value '%s' when" + + "given precision - scale is less than number of digits" + + " before decimal point ", scale, precision, decimal)); + } + return result; + } } catch (ArithmeticException e) { throw new DirectiveExecutionException(String.format( - "Cannot set scale as '%s' for value '%s' when rounding-mode is '%s'", scale, decimal, roundingMode), e); + "Cannot set scale as '%s' and precision '%s' for value '%s' when rounding-mode " + + "is '%s'", scale, precision, decimal, roundingMode), e); } } - public static Schema getSchemaForType(String type, Integer scale) throws DirectiveParseException { + private static Boolean validateScaleAndPrecision(Integer scale, Integer precision, BigDecimal decimal) { + int digitsBeforeDecimalPoint = decimal.signum() == 0 ? 1 : decimal.precision() - decimal.scale(); + return precision - scale >= digitsBeforeDecimalPoint; + } + + public static Schema getSchemaForType(String type, Integer scale, Integer precision) throws DirectiveParseException { Schema typeSchema; type = type.toUpperCase(); if (type.equals(ColumnTypeNames.DECIMAL)) { // TODO make set-type support setting decimal precision + precision = precision != null ? precision : 77; scale = scale != null ? scale : 38; - typeSchema = Schema.nullableOf(Schema.decimalOf(77, scale)); + typeSchema = Schema.nullableOf(Schema.decimalOf(precision, scale)); } else { if (!SCHEMA_TYPE_MAP.containsKey(type)) { throw new DirectiveParseException(String.format("'%s' is an unsupported type. " + - "Supported types are: int, short, long, double, decimal, boolean, string, bytes", type)); + "Supported types are: int, short, long, double, decimal, boolean, string, bytes", type)); } typeSchema = Schema.nullableOf(Schema.of(SCHEMA_TYPE_MAP.get(type))); } diff --git a/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java b/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java index b5f83c9e3..92b7f8f9f 100644 --- a/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java +++ b/wrangler-core/src/test/java/io/cdap/directives/column/SetTypeTest.java @@ -213,13 +213,48 @@ public void testToDecimalInvalidRoundingMode() throws Exception { TestingRig.execute(directives, rows); } + @Test + public void testToDecimalWithScalePrecisionAndRoundingMode() throws Exception { + List rows = Collections.singletonList(new Row("scale_1_precision_4", "122.5") + .add("scale_3_precision_6", "456.789")); + String[] directives = new String[] {"set-type :scale_1_precision_4 decimal 0 'FLOOR' prop:{precision=3}", + "set-type :scale_3_precision_6 decimal 0 prop:{precision=5}"}; + List results = TestingRig.execute(directives, rows); + Row row = results.get(0); + + Assert.assertTrue(row.getValue(0) instanceof BigDecimal); + Assert.assertEquals(row.getValue(0), new BigDecimal("122")); + + Assert.assertTrue(row.getValue(1) instanceof BigDecimal); + Assert.assertEquals(row.getValue(1), new BigDecimal("457")); + } + + @Test + public void testToDecimalWithPrecision() throws Exception { + List rows = Collections.singletonList(new Row("scale_1_precision_4", "122.5")); + String[] directives = new String[] {"set-type :scale_1_precision_4 decimal 'FLOOR' prop:{precision=3}"}; + List results = TestingRig.execute(directives, rows); + Row row = results.get(0); + + Assert.assertTrue(row.getValue(0) instanceof BigDecimal); + Assert.assertEquals(row.getValue(0), new BigDecimal("122")); + + } + + @Test(expected = RecipeException.class) + public void testToDecimalWithInvalidPrecision() throws Exception { + List rows = Collections.singletonList(new Row("scale_1_precision_4", "122.5")); + String[] directives = new String[] {"set-type :scale_1_precision_4 decimal 0 'FLOOR' prop:{precision=-1}"}; + TestingRig.execute(directives, rows); + } + @Test public void testToDecimalScaleIsNull() throws Exception { List rows = Collections.singletonList(new Row("scale_2", "125.45")); String[] directives = new String[] {"set-type scale_2 decimal"}; Schema inputSchema = Schema.recordOf( "inputSchema", - Schema.Field.of("scale_2", Schema.of(Schema.Type.DOUBLE)) + Schema.Field.of("scale_2", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))) ); Schema expectedSchema = Schema.recordOf( @@ -377,14 +412,14 @@ public void testGetOutputSchemaForTypeChangedColumn() throws Exception { .add("D", "random").add("E", 123).add("F", "true").add("G", 12L) ); Schema inputSchema = Schema.recordOf( - "inputSchema", - Schema.Field.of("A", Schema.of(Schema.Type.STRING)), - Schema.Field.of("B", Schema.of(Schema.Type.STRING)), - Schema.Field.of("C", Schema.of(Schema.Type.STRING)), - Schema.Field.of("D", Schema.of(Schema.Type.STRING)), - Schema.Field.of("E", Schema.of(Schema.Type.INT)), - Schema.Field.of("F", Schema.of(Schema.Type.STRING)), - Schema.Field.of("G", Schema.of(Schema.Type.LONG)) + "inputSchema", + Schema.Field.of("A", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("B", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("C", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("D", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("E", Schema.nullableOf(Schema.of(Schema.Type.INT))), + Schema.Field.of("F", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("G", Schema.nullableOf(Schema.of(Schema.Type.LONG))) ); Schema expectedSchema = Schema.recordOf( "expectedSchema", diff --git a/wrangler-core/src/test/java/io/cdap/wrangler/parser/GrammarMigratorTest.java b/wrangler-core/src/test/java/io/cdap/wrangler/parser/GrammarMigratorTest.java index 5357c392f..db724c663 100644 --- a/wrangler-core/src/test/java/io/cdap/wrangler/parser/GrammarMigratorTest.java +++ b/wrangler-core/src/test/java/io/cdap/wrangler/parser/GrammarMigratorTest.java @@ -110,7 +110,7 @@ public class GrammarMigratorTest { "set-column :salary exp:{salary > 100 ? 100 : salary};", "set-headers :fname,:lname,:address,:city,:state,:country,:zipcode;", "rename :body_fname :fname;", - "set-type :value int null null;", + "set-type :value int null null null;", "drop :fname,:lname,:address,:zipcode,:city;", "merge :fname :lname :name ',';", "uppercase :name;", diff --git a/wrangler-docs/directives/set-type.md b/wrangler-docs/directives/set-type.md index 644539d6a..055b2a68d 100644 --- a/wrangler-docs/directives/set-type.md +++ b/wrangler-docs/directives/set-type.md @@ -4,11 +4,12 @@ Convert data type of a column ## Syntax ``` -set-type [ ] +set-type [ prop:{precision=}] ``` The `` is converted to the type in ``. Acceptable types are: int, short, long, float, double, decimal, string, bytes, boolean. When `decimal` type is specified, two optional arguments can be given: - ``: set the scale of the decimal value. - ``: Java [rounding-mode](https://docs.oracle.com/javase/7/docs/api/java/math/RoundingMode.html) -to use when decimal value's scale is not equal to the set scale. By default, `HALF_EVEN` is assumed. \ No newline at end of file +to use when decimal value's scale is not equal to the set scale. By default, `HALF_EVEN` is assumed. +- ``: set the precision of the decimal value. \ No newline at end of file diff --git a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java index 92231dd7d..d7eae7960 100644 --- a/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java +++ b/wrangler-service/src/main/java/io/cdap/wrangler/service/directive/AbstractDirectiveHandler.java @@ -109,7 +109,7 @@ public void destroy() { try { composite.close(); } catch (IOException e) { - // If something bad happens here, you might see a a lot of open file handles. + // If something bad happens here, you might see a lot of open file handles. LOG.warn("Unable to close the directive registry. You might see increasing number of open file handle.", e); } }