
S3 Sink connector reports consumer lag of 1 despite processing all records #1132

Open
jamielwhite opened this issue Apr 10, 2024 · 2 comments

@jamielwhite

Issue Guidelines

What version of the Stream Reactor are you reporting this issue for?

6.3.0

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Yes (Kafka 3.6.0, Confluent 7.6.0)

Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?

Yes (S3 sink 6.3.0)

Have you read the docs?

Yes

What is the expected behaviour?

I expect the Kafka consumer group to report a lag of 0 once it has processed all records in the topic.

What was observed?

The consumer group lag remained at 1 even after it caught up to new messages. When I read the files from S3, they contained the latest message on the topic. So the connector appears to be processing all of the messages but not committing the offsets as I'd expect.

➜  ~ kafka-consumer-groups --bootstrap-server localhost:19092 --describe --group connect-backup-s3-sink

GROUP                  TOPIC               PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        
connect-backup-s3-sink test_backup_topic_2 0          131             132             1            
connect-backup-s3-sink test_backup_topic_1 0          199             200             1  
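For context on how this number is derived: `kafka-consumer-groups` reports lag as `LOG-END-OFFSET - CURRENT-OFFSET`, and a consumer is only "caught up" when it commits the offset of the *next* record to read (last record's offset + 1). A plausible (unconfirmed) explanation for a permanent lag of 1 is the sink committing the last processed record's own offset instead of offset + 1. A minimal sketch of that arithmetic, using the topic figures above:

```python
# Hypothetical illustration of Kafka's lag arithmetic, not the connector's
# actual code: lag = log-end-offset - committed offset.

def consumer_lag(log_end_offset: int, committed_offset: int) -> int:
    """Lag as reported by kafka-consumer-groups for one partition."""
    return log_end_offset - committed_offset

# test_backup_topic_2 above: 132 records, so the last record sits at offset 131
# and the log-end-offset is 132.
last_record_offset = 131
log_end = 132

# Committing the last record's own offset leaves a permanent lag of 1:
assert consumer_lag(log_end, last_record_offset) == 1

# Committing last_record_offset + 1 (the conventional behaviour) yields 0:
assert consumer_lag(log_end, last_record_offset + 1) == 0
```

This matches the symptom exactly: all data reaches S3, but the reported lag never drops below 1 per partition.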

What is your Connect cluster configuration (connect-avro-distributed.properties)?

group.id=test-kafka-connect
status.storage.replication.factor=1
key.converter=io.confluent.connect.avro.AvroConverter
config.storage.topic=connect-config
offset.storage.replication.factor=1
plugin.path=/usr/share/java/plugins
offset.storage.topic=connect-offsets
bootstrap.servers=kafka:9092
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
rest.advertised.host.name=localhost
rest.port=8083
status.storage.topic=connect-status
value.converter.schema.registry.url=http://schema-registry:8081
config.storage.replication.factor=1

What is your connector properties configuration (my-connector.properties)?

{
  "name": "backup-s3-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "connect.s3.kcql": "INSERT INTO my-bucket-name SELECT * FROM `*` STOREAS `AVRO` WITH_FLUSH_INTERVAL = 30 PROPERTIES('store.envelope'=true)",
    "connect.s3.custom.endpoint": "http://localstack:4566",
    "connect.s3.vhost.bucket": true,
    "topics": "test_backup_topic_1,test_backup_topic_2"
  }
}
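For reproduction purposes: despite the `my-connector.properties` label in the template, this JSON is the payload submitted to the Kafka Connect REST API (`POST /connectors`). A minimal stdlib-only sketch of submitting it, assuming the worker's `rest.port=8083` from the config above (host and URL are assumptions):

```python
import json
import urllib.request

# Assumed Connect worker endpoint (rest.port=8083 per the distributed config).
CONNECT_URL = "http://localhost:8083/connectors"

payload = {
    "name": "backup-s3-sink",
    "config": {
        "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
        "connect.s3.kcql": (
            "INSERT INTO my-bucket-name SELECT * FROM `*` STOREAS `AVRO` "
            "WITH_FLUSH_INTERVAL = 30 PROPERTIES('store.envelope'=true)"
        ),
        "connect.s3.custom.endpoint": "http://localstack:4566",
        "connect.s3.vhost.bucket": True,
        "topics": "test_backup_topic_1,test_backup_topic_2",
    },
}

body = json.dumps(payload).encode("utf-8")

def submit(url: str = CONNECT_URL) -> int:
    # POST /connectors creates the connector; Connect answers 201 on success.
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Call `submit()` against a running worker to create the connector; the JSON body is identical to the config pasted above.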
@brandon-powers
Contributor

+1, I'm seeing this as well on 6.3.0.

@JKCai

JKCai commented Apr 14, 2024

Seeing the same behaviour too on version 6.3.0.
