Skip to content

Commit

Permalink
shorten improve comment
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Apr 2, 2024
1 parent 8a1c3d2 commit 275e2aa
Showing 1 changed file with 8 additions and 9 deletions.
17 changes: 8 additions & 9 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,6 @@ impl BatchedParquetReader {
let use_statistics = self.use_statistics;
let hive_partition_columns = self.hive_partition_columns.clone();

// This is a synchronous blocking operation so we run it in a way
// that makes sure we don't block the polling of async tasks.
let f = move || {
let dfs = rg_to_dfs(
&store,
Expand All @@ -692,15 +690,16 @@ impl BatchedParquetReader {
tx.send((dfs, rows_read, limit)).unwrap();
};

// Spawn the task and wait on it asynchronously.
let (dfs, rows_read, limit) = if POOL.current_thread_index().is_some() {
// We are a rayon thread, so we must actively participate in driving rayon tasks, thus we use POOL.install.
// Using POOL.spawn as a rayon thread essentially means that a rayon thread is spawning a task that it refuses to run,
// and if every rayon thread did this then we would deadlock.
//
// POOL.install blocks the current thread, so we activate another tokio thread using tokio::spawn.
// This (should) ensure that futures continue to be polled in a timely manner, since the new
// tokio thread would be able to work-steal async tasks from threads that are blocking for too long.
// We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until
// another rayon thread executes it - we would deadlock if all rayon threads did this.

// Activate another tokio thread to poll futures. There should be at least 1 tokio thread that is
// not a rayon thread.
let handle = tokio::spawn(async { rx.await.unwrap() });
// Now spawn the task onto rayon and participate in executing it. The current thread will no longer
// poll async futures until this rayon task is complete.
POOL.install(f);
handle.await.unwrap()
} else {
Expand Down

0 comments on commit 275e2aa

Please sign in to comment.