diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs index 813a6c5cedff..0f9a75b0ae79 100644 --- a/crates/polars-io/src/parquet/read_impl.rs +++ b/crates/polars-io/src/parquet/read_impl.rs @@ -671,8 +671,6 @@ impl BatchedParquetReader { let use_statistics = self.use_statistics; let hive_partition_columns = self.hive_partition_columns.clone(); - // This is a synchronous blocking operation so we run it in a way - // that makes sure we don't block the polling of async tasks. let f = move || { let dfs = rg_to_dfs( &store, @@ -692,15 +690,16 @@ impl BatchedParquetReader { tx.send((dfs, rows_read, limit)).unwrap(); }; + // Spawn the task and wait on it asynchronously. let (dfs, rows_read, limit) = if POOL.current_thread_index().is_some() { - // We are a rayon thread, so we must actively participate in driving rayon tasks, thus we use POOL.install. - // Using POOL.spawn as a rayon thread essentially means that a rayon thread is spawning a task that it refuses to run, - // and if every rayon thread did this then we would deadlock. - // - // POOL.install blocks the current thread, so we activate another tokio thread using tokio::spawn. - // This (should) ensure that futures continue to be polled in a timely manner, since the new - // tokio thread would be able to work-steal async tasks from threads that are blocking for too long. + // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until + // another rayon thread executes it - we would deadlock if all rayon threads did this. + + // Activate another tokio thread to poll futures. There should be at least 1 tokio thread that is + // not a rayon thread. let handle = tokio::spawn(async { rx.await.unwrap() }); + // Now spawn the task onto rayon and participate in executing it. The current thread will no longer + // poll async futures until this rayon task is complete. POOL.install(f); handle.await.unwrap() } else {