Skip to content

Commit

Permalink
[chore](routine-load) optimize out of range error message (#36450) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Jul 11, 2024
1 parent 47a865b commit 057767f
Showing 1 changed file with 21 additions and 1 deletion.
22 changes: 21 additions & 1 deletion be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "runtime/small_file_mgr.h"
#include "service/backend_options.h"
#include "util/blocking_queue.hpp"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/string_util.h"
Expand Down Expand Up @@ -218,6 +219,16 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
consumer_watch.start();
std::unique_ptr<RdKafka::Message> msg(_k_consumer->consume(1000 /* timeout, ms */));
consumer_watch.stop();
DBUG_EXECUTE_IF("KafkaDataConsumer.group_consume.out_of_range", {
done = true;
std::stringstream ss;
ss << "Offset out of range"
<< ", consume partition " << msg->partition() << ", consume offset "
<< msg->offset();
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
st = Status::InternalError<false>(ss.str());
break;
});
switch (msg->err()) {
case RdKafka::ERR_NO_ERROR:
if (_consuming_partition_ids.count(msg->partition()) <= 0) {
Expand Down Expand Up @@ -249,6 +260,15 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
break;
}
[[fallthrough]];
case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
done = true;
std::stringstream ss;
ss << msg->errstr() << ", consume partition " << msg->partition() << ", consume offset "
<< msg->offset();
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << ss.str();
st = Status::InternalError<false>(ss.str());
break;
}
case RdKafka::ERR__PARTITION_EOF: {
LOG(INFO) << "consumer meet partition eof: " << _id
<< " partition offset: " << msg->offset();
Expand All @@ -261,7 +281,7 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(msg->errstr());
st = Status::InternalError<false>(msg->errstr());
break;
}

Expand Down

0 comments on commit 057767f

Please sign in to comment.