Skip to content

Commit

Permalink
Infer avro schema if not provided
Browse files Browse the repository at this point in the history
  • Loading branch information
cavemandaveman committed Jul 27, 2018
1 parent 4ddce31 commit bc13860
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 14 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Build the bundle

```shell
cd nifi-standardize-date-bundle
mvn clean install
mvn initialize
mvn clean package
```

Copy Nar file to $NIFI_HOME/lib
Expand Down Expand Up @@ -54,5 +55,5 @@ The originating timezone of the date fields in the FlowFile. Short or standard I

- Use drop-down (with custom option) for timezone
- Allow choice of Avro compression (Snappy, bzip2, etc.)
- Infer Avro schema if not passed in
- ~~Infer Avro schema if not passed in~~
- Better unit tests for Avro
6 changes: 3 additions & 3 deletions nifi-standardize-date-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-bundle</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
</parent>

<artifactId>nifi-standardize-date-nar</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
Expand All @@ -34,7 +34,7 @@
<dependency>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-processors</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion nifi-standardize-date-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-bundle</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
</parent>

<artifactId>nifi-standardize-date-processors</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ public void process(InputStream in, OutputStream out) throws IOException {
List<Schema.Field> schemaFields;
Set<Schema.Field> newSchemaFields = new HashSet<>();
if (flowFormat.equals("AVRO")) {
schema = new Schema.Parser().parse(schemaString);
try {
schema = new Schema.Parser().parse(schemaString);
} catch (NullPointerException e) {
schema = FormatStream.getEmbeddedSchema(in);
in.reset();
}
schemaFields = schema.getFields();
for(Schema.Field f : schemaFields) {
Schema.Field oldField = new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class FormatStream {
private static final Logger logger = LoggerFactory.getLogger(FormatStream.class);

public static InputStream avroToJson(InputStream in, Schema schema) throws IOException {
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
DatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileStream<Object> streamReader = new DataFileStream<Object>(in, reader);
DatumWriter<Object> writer = new ExtendedGenericDatumWriter<>(schema);

Expand Down Expand Up @@ -87,6 +87,14 @@ public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream,
return baos;
}

public static Schema getEmbeddedSchema(InputStream in) throws IOException {
DatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileStream<Object> streamReader = new DataFileStream<Object>(in, reader);
streamReader.close();

return streamReader.getSchema();
}

private static InputStream convertStream(ByteArrayOutputStream baos) throws IOException {
PipedInputStream pin = new PipedInputStream();
PipedOutputStream pout = new PipedOutputStream(pin);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public void setSchema() throws IOException {
public void testNoProcessing() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema);
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand All @@ -62,7 +61,19 @@ public void testStandardization() throws IOException {
runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema);
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\",\"bad_date_union\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(StandardizeDateRelationships.REL_SUCCESS, 1);
}

@Test
public void testStandardizationNoSchema() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\",\"bad_date_union\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago");

runner.enqueue(unprocessedFile);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class StandardizeDateJsonTest {
@Test
public void testNoProcessing() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand All @@ -53,7 +52,6 @@ public void testStandardization() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON");
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand All @@ -73,7 +71,6 @@ public void testShortZoneId() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON");
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "CST");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-bundle</artifactId>
<version>18.07.4</version>
<version>18.07.5</version>
<packaging>pom</packaging>

<modules>
Expand Down

0 comments on commit bc13860

Please sign in to comment.