[#488] fix(api, Iceberg): Add truncate and bucket transform (#495)
### What changes were proposed in this pull request?
 - Add convenience methods for `bucket` and `truncate` partitioning (see the usage sketch below)
 - Fix a partitioning conversion bug in the Iceberg catalog
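
A minimal usage sketch of the new convenience methods, based on the `Transforms.bucket(String[], int)` and `Transforms.truncate(String[], int)` signatures added in this PR; the column names are illustrative placeholders, not taken from the change itself:

```java
import com.datastrato.graviton.rel.transforms.Transform;
import com.datastrato.graviton.rel.transforms.Transforms;

// Partition by 16 hash buckets of "user_id" and by the first 4 characters of "event_name".
Transform[] partitioning =
    new Transform[] {
      Transforms.bucket(new String[] {"user_id"}, 16),
      Transforms.truncate(new String[] {"event_name"}, 4)
    };
```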

### Why are the changes needed?
`bucket` and `truncate` partitioning are commonly used with Iceberg tables.

Fix: #488 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
Existing UTs were modified to cover the new transforms.
mchades authored Oct 12, 2023
1 parent 605f474 commit 4c601ff
Showing 12 changed files with 269 additions and 167 deletions.
@@ -4,6 +4,8 @@
*/
package com.datastrato.graviton.rel.transforms;

import static io.substrait.expression.ExpressionCreator.i32;

import com.datastrato.graviton.rel.Column;
import com.google.common.annotations.VisibleForTesting;
import io.substrait.expression.Expression;
@@ -17,6 +19,8 @@ public class Transforms {
public static final String NAME_OF_MONTH = "month";
public static final String NAME_OF_DAY = "day";
public static final String NAME_OF_HOUR = "hour";
public static final String NAME_OF_TRUNCATE = "truncate";
public static final String NAME_OF_BUCKET = "bucket";
public static final String NAME_OF_LIST = "list";
public static final String NAME_OF_RANGE = "range";

@@ -80,6 +84,32 @@ public static FunctionTransform hour(String[] fieldName) {
return function(NAME_OF_HOUR, new Transform[] {field(fieldName)});
}

/**
* Creates a bucket partitioning by the given field name and number of buckets.
*
* @param fieldName The field name to partition by.
* @param numBuckets The number of buckets.
* @return The bucket partitioning.
*/
public static FunctionTransform bucket(String[] fieldName, int numBuckets) {
Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, numBuckets));
Transforms.NamedReference field = Transforms.field(fieldName);
return function(NAME_OF_BUCKET, new Transform[] {field, bucketNum});
}

/**
* Creates a truncate partitioning by the given field name and width.
*
* @param fieldName The field name to partition by.
* @param width The truncation width.
* @return The truncate partitioning.
*/
public static FunctionTransform truncate(String[] fieldName, int width) {
Transforms.LiteralReference widthLiteral = Transforms.literal(i32(false, width));
Transforms.NamedReference field = Transforms.field(fieldName);
return function(NAME_OF_TRUNCATE, new Transform[] {field, widthLiteral});
}

/**
* Creates a list partitioning by the given field names. For dynamically partitioned tables only.
*
@@ -4,8 +4,6 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.converter;

import static io.substrait.expression.ExpressionCreator.i32;

import com.datastrato.graviton.rel.transforms.Transform;
import com.datastrato.graviton.rel.transforms.Transforms;
import com.google.common.annotations.VisibleForTesting;
@@ -38,17 +36,12 @@ public Transform identity(String sourceName, int sourceId) {

@Override
public Transform bucket(String sourceName, int sourceId, int numBuckets) {
Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, numBuckets));
Transforms.NamedReference field = Transforms.field(new String[] {idToName.get(sourceId)});
// bucket(fieldName, bucketNum)
return Transforms.function("bucket", new Transform[] {field, bucketNum});
return Transforms.bucket(new String[] {idToName.get(sourceId)}, numBuckets);
}

@Override
public Transform truncate(String sourceName, int sourceId, int width) {
Transforms.LiteralReference bucketNum = Transforms.literal(i32(false, width));
Transforms.NamedReference field = Transforms.field(new String[] {idToName.get(sourceId)});
return Transforms.function("truncate", new Transform[] {field, bucketNum});
return Transforms.truncate(new String[] {idToName.get(sourceId)}, width);
}

@Override
@@ -4,15 +4,16 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.converter;

import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_BUCKET;
import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_TRUNCATE;

import com.datastrato.graviton.catalog.lakehouse.iceberg.IcebergTable;
import com.datastrato.graviton.rel.transforms.Transform;
import com.datastrato.graviton.rel.transforms.Transforms;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.substrait.expression.ImmutableExpression;
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

@@ -54,20 +55,10 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
String colName = String.join(DOT, fieldName);
builder.identity(colName);
} else if (transform instanceof Transforms.FunctionTrans) {
Preconditions.checkArgument(
transform.arguments().length == 1,
"Iceberg partition does not support nested field",
transform);
String colName =
Arrays.stream(transform.arguments())
.map(t -> ((Transforms.NamedReference) t).value()[0])
.collect(Collectors.joining(DOT));
String.join(DOT, ((Transforms.NamedReference) transform.arguments()[0]).value());
switch (transform.name().toLowerCase(Locale.ROOT)) {
// TODO minghuang add support for other transforms.
case "identity":
builder.identity(colName);
break;
case "bucket":
case NAME_OF_BUCKET:
builder.bucket(colName, findWidth(transform));
break;
case Transforms.NAME_OF_YEAR:
@@ -82,7 +73,7 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
case Transforms.NAME_OF_HOUR:
builder.hour(colName);
break;
case "truncate":
case NAME_OF_TRUNCATE:
builder.truncate(colName, findWidth(transform));
break;
default:
@@ -97,9 +88,8 @@ public static PartitionSpec toPartitionSpec(Schema schema, Transform[] partition
}

private static int findWidth(Transform transform) {
Preconditions.checkArgument(
transform.arguments().length == 1, "Transform with multiple arguments is not supported");
Transform expr = transform.arguments()[0];
// The transform here has the form truncate(fieldName, width) or bucket(fieldName, numBuckets); the width literal is the second argument.
Transform expr = transform.arguments()[1];
if (expr instanceof Transforms.LiteralReference) {
Transforms.LiteralReference literalReference = (Transforms.LiteralReference) expr;
if (literalReference.value() instanceof ImmutableExpression.I8Literal) {
@@ -4,6 +4,9 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.converter;

import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_BUCKET;
import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_TRUNCATE;

import com.datastrato.graviton.rel.SortOrder;
import com.datastrato.graviton.rel.transforms.Transform;
import com.datastrato.graviton.rel.transforms.Transforms;
@@ -54,15 +57,14 @@ public static org.apache.iceberg.SortOrder toSortOrder(Schema schema, SortOrder[
.collect(Collectors.joining(DOT));
UnboundTerm<Object> expression;
switch (transform.name().toLowerCase(Locale.ROOT)) {
// TODO minghuang - add more functions implementation.
case "bucket":
case NAME_OF_BUCKET:
int numBuckets =
((Expression.I32Literal)
((Transforms.LiteralReference) transform.arguments()[0]).value())
.value();
expression = Expressions.bucket(colName, numBuckets);
break;
case "truncate":
case NAME_OF_TRUNCATE:
int width =
((Expression.I32Literal)
((Transforms.LiteralReference) transform.arguments()[0]).value())
@@ -4,8 +4,10 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg;

import static com.datastrato.graviton.rel.transforms.Transforms.bucket;
import static com.datastrato.graviton.rel.transforms.Transforms.day;
import static com.datastrato.graviton.rel.transforms.Transforms.identity;
import static com.datastrato.graviton.rel.transforms.Transforms.truncate;

import com.datastrato.graviton.NameIdentifier;
import com.datastrato.graviton.Namespace;
@@ -218,7 +220,12 @@ public void testCreatePartitionedIcebergTable() {
.build();
Column[] columns = new Column[] {col1, col2};

Transform[] partitions = new Transform[] {day(new String[] {col2.name()})};
Transform[] partitions =
new Transform[] {
day(new String[] {col2.name()}),
bucket(new String[] {col1.name()}, 10),
truncate(new String[] {col1.name()}, 2)
};

Table table =
icebergCatalog
@@ -4,10 +4,12 @@
*/
package com.datastrato.graviton.catalog.lakehouse.iceberg.converter;

import static com.datastrato.graviton.rel.transforms.Transforms.bucket;
import static com.datastrato.graviton.rel.transforms.Transforms.day;
import static com.datastrato.graviton.rel.transforms.Transforms.hour;
import static com.datastrato.graviton.rel.transforms.Transforms.identity;
import static com.datastrato.graviton.rel.transforms.Transforms.month;
import static com.datastrato.graviton.rel.transforms.Transforms.truncate;

import com.datastrato.graviton.rel.transforms.Transform;
import java.util.Arrays;
@@ -35,13 +37,19 @@ void testToPartitionSpec() {
ArrayUtils.add(nestedFields, createNestedField(6, "col_6", Types.TimestampType.withZone()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(7, "col_7", Types.DateType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(8, "col_8", Types.LongType.get()));
nestedFields =
ArrayUtils.add(nestedFields, createNestedField(9, "col_9", Types.StringType.get()));

Transform[] partitioning =
new Transform[] {
identity(new String[] {nestedFields[0].name()}),
day(new String[] {nestedFields[4].name()}),
hour(new String[] {nestedFields[5].name()}),
month(new String[] {nestedFields[6].name()})
month(new String[] {nestedFields[6].name()}),
bucket(new String[] {nestedFields[4].name()}, 10),
truncate(new String[] {nestedFields[7].name()}, 20),
};

Schema schema = new Schema(nestedFields);
@@ -50,7 +58,7 @@ void testToPartitionSpec() {

List<PartitionField> fields = partitionSpec.fields();
Assertions.assertEquals(partitioning.length, fields.size());
Assertions.assertEquals(4, fields.size());
Assertions.assertEquals(6, fields.size());
Map<Integer, String> idToName = schema.idToName();
Map<String, Types.NestedField> nestedFieldByName =
Arrays.stream(nestedFields).collect(Collectors.toMap(Types.NestedField::name, v -> v));
@@ -7,6 +7,8 @@
import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.ExpressionType.FIELD;
import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.ExpressionType.FUNCTION;
import static com.datastrato.graviton.dto.rel.ExpressionPartitionDTO.ExpressionType.LITERAL;
import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_BUCKET;
import static com.datastrato.graviton.rel.transforms.Transforms.NAME_OF_TRUNCATE;

import com.datastrato.graviton.json.JsonUtils;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -17,6 +19,7 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.base.Preconditions;
import io.substrait.type.Type;
import io.substrait.type.TypeCreator;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.logging.log4j.util.Strings;
@@ -46,7 +49,7 @@ private void validateExpression(ColumnDTO[] columns, Expression expression) {
switch (expression.expressionType()) {
case FIELD:
FieldExpression fieldExpression = (FieldExpression) expression;
PartitionUtils.validateFieldExist(columns, fieldExpression.fieldName);
PartitionUtils.validateFieldExistence(columns, fieldExpression.fieldName);
break;
case FUNCTION:
validateExpression(columns, expression);
@@ -74,6 +77,38 @@ public ExpressionPartitionDTO build() {
}
}

public static ExpressionPartitionDTO bucket(String[] fieldName, int numBuckets) {
return new ExpressionPartitionDTO.Builder(
new FunctionExpression.Builder()
.withFuncName(NAME_OF_BUCKET)
.withArgs(
new Expression[] {
new FieldExpression.Builder().withFieldName(fieldName).build(),
new LiteralExpression.Builder()
.withType(TypeCreator.REQUIRED.I32)
.withValue(String.valueOf(numBuckets))
.build()
})
.build())
.build();
}

public static ExpressionPartitionDTO truncate(String[] fieldName, int width) {
return new ExpressionPartitionDTO.Builder(
new FunctionExpression.Builder()
.withFuncName(NAME_OF_TRUNCATE)
.withArgs(
new Expression[] {
new FieldExpression.Builder().withFieldName(fieldName).build(),
new LiteralExpression.Builder()
.withType(TypeCreator.REQUIRED.I32)
.withValue(String.valueOf(width))
.build()
})
.build())
.build();
}

enum ExpressionType {
@JsonProperty("field")
FIELD,
@@ -53,7 +53,7 @@ public Strategy strategy() {
@Override
public void validate(ColumnDTO[] columns) throws IllegalArgumentException {
for (String[] fieldName : fieldNames) {
PartitionUtils.validateFieldExist(columns, fieldName);
PartitionUtils.validateFieldExistence(columns, fieldName);
}
}
