S3 source connector: Reached the end of stream with xx bytes left to read #1255

Open

JKCai opened this issue Jun 3, 2024 · 2 comments

JKCai commented Jun 3, 2024

What version of the Stream Reactor are you reporting this issue for?

Built from the master branch by myself on May 26, 2024.

Are you running the correct version of Kafka/Confluent for the Stream Reactor release?

Running a Kafka cluster (MSK) on AWS, version 2.8.2.tiered.

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Have you read the docs?

Yes

What is the expected behaviour?

Restore the backup files into a Kafka topic. It should restore all the messages without any errors.

What was observed?

java.io.EOFException: Reached the end of stream with 8388608 bytes left to read
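
(For reference, 8388608 bytes is exactly 8 MiB: 8 × 1024 × 1024 = 8,388,608.)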

What is your Connect cluster configuration (connect-avro-distributed.properties)?

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
exactly.once.source.support=enabled

What is your connector properties configuration (my-connector.properties)?

connector.class=io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
connect.s3.kcql=INSERT INTO test_topic SELECT * FROM test-bucket:aaaaa/tiered_3_partition_10GiB_format_test STOREAS `Parquet` LIMIT 1000 PROPERTIES('store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=3
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Please provide full log files (redact any sensitive information)

2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] java.io.EOFException: Reached the end of stream with 8388608 bytes left to read
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:127)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream.readFully(ParquetSeekableInputStream.scala:79)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:1850)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:990)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:940)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1082)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:130)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetReaderIteratorAdaptor.next(ParquetReaderIteratorAdaptor.scala:33)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.next(ParquetStreamReader.scala:45)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.next(ParquetStreamReader.scala:31)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:53)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.formats.reader.DelegateIteratorCloudStreamReader.next(CloudStreamReader.scala:34)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.accumulate(ResultReader.scala:56)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader.retrieveResults(ResultReader.scala:45)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$12(ReaderManager.scala:78)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at set @ io.lenses.streamreactor.connect.cloud.common.source.reader.PartitionDiscovery$.$anonfun$run$9(PartitionDiscovery.scala:56)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$10(ReaderManager.scala:56)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
2024-06-03T20:23:47.000+10:00	[Worker-0cca08d722bc3578f] at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
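
For context on the message: Parquet's DelegatingSeekableInputStream.readFully loops until the requested buffer is filled and throws this EOFException as soon as the underlying stream reports end-of-file while bytes are still outstanding. A simplified sketch of that loop (not the verbatim parquet-java source):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class ReadFullySketch {
  // Simplified sketch of the readFully loop in
  // org.apache.parquet.io.DelegatingSeekableInputStream (not verbatim).
  static void readFully(InputStream in, byte[] bytes, int start, int len) throws IOException {
    int offset = start;
    int remaining = len;
    while (remaining > 0) {
      int bytesRead = in.read(bytes, offset, remaining);
      if (bytesRead < 0) {
        // This is the exception in the log above; here "remaining" was 8388608 (8 MiB).
        throw new EOFException(
            "Reached the end of stream with " + remaining + " bytes left to read");
      }
      remaining -= bytesRead;
      offset += bytesRead;
    }
  }
}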
JKCai (Author) commented Jun 3, 2024

The Parquet file generated by the S3 Sink connector seems valid to us. See below for a few validations:

# Full scan of file
% parquet scan 000000049999_1717383243638_1717383465319.parquet
Scanned 50000 records from 1 file(s)
Time: 0.27 s

# Metadata

% parquet meta 000000049999_1717383243638_1717383465319.parquet
 
File path:  000000049999_1717383243638_1717383465319.parquet
Created by: parquet-mr version 1.13.1 (build db4183109d5b734ec5930d870cdae161e408ddba)
Properties:
  parquet.avro.schema: {"type":"record","name":"ConnectDefault","namespace":"lshaded.confluent.connect.avro","fields":[{"name":"key","type":["null","bytes"],"default":null},{"name":"value","type":["null","bytes"],"default":null},{"name":"metadata","type":["null",{"type":"record","name":"ConnectDefault2","fields":[{"name":"timestamp","type":"long"},{"name":"topic","type":"string"},{"name":"partition","type":"int"},{"name":"offset","type":"long"}]}],"default":null}]}
    writer.model.name: avro
Schema:
message lshaded.confluent.connect.avro.ConnectDefault {
  optional binary key;
  optional binary value;
  optional group metadata {
    required int64 timestamp;
    required binary topic (STRING);
    required int32 partition;
    required int64 offset;
  }
}
 
 
Row group 0:  count: 25886  5.052 kB records  start: 4  total(compressed): 127.699 MB total(uncompressed):127.699 MB
--------------------------------------------------------------------------------
                    type      encodings count     avg size   nulls   min / max
key                 BINARY    _ _ R     25886     0.64 B     0       "0x6B65792D31" / "0x6B65792D3936"
value               BINARY    _   _     25886     5.035 kB
metadata.timestamp  INT64     _   _     25886     8.00 B     0       "1717383243638" / "1717383358522"
metadata.topic      BINARY    _ _ R     25886     0.01 B     0       "tiered_3_partition_10GiB_..." / "tiered_3_partition_10GiB_..."
metadata.partition  INT32     _ _ R     25886     0.00 B     0       "0" / "0"
metadata.offset     INT64     _   _     25886     8.00 B     0       "0" / "25885"
 
Row group 1:  count: 24114  5.052 kB records  start: 133901686  total(compressed): 118.959 MB total(uncompressed):118.959 MB
--------------------------------------------------------------------------------
                    type      encodings count     avg size   nulls   min / max
key                 BINARY    _ _ R     24114     0.64 B     0       "0x6B65792D31" / "0x6B65792D3936"
value               BINARY    _   _     24114     5.035 kB
metadata.timestamp  INT64     _   _     24114     8.00 B     0       "1717383358524" / "1717383465319"
metadata.topic      BINARY    _ _ R     24114     0.01 B     0       "tiered_3_partition_10GiB_..." / "tiered_3_partition_10GiB_..."
metadata.partition  INT32     _ _ R     24114     0.00 B     0       "0" / "0"
metadata.offset     INT64     _   _     24114     8.00 B     0       "25886" / "49999"

# Show schema
% parquet schema 000000049999_1717383243638_1717383465319.parquet
{
"type" : "record",
"name" : "ConnectDefault",
"namespace" : "lshaded.confluent.connect.avro",
"fields" : [ {
  "name" : "key",
  "type" : [ "null", "bytes" ],
  "default" : null
}, {
  "name" : "value",
  "type" : [ "null", "bytes" ],
  "default" : null
}, {
  "name" : "metadata",
  "type" : [ "null", {
    "type" : "record",
    "name" : "ConnectDefault2",
    "fields" : [ {
      "name" : "timestamp",
      "type" : "long"
    }, {
      "name" : "topic",
      "type" : "string"
    }, {
      "name" : "partition",
      "type" : "int"
    }, {
      "name" : "offset",
      "type" : "long"
    } ]
  } ],
  "default" : null
} ]
}

# Cat/print first message in the Parquet file.
% parquet cat 000000049999_1717383243638_1717383465319.parquet -n 1
{"key": "key-53", "value": "\u0003\u0000\u009D['åÿ\u0089Iÿ\u008F\u009Fì\u009A<¤T#\u0016\u0080(ufefxaamxdewurnjl[...]kedqceuespwsba\u0000\u0010\u0096Dð\u0001\u0000", "metadata": {"timestamp": 1717383243638, "topic": "tiered_3_partition_10GiB_format_test", "partition": 0, "offset": 0}}

JKCai (Author) commented Jun 3, 2024

The sink connector config that I used:

connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:aaaaa SELECT * FROM tiered_3_partition_10GiB_format_test STOREAS `JSON` PROPERTIES('flush.count'=50000,'flush.interval'=600,'store.envelope'=true)
aws.region=ap-southeast-2
tasks.max=3
topics=tiered_3_partition_10GiB_format_test
connect.s3.aws.region=ap-southeast-2
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
errors.log.enable=true
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
