Skip to content

Commit

Permalink
Managed Transform protos & translation; Iceberg SchemaTransforms & tr…
Browse files Browse the repository at this point in the history
…anslation (#30910)

* iceberg write schematransform and test

* IcebergIO translation and tests

* unify iceberg urns and identifiers; update some comments

* replace icebergIO translation with iceberg schematransform translation; fix Schema::sorted to do recursive sorting

* removed new proto file and moved Managed URNs to beam_runner_api.proto; we now use SchemaTransformPayload for all schematransforms, including Managed; adding a version number to FileWriteResult encoding so that we can use it to fork in the future whhen needed

* Row and Schema snake_case <-> camelCase conversion logic

* Row sorted() util

* use Row::sorted to fetch Managed & Iceberg row configs

* use snake_case convention when translating transforms to spec; remove Managed and Iceberg urns from proto and use SCHEMA_TRANSFORM URN

* perform snake_case <-> camelCase conversions directly in TypedSchemaTransformProvider

* add SchemaTransformTranslation abstraction. when encountering a SCHEMA_TRANSFORM urn, fetch underlying identifier

* prioritize registered providers; remove snake_case <-> camelCase conversions from python side
  • Loading branch information
ahmedabu98 authored Apr 22, 2024
1 parent a0dad08 commit 37609ba
Show file tree
Hide file tree
Showing 83 changed files with 2,566 additions and 434 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,4 @@ jobs:
- name: run Cross-Language Wrapper Validation script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit
gradle-command: :sdks:python:test-suites:direct:crossLanguageWrapperValidationPreCommit --info
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ message BuilderMethod {
bytes payload = 3;
}

// Payload for a Schema-aware PTransform.
// This is a transform that is aware of its input and output PCollection schemas
// and is configured using Beam Schema-compatible parameters.
// The information available in the payload can be used to instantiate the schema-aware transform.
message SchemaTransformPayload {
// The identifier of the SchemaTransform (typically a URN).
string identifier = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.stream.Collectors;
import javax.annotation.concurrent.Immutable;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBiMap;
Expand Down Expand Up @@ -326,7 +327,10 @@ public static Schema of(Field... fields) {
return Schema.builder().addFields(fields).build();
}

/** Returns an identical Schema with sorted fields. */
/**
* Returns an identical Schema with lexicographically sorted fields. Recursively sorts nested
* fields.
*/
public Schema sorted() {
// Create a new schema and copy over the appropriate Schema object attributes:
// {fields, uuid, options}
Expand All @@ -336,6 +340,16 @@ public Schema sorted() {
Schema sortedSchema =
this.fields.stream()
.sorted(Comparator.comparing(Field::getName))
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerSortedSchema = innerType.getRowSchema().sorted();
innerType = innerType.toBuilder().setRowSchema(innerSortedSchema).build();
return field.toBuilder().setType(innerType).build();
}
return field;
})
.collect(Schema.toSchema())
.withOptions(getOptions());
sortedSchema.setUUID(getUUID());
Expand Down Expand Up @@ -1451,4 +1465,42 @@ public int getFieldCount() {
public Options getOptions() {
return this.options;
}

/** Recursively converts all field names to `snake_case`. */
public Schema toSnakeCase() {
return this.getFields().stream()
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerSnakeCaseSchema = innerType.getRowSchema().toSnakeCase();
innerType = innerType.toBuilder().setRowSchema(innerSnakeCaseSchema).build();
field = field.toBuilder().setType(innerType).build();
}
return field
.toBuilder()
.setName(CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName()))
.build();
})
.collect(toSchema());
}

/** Recursively converts all field names to `lowerCamelCase`. */
public Schema toCamelCase() {
return this.getFields().stream()
.map(
field -> {
FieldType innerType = field.getType();
if (innerType.getRowSchema() != null) {
Schema innerCamelCaseSchema = innerType.getRowSchema().toCamelCase();
innerType = innerType.toBuilder().setRowSchema(innerCamelCaseSchema).build();
field = field.toBuilder().setType(innerType).build();
}
return field
.toBuilder()
.setName(CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName()))
.build();
})
.collect(toSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,29 @@ public <T> SchemaCoder<T> getSchemaCoder(TypeDescriptor<T> typeDescriptor)
getFromRowFunction(typeDescriptor));
}

/**
* Retrieve a registered {@link SchemaProvider} for a given {@link TypeDescriptor}. If no schema
* exists, throws {@link * NoSuchSchemaException}.
*/
public <T> SchemaProvider getSchemaProvider(TypeDescriptor<T> typeDescriptor)
throws NoSuchSchemaException {
for (SchemaProvider provider : providers) {
Schema schema = provider.schemaFor(typeDescriptor);
if (schema != null) {
return provider;
}
}
throw new NoSuchSchemaException();
}

/**
* Retrieve a registered {@link SchemaProvider} for a given {@link Class}. If no schema exists,
* throws {@link * NoSuchSchemaException}.
*/
public <T> SchemaProvider getSchemaProvider(Class<T> clazz) throws NoSuchSchemaException {
return getSchemaProvider(TypeDescriptor.of(clazz));
}

private <ReturnT> ReturnT getProviderResult(Function<SchemaProvider, ReturnT> f)
throws NoSuchSchemaException {
for (SchemaProvider provider : providers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,24 @@ public ProviderAndDescriptor(
});
}

/**
* Retrieves the underlying {@link SchemaProvider} for the given {@link TypeDescriptor}. If no
* provider is found, returns null.
*/
public @Nullable <T> SchemaProvider getUnderlyingSchemaProvider(
TypeDescriptor<T> typeDescriptor) {
ProviderAndDescriptor providerAndDescriptor = getSchemaProvider(typeDescriptor);
return providerAndDescriptor != null ? providerAndDescriptor.schemaProvider : null;
}

/**
* Retrieves the underlying {@link SchemaProvider} for the given {@link Class}. If no provider
* is found, returns null.
*/
public @Nullable <T> SchemaProvider getUnderlyingSchemaProvider(Class<T> clazz) {
return getUnderlyingSchemaProvider(TypeDescriptor.of(clazz));
}

@Override
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
ProviderAndDescriptor providerAndDescriptor = getSchemaProvider(typeDescriptor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
import static org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.util.construction.BeamUrns;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@link TransformPayloadTranslator} implementation that translates between a Java {@link
* SchemaTransform} and a protobuf payload for that transform.
*/
public class SchemaTransformTranslation {
public abstract static class SchemaTransformPayloadTranslator<T extends SchemaTransform>
implements TransformPayloadTranslator<T> {
public abstract SchemaTransformProvider provider();

@Override
public String getUrn() {
return BeamUrns.getUrn(SCHEMA_TRANSFORM);
}

@Override
@SuppressWarnings("argument")
public @Nullable FunctionSpec translate(
AppliedPTransform<?, ?, T> application, SdkComponents components) throws IOException {
SchemaApi.Schema expansionSchema =
SchemaTranslation.schemaToProto(provider().configurationSchema(), true);
Row configRow = toConfigRow(application.getTransform());
ByteArrayOutputStream os = new ByteArrayOutputStream();
RowCoder.of(provider().configurationSchema()).encode(configRow, os);

return FunctionSpec.newBuilder()
.setUrn(getUrn())
.setPayload(
ExternalTransforms.SchemaTransformPayload.newBuilder()
.setIdentifier(provider().identifier())
.setConfigurationSchema(expansionSchema)
.setConfigurationRow(ByteString.copyFrom(os.toByteArray()))
.build()
.toByteString())
.build();
}

@Override
public T fromConfigRow(Row configRow, PipelineOptions options) {
return (T) provider().from(configRow);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.sdk.schemas.annotations.DefaultSchema.DefaultSchemaProvider;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaProvider;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.Row;

Expand All @@ -37,6 +41,13 @@
*
* <p>ConfigT should be available in the SchemaRegistry.
*
* <p>{@link #configurationSchema()} produces a configuration {@link Schema} that is inferred from
* {@code ConfigT} using the SchemaRegistry. A Beam {@link Row} can still be used produce a {@link
* SchemaTransform} using {@link #from(Row)}, as long as the Row fits the configuration Schema.
*
* <p>NOTE: The inferred field names in the configuration {@link Schema} and {@link Row} follow the
* {@code snake_case} naming convention.
*
* <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
* we provide implementations for more standard Beam transforms. We provide no backwards
* compatibility guarantees and it should not be implemented outside of the Beam repository.
Expand Down Expand Up @@ -77,7 +88,8 @@ Optional<List<String>> dependencies(ConfigT configuration, PipelineOptions optio
public final Schema configurationSchema() {
try {
// Sort the fields by name to ensure a consistent schema is produced
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted();
// We also establish a `snake_case` convention for all SchemaTransform configurations
return SchemaRegistry.createDefault().getSchema(configurationClass()).sorted().toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for "
Expand All @@ -86,6 +98,10 @@ public final Schema configurationSchema() {
}
}

/**
* Produces a {@link SchemaTransform} from a Row configuration. Row fields are expected to have
* `snake_case` naming convention.
*/
@Override
public final SchemaTransform from(Row configuration) {
return from(configFromRow(configuration));
Expand All @@ -98,9 +114,20 @@ public final Optional<List<String>> dependencies(Row configuration, PipelineOpti

private ConfigT configFromRow(Row configuration) {
try {
return SchemaRegistry.createDefault()
.getFromRowFunction(configurationClass())
.apply(configuration);
SchemaRegistry registry = SchemaRegistry.createDefault();

// Configuration objects handled by the AutoValueSchema provider will expect Row fields with
// camelCase naming convention
SchemaProvider schemaProvider = registry.getSchemaProvider(configurationClass());
if (schemaProvider.getClass().equals(DefaultSchemaProvider.class)
&& checkNotNull(
((DefaultSchemaProvider) schemaProvider)
.getUnderlyingSchemaProvider(configurationClass()))
.getClass()
.equals(AutoValueSchema.class)) {
configuration = configuration.toCamelCase();
}
return registry.getFromRowFunction(configurationClass()).apply(configuration);
} catch (NoSuchSchemaException e) {
throw new RuntimeException(
"Unable to find schema for " + identifier() + "SchemaTransformProvider's config");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,27 @@ public static Row toBeamRow(
}

@SuppressWarnings("nullness")
public static Row toBeamRow(Map<String, Object> yamlMap, Schema rowSchema, boolean toCamelCase) {
public static Row toBeamRow(
@Nullable Map<String, Object> map, Schema rowSchema, boolean toCamelCase) {
if (map == null || map.isEmpty()) {
List<Field> requiredFields =
rowSchema.getFields().stream()
.filter(field -> !field.getType().getNullable())
.collect(Collectors.toList());
if (requiredFields.isEmpty()) {
return Row.nullRow(rowSchema);
} else {
throw new IllegalArgumentException(
String.format(
"Received an empty Map, but output schema contains required fields: %s",
requiredFields));
}
}
return rowSchema.getFields().stream()
.map(
field ->
toBeamValue(
field,
yamlMap.get(maybeGetSnakeCase(field.getName(), toCamelCase)),
toCamelCase))
field, map.get(maybeGetSnakeCase(field.getName(), toCamelCase)), toCamelCase))
.collect(toRow(rowSchema));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.util.construction;

import static org.apache.beam.model.pipeline.v1.ExternalTransforms.ExpansionMethods.Enum.SCHEMA_TRANSFORM;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand All @@ -30,6 +31,7 @@
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
Expand Down Expand Up @@ -100,6 +102,7 @@ public class PTransformTranslation {
public static final String CONFIG_ROW_KEY = "config_row";

public static final String CONFIG_ROW_SCHEMA_KEY = "config_row_schema";
public static final String SCHEMATRANSFORM_URN_KEY = "schematransform_urn";

// DeprecatedPrimitives
/**
Expand Down Expand Up @@ -509,6 +512,14 @@ public RunnerApi.PTransform translate(
components.getEnvironmentIdFor(appliedPTransform.getResourceHints()));
}
}

if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) {
transformBuilder.putAnnotations(
SCHEMATRANSFORM_URN_KEY,
ByteString.copyFromUtf8(
ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload())
.getIdentifier()));
}
}

Row configRow = null;
Expand Down
Loading

0 comments on commit 37609ba

Please sign in to comment.