You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Is there a difference when committing a consumer's offset between the two approaches? I Used to use the TopicPartition() approach but recently found the consumer.position() approach in the docs:
This is used when sending offsets for EOS consumed messages.
# Send messages resulting from the processed message# At the end of consumer's callback for its while-true loopnext_offset=consumer.position(consumer.assignment())
try:
send_eos_messages(out_messages, next_offset, consumer.consumer_group_metadata())
_decay_send_error()
exceptKWPostAbortTransaction:
# When a transaction has been aborted, rewind the consumer's offset. By# decrementing the offset, the current message will be processed again.next_offset[0].offset-=1consumer.commit(offsets=next_offset)
_increment_send_error()
Context for the above send messages code snippet
importosfromdataclassesimportdataclassfromtypingimportAnyfromconfluent_kafkaimportConsumer, KafkaException, Producer, TopicPartitionconsumer_configs= {
"bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"isolation.level": "read_committed",
"auto.offset.reset": "latest",
"group.id": os.getenv("KAFKA_CONSUMER_GROUP"),
"enable.auto.commit": False,
"enable.auto.offset.store": False,
}
producer_configs= {
"bootstrap.servers": os.getenv("KAFKA_BOOTSTRAP_SERVERS"),
"enable.idempotence": True,
"transactional.id": os.getenv("KAFKA_CONSUMER_GROUP"),
}
consumer=Consumer(consumer_configs)
producer=Producer(producer_configs)
classKWPostAbortTransaction(Exception):
...
@dataclassclassKWMessage:
topic: strvalue: strkey: strdefsend_eos_messages(
messages: list[KWMessage],
offsets: list[TopicPartition],
group_metadata: Any,
) ->None:
"""Send Exactly-Once-Semantic messages with the transactional API."""producer.begin_transaction()
try:
formessageinmessages:
producer.produce(message.topic, message.value, message.key)
except (BufferError, KafkaException) aserr:
raiseKWPostAbortTransaction() fromerrproducer.send_offsets_to_transaction(offsets, group_metadata)
whileTrue:
try:
producer.commit_transaction(10.0)
breakexceptKafkaExceptionaserr: # noqaiferr.args[0].retriable():
# retriable error, try againcontinueeliferr.args[0].txn_requires_abort():
producer.abort_transaction()
raiseKWPostAbortTransaction()
else:
# treat all other errors as fatalraisedef_decay_send_error():
...
def_increment_send_error():
...
# Send messages resulting from the processed message.# This code is at the end of consumer's callback for its while-true-poll loop
...
next_offset=consumer.position(consumer.assignment())
try:
send_eos_messages(out_messages, next_offset, consumer.consumer_group_metadata())
_decay_send_error()
exceptKWPostAbortTransaction:
# When a transaction has been aborted, rewind the consumer's offset. By# decrementing the offset, the current message will be processed again.next_offset[0].offset-=1consumer.commit(offsets=next_offset)
_increment_send_error()
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
Is there a difference when committing a consumer's offset between the two approaches? I Used to use the
TopicPartition()
approach but recently found theconsumer.position()
approach in the docs:This is used when sending offsets for EOS consumed messages.
Context for the above send messages code snippet
Beta Was this translation helpful? Give feedback.
All reactions