Commit

split if block too big during append
zhyass committed Sep 10, 2024
1 parent 16c7d99 commit 39b48b3
Showing 7 changed files with 63 additions and 254 deletions.
15 changes: 0 additions & 15 deletions src/query/expression/src/block.rs
@@ -318,21 +318,6 @@ impl DataBlock {
         res
     }
 
-    pub fn split_by_rows_if_needed_no_tail(&self, min_rows_per_block: usize) -> Vec<Self> {
-        let max_rows_per_block = min_rows_per_block * 2;
-        let mut res = vec![];
-        let mut offset = 0;
-        let mut remain_rows = self.num_rows;
-        while remain_rows >= max_rows_per_block {
-            let cut = self.slice(offset..(offset + min_rows_per_block));
-            res.push(cut);
-            offset += min_rows_per_block;
-            remain_rows -= min_rows_per_block;
-        }
-        res.push(self.slice(offset..(offset + remain_rows)));
-        res
-    }
-
     #[inline]
     pub fn merge_block(&mut self, block: DataBlock) {
         self.columns.reserve(block.num_columns());
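For reference, the deleted helper split a block into min_rows_per_block chunks and let the last chunk absorb the remainder, so no undersized tail block was produced. A minimal standalone sketch of that arithmetic; split_no_tail and the Vec-based types are illustrative stand-ins, not the DataBlock API:

// Illustrative sketch only: mirrors the removed split_by_rows_if_needed_no_tail
// logic on a plain Vec instead of a DataBlock.
fn split_no_tail<T: Clone>(rows: &[T], min_rows: usize) -> Vec<Vec<T>> {
    assert!(min_rows > 0);
    let max_rows = min_rows * 2;
    let mut res = Vec::new();
    let mut offset = 0;
    let mut remain = rows.len();
    // Cut a min_rows chunk while at least two minimum-sized chunks remain.
    while remain >= max_rows {
        res.push(rows[offset..offset + min_rows].to_vec());
        offset += min_rows;
        remain -= min_rows;
    }
    // The last chunk absorbs the remainder instead of becoming a small tail.
    res.push(rows[offset..].to_vec());
    res
}

fn main() {
    let data: Vec<u32> = (0..10).collect();
    let sizes: Vec<usize> = split_no_tail(&data, 3).iter().map(|c| c.len()).collect();
    assert_eq!(sizes, vec![3, 3, 4]); // same shape the deleted unit test asserted
}

The new code path reaches the same no-tail behaviour through DataBlock::split_by_rows_no_tail, with the chunk size computed by calc_rows_per_block (see the next file).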
38 changes: 22 additions & 16 deletions src/query/expression/src/utils/block_thresholds.rs
@@ -66,26 +66,32 @@ impl BlockThresholds {
         total_rows <= self.max_rows_per_block && total_bytes <= self.max_bytes_per_block
     }
 
     #[inline]
     pub fn calc_rows_per_block(&self, total_bytes: usize, total_rows: usize) -> usize {
-        let mut block_num = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
-        let mut rows_per_block = total_rows.div_ceil(block_num);
-        let max_bytes_per_block = if rows_per_block < self.max_rows_per_block / 10 {
-            // If block rows < 100_000, max_bytes_per_block set to 200M
-            2 * self.max_bytes_per_block
-        } else if rows_per_block < self.max_rows_per_block / 2 {
-            // If block rows < 500_000, max_bytes_per_block set to 150M
-            3 * self.max_bytes_per_block / 2
-        } else if rows_per_block < self.min_rows_per_block {
-            // If block rows < 800_000, max_bytes_per_block set to 125M
-            5 * self.max_bytes_per_block / 4
-        } else {
-            self.max_bytes_per_block
+        if self.check_for_compact(total_rows, total_bytes) {
+            return total_rows;
+        }
+
+        let mut rows_per_block = total_rows.div_ceil(total_bytes / self.max_bytes_per_block);
+        let max_bytes_per_block = match rows_per_block {
+            v if v < self.max_rows_per_block / 10 => {
+                // If block rows < 100_000, max_bytes_per_block set to 200M
+                2 * self.max_bytes_per_block
+            }
+            v if v < self.max_rows_per_block / 2 => {
+                // If block rows < 500_000, max_bytes_per_block set to 150M
+                3 * self.max_bytes_per_block / 2
+            }
+            v if v < self.min_rows_per_block => {
+                // If block rows < 800_000, max_bytes_per_block set to 125M
+                5 * self.max_bytes_per_block / 4
+            }
+            _ => self.max_bytes_per_block,
         };
 
-        if block_num > 1 && max_bytes_per_block > self.max_bytes_per_block {
-            block_num = std::cmp::max(total_bytes / max_bytes_per_block, 1);
-            rows_per_block = total_rows.div_ceil(block_num);
+        if max_bytes_per_block > self.max_bytes_per_block {
+            rows_per_block =
+                total_rows.div_ceil(std::cmp::max(total_bytes / max_bytes_per_block, 1));
         }
 
         rows_per_block.min(self.max_rows_per_block)
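To make the byte-budget tiering concrete, here is a hedged, standalone sketch of the calc_rows_per_block arithmetic. The constants (1_000_000 max rows, 800_000 min rows, 100 MiB max bytes) are assumptions inferred from the 100_000 / 500_000 / 800_000 row comments and the 200M / 150M / 125M byte comments, not values read from the configuration; the check_for_compact early return is replaced by a .max(1) guard so the sketch stands alone.

// Standalone sketch of the new calc_rows_per_block tiering; illustrative only.
const MAX_ROWS: usize = 1_000_000;
const MIN_ROWS: usize = 800_000;
const MAX_BYTES: usize = 100 * 1024 * 1024;

fn calc_rows_per_block(total_bytes: usize, total_rows: usize) -> usize {
    // The real method returns early via check_for_compact for small inputs;
    // here a .max(1) guard keeps the sketch self-contained instead.
    let mut rows_per_block = total_rows.div_ceil((total_bytes / MAX_BYTES).max(1));
    // Wide rows mean few rows per block, so the byte budget per block is relaxed.
    let max_bytes_per_block = match rows_per_block {
        v if v < MAX_ROWS / 10 => 2 * MAX_BYTES,    // < 100_000 rows -> ~200M
        v if v < MAX_ROWS / 2 => 3 * MAX_BYTES / 2, // < 500_000 rows -> ~150M
        v if v < MIN_ROWS => 5 * MAX_BYTES / 4,     // < 800_000 rows -> ~125M
        _ => MAX_BYTES,
    };
    if max_bytes_per_block > MAX_BYTES {
        // Re-derive the row target from the relaxed byte budget.
        rows_per_block = total_rows.div_ceil((total_bytes / max_bytes_per_block).max(1));
    }
    rows_per_block.min(MAX_ROWS)
}

fn main() {
    // 1 GiB spread over 400_000 rows: the first pass gives 40_000 rows per
    // block, the 200M tier kicks in, and the target doubles to 80_000 rows,
    // i.e. 5 output blocks instead of 10.
    assert_eq!(calc_rows_per_block(1024 * 1024 * 1024, 400_000), 80_000);
}

The effect is that very wide rows produce fewer, byte-heavier blocks instead of a long run of tiny ones.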
15 changes: 0 additions & 15 deletions src/query/expression/tests/it/block.rs
@@ -15,21 +15,6 @@ use databend_common_expression::Value;

 use crate::common::new_block;
 
-#[test]
-fn test_split_block() {
-    let value = "abc";
-    let n = 10;
-    let block = new_block(&[Column::String(
-        StringColumnBuilder::repeat(value, n).build(),
-    )]);
-    let sizes = block
-        .split_by_rows_if_needed_no_tail(3)
-        .iter()
-        .map(|b| b.num_rows())
-        .collect::<Vec<_>>();
-    assert_eq!(sizes, vec![3, 3, 4]);
-}
-
 #[test]
 fn test_box_render_block() {
     let value = "abc";
@@ -18,7 +18,6 @@ mod transform_accumulating;
 mod transform_accumulating_async;
 mod transform_async;
 mod transform_block_compact;
-mod transform_block_compact_for_copy;
 mod transform_blocking;
 mod transform_compact;
 mod transform_dummy;
@@ -35,7 +34,6 @@ pub use transform_accumulating::*;
 pub use transform_accumulating_async::*;
 pub use transform_async::*;
 pub use transform_block_compact::*;
-pub use transform_block_compact_for_copy::*;
 pub use transform_blocking::*;
 pub use transform_compact::*;
 pub use transform_dummy::*;
@@ -21,17 +21,23 @@ use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::DataBlock;
 
-use super::Compactor;
+use crate::processors::Compactor;
 
 pub struct BlockCompactor {
     thresholds: BlockThresholds,
     aborting: Arc<AtomicBool>,
+    // call block.memory_size() only once.
+    // we may no longer need it if we start using jsonb, otherwise it should be put in CompactorState
+    accumulated_rows: usize,
+    accumulated_bytes: usize,
 }
 
 impl BlockCompactor {
     pub fn new(thresholds: BlockThresholds) -> Self {
         BlockCompactor {
             thresholds,
+            accumulated_rows: 0,
+            accumulated_bytes: 0,
             aborting: Arc::new(AtomicBool::new(false)),
         }
     }
@@ -57,97 +63,55 @@ impl Compactor for BlockCompactor {

         let size = blocks.len();
         let mut res = Vec::with_capacity(size);
-        let block = blocks[size - 1].clone();
+        let num_rows = blocks[size - 1].num_rows();
+        let num_bytes = blocks[size - 1].memory_size();
 
-        // perfect block
-        if self
-            .thresholds
-            .check_perfect_block(block.num_rows(), block.memory_size())
+        if num_rows > self.thresholds.max_rows_per_block
+            || num_bytes > self.thresholds.max_bytes_per_block * 2
         {
+            // holding slices of blocks to merge later may lead to oom, so
+            // 1. we expect blocks from file formats are not slice.
+            // 2. if block is split here, cut evenly and emit them at once.
+            let rows_per_block = self.thresholds.calc_rows_per_block(num_bytes, num_rows);
+            let block = blocks.pop().unwrap();
+            res.extend(block.split_by_rows_no_tail(rows_per_block));
+        } else if self.thresholds.check_large_enough(num_rows, num_bytes) {
+            // pass through the new data block just arrived
+            let block = blocks.pop().unwrap();
             res.push(block);
-            blocks.remove(size - 1);
         } else {
-            let accumulated_rows: usize = blocks.iter_mut().map(|b| b.num_rows()).sum();
-            let accumulated_bytes: usize = blocks.iter_mut().map(|b| b.memory_size()).sum();
-
-            let merged = DataBlock::concat(blocks)?;
-            blocks.clear();
-
-            if accumulated_rows >= self.thresholds.max_rows_per_block {
-                let (perfect, remain) = merged.split_by_rows(self.thresholds.max_rows_per_block);
-                res.extend(perfect);
-                if let Some(b) = remain {
-                    blocks.push(b);
-                }
-            } else if accumulated_bytes >= self.thresholds.max_bytes_per_block {
-                // too large for merged block, flush to results
+            let accumulated_rows_new = self.accumulated_rows + num_rows;
+            let accumulated_bytes_new = self.accumulated_bytes + num_bytes;
+
+            if self
+                .thresholds
+                .check_large_enough(accumulated_rows_new, accumulated_bytes_new)
+            {
+                // avoid call concat_blocks for each new block
+                let merged = DataBlock::concat(blocks)?;
+                blocks.clear();
+                self.accumulated_rows = 0;
+                self.accumulated_bytes = 0;
                 res.push(merged);
             } else {
-                // keep the merged block into blocks for future merge
-                blocks.push(merged);
+                self.accumulated_rows = accumulated_rows_new;
+                self.accumulated_bytes = accumulated_bytes_new;
            }
         }
 
         Ok(res)
     }
 
     fn compact_final(&mut self, blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
-        let mut res = Vec::with_capacity(blocks.len());
-        let mut temp_blocks = vec![];
-        let mut accumulated_rows = 0;
-        let aborted_query_err = || {
-            Err(ErrorCode::AbortedQuery(
-                "Aborted query, because the server is shutting down or the query was killed.",
-            ))
-        };
-        for block in blocks.iter() {
-            if self.aborting.load(Ordering::Relaxed) {
-                return aborted_query_err();
-            }
-
-            // Perfect block, no need to compact
-            if self
-                .thresholds
-                .check_perfect_block(block.num_rows(), block.memory_size())
-            {
-                res.push(block.clone());
-            } else {
-                let block = if block.num_rows() > self.thresholds.max_rows_per_block {
-                    let b = block.slice(0..self.thresholds.max_rows_per_block);
-                    res.push(b);
-                    block.slice(self.thresholds.max_rows_per_block..block.num_rows())
-                } else {
-                    block.clone()
-                };
-
-                accumulated_rows += block.num_rows();
-                temp_blocks.push(block);
-
-                while accumulated_rows >= self.thresholds.max_rows_per_block {
-                    if self.aborting.load(Ordering::Relaxed) {
-                        return aborted_query_err();
-                    }
-
-                    let block = DataBlock::concat(&temp_blocks)?;
-                    res.push(block.slice(0..self.thresholds.max_rows_per_block));
-                    accumulated_rows -= self.thresholds.max_rows_per_block;
-
-                    temp_blocks.clear();
-                    if accumulated_rows != 0 {
-                        temp_blocks.push(
-                            block.slice(self.thresholds.max_rows_per_block..block.num_rows()),
-                        );
-                    }
-                }
-            }
-        }
-
-        if accumulated_rows != 0 {
+        let mut res = vec![];
+        if self.accumulated_rows != 0 {
             if self.aborting.load(Ordering::Relaxed) {
-                return aborted_query_err();
+                return Err(ErrorCode::AbortedQuery(
+                    "Aborted query, because the server is shutting down or the query was killed.",
+                ));
            }
 
-            let block = DataBlock::concat(&temp_blocks)?;
+            let block = DataBlock::concat(&blocks)?;
             res.push(block);
         }
 
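Summarizing the new compact_partial control flow: an oversized incoming block (rows above max_rows_per_block, or bytes above twice max_bytes_per_block) is split evenly and emitted at once; a block that is already large enough passes through; anything else only bumps the accumulated_rows / accumulated_bytes counters, and the buffered blocks are concatenated in one go once the accumulated totals cross the threshold. Below is a minimal sketch of that accumulate-then-flush pattern (the oversized-split branch is omitted); Block, Accumulator and large_enough are simplified stand-ins for DataBlock, BlockCompactor and check_large_enough, not the library types.

// Illustrative stand-ins: `Block` plays the role of DataBlock and the two
// `max_*` fields play the role of BlockThresholds. Not the library types.
#[derive(Clone, Debug)]
struct Block {
    rows: usize,
    bytes: usize,
}

struct Accumulator {
    max_rows: usize,
    max_bytes: usize,
    buffered: Vec<Block>,
    acc_rows: usize,
    acc_bytes: usize,
}

fn large_enough(rows: usize, bytes: usize, max_rows: usize, max_bytes: usize) -> bool {
    // Stand-in for check_large_enough: either budget reached.
    rows >= max_rows || bytes >= max_bytes
}

impl Accumulator {
    fn new(max_rows: usize, max_bytes: usize) -> Self {
        Self { max_rows, max_bytes, buffered: Vec::new(), acc_rows: 0, acc_bytes: 0 }
    }

    // compact_partial analogue: returns blocks ready to emit downstream.
    fn push(&mut self, block: Block) -> Vec<Block> {
        if large_enough(block.rows, block.bytes, self.max_rows, self.max_bytes) {
            // An already large block passes straight through, untouched.
            return vec![block];
        }
        let rows = self.acc_rows + block.rows;
        let bytes = self.acc_bytes + block.bytes;
        self.buffered.push(block);
        if large_enough(rows, bytes, self.max_rows, self.max_bytes) {
            // Flush: concatenate everything buffered into one output block.
            self.buffered.clear();
            self.acc_rows = 0;
            self.acc_bytes = 0;
            vec![Block { rows, bytes }]
        } else {
            // Keep buffering; only the counters change, no concat per input block.
            self.acc_rows = rows;
            self.acc_bytes = bytes;
            Vec::new()
        }
    }

    // compact_final analogue: flush whatever is still buffered.
    fn finish(&mut self) -> Vec<Block> {
        if self.acc_rows == 0 {
            return Vec::new();
        }
        let out = Block { rows: self.acc_rows, bytes: self.acc_bytes };
        self.buffered.clear();
        self.acc_rows = 0;
        self.acc_bytes = 0;
        vec![out]
    }
}

fn main() {
    let mut acc = Accumulator::new(1_000, 1_000_000);
    assert!(acc.push(Block { rows: 400, bytes: 40_000 }).is_empty());
    assert!(acc.push(Block { rows: 400, bytes: 40_000 }).is_empty());
    let flushed = acc.push(Block { rows: 400, bytes: 40_000 });
    assert_eq!(flushed[0].rows, 1_200); // flushed once the row budget was crossed
    assert!(acc.finish().is_empty()); // nothing left over after the flush
}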

This file was deleted.

