Skip to content

Commit

Permalink
[PERF] enable jemalloc with background threads (#1361)
Browse files Browse the repository at this point in the history
* Switches allocator to Jemalloc
* Leverages background threads for linux with drop rate of 1 second
* Use 65k size pages for linux arm64
  • Loading branch information
samster25 committed Sep 10, 2023
1 parent aa1ee28 commit 0e3754f
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/python-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ jobs:
# GCC 4.8.5 in manylinux2014 container doesn't support c11 atomic. This caused issues with the `ring` crate that causes TLS to fail
container: messense/manylinux_2_24-cross:aarch64
args: --profile release-lto --out dist --sdist
env:
JEMALLOC_SYS_WITH_LG_PAGE: 16

- name: Build wheels - Mac aarch64
if: ${{ (matrix.os == 'macos') && (matrix.compile_arch == 'aarch64') }}
Expand Down
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ lto = 'fat'
[profile.rust-analyzer]
inherits = "dev"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
libc = {version = "^0.2.8", default-features = false}
tikv-jemallocator = {version = "0.5.4", features = ["disable_initial_exec_tls"]}

[workspace]
members = [
"src/common/error",
Expand Down
5 changes: 3 additions & 2 deletions src/daft-parquet/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ fn streaming_decompression<S: futures::Stream<Item = parquet2::error::Result<Com

rayon::spawn(move || {
let mut buffer = vec![];
let _ = send.send(decompress(compressed_page, &mut buffer));
let page = decompress(compressed_page, &mut buffer);
let _ = send.send(page);

});
yield recv.await.expect("panic while decompressing page");
Expand Down Expand Up @@ -399,7 +400,6 @@ impl ParquetFileReader {
all_arrays.push(arr);
}
}

all_arrays
.into_iter()
.map(|a| {
Expand Down Expand Up @@ -432,6 +432,7 @@ impl ParquetFileReader {
rayon::spawn(move || {
let concated =
Series::concat(&series_to_concat.iter().flatten().collect::<Vec<_>>());
drop(series_to_concat);
let _ = send.send(concated);
});
recv.await.context(OneShotRecvSnafu {})?
Expand Down
33 changes: 32 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,37 @@
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

union U {
x: &'static u8,
y: &'static libc::c_char,
}

#[cfg(target_env = "gnu")]
#[allow(non_upper_case_globals)]
#[export_name = "_rjem_malloc_conf"]
pub static malloc_conf: Option<&'static libc::c_char> = Some(unsafe {
U {
x: &b"oversize_threshold:1,background_thread:true,dirty_decay_ms:1000,muzzy_decay_ms:1000\0"[0],
}
.y
});

#[cfg(target_os = "macos")]
#[allow(non_upper_case_globals)]
#[export_name = "_rjem_malloc_conf"]
pub static malloc_conf: Option<&'static libc::c_char> = Some(unsafe {
U {
x: &b"oversize_threshold:1,background_thread:false,dirty_decay_ms:0,muzzy_decay_ms:0\0"[0],
}
.y
});

#[cfg(feature = "python")]
pub mod pylib {

use pyo3::prelude::*;

#[pyfunction]
Expand Down

0 comments on commit 0e3754f

Please sign in to comment.