diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java
index 2654f8730..d49d88625 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -32,6 +36,7 @@
 import io.cdap.wrangler.api.parser.ColumnNameList;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -101,4 +106,28 @@ public Mutation lineage() {
       .relation(Many.columns(columns), targetColumn)
       .build();
   }
+
+  /**
+   * Relational equivalent of this directive: sets {@code targetColumn} to a {@code struct(...)}
+   * expression built over the configured source columns.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the relation with {@code targetColumn} set, or an {@link InvalidRelation} when no
+   *         expression factory is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    // Column names are joined verbatim; NOTE(review): names that need quoting are not escaped.
+    String columnList = String.join(",", columns);
+    return relation.setColumn(targetColumn,
+      expressionFactory.get().compile(String.format("struct(%s)", columnList)));
+  }
+
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java
index b35da3283..62a90119c 100644
--- 
a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -32,8 +36,10 @@
 import io.cdap.wrangler.api.parser.ColumnName;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A directive for swapping the column names.
@@ -93,4 +99,33 @@ public Mutation lineage() {
       .relation(Many.of(left, right), Many.of(right, left))
       .build();
   }
+
+  /**
+   * Relational equivalent of the swap: exchanges the values of the {@code left} and
+   * {@code right} columns.
+   *
+   * <p>The exchange goes through a scratch column named {@code tempColumn}, which is dropped
+   * before returning so that it does not leak into the output relation.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the relation with both columns swapped, or an {@link InvalidRelation} when no
+   *         expression factory is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    ExpressionFactory<String> factory = expressionFactory.get();
+
+    // NOTE(review): an input column already named "tempColumn" would be overwritten and dropped.
+    Relation swapped = relation.setColumn("tempColumn", factory.compile(right));
+    swapped = swapped.setColumn(right, factory.compile(left));
+    swapped = swapped.setColumn(left, factory.compile("tempColumn"));
+    return swapped.dropColumn("tempColumn");
+  }
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java
index 525f7ba7a..65a8255c8 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java
+++ 
b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -37,6 +41,7 @@
 import io.cdap.wrangler.api.parser.Text;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -148,4 +153,49 @@ public Mutation lineage() {
       .all(Many.of(col), Many.of(col))
       .build();
   }
+
+  /**
+   * Relational equivalent of the fixed-length parse. For every configured width this adds a
+   * column named {@code <col>_<n>} holding the matching substring of {@code col}, with the
+   * padding string removed, and filters out rows too short to contain that field.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the transformed relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    ExpressionFactory<String> factory = expressionFactory.get();
+
+    int currentPos = 1;     // start offset handed to substr(); the first field starts at 1
+    int columnCounter = 1;  // numeric suffix of the generated column name
+
+    for (int width : widths) {
+      String fieldExpression = String.format("substr(%s, %d, %d)", col, currentPos, width);
+      // Guard against an unset padding: formatting null would strip the literal text "null".
+      // NOTE(review): presumably padding is optional for this directive - confirm in initialize().
+      if (padding != null) {
+        fieldExpression = String.format("replace(%s, '%s', \"\")", fieldExpression, padding);
+      }
+      // Keep only rows whose value is long enough to fully contain this field.
+      String lengthFilter = String.format("%d <= (length(%s) - %d + 1)", width, col, currentPos);
+
+      relation = relation
+        .setColumn(String.format("%s_%d", col, columnCounter), factory.compile(fieldExpression))
+        .filter(factory.compile(lengthFilter));
+
+      currentPos += width;
+      columnCounter++;
+    }
+
+    return relation;
+  }
+
 }
diff --git 
a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java
index 5e7a6d7de..916575a1a 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -33,10 +37,12 @@
 import io.cdap.wrangler.api.parser.Text;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 import org.json.JSONObject;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
 /**
@@ -147,5 +153,29 @@ private boolean matchPattern(String value) {
     }
     return matches;
   }
+
+  /**
+   * Relational equivalent of the regex row filter: applies an {@code rlike} predicate on
+   * {@code column} using this directive's pattern.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the filtered relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    // NOTE(review): the predicate selects rows that match the pattern; confirm this polarity
+    // agrees with execute(). The pattern is embedded verbatim, so a single quote in it would
+    // break the generated expression.
+    String predicate = String.format("rlike(%s, '%s')", column, pattern);
+    return relation.filter(expressionFactory.get().compile(predicate));
+  }
 
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java
index 5b4f57f58..cbdf290b2 100644
--- 
a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -35,9 +40,13 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * A directive for parsing a string into record using the record delimiter. 
@@ -112,4 +121,28 @@ public Mutation lineage() {
       .relation(column, column)
       .build();
   }
+
+  /**
+   * Relational equivalent of the record-delimiter split: selects {@code column} as the result
+   * of {@code explode(split(column, delimiter, limit))}.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    String splitExpression = String.format("explode(split(%s, \"%s\", %d))", column, delimiter, limit);
+    // Only this single entry is handed to select().
+    Map<String, Expression> columnExpMap = new LinkedHashMap<>();
+    columnExpMap.put(column, expressionFactory.get().compile(splitExpression));
+    return relation.select(columnExpMap);
+  }
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java
index 8848ed2a9..75645dc4a 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -33,9 +37,11 @@
 import io.cdap.wrangler.api.parser.Text;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A directive for splitting the string into multiple {@link Row}s.
@@ -109,5 +115,27 @@ public Mutation lineage() {
       .relation(Many.columns(column), Many.columns(column))
       .build();
   }
+
+  /**
+   * Relational equivalent of the split-to-rows directive: sets {@code column} to
+   * {@code explode(split(column, regex))}.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    // The regex is embedded verbatim inside single quotes; NOTE(review): quotes are not escaped.
+    String splitExpression = String.format("explode(split(%s, '%s'))", column, regex);
+    return relation.setColumn(column, expressionFactory.get().compile(splitExpression));
+  }
 
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java
index 45542b254..5532620ba 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -32,9 +36,11 @@
 import io.cdap.wrangler.api.parser.Text;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 import org.json.JSONObject;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A directive to fill null or empty column values with a fixed value.
@@ -104,4 +110,26 @@ public Mutation lineage() {
       .relation(column, column)
       .build();
   }
+
+  /**
+   * Relational equivalent of fill-null-or-empty: sets {@code column} to an
+   * {@code nvl2}/{@code if(length(...) == 0)} expression that substitutes {@code value}.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    String fillExpression = String.format(
+      "nvl2(%s, if(length(%s) == 0, \"%s\", %s), \"%s\")", column, column, value, column, value);
+    return relation.setColumn(column, expressionFactory.get().compile(fillExpression));
+  }
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java
index 46656f5a5..808b6f300 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -31,8 +35,10 @@
 import io.cdap.wrangler.api.parser.ColumnName;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
 
@@ -87,4 +93,24 @@ public Mutation lineage() {
       .relation(column, column)
       .build();
   }
+
+  /**
+   * Relational equivalent of generate-uuid: sets {@code column} to the {@code uuid()} expression.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    // The expression is a constant, so it is passed directly rather than through String.format().
+    return relation.setColumn(column, expressionFactory.get().compile("uuid()"));
+  }
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java
index 60a359f05..a0438dabf 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java
@@ -19,6 +19,11 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.Expression;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -33,8 +38,10 @@
 import io.cdap.wrangler.api.parser.ColumnName;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A directive to split email address into account and domain.
@@ -128,4 +135,35 @@ private Pair extractDomainAndAccount(String emailId) {
       return new Pair<>(emailId.substring(0, lastidx), emailId.substring(lastidx + 1));
     }
   }
+
+  /**
+   * Relational equivalent of split-email: sets {@code generatedAccountCol} and
+   * {@code generatedDomainCol} from expressions over {@code column} that split on {@code '@'}.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    ExpressionFactory<String> factory = expressionFactory.get();
+
+    // Account: prefix of the value, its length derived from locating '@' in the reversed string.
+    String accountExpression = String.format(
+      "substring(%s, 1, char_length(%s) - locate('@', reverse(%s)))", column, column, column);
+    // Domain: substring_index(..., '@', -1) on the same value.
+    String domainExpression = String.format("substring_index(%s, '@', -1)", column);
+
+    return relation
+      .setColumn(generatedAccountCol, factory.compile(accountExpression))
+      .setColumn(generatedDomainCol, factory.compile(domainExpression));
+  }
+
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java
index 0ae4df352..98ca135cd 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -31,10 +35,12 @@
 import io.cdap.wrangler.api.parser.ColumnName;
 import io.cdap.wrangler.api.parser.TokenType;
 import
io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A Executor to decodes a column with url encoding.
@@ -101,4 +107,27 @@ public Mutation lineage() {
       .relation(column, column)
       .build();
   }
+
+  /**
+   * Relational equivalent of url-decode: sets {@code column} to a {@code reflect(...)} expression
+   * that names {@code java.net.URLDecoder}, its {@code decode} method and the {@code utf-8} charset.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    String decodeExpression =
+      String.format("reflect('java.net.URLDecoder', 'decode', %s, 'utf-8')", column);
+    return relation.setColumn(column, expressionFactory.get().compile(decodeExpression));
+  }
+
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java
index 204e380c7..b693672f4 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java
@@ -19,6 +19,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -31,10 +35,12 @@
 import io.cdap.wrangler.api.parser.ColumnName;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import
java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * A Executor to encode a column with url encoding.
@@ -101,4 +107,26 @@ public List execute(List rows, ExecutorContext context) throws Directi
     }
     return rows;
   }
+
+  /**
+   * Relational equivalent of url-encode: sets {@code column} to a {@code reflect(...)} expression
+   * that names {@code java.net.URLEncoder}, its {@code encode} method and the {@code utf-8} charset.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    String encodeExpression =
+      String.format("reflect('java.net.URLEncoder', 'encode', %s, 'utf-8')", column);
+    return relation.setColumn(column, expressionFactory.get().compile(encodeExpression));
+  }
 }
diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java
index b25cd8a65..5a7ef7139 100644
--- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java
+++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java
@@ -23,6 +23,10 @@
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Name;
 import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.etl.api.relational.ExpressionFactory;
+import io.cdap.cdap.etl.api.relational.InvalidRelation;
+import io.cdap.cdap.etl.api.relational.Relation;
+import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
 import io.cdap.wrangler.api.Arguments;
 import io.cdap.wrangler.api.Directive;
 import io.cdap.wrangler.api.DirectiveExecutionException;
@@ -38,6 +42,7 @@
 import io.cdap.wrangler.api.parser.ColumnNameList;
 import io.cdap.wrangler.api.parser.TokenType;
 import io.cdap.wrangler.api.parser.UsageDefinition;
+import io.cdap.wrangler.utils.SqlExpressionGenerator;
 
 import java.util.List;
 
@@ -115,4 +120,29 @@ public Mutation lineage() {
     columns.forEach(column -> builder.relation(column, column));
     return builder.build();
   }
+
+  /**
+   * Relational equivalent of this directive: sets {@code column} to a {@code struct(...)}
+   * expression built over the configured source columns.
+   *
+   * @param relationalTranformContext context used to look up an expression factory
+   * @param relation input relation
+   * @return the resulting relation, or an {@link InvalidRelation} when no expression factory
+   *         is available
+   */
+  @Override
+  public Relation transform(RelationalTranformContext relationalTranformContext,
+                            Relation relation) {
+    java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
+      .getExpressionFactory(relationalTranformContext);
+    if (!expressionFactory.isPresent()) {
+      return new InvalidRelation("Cannot find an Expression Factory");
+    }
+    // NOTE(review): the class name suggests a JSON object; confirm that a struct(...) value is
+    // the intended relational counterpart.
+    String columnList = String.join(",", columns);
+    return relation.setColumn(column,
+      expressionFactory.get().compile(String.format("struct(%s)", columnList)));
+  }
+
 }