diff --git a/README.md b/README.md index 16099a9..c313bfa 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ Build the bundle ```shell cd nifi-standardize-date-bundle -mvn clean install +mvn initialize +mvn clean package ``` Copy Nar file to $NIFI_HOME/lib @@ -54,5 +55,5 @@ The originating timezone of the date fields in the FlowFile. Short or standard I - Use drop-down (with custom option) for timezone - Allow choice of Avro compression (Snappy, bzip2, etc.) -- Infer Avro schema if not passed in +- ~~Infer Avro schema if not passed in~~ - Better unit tests for Avro diff --git a/nifi-standardize-date-nar/pom.xml b/nifi-standardize-date-nar/pom.xml index 47c8ffe..a425471 100644 --- a/nifi-standardize-date-nar/pom.xml +++ b/nifi-standardize-date-nar/pom.xml @@ -19,11 +19,11 @@ com.nineteen04labs nifi-standardize-date-bundle - 18.07.4 + 18.07.5 nifi-standardize-date-nar - 18.07.4 + 18.07.5 nar true @@ -34,7 +34,7 @@ com.nineteen04labs nifi-standardize-date-processors - 18.07.4 + 18.07.5 diff --git a/nifi-standardize-date-processors/pom.xml b/nifi-standardize-date-processors/pom.xml index b99136a..d0a75f8 100644 --- a/nifi-standardize-date-processors/pom.xml +++ b/nifi-standardize-date-processors/pom.xml @@ -20,7 +20,7 @@ com.nineteen04labs nifi-standardize-date-bundle - 18.07.4 + 18.07.5 nifi-standardize-date-processors diff --git a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java index 54b5050..22c7c5a 100644 --- a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java +++ b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java @@ -119,7 +119,12 @@ public void process(InputStream in, OutputStream out) throws IOException { List schemaFields; Set newSchemaFields = new HashSet<>(); if (flowFormat.equals("AVRO")) { - schema = new Schema.Parser().parse(schemaString); + try { + schema = new Schema.Parser().parse(schemaString); + } catch (NullPointerException e) { + schema = FormatStream.getEmbeddedSchema(in); + in.reset(); + } schemaFields = schema.getFields(); for(Schema.Field f : schemaFields) { Schema.Field oldField = new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()); diff --git a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java index de424ac..e2f705a 100644 --- a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java +++ b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java @@ -43,7 +43,7 @@ public class FormatStream { private static final Logger logger = LoggerFactory.getLogger(FormatStream.class); public static InputStream avroToJson(InputStream in, Schema schema) throws IOException { - GenericDatumReader reader = new GenericDatumReader(); + DatumReader reader = new GenericDatumReader(); DataFileStream streamReader = new DataFileStream(in, reader); DatumWriter writer = new ExtendedGenericDatumWriter<>(schema); @@ -87,6 +87,14 @@ public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream, return baos; } + public static Schema getEmbeddedSchema(InputStream in) throws IOException { + DatumReader reader = new GenericDatumReader(); + DataFileStream streamReader = new DataFileStream(in, reader); + streamReader.close(); + + return streamReader.getSchema(); +} + private static InputStream convertStream(ByteArrayOutputStream baos) throws IOException { PipedInputStream pin = new PipedInputStream(); PipedOutputStream pout = new PipedOutputStream(pin); diff --git a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java index 14029ef..2d65548 100644 --- a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java +++ b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java @@ -43,7 +43,6 @@ public void setSchema() throws IOException { public void testNoProcessing() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO"); runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema); - runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); @@ -62,7 +61,19 @@ public void testStandardization() throws IOException { runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema); runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\",\"bad_date_union\":\"MM/dd/yy\"}"); runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago"); - runner.setValidateExpressionUsage(false); + + runner.enqueue(unprocessedFile); + + runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(StandardizeDateRelationships.REL_SUCCESS, 1); + } + + @Test + public void testStandardizationNoSchema() throws IOException { + runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO"); + runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\",\"bad_date_union\":\"MM/dd/yy\"}"); + runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago"); runner.enqueue(unprocessedFile); diff --git a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java index e04f0a0..01d8b8b 100644 --- a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java +++ b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java @@ -33,7 +33,6 @@ public class StandardizeDateJsonTest { @Test public void testNoProcessing() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON"); - runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); @@ -53,7 +52,6 @@ public void testStandardization() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON"); runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}"); runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago"); - runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); @@ -73,7 +71,6 @@ public void testShortZoneId() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON"); runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}"); runner.setProperty(StandardizeDateProperties.TIMEZONE, "CST"); - runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); diff --git a/pom.xml b/pom.xml index b9d2523..9d65fd5 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ com.nineteen04labs nifi-standardize-date-bundle - 18.07.4 + 18.07.5 pom