[improvement](iceberg)Parallelize splits for count(*) #41169

Open · wants to merge 10 commits into base: master
Changes from 9 commits
8 changes: 5 additions & 3 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -76,15 +76,14 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx, int64_t push_down_count)
io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader)),
_profile(profile),
_state(state),
_params(params),
_range(range),
_kv_cache(kv_cache),
_io_ctx(io_ctx),
_remaining_push_down_count(push_down_count) {
_io_ctx(io_ctx) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
@@ -95,6 +94,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_push_down_count = range.table_format_params.iceberg_params.row_count;
Review comment (Contributor):
the _remaining_push_down_count is not initialized if row_count is not set.
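A minimal fix here (a sketch, not part of this diff) would be a default member initializer in iceberg_reader.h, e.g. int64_t _remaining_push_down_count = -1;, assuming, as with IcebergSplit's rowCount below, that a negative value means no pushed-down count, so the path stays disabled whenever the planner leaves row_count unset.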

}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
13 changes: 6 additions & 7 deletions be/src/vec/exec/format/table/iceberg_reader.h
@@ -76,8 +76,8 @@ class IcebergTableReader : public TableFormatReader {

IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
int64_t push_down_count);
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx);
~IcebergTableReader() override = default;

Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
@@ -197,9 +197,9 @@ class IcebergParquetReader final : public IcebergTableReader {
IcebergParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx, int64_t push_down_count)
io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx, push_down_count) {}
kv_cache, io_ctx) {}
Status init_reader(
const std::vector<std::string>& file_col_names,
const std::unordered_map<int, std::string>& col_id_name_map,
@@ -237,10 +237,9 @@ class IcebergOrcReader final : public IcebergTableReader {

IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
int64_t push_down_count)
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx, push_down_count) {}
kv_cache, io_ctx) {}

void set_delete_rows() override {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
8 changes: 4 additions & 4 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -808,7 +808,7 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<IcebergParquetReader> iceberg_reader =
IcebergParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params, range, _kv_cache,
_io_ctx.get(), _get_push_down_count());
_io_ctx.get());
init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
_push_down_conjuncts, _real_tuple_desc, _default_val_row_desc.get(),
@@ -878,9 +878,9 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(tran_orc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
std::unique_ptr<IcebergOrcReader> iceberg_reader = IcebergOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params, range, _kv_cache,
_io_ctx.get(), _get_push_down_count());
std::unique_ptr<IcebergOrcReader> iceberg_reader =
IcebergOrcReader::create_unique(std::move(orc_reader), _profile, _state,
*_params, range, _kv_cache, _io_ctx.get());

init_status = iceberg_reader->init_reader(
_file_col_names, _col_id_name_map, _colname_to_value_range,
IcebergScanNode.java
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -86,6 +87,8 @@ public class IcebergScanNode extends FileQueryScanNode {
private IcebergSource source;
private Table icebergTable;
private List<String> pushdownIcebergPredicates = Lists.newArrayList();
private boolean pushDownCount = false;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;

/**
* External file scan node for Query iceberg table
@@ -137,6 +140,9 @@ private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSpli
int formatVersion = icebergSplit.getFormatVersion();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
if (pushDownCount) {
fileDesc.setRowCount(icebergSplit.getRowCount());
}
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
@@ -255,9 +261,24 @@ private List<Split> doGetSplits() throws UserException {
}

TPushAggOp aggOp = getPushDownAggNoGroupingOp();
if (aggOp.equals(TPushAggOp.COUNT) && getCountFromSnapshot() >= 0) {
if (aggOp.equals(TPushAggOp.COUNT)) {
// we can create a special empty split and skip the plan process
return splits.isEmpty() ? splits : Collections.singletonList(splits.get(0));
if (splits.isEmpty()) {
return splits;
}
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot >= 0) {
pushDownCount = true;
List<Split> pushDownCountSplits;
if (countFromSnapshot > COUNT_WITH_PARALLEL_SPLITS) {
int parallelNum = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
pushDownCountSplits = splits.subList(0, Math.min(splits.size(), parallelNum));
} else {
pushDownCountSplits = Collections.singletonList(splits.get(0));
}
assignCountToSplits(pushDownCountSplits, countFromSnapshot);
return pushDownCountSplits;
}
}

selectedPartitionNum = partitionPathSet.size();
@@ -374,12 +395,6 @@ private long getCountFromSnapshot() {
@Override
protected void toThrift(TPlanNode planNode) {
super.toThrift(planNode);
if (getPushDownAggNoGroupingOp().equals(TPushAggOp.COUNT)) {
long countFromSnapshot = getCountFromSnapshot();
if (countFromSnapshot >= 0) {
planNode.setPushDownCount(countFromSnapshot);
}
}
}

@Override
@@ -399,4 +414,13 @@ public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
return super.getNodeExplainString(prefix, detailLevel)
+ String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
}

private void assignCountToSplits(List<Split> splits, long totalCount) {
int size = splits.size();
long countPerSplit = totalCount / size;
for (int i = 0; i < size - 1; i++) {
((IcebergSplit) splits.get(i)).setRowCount(countPerSplit);
}
((IcebergSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size);
}
}
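
As a worked illustration of assignCountToSplits (a standalone sketch; the class name and numbers are hypothetical, not part of the patch): each split receives totalCount / size rows, and the last split also absorbs totalCount % size, so the per-split counts always sum back to the snapshot count.

public class CountDistributionSketch {
    public static void main(String[] args) {
        long totalCount = 10_001L; // snapshot count, just above COUNT_WITH_PARALLEL_SPLITS
        int size = 4;              // min(splits.size(), parallelNum)
        long countPerSplit = totalCount / size;               // 2500
        long[] counts = new long[size];
        for (int i = 0; i < size - 1; i++) {
            counts[i] = countPerSplit;                        // first three splits: 2500
        }
        counts[size - 1] = countPerSplit + totalCount % size; // last split: 2501
        for (long c : counts) {
            System.out.print(c + " ");                        // prints: 2500 2500 2500 2501
        }
    }
}

With a count at or below COUNT_WITH_PARALLEL_SPLITS, doGetSplits keeps the old behavior and assigns the whole count to a single split.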
IcebergSplit.java
@@ -37,6 +37,7 @@ public class IcebergSplit extends FileSplit {
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
private Map<String, String> config;
private long rowCount = -1;

// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
@@ -47,4 +48,12 @@ public IcebergSplit(LocationPath file, long start, long length, long fileLength,
this.config = config;
this.originalPath = originalPath;
}

public long getRowCount() {
return rowCount;
}

public void setRowCount(long rowCount) {
this.rowCount = rowCount;
}
}
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
@@ -308,6 +308,7 @@ struct TIcebergFileDesc {
// Deprecated
5: optional Exprs.TExpr file_select_conjunct;
6: optional string original_file_path;
7: optional i64 row_count;
}

struct TPaimonDeletionFileDesc {
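
Note that row_count is optional in TIcebergFileDesc: the FE sets it only when pushDownCount is true, and the BE enables _remaining_push_down_count only after checking __isset.row_count, so scans without a count(*) pushdown are unaffected.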