
Date & Timestamp parsing fails #358

Open
johanhenriksson opened this issue Oct 17, 2022 · 0 comments

I'm trying to read from a Spark stream using a schema with two test fields, one of type Date and the other of type Timestamp.

The row that causes problems looks as follows:

xadd events * time "2022-10-14T13:33:15.012345" date "2022-10-14" event click
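(For context: Redis stream fields are always stored as raw strings, so the connector has to parse both values itself. Reading the entry back with redis-py — a hypothetical snippet assuming a local instance — shows exactly what it receives:)

    import redis

    r = redis.Redis()
    # fetch the first entry of the stream; fields come back as raw bytes
    for entry_id, fields in r.xrange('events', count=1):
        print(entry_id, fields)
    # e.g. b'1665754395012-0' {b'time': b'2022-10-14T13:33:15.012345', b'date': b'2022-10-14', b'event': b'click'}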

This exact data parses totally fine if I read it from a stream of JSON files, without any manual conversion. However, when I read the very same row from a Redis stream, I first get a format error:

Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
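(That message looks like the one java.sql.Timestamp.valueOf throws, which suggests the connector parses the raw string with valueOf and therefore rejects the ISO-8601 'T' separator. For reference, the same entry rewritten with the space-separated format:)

xadd events * time "2022-10-14 13:33:15.012345" date "2022-10-14" event click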

Which is strange, since the JSON stream parses just fine. However, changing the timestamp to that space-separated format causes an even weirder error:

class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
     at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
     at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     ...

Removing the timestamp column entirely and trying to parse a simple date 2022-10-14 causes a similar error:

Caused by: java.lang.ClassCastException: class java.sql.Date cannot be cast to class java.lang.Integer (java.sql.Date is in module java.sql of loader 'platform'; java.lang.Integer is in module java.base of loader 'bootstrap')
     at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:103)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt(rows.scala:41)
     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getInt$(rows.scala:41)
     at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:195)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     ...
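Both cast errors point the same way: Spark stores TimestampType internally as a long (microseconds since the epoch) and DateType as an int (days since the epoch), which is what the getLong/getInt calls in the traces expect, so it looks like the source is handing Spark java.sql.Timestamp/java.sql.Date objects instead of the internal values. For illustration, the internal values Spark should be seeing for this row (plain standard-library Python; assumes UTC, since the actual microsecond value depends on the session time zone):

    from datetime import date, datetime, timedelta, timezone

    ts = datetime(2022, 10, 14, 13, 33, 15, 12345, tzinfo=timezone.utc)
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    micros = (ts - epoch) // timedelta(microseconds=1)     # the long that getLong expects
    days = (date(2022, 10, 14) - date(1970, 1, 1)).days    # the int that getInt expects
    print(micros, days)  # 1665754395012345 19279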

My code doesn't do anything special; it basically looks like this:

    from pyspark.sql import DataFrame, SparkSession
    from pyspark.sql.types import StructType, StructField, TimestampType, DateType, StringType

    spark = SparkSession.builder.getOrCreate()

    # redis_stream and table_path are defined elsewhere in my job
    schema = StructType([
        StructField('time', TimestampType()),
        StructField('date', DateType()),
        StructField('event', StringType()),
    ])

    df = spark.readStream \
        .format('redis') \
        .option('stream.keys', redis_stream) \
        .schema(schema) \
        .load()

    # append each micro-batch to the Delta table
    def batch_writer(bdf: DataFrame, batch_id: int) -> None:
        bdf.write.format('delta') \
            .mode('append') \
            .save(table_path)

    query = df.writeStream.format('delta') \
        .foreachBatch(batch_writer) \
        .outputMode('update') \
        .option('checkpointLocation', f'{table_path}/_checkpoints') \
        .start()

    query.awaitTermination()
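In the meantime, a workaround that seems plausible (an untested sketch; to_timestamp and to_date are standard pyspark.sql.functions, though the fraction pattern may need adjusting) would be to declare both fields as strings and cast them after load:

    from pyspark.sql import functions as F
    from pyspark.sql.types import StructType, StructField, StringType

    # read everything as strings so the connector never touches Date/Timestamp
    str_schema = StructType([
        StructField('time', StringType()),
        StructField('date', StringType()),
        StructField('event', StringType()),
    ])

    raw = spark.readStream \
        .format('redis') \
        .option('stream.keys', redis_stream) \
        .schema(str_schema) \
        .load()

    # cast on the Spark side instead
    df = raw.select(
        F.to_timestamp('time', "yyyy-MM-dd'T'HH:mm:ss.SSSSSS").alias('time'),
        F.to_date('date', 'yyyy-MM-dd').alias('date'),
        'event',
    )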

Any ideas?
