From 2a1c2b79c7bfb1be60c43737fb8124bcaae75396 Mon Sep 17 00:00:00 2001 From: shrverma Date: Wed, 12 Jul 2023 12:34:59 +0000 Subject: [PATCH 01/14] Implement swap directive --- .../java/io/cdap/directives/column/Swap.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java index b35da3283..55ea1b27f 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,8 +36,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive for swapping the column names. @@ -93,4 +99,16 @@ public Mutation lineage() { .relation(Many.of(left, right), Many.of(right, left)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + Relation tempRel = relation.setColumn(left, expressionFactory.get().compile(right)); + return tempRel.setColumn(right, expressionFactory.get().compile(left)); + } } From 35fbf9349106af5272c65b0bbc412fcbf8b22f2d Mon Sep 17 00:00:00 2001 From: shrverma Date: Fri, 14 Jul 2023 07:00:57 +0000 Subject: [PATCH 02/14] Implement filter directives --- .../directives/row/RecordConditionFilter.java | 16 ++++++++++++++++ .../row/RecordMissingOrNullFilter.java | 17 +++++++++++++++++ .../cdap/directives/row/RecordRegexFilter.java | 17 +++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java index b3eb4adb2..c327e55f9 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java @@ -20,6 +20,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -38,6 +42,7 @@ import io.cdap.wrangler.expression.EL; import io.cdap.wrangler.expression.ELContext; import io.cdap.wrangler.expression.ELException; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; @@ -129,4 +134,15 @@ public List getCountMetrics() { EntityCountMetric jexlCategoryMetric = getJexlCategoryMetric(el.getScriptParsedText()); return (jexlCategoryMetric == null) ? null : ImmutableList.of(jexlCategoryMetric); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.filter(expressionFactory.get().compile(el.getScriptParsedText())); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java index e6a6c8d8d..c4bb509ea 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,10 +35,12 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; /** * Filters records if they don't have all the columns specified or they have null values or combination. @@ -97,4 +103,15 @@ public Mutation lineage() { cols.forEach(column -> builder.relation(column, column)); return builder.build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.filter(expressionFactory.get().compile("nvl(" + columns[0] + ", false)")); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java index 5e7a6d7de..916575a1a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordRegexFilter.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,10 +37,12 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.json.JSONObject; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.regex.Pattern; /** @@ -147,5 +153,16 @@ private boolean matchPattern(String value) { } return matches; } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.filter(expressionFactory.get().compile("rlike(" + column + ", '" + pattern + "')")); + } } From 8f926a59baa874cd9e5d96547cf99ce55a3f0064 Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 15 Jul 2023 09:49:40 +0000 Subject: [PATCH 03/14] Implement SetRecordDelimiter directive --- .../directives/row/SetRecordDelimiter.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java index 5b4f57f58..cbdf290b2 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SetRecordDelimiter.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -35,9 +40,13 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * A directive for parsing a string into record using the record delimiter. @@ -112,4 +121,18 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + Map columnExpMap = new LinkedHashMap<>(); + columnExpMap.put(column, expressionFactory.get().compile( + String.format("explode(split(%s, \"%s\", %d))", column, delimiter, limit))); + return relation.select(columnExpMap); + } } From 68785b5daa37497f2a7a3615edb03af93f1870df Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 15 Jul 2023 12:15:14 +0000 Subject: [PATCH 04/14] Implement split-email directive --- .../directives/transformation/SplitEmail.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java index 60a359f05..d7976cc42 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java @@ -19,6 +19,11 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.Expression; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,8 +38,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; /** * A directive to split email address into account and domain. @@ -128,4 +135,24 @@ private Pair extractDomainAndAccount(String emailId) { return new Pair<>(emailId.substring(0, lastidx), emailId.substring(lastidx + 1)); } } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + Relation accountRelation = relation.setColumn(generatedAccountCol, getExpression(expressionFactory)); + return accountRelation.setColumn(generatedDomainCol, + expressionFactory.get().compile(String.format("substring_index(%s, '@', -1)", column))); + } + + Expression getExpression(Optional> expfactory) { + return expfactory.get().compile(String + .format("substring(%s, 1, char_length(%s) - locate('@', reverse(%s)))", column, column, column)); + } } From 6404c8d7f91ef45d0b948bbacf9d92992021d360 Mon Sep 17 00:00:00 2001 From: shrverma Date: Mon, 17 Jul 2023 11:17:32 +0000 Subject: [PATCH 05/14] Implement transformation directives --- .../cdap/directives/column/CreateRecord.java | 21 +++++++++++++++++++ .../directives/column/SplitToColumns.java | 20 ++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java index 2654f8730..5d0e6dd40 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,6 +36,7 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.Arrays; @@ -101,4 +106,20 @@ public Mutation lineage() { .relation(Many.columns(columns), targetColumn) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(targetColumn, expressionFactory.get().compile(String + .format("struct(%s)", getColumnString(columns)))); + } + + public String getColumnString(String[] columns) { + return String.join(",", columns); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java b/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java index 6ade24fdb..fa8bf695d 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,9 +37,11 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * A directive for splitting the columns into multiple columns. @@ -123,5 +129,19 @@ public Mutation lineage() { String.format("%s_%d", column, 10))) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("explode(split(%s, '%s'))", column, regex))); + } } From ca890344adcc3b766fea562b64af40ed4dc5e292 Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 22 Jul 2023 08:34:06 +0000 Subject: [PATCH 06/14] Refactor code --- .../directives/column/SplitToColumns.java | 20 ------------------- .../java/io/cdap/directives/column/Swap.java | 7 +++++-- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java b/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java index fa8bf695d..6ade24fdb 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/SplitToColumns.java @@ -19,10 +19,6 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; -import io.cdap.cdap.etl.api.relational.ExpressionFactory; -import io.cdap.cdap.etl.api.relational.InvalidRelation; -import io.cdap.cdap.etl.api.relational.Relation; -import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -37,11 +33,9 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; -import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; -import java.util.Optional; /** * A directive for splitting the columns into multiple columns. @@ -129,19 +123,5 @@ public Mutation lineage() { String.format("%s_%d", column, 10))) .build(); } - - @Override - public Relation transform(RelationalTranformContext relationalTranformContext, - Relation relation) { - - Optional> expressionFactory = SqlExpressionGenerator - .getExpressionFactory(relationalTranformContext); - if (!expressionFactory.isPresent()) { - return new InvalidRelation("Cannot find an Expression Factory"); - } - - return relation.setColumn(column, expressionFactory.get() - .compile(String.format("explode(split(%s, '%s'))", column, regex))); - } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java index 55ea1b27f..62a90119c 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/Swap.java @@ -103,12 +103,15 @@ public Mutation lineage() { @Override public Relation transform(RelationalTranformContext relationalTranformContext, Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator .getExpressionFactory(relationalTranformContext); if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - Relation tempRel = relation.setColumn(left, expressionFactory.get().compile(right)); - return tempRel.setColumn(right, expressionFactory.get().compile(left)); + + Relation tempRel = relation.setColumn("tempColumn", expressionFactory.get().compile(right)); + tempRel = tempRel.setColumn(right, expressionFactory.get().compile(left)); + return tempRel.setColumn(left, expressionFactory.get().compile("tempColumn")); } } From 09aa10e0325150e22a7c26ff925a7d4a21c7415b Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 22 Jul 2023 08:44:59 +0000 Subject: [PATCH 07/14] Implement UUID, split-rows and JSON-object directives --- .../io/cdap/directives/row/SplitToRows.java | 20 ++++++++++++++++++ .../transformation/GenerateUUID.java | 20 ++++++++++++++++++ .../directives/writer/WriteAsJsonObject.java | 21 +++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java index 8848ed2a9..75645dc4a 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/SplitToRows.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -33,9 +37,11 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; +import java.util.Optional; /** * A directive for splitting the string into multiple {@link Row}s. @@ -109,5 +115,19 @@ public Mutation lineage() { .relation(Many.columns(column), Many.columns(column)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("explode(split(%s, '%s'))", column, regex))); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java index 46656f5a5..808b6f300 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/GenerateUUID.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,8 +35,10 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; +import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -87,4 +93,18 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("uuid()"))); + } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java index b25cd8a65..3d69fd55e 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java @@ -23,6 +23,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -38,6 +42,7 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.List; @@ -115,4 +120,20 @@ public Mutation lineage() { columns.forEach(column -> builder.relation(column, column)); return builder.build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn( + column, expressionFactory.get().compile(String.format("struct(%s)", getJSONColumns(columns)))); + } + + String getJSONColumns(List columnList) { + return String.join(",", columnList); + } } From 278a30eb654b8b3d194a373ab9db12ef40c45539 Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 22 Jul 2023 09:00:40 +0000 Subject: [PATCH 08/14] Remove row filter directive implementation --- .../directives/row/RecordConditionFilter.java | 16 ---------------- .../row/RecordMissingOrNullFilter.java | 17 ----------------- 2 files changed, 33 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java index c327e55f9..b3eb4adb2 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordConditionFilter.java @@ -20,10 +20,6 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; -import io.cdap.cdap.etl.api.relational.ExpressionFactory; -import io.cdap.cdap.etl.api.relational.InvalidRelation; -import io.cdap.cdap.etl.api.relational.Relation; -import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -42,7 +38,6 @@ import io.cdap.wrangler.expression.EL; import io.cdap.wrangler.expression.ELContext; import io.cdap.wrangler.expression.ELException; -import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; @@ -134,15 +129,4 @@ public List getCountMetrics() { EntityCountMetric jexlCategoryMetric = getJexlCategoryMetric(el.getScriptParsedText()); return (jexlCategoryMetric == null) ? null : ImmutableList.of(jexlCategoryMetric); } - - @Override - public Relation transform(RelationalTranformContext relationalTranformContext, - Relation relation) { - java.util.Optional> expressionFactory = SqlExpressionGenerator - .getExpressionFactory(relationalTranformContext); - if (!expressionFactory.isPresent()) { - return new InvalidRelation("Cannot find an Expression Factory"); - } - return relation.filter(expressionFactory.get().compile(el.getScriptParsedText())); - } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java b/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java index c4bb509ea..e6a6c8d8d 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java +++ b/wrangler-core/src/main/java/io/cdap/directives/row/RecordMissingOrNullFilter.java @@ -19,10 +19,6 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; -import io.cdap.cdap.etl.api.relational.ExpressionFactory; -import io.cdap.cdap.etl.api.relational.InvalidRelation; -import io.cdap.cdap.etl.api.relational.Relation; -import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -35,12 +31,10 @@ import io.cdap.wrangler.api.parser.ColumnNameList; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; -import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Optional; /** * Filters records if they don't have all the columns specified or they have null values or combination. @@ -103,15 +97,4 @@ public Mutation lineage() { cols.forEach(column -> builder.relation(column, column)); return builder.build(); } - - @Override - public Relation transform(RelationalTranformContext relationalTranformContext, - Relation relation) { - Optional> expressionFactory = SqlExpressionGenerator - .getExpressionFactory(relationalTranformContext); - if (!expressionFactory.isPresent()) { - return new InvalidRelation("Cannot find an Expression Factory"); - } - return relation.filter(expressionFactory.get().compile("nvl(" + columns[0] + ", false)")); - } } From 480f2aa047505622c431c83b273845abb0e7ebbe Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 22 Jul 2023 09:07:04 +0000 Subject: [PATCH 09/14] Implement fill-null-or-empty --- .../transformation/FillNullOrEmpty.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java index 45542b254..5532620ba 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/FillNullOrEmpty.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -32,9 +36,11 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import org.json.JSONObject; import java.util.List; +import java.util.Optional; /** * A directive to fill null or empty column values with a fixed value. @@ -104,4 +110,17 @@ public Mutation lineage() { .relation(column, column) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn(column, expressionFactory.get().compile(String + .format("nvl2(%s, if(length(%s) == 0, \"%s\", %s), \"%s\")", + column, column, value, column, value))); + } } From 7f3f3cb14fa92b4f19e1c86a9c5ef8306fcaae0a Mon Sep 17 00:00:00 2001 From: shrverma Date: Sat, 22 Jul 2023 09:20:33 +0000 Subject: [PATCH 10/14] Implement URL encoding and decoding directives --- .../directives/transformation/UrlDecode.java | 19 +++++++++++++++++++ .../directives/transformation/UrlEncode.java | 18 ++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java index 0ae4df352..d7ef6003e 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,10 +35,12 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.util.List; +import java.util.Optional; /** * A Executor to decodes a column with url encoding. @@ -101,4 +107,17 @@ public Mutation lineage() { .relation(column, column) .build(); } + + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn( + column, expressionFactory.get().compile( + String.format("reflect('java.net.url.Decoder', 'decode', %s, 'utf-8')", column))); + } + } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java index 204e380c7..dcdd27e49 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -31,10 +35,12 @@ import io.cdap.wrangler.api.parser.ColumnName; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.List; +import java.util.Optional; /** * A Executor to encode a column with url encoding. @@ -101,4 +107,16 @@ public List execute(List rows, ExecutorContext context) throws Directi } return rows; } + + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + return relation.setColumn( + column, expressionFactory.get().compile( + String.format("reflect('java.net.url.Encoder', 'encode', %s, 'utf-8')", column))); + } } From 69e852737c9b5f36c9e24c206ccd3822462c5a74 Mon Sep 17 00:00:00 2001 From: shrverma Date: Mon, 24 Jul 2023 09:24:23 +0000 Subject: [PATCH 11/14] Implement fixed-length-parser --- .../directives/parser/FixedLengthParser.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java index 525f7ba7a..f9cdf569d 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java +++ b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java @@ -19,6 +19,10 @@ import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.relational.ExpressionFactory; +import io.cdap.cdap.etl.api.relational.InvalidRelation; +import io.cdap.cdap.etl.api.relational.Relation; +import io.cdap.cdap.etl.api.relational.RelationalTranformContext; import io.cdap.wrangler.api.Arguments; import io.cdap.wrangler.api.Directive; import io.cdap.wrangler.api.DirectiveExecutionException; @@ -37,6 +41,7 @@ import io.cdap.wrangler.api.parser.Text; import io.cdap.wrangler.api.parser.TokenType; import io.cdap.wrangler.api.parser.UsageDefinition; +import io.cdap.wrangler.utils.SqlExpressionGenerator; import java.util.ArrayList; import java.util.List; @@ -148,4 +153,32 @@ public Mutation lineage() { .all(Many.of(col), Many.of(col)) .build(); } + + @Override + public Relation transform(RelationalTranformContext relationalTranformContext, + Relation relation) { + java.util.Optional> expressionFactory = SqlExpressionGenerator + .getExpressionFactory(relationalTranformContext); + if (!expressionFactory.isPresent()) { + return new InvalidRelation("Cannot find an Expression Factory"); + } + + int currentpos = 1; + int columncounter = 1; + + for (int width : widths) { + relation = relation.setColumn(String.format("%s_%d", col, columncounter), + expressionFactory.get().compile(String + .format("replace(substr(%s, %d, %d), '%s', \"\")" + , col, currentpos, width, padding))) + .filter(expressionFactory.get().compile(String.format("%d <= (length(%s) - %d + 1)", + width, col, currentpos))); + + currentpos += width; + columncounter++; + } + + return relation; + } + } From b5ce52967092752faa6e5a4c34a00d395868b772 Mon Sep 17 00:00:00 2001 From: shrverma Date: Wed, 26 Jul 2023 11:25:34 +0000 Subject: [PATCH 12/14] Refactor code --- .../cdap/directives/transformation/SplitEmail.java | 12 ++++++------ .../io/cdap/directives/writer/WriteAsJsonObject.java | 6 ++---- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java index d7976cc42..a0438dabf 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/SplitEmail.java @@ -146,13 +146,13 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return new InvalidRelation("Cannot find an Expression Factory"); } - Relation accountRelation = relation.setColumn(generatedAccountCol, getExpression(expressionFactory)); + String accountExpression = String + .format("substring(%s, 1, char_length(%s) - locate('@', reverse(%s)))", column, column, column); + String domainExpression = String.format("substring_index(%s, '@', -1)", column); + Relation accountRelation = relation + .setColumn(generatedAccountCol, expressionFactory.get().compile(accountExpression)); return accountRelation.setColumn(generatedDomainCol, - expressionFactory.get().compile(String.format("substring_index(%s, '@', -1)", column))); + expressionFactory.get().compile(domainExpression)); } - Expression getExpression(Optional> expfactory) { - return expfactory.get().compile(String - .format("substring(%s, 1, char_length(%s) - locate('@', reverse(%s)))", column, column, column)); - } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java index 3d69fd55e..e0f3d0fd8 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java @@ -130,10 +130,8 @@ public Relation transform(RelationalTranformContext relationalTranformContext, return new InvalidRelation("Cannot find an Expression Factory"); } return relation.setColumn( - column, expressionFactory.get().compile(String.format("struct(%s)", getJSONColumns(columns)))); + column, expressionFactory.get() + .compile(String.format("struct(%s)", String.join(",", columns)))); } - String getJSONColumns(List columnList) { - return String.join(",", columnList); - } } From ed0e9a439d8ffd1f5ac3407c273ecd34df513c54 Mon Sep 17 00:00:00 2001 From: shrverma Date: Wed, 26 Jul 2023 13:08:53 +0000 Subject: [PATCH 13/14] Fix class not found error --- .../main/java/io/cdap/directives/transformation/UrlDecode.java | 2 +- .../main/java/io/cdap/directives/transformation/UrlEncode.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java index d7ef6003e..98ca135cd 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlDecode.java @@ -117,7 +117,7 @@ public Relation transform(RelationalTranformContext relationalTranformContext, } return relation.setColumn( column, expressionFactory.get().compile( - String.format("reflect('java.net.url.Decoder', 'decode', %s, 'utf-8')", column))); + String.format("reflect('java.net.URLDecoder', 'decode', %s, 'utf-8')", column))); } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java index dcdd27e49..b693672f4 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java +++ b/wrangler-core/src/main/java/io/cdap/directives/transformation/UrlEncode.java @@ -117,6 +117,6 @@ public Relation transform(RelationalTranformContext relationalTranformContext, } return relation.setColumn( column, expressionFactory.get().compile( - String.format("reflect('java.net.url.Encoder', 'encode', %s, 'utf-8')", column))); + String.format("reflect('java.net.URLEncoder', 'encode', %s, 'utf-8')", column))); } } From bec96796463cccccef69861affb74d9b65c5fbd2 Mon Sep 17 00:00:00 2001 From: shrverma Date: Tue, 1 Aug 2023 05:17:39 +0000 Subject: [PATCH 14/14] Remove extra function --- .../java/io/cdap/directives/column/CreateRecord.java | 6 ++---- .../io/cdap/directives/parser/FixedLengthParser.java | 11 ++++++----- .../io/cdap/directives/writer/WriteAsJsonObject.java | 6 +++--- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java index 5d0e6dd40..d49d88625 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java +++ b/wrangler-core/src/main/java/io/cdap/directives/column/CreateRecord.java @@ -115,11 +115,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } + String getColumnString = String.join(",", columns); return relation.setColumn(targetColumn, expressionFactory.get().compile(String - .format("struct(%s)", getColumnString(columns)))); + .format("struct(%s)", getColumnString))); } - public String getColumnString(String[] columns) { - return String.join(",", columns); - } } diff --git a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java index f9cdf569d..65a8255c8 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java +++ b/wrangler-core/src/main/java/io/cdap/directives/parser/FixedLengthParser.java @@ -167,12 +167,13 @@ public Relation transform(RelationalTranformContext relationalTranformContext, int columncounter = 1; for (int width : widths) { + String fixedLengthParseExpression = String.format("replace(substr(%s, %d, %d), '%s', \"\")", + col, currentpos, width, padding); + String filterExcessLengthExpression = String.format("%d <= (length(%s) - %d + 1)", width, col, currentpos); + relation = relation.setColumn(String.format("%s_%d", col, columncounter), - expressionFactory.get().compile(String - .format("replace(substr(%s, %d, %d), '%s', \"\")" - , col, currentpos, width, padding))) - .filter(expressionFactory.get().compile(String.format("%d <= (length(%s) - %d + 1)", - width, col, currentpos))); + expressionFactory.get().compile(fixedLengthParseExpression)) + .filter(expressionFactory.get().compile(filterExcessLengthExpression)); currentpos += width; columncounter++; diff --git a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java index e0f3d0fd8..5a7ef7139 100644 --- a/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java +++ b/wrangler-core/src/main/java/io/cdap/directives/writer/WriteAsJsonObject.java @@ -129,9 +129,9 @@ public Relation transform(RelationalTranformContext relationalTranformContext, if (!expressionFactory.isPresent()) { return new InvalidRelation("Cannot find an Expression Factory"); } - return relation.setColumn( - column, expressionFactory.get() - .compile(String.format("struct(%s)", String.join(",", columns)))); + String getColumnString = String.join(",", columns); + return relation.setColumn(column, expressionFactory.get() + .compile(String.format("struct(%s)", getColumnString))); } }