From bb09b83f72384d71e36683f229db7929f2f8daca Mon Sep 17 00:00:00 2001 From: thalassemia <67928790+thalassemia@users.noreply.github.com> Date: Wed, 8 May 2024 13:54:22 -0700 Subject: [PATCH] fix(rust): Bitpack full buffer when full --- .../parquet/encoding/hybrid_rle/encoder.rs | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs index 963499cf324f..7e1858e44979 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/encoder.rs @@ -73,7 +73,15 @@ impl Encoder for u32 { .for_each(|(item, buf)| *buf = item); let mut packed = [0u8; 4 * U32_BLOCK_LEN]; - bitpacked::encode_pack(&buffer[..remainder], num_bits, packed.as_mut()); + // No need to zero rest of buffer because remainder is either: + // * Multiple of 8: We pad non-terminal literal runs to have a + // multiple of 8 values. Once compressed, the data will end on + // clean byte boundaries and packed[..compressed_remainder_size] + // will include only the remainder values and nothing extra. + // * Final run: Extra values from buffer will be included in + // packed[..compressed_remainder_size] but ignored when decoding + // because they extend beyond known column length + bitpacked::encode_pack(&buffer, num_bits, packed.as_mut()); writer.write_all(&packed[..compressed_remainder_size])?; }; Ok(()) @@ -149,14 +157,15 @@ pub fn encode, W: Write, I: Iterator< for val in iterator { if val == previous_val { consecutive_repeats += 1; - // Run is long enough to RLE, no need to buffer values if consecutive_repeats >= 8 { // Run is long enough to RLE, no need to buffer values if consecutive_repeats > 8 { continue; } else { - // Ensure literal run has multiple of 8 values - // Take from consecutive repeats if needed to pad up + // When we encounter a run long enough to potentially RLE, + // we must first ensure that the buffered literal run has + // a multiple of 8 values for bit-packing. If not, we pad + // up by taking some of the consecutive repeats let literal_padding = (8 - (literal_run_idx % 8)) % 8; consecutive_repeats -= literal_padding; literal_run_idx += literal_padding; @@ -164,11 +173,14 @@ pub fn encode, W: Write, I: Iterator< } // Too short to RLE, continue to buffer values } else if consecutive_repeats > 8 { - // Flush literal run, if any, before RLE run + // Value changed so start a new run but the current run is long + // enough to RLE. First, bit-pack any buffered literal run. Then, + // RLE current run and reset consecutive repeat counter and buffer. if literal_run_idx > 0 { + debug_assert!(literal_run_idx % 8 == 0); T::bitpacked_encode( writer, - buffered_bits.iter().copied().take(literal_run_idx), + buffered_bits.iter().take(literal_run_idx).copied(), num_bits as usize, )?; literal_run_idx = 0; @@ -177,19 +189,22 @@ pub fn encode, W: Write, I: Iterator< consecutive_repeats = 1; buffer_idx = 0; } else { - // Not enough consecutive repeats to RLE, extend literal run + // Value changed so start a new run but the current run is not long + // enough to RLE. Consolidate all consecutive repeats into buffered + // literal run. literal_run_idx = buffer_idx; consecutive_repeats = 1; } // If buffer is full, bit-pack as literal run and reset if buffer_idx == MAX_VALUES_PER_LITERAL_RUN { - T::bitpacked_encode( - writer, - buffered_bits.iter().copied().take(literal_run_idx), - num_bits as usize, - )?; - // Consecutive repeats may be consolidated into literal run - consecutive_repeats -= buffer_idx - literal_run_idx; + T::bitpacked_encode(writer, buffered_bits.iter().copied(), num_bits as usize)?; + // If buffer fills up in the middle of a run, all but the last + // repeat is consolidated into the literal run. + debug_assert!( + (consecutive_repeats < 8) + && (buffer_idx - literal_run_idx == consecutive_repeats - 1) + ); + consecutive_repeats = 1; buffer_idx = 0; literal_run_idx = 0; } @@ -197,18 +212,19 @@ pub fn encode, W: Write, I: Iterator< previous_val = val; buffer_idx += 1; } - // Not enough consecutive repeats to RLE, extend literal run + // Final run not long enough to RLE, extend literal run. if consecutive_repeats <= 8 { literal_run_idx = buffer_idx; - consecutive_repeats = 0; } + // Bit-pack final buffered literal run, if any if literal_run_idx > 0 { T::bitpacked_encode( writer, - buffered_bits.iter().copied().take(literal_run_idx), + buffered_bits.iter().take(literal_run_idx).copied(), num_bits as usize, )?; } + // RLE final consecutive run if long enough if consecutive_repeats > 8 { T::run_length_encode(writer, consecutive_repeats, previous_val, num_bits)?; }