diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java index 2a199263b..78038afef 100644 --- a/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/Directive.java @@ -16,6 +16,8 @@ package io.cdap.wrangler.api; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.parser.UsageDefinition; import java.util.List; @@ -51,7 +53,8 @@ * } * */ -public interface Directive extends Executor, List>, EntityMetrics { +public interface Directive extends Executor, List>, EntityMetrics, + DirectiveRelationalTransform { /** * This defines a interface variable that is static and final for specify * the {@code type} of the plugin this interface would provide. @@ -126,4 +129,11 @@ default List getCountMetrics() { // no op return null; } + + @Override + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + // no-op + return relation; + } } diff --git a/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java b/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java new file mode 100644 index 000000000..943f564f6 --- /dev/null +++ b/wrangler-api/src/main/java/io/cdap/wrangler/api/DirectiveRelationalTransform.java @@ -0,0 +1,53 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.wrangler.api; + +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; + +/** + * {@link DirectiveRelationalTransform} provides relational transform support for + * wrangler directives. + */ +public interface DirectiveRelationalTransform extends LinearRelationalTransform { + + /** + * Implementation of linear relational transform for each supported directive. + * + * @param relationalTranformContext transformation context with engine, input and output parameters + * @param relation input relation upon which the transformation is applied. + * @return transformed relation as the output relation. By default, returns an Invalid relation + * for unsupported directives. + */ + default Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + return new InvalidRelation("SQL execution for the directive is currently not supported."); + } + + /** + * Indicates whether the directive is supported by relational transformation or not. + * + * @return boolean value for the directive SQL support. + * By default, returns false, indicating that the directive is currently not supported. + */ + default boolean isSQLSupported() { + return false; + } + +} diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java b/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java index 2114ec2cf..6ff191863 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Drop.java @@ -19,6 +19,8 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -88,4 +90,18 @@ public Mutation lineage() { .drop(Many.of(columns)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + for (String col: columns) { + relation = relation.dropColumn(col); + } + return relation; + } + + @Override + public boolean isSQLSupported() { + return true; + } } diff --git a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java index d5e57ae69..f489f169f 100644 --- a/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java +++ b/wrangler-transform/src/main/java/io/cdap/wrangler/Wrangler.java @@ -38,7 +38,6 @@ import io.cdap.cdap.etl.api.relational.Expression; import io.cdap.cdap.etl.api.relational.ExpressionFactory; import io.cdap.cdap.etl.api.relational.InvalidRelation; -import io.cdap.cdap.etl.api.relational.LinearRelationalTransform; import io.cdap.cdap.etl.api.relational.Relation; import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.cdap.etl.api.relational.StringExpressionFactoryType; @@ -50,9 +49,11 @@ import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveLoadException; import io.cdap.wrangler.api.DirectiveParseException; +import io.cdap.wrangler.api.DirectiveRelationalTransform; import io.cdap.wrangler.api.EntityCountMetric; import io.cdap.wrangler.api.ErrorRecord; import io.cdap.wrangler.api.ExecutorContext; +import io.cdap.wrangler.api.RecipeException; import io.cdap.wrangler.api.RecipeParser; import io.cdap.wrangler.api.RecipePipeline; import io.cdap.wrangler.api.RecipeSymbol; @@ -103,7 +104,8 @@ @Plugin(type = "transform") @Name("Wrangler") @Description("Wrangler - A interactive tool for data cleansing and transformation.") -public class Wrangler extends Transform implements LinearRelationalTransform { +public class Wrangler extends Transform implements DirectiveRelationalTransform { + private static final Logger LOG = LoggerFactory.getLogger(Wrangler.class); // Configuration specifying the dataprep application and service name. @@ -124,6 +126,12 @@ public class Wrangler extends Transform impl private static final String PRECONDITION_LANGUAGE_JEXL = "jexl"; private static final String PRECONDITION_LANGUAGE_SQL = "sql"; + // Sql execution value + private static final String SQL_ENABLED = "yes"; + + // wrangler sql execution mode enabled or not + public boolean isSqlEnabled = false; + // Plugin configuration. private final Config config; @@ -172,7 +180,7 @@ public void configurePipeline(PipelineConfigurer configurer) { try { Schema iSchema = configurer.getStageConfigurer().getInputSchema(); if (!config.containsMacro(Config.NAME_FIELD) && !(config.getField().equals("*") - || config.getField().equals("#"))) { + || config.getField().equals("#"))) { validateInputSchema(iSchema, collector); } @@ -185,12 +193,22 @@ public void configurePipeline(PipelineConfigurer configurer) { } } - if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { - if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { + isSqlEnabled = checkSQLExecutionEnabled(config); + if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE) + || !config.containsMacro(Config.NAME_SQL_EXECUTION)) { + if (!config.containsMacro(Config.NAME_SQL_EXECUTION)) { + if (isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage())) { + collector.addFailure("JEXL Precondition is not supported when SQL execution is enabled.", null); + } + if (!isSqlEnabled && PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { + collector.addFailure("SQL Precondition is only supported when SQL execution is enabled.", null); + } + } + if (isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) { if (!config.containsMacro(Config.NAME_PRECONDITION_SQL)) { validatePrecondition(config.getPreconditionSQL(), true, collector); } - validateSQLModeDirectives(collector); + validateSQLUDDs(collector); } else { if (!config.containsMacro(Config.NAME_PRECONDITION)) { validatePrecondition(config.getPreconditionJEXL(), false, collector); @@ -248,6 +266,12 @@ public void configurePipeline(PipelineConfigurer configurer) { collector.addFailure(e.getMessage(), null); } + // If SQL Execution is enabled, check that the directives supports sql execution + if (isSqlEnabled) { + List sqlDirectives = getDirectivesList(config); + validateSQLModeDirectives(collector, sqlDirectives); + } + // Based on the configuration create output schema. try { if (!config.containsMacro(Config.NAME_SCHEMA)) { @@ -259,7 +283,7 @@ public void configurePipeline(PipelineConfigurer configurer) { } // Check if jexl pre-condition is not null or empty and if so compile expression. - if (!config.containsMacro(Config.NAME_PRECONDITION) && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { + if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION)) { if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) && checkPreconditionNotEmpty(false)) { try { @@ -352,7 +376,7 @@ public void initialize(TransformContext context) throws Exception { } // Check if jexl pre-condition is not null or empty and if so compile expression. - if (!config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { + if (!isSqlEnabled && !config.containsMacro(Config.NAME_PRECONDITION_LANGUAGE)) { if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) && checkPreconditionNotEmpty(false)) { try { @@ -411,7 +435,7 @@ public void transform(StructuredRecord input, Emitter emitter) } // If pre-condition is set, then evaluate the precondition - if (PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) + if (!isSqlEnabled && PRECONDITION_LANGUAGE_JEXL.equalsIgnoreCase(config.getPreconditionLanguage()) && checkPreconditionNotEmpty(false)) { boolean skip = condition.apply(row); if (skip) { @@ -520,12 +544,17 @@ private void validatePrecondition(String precondition, Boolean isConditionSQL, F } } - private void validateSQLModeDirectives(FailureCollector collector) { - if (!Strings.isNullOrEmpty(config.getDirectives())) { - collector.addFailure("Directives are not supported for precondition of type SQL", null) - .withConfigProperty(Config.NAME_DIRECTIVES); + private void validateSQLModeDirectives(FailureCollector collector, List directives) { + for (Directive directive : directives) { + if (!directive.isSQLSupported()) { + collector.addFailure(String.format("%s directive is not supported by SQL execution.", + directive.define().getDirectiveName()), null) + .withConfigProperty(Config.NAME_DIRECTIVES); + } } + } + private void validateSQLUDDs(FailureCollector collector) { if (!Strings.isNullOrEmpty(config.getUDDs())) { collector.addFailure("UDDs are not supported for precondition of type SQL", null) .withConfigProperty(Config.NAME_UDD); @@ -544,6 +573,23 @@ private boolean checkPreconditionNotEmpty(Boolean isConditionSQL) { return false; } + private boolean checkSQLExecutionEnabled(Config config) { + if (!Feature.WRANGLER_EXECUTION_SQL.isEnabled(getContext())) { + return false; + } + + return SQL_ENABLED.equalsIgnoreCase(config.getSqlExecution()); + } + + List getDirectivesList(Config config) throws DirectiveParseException, RecipeException { + String recipe = config.getDirectives(); + List directives = null; + GrammarBasedParser parser = new GrammarBasedParser("default", + new MigrateToV2(recipe).migrate(), registry); + directives = parser.parse(); + return directives; + } + /** * This method creates a {@link CompositeDirectiveRegistry} and initializes the {@link RecipeParser} * with {@link NoOpDirectiveContext} @@ -569,23 +615,46 @@ private RecipeParser getRecipeParser(StageContext context) @Override public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { - if (PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage()) - && checkPreconditionNotEmpty(true)) { + isSqlEnabled = checkSQLExecutionEnabled(config); + if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext) + && !Feature.WRANGLER_EXECUTION_SQL.isEnabled(relationalTranformContext)) { + return new InvalidRelation("Plugin is not configured for relational transformation"); + } + + if (Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { + if ((isSqlEnabled || PRECONDITION_LANGUAGE_SQL.equalsIgnoreCase(config.getPreconditionLanguage())) + && checkPreconditionNotEmpty(true)) { + Optional> expressionFactory = getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); + relation = relation.filter(filterExpression); + } + } - if (!Feature.WRANGLER_PRECONDITION_SQL.isEnabled(relationalTranformContext)) { - throw new RuntimeException("SQL Precondition feature is not available"); + if (isSqlEnabled) { + registry = SystemDirectiveRegistry.INSTANCE; + try { + registry.reload("default"); + } catch (DirectiveLoadException e) { + throw new RuntimeException(e); } - Optional> expressionFactory = getExpressionFactory(relationalTranformContext); - if (!expressionFactory.isPresent()) { - return new InvalidRelation("Cannot find an Expression Factory"); + List directives = null; + try { + directives = getDirectivesList(config); + } catch (DirectiveParseException | RecipeException e) { + throw new RuntimeException(e); } - Expression filterExpression = expressionFactory.get().compile(config.getPreconditionSQL()); - return relation.filter(filterExpression); + for (Directive directive : directives) { + relation = directive + .transform(relationalTranformContext, relation); + } } - return new InvalidRelation("Plugin is not configured for relational transformation"); + return relation; } private Optional> getExpressionFactory(RelationalTranformContext ctx) { @@ -637,6 +706,8 @@ private Map getEntityMetricTags(EntityCountMetric metricDef) { * Config for the plugin. */ public static class Config extends PluginConfig { + + static final String NAME_SQL_EXECUTION = "sqlExecution"; static final String NAME_PRECONDITION = "precondition"; static final String NAME_PRECONDITION_SQL = "preconditionSQL"; static final String NAME_PRECONDITION_LANGUAGE = "expressionLanguage"; @@ -646,6 +717,11 @@ public static class Config extends PluginConfig { static final String NAME_SCHEMA = "schema"; static final String NAME_ON_ERROR = "on-error"; + @Name(NAME_SQL_EXECUTION) + @Description("Toggle to configure execution language between JEXL and SQL") + @Macro + @Nullable + private String sqlExecution; @Name(NAME_PRECONDITION_LANGUAGE) @Description("Toggle to configure precondition language between JEXL and SQL") @Macro @@ -693,8 +769,9 @@ public static class Config extends PluginConfig { @Nullable private final String onError; - public Config(String preconditionLanguage, String precondition, String directives, String udds, - String field, String schema, String onError) { + public Config(String sqlExecution, String preconditionLanguage, String precondition, String directives, String udds, + String field, String schema, String onError) { + this.sqlExecution = sqlExecution; this.preconditionLanguage = preconditionLanguage; this.precondition = precondition; this.directives = directives; @@ -713,13 +790,13 @@ public String getOnError() { } public String getPreconditionLanguage() { - if (Strings.isNullOrEmpty(preconditionLanguage)) { - // due to backward compatibility... - return PRECONDITION_LANGUAGE_JEXL; - } return preconditionLanguage; } + public String getSqlExecution() { + return sqlExecution; + } + public String getPreconditionJEXL() { return precondition; } @@ -741,4 +818,3 @@ public String getUDDs() { } } } - diff --git a/wrangler-transform/widgets/Wrangler-transform.json b/wrangler-transform/widgets/Wrangler-transform.json index 9f9c7611e..cac41124e 100644 --- a/wrangler-transform/widgets/Wrangler-transform.json +++ b/wrangler-transform/widgets/Wrangler-transform.json @@ -37,6 +37,25 @@ ] } }, + { + "widget-type": "radio-group", + "name": "sqlExecution", + "label": "Enable SQL Execution", + "widget-attributes": { + "layout": "inline", + "default": "no", + "options": [ + { + "id": "yes", + "label": "Yes" + }, + { + "id": "no", + "label": "No" + } + ] + } + }, { "widget-type": "textbox", "label": "Precondition (JEXL)", @@ -49,8 +68,8 @@ "widget-type": "textbox", "label": "Precondition (SQL)", "name": "preconditionSQL", - "widget-attributes" : { - "default" : "false" + "widget-attributes": { + "default": "true" } } ] @@ -106,38 +125,50 @@ "emit-errors": true, "filters": [ { - "name": "PreconditionValueNotSQL", + "name": "executionSQLEnabled", "condition": { - "expression": "expressionLanguage != 'sql'" + "expression": "featureFlags['wrangler.execution.sql.enabled'] == true" }, "show": [ { "type": "properties", - "name": "precondition" + "name": "sqlExecution" } ] }, { - "name": "preconditionValueSQL", + "name": "SQLExecutionDisabledPreconditionEnabled", "condition": { - "expression": "expressionLanguage == 'sql'" + "expression": "featureFlags['wrangler.precondition.sql.enabled'] == true && featureFlags['wrangler.execution.sql.enabled'] == false" }, "show": [ { "type": "properties", - "name": "preconditionSQL" + "name": "expressionLanguage" } ] }, { - "name": "preconditionSQLEnabled", + "name": "PreconditionValueNotSQL", "condition": { - "expression": "featureFlags['wrangler.precondition.sql.enabled'] == true" + "expression": "expressionLanguage == 'jexl' && sqlExecution == 'no'" }, "show": [ { "type": "properties", - "name": "expressionLanguage" + "name": "precondition" + } + ] + }, + { + "name": "preconditionValueSQL", + "condition": { + "expression": "expressionLanguage == 'sql' || sqlExecution == 'yes'" + }, + "show": [ + { + "type": "properties", + "name": "preconditionSQL" } ] }