
Commit: Prototype
desmondcheongzx committed Aug 30, 2024
1 parent 554856d commit 7b40f23
Showing 7 changed files with 618 additions and 26 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Makefile
@@ -52,6 +52,10 @@ build: check-toolchain .venv ## Compile and install Daft for development
build-release: check-toolchain .venv ## Compile and install a faster Daft binary
@unset CONDA_PREFIX && PYO3_PYTHON=$(VENV_BIN)/python $(VENV_BIN)/maturin develop --release

.PHONY: build-bench
build-bench: check-toolchain .venv ## Compile and install a Daft binary for benchmarking
@unset CONDA_PREFIX && PYO3_PYTHON=$(VENV_BIN)/python $(VENV_BIN)/maturin develop --profile dev-bench

.PHONY: test
test: .venv build ## Run tests
HYPOTHESIS_MAX_EXAMPLES=$(HYPOTHESIS_MAX_EXAMPLES) $(VENV_BIN)/pytest --hypothesis-seed=$(HYPOTHESIS_SEED)
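The new `build-bench` target mirrors `build-release` but builds with a custom `dev-bench` Cargo profile, whose definition is not part of this diff. Paired with the nightly `test` crate that this commit enables in daft-csv's `lib.rs` (see below), it supports `#[bench]` microbenchmarks. A minimal sketch of such a benchmark — the module, function name, and workload here are hypothetical, not from this commit:

```rust
// Hypothetical benchmark sketch; requires nightly plus the crate-level
// `#![feature(test)]` / `extern crate test` added later in this commit.
#[cfg(test)]
mod benches {
    use test::Bencher;

    #[bench]
    fn bench_split_fields(b: &mut Bencher) {
        let line = b"1,hello,3.14";
        b.iter(|| {
            // Stand-in workload; a real benchmark would exercise the CSV hot path.
            test::black_box(line.split(|&c| c == b',').count())
        });
    }
}
```

How the benchmarks are invoked is not shown in this diff.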
25 changes: 24 additions & 1 deletion src/arrow2/src/io/csv/read_async/reader.rs
@@ -1,10 +1,11 @@
use futures::AsyncRead;

use super::{AsyncReader, ByteRecord};
use crate::io::csv::read;

use crate::error::{Error, Result};

/// Asynchronously read `len` rows from `reader` into `row`, skipping the first `skip`.
/// Asynchronously read `rows.len()` rows from `reader` into `rows`, skipping the first `skip`.
/// This operation has minimal CPU work and is thus the fastest way to read through a CSV
/// without deserializing the contents to Arrow.
pub async fn read_rows<R>(
@@ -37,3 +38,25 @@ where
}
Ok(row_number)
}

/// Synchronously read `rows.len()` rows from `reader` into `rows`. This is used in the local I/O case.
pub fn local_read_rows<R>(
reader: &mut read::Reader<R>,
rows: &mut [read::ByteRecord],
) -> Result<(usize, bool)>
where
R: std::io::Read,
{
let mut row_number = 0;
let mut has_more = true;
for row in rows.iter_mut() {
has_more = reader
.read_byte_record(row)
.map_err(|e| Error::External(format!(" at line {}", row_number), Box::new(e)))?;
if !has_more {
break;
}
row_number += 1;
}
Ok((row_number, has_more))
}
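A hedged usage sketch for the new function (not part of this commit): it assumes `local_read_rows` is re-exported from the `read_async` module alongside the async reader, and uses arrow2's re-exported csv `ReaderBuilder`/`ByteRecord` types. Records are read in fixed-size batches, reusing the same buffer pool, until the reader is exhausted:

```rust
use arrow2::error::Result;
use arrow2::io::csv::read::{ByteRecord, ReaderBuilder};
use arrow2::io::csv::read_async::local_read_rows;

fn count_rows(path: &str) -> Result<usize> {
    let file = std::fs::File::open(path).expect("open CSV");
    let mut reader = ReaderBuilder::new().from_reader(file);
    // Reuse a fixed pool of byte records to avoid per-row allocation.
    let mut rows = vec![ByteRecord::new(); 1024];
    let mut total = 0;
    loop {
        let (rows_read, has_more) = local_read_rows(&mut reader, &mut rows)?;
        total += rows_read;
        // Process rows[..rows_read] here before the next batch overwrites them.
        if !has_more {
            break;
        }
    }
    Ok(total)
}
```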
4 changes: 4 additions & 0 deletions src/daft-csv/Cargo.toml
@@ -5,6 +5,7 @@ async-compression = {workspace = true}
async-stream = {workspace = true}
common-error = {path = "../common/error", default-features = false}
common-py-serde = {path = "../common/py-serde", default-features = false}
crossbeam-channel = "0.5.1"
csv-async = "1.3.0"
daft-compression = {path = "../daft-compression", default-features = false}
daft-core = {path = "../daft-core", default-features = false}
@@ -13,6 +14,9 @@ daft-dsl = {path = "../daft-dsl", default-features = false}
daft-io = {path = "../daft-io", default-features = false}
daft-table = {path = "../daft-table", default-features = false}
futures = {workspace = true}
indexmap = {workspace = true, features = ["serde"]}
memchr = "2.7.2"
memmap2 = "0.9.4"
pyo3 = {workspace = true, optional = true}
rayon = {workspace = true}
serde = {workspace = true}
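The new dependencies hint at the prototype's local-read design: `memmap2` to memory-map the file, `memchr` to find record boundaries quickly, and `crossbeam-channel` to hand work to threads. The commit does not show that wiring in this section, so the following is only an illustrative sketch of the pattern these crates suggest (the file name, chunking, and channel capacity are hypothetical):

```rust
use memchr::memchr;

// Split a buffer into newline-delimited records using memchr.
fn split_lines(buf: &[u8]) -> Vec<&[u8]> {
    let mut out = Vec::new();
    let mut start = 0;
    while let Some(pos) = memchr(b'\n', &buf[start..]) {
        out.push(&buf[start..start + pos]);
        start += pos + 1;
    }
    if start < buf.len() {
        out.push(&buf[start..]);
    }
    out
}

fn main() -> std::io::Result<()> {
    let file = std::fs::File::open("data.csv")?;
    // SAFETY: the file must not be truncated or mutated while mapped.
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    let (tx, rx) = crossbeam_channel::bounded::<Vec<u8>>(16);
    let worker = std::thread::spawn(move || {
        for chunk in rx {
            // Parse/deserialize the record here...
            let _ = chunk.len();
        }
    });
    for line in split_lines(&mmap) {
        // Owned copies keep the sketch simple; a real design would pass ranges.
        tx.send(line.to_vec()).expect("worker alive");
    }
    drop(tx);
    worker.join().expect("worker panicked");
    Ok(())
}
```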
4 changes: 4 additions & 0 deletions src/daft-csv/src/lib.rs
@@ -2,6 +2,8 @@
#![feature(let_chains)]
#![feature(trait_alias)]
#![feature(trait_upcasting)]
#![feature(test)]
extern crate test;
use common_error::DaftError;
use snafu::Snafu;

@@ -23,6 +25,8 @@ pub enum Error {
#[snafu(display("{source}"))]
IOError { source: daft_io::Error },
#[snafu(display("{source}"))]
StdIOError { source: std::io::Error },
#[snafu(display("{source}"))]
CSVError { source: csv_async::Error },
#[snafu(display("Invalid char: {}", val))]
WrongChar {
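The new `StdIOError` variant wraps `std::io::Error`, which the synchronous/mmap path produces directly (unlike the async path's `daft_io::Error`). A hedged sketch of how the variant might be constructed via snafu's generated context selector — this assumes snafu 0.7-style naming (`StdIOSnafu`), and the helper itself is hypothetical, not from this commit:

```rust
use snafu::ResultExt;

// Hypothetical helper: memory-map a local CSV file, converting any
// std::io::Error into the crate's new StdIOError variant.
fn mmap_csv(path: &str) -> Result<memmap2::Mmap, Error> {
    let file = std::fs::File::open(path).context(StdIOSnafu)?;
    // SAFETY: the underlying file must not be truncated or mutated while mapped.
    unsafe { memmap2::Mmap::map(&file) }.context(StdIOSnafu)
}
```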
(Diffs for the two remaining changed files are not rendered here.)