[PERF] Add a parallel local CSV reader #2772

Open
wants to merge 1 commit into main from desmondcheongzx:local-csv-reader-experiment

Conversation

@desmondcheongzx (Contributor) commented Aug 30, 2024

Adds a parallel CSV reader to speed up ingestion of CSV files. The approach adapts some of the ideas laid out in [1], but the majority of the performance gains came from the use of buffer pools to minimize memory allocations.
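
For context on the buffer-pool idea, here is a minimal, illustrative sketch (not the code in this PR): a pool hands out fixed-size byte buffers and recycles them when a reader is done, so the hot read loop avoids allocating a fresh Vec every time.

use std::sync::Mutex;

// Illustrative sketch of a reusable buffer pool; not the implementation in this PR.
struct BufferPool {
    buffers: Mutex<Vec<Vec<u8>>>,
    buffer_size: usize,
}

impl BufferPool {
    fn new(num_buffers: usize, buffer_size: usize) -> Self {
        let buffers = (0..num_buffers).map(|_| vec![0u8; buffer_size]).collect();
        Self {
            buffers: Mutex::new(buffers),
            buffer_size,
        }
    }

    // Take a buffer from the pool, or allocate a new one if the pool is empty.
    fn get(&self) -> Vec<u8> {
        self.buffers
            .lock()
            .unwrap()
            .pop()
            .unwrap_or_else(|| vec![0u8; self.buffer_size])
    }

    // Return a buffer to the pool for reuse instead of dropping it.
    fn put(&self, buf: Vec<u8>) {
        self.buffers.lock().unwrap().push(buf);
    }
}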

Some performance numbers

We consider a simple case of reading and performing .collect() on a CSV file with 10^8 rows, each with 9 fields: 3 string fields, 5 int64 fields, and 1 double field. The file is roughly 5 GB in size.

Non-native executor:                 38.71140212500177s
Non-native executor, new CSV reader: 7.432862582994858s
Native executor:                     44.55550079200475s
Native executor, new CSV reader:     4.117344291880727s

This represents a roughly 10x speedup on CSV reads for the native executor.

Follow-up work

  • The schema is currently taken from the convert options, but we should also perform schema inference.
  • We need to add better estimators for record size, either via sampling, or by keeping track of stats as we go.
  • Currently, for each read, the reader creates a buffer pool for reading CSV records plus a pool of slabs for reading the CSV file. We might need to change these to per-process pools to avoid high memory pressure on concurrent reads. However, care must be taken, since otherwise it is possible for us to deadlock.

[1]: Ge, Chang et al. “Speculative Distributed CSV Data Parsing for Big Data Analytics.” Proceedings of the 2019 International Conference on Management of Data (2019).

@desmondcheongzx changed the title from "Parallel local CSV reader" to "[PERF] Add an experimental parallel local CSV reader" on Aug 30, 2024

codspeed-hq bot commented Aug 30, 2024

CodSpeed Performance Report

Merging #2772 will degrade performance by 33.78%

Comparing desmondcheongzx:local-csv-reader-experiment (7b40f23) with main (cad9168)

Summary

⚡ 2 improvements
❌ 1 regression
✅ 13 untouched benchmarks

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark                       main       desmondcheongzx:local-csv-reader-experiment   Change
test_count[1 Small File]        16.5 ms    24.9 ms                                       -33.78%
test_explain[100 Small Files]   52.8 ms    39.9 ms                                       +32.33%
test_show[100 Small Files]      597.4 ms   355.8 ms                                      +67.91%

Comment on lines +286 to +291
fn next_line_position(input: &[u8]) -> Option<usize> {
    // Assuming we are searching for the ASCII `\n` character, we don't need to do any special
    // handling for UTF-8, since a `\n` value always corresponds to an ASCII `\n`.
    // For more details, see: https://en.wikipedia.org/wiki/UTF-8#Encoding
    memchr::memchr(NEWLINE, input)
}
Collaborator

won't this break on quoted columns with a newline in them?

c0,c1,quoted_column
1,true,"this is \n quoted"

@desmondcheongzx (Contributor, Author) Aug 30, 2024

Yeah, that's correct. I left some follow-up TODOs below, but we haven't handled this case yet.

Some ideas we tossed around:

  • Do speculative parsing: assume each chunk could start in either the quoted or the unquoted state, then prune the impossible interpretation as we go (see the sketch after this list).
  • Check whether the file has no quoted fields, or let users specify this. In the happy path without quoted fields we can just rip through the file; otherwise, use speculative parsing.
  • There may be other edge cases that we don't handle correctly. In those cases, simply fall back to an existing, mature CSV reader.
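
To make the first idea concrete, here's a rough sketch of what chunk-level speculation could look like. This is illustrative only and not what the PR does today; it also glosses over escaped quotes ("") by treating every double-quote as a state toggle.

// Illustrative sketch of chunk-level speculation; not code from this PR.
#[derive(Clone, Copy, PartialEq)]
enum QuoteState {
    Unquoted,
    Quoted,
}

// Scan a chunk from an assumed starting state; return record boundaries and the end state.
fn scan_chunk(chunk: &[u8], start: QuoteState) -> (Vec<usize>, QuoteState) {
    let mut state = start;
    let mut boundaries = Vec::new();
    for (i, &byte) in chunk.iter().enumerate() {
        match (state, byte) {
            (QuoteState::Unquoted, b'"') => state = QuoteState::Quoted,
            (QuoteState::Quoted, b'"') => state = QuoteState::Unquoted,
            (QuoteState::Unquoted, b'\n') => boundaries.push(i + 1),
            _ => {}
        }
    }
    (boundaries, state)
}

// Speculate on both interpretations; a later sequential pass chains chunks together
// and keeps only the interpretation consistent with the previous chunk's end state.
fn speculate(chunk: &[u8]) -> [(Vec<usize>, QuoteState); 2] {
    [
        scan_chunk(chunk, QuoteState::Unquoted),
        scan_chunk(chunk, QuoteState::Quoted),
    ]
}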

Comment on lines +495 to +497
// TODO(desmond): Need better upfront estimators. Sample or keep running count of stats.
let estimated_mean_row_size = 100f64;
let estimated_std_row_size = 20f64;
Collaborator

You should be able to do something similar to what I did in the JSON reader here to get the estimated sizes.
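
Something along these lines, for illustration only (not the JSON reader's actual code): sample the first few newline-terminated records in the buffer to derive the mean and standard deviation of row size, instead of hardcoding 100/20 bytes. Quoted newlines are ignored here, which is fine for a rough estimate.

fn estimate_row_size(buf: &[u8], max_samples: usize) -> Option<(f64, f64)> {
    let mut sizes: Vec<f64> = Vec::with_capacity(max_samples);
    let mut start = 0usize;
    // Collect the byte lengths of up to `max_samples` newline-terminated records.
    while sizes.len() < max_samples {
        match memchr::memchr(b'\n', &buf[start..]) {
            Some(pos) => {
                sizes.push((pos + 1) as f64);
                start += pos + 1;
            }
            None => break,
        }
    }
    if sizes.is_empty() {
        return None;
    }
    let n = sizes.len() as f64;
    let mean = sizes.iter().sum::<f64>() / n;
    let variance = sizes.iter().map(|s| (s - mean).powi(2)).sum::<f64>() / n;
    Some((mean, variance.sqrt()))
}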

// the following approach still isn't quite right.
// TODO(desmond): Also, is this usage of max_chunks_in_flight correct?
let (sender, receiver) =
    crossbeam_channel::bounded(max_chunks_in_flight.unwrap_or(n_threads * 2));
Contributor

I think you might deadlock here, e.g. if your channel is full and all threads are trying to send chunks, they will all block. For Parquet, we capped it to expected_num_chunks, i.e. the total number of chunks that will ever be read. See: https://github.com/Eventual-Inc/Daft/pull/2620/files#diff-f65cce5db7fd92cf9fe1cc38ebd495165b7ac59444a91f0c93c4c4c027089429R392-R400
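
One illustrative way to apply that suggestion (a sketch, not the Parquet reader's actual code; names like total_file_size and target_chunk_size are hypothetical): bound the channel by the total number of chunks the read can ever produce, so senders never block on capacity.

// Illustrative sketch: cap the channel at the maximum possible number of chunks.
fn make_chunk_channel<T>(
    total_file_size: usize,
    target_chunk_size: usize, // assumed > 0
) -> (crossbeam_channel::Sender<T>, crossbeam_channel::Receiver<T>) {
    // Ceiling division: the maximum number of chunks this read can produce.
    let expected_num_chunks = (total_file_size + target_chunk_size - 1) / target_chunk_size;
    crossbeam_channel::bounded(expected_num_chunks.max(1))
}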

@colin-ho (Contributor) Aug 31, 2024

Oh, and also (maybe you have this as a TODO, but just in case): the ordering should also be preserved. For the streaming executor, there's a maintain_order flag that indicates whether order needs to be preserved, but the non-streaming one always requires order. I don't think into_par_iter().for_each() will preserve it, but correct me if I'm wrong (I believe it only preserves order if it ends in a collect, see rayon-rs/rayon#551).
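
For illustration (a sketch, not this PR's code), one way to keep output order is to map chunks with rayon and collect, since collect() on an indexed parallel iterator preserves input order, unlike for_each() with unordered sends over a channel.

use rayon::prelude::*;

// Illustrative sketch: the closure is a stand-in for real CSV parsing.
fn parse_chunks_in_order(chunks: &[Vec<u8>]) -> Vec<usize> {
    chunks
        .par_iter()
        .map(|chunk| chunk.iter().filter(|&&b| b == b'\n').count())
        .collect() // collect() preserves the original chunk order
}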

@desmondcheongzx changed the title from "[PERF] Add an experimental parallel local CSV reader" to "[PERF] Add a parallel local CSV reader" on Sep 27, 2024