Skip to content

Commit

Permalink
Merge branch 'data-integrations:develop' into Import_e2e_Wrangler_plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
bharatgulati authored Jul 27, 2023
2 parents 03c1588 + 6e5814c commit 07d8405
Show file tree
Hide file tree
Showing 11 changed files with 440 additions and 65 deletions.
18 changes: 17 additions & 1 deletion wrangler-api/src/main/java/io/cdap/wrangler/api/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package io.cdap.wrangler.api;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.wrangler.api.annotations.PublicEvolving;

import java.io.Serializable;
import javax.annotation.Nullable;

/**
* An interface defining the wrangle Executor in the wrangling {@link RecipePipeline}.
Expand Down Expand Up @@ -80,5 +82,19 @@ O execute(I rows, ExecutorContext context)
* correct at this phase of invocation.
*/
void destroy();
}

/**
* Returns the updated schema of the data after the directive's transformation has been applied.
*
* @param schemaResolutionContext context containing necessary information for getting the output schema
* @return output {@link Schema} of the transformed data, or {@code null} if no specific schema is provided
* @implNote By default, returns {@code null} and the schema is inferred from the data when necessary.
* <p>For consistent handling, override for directives that perform column renames,
* column data type changes or column additions with specific schemas.</p>
*/
@Nullable
default Schema getOutputSchema(SchemaResolutionContext schemaResolutionContext) {
// Default implementation provides no schema of its own.
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

import io.cdap.cdap.api.data.schema.Schema;

/**
* Interface to pass contextual information related to getting or generating the output schema
* of an {@link Executor}.
*/
public interface SchemaResolutionContext {
/**
* Returns the schema that describes the data before the transformation is applied.
*
* @return {@link Schema} of the input data before transformation
*/
Schema getInputSchema();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.utils.RecordConvertor;
import io.cdap.wrangler.utils.RecordConvertorException;
import io.cdap.wrangler.utils.SchemaConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,6 +52,7 @@ public final class RecipePipelineExecutor implements RecipePipeline<Row, Structu

private final ErrorRecordCollector collector = new ErrorRecordCollector();
private final RecordConvertor convertor = new RecordConvertor();
private final SchemaConverter generator = new SchemaConverter();
private final RecipeParser recipeParser;
private final ExecutorContext context;
private List<Directive> directives;
Expand Down Expand Up @@ -103,6 +108,19 @@ public List<Row> execute(List<Row> rows) throws RecipeException {
List<Row> results = new ArrayList<>();
int i = 0;
int directiveIndex = 0;
// Initialize schema with input schema from TransientStore if running in service env (design-time) / testing env
boolean designTime = context != null && context.getEnvironment() != null &&
(context.getEnvironment().equals(ExecutorContext.Environment.SERVICE) ||
context.getEnvironment().equals(ExecutorContext.Environment.TESTING));
Schema inputSchema = designTime ? context.getTransientStore().get(TransientStoreKeys.INPUT_SCHEMA) : null;

List<DirectiveOutputSchemaGenerator> outputSchemaGenerators = new ArrayList<>();
if (designTime && inputSchema != null) {
for (Directive directive : directives) {
outputSchemaGenerators.add(new DirectiveOutputSchemaGenerator(directive, generator));
}
}

try {
collector.reset();
while (i < rows.size()) {
Expand All @@ -122,6 +140,9 @@ public List<Row> execute(List<Row> rows) throws RecipeException {
if (cumulativeRows.size() < 1) {
break;
}
if (designTime && inputSchema != null) {
outputSchemaGenerators.get(directiveIndex - 1).addNewOutputFields(cumulativeRows);
}
} catch (ReportErrorAndProceed e) {
messages.add(String.format("%s (ecode: %d)", e.getMessage(), e.getCode()));
collector
Expand All @@ -142,6 +163,11 @@ public List<Row> execute(List<Row> rows) throws RecipeException {
} catch (DirectiveExecutionException e) {
throw new RecipeException(e.getMessage(), e, i, directiveIndex);
}
// Schema generation
if (designTime && inputSchema != null) {
context.getTransientStore().set(TransientVariableScope.GLOBAL, TransientStoreKeys.OUTPUT_SCHEMA,
getOutputSchema(inputSchema, outputSchemaGenerators));
}
return results;
}

Expand All @@ -161,4 +187,17 @@ private List<Directive> getDirectives() throws RecipeException {
}
return directives;
}

/**
 * Threads the input schema through every directive's schema generator in order, using the output schema of one
 * directive as the input schema of the next.
 *
 * @param inputSchema schema of the data before the first directive
 * @param outputSchemaGenerators schema generators, one per directive, in application order
 * @return schema returned by the last generator, or {@code inputSchema} itself when the list is empty
 * @throws RecipeException if a generator fails with a {@link RecordConvertorException}
 */
private Schema getOutputSchema(Schema inputSchema, List<DirectiveOutputSchemaGenerator> outputSchemaGenerators)
    throws RecipeException {
  Schema current = inputSchema;
  try {
    // The catch always rethrows, so a single try around the loop stops at the first failure.
    for (DirectiveOutputSchemaGenerator directiveSchemaGenerator : outputSchemaGenerators) {
      current = directiveSchemaGenerator.getDirectiveOutputSchema(new DirectiveSchemaResolutionContext(current));
    }
  } catch (RecordConvertorException e) {
    throw new RecipeException("Error while generating output schema for a directive: " + e, e);
  }
  return current;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.schema;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.Pair;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.SchemaResolutionContext;
import io.cdap.wrangler.utils.RecordConvertorException;
import io.cdap.wrangler.utils.SchemaConverter;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Produces the output schema for the data emitted by a single directive. Field values observed across all output
 * rows of the directive are accumulated in a map, and that map is used to build the schema whenever the directive
 * itself does not return a custom output schema.
 */
public class DirectiveOutputSchemaGenerator {
  private final SchemaConverter schemaGenerator;
  // Insertion-ordered map of fieldName --> representative value seen in the directive's output.
  private final Map<String, Object> outputFieldMap = new LinkedHashMap<>();
  private final Directive directive;

  public DirectiveOutputSchemaGenerator(Directive directive, SchemaConverter schemaGenerator) {
    this.directive = directive;
    this.schemaGenerator = schemaGenerator;
  }

  /**
   * Records the fields of the given output rows in the fieldName --> value map kept for schema generation.
   * A value is stored only when the field has not been seen before, or when the stored value is null and the
   * given value is non-null.
   *
   * @param output list of output {@link Row}s after applying the directive
   */
  public void addNewOutputFields(List<Row> output) {
    for (Row outputRow : output) {
      for (Pair<String, Object> entry : outputRow.getFields()) {
        String name = entry.getFirst();
        Object value = entry.getSecond();
        // Keep what is already recorded unless it is a null that this value can replace.
        if (outputFieldMap.containsKey(name) && (value == null || outputFieldMap.get(name) != null)) {
          continue;
        }
        outputFieldMap.put(name, value);
      }
    }
  }

  /**
   * Returns the output schema of the directive. The directive's own output schema is used when it returns one;
   * otherwise a schema is generated from the recorded output fields and the input schema.
   *
   * @param context resolution context carrying the input {@link Schema} of the data before applying the directive
   * @return {@link Schema} corresponding to the output data
   * @throws RecordConvertorException if the schema generator fails for a recorded value
   */
  public Schema getDirectiveOutputSchema(SchemaResolutionContext context) throws RecordConvertorException {
    Schema custom = directive.getOutputSchema(context);
    if (custom != null) {
      return custom;
    }
    return generateDirectiveOutputSchema(context.getInputSchema());
  }

  // Builds the directive output schema from the previous step's schema and the recorded output fields.
  private Schema generateDirectiveOutputSchema(Schema inputSchema) throws RecordConvertorException {
    List<Schema.Field> fields = new ArrayList<>();
    for (Map.Entry<String, Object> entry : outputFieldMap.entrySet()) {
      String name = entry.getKey();
      Object value = entry.getValue();

      Schema.Field inputField = inputSchema.getField(name);
      Schema existing = inputField == null ? null : inputField.getSchema();

      // Generate a schema only for non-null values that the existing schema does not already describe.
      Schema generated = null;
      if (value != null && !isValidSchemaForValue(existing, value)) {
        generated = schemaGenerator.getSchema(value, name);
      }

      Schema resolved;
      if (generated != null) {
        resolved = generated;
      } else if (existing != null) {
        // A schema carried over from the input is always emitted as nullable.
        resolved = existing.isNullable() ? existing : Schema.nullableOf(existing);
      } else {
        resolved = Schema.of(Schema.Type.NULL);
      }
      fields.add(Schema.Field.of(name, resolved));
    }
    return Schema.recordOf("output", fields);
  }

  // Tells whether the given schema has the same type and logical type as the schema generated for the value;
  // nullability is ignored on both sides. A null schema is never valid.
  private boolean isValidSchemaForValue(@Nullable Schema schema, Object value) throws RecordConvertorException {
    if (schema == null) {
      return false;
    }
    Schema inferred = schemaGenerator.getSchema(value, "temp_field_name");
    Schema inferredType = inferred.isNullable() ? inferred.getNonNullable() : inferred;
    Schema declaredType = schema.isNullable() ? schema.getNonNullable() : schema;
    return inferredType.getLogicalType() == declaredType.getLogicalType()
      && inferredType.getType() == declaredType.getType();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.schema;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.SchemaResolutionContext;

/**
 * {@link SchemaResolutionContext} used when getting or generating the output schema of a {@link Directive}.
 * It carries the input schema it was constructed with.
 */
public class DirectiveSchemaResolutionContext implements SchemaResolutionContext {
  private final Schema schemaBeforeDirective;

  /**
   * @param inputSchema {@link Schema} of the input data before transformation
   */
  public DirectiveSchemaResolutionContext(Schema inputSchema) {
    schemaBeforeDirective = inputSchema;
  }

  /**
   * @return the {@link Schema} passed to the constructor
   */
  @Override
  public Schema getInputSchema() {
    return schemaBeforeDirective;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.schema;

/**
* Keys used for storing the Workspace schema in the TransientStore.
*/
public final class TransientStoreKeys {
// Key for the workspace input schema.
public static final String INPUT_SCHEMA = "ws_input_schema";
// Key for the workspace output schema.
public static final String OUTPUT_SCHEMA = "ws_output_schema";

// Constants holder only: the constructor always throws so the class cannot be instantiated.
private TransientStoreKeys() {
throw new AssertionError("Cannot instantiate a static utility class.");
}
}
Loading

0 comments on commit 07d8405

Please sign in to comment.