Skip to content

Commit

Permalink
Fix copy directive schema generation bug
Browse files Browse the repository at this point in the history
  • Loading branch information
vanathi-g committed Nov 9, 2023
1 parent 9d0175e commit 8422169
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
11 changes: 8 additions & 3 deletions wrangler-core/src/main/java/io/cdap/directives/column/Copy.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,23 @@ public Schema getOutputSchema(SchemaResolutionContext context) {
Schema inputSchema = context.getInputSchema();
List<Schema.Field> outputFields = new ArrayList<>();
Schema sourceSchema = inputSchema.getField(source.value()).getSchema();
boolean added = false;

for (Schema.Field field : inputSchema.getFields()) {
if (field.getName().equals(destination.value())) {
outputFields.add(Schema.Field.of(destination.value(), sourceSchema));
if (force) {
outputFields.add(Schema.Field.of(destination.value(), sourceSchema));
added = true;
} else {
throw new RuntimeException("Failed to copy to existing column when 'force' is false");
}
} else {
outputFields.add(field);
}
}
if (!force) {
if (!added) {
outputFields.add(Schema.Field.of(destination.value(), sourceSchema));
}

return Schema.recordOf("outputSchema", outputFields);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public void testForceCopy() throws Exception {
public void testGetOutputSchemaForForceCopiedColumn() throws Exception {
String[] directives = new String[] {
"copy :col_B :col_A true",
"copy :col_B :col_C true",
};
List<Row> rows = Collections.singletonList(
new Row("col_A", 1).add("col_B", new BigDecimal("143235.016"))
Expand All @@ -115,7 +116,8 @@ public void testGetOutputSchemaForForceCopiedColumn() throws Exception {
Schema expectedSchema = Schema.recordOf(
"expectedSchema",
Schema.Field.of("col_A", Schema.decimalOf(10, 3)),
Schema.Field.of("col_B", Schema.decimalOf(10, 3))
Schema.Field.of("col_B", Schema.decimalOf(10, 3)),
Schema.Field.of("col_C", Schema.decimalOf(10, 3))
);

Schema outputSchema = TestingRig.executeAndGetSchema(directives, rows, inputSchema);
Expand Down

0 comments on commit 8422169

Please sign in to comment.