From 4c601ff27e11a9eade0b742815ba912aba5fef25 Mon Sep 17 00:00:00 2001 From: mchades Date: Thu, 12 Oct 2023 11:57:47 +0800 Subject: [PATCH] [#488] fix(api, Iceberg): Add truncate and bucket transform (#495) ### What changes were proposed in this pull request? - Add convenient usage for `bucket` and `truncate` partitioning - fix Iceberg catalog partitioning convert bug ### Why are the changes needed? `bucket` and `truncate` partitioning are common to Iceberg Fix: #488 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? UTs modified --- .../graviton/rel/transforms/Transforms.java | 30 +++ .../converter/FromIcebergPartitionSpec.java | 11 +- .../converter/ToIcebergPartitionSpec.java | 26 +-- .../iceberg/converter/ToIcebergSortOrder.java | 8 +- .../lakehouse/iceberg/TestIcebergTable.java | 9 +- .../converter/TestToIcebergPartitionSpec.java | 12 +- .../dto/rel/ExpressionPartitionDTO.java | 37 +++- .../graviton/dto/rel/ListPartitionDTO.java | 2 +- .../graviton/dto/rel/PartitionUtils.java | 202 +++++++++--------- .../graviton/dto/rel/RangePartitionDTO.java | 4 +- .../graviton/dto/rel/SimplePartitionDTO.java | 43 +++- .../graviton/json/TestDTOJsonSerDe.java | 52 +++-- 12 files changed, 269 insertions(+), 167 deletions(-) diff --git a/api/src/main/java/com/datastrato/graviton/rel/transforms/Transforms.java b/api/src/main/java/com/datastrato/graviton/rel/transforms/Transforms.java index 3a45d5138a..5b23e2fd17 100644 --- a/api/src/main/java/com/datastrato/graviton/rel/transforms/Transforms.java +++ b/api/src/main/java/com/datastrato/graviton/rel/transforms/Transforms.java @@ -4,6 +4,8 @@ */ package com.datastrato.graviton.rel.transforms; +import static io.substrait.expression.ExpressionCreator.i32; + import com.datastrato.graviton.rel.Column; import com.google.common.annotations.VisibleForTesting; import io.substrait.expression.Expression; @@ -17,6 +19,8 @@ public class Transforms { public static final String NAME_OF_MONTH = "month"; public static final String NAME_OF_DAY = "day"; public static final String NAME_OF_HOUR = "hour"; + public static final String NAME_OF_TRUNCATE = "truncate"; + public static final String NAME_OF_BUCKET = "bucket"; public static final String NAME_OF_LIST = "list"; public static final String NAME_OF_RANGE = "range"; @@ -80,6 +84,32 @@ public static FunctionTransform hour(String[] fieldName) { return function(NAME_OF_HOUR, new Transform[] {field(fieldName)}); } + /** + * Creates a bucket partitioning by the given field name and number of buckets. + * + * @param fieldName The field name to partition by. + * @param numBuckets The number of buckets. + * @return The bucket partitioning. + */ + public static FunctionTransform bucket(String[] fieldName, int numBuckets) { + Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, numBuckets)); + Transforms.NamedReference field = Transforms.field(fieldName); + return function(NAME_OF_BUCKET, new Transform[] {field, bucketNum}); + } + + /** + * Creates a truncate partitioning by the given field name and width. + * + * @param fieldName The field name to partition by. + * @param width The width. + * @return The truncate partitioning. + */ + public static FunctionTransform truncate(String[] fieldName, int width) { + Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, width)); + Transforms.NamedReference field = Transforms.field(fieldName); + return function(NAME_OF_TRUNCATE, new Transform[] {field, bucketNum}); + } + /** * Creates a list partitioning by the given field names. For dynamically partitioned tables only. * diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/FromIcebergPartitionSpec.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/FromIcebergPartitionSpec.java index 0deffba1cc..2c38503868 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/FromIcebergPartitionSpec.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/FromIcebergPartitionSpec.java @@ -4,8 +4,6 @@ */ package com.datastrato.graviton.catalog.lakehouse.iceberg.converter; -import static io.substrait.expression.ExpressionCreator.i32; - import com.datastrato.graviton.rel.transforms.Transform; import com.datastrato.graviton.rel.transforms.Transforms; import com.google.common.annotations.VisibleForTesting; @@ -38,17 +36,12 @@ public Transform identity(String sourceName, int sourceId) { @Override public Transform bucket(String sourceName, int sourceId, int numBuckets) { - Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, numBuckets)); - Transforms.NamedReference field = Transforms.field(new String[] {idToName.get(sourceId)}); - // bucket(fieldName, bucketNum) - return Transforms.function("bucket", new Transform[] {field, bucketNum}); + return Transforms.bucket(new String[] {idToName.get(sourceId)}, numBuckets); } @Override public Transform truncate(String sourceName, int sourceId, int width) { - Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, width)); - Transforms.NamedReference field = Transforms.field(new String[] {idToName.get(sourceId)}); - return Transforms.function("truncate", new Transform[] {field, bucketNum}); + return Transforms.truncate(new String[] {idToName.get(sourceId)}, width); } @Override diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergPartitionSpec.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergPartitionSpec.java index 0ed71cdb1a..008453b3f6 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergPartitionSpec.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergPartitionSpec.java @@ -4,15 +4,16 @@ */ package com.datastrato.graviton.catalog.lakehouse.iceberg.converter; +import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_BUCKET; +import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_TRUNCATE; + import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergTable; import com.datastrato.graviton.rel.transforms.Transform; import com.datastrato.graviton.rel.transforms.Transforms; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.substrait.expression.ImmutableExpression; -import java.util.Arrays; import java.util.Locale; -import java.util.stream.Collectors; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -54,20 +55,10 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition String colName = String.join(DOT, fieldName); builder.identity(colName); } else if (transform instanceof Transforms.FunctionTrans) { - Preconditions.checkArgument( - transform.arguments().length == 1, - "Iceberg partition does not support nested field", - transform); String colName = - Arrays.stream(transform.arguments()) - .map(t -> ((Transforms.NamedReference) t).value()[0]) - .collect(Collectors.joining(DOT)); + String.join(DOT, ((Transforms.NamedReference) transform.arguments()[0]).value()); switch (transform.name().toLowerCase(Locale.ROOT)) { - // TODO minghuang add support for other transforms. - case "identity": - builder.identity(colName); - break; - case "bucket": + case NAME_OF_BUCKET: builder.bucket(colName, findWidth(transform)); break; case Transforms.NAME_OF_YEAR: @@ -82,7 +73,7 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition case Transforms.NAME_OF_HOUR: builder.hour(colName); break; - case "truncate": + case NAME_OF_TRUNCATE: builder.truncate(colName, findWidth(transform)); break; default: @@ -97,9 +88,8 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition } private static int findWidth(Transform transform) { - Preconditions.checkArgument( - transform.arguments().length == 1, "Transform with multiple arguments is not supported"); - Transform expr = transform.arguments()[0]; + // transform here format is: truncate(fieldName, width) or bucket(fieldName, numBuckets) + Transform expr = transform.arguments()[1]; if (expr instanceof Transforms.LiteralReference) { Transforms.LiteralReference literalReference = (Transforms.LiteralReference) expr; if (literalReference.value() instanceof ImmutableExpression.I8Literal) { diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergSortOrder.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergSortOrder.java index eb5e6fda66..16c676f0d9 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergSortOrder.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/ToIcebergSortOrder.java @@ -4,6 +4,9 @@ */ package com.datastrato.graviton.catalog.lakehouse.iceberg.converter; +import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_BUCKET; +import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_TRUNCATE; + import com.datastrato.graviton.rel.SortOrder; import com.datastrato.graviton.rel.transforms.Transform; import com.datastrato.graviton.rel.transforms.Transforms; @@ -54,15 +57,14 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[ .collect(Collectors.joining(DOT)); UnboundTerm expression; switch (transform.name().toLowerCase(Locale.ROOT)) { - // TODO minghuang - add more functions implementation. - case "bucket": + case NAME_OF_BUCKET: int numBuckets = ((Expression.I32Literal) ((Transforms.LiteralReference) transform.arguments()[0]).value()) .value(); expression = Expressions.bucket(colName, numBuckets); break; - case "truncate": + case NAME_OF_TRUNCATE: int width = ((Expression.I32Literal) ((Transforms.LiteralReference) transform.arguments()[0]).value()) diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/TestIcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/TestIcebergTable.java index 603503984b..a6dadbebbf 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/TestIcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/TestIcebergTable.java @@ -4,8 +4,10 @@ */ package com.datastrato.graviton.catalog.lakehouse.iceberg; +import static com.datastrato.graviton.rel.transforms.Transforms.bucket; import static com.datastrato.graviton.rel.transforms.Transforms.day; import static com.datastrato.graviton.rel.transforms.Transforms.identity; +import static com.datastrato.graviton.rel.transforms.Transforms.truncate; import com.datastrato.graviton.NameIdentifier; import com.datastrato.graviton.Namespace; @@ -218,7 +220,12 @@ public void testCreatePartitionedIcebergTable() { .build(); Column[] columns = new Column[] {col1, col2}; - Transform[] partitions = new Transform[] {day(new String[] {col2.name()})}; + Transform[] partitions = + new Transform[] { + day(new String[] {col2.name()}), + bucket(new String[] {col1.name()}, 10), + truncate(new String[] {col1.name()}, 2) + }; Table table = icebergCatalog diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/TestToIcebergPartitionSpec.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/TestToIcebergPartitionSpec.java index 777c73d0f1..cb7daac5f0 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/TestToIcebergPartitionSpec.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/graviton/catalog/lakehouse/iceberg/converter/TestToIcebergPartitionSpec.java @@ -4,10 +4,12 @@ */ package com.datastrato.graviton.catalog.lakehouse.iceberg.converter; +import static com.datastrato.graviton.rel.transforms.Transforms.bucket; import static com.datastrato.graviton.rel.transforms.Transforms.day; import static com.datastrato.graviton.rel.transforms.Transforms.hour; import static com.datastrato.graviton.rel.transforms.Transforms.identity; import static com.datastrato.graviton.rel.transforms.Transforms.month; +import static com.datastrato.graviton.rel.transforms.Transforms.truncate; import com.datastrato.graviton.rel.transforms.Transform; import java.util.Arrays; @@ -35,13 +37,19 @@ void testToPartitionSpec() { ArrayUtils.add(nestedFields, createNestedField(6, "col_6", Types.TimestampType.withZone())); nestedFields = ArrayUtils.add(nestedFields, createNestedField(7, "col_7", Types.DateType.get())); + nestedFields = + ArrayUtils.add(nestedFields, createNestedField(8, "col_8", Types.LongType.get())); + nestedFields = + ArrayUtils.add(nestedFields, createNestedField(9, "col_9", Types.StringType.get())); Transform[] partitioning = new Transform[] { identity(new String[] {nestedFields[0].name()}), day(new String[] {nestedFields[4].name()}), hour(new String[] {nestedFields[5].name()}), - month(new String[] {nestedFields[6].name()}) + month(new String[] {nestedFields[6].name()}), + bucket(new String[] {nestedFields[4].name()}, 10), + truncate(new String[] {nestedFields[7].name()}, 20), }; Schema schema = new Schema(nestedFields); @@ -50,7 +58,7 @@ void testToPartitionSpec() { List fields = partitionSpec.fields(); Assertions.assertEquals(partitioning.length, fields.size()); - Assertions.assertEquals(4, fields.size()); + Assertions.assertEquals(6, fields.size()); Map idToName = schema.idToName(); Map nestedFieldByName = Arrays.stream(nestedFields).collect(Collectors.toMap(Types.NestedField::name, v -> v)); diff --git a/common/src/main/java/com/datastrato/graviton/dto/rel/ExpressionPartitionDTO.java b/common/src/main/java/com/datastrato/graviton/dto/rel/ExpressionPartitionDTO.java index 275d1f8393..acd356303b 100644 --- a/common/src/main/java/com/datastrato/graviton/dto/rel/ExpressionPartitionDTO.java +++ b/common/src/main/java/com/datastrato/graviton/dto/rel/ExpressionPartitionDTO.java @@ -7,6 +7,8 @@ import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.ExpressionType.FIELD; import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.ExpressionType.FUNCTION; import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.ExpressionType.LITERAL; +import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_BUCKET; +import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_TRUNCATE; import com.datastrato.graviton.json.JsonUtils; import com.fasterxml.jackson.annotation.JsonCreator; @@ -17,6 +19,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.common.base.Preconditions; import io.substrait.type.Type; +import io.substrait.type.TypeCreator; import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.logging.log4j.util.Strings; @@ -46,7 +49,7 @@ private void validateExpression(ColumnDTO[] columns, Expression expression) { switch (expression.expressionType()) { case FIELD: FieldExpression fieldExpression = (FieldExpression) expression; - PartitionUtils.validateFieldExist(columns, fieldExpression.fieldName); + PartitionUtils.validateFieldExistence(columns, fieldExpression.fieldName); break; case FUNCTION: validateExpression(columns, expression); @@ -74,6 +77,38 @@ public ExpressionPartitionDTO build() { } } + public static ExpressionPartitionDTO bucket(String[] fieldName, int numBuckets) { + return new ExpressionPartitionDTO.Builder( + new FunctionExpression.Builder() + .withFuncName(NAME_OF_BUCKET) + .withArgs( + new Expression[] { + new FieldExpression.Builder().withFieldName(fieldName).build(), + new LiteralExpression.Builder() + .withType(TypeCreator.REQUIRED.I32) + .withValue(String.valueOf(numBuckets)) + .build() + }) + .build()) + .build(); + } + + public static ExpressionPartitionDTO truncate(String[] fieldName, int width) { + return new ExpressionPartitionDTO.Builder( + new FunctionExpression.Builder() + .withFuncName(NAME_OF_TRUNCATE) + .withArgs( + new Expression[] { + new FieldExpression.Builder().withFieldName(fieldName).build(), + new LiteralExpression.Builder() + .withType(TypeCreator.REQUIRED.I32) + .withValue(String.valueOf(width)) + .build() + }) + .build()) + .build(); + } + enum ExpressionType { @JsonProperty("field") FIELD, diff --git a/common/src/main/java/com/datastrato/graviton/dto/rel/ListPartitionDTO.java b/common/src/main/java/com/datastrato/graviton/dto/rel/ListPartitionDTO.java index 52ab3d0aa9..a44b1e9cbc 100644 --- a/common/src/main/java/com/datastrato/graviton/dto/rel/ListPartitionDTO.java +++ b/common/src/main/java/com/datastrato/graviton/dto/rel/ListPartitionDTO.java @@ -53,7 +53,7 @@ public Strategy strategy() { @Override public void validate(ColumnDTO[] columns) throws IllegalArgumentException { for (String[] fieldName : fieldNames) { - PartitionUtils.validateFieldExist(columns, fieldName); + PartitionUtils.validateFieldExistence(columns, fieldName); } } diff --git a/common/src/main/java/com/datastrato/graviton/dto/rel/PartitionUtils.java b/common/src/main/java/com/datastrato/graviton/dto/rel/PartitionUtils.java index f354c14a83..a60d45a5a1 100644 --- a/common/src/main/java/com/datastrato/graviton/dto/rel/PartitionUtils.java +++ b/common/src/main/java/com/datastrato/graviton/dto/rel/PartitionUtils.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.lang3.ArrayUtils; public class PartitionUtils { public static Transform[] toTransforms(Partition[] partitions) { @@ -65,30 +66,6 @@ public static Partition[] toPartitions(Transform[] transforms) { return Arrays.stream(transforms).map(PartitionUtils::toPartition).toArray(Partition[]::new); } - private static Transform toTransform(Partition partition) { - switch (partition.strategy()) { - case IDENTITY: - return identity(((SimplePartitionDTO) partition).getFieldName()); - case YEAR: - case MONTH: - case DAY: - case HOUR: - return function( - partition.strategy().name().toLowerCase(), - new Transform[] {field(((SimplePartitionDTO) partition).getFieldName())}); - case LIST: - // TODO(minghuang): add Assignments after Transform support partition value - return list(((ListPartitionDTO) partition).getFieldNames()); - case RANGE: - // TODO(minghuang): add Ranges after Transform support partition value - return range(((RangePartitionDTO) partition).getFieldName()); - case EXPRESSION: - return toTransform(((ExpressionPartitionDTO) partition).getExpression()); - } - throw new IllegalArgumentException( - "Unsupported partition type " + partition.getClass().getCanonicalName()); - } - public static Transform toTransform(ExpressionPartitionDTO.Expression expression) { switch (expression.expressionType()) { case FIELD: @@ -106,7 +83,43 @@ public static Transform toTransform(ExpressionPartitionDTO.Expression expression "Unsupported expression type " + expression.getClass().getCanonicalName()); } - public static Expression.Literal getLiteral( + public static ExpressionPartitionDTO.Expression toExpression(Transform transform) { + if (transform instanceof Transforms.NamedReference) { + return new ExpressionPartitionDTO.FieldExpression.Builder() + .withFieldName(((Transforms.NamedReference) transform).value()) + .build(); + } else if (transform instanceof Transforms.LiteralReference) { + Expression.Literal literal = ((Transforms.LiteralReference) transform).value(); + return new ExpressionPartitionDTO.LiteralExpression.Builder() + .withType(literal.getType()) + .withValue(literal.accept(LiteralStringConverter.INSTANCE)) + .build(); + } else { + return new ExpressionPartitionDTO.FunctionExpression.Builder() + .withFuncName(transform.name()) + .withArgs( + Arrays.stream(transform.arguments()) + .map(PartitionUtils::toExpression) + .toArray(ExpressionPartitionDTO.Expression[]::new)) + .build(); + } + } + + public static void validateFieldExistence(ColumnDTO[] columns, String[] fieldName) + throws IllegalArgumentException { + Preconditions.checkArgument(ArrayUtils.isNotEmpty(columns), "columns cannot be null or empty"); + + List partitionColumn = + Arrays.stream(columns) + .filter(c -> c.name().equals(fieldName[0])) + .collect(Collectors.toList()); + Preconditions.checkArgument( + partitionColumn.size() == 1, "partition field %s not found in table", fieldName[0]); + + // TODO: should validate nested fieldName after column type support namedStruct + } + + private static Expression.Literal getLiteral( ExpressionPartitionDTO.LiteralExpression literalExpression) { LiteralConverter literalConverter = new LiteralConverter(literalExpression.getValue()); return literalExpression.getType().accept(literalConverter); @@ -189,73 +202,28 @@ public Expression.Literal visit(Type.Decimal type) throws RuntimeException { } } - private static Partition toPartition(Transform transform) { - if (transform instanceof Transforms.NamedReference) { - return new SimplePartitionDTO.Builder() - .withStrategy(IDENTITY) - .withFieldName(((Transforms.NamedReference) transform).value()) - .build(); - } - - if (transform instanceof Transforms.FunctionTrans) { - Transform[] arguments = transform.arguments(); - switch (transform.name().toLowerCase()) { - case NAME_OF_YEAR: - case NAME_OF_MONTH: - case NAME_OF_DAY: - case NAME_OF_HOUR: - if (arguments.length == 1 && arguments[0] instanceof Transforms.NamedReference) { - return new SimplePartitionDTO.Builder() - .withStrategy(Partition.Strategy.valueOf(transform.name().toUpperCase())) - .withFieldName(((Transforms.NamedReference) arguments[0]).value()) - .build(); - } - case NAME_OF_LIST: - if (Arrays.stream(arguments).allMatch(arg -> arg instanceof Transforms.NamedReference)) { - return new ListPartitionDTO.Builder() - .withFieldNames( - Arrays.stream(arguments) - .map(arg -> ((Transforms.NamedReference) arg).value()) - .toArray(String[][]::new)) - // TODO(minghuang): add Assignments after Transform support partition value - .build(); - } - case NAME_OF_RANGE: - if (arguments.length == 1 && arguments[0] instanceof Transforms.NamedReference) { - return new RangePartitionDTO.Builder() - .withFieldName(((Transforms.NamedReference) arguments[0]).value()) - // TODO(minghuang): add Ranges after Transform support partition value - .build(); - } - default: - return new ExpressionPartitionDTO.Builder(toExpression(transform)).build(); - } + private static Transform toTransform(Partition partition) { + switch (partition.strategy()) { + case IDENTITY: + return identity(((SimplePartitionDTO) partition).getFieldName()); + case YEAR: + case MONTH: + case DAY: + case HOUR: + return function( + partition.strategy().name().toLowerCase(), + new Transform[] {field(((SimplePartitionDTO) partition).getFieldName())}); + case LIST: + // TODO(minghuang): add Assignments after Transform support partition value + return list(((ListPartitionDTO) partition).getFieldNames()); + case RANGE: + // TODO(minghuang): add Ranges after Transform support partition value + return range(((RangePartitionDTO) partition).getFieldName()); + case EXPRESSION: + return toTransform(((ExpressionPartitionDTO) partition).getExpression()); } - throw new IllegalArgumentException( - "Unsupported transform type " + transform.getClass().getCanonicalName()); - } - - public static ExpressionPartitionDTO.Expression toExpression(Transform transform) { - if (transform instanceof Transforms.NamedReference) { - return new ExpressionPartitionDTO.FieldExpression.Builder() - .withFieldName(((Transforms.NamedReference) transform).value()) - .build(); - } else if (transform instanceof Transforms.LiteralReference) { - Expression.Literal literal = ((Transforms.LiteralReference) transform).value(); - return new ExpressionPartitionDTO.LiteralExpression.Builder() - .withType(literal.getType()) - .withValue(literal.accept(LiteralStringConverter.INSTANCE)) - .build(); - } else { - return new ExpressionPartitionDTO.FunctionExpression.Builder() - .withFuncName(transform.name()) - .withArgs( - Arrays.stream(transform.arguments()) - .map(PartitionUtils::toExpression) - .toArray(ExpressionPartitionDTO.Expression[]::new)) - .build(); - } + "Unsupported partition type " + partition.getClass().getCanonicalName()); } private static class LiteralStringConverter @@ -359,18 +327,50 @@ public String visit(Expression.DecimalLiteral expr) throws UnsupportedOperationE } } - public static void validateFieldExist(ColumnDTO[] columns, String[] fieldName) - throws IllegalArgumentException { - Preconditions.checkArgument( - columns != null && columns.length != 0, "columns cannot be null or empty"); + private static Partition toPartition(Transform transform) { + if (transform instanceof Transforms.NamedReference) { + return new SimplePartitionDTO.Builder() + .withStrategy(IDENTITY) + .withFieldName(((Transforms.NamedReference) transform).value()) + .build(); + } - List partitionColumn = - Arrays.stream(columns) - .filter(c -> c.name().equals(fieldName[0])) - .collect(Collectors.toList()); - Preconditions.checkArgument( - partitionColumn.size() == 1, "partition field %s not found in table", fieldName[0]); + if (transform instanceof Transforms.FunctionTrans) { + Transform[] arguments = transform.arguments(); + switch (transform.name().toLowerCase()) { + case NAME_OF_YEAR: + case NAME_OF_MONTH: + case NAME_OF_DAY: + case NAME_OF_HOUR: + if (arguments.length == 1 && arguments[0] instanceof Transforms.NamedReference) { + return new SimplePartitionDTO.Builder() + .withStrategy(Partition.Strategy.valueOf(transform.name().toUpperCase())) + .withFieldName(((Transforms.NamedReference) arguments[0]).value()) + .build(); + } + case NAME_OF_LIST: + if (Arrays.stream(arguments).allMatch(arg -> arg instanceof Transforms.NamedReference)) { + return new ListPartitionDTO.Builder() + .withFieldNames( + Arrays.stream(arguments) + .map(arg -> ((Transforms.NamedReference) arg).value()) + .toArray(String[][]::new)) + // TODO(minghuang): add Assignments after Transform support partition value + .build(); + } + case NAME_OF_RANGE: + if (arguments.length == 1 && arguments[0] instanceof Transforms.NamedReference) { + return new RangePartitionDTO.Builder() + .withFieldName(((Transforms.NamedReference) arguments[0]).value()) + // TODO(minghuang): add Ranges after Transform support partition value + .build(); + } + default: + return new ExpressionPartitionDTO.Builder(toExpression(transform)).build(); + } + } - // TODO: should validate nested fieldName after column type support namedStruct + throw new IllegalArgumentException( + "Unsupported transform type " + transform.getClass().getCanonicalName()); } } diff --git a/common/src/main/java/com/datastrato/graviton/dto/rel/RangePartitionDTO.java b/common/src/main/java/com/datastrato/graviton/dto/rel/RangePartitionDTO.java index c8eb65608a..0744e46b6a 100644 --- a/common/src/main/java/com/datastrato/graviton/dto/rel/RangePartitionDTO.java +++ b/common/src/main/java/com/datastrato/graviton/dto/rel/RangePartitionDTO.java @@ -4,7 +4,7 @@ */ package com.datastrato.graviton.dto.rel; -import static com.datastrato.graviton.dto.rel.PartitionUtils.validateFieldExist; +import static com.datastrato.graviton.dto.rel.PartitionUtils.validateFieldExistence; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -45,7 +45,7 @@ public Strategy strategy() { @Override public void validate(ColumnDTO[] columns) throws IllegalArgumentException { - validateFieldExist(columns, fieldName); + validateFieldExistence(columns, fieldName); } @EqualsAndHashCode diff --git a/common/src/main/java/com/datastrato/graviton/dto/rel/SimplePartitionDTO.java b/common/src/main/java/com/datastrato/graviton/dto/rel/SimplePartitionDTO.java index 6e62fcf00c..d190a50094 100644 --- a/common/src/main/java/com/datastrato/graviton/dto/rel/SimplePartitionDTO.java +++ b/common/src/main/java/com/datastrato/graviton/dto/rel/SimplePartitionDTO.java @@ -4,7 +4,7 @@ */ package com.datastrato.graviton.dto.rel; -import static com.datastrato.graviton.dto.rel.PartitionUtils.validateFieldExist; +import static com.datastrato.graviton.dto.rel.PartitionUtils.validateFieldExistence; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -12,6 +12,10 @@ import lombok.EqualsAndHashCode; import lombok.Getter; +/** + * Represent some common and simple partitioning which only references single field, such as year, + * month, day, hour, etc. Mainly for the convenience of usage. + */ @EqualsAndHashCode(callSuper = false) public class SimplePartitionDTO implements Partition { @@ -39,7 +43,7 @@ public Strategy strategy() { @Override public void validate(ColumnDTO[] columns) throws IllegalArgumentException { - validateFieldExist(columns, fieldName); + validateFieldExistence(columns, fieldName); } public static class Builder { @@ -64,4 +68,39 @@ public SimplePartitionDTO build() { return new SimplePartitionDTO(strategy.name(), fieldName); } } + + public static SimplePartitionDTO identity(String[] fieldName) { + return new SimplePartitionDTO.Builder() + .withStrategy(Partition.Strategy.IDENTITY) + .withFieldName(fieldName) + .build(); + } + + public static SimplePartitionDTO year(String[] fieldName) { + return new SimplePartitionDTO.Builder() + .withStrategy(Partition.Strategy.YEAR) + .withFieldName(fieldName) + .build(); + } + + public static SimplePartitionDTO month(String[] fieldName) { + return new SimplePartitionDTO.Builder() + .withStrategy(Partition.Strategy.MONTH) + .withFieldName(fieldName) + .build(); + } + + public static SimplePartitionDTO day(String[] fieldName) { + return new SimplePartitionDTO.Builder() + .withStrategy(Partition.Strategy.DAY) + .withFieldName(fieldName) + .build(); + } + + public static SimplePartitionDTO hour(String[] fieldName) { + return new SimplePartitionDTO.Builder() + .withStrategy(Partition.Strategy.HOUR) + .withFieldName(fieldName) + .build(); + } } diff --git a/common/src/test/java/com/datastrato/graviton/json/TestDTOJsonSerDe.java b/common/src/test/java/com/datastrato/graviton/json/TestDTOJsonSerDe.java index 5608f9ae5a..353a15d0e0 100644 --- a/common/src/test/java/com/datastrato/graviton/json/TestDTOJsonSerDe.java +++ b/common/src/test/java/com/datastrato/graviton/json/TestDTOJsonSerDe.java @@ -4,6 +4,14 @@ */ package com.datastrato.graviton.json; +import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.bucket; +import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.truncate; +import static com.datastrato.graviton.dto.rel.SimplePartitionDTO.day; +import static com.datastrato.graviton.dto.rel.SimplePartitionDTO.hour; +import static com.datastrato.graviton.dto.rel.SimplePartitionDTO.identity; +import static com.datastrato.graviton.dto.rel.SimplePartitionDTO.month; +import static com.datastrato.graviton.dto.rel.SimplePartitionDTO.year; + import com.datastrato.graviton.Catalog; import com.datastrato.graviton.dto.AuditDTO; import com.datastrato.graviton.dto.CatalogDTO; @@ -13,7 +21,6 @@ import com.datastrato.graviton.dto.rel.ListPartitionDTO; import com.datastrato.graviton.dto.rel.Partition; import com.datastrato.graviton.dto.rel.RangePartitionDTO; -import com.datastrato.graviton.dto.rel.SimplePartitionDTO; import com.datastrato.graviton.dto.rel.TableDTO; import com.fasterxml.jackson.databind.cfg.EnumFeature; import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; @@ -230,31 +237,11 @@ public void testPartitionDTOSerDe() throws Exception { String[] field2 = new String[] {"city"}; // construct simple partition - Partition identity = - new SimplePartitionDTO.Builder() - .withStrategy(Partition.Strategy.IDENTITY) - .withFieldName(field1) - .build(); - Partition hourPart = - new SimplePartitionDTO.Builder() - .withStrategy(Partition.Strategy.HOUR) - .withFieldName(field1) - .build(); - Partition dayPart = - new SimplePartitionDTO.Builder() - .withStrategy(Partition.Strategy.DAY) - .withFieldName(field1) - .build(); - Partition monthPart = - new SimplePartitionDTO.Builder() - .withStrategy(Partition.Strategy.MONTH) - .withFieldName(field1) - .build(); - Partition yearPart = - new SimplePartitionDTO.Builder() - .withStrategy(Partition.Strategy.YEAR) - .withFieldName(field1) - .build(); + Partition identity = identity(field1); + Partition hourPart = hour(field1); + Partition dayPart = day(field1); + Partition monthPart = month(field1); + Partition yearPart = year(field1); // construct list partition String[][] p1Value = {{"2023-04-01", "San Francisco"}, {"2023-04-01", "San Francisco"}}; @@ -293,9 +280,20 @@ public void testPartitionDTOSerDe() throws Exception { .withArgs(new ExpressionPartitionDTO.Expression[] {toDateFunc}) .build(); Partition expressionPart = new ExpressionPartitionDTO.Builder(monthFunc).build(); + Partition bucketPart = bucket(field1, 10); + Partition truncatePart = truncate(field2, 20); Partition[] partitions = { - identity, hourPart, dayPart, monthPart, yearPart, listPart, rangePart, expressionPart + identity, + hourPart, + dayPart, + monthPart, + yearPart, + listPart, + rangePart, + expressionPart, + bucketPart, + truncatePart }; String serJson = JsonMapper.builder()