[Bug]: IcebergIO cannot write Timestamp columns #32680

Closed
3 of 17 tasks
DanielMorales9 opened this issue Oct 7, 2024 · 3 comments · Fixed by #32688
Labels
bug dataflow IcebergIO IcebergIO: can only be used through ManagedIO io P2

DanielMorales9 commented Oct 7, 2024

What happened?

When trying to write timestamp data to an Iceberg table, I get the following exception:

Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.OffsetDateTime (java.lang.Long and java.time.OffsetDateTime are in module java.base of loader 'bootstrap')
        at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
        at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:356)
        at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
        at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
        at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
        at org.apache.beam.sdk.io.iceberg.RecordWriter.write(RecordWriter.java:105)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:144)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:234)
        at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:224)
        Suppressed: java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open

Source code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HardcodedDataToIcebergPipeline {

    public static void main(String[] args) {
        // Define Beam pipeline options
        PipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // Define Beam schema with a timestamp field
        Schema beamSchema = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_timestamp")
                .build();
        
        Map<String, Object> catalogConfig = new HashMap<String, Object>() {{
            put("warehouse", "<warehouse-dir>");
            put("uri", "<thrift-uri>");
            put("type", "iceberg");
            put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
            put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
        }};

        Map<String, Object> icebergConfig = new HashMap<String, Object>() {{
            put("catalog_name", "<catalog-name>");
            put("catalog_properties", catalogConfig);
        }};
        String table = String.format("%s.%s", "myschema", "test");
        icebergConfig.put("table", table);

        // Create hardcoded data with timestamp
        List<Row> hardcodedData = Arrays.asList(
                Row.withSchema(beamSchema)
                        .addValues("record-1", Instant.now()) // Use Joda-Time Instant
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-2", Instant.now().minus(3600 * 1000)) // 1 hour ago
                        .build(),
                Row.withSchema(beamSchema)
                        .addValues("record-3", Instant.now().minus(24 * 3600 * 1000)) // 1 day ago
                        .build()
        );

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(hardcodedData).withRowSchema(beamSchema)) // Hardcoded input rows
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergConfig));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
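The trace shows Iceberg's BaseParquetWriter$TimestamptzWriter casting each incoming value to java.time.OffsetDateTime and receiving a java.lang.Long instead. As a rough illustration of the kind of conversion that is missing (not the change made in #32688), the sketch below turns epoch milliseconds into a UTC OffsetDateTime. The class and method names are hypothetical, and it assumes the timestamp is available as epoch milliseconds, which is the unit Joda-Time's Instant.getMillis() returns.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical helper (not part of Beam or Iceberg): converts epoch milliseconds,
// e.g. from a Joda-Time Instant's getMillis(), into the java.time.OffsetDateTime
// type that Iceberg's generic Parquet timestamptz writer casts its input to.
final class TimestamptzConversion {

    private TimestamptzConversion() {}

    static OffsetDateTime fromEpochMillis(long epochMillis) {
        // Pin the offset to UTC; this fixes how the instant is rendered,
        // not which point in time it denotes.
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long millis = 1_728_259_200_000L; // 2024-10-07T00:00:00Z
        System.out.println(fromEpochMillis(millis));
    }
}
```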

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
ahmedabu98 (Contributor) commented

Thanks for the feedback @DanielMorales9.
Opened a PR to add support for this: #32688

P.S. The Iceberg API works with the java.time library, not Joda-Time. After the PR is merged, you should be able to write timestamps if you change your schema to:

import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
...
        // Define Beam schema with a timestamp field
        Schema beamSchema = Schema.builder()
                .addStringField("id")
                .addLogicalTypeField("event_timestamp", SqlTypes.DATETIME)
                .build();

and your Row values to:

import java.time.LocalDateTime;
...
        List<Row> hardcodedData = Arrays.asList(
                Row.withSchema(beamSchema)
                        .addValues("record-1", LocalDateTime.now()) // Use Java-Time LocalDateTime
                        .build(),
                        ....
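If existing data is held as Joda-Time instants, it could be moved to the suggested SqlTypes.DATETIME schema by converting each value to a LocalDateTime first. A minimal sketch, assuming the values are available as epoch milliseconds and should be interpreted in UTC; the class and method names are hypothetical:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper: converts epoch milliseconds (e.g. from a Joda-Time
// Instant's getMillis()) into the zone-less LocalDateTime that a
// SqlTypes.DATETIME field holds. Assumes UTC is the intended zone.
final class DateTimeFieldValues {

    private DateTimeFieldValues() {}

    static LocalDateTime utcLocalDateTime(long epochMillis) {
        // LocalDateTime carries no offset, so choose UTC explicitly instead of
        // relying on the JVM default zone, which LocalDateTime.now() uses.
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        LocalDateTime current = utcLocalDateTime(now);
        LocalDateTime oneHourAgo = utcLocalDateTime(now - 3600 * 1000L);
        System.out.println(current + " / " + oneHourAgo);
    }
}
```

Choosing UTC explicitly matters because LocalDateTime.now() reads the JVM's default time zone, so rows built on workers in different zones would otherwise encode different wall-clock times for the same instant.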

ahmedabu98 (Contributor) commented Oct 8, 2024

@DanielMorales9 Scratch that -- we can add support for both Joda and Java libraries when writing to Iceberg (so your existing code should still work).

However, when reading from Iceberg, I'm afraid we will have to stick with just the Java time library (meaning the output Row field type will be SqlTypes.DATETIME).
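Since rows read back from Iceberg carry java.time.LocalDateTime values, downstream code that still expects Joda-Time would need to convert them. A minimal sketch, assuming the stored values represent UTC wall-clock times; the class and method names are hypothetical:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper: recovers epoch milliseconds from a LocalDateTime read
// from a SqlTypes.DATETIME field, ASSUMING the value was written in UTC.
final class DateTimeReadBack {

    private DateTimeReadBack() {}

    static long toEpochMillisUtc(LocalDateTime value) {
        // toEpochMilli() truncates any sub-millisecond part of the value.
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime read = LocalDateTime.of(2024, 10, 7, 0, 0);
        System.out.println(toEpochMillisUtc(read));
    }
}
```

The resulting long is what Joda-Time's new Instant(long) constructor accepts; if the data was written in a zone other than UTC, the ZoneOffset must match it or the recovered instant will be shifted.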

DanielMorales9 (Author) commented

Hi @ahmedabu98,
thank you for your quick reply 🙏
I left a few comments on the PR.

github-actions bot added this to the 2.61.0 Release milestone on Oct 10, 2024