Skip to content

Merge branch 'branch-2.0' into fix-quotes
amorynan authored Oct 9, 2024
2 parents ef6ca54 + 66967cb commit bcabb6f
Showing 192 changed files with 5,070 additions and 1,739 deletions.
4 changes: 4 additions & 0 deletions be/src/exec/tablet_info.cpp
@@ -66,6 +66,9 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
_is_partial_update = pschema.partial_update();
_is_strict_mode = pschema.is_strict_mode();
_timestamp_ms = pschema.timestamp_ms();
if (pschema.has_nano_seconds()) {
_nano_seconds = pschema.nano_seconds();
}
_timezone = pschema.timezone();

for (auto& col : pschema.partial_update_input_columns()) {
@@ -211,6 +214,7 @@ void OlapTableSchemaParam::to_protobuf(POlapTableSchemaParam* pschema) const {
pschema->set_is_strict_mode(_is_strict_mode);
pschema->set_timestamp_ms(_timestamp_ms);
pschema->set_timezone(_timezone);
pschema->set_nano_seconds(_nano_seconds);
for (auto col : _partial_update_input_columns) {
*pschema->add_partial_update_input_columns() = col;
}
3 changes: 3 additions & 0 deletions be/src/exec/tablet_info.h
@@ -90,6 +90,8 @@ class OlapTableSchemaParam {
}
void set_timestamp_ms(int64_t timestamp_ms) { _timestamp_ms = timestamp_ms; }
int64_t timestamp_ms() const { return _timestamp_ms; }
void set_nano_seconds(int32_t nano_seconds) { _nano_seconds = nano_seconds; }
int32_t nano_seconds() const { return _nano_seconds; }
void set_timezone(std::string timezone) { _timezone = timezone; }
std::string timezone() const { return _timezone; }
bool is_strict_mode() const { return _is_strict_mode; }
@@ -109,6 +111,7 @@ class OlapTableSchemaParam {
std::set<std::string> _partial_update_input_columns;
bool _is_strict_mode = false;
int64_t _timestamp_ms = 0;
int32_t _nano_seconds {0};
std::string _timezone;
};

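The tablet_info hunks above just thread a sub-second timestamp component through OlapTableSchemaParam. A minimal standalone sketch of the same pattern, using hypothetical stub types rather than the real Doris/protobuf classes, shows why init() guards on has_nano_seconds() while to_protobuf() always forwards the value:

    #include <cstdint>
    #include <optional>

    // Sketch only: stand-ins for POlapTableSchemaParam / OlapTableSchemaParam.
    struct PSchemaStub {
        int64_t timestamp_ms = 0;
        std::optional<int32_t> nano_seconds;  // optional field plays the role of has_nano_seconds()
    };

    struct SchemaParamStub {
        int64_t _timestamp_ms = 0;
        int32_t _nano_seconds = 0;  // stays 0 when an older sender omits the field

        void init(const PSchemaStub& p) {
            _timestamp_ms = p.timestamp_ms;
            if (p.nano_seconds) {  // mirrors pschema.has_nano_seconds()
                _nano_seconds = *p.nano_seconds;
            }
        }
        void to_protobuf(PSchemaStub* p) const {
            p->timestamp_ms = _timestamp_ms;
            p->nano_seconds = _nano_seconds;  // always forwarded once known
        }
    };

    int main() {
        PSchemaStub in;
        in.timestamp_ms = 1700000000123;
        in.nano_seconds = 123456789;

        SchemaParamStub param;
        param.init(in);

        PSchemaStub out;
        param.to_protobuf(&out);  // out now carries both the millisecond and nanosecond parts
        return 0;
    }
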
14 changes: 13 additions & 1 deletion be/src/http/action/stream_load.cpp
@@ -75,6 +75,8 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_requests_total, MetricUnit::
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_current_processing, MetricUnit::REQUESTS);

bvar::LatencyRecorder g_stream_load_receive_data_latency_ms("stream_load_receive_data_latency_ms");

static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024;
static const string CHUNK = "chunked";

@@ -188,8 +190,10 @@ int StreamLoadAction::on_header(HttpRequest* req) {

LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
<< ", tbl=" << ctx->table;
ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos();

auto st = _on_header(req, ctx);

if (!st.ok()) {
ctx->status = std::move(st);
if (ctx->need_rollback) {
@@ -340,7 +344,15 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
}
ctx->receive_bytes += remove_bytes;
}
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
ctx->read_data_cost_nanos += read_data_time;
ctx->receive_and_read_data_cost_nanos =
MonotonicNanos() - ctx->begin_receive_and_read_data_cost_nanos;
g_stream_load_receive_data_latency_ms
<< (ctx->receive_and_read_data_cost_nanos - last_receive_and_read_data_cost_nanos -
read_data_time) /
1000000;
}

void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
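In the on_chunk_data() hunk above, the value pushed into g_stream_load_receive_data_latency_ms is the growth of the total receive-and-read time minus the time spent reading the current chunk, converted to milliseconds. A worked example with made-up numbers:

    #include <cstdint>
    #include <iostream>

    int main() {
        // Illustrative values for a single on_chunk_data() call:
        int64_t prev_total_ns = 60'000'000;   // previous receive_and_read_data_cost_nanos
        int64_t new_total_ns = 100'000'000;   // recomputed from begin_receive_and_read_data_cost_nanos
        int64_t read_ns = 15'000'000;         // time spent reading this chunk (read_data_time)

        // Receive latency = wall-clock growth since the last chunk minus the read time.
        int64_t receive_ms = (new_total_ns - prev_total_ns - read_ns) / 1'000'000;
        std::cout << receive_ms << " ms\n";   // prints 25; this is the value the bvar records
        return 0;
    }
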
64 changes: 1 addition & 63 deletions be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -28,14 +28,11 @@ namespace doris {

uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
uint32_t score = 0;
uint32_t level0_score = 0;
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();

int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
std::list<RowsetMetaSharedPtr> checked_rs_metas;
// NOTE: tablet._meta_lock is held
auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
// check the base rowset and collect the rowsets of cumulative part
@@ -54,12 +51,6 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
} else {
// collect the rowsets of cumulative part
score += rs_meta->get_compaction_score();
if (rs_meta->compaction_level() == 0) {
level0_total_size += rs_meta->total_disk_size();
level0_score += rs_meta->get_compaction_score();
} else {
checked_rs_metas.push_back(rs_meta);
}
}
}

@@ -74,60 +65,7 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return 0;
}

// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
int64_t compaction_goal_size_mbytes =
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}

// Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold
if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) {
return score;
}

// Condition 3: level1 achieve compaction_goal_size
if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
return a->version().first < b->version().first;
});
int32_t rs_meta_count = 0;
int64_t continuous_size = 0;
for (const auto& rs_meta : checked_rs_metas) {
rs_meta_count++;
continuous_size += rs_meta->total_disk_size();
if (rs_meta_count >= 2) {
if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) {
return score;
}
}
}
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;

// Condition 4: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
if (cumu_interval >
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) {
return score;
}
} else if (score > 0) {
// If the compaction process has not been successfully executed,
// the condition for triggering compaction based on the last successful compaction time (condition 3) will never be met
tablet->set_last_cumu_compaction_success_time(now);
}

// Condition 5: If there is a continuous set of empty rowsets, prioritize merging.
auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
if (!consecutive_empty_rowsets.empty()) {
return score;
}

return 0;
return score;
}

void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
10 changes: 5 additions & 5 deletions be/src/olap/delta_writer.cpp
@@ -648,11 +648,11 @@ void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
_tablet_schema->set_table_id(table_schema_param->table_id());
// set partial update columns info
_partial_update_info = std::make_shared<PartialUpdateInfo>();
_partial_update_info->init(*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(),
table_schema_param->timestamp_ms(), table_schema_param->timezone(),
_cur_max_version);
_partial_update_info->init(
*_tablet_schema, table_schema_param->is_partial_update(),
table_schema_param->partial_update_input_columns(),
table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
table_schema_param->nano_seconds(), table_schema_param->timezone(), _cur_max_version);
}

void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
41 changes: 35 additions & 6 deletions be/src/olap/olap_server.cpp
@@ -356,21 +356,50 @@ void StorageEngine::_unused_rowset_monitor_thread_callback() {
!k_doris_exit);
}

int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) {
double disk_used = data_dir->get_usage(0);
double remain_used = 1 - disk_used;
DCHECK(remain_used >= 0 && remain_used <= 1);
DCHECK(config::path_gc_check_interval_second >= 0);
int32_t ret = 0;
if (remain_used > 0.9) {
// if config::path_gc_check_interval_second == 24h
ret = config::path_gc_check_interval_second;
} else if (remain_used > 0.7) {
// 12h
ret = config::path_gc_check_interval_second / 2;
} else if (remain_used > 0.5) {
// 6h
ret = config::path_gc_check_interval_second / 4;
} else if (remain_used > 0.3) {
// 4h
ret = config::path_gc_check_interval_second / 6;
} else {
// 3h
ret = config::path_gc_check_interval_second / 8;
}
return ret;
}

void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) {
LOG(INFO) << "try to start path gc thread!";
int32_t interval = config::path_gc_check_interval_second;
int32_t last_exec_time = 0;
do {
LOG(INFO) << "try to perform path gc!";
data_dir->perform_path_gc();
int32_t current_time = time(nullptr);

interval = config::path_gc_check_interval_second;
int32_t interval = _auto_get_interval_by_disk_capacity(data_dir);
if (interval <= 0) {
LOG(WARNING) << "path gc thread check interval config is illegal:" << interval
<< "will be forced set to half hour";
interval = 1800; // 0.5 hour
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)) &&
!k_doris_exit);
if (current_time - last_exec_time >= interval) {
LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0)
<< "] internal [" << interval << "]";
data_dir->perform_path_gc();
last_exec_time = time(nullptr);
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5)) && !k_doris_exit);
LOG(INFO) << "stop path gc thread!";
}

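To make the scaling in _auto_get_interval_by_disk_capacity concrete, here is a standalone sketch of the same mapping evaluated for a few usage levels, assuming a base path_gc_check_interval_second of 86400 s (the 24h case the inline comments refer to). The reworked loop now wakes every 5 seconds and performs path GC only once this interval has elapsed since the last run.

    #include <cstdint>
    #include <cstdio>

    // Same thresholds as the new function: the less free space remains, the more often path GC runs.
    int32_t interval_for(double disk_used, int32_t base) {
        double remain = 1.0 - disk_used;
        if (remain > 0.9) return base;      // almost empty disk: full interval (24h)
        if (remain > 0.7) return base / 2;  // 12h
        if (remain > 0.5) return base / 4;  // 6h
        if (remain > 0.3) return base / 6;  // 4h
        return base / 8;                    // nearly full disk: 3h
    }

    int main() {
        const int32_t base = 86400;  // assumed value of config::path_gc_check_interval_second
        for (double used : {0.05, 0.25, 0.45, 0.65, 0.95}) {
            std::printf("disk used %.0f%% -> gc interval %d s\n", used * 100, interval_for(used, base));
        }
        return 0;
    }
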
24 changes: 19 additions & 5 deletions be/src/olap/partial_update_info.cpp
@@ -26,12 +26,13 @@ namespace doris {

void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone,
int64_t cur_max_version) {
int64_t timestamp_ms, int32_t nano_seconds,
const std::string& timezone, int64_t cur_max_version) {
is_partial_update = partial_update;
partial_update_input_columns = partial_update_cols;
max_version_in_flush_phase = cur_max_version;
this->timestamp_ms = timestamp_ms;
this->nano_seconds = nano_seconds;
this->timezone = timezone;
missing_cids.clear();
update_cids.clear();
@@ -66,6 +67,7 @@ void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const
can_insert_new_rows_in_partial_update);
partial_update_info_pb->set_is_strict_mode(is_strict_mode);
partial_update_info_pb->set_timestamp_ms(timestamp_ms);
partial_update_info_pb->set_nano_seconds(nano_seconds);
partial_update_info_pb->set_timezone(timezone);
for (const auto& value : default_values) {
partial_update_info_pb->add_default_values(value);
@@ -94,6 +96,9 @@ void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
is_strict_mode = partial_update_info_pb->is_strict_mode();
timestamp_ms = partial_update_info_pb->timestamp_ms();
timezone = partial_update_info_pb->timezone();
if (partial_update_info_pb->has_nano_seconds()) {
nano_seconds = partial_update_info_pb->nano_seconds();
}
default_values.clear();
for (const auto& value : partial_update_info_pb->default_values()) {
default_values.push_back(value);
@@ -117,9 +122,18 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids(
to_lower(tablet_schema.column(cur_cid).default_value())
.find(to_lower("CURRENT_TIMESTAMP")) !=
std::string::npos)) {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
auto pos = to_lower(tablet_schema.column(cur_cid).default_value()).find('(');
if (pos == std::string::npos) {
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, timezone);
default_value = dtv.debug_string();
} else {
int precision = std::stoi(
tablet_schema.column(cur_cid).default_value().substr(pos + 1));
vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
default_value = dtv.debug_string();
}
} else if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
FieldType::OLAP_FIELD_TYPE_DATEV2 &&
to_lower(tablet_schema.column(cur_cid).default_value())
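The new branch in _generate_default_values_for_missing_cids distinguishes a bare CURRENT_TIMESTAMP default from one declared as CURRENT_TIMESTAMP(p). A minimal sketch of just that parsing step, standalone and without the Doris DateV2Value API, is shown below; once a precision is extracted, the real code formats the default from both timestamp_ms and the newly plumbed nano_seconds.

    #include <iostream>
    #include <string>

    int main() {
        // Hypothetical column defaults, mirroring the strings the real code inspects.
        for (std::string def : {"CURRENT_TIMESTAMP", "current_timestamp(3)"}) {
            auto pos = def.find('(');
            if (pos == std::string::npos) {
                // Bare default: keep the old behaviour, whole-second precision only.
                std::cout << def << " -> second precision\n";
            } else {
                // Parenthesized default: std::stoi stops at ')', yielding the fractional precision,
                // which is why nano_seconds now has to reach PartialUpdateInfo.
                int precision = std::stoi(def.substr(pos + 1));
                std::cout << def << " -> precision " << precision << "\n";
            }
        }
        return 0;
    }
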
4 changes: 3 additions & 1 deletion be/src/olap/partial_update_info.h
@@ -28,7 +28,8 @@ class PartialUpdateInfoPB;
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
const std::set<std::string>& partial_update_cols, bool is_strict_mode,
int64_t timestamp_ms, const std::string& timezone, int64_t cur_max_version = -1);
int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
int64_t cur_max_version = -1);
void to_pb(PartialUpdateInfoPB* partial_update_info) const;
void from_pb(PartialUpdateInfoPB* partial_update_info);
std::string summary() const;
@@ -47,6 +48,7 @@ struct PartialUpdateInfo {
bool can_insert_new_rows_in_partial_update {true};
bool is_strict_mode {false};
int64_t timestamp_ms {0};
int32_t nano_seconds {0};
std::string timezone;

// default values for missing cids
63 changes: 63 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.cpp
@@ -361,6 +361,69 @@ Status RowsetMetaManager::_get_rowset_binlog_metas(OlapMeta* meta, const TabletU
return status;
}

Status RowsetMetaManager::get_rowset_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
Version version, RowsetBinlogMetasPB* metas_pb) {
Status status;
auto tablet_uid_str = tablet_uid.to_string();
auto prefix_key = make_binlog_meta_key_prefix(tablet_uid);
auto begin_key = make_binlog_meta_key_prefix(tablet_uid, version.first);
auto end_key = make_binlog_meta_key_prefix(tablet_uid, version.second + 1);
auto traverse_func = [meta, metas_pb, &status, &tablet_uid_str, &end_key](
std::string_view key, std::string_view value) -> bool {
VLOG_DEBUG << fmt::format("get rowset binlog metas, key={}, value={}", key, value);
if (key.compare(end_key) > 0) { // the binlog meta key is binary comparable.
// All binlog meta has been scanned
return false;
}

if (!starts_with_binlog_meta(key)) {
auto err_msg = fmt::format("invalid binlog meta key:{}", key);
status = Status::InternalError(err_msg);
LOG(WARNING) << err_msg;
return false;
}

BinlogMetaEntryPB binlog_meta_entry_pb;
if (!binlog_meta_entry_pb.ParseFromArray(value.data(), value.size())) {
auto err_msg = fmt::format("fail to parse binlog meta value:{}", value);
status = Status::InternalError(err_msg);
LOG(WARNING) << err_msg;
return false;
}

const auto& rowset_id = binlog_meta_entry_pb.rowset_id_v2();
auto* binlog_meta_pb = metas_pb->add_rowset_binlog_metas();
binlog_meta_pb->set_rowset_id(rowset_id);
binlog_meta_pb->set_version(binlog_meta_entry_pb.version());
binlog_meta_pb->set_num_segments(binlog_meta_entry_pb.num_segments());
binlog_meta_pb->set_meta_key(std::string {key});
binlog_meta_pb->set_meta(std::string {value});

auto binlog_data_key =
make_binlog_data_key(tablet_uid_str, binlog_meta_entry_pb.version(), rowset_id);
std::string binlog_data;
status = meta->get(META_COLUMN_FAMILY_INDEX, binlog_data_key, &binlog_data);
if (!status.ok()) {
LOG(WARNING) << status.to_string();
return false;
}
binlog_meta_pb->set_data_key(binlog_data_key);
binlog_meta_pb->set_data(binlog_data);

return true;
};

Status iterStatus =
meta->iterate(META_COLUMN_FAMILY_INDEX, begin_key, prefix_key, traverse_func);
if (!iterStatus.ok()) {
LOG(WARNING) << fmt::format(
"fail to iterate binlog meta. prefix_key:{}, version:{}, status:{}", prefix_key,
version.to_string(), iterStatus.to_string());
return iterStatus;
}
return status;
}

Status RowsetMetaManager::_get_all_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb) {
Status status;
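A standalone sketch of the range scan performed by the new get_rowset_binlog_metas overload: binlog meta keys sort by (tablet_uid, version), so iterating from the prefix of version.first and stopping once a key compares greater than the prefix of version.second + 1 collects exactly the in-range metas. std::map stands in for the ordered KV store, and the zero-padded key layout is an assumption of the sketch, not the real make_binlog_meta_key_prefix format.

    #include <cstdio>
    #include <map>
    #include <string>

    // Zero-padding keeps lexicographic order equal to numeric order ("binary comparable").
    static std::string make_prefix(const std::string& uid, int version) {
        char buf[16];
        std::snprintf(buf, sizeof(buf), "%010d", version);
        return "binlog_meta_" + uid + "_" + buf;
    }

    int main() {
        std::map<std::string, std::string> meta;  // ordered map standing in for the RocksDB meta store
        for (int v : {1, 3, 5, 8, 12}) {
            meta[make_prefix("tablet-uid", v) + "_rowset-" + std::to_string(v)] = "meta-v" + std::to_string(v);
        }

        int first = 3, second = 8;                             // requested Version range [3, 8]
        auto begin_key = make_prefix("tablet-uid", first);     // where the scan starts
        auto end_key = make_prefix("tablet-uid", second + 1);  // first prefix past the range

        for (auto it = meta.lower_bound(begin_key); it != meta.end(); ++it) {
            if (it->first.compare(end_key) > 0) break;         // same early-stop as the traverse_func
            std::printf("collect %s -> %s\n", it->first.c_str(), it->second.c_str());
        }
        return 0;
    }
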
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_meta_manager.h
@@ -66,6 +66,9 @@ class RowsetMetaManager {
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
const std::vector<int64_t>& binlog_versions,
RowsetBinlogMetasPB* metas_pb);
// get all binlog metas of a tablet in version.
static Status get_rowset_binlog_metas(OlapMeta* meta, const TabletUid tablet_uid,
Version version, RowsetBinlogMetasPB* metas_pb);
static Status remove_binlog(OlapMeta* meta, const std::string& suffix);
static Status ingest_binlog_metas(OlapMeta* meta, TabletUid tablet_uid,
RowsetBinlogMetasPB* metas_pb);