Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Add non-order preserving variable row-encoding #15414

Merged
merged 2 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/polars-core/src/chunked_array/logical/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use arrow::legacy::trusted_len::TrustedLenPush;
use arrow::offset::OffsetsBuffer;
use smartstring::alias::String as SmartString;

use self::sort::arg_sort_multiple::_get_rows_encoded_ca;
use super::*;
use crate::chunked_array::iterator::StructIter;
use crate::datatypes::*;
use crate::prelude::sort::arg_sort_multiple::_get_rows_encoded_ca_unordered;
use crate::utils::index_to_chunked_index;

/// This is logical type [`StructChunked`] that
Expand Down Expand Up @@ -415,8 +415,7 @@ impl StructChunked {
}

pub fn rows_encode(&self) -> PolarsResult<BinaryOffsetChunked> {
let descending = vec![false; self.fields.len()];
_get_rows_encoded_ca(self.name(), &self.fields, &descending, false)
_get_rows_encoded_ca_unordered(self.name(), &self.fields)
}

pub fn iter(&self) -> StructIter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use compare_inner::NullOrderCmp;
use polars_row::{convert_columns, RowsEncoded, SortField};
use polars_row::{convert_columns, EncodingField, RowsEncoded};
use polars_utils::iter::EnumerateIdxTrait;

use super::*;
Expand Down Expand Up @@ -87,31 +87,55 @@ pub fn _get_rows_encoded_compat_array(by: &Series) -> PolarsResult<ArrayRef> {
Ok(out)
}

pub(crate) fn encode_rows_vertical_par_default(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
pub(crate) fn encode_rows_vertical_par_unordered(
by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
let n_threads = POOL.current_num_threads();
let len = by[0].len();
let splits = _split_offsets(len, n_threads);
let descending = vec![false; by.len()];

let chunks = splits.into_par_iter().map(|(offset, len)| {
let sliced = by
.iter()
.map(|s| s.slice(offset as i64, len))
.collect::<Vec<_>>();
let rows = _get_rows_encoded(&sliced, &descending, false)?;
let rows = _get_rows_encoded_unordered(&sliced)?;
Ok(rows.into_array())
});
let chunks = POOL.install(|| chunks.collect::<PolarsResult<Vec<_>>>());

Ok(BinaryOffsetChunked::from_chunk_iter("", chunks?))
}

pub(crate) fn encode_rows_default(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
let descending = vec![false; by.len()];
let rows = _get_rows_encoded(by, &descending, false)?;
/// Row-encode the given columns (single-threaded) without preserving sort
/// order, returning the rows as an unnamed binary chunked array.
pub(crate) fn encode_rows_unordered(by: &[Series]) -> PolarsResult<BinaryOffsetChunked> {
    _get_rows_encoded_unordered(by)
        .map(|rows| BinaryOffsetChunked::with_chunk("", rows.into_array()))
}

/// Row-encode the given columns using the unsorted (non-order-preserving)
/// encoding for every field.
///
/// Struct columns are flattened: each inner field is encoded as its own
/// column so the row format only ever sees leaf arrays.
pub fn _get_rows_encoded_unordered(by: &[Series]) -> PolarsResult<RowsEncoded> {
    let mut cols = Vec::with_capacity(by.len());
    let mut fields = Vec::with_capacity(by.len());
    for s in by {
        let arr = _get_rows_encoded_compat_array(s)?;
        let field = EncodingField::new_unsorted();
        if let ArrowDataType::Struct(_) = arr.data_type() {
            // Flatten the struct: push each inner field as a separate column.
            let struct_arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
            for inner in struct_arr.values() {
                cols.push(inner.clone() as ArrayRef);
                fields.push(field);
            }
        } else {
            cols.push(arr);
            fields.push(field);
        }
    }
    Ok(convert_columns(&cols, &fields))
}

pub fn _get_rows_encoded(
by: &[Series],
descending: &[bool],
Expand All @@ -123,17 +147,18 @@ pub fn _get_rows_encoded(
for (by, descending) in by.iter().zip(descending) {
let arr = _get_rows_encoded_compat_array(by)?;

let sort_field = SortField {
let sort_field = EncodingField {
descending: *descending,
nulls_last,
no_order: false,
};
match arr.data_type() {
// Flatten the struct fields.
ArrowDataType::Struct(_) => {
let arr = arr.as_any().downcast_ref::<StructArray>().unwrap();
for arr in arr.values() {
cols.push(arr.clone() as ArrayRef);
fields.push(sort_field.clone())
fields.push(sort_field)
}
},
_ => {
Expand All @@ -155,6 +180,14 @@ pub fn _get_rows_encoded_ca(
.map(|rows| BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

/// Row-encode the given columns without preserving sort order and wrap the
/// result in a named binary chunked array.
pub fn _get_rows_encoded_ca_unordered(
    name: &str,
    by: &[Series],
) -> PolarsResult<BinaryOffsetChunked> {
    let rows = _get_rows_encoded_unordered(by)?;
    Ok(BinaryOffsetChunked::with_chunk(name, rows.into_array()))
}

pub(crate) fn argsort_multiple_row_fmt(
by: &[Series],
mut descending: Vec<bool>,
Expand Down
1 change: 0 additions & 1 deletion crates/polars-core/src/frame/group_by/into_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ impl IntoGroupsProxy for BinaryChunked {
})
.collect::<Vec<_>>()
});
let byte_hashes = byte_hashes.iter().collect::<Vec<_>>();
group_by_threaded_slice(byte_hashes, n_partitions, sorted)
} else {
let byte_hashes = fill_bytes_hashes(self, null_h, hb.clone());
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-core/src/frame/group_by/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub use into_groups::*;
pub use proxy::*;

use crate::prelude::sort::arg_sort_multiple::{
encode_rows_default, encode_rows_vertical_par_default,
encode_rows_unordered, encode_rows_vertical_par_unordered,
};

impl DataFrame {
Expand Down Expand Up @@ -84,9 +84,9 @@ impl DataFrame {
})
} else {
let rows = if multithreaded {
encode_rows_vertical_par_default(&by)
encode_rows_vertical_par_unordered(&by)
} else {
encode_rows_default(&by)
encode_rows_unordered(&by)
}?
.into_series();
rows.group_tuples(multithreaded, sorted)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::UnsafeCell;

use polars_core::export::ahash::RandomState;
use polars_row::{RowsEncoded, SortField};
use polars_row::{EncodingField, RowsEncoded};

use super::*;
use crate::executors::sinks::group_by::utils::prepare_key;
Expand All @@ -18,7 +18,7 @@ pub(super) struct Eval {
aggregation_series: UnsafeCell<Vec<Series>>,
keys_columns: UnsafeCell<Vec<ArrayRef>>,
hashes: Vec<u64>,
key_fields: Vec<SortField>,
key_fields: Vec<EncodingField>,
// amortizes the encoding buffers
rows_encoded: RowsEncoded,
}
Expand Down
15 changes: 6 additions & 9 deletions crates/polars-pipe/src/executors/sinks/sort/sink_multiple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,20 @@ use polars_core::prelude::*;
use polars_core::series::IsSorted;
use polars_plan::prelude::*;
use polars_row::decode::decode_rows_from_binary;
use polars_row::SortField;
use polars_row::EncodingField;

use super::*;
use crate::operators::{
DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, Source, SourceResult,
};
const POLARS_SORT_COLUMN: &str = "__POLARS_SORT_COLUMN";

fn get_sort_fields(sort_idx: &[usize], sort_args: &SortArguments) -> Vec<SortField> {
fn get_sort_fields(sort_idx: &[usize], sort_args: &SortArguments) -> Vec<EncodingField> {
let mut descending = sort_args.descending.clone();
_broadcast_descending(sort_idx.len(), &mut descending);
descending
.into_iter()
.map(|descending| SortField {
descending,
nulls_last: sort_args.nulls_last,
})
.map(|descending| EncodingField::new_sorted(descending, sort_args.nulls_last))
.collect()
}

Expand Down Expand Up @@ -61,7 +58,7 @@ fn finalize_dataframe(
can_decode: bool,
sort_dtypes: Option<&[ArrowDataType]>,
rows: &mut Vec<&'static [u8]>,
sort_fields: &[SortField],
sort_fields: &[EncodingField],
schema: &Schema,
) {
unsafe {
Expand Down Expand Up @@ -126,7 +123,7 @@ pub struct SortSinkMultiple {
sort_sink: Box<dyn Sink>,
sort_args: SortArguments,
// Needed for encoding
sort_fields: Arc<[SortField]>,
sort_fields: Arc<[EncodingField]>,
sort_dtypes: Option<Arc<[DataType]>>,
// amortize allocs
sort_column: Vec<ArrayRef>,
Expand Down Expand Up @@ -320,7 +317,7 @@ struct DropEncoded {
can_decode: bool,
sort_dtypes: Option<Vec<ArrowDataType>>,
rows: Vec<&'static [u8]>,
sort_fields: Arc<[SortField]>,
sort_fields: Arc<[EncodingField]>,
output_schema: SchemaRef,
}

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-row/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::variable::{decode_binary, decode_binview};
/// encodings.
pub unsafe fn decode_rows_from_binary<'a>(
arr: &'a BinaryArray<i64>,
fields: &[SortField],
fields: &[EncodingField],
data_types: &[ArrowDataType],
rows: &mut Vec<&'a [u8]>,
) -> Vec<ArrayRef> {
Expand All @@ -27,7 +27,7 @@ pub unsafe fn decode_rows_from_binary<'a>(
pub unsafe fn decode_rows(
// the rows will be updated while the data is decoded
rows: &mut [&[u8]],
fields: &[SortField],
fields: &[EncodingField],
data_types: &[ArrowDataType],
) -> Vec<ArrayRef> {
assert_eq!(fields.len(), data_types.len());
Expand All @@ -38,7 +38,7 @@ pub unsafe fn decode_rows(
.collect()
}

unsafe fn decode(rows: &mut [&[u8]], field: &SortField, data_type: &ArrowDataType) -> ArrayRef {
unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, data_type: &ArrowDataType) -> ArrayRef {
match data_type {
ArrowDataType::Null => NullArray::new(ArrowDataType::Null, rows.len()).to_boxed(),
ArrowDataType::Boolean => decode_bool(rows, field).to_boxed(),
Expand Down
Loading
Loading