Skip to content

Commit

Permalink
Merge pull request #653 from data-integrations/Directive-validation
Browse files Browse the repository at this point in the history
Directive validation
  • Loading branch information
shrverma authored Jul 31, 2023
2 parents 1e1d898 + 5ce676d commit 36326f4
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* </code>
*/
public interface Directive extends Executor<List<Row>, List<Row>>, EntityMetrics,
LinearRelationalTransform {
DirectiveRelationalTransform {
/**
* This defines a interface variable that is static and final for specify
* the {@code type} of the plugin this interface would provide.
Expand Down Expand Up @@ -131,10 +131,4 @@ default List<EntityCountMetric> getCountMetrics() {
return null;
}

@Override
default Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
// no-op
return relation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.wrangler.api;

import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.cdap.etl.api.relational.RelationalTransform;

/**
* {@link DirectiveRelationalTransform} provides relational transform support for
* wrangler directives.
*/
public interface DirectiveRelationalTransform extends LinearRelationalTransform {

/**
* Implementation of linear relational transform for each supported directive.
*
* @param relationalTranformContext transformation context with engine, input and output parameters
* @param relation input relation upon which the transformation is applied.
* @return transformed relation as the output relation. By default, returns an Invalid relation
* for unsupported directives.
*/
default Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
return new InvalidRelation("SQL execution for the directive is currently not supported.");
}

/**
* Indicates whether the directive is supported by relational transformation or not.
*
* @return boolean value for the directive SQL support.
* By default, returns false, indicating that the directive is currently not supported.
*/
default boolean isSQLSupported() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.setColumn(destination.value(), expressionFactory.get().compile(source.value()));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
}
return relation;
}

@Override
public boolean isSQLSupported() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.select(keepCol);
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
.compile(String.format("CONCAT(%s,'%s',%s)", col1, delimiter, col2)));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.dropColumn(source.value());
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,10 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
}
return relation.setColumn(col, expressionFactory.get().compile("LTRIM(" + col + ")"));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.setColumn(column, expressionFactory.get().compile("RTRIM(" + column + ")"));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.setColumn(column, expressionFactory.get().compile("initcap(" + column + ")"));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.setColumn(column, expressionFactory.get().compile("TRIM(" + column + ")"));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext,
return relation.setColumn(column, expressionFactory.get().compile("UPPER(" + column + ")"));
}

@Override
public boolean isSQLSupported() {
return true;
}

}
55 changes: 39 additions & 16 deletions wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.LinearRelationalTransform;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType;
import io.cdap.cdap.features.Feature;
import io.cdap.directives.aggregates.DefaultTransientStore;
import io.cdap.wrangler.api.CompileException;
import io.cdap.wrangler.api.CompileStatus;
import io.cdap.wrangler.api.Compiler;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
import io.cdap.wrangler.api.DirectiveLoadException;
import io.cdap.wrangler.api.DirectiveParseException;
import io.cdap.wrangler.api.DirectiveRelationalTransform;
import io.cdap.wrangler.api.EntityCountMetric;
import io.cdap.wrangler.api.ErrorRecord;
import io.cdap.wrangler.api.ExecutorContext;
Expand Down Expand Up @@ -106,7 +106,7 @@
@Plugin(type = "transform")
@Name("Wrangler")
@Description("Wrangler - A interactive tool for data cleansing and transformation.")
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements LinearRelationalTransform {
public class Wrangler extends Transform<StructuredRecord, StructuredRecord> implements DirectiveRelationalTransform {
private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class);

// Configuration specifying the dataprep application and service name.
Expand All @@ -130,6 +130,9 @@ public class Wrangler extends Transform<StructuredRecord, StructuredRecord> impl
// Sql execution value
private static final String SQL_ENABLED = "yes";

// wrangler sql execution mode enabled or not
public boolean isSqlenabled = false;

// Plugin configuration.
private final Config config;

Expand Down Expand Up @@ -196,7 +199,7 @@ public void configurePipeline(PipelineConfigurer configurer) {
if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) {
validatePrecondition(config.getPreconditionSQL(), true, collector);
}
validateSQLModeDirectives(collector);
validateSQLUDDs(collector);
} else {
if (!config.containsMacro(Config.NAME_PRECONDITION)) {
validatePrecondition(config.getPreconditionJEXL(), false, collector);
Expand Down Expand Up @@ -248,6 +251,12 @@ public void configurePipeline(PipelineConfigurer configurer) {
}
}
}

// check if the directive is supported by SQL
if (checkSQLExecution(config)) {
validateSQLModeDirectives(collector, getDirectivesList(config));
}

} catch (CompileException e) {
collector.addFailure("Compilation error occurred : " + e.getMessage(), null);
} catch (DirectiveParseException e) {
Expand Down Expand Up @@ -355,10 +364,12 @@ public void initialize(TransformContext context) throws Exception {
context.getStageName()), e
);
}

// initialize the wrangler sql mode
isSqlenabled = checkSQLExecution(config);

// Check if jexl pre-condition is not null or empty and if so compile expression.
if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) {
if (!checkSQLExecution(config) && checkPreconditionNotEmpty(false)) {
if (!isSqlenabled && checkPreconditionNotEmpty(false)) {
try {
condition = new Precondition(config.getPreconditionJEXL());
} catch (PreconditionException e) {
Expand Down Expand Up @@ -415,7 +426,7 @@ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
}

// If pre-condition is set, then evaluate the precondition
if (!checkSQLExecution(config) && checkPreconditionNotEmpty(false)) {
if (!isSqlenabled && checkPreconditionNotEmpty(false)) {
boolean skip = condition.apply(row);
if (skip) {
getContext().getMetrics().count("precondition.filtered", 1);
Expand Down Expand Up @@ -523,7 +534,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F
}
}

private void validateSQLModeDirectives(FailureCollector collector) {
private void validateSQLModeDirectives(FailureCollector collector, List<Directive> directives) {
for (Directive directive : directives) {
if (!directive.isSQLSupported()) {
collector.addFailure(String.format("%s directive is not supported by SQL execution.",
directive.define().getDirectiveName()), null)
.withConfigProperty(Config.NAME_DIRECTIVES);
}
}
}

private void validateSQLUDDs (FailureCollector collector) {
if (!Strings.isNullOrEmpty(config.getUDDs())) {
collector.addFailure("UDDs are not supported for precondition of type SQL", null)
.withConfigProperty(Config.NAME_UDD);
Expand Down Expand Up @@ -561,6 +582,14 @@ private boolean checkSQLExecution(Config config) {
return false;
}

List<Directive> getDirectivesList(Config config) throws DirectiveParseException, RecipeException {
String recipe = config.getDirectives();
GrammarBasedParser parser = new GrammarBasedParser("default",
new MigrateToV2(recipe).migrate(), registry);
List<Directive> directives = parser.parse();
return directives;
}

/**
* This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser}
* with {@link NoOpDirectiveContext}
Expand Down Expand Up @@ -599,8 +628,6 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R
Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL());
Relation filteredRelation = relation.filter(filterExpression);

String recipe = config.getDirectives();

registry = SystemDirectiveRegistry.INSTANCE;
try {
registry.reload("default");
Expand All @@ -610,12 +637,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, R

List<Directive> directives = null;
try {
GrammarBasedParser parser = new GrammarBasedParser("default",
new MigrateToV2(recipe).migrate(), registry);
directives = parser.parse();
} catch (DirectiveParseException e) {
throw new RuntimeException(e);
} catch (RecipeException e) {
directives = getDirectivesList(config);
} catch (DirectiveParseException | RecipeException e) {
throw new RuntimeException(e);
}

Expand Down

0 comments on commit 36326f4

Please sign in to comment.