Skip to content

Commit

Permalink
fix(rust): Bitpack full buffer when full
Browse files Browse the repository at this point in the history
  • Loading branch information
thalassemia committed May 9, 2024
1 parent f419a10 commit 6c3380c
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@ impl Encoder<u32> for u32 {
.for_each(|(item, buf)| *buf = item);

let mut packed = [0u8; 4 * U32_BLOCK_LEN];
bitpacked::encode_pack(&buffer[..remainder], num_bits, packed.as_mut());
// No need to zero rest of buffer because remainder is either:
// * Multiple of 8: We pad non-terminal literal runs to have a
// multiple of 8 values. Once compressed, the data will end on
// clean byte boundaries and packed[..compressed_remainder_size]
// will include only the remainder values and nothing extra.
// * Final run: Extra values from buffer will be included in
// packed[..compressed_remainder_size] but ignored when decoding
// because they extend beyond known column length
bitpacked::encode_pack(&buffer, num_bits, packed.as_mut());
writer.write_all(&packed[..compressed_remainder_size])?;
};
Ok(())
Expand Down Expand Up @@ -149,26 +157,30 @@ pub fn encode<T: PartialEq + Default + Copy + Encoder<T>, W: Write, I: Iterator<
for val in iterator {
if val == previous_val {
consecutive_repeats += 1;
// Run is long enough to RLE, no need to buffer values
if consecutive_repeats >= 8 {
// Run is long enough to RLE, no need to buffer values
if consecutive_repeats > 8 {
continue;
} else {
// Ensure literal run has multiple of 8 values
// Take from consecutive repeats if needed to pad up
// When we encounter a run long enough to potentially RLE,
// we must first ensure that the buffered literal run has
// a multiple of 8 values for bit-packing. If not, we pad
// up by taking some of the consecutive repeats
let literal_padding = (8 - (literal_run_idx % 8)) % 8;
consecutive_repeats -= literal_padding;
literal_run_idx += literal_padding;
}
}
// Too short to RLE, continue to buffer values
} else if consecutive_repeats > 8 {
// Flush literal run, if any, before RLE run
// Value changed so start a new run but the current run is long
// enough to RLE. First, bit-pack any buffered literal run. Then,
// RLE current run and reset consecutive repeat counter and buffer.
if literal_run_idx > 0 {
debug_assert!(literal_run_idx % 8 == 0);
T::bitpacked_encode(
writer,
buffered_bits.iter().copied().take(literal_run_idx),
buffered_bits.iter().take(literal_run_idx).copied(),
num_bits as usize,
)?;
literal_run_idx = 0;
Expand All @@ -177,44 +189,56 @@ pub fn encode<T: PartialEq + Default + Copy + Encoder<T>, W: Write, I: Iterator<
consecutive_repeats = 1;
buffer_idx = 0;
} else {
// Not enough consecutive repeats to RLE, extend literal run
// Value changed so start a new run but the current run is not long
// enough to RLE. Consolidate all consecutive repeats into buffered
// literal run.
literal_run_idx = buffer_idx;
consecutive_repeats = 1;
}
// If buffer is full, bit-pack as literal run and reset
if buffer_idx == MAX_VALUES_PER_LITERAL_RUN {
T::bitpacked_encode(
writer,
buffered_bits.iter().copied().take(literal_run_idx),
num_bits as usize,
)?;
// Consecutive repeats may be consolidated into literal run
consecutive_repeats -= buffer_idx - literal_run_idx;
T::bitpacked_encode(writer, buffered_bits.iter().copied(), num_bits as usize)?;
// If buffer fills up in the middle of a run, all but the last
// repeat is consolidated into the literal run.
debug_assert!(
(consecutive_repeats < 8)
&& (buffer_idx - literal_run_idx == consecutive_repeats - 1)
);
consecutive_repeats = 1;
buffer_idx = 0;
literal_run_idx = 0;
}
buffered_bits[buffer_idx] = val;
previous_val = val;
buffer_idx += 1;
}
// Not enough consecutive repeats to RLE, extend literal run
// Final run not long enough to RLE, extend literal run.
if consecutive_repeats <= 8 {
literal_run_idx = buffer_idx;
consecutive_repeats = 0;
}
// Bit-pack final buffered literal run, if any
if literal_run_idx > 0 {
T::bitpacked_encode(
writer,
buffered_bits.iter().copied().take(literal_run_idx),
buffered_bits.iter().take(literal_run_idx).copied(),
num_bits as usize,
)?;
}
// RLE final consecutive run if long enough
if consecutive_repeats > 8 {
T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?;
}
Ok(())
}

// If buffer fills up in the middle of a run, all but the last
// repeat is consolidated into the literal run.
debug_assert!(
(consecutive_repeats < 8)
&& (buffer_idx - literal_run_idx == consecutive_repeats - 1)
);
consecutive_repeats -= buffer_idx - literal_run_idx;

#[cfg(test)]
mod tests {
use super::super::bitmap::BitmapIter;
Expand Down

0 comments on commit 6c3380c

Please sign in to comment.