Parallel Apache Arrow + DuckDB Solution to the One Trillion Row Challenge 26 mins 15 secs on Dell Workstation #3
Comments
Addendum. Although not specified in the task spec, the original output looked untidy, so I modified the final DuckDB query to perform a diacritic-insensitive sort of the station names.
Which generated a much neater-looking output:
The final output, in CSV format, may be found here:
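For reference, the diacritic-insensitive ordering mentioned in the addendum can be expressed with DuckDB's built-in NOACCENT collation. A toy illustration only, not the actual challenge query:

```python
import duckdb

# Toy example: NOACCENT sorts accented station names alongside their
# unaccented forms instead of pushing them to the end of the list.
duckdb.sql("""
    SELECT station
    FROM (VALUES ('Zürich'), ('Uppsala'), ('Ürümqi'), ('Zagreb')) AS t(station)
    ORDER BY station COLLATE NOACCENT
""").show()
```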
Cool. I'm curious, where was the data stored, on the local RAID?
That's fun. Did you achieve that read speed during the calculation? (If so, my guess is that the choice of computational tool doesn't matter that much, and we're just I/O bound here.)
I was monitoring the I/O. It peaked at around 7 GB/s per worker, but also had idle spots in between. My estimate of the split is 18 minutes I/O and data transfer, 8 minutes computation time.
I'm surprised that they weren't overlapping more. Need more concurrency, maybe?
The Python multiprocessing library, which is what I used in this case, is quite basic and lacks many controls. There is almost certainly a better approach. I have some ideas to try.
I hear dask is quite a solid upgrade from the stdlib multiprocessing library 😉
I tried Dask and Dask-cuDF (see my LinkedIn posts). Both worked, but took around 4 to 5 times longer than the native Apache Arrow solution. Running Arrow batches in parallel, with optimal batch size and number of workers, while not particularly elegant, was by far the fastest on this particular system.
I think that @shughes-uk is likely talking about using raw Dask, the parallel computing solution, not Dask DataFrame, the big-pandas implementation (sometimes people conflate the two). This doc might be of interest: https://docs.dask.org/en/stable/futures.html
That's really interesting. Being totally honest, I have always thought of Dask as the DataFrame. I will definitely look into this. Thank you @shughes-uk and @mrocklin
I really appreciate the advice @shughes-uk @mrocklin. I just read the docs, and tried the example notebook on the workstation. Dask futures is a really powerful tool, thank you.
I'm glad you like it. Was it able to boost your performance at all, or did you get the same as with multiprocessing?
It was marginally faster, but it's a neater solution to code, as I could tier the final-stage aggregation of the chunk summaries as a function operating on the results from the first stage. Total run time came down to 24 minutes, 10 seconds. The task is very much I/O bound, but by tweaking parameters I was able to reduce idle times and speed it up slightly. It's a trade-off between maximising occupancy and minimising file-system contention across the parallel tasks. On this hardware I found:
Initialisation:
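Roughly, and only as a sketch: the data path and the threads-per-worker value below are assumptions, while the batch size of 10 files and the 8 workers follow the figures discussed above.

```python
from pathlib import Path
from dask.distributed import Client

# Assumed layout: the trillion-row dataset as Parquet files on the local RAID.
DATA_DIR = Path("/raid/1trc")   # hypothetical path
BATCH_SIZE = 10                 # files per task, the optimum found by trial and error
N_WORKERS = 8                   # parallel workers, as above

# Local Dask cluster on the workstation; threads_per_worker is a guess.
client = Client(n_workers=N_WORKERS, threads_per_worker=4)

# Partition the file list into fixed-size batches ahead of time.
files = sorted(str(p) for p in DATA_DIR.glob("*.parquet"))
file_batches = [files[i:i + BATCH_SIZE] for i in range(0, len(files), BATCH_SIZE)]
```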
Aggregate chunks function:
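Continuing the sketch, the first-stage worker might look roughly like this; the `station` and `temperature` column names are assumptions about the dataset schema.

```python
import pyarrow.dataset as ds

def aggregate_chunks(file_batch, column="temperature"):
    """First-stage aggregation: read one batch of Parquet files and return an
    Arrow table with min/max/sum/count of the measurement per station."""
    table = ds.dataset(file_batch, format="parquet").to_table()
    return table.group_by("station").aggregate([
        (column, "min"),
        (column, "max"),
        (column, "sum"),
        (column, "count"),
    ])
```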
Aggregate results of chunks to produce summary:
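And a sketch of the second-stage aggregation, which concatenates the per-batch group-by tables and hands them to DuckDB; the column names follow Arrow's `<column>_<aggregate>` convention from the function above.

```python
import duckdb
import pyarrow as pa

def summarise(chunk_tables):
    """Second-stage aggregation: combine the per-batch summaries and compute
    the final min/mean/max per station, sorted accent-insensitively."""
    combined = pa.concat_tables(chunk_tables)  # referenced by name in the SQL below
    return duckdb.sql("""
        SELECT station,
               MIN(temperature_min)                          AS min_temperature,
               SUM(temperature_sum) / SUM(temperature_count) AS mean_temperature,
               MAX(temperature_max)                          AS max_temperature
        FROM combined
        GROUP BY station
        ORDER BY station COLLATE NOACCENT
    """).arrow()
```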
Set the number of parameters and create the partial function:
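Continuing the same sketch, binding the fixed parameters with functools.partial:

```python
from functools import partial

# Bind the fixed keyword argument(s) so client.map() only needs to pass the batches.
aggregate_with_params = partial(aggregate_chunks, column="temperature")
```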
Map the aggregate chunks function:
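Mapping over the pre-built file batches, roughly:

```python
# One future per file batch; the scheduler spreads them across the 8 workers.
chunk_futures = client.map(aggregate_with_params, file_batches)
```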
Submit the summary aggregation function to work on the futures from the above:
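Submitting the tiered summary step on top of those futures, roughly:

```python
# Dask resolves the list of first-stage futures into their Arrow tables
# before running summarise on a worker.
summary_future = client.submit(summarise, chunk_futures)
```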
Get the results from the summary aggregation:
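And finally, pulling the result back to the client process, roughly:

```python
# Block until the tiered aggregation finishes and bring the summary table back.
summary = summary_future.result()
print(summary.to_pandas())
```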
And here is the summary:
This is odd. It sounds like the partial'ed function might be really hard to serialize, maybe because it contains the list of the 100,000 files? I'll bet that if you were to partition the files ahead of time (maybe with
That workstation could do lots of fun things with Dask
I wanted to test my new Dell Precision 7960 workstation on this task.
The hardware spec is: Intel Xeon w5-3435X CPU (16 cores / 32 threads, max speed 4.7 GHz), 512 GB DDR5 LRDIMMs running at 4400 MHz, 4 x Samsung Pro 990 2 TB Gen 4 NVMe drives in RAID 0 on a Dell UltraSpeed card in a PCIe 5.0 x16 slot, and an NVIDIA RTX A6000 (Ampere) GPU with 48 GB.
I tried several approaches, but settled on a native Apache Arrow table group-by solution using parallel workers to execute the chunks. The first-stage aggregation uses Apache Arrow tables to compute the min, max, sum and count of temperature for each station in a group-by.
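A minimal, self-contained sketch of that first stage using PyArrow's group_by/aggregate API; the `station` and `temperature` column names are assumptions about the dataset schema, and the toy table merely stands in for a real chunk.

```python
import pyarrow as pa

# Toy table standing in for one chunk of the real dataset.
chunk = pa.table({
    "station": ["Oslo", "Oslo", "Zürich"],
    "temperature": [3.1, -0.4, 7.9],
})

# First-stage aggregation: one row per station, with columns named
# temperature_min, temperature_max, temperature_sum, temperature_count.
partial_summary = chunk.group_by("station").aggregate([
    ("temperature", "min"),
    ("temperature", "max"),
    ("temperature", "sum"),
    ("temperature", "count"),
])
print(partial_summary)
```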
Following concatenation of the group-by tables, a second-stage aggregation is run using DuckDB to group by station name and compute the min and max of the aggregates, with the mean obtained by dividing the aggregate sum by the aggregate count.
Which generates the following output:
Interestingly, the optimal configuration, found by trial and error, was to use a smaller file batch size (10 files) and 8 parallel workers.
Following concatenation of the group-by tables, a second-stage aggregation is run using DuckDB to group and sort by station name, then compute the min and max of the aggregate chunks, with the mean obtained by dividing the aggregate sum by the aggregate count.
Explanation:
Execute the query:
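A self-contained sketch of the general shape of that query, run against a toy stand-in for the concatenated first-stage table and exported to CSV; the output file name is hypothetical.

```python
import duckdb
import pyarrow as pa

# Toy stand-in for the concatenated first-stage group-by tables.
combined = pa.table({
    "station":           ["Oslo", "Zürich", "Oslo"],
    "temperature_min":   [-0.4, 7.9, -2.0],
    "temperature_max":   [3.1, 7.9, 1.5],
    "temperature_sum":   [2.7, 7.9, -0.5],
    "temperature_count": [2, 1, 3],
})

summary = duckdb.sql("""
    SELECT station,
           MIN(temperature_min)                          AS min_temperature,
           SUM(temperature_sum) / SUM(temperature_count) AS mean_temperature,
           MAX(temperature_max)                          AS max_temperature
    FROM combined
    GROUP BY station
    ORDER BY station COLLATE NOACCENT
""")

summary.show()
summary.write_csv("trc_summary.csv")   # hypothetical output file name
```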
Generates the output:
Total elapsed time: 26 minutes, 15 seconds.