When trying to write timestamp data into an Iceberg table, I get the following exception:
Caused by: java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.OffsetDateTime (java.lang.Long and java.time.OffsetDateTime are in module java.base of loader 'bootstrap')
at org.apache.iceberg.data.parquet.BaseParquetWriter$TimestamptzWriter.write(BaseParquetWriter.java:281)
at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:356)
at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
at org.apache.beam.sdk.io.iceberg.RecordWriter.write(RecordWriter.java:105)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:144)
at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:234)
at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:224)
Suppressed: java.lang.IllegalArgumentException: Expected all data writers to be closed, but found 1 data writer(s) still open
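For anyone reading the trace: the top frame says the writer tried to cast the field value to java.time.OffsetDateTime but was handed a java.lang.Long. The snippet below is only a sketch of that mismatch using the JDK alone; it is not Beam's or Iceberg's code. The class and method names are made up for illustration, and it assumes the Long holds milliseconds since the epoch in UTC, which the trace does not confirm.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class TimestamptzCastSketch {

    // Hypothetical helper: turns epoch milliseconds (an assumption about what
    // the Long holds) into the OffsetDateTime the failing cast expects.
    public static OffsetDateTime toOffsetDateTime(long epochMillis) {
        return OffsetDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    // Mimics the cast in the top stack frame: returns true when the value
    // cannot be cast to OffsetDateTime.
    public static boolean castFails(Object value) {
        try {
            OffsetDateTime ignored = (OffsetDateTime) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object raw = Long.valueOf(1_700_000_000_000L);

        // A boxed Long fails the cast, as in the reported exception.
        System.out.println("raw Long fails cast: " + castFails(raw));

        // After conversion the value has the expected type.
        Object converted = toOffsetDateTime((Long) raw);
        System.out.println("converted value fails cast: " + castFails(converted));
        System.out.println("converted value: " + converted);
    }
}
```

Whether the conversion belongs in user code or inside the connector is a separate question; the sketch only shows the type the writer appears to want.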
Source code:
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HardcodedDataToIcebergPipeline {
    public static void main(String[] args) {
        // Define Beam pipeline options
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);

        // Define Beam schema with a timestamp field
        Schema beamSchema = Schema.builder()
            .addStringField("id")
            .addDateTimeField("event_timestamp")
            .build();

        // Catalog properties (placeholders for the warehouse and metastore URI)
        Map<String, Object> catalogConfig = new HashMap<String, Object>() {{
            put("warehouse", "<warehouse-dir>");
            put("uri", "<thrift-uri>");
            put("type", "iceberg");
            put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
            put("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
        }};

        Map<String, Object> icebergConfig = new HashMap<String, Object>() {{
            put("catalog_name", "<catalog-name>"); // placeholder catalog name
            put("catalog_properties", catalogConfig);
        }};
        String table = String.format("%s.%s", "myschema", "test");
        icebergConfig.put("table", table);

        // Create hardcoded data with timestamp
        List<Row> hardcodedData = Arrays.asList(
            Row.withSchema(beamSchema)
                .addValues("record-1", Instant.now()) // Use Joda-Time Instant
                .build(),
            Row.withSchema(beamSchema)
                .addValues("record-2", Instant.now().minus(3600 * 1000)) // 1 hour ago
                .build(),
            Row.withSchema(beamSchema)
                .addValues("record-3", Instant.now().minus(24 * 3600 * 1000)) // 1 day ago
                .build()
        );

        // Build the pipeline using the hardcoded data
        pipeline
            .apply("CreateHardcodedData", Create.of(hardcodedData).withRowSchema(beamSchema)) // Hardcoded
            .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(icebergConfig));

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
@DanielMorales9 Scratch that -- we can add support for both Joda and Java libraries when writing to Iceberg (so your existing code should still work).
However, when reading from Iceberg, I'm afraid we will have to stick with just the Java time library (meaning the output Row field type will be SqlType.DATETIME).
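On the read side, a rough sketch of what that could mean for downstream code, assuming the SqlType.DATETIME field surfaces as a java.time.LocalDateTime and that the stored values are meant as UTC (both are assumptions on my part, not something confirmed above). The class and method names are hypothetical and only the JDK is used; the resulting millisecond value is what a Joda Instant constructor would accept if existing code still needs Joda types.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DatetimeReadSketch {

    // Hypothetical helper: interprets a zone-less LocalDateTime as UTC
    // (an assumption) and returns milliseconds since the epoch.
    public static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Inverse direction, under the same UTC assumption.
    public static LocalDateTime fromEpochMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        // Stand-in for a value read back from the table.
        LocalDateTime readValue = LocalDateTime.of(2024, 1, 15, 12, 30, 0);

        long millis = toEpochMillis(readValue);
        System.out.println("epoch millis: " + millis);

        // Round trip back to LocalDateTime to show nothing is lost at
        // millisecond precision.
        System.out.println("round trip equal: " + fromEpochMillis(millis).equals(readValue));
    }
}
```

Note that anything finer than a millisecond is dropped by toEpochMilli, so a round trip through epoch millis is lossy for values with microsecond precision.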