Skip to content

Commit

Permalink
Merge pull request #651 from data-integrations/sql-temp
Browse files Browse the repository at this point in the history
Transformation and Row Directives
  • Loading branch information
shrverma authored Aug 1, 2023
2 parents 36326f4 + bec9679 commit 39e360f
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -32,6 +36,7 @@
import io.cdap.wrangler.api.parser.ColumnNameList;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -101,4 +106,18 @@ public Mutation lineage() {
.relation(Many.columns(columns), targetColumn)
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}
String getColumnString = String.join(",", columns);
return relation.setColumn(targetColumn, expressionFactory.get().compile(String
.format("struct(%s)", getColumnString)));
}

}
21 changes: 21 additions & 0 deletions wrangler-core/src/main/java/io/cdap/directives/column/Swap.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -32,8 +36,10 @@
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.List;
import java.util.Optional;

/**
* A directive for swapping the column names.
Expand Down Expand Up @@ -93,4 +99,19 @@ public Mutation lineage() {
.relation(Many.of(left, right), Many.of(right, left))
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {

Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}

Relation tempRel = relation.setColumn("tempColumn", expressionFactory.get().compile(right));
tempRel = tempRel.setColumn(right, expressionFactory.get().compile(left));
return tempRel.setColumn(left, expressionFactory.get().compile("tempColumn"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -37,6 +41,7 @@
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -148,4 +153,33 @@ public Mutation lineage() {
.all(Many.of(col), Many.of(col))
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}

int currentpos = 1;
int columncounter = 1;

for (int width : widths) {
String fixedLengthParseExpression = String.format("replace(substr(%s, %d, %d), '%s', \"\")",
col, currentpos, width, padding);
String filterExcessLengthExpression = String.format("%d <= (length(%s) - %d + 1)", width, col, currentpos);

relation = relation.setColumn(String.format("%s_%d", col, columncounter),
expressionFactory.get().compile(fixedLengthParseExpression))
.filter(expressionFactory.get().compile(filterExcessLengthExpression));

currentpos += width;
columncounter++;
}

return relation;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -33,10 +37,12 @@
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

/**
Expand Down Expand Up @@ -147,5 +153,16 @@ private boolean matchPattern(String value) {
}
return matches;
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}
return relation.filter(expressionFactory.get().compile("rlike(" + column + ", '" + pattern + "')"));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -35,9 +40,13 @@
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* A directive for parsing a string into record using the record delimiter.
Expand Down Expand Up @@ -112,4 +121,18 @@ public Mutation lineage() {
.relation(column, column)
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
java.util.Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}
Map<String, Expression> columnExpMap = new LinkedHashMap<>();
columnExpMap.put(column, expressionFactory.get().compile(
String.format("explode(split(%s, \"%s\", %d))", column, delimiter, limit)));
return relation.select(columnExpMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -33,9 +37,11 @@
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
* A directive for splitting the string into multiple {@link Row}s.
Expand Down Expand Up @@ -109,5 +115,19 @@ public Mutation lineage() {
.relation(Many.columns(column), Many.columns(column))
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {

Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}

return relation.setColumn(column, expressionFactory.get()
.compile(String.format("explode(split(%s, '%s'))", column, regex)));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -32,9 +36,11 @@
import io.cdap.wrangler.api.parser.Text;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;
import org.json.JSONObject;

import java.util.List;
import java.util.Optional;

/**
* A directive to fill null or empty column values with a fixed value.
Expand Down Expand Up @@ -104,4 +110,17 @@ public Mutation lineage() {
.relation(column, column)
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}
return relation.setColumn(column, expressionFactory.get().compile(String
.format("nvl2(%s, if(length(%s) == 0, \"%s\", %s), \"%s\")",
column, column, value, column, value)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -31,8 +35,10 @@
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;

Expand Down Expand Up @@ -87,4 +93,18 @@ public Mutation lineage() {
.relation(column, column)
.build();
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {

Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);
if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}

return relation.setColumn(column, expressionFactory.get()
.compile(String.format("uuid()")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.cdap.api.annotation.Plugin;
import io.cdap.cdap.etl.api.relational.Expression;
import io.cdap.cdap.etl.api.relational.ExpressionFactory;
import io.cdap.cdap.etl.api.relational.InvalidRelation;
import io.cdap.cdap.etl.api.relational.Relation;
import io.cdap.cdap.etl.api.relational.RelationalTranformContext;
import io.cdap.wrangler.api.Arguments;
import io.cdap.wrangler.api.Directive;
import io.cdap.wrangler.api.DirectiveExecutionException;
Expand All @@ -33,8 +38,10 @@
import io.cdap.wrangler.api.parser.ColumnName;
import io.cdap.wrangler.api.parser.TokenType;
import io.cdap.wrangler.api.parser.UsageDefinition;
import io.cdap.wrangler.utils.SqlExpressionGenerator;

import java.util.List;
import java.util.Optional;

/**
* A directive to split email address into account and domain.
Expand Down Expand Up @@ -128,4 +135,24 @@ private Pair<String, String> extractDomainAndAccount(String emailId) {
return new Pair<>(emailId.substring(0, lastidx), emailId.substring(lastidx + 1));
}
}

@Override
public Relation transform(RelationalTranformContext relationalTranformContext,
Relation relation) {
Optional<ExpressionFactory<String>> expressionFactory = SqlExpressionGenerator
.getExpressionFactory(relationalTranformContext);

if (!expressionFactory.isPresent()) {
return new InvalidRelation("Cannot find an Expression Factory");
}

String accountExpression = String
.format("substring(%s, 1, char_length(%s) - locate('@', reverse(%s)))", column, column, column);
String domainExpression = String.format("substring_index(%s, '@', -1)", column);
Relation accountRelation = relation
.setColumn(generatedAccountCol, expressionFactory.get().compile(accountExpression));
return accountRelation.setColumn(generatedDomainCol,
expressionFactory.get().compile(domainExpression));
}

}
Loading

0 comments on commit 39e360f

Please sign in to comment.