Skip to content

Commit

Permalink
Resolve comments
Browse files — browse the repository at this point in the history
  • Loading branch information
vanathi-g committed Jul 24, 2023
1 parent 9c45337 commit f7fd80d
Show file tree
Hide file tree
Showing 9 changed files with 98 additions and 40 deletions.
7 changes: 4 additions & 3 deletions wrangler-api/src/main/java/io/cdap/wrangler/api/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,15 @@ O execute(I rows, ExecutorContext context)

/**
* This method is used to get the updated schema of the data after the directive's transformation has been applied.
*
* @param schemaResolutionContext context containing necessary information for getting output schema
* @return output {@link Schema} of the transformed data
* @implNote By default, returns a null and the schema is inferred from the data when necessary.
* <p>For consistent handling, override for directives that perform column renames,
* column data type changes or column additions with specific schemas.</p>
* @param inputSchema input {@link Schema} of the data before transformation
* @return output {@link Schema} of the transformed data
*/
@Nullable
default Schema getOutputSchema(Schema inputSchema) {
default Schema getOutputSchema(SchemaResolutionContext schemaResolutionContext) {
// no op
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

import io.cdap.cdap.api.data.schema.Schema;

/**
 * Interface to pass contextual information related to getting or generating the output schema of an
 * {@link Executor}.
 *
 * <p>Implementations supply whatever context a directive needs to resolve its output schema;
 * currently this is the schema of the incoming data.</p>
 */
public interface SchemaResolutionContext {
  /**
   * @return {@link Schema} of the input data before transformation
   */
  Schema getInputSchema();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@
import io.cdap.wrangler.api.ReportErrorAndProceed;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.utils.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveOutputSchemaGenerator;
import io.cdap.wrangler.schema.DirectiveSchemaResolutionContext;
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.utils.RecordConvertor;
import io.cdap.wrangler.utils.RecordConvertorException;
import io.cdap.wrangler.utils.SchemaConverter;
import io.cdap.wrangler.utils.TransientStoreKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -192,7 +193,7 @@ private Schema getOutputSchema(Schema inputSchema, List<DirectiveOutputSchemaGen
Schema schema = inputSchema;
for (DirectiveOutputSchemaGenerator outputSchemaGenerator : outputSchemaGenerators) {
try {
schema = outputSchemaGenerator.getDirectiveOutputSchema(schema);
schema = outputSchemaGenerator.getDirectiveOutputSchema(new DirectiveSchemaResolutionContext(schema));
} catch (RecordConvertorException e) {
throw new RecipeException("Error while generating output schema for a directive: " + e, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@
* the License.
*/

package io.cdap.wrangler.utils;
package io.cdap.wrangler.schema;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.Pair;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.SchemaResolutionContext;
import io.cdap.wrangler.utils.RecordConvertorException;
import io.cdap.wrangler.utils.SchemaConverter;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -59,7 +62,7 @@ public void addNewOutputFields(List<Row> output) {
outputFieldMap.put(fieldName, fieldValue);
}
} else {
outputFieldMap.putIfAbsent(fieldName, fieldValue);
outputFieldMap.put(fieldName, fieldValue);
}
}
}
Expand All @@ -68,20 +71,22 @@ public void addNewOutputFields(List<Row> output) {
/**
* Method to get the output schema of the directive. Returns a generated schema based on maintained map of fields
* only if directive does not return a custom output schema.
* @param inputSchema input {@link Schema} of the data before applying the directive
* @param context input {@link Schema} of the data before applying the directive
* @return {@link Schema} corresponding to the output data
*/
public Schema getDirectiveOutputSchema(Schema inputSchema) throws RecordConvertorException {
Schema directiveOutputSchema = directive.getOutputSchema(inputSchema);
return directiveOutputSchema != null ? directiveOutputSchema : generateDirectiveOutputSchema(inputSchema);
public Schema getDirectiveOutputSchema(SchemaResolutionContext context) throws RecordConvertorException {
Schema directiveOutputSchema = directive.getOutputSchema(context);
return directiveOutputSchema != null ? directiveOutputSchema :
generateDirectiveOutputSchema(context.getInputSchema());
}

// Given the schema from previous step and output of current directive, generates the directive output schema.
private Schema generateDirectiveOutputSchema(Schema inputSchema)
throws RecordConvertorException {
List<Schema.Field> outputFields = new LinkedList<>();
for (String fieldName : outputFieldMap.keySet()) {
Object fieldValue = outputFieldMap.get(fieldName);
List<Schema.Field> outputFields = new ArrayList<>();
for (Map.Entry<String, Object> field : outputFieldMap.entrySet()) {
String fieldName = field.getKey();
Object fieldValue = field.getValue();

Schema existing = inputSchema.getField(fieldName) != null ? inputSchema.getField(fieldName).getSchema() : null;
Schema generated = fieldValue != null && !isValidSchemaForValue(existing, fieldValue) ?
Expand All @@ -90,6 +95,9 @@ private Schema generateDirectiveOutputSchema(Schema inputSchema)
if (generated != null) {
outputFields.add(Schema.Field.of(fieldName, generated));
} else if (existing != null) {
if (!existing.isNullable()) {
existing = Schema.nullableOf(existing);
}
outputFields.add(Schema.Field.of(fieldName, existing));
} else {
outputFields.add(Schema.Field.of(fieldName, Schema.of(Schema.Type.NULL)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.schema;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.SchemaResolutionContext;

/**
 * Context to pass information related to getting or generating the output schema of a
 * {@link Directive}.
 */
public class DirectiveSchemaResolutionContext implements SchemaResolutionContext {
  // Schema of the data before the directive's transformation is applied
  private final Schema inputSchema;

  /**
   * Creates a context carrying the schema of the data before transformation.
   *
   * @param inputSchema input {@link Schema} of the data before the directive is applied
   */
  public DirectiveSchemaResolutionContext(Schema inputSchema) {
    this.inputSchema = inputSchema;
  }

  @Override
  public Schema getInputSchema() {
    return inputSchema;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* the License.
*/

package io.cdap.wrangler.utils;
package io.cdap.wrangler.schema;

/**
* TransientStoreKeys for storing Workspace schema in TransientStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.cdap.wrangler.api.RecipePipeline;
import io.cdap.wrangler.api.Row;
import io.cdap.wrangler.api.TransientVariableScope;
import io.cdap.wrangler.utils.TransientStoreKeys;
import io.cdap.wrangler.schema.TransientStoreKeys;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -120,7 +120,7 @@ public void testOutputSchemaGeneration() throws Exception {
);
Schema expectedSchema = Schema.recordOf(
"expected",
Schema.Field.of("decimal_col", Schema.decimalOf(10, 2)),
Schema.Field.of("decimal_col", Schema.nullableOf(Schema.decimalOf(10, 2))),
Schema.Field.of("name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("timestamp", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("weight", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Expand Down Expand Up @@ -152,8 +152,8 @@ public void testOutputSchemaGeneration_doesNotDropNullColumn() throws Exception
String[] commands = new String[]{"set-type :id int"};
Schema expectedSchema = Schema.recordOf(
"expected",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("null_col", Schema.of(Schema.Type.STRING))
Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.INT))),
Schema.Field.of("null_col", Schema.nullableOf(Schema.of(Schema.Type.STRING)))
);
Row row = new Row();
row.add("id", "123");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@
import io.cdap.wrangler.registry.DirectiveRegistry;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.registry.UserDirectiveRegistry;
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.service.common.AbstractWranglerHandler;
import io.cdap.wrangler.statistics.BasicStatistics;
import io.cdap.wrangler.statistics.Statistics;
import io.cdap.wrangler.utils.TransientStoreKeys;
import io.cdap.wrangler.validator.ColumnNameValidator;
import io.cdap.wrangler.validator.Validator;
import io.cdap.wrangler.validator.ValidatorException;
Expand All @@ -64,7 +64,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -265,21 +264,6 @@ protected WorkspaceValidationResult getWorkspaceSummary(List<Row> rows) throws E
return new WorkspaceValidationResult(columnValidationResults, statistics);
}

/**
* Method to get the list of columns across all the given rows
* @param rows list of rows
* @return list of columns (union across columns in all rows)
*/
public static List<String> getAllColumns(List<Row> rows) {
Set<String> columns = new LinkedHashSet<>();
for (Row row : rows) {
for (int i = 0; i < row.width(); i++) {
columns.add(row.getColumn(i));
}
}
return new ArrayList<>(columns);
}

/**
* Creates a uber record after iterating through all rows.
*
Expand All @@ -301,5 +285,4 @@ public static Row createUberRecord(List<Row> rows) {
}
return uber;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@
import io.cdap.wrangler.proto.workspace.v2.WorkspaceUpdateRequest;
import io.cdap.wrangler.registry.DirectiveInfo;
import io.cdap.wrangler.registry.SystemDirectiveRegistry;
import io.cdap.wrangler.schema.TransientStoreKeys;
import io.cdap.wrangler.store.recipe.RecipeStore;
import io.cdap.wrangler.store.workspace.WorkspaceStore;
import io.cdap.wrangler.utils.ObjectSerDe;
import io.cdap.wrangler.utils.StructuredToRowTransformer;
import io.cdap.wrangler.utils.TransientStoreKeys;
import org.apache.commons.lang3.StringEscapeUtils;

import java.net.HttpURLConnection;
Expand Down

0 comments on commit f7fd80d

Please sign in to comment.