fix: Fix deadlock in async parquet scan (#15440)
nameexhaustion authored Apr 3, 2024
1 parent b431cff commit 7e7a35f
Showing 3 changed files with 47 additions and 9 deletions.
7 changes: 1 addition & 6 deletions crates/polars-arrow/src/compute/cast/utf8_to.rs
@@ -89,12 +89,7 @@ fn truncate_buffer(buf: &Buffer<u8>) -> Buffer<u8> {
 pub fn binary_to_binview<O: Offset>(arr: &BinaryArray<O>) -> BinaryViewArray {
     // Ensure we didn't accidentally set wrong type
     #[cfg(not(debug_assertions))]
-    {
-        assert_eq!(
-            std::mem::size_of::<u32>(),
-            std::mem::size_of::<OffsetType>()
-        );
-    }
+    let _ = std::mem::transmute::<OffsetType, u32>;
 
     let mut views = Vec::with_capacity(arr.len());
     let mut uses_buffer = false;
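
The replacement line relies on the fact that merely naming std::mem::transmute::<Src, Dst> (without calling it) forces the compiler to monomorphize it, which only succeeds when the two types have the same size - a compile-time version of the old runtime assert. A minimal standalone sketch of the idea (illustrative only, not part of this commit):

// Naming `transmute::<Src, Dst>` acts as a compile-time size assertion: the
// otherwise-unused binding below only compiles because `u32` and `i32` have
// the same size. Swapping `i32` for `u64` would be a build error
// ("cannot transmute between types of different sizes").
fn assert_u32_sized() {
    let _ = std::mem::transmute::<u32, i32>;
    // let _ = std::mem::transmute::<u32, u64>; // would fail to compile
}

fn main() {
    assert_u32_sized();
    println!("size check passed at compile time");
}
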
24 changes: 21 additions & 3 deletions crates/polars-io/src/parquet/read_impl.rs
@@ -670,7 +670,8 @@ impl BatchedParquetReader {
         let projection = self.projection.clone();
         let use_statistics = self.use_statistics;
         let hive_partition_columns = self.hive_partition_columns.clone();
-        POOL.spawn(move || {
+
+        let f = move || {
             let dfs = rg_to_dfs(
                 &store,
                 &mut rows_read,
@@ -687,8 +688,25 @@
                 hive_partition_columns.as_deref(),
             );
             tx.send((dfs, rows_read, limit)).unwrap();
-        });
-        let (dfs, rows_read, limit) = rx.await.unwrap();
+        };
+
+        // Spawn the task and wait on it asynchronously.
+        let (dfs, rows_read, limit) = if POOL.current_thread_index().is_some() {
+            // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until
+            // another rayon thread executes it - we would deadlock if all rayon threads did this.
+
+            // Activate another tokio thread to poll futures. There should be at least 1 tokio thread that is
+            // not a rayon thread.
+            let handle = tokio::spawn(async { rx.await.unwrap() });
+            // Now spawn the task onto rayon and participate in executing it. The current thread will no longer
+            // poll async futures until this rayon task is complete.
+            POOL.install(f);
+            handle.await.unwrap()
+        } else {
+            POOL.spawn(f);
+            rx.await.unwrap()
+        };
+
         self.rows_read = rows_read;
         self.limit = limit;
         dfs
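
The comments above carry the core of the fix: a rayon worker must not block waiting for a task it has queued on its own pool, because if every worker does this no thread is left to run the queued tasks. A minimal, self-contained sketch of the same pattern (assuming a Cargo setup with rayon and tokio with the rt-multi-thread, sync and macros features; the pool, channel, and names such as run_on_rayon are illustrative stand-ins, not polars' actual POOL or rg_to_dfs):

use tokio::sync::oneshot;

// Run blocking work on a rayon pool from async code and await its result
// without risking the "all rayon threads block on rayon work" deadlock.
async fn run_on_rayon(pool: &rayon::ThreadPool) -> u64 {
    let (tx, rx) = oneshot::channel();
    let f = move || {
        // Stand-in for the real blocking work (`rg_to_dfs` in this commit).
        let result: u64 = (0..1_000u64).sum();
        let _ = tx.send(result);
    };

    if pool.current_thread_index().is_some() {
        // Already on a rayon worker: spawning `f` and blocking on `rx` here
        // could deadlock if every worker did the same. Hand the await to
        // another tokio thread and run the rayon task on this thread instead.
        let handle = tokio::spawn(async { rx.await.unwrap() });
        pool.install(f);
        handle.await.unwrap()
    } else {
        // Not a rayon thread: spawn onto the pool and await the result.
        pool.spawn(f);
        rx.await.unwrap()
    }
}

#[tokio::main]
async fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build()
        .unwrap();
    println!("sum = {}", run_on_rayon(&pool).await);
}

The regression test added below exercises exactly this path by forcing POLARS_FORCE_ASYNC=1 and POLARS_MAX_THREADS=1, so the single rayon worker has no peer to hand the task to.
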
25 changes: 25 additions & 0 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -2,6 +2,7 @@
 
 from collections import OrderedDict
 from pathlib import Path
+from threading import Thread
 from typing import TYPE_CHECKING, Any
 
 import pandas as pd
@@ -407,3 +408,27 @@ def test_nested_slice_12480(tmp_path: Path) -> None:
     df.write_parquet(path, use_pyarrow=True, pyarrow_options={"data_page_size": 1})
 
     assert pl.scan_parquet(path).slice(0, 1).collect().height == 1
+
+
+@pytest.mark.write_disk()
+def test_scan_deadlock_rayon_spawn_from_async_15172(
+    monkeypatch: Any, tmp_path: Path
+) -> None:
+    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
+    monkeypatch.setenv("POLARS_MAX_THREADS", "1")
+    path = tmp_path / "data.parquet"
+
+    df = pl.Series("x", [1]).to_frame()
+    df.write_parquet(path)
+
+    results = [pl.DataFrame()]
+
+    def scan_collect() -> None:
+        results[0] = pl.collect_all([pl.scan_parquet(path)])[0]
+
+    # Make sure we don't sit there hanging forever on the broken case
+    t = Thread(target=scan_collect, daemon=True)
+    t.start()
+    t.join(5)
+
+    assert results[0].equals(df)
