
GCS Source connector: Reached the end of stream with xx bytes left to read #1378

Open
replikeit opened this issue Sep 26, 2024 · 1 comment

@replikeit

What version of the Stream Reactor are you reporting this issue for?

Release 8.1.4

Are you running the correct version of Kafka/Confluent for the Stream Reactor release?

I am running Aiven for Apache Kafka 3.8.0. Kafka Connect is deployed with Strimzi on Kubernetes.

Do you have a supported version of the data source/sink, e.g. Cassandra 3.0.9?

Yes, I am using GCS (Google Cloud Storage) as the data source and Kafka as the sink.

Have you read the docs?

Yes, I have read the documentation.

What is the expected behaviour?

I expect the connector to transfer Parquet files from GCS to a Kafka topic.

What was observed?

I encountered the following error while the connector was reading a Parquet file from the bucket:
java.io.EOFException: Reached the end of stream with 8861 bytes left to read
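
The message comes from Parquet's readFully contract: the footer tells the reader exactly how many bytes each column chunk occupies, and the underlying stream hit EOF with 8861 of those bytes still outstanding. As a diagnostic (a minimal sketch, assuming pyarrow and a local copy of the object; the filename is illustrative), the chunk byte ranges declared in the footer can be checked against the physical file size:

# Diagnostic sketch, assuming pyarrow: verify that every column chunk's
# byte range declared in the footer fits inside the file.
import os
import pyarrow.parquet as pq

path = "tokens_parquet2_ethereum_tokens_000000000000.parquet"  # local copy
file_size = os.path.getsize(path)
meta = pq.ParquetFile(path).metadata

for rg in range(meta.num_row_groups):
    for col in range(meta.num_columns):
        chunk = meta.row_group(rg).column(col)
        # A chunk starts at its dictionary page if present, else its first data page.
        start = chunk.dictionary_page_offset or chunk.data_page_offset
        end = start + chunk.total_compressed_size
        if end > file_size:
            print(f"row group {rg} / {chunk.path_in_schema}: footer claims bytes "
                  f"up to {end}, but the file is only {file_size} bytes")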

What is your Connect cluster configuration (connect-avro-distributed.properties)?

group.id: test-cluster
auto.create.topics.enable: true
offset.storage.topic: test-cluster-offsets
config.storage.topic: test-cluster-configs
status.storage.topic: test-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
publication.autocreate.mode: "filtered"
config.providers: env
config.providers.env.class: io.strimzi.kafka.EnvVarConfigProvider

What is your connector properties configuration (my-connector.properties)?

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: eth-etl-gcs-source-parquet-connector0
  labels:
    strimzi.io/cluster: lambda-kafka-connect
spec:
  class: io.lenses.streamreactor.connect.gcp.storage.source.GCPStorageSourceConnector
  tasksMax: 1
  config:
    connect.gcpstorage.kcql: "
    insert into eth-etl-tokens-from-parquet select * from data-lambda-ethereum-etl-tokens:tokens_parquet2 BATCH=10 STOREAS `Parquet` LIMIT 100;
    "
    topics: "eth-etl-tokens-from-parquet"
    connect.gcpstorage.gcp.auth.mode: "File"
    connect.gcpstorage.gcp.file: "${env:GOOGLE_APPLICATION_CREDENTIALS}"
    connect.gcpstorage.gcp.project.id: "p2p-data-lambda"
    connect.gcpstorage.error.policy: "THROW"
    connect.gcpstorage.http.socket.timeout: 300000
    connect.gcpstorage.source.extension.includes: "parquet"
    connect.gcpstorage.source.partition.search.continuous: true
    connect.gcpstorage.source.partition.search.interval: 300000
    connect.gcpstorage.source.partition.search.recurse.levels: 0
    errors.log.enable: "true"
    errors.log.include.messages: "true"
    log4j.logger.io.lenses.streamreactor.connect: "DEBUG"
    log4j.logger.org.apache.parquet: "DEBUG"
    log4j.logger.org.apache.hadoop: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common: "DEBUG"
    log4j.logger.io.lenses.streamreactor.connect.cloud.common.formats.reader: "DEBUG"
    log4j.logger.com.google.cloud: "DEBUG"
    log4j.logger.com.google.auth: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask: "DEBUG"
    log4j.logger.org.apache.kafka.connect.runtime.WorkerTask: "DEBUG"
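
As a sanity check independent of the connector, the bucket and prefix from the KCQL can be listed directly (a minimal sketch, assuming the google-cloud-storage Python client and the same credentials file):

# Sketch, assuming google-cloud-storage: list the *.parquet objects the
# source should pick up under the configured bucket and prefix.
from google.cloud import storage

client = storage.Client(project="p2p-data-lambda")
for blob in client.list_blobs("data-lambda-ethereum-etl-tokens", prefix="tokens_parquet2"):
    if blob.name.endswith(".parquet"):
        print(blob.name, blob.size)

An object whose reported size is smaller than the byte ranges in its footer would explain the error.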

Please provide full log files (redact any sensitive information)

java.io.EOFException: Reached the end of stream with 8861 bytes left to read
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:104)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFullyHeapBuffer(DelegatingSeekableInputStream.java:126)
    at org.apache.parquet.io.DelegatingSeekableInputStream.readFully(DelegatingSeekableInputStream.java:91)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.parquet.ParquetSeekableInputStream.readFully(ParquetSeekableInputStream.scala:79)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutivePartList.readAll(ParquetFileReader.java:2165)
    at org.apache.parquet.hadoop.ParquetFileReader.readAllPartsVectoredOrNormal(ParquetFileReader.java:1199)
    at org.apache.parquet.hadoop.ParquetFileReader.internalReadRowGroup(ParquetFileReader.java:1101)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:1051)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextFilteredRowGroup(ParquetFileReader.java:1296)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:140)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:245)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:140)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetReaderIteratorAdaptor.<init>(ParquetReaderIteratorAdaptor.scala:25)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader.<init>(ParquetStreamReader.scala:35)
    at io.lenses.streamreactor.connect.cloud.common.formats.reader.ParquetStreamReader$.apply(ParquetStreamReader.scala:76)
    at io.lenses.streamreactor.connect.cloud.common.config.ParquetFormatSelection$.toStreamReader(FormatSelection.scala:196)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$11(ResultReader.scala:100)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$9(ResultReader.scala:86)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$7(ResultReader.scala:82)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$5(ResultReader.scala:81)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$3(ResultReader.scala:80)
    at scala.util.Either.flatMap(Either.scala:360)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ResultReader$.$anonfun$create$1(ResultReader.scala:79)
    at io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$9(ReaderManager.scala:55)
    at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:53)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$7(ReaderManager.scala:52)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$2(ReaderManager.scala:48)
    at delay @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.closeAndLog(ReaderManager.scala:111)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.$anonfun$poll$1(ReaderManager.scala:45)
    at getAndSet @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.fromNexFile$1(ReaderManager.scala:44)
    at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.reader.ReaderManager.acc$1(ReaderManager.scala:74)
    at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
    at traverse @ io.lenses.streamreactor.connect.cloud.common.source.distribution.CloudPartitionSearcher.find(CloudPartitionSearcher.scala:53)
    at traverse @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
    at map @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.$anonfun$poll$1(CloudSourceTaskState.scala:36)
    at get @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
    at map @ io.lenses.streamreactor.connect.cloud.common.source.CloudSourceTask.$anonfun$make$11(CloudSourceTask.scala:155)
    at flatMap @ io.lenses.streamreactor.connect.cloud.common.source.state.CloudSourceTaskState.poll(CloudSourceTaskState.scala:35)
@replikeit (Author)

Parquet example:

❯ parquet meta ~/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet

File path:  /Users/alinaglumova/Downloads/tokens_parquet2_ethereum_tokens_000000000000.parquet
Created by: parquet-cpp-arrow version 13.0.0
Properties: (none)
Schema:
message schema {
  required binary address (STRING);
  optional binary symbol (STRING);
  optional binary name (STRING);
  optional binary decimals (STRING);
  optional binary total_supply (STRING);
  required int64 block_timestamp (TIMESTAMP(MICROS,false));
  required int64 block_number;
  required binary block_hash (STRING);
}


Row group 0:  count: 1068  159.64 B records  start: 4  total(compressed): 166.502 kB  total(uncompressed): 166.502 kB
--------------------------------------------------------------------------------
                 type      encodings count     avg size   nulls   min / max
address          BINARY    _ _ R     1068      47.51 B    0       "0x005c97569a24303e9ba6de6..." / "0xffffe5b9cb42b4996997c92..."
symbol           BINARY    _ _ R     1068      6.21 B     10      "" / "��"
name             BINARY    _ _ R     1068      10.27 B    10      "" / "����������"
decimals         BINARY    _ _ R     1068      0.47 B     65      "0" / "9"
total_supply     BINARY    _ _ R     1068      6.24 B     9       "0" / "9999999999999999999900000..."
block_timestamp  INT64     _ _ R     1068      9.32 B     0       "2024-02-08T15:50:47.000000" / "2024-09-11T06:57:23.000000"
block_number     INT64     _ _ R     1068      9.32 B     0       "19184445" / "20725728"
block_hash       BINARY    _ _ R     1068      70.31 B    0       "0x0002376d87ff1bbe5310679..." / "0xffae2542617a1ee9204fb27..."
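
To rule out corruption in the file itself, it can be read end-to-end locally (a sketch, assuming pyarrow). If this returns all rows without error, the EOFException more likely comes from the connector's stream handling than from the data:

# Sanity check, assuming pyarrow: read the whole file locally.
import pyarrow.parquet as pq

table = pq.read_table("tokens_parquet2_ethereum_tokens_000000000000.parquet")
print(table.num_rows)  # expected: 1068, per the meta output above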
