What version of the Stream Reactor are you reporting this issue for?

Release 8.1.4
Are you running the correct version of Kafka/Confluent for the Stream Reactor release?

I am running on Aiven Apache Kafka 3.8.0. My Kafka Connect is deployed using Strimzi on Kubernetes.
Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Yes, I am using GCS (Google Cloud Storage) as the data source and Kafka as the sink.
Have you read the docs?

Yes, I have read the documentation.
What is the expected behaviour?

I expect the connector to transfer Parquet files from GCS to a Kafka topic.
What was observed?

I encountered the following error:

java.io.EOFException: Reached the end of stream with 8861 bytes left to read
What is your Connect cluster configuration (connect-avro-distributed.properties)?

group.id: test-cluster
auto.create.topics.enable: true
offset.storage.topic: test-cluster-offsets
config.storage.topic: test-cluster-configs
status.storage.topic: test-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
publication.autocreate.mode: "filtered"
config.providers: env
config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider
What is your connector properties configuration (my-connector.properties)?

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: eth-etl-gcs-source-parquet-connector0
  labels:
    strimzi.io/cluster: lambda-kafka-connect
spec:
  class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
  tasksMax: 1
  config:
    connect.gcpstorage.kcql: "insert into eth-etl-tokens-from-parquet select * from data-lambda-ethereum-etl-tokens:tokens_parquet2 BATCH=10 STOREAS `Parquet` LIMIT 100;"
    topics: "eth-etl-tokens-from-parquet"
    connect.gcpstorage.gcp.auth.mode: "File"
    connect.gcpstorage.gcp.file: "${env:GOOGLE_APPLICATION_CREDENTIALS}"
    connect.gcpstorage.gcp.project.id: "p2p-data-lambda"
    connect.gcpstorage.error.policy: "THROW"
    connect.gcpstorage.http.socket.timeout: 300000
    connect.gcpstorage.source.extension.includes: "parquet"
    connect.gcpstorage.source.partition.search.continuous: true
    connect.gcpstorage.source.partition.search.interval: 300000
    connect.gcpstorage.source.partition.search.recurse.levels: 0
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    log4j.logger.io.lenses.streamreactor.connect: "DEBUG"
    log4j.logger.org.apache.parquet: "DEBUG"
    log4j.logger.org.apache.hadoop: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common.formats.reader: "DEBUG"
    log4j.logger.com.google.cloud: "DEBUG"
    log4j.logger.com.google.auth: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerTask: "DEBUG"
Please provide full log files (redact any sensitive information)

java.io.EOFException: Reached the end of stream with 8861 bytes left to read
  at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
  at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:126)
  at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
  at io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream.readFully(ParquetSeekableInputStream.scala:79)
  at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:2165)
  at org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1199)
  at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1101)
  at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1051)
  at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1296)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:245)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:140)
  at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetReaderIteratorAdaptor.<init>(ParquetReaderIteratorAdaptor.scala:25)
  at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.<init>(ParquetStreamReader.scala:35)
  at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader$.apply(ParquetStreamReader.scala:76)
  at io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection$.toStreamReader(FormatSelection.scala:196)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
  at scala.util.Either.flatMap(Either.scala:360)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
  at scala.util.Either.flatMap(Either.scala:360)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
  at scala.util.Either.flatMap(Either.scala:360)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
  at scala.util.Either.flatMap(Either.scala:360)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
  at scala.util.Either.flatMap(Either.scala:360)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
  at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
  at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
  at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
  at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
  at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
  at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
  at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
  at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
  at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
  at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
  at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
  at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
  at traverse @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
  at map @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
  at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
  at map @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
  at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.poll(CloudSourceTaskState.scala:35)
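For context on what this exception means: DelegatingSeekableInputStream.readFully keeps reading until the requested buffer is full, and raises EOFException with the outstanding byte count if the underlying stream signals end-of-file first. So "8861 bytes left to read" means the stream delivered 8861 fewer bytes than the reader expected based on the file's footer metadata. A simplified sketch of that contract (not the upstream source, just the behaviour behind the message):

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public final class ReadFullySketch {

    // Simplified sketch of the readFully contract seen in the stack trace:
    // keep reading until `len` bytes have arrived, and fail with the number
    // of bytes still outstanding if the stream signals EOF first.
    public static void readFully(InputStream in, byte[] buf, int start, int len)
            throws IOException {
        int offset = start;
        int remaining = len;
        while (remaining > 0) {
            int bytesRead = in.read(buf, offset, remaining);
            if (bytesRead < 0) { // EOF before the buffer was filled
                throw new EOFException(
                    "Reached the end of stream with " + remaining + " bytes left to read");
            }
            offset += bytesRead;
            remaining -= bytesRead;
        }
    }
}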
Parquet example:
❯ parquet meta ~/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet

File path:  /Users/alinaglumova/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet
Created by: parquet-cpp-arrow version 13.0.0
Properties: (none)
Schema:
message schema {
  required binary address (STRING);
  optional binary symbol (STRING);
  optional binary name (STRING);
  optional binary decimals (STRING);
  optional binary total_supply (STRING);
  required int64 block_timestamp (TIMESTAMP(MICROS,false));
  required int64 block_number;
  required binary block_hash (STRING);
}

Row group 0:  count: 1068  159.64 B records  start: 4  total(compressed): 166.502 kB  total(uncompressed): 166.502 kB
--------------------------------------------------------------------------------
type             encodings  count  avg size  nulls  min / max
address          BINARY  _ _ R    1068  47.51 B       0  "0x005c97569a24303e9ba6de6..." / "0xffffe5b9cb42b4996997c92..."
symbol           BINARY  _ _ R    1068   6.21 B      10  "" / "��"
name             BINARY  _ _ R    1068  10.27 B      10  "" / "����������"
decimals         BINARY  _ _ R    1068   0.47 B      65  "0" / "9"
total_supply     BINARY  _ _ R    1068   6.24 B       9  "0" / "9999999999999999999900000..."
block_timestamp  INT64   _ _ R    1068   9.32 B       0  "2024-02-08T15:50:47.000000" / "2024-09-11T06:57:23.000000"
block_number     INT64   _ _ R    1068   9.32 B       0  "19184445" / "20725728"
block_hash       BINARY  _ _ R    1068  70.31 B       0  "0x0002376d87ff1bbe5310679..." / "0xffae2542617a1ee9204fb27..."
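Note that parquet meta only parses the footer, so it does not prove the column chunks themselves are readable. To go further, the same parquet-mr library the connector uses can read every row group of the downloaded copy; if this succeeds locally, the file is intact and the short read is happening on the connector's GCS streaming path. A minimal sketch, assuming parquet-hadoop and its Hadoop client dependencies are on the classpath (the class name is mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public final class LocalParquetReadCheck {
    public static void main(String[] args) throws Exception {
        // Local copy of the object downloaded from GCS.
        Path path = new Path(
            "/Users/alinaglumova/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet");
        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
            System.out.println("rows in footer: " + reader.getRecordCount());
            // Reading every row group forces the column chunks to be fetched,
            // which is exactly where the connector's EOFException is thrown.
            PageReadStore rowGroup;
            while ((rowGroup = reader.readNextRowGroup()) != null) {
                System.out.println("row group rows: " + rowGroup.getRowCount());
            }
        }
    }
}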
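If the local read succeeds, the object's metadata on GCS may be worth checking next. One known way to get a stream shorter than the stored object size is decompressive transcoding: an object uploaded with Content-Encoding: gzip is decompressed on download, so the bytes served no longer match the size the reader planned its range reads against. A quick diagnostic sketch with the google-cloud-storage Java client; the object name below is hypothetical, pieced together from the KCQL source above:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public final class GcsBlobCheck {
    public static void main(String[] args) {
        // Application Default Credentials, as the connector uses via
        // GOOGLE_APPLICATION_CREDENTIALS.
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Hypothetical object name, pieced together from the KCQL bucket
        // data-lambda-ethereum-etl-tokens and prefix tokens_parquet2.
        Blob blob = storage.get(
            "data-lambda-ethereum-etl-tokens",
            "tokens_parquet2/tokens_parquet2_ethereum_tokens_000000000000.parquet");
        if (blob == null) {
            System.out.println("object not found");
            return;
        }

        // A non-null content encoding (e.g. gzip) means downloads can be
        // transcoded, so the bytes served will not match the stored size.
        System.out.println("size            = " + blob.getSize());
        System.out.println("contentEncoding = " + blob.getContentEncoding());
    }
}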