From 057767f2bbddf03552d33ad4da54d06070ec43b8 Mon Sep 17 00:00:00 2001 From: hui lai <1353307710@qq.com> Date: Thu, 11 Jul 2024 10:52:01 +0800 Subject: [PATCH] [chore](routine-load) optimize out of range error message (#36450) (#37456) pick #36450 --- be/src/runtime/routine_load/data_consumer.cpp | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index ccf5fb4cb25ba1..2874a43ae0e4ad 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -36,6 +36,7 @@ #include "runtime/small_file_mgr.h" #include "service/backend_options.h" #include "util/blocking_queue.hpp" +#include "util/debug_points.h" #include "util/defer_op.h" #include "util/stopwatch.hpp" #include "util/string_util.h" @@ -218,6 +219,16 @@ Status KafkaDataConsumer::group_consume(BlockingQueue* queue, consumer_watch.start(); std::unique_ptr msg(_k_consumer->consume(1000 /* timeout, ms */)); consumer_watch.stop(); + DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", { + done = true; + std::stringstream ss; + ss << "Offset out of range" + << ", consume partition " << msg->partition() << ", consume offset " + << msg->offset(); + LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str(); + st = Status::InternalError(ss.str()); + break; + }); switch (msg->err()) { case RdKafka::ERR_NO_ERROR: if (_consuming_partition_ids.count(msg->partition()) <= 0) { @@ -249,6 +260,15 @@ Status KafkaDataConsumer::group_consume(BlockingQueue* queue, break; } [[fallthrough]]; + case RdKafka::ERR_OFFSET_OUT_OF_RANGE: { + done = true; + std::stringstream ss; + ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset " + << msg->offset(); + LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str(); + st = Status::InternalError(ss.str()); + break; + } case RdKafka::ERR__PARTITION_EOF: { LOG(INFO) << "consumer meet partition eof: " << _id << " partition offset: " << msg->offset(); @@ -261,7 +281,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue* queue, default: LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr(); done = true; - st = Status::InternalError(msg->errstr()); + st = Status::InternalError(msg->errstr()); break; }