Replicate py-polars API surface for streaming IPC formats (#249)
TLDR: Solves #109

The IPC Stream methods are more or less straight copies of the IPC File
(Feather) ones, swapping out the IpcReader and IpcWriter for their
streaming equivalents; the API should be identical to py-polars (with
the exception of file-like objects as input for read_ipc and
read_ipc_stream - not much point adding that until streaming IO is
exposed upstream).

I've left the docstrings basically untouched; let me know if you want
those tweaked (the `@param`s appear to have drifted over time).
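For reference, a minimal round-trip with the new API (the column values here are placeholder data, not taken from the test suite):

import pl from "nodejs-polars";

// Build a small frame and serialize it to the Arrow IPC stream format.
const df = pl.DataFrame({ a: [1, 2, 3], b: ["x", "y", "z"] });

// With no destination argument, writeIPCStream returns a Buffer.
const buf = df.writeIPCStream({ compression: "lz4" });

// readIPCStream accepts a path or a Buffer, mirroring py-polars.
const roundTripped = pl.readIPCStream(buf);
console.log(roundTripped.shape); // { height: 3, width: 2 }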
H-Plus-Time authored Aug 16, 2024
1 parent eae13e8 commit b243288
Showing 6 changed files with 176 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -84,6 +84,7 @@ features = [
   "parquet",
   "to_dummies",
   "ipc",
+  "ipc_streaming",
   "avro",
   "list_eval",
   "arg_where",
36 changes: 36 additions & 0 deletions __tests__/io.test.ts
@@ -334,6 +334,42 @@ describe("ipc", () => {
     expect(ipcDF).toFrameEqual(csvDF);
   });
 });
+describe("ipc stream", () => {
+  beforeEach(() => {
+    pl.readCSV(csvpath).writeIPCStream(ipcpath);
+  });
+  afterEach(() => {
+    fs.rmSync(ipcpath);
+  });
+
+  test("read", () => {
+    const df = pl.readIPCStream(ipcpath);
+    expect(df.shape).toEqual({ height: 27, width: 4 });
+  });
+  test("read/write:buffer", () => {
+    const buff = pl.readCSV(csvpath).writeIPCStream();
+    const df = pl.readIPCStream(buff);
+    expect(df.shape).toEqual({ height: 27, width: 4 });
+  });
+  test("read:compressed", () => {
+    const csvDF = pl.readCSV(csvpath);
+    csvDF.writeIPCStream(ipcpath, { compression: "lz4" });
+    const ipcDF = pl.readIPCStream(ipcpath);
+    expect(ipcDF).toFrameEqual(csvDF);
+  });
+
+  test("read:options", () => {
+    const df = pl.readIPCStream(ipcpath, { nRows: 4 });
+    expect(df.shape).toEqual({ height: 4, width: 4 });
+  });
+
+  test("writeIPCStream", () => {
+    const csvDF = pl.readCSV(csvpath);
+    csvDF.writeIPCStream(ipcpath);
+    const ipcDF = pl.readIPCStream(ipcpath);
+    expect(ipcDF).toFrameEqual(csvDF);
+  });
+});
 
 describe("avro", () => {
   beforeEach(() => {
40 changes: 36 additions & 4 deletions polars/dataframe.ts
@@ -141,17 +141,29 @@ interface WriteMethods {
     options?: { format: "lines" | "json" },
   ): void;
   /**
-   * Write to Arrow IPC binary stream, or a feather file.
-   * @param file File path to which the file should be written.
+   * Write to Arrow IPC feather file, either to a file path or to a write stream.
+   * @param destination File path to which the file should be written, or writable.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
   writeIPC(options?: WriteIPCOptions): Buffer;
   writeIPC(destination: string | Writable, options?: WriteIPCOptions): void;
 
+  /**
+   * Write to Arrow IPC stream file, either to a file path or to a write stream.
+   * @param destination File path to which the file should be written, or writable.
+   * @param options.compression Compression method *defaults to "uncompressed"*
+   * @category IO
+   */
+  writeIPCStream(options?: WriteIPCOptions): Buffer;
+  writeIPCStream(
+    destination: string | Writable,
+    options?: WriteIPCOptions,
+  ): void;
+
   /**
    * Write the DataFrame to disk in parquet format.
-   * @param file File path to which the file should be written.
+   * @param destination File path to which the file should be written, or writable.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
@@ -163,7 +175,7 @@ interface WriteMethods {
 
   /**
    * Write the DataFrame to disk in avro format.
-   * @param file File path to which the file should be written.
+   * @param destination File path to which the file should be written, or writable.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
@@ -2511,6 +2523,26 @@ export const _DataFrame = (_df: any): DataFrame => {
 
     return Buffer.concat(buffers);
   },
+  writeIPCStream(dest?, options = { compression: "uncompressed" }) {
+    if (dest instanceof Writable || typeof dest === "string") {
+      return _df.writeIpcStream(dest, options.compression) as any;
+    }
+    const buffers: Buffer[] = [];
+    const writeStream = new Stream.Writable({
+      write(chunk, _encoding, callback) {
+        buffers.push(chunk);
+        callback(null);
+      },
+    });
+
+    _df.writeIpcStream(
+      writeStream,
+      dest?.compression ?? options?.compression,
+    );
+    writeStream.end("");
+
+    return Buffer.concat(buffers);
+  },
   toSeries: (index = 0) => _Series(_df.selectAtIdx(index) as any) as any,
   toStruct(name) {
     return _Series(_df.toStruct(name));
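Because writeIPCStream also accepts any Node.js Writable (per the overloads above), the stream format can go straight to an arbitrary sink; a sketch, assuming a local file stream (the path and data are placeholders):

import fs from "node:fs";
import pl from "nodejs-polars";

const df = pl.DataFrame({ a: [1, 2, 3] });

// Any Node.js Writable is accepted as a destination; here, a file stream.
const sink = fs.createWriteStream("./frame.arrows");
df.writeIPCStream(sink, { compression: "uncompressed" });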
2 changes: 2 additions & 0 deletions polars/index.ts
@@ -44,6 +44,7 @@ export namespace pl {
   export import readRecords = io.readRecords;
   export import readCSV = io.readCSV;
   export import readIPC = io.readIPC;
+  export import readIPCStream = io.readIPCStream;
   export import readJSON = io.readJSON;
   export import readParquet = io.readParquet;
   export import readAvro = io.readAvro;
@@ -188,6 +189,7 @@ export import scanParquet = io.scanParquet;
 export import readRecords = io.readRecords;
 export import readCSV = io.readCSV;
 export import readIPC = io.readIPC;
+export import readIPCStream = io.readIPCStream;
 export import readJSON = io.readJSON;
 export import readParquet = io.readParquet;
 export import readAvro = io.readAvro;
34 changes: 32 additions & 2 deletions polars/io.ts
@@ -531,7 +531,7 @@ export interface ReadIPCOptions {
 }
 
 /**
- * __Read into a DataFrame from Arrow IPC (Feather v2) file.__
+ * __Read into a DataFrame from Arrow IPC file (Feather v2).__
  * ___
  * @param pathOrBody - path or buffer or string
  *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
@@ -558,14 +558,44 @@ export function readIPC(pathOrBody, options = {}) {
   throw new Error("must supply either a path or body");
 }
 
+/**
+ * __Read into a DataFrame from Arrow IPC stream.__
+ * ___
+ * @param pathOrBody - path or buffer or string
+ *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
+ *   - body: String or buffer to be read as Arrow IPC
+ * @param options.columns Columns to select. Accepts a list of column names.
+ * @param options.nRows Stop reading from the IPC stream after reading ``nRows``.
+ */
+export function readIPCStream(
+  pathOrBody: string | Buffer,
+  options?: Partial<ReadIPCOptions>,
+): DataFrame;
+export function readIPCStream(pathOrBody, options = {}) {
+  if (Buffer.isBuffer(pathOrBody)) {
+    return _DataFrame(pli.readIpcStream(pathOrBody, options));
+  }
+
+  if (typeof pathOrBody === "string") {
+    const inline = !isPath(pathOrBody, [".ipc"]);
+    if (inline) {
+      return _DataFrame(
+        pli.readIpcStream(Buffer.from(pathOrBody, "utf-8"), options),
+      );
+    }
+    return _DataFrame(pli.readIpcStream(pathOrBody, options));
+  }
+  throw new Error("must supply either a path or body");
+}
+
 export interface ScanIPCOptions {
   nRows: number;
   cache: boolean;
   rechunk: boolean;
 }
 
 /**
- * __Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.__
+ * __Lazily read from an Arrow IPC file (Feather v2) or multiple files via glob patterns.__
  * ___
  * @param path Path to an IPC file.
  * @param options.nRows Stop reading from IPC file after reading ``nRows``
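The reader options map straight onto the underlying stream reader (column projection, row limits); a sketch of column selection, again with placeholder data:

import pl from "nodejs-polars";

const buf = pl
  .DataFrame({ id: [1, 2, 3], name: ["a", "b", "c"], score: [0.5, 0.7, 0.9] })
  .writeIPCStream();

// Select a subset of columns and stop after two rows.
const df = pl.readIPCStream(buf, { columns: ["id", "name"], nRows: 2 });
console.log(df.shape); // { height: 2, width: 2 }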
69 changes: 69 additions & 0 deletions src/dataframe.rs
@@ -349,6 +349,43 @@ pub fn read_ipc(
     Ok(JsDataFrame::new(df))
 }
 
+#[napi(catch_unwind)]
+pub fn read_ipc_stream(
+    path_or_buffer: Either<String, Buffer>,
+    options: ReadIpcOptions,
+) -> napi::Result<JsDataFrame> {
+    let columns = options.columns;
+    let projection = options
+        .projection
+        .map(|projection| projection.into_iter().map(|p| p as usize).collect());
+    let row_count = options.row_count.map(|rc| rc.into());
+    let n_rows = options.n_rows.map(|nr| nr as usize);
+
+    let result = match path_or_buffer {
+        Either::A(path) => {
+            let f = File::open(&path)?;
+            let reader = BufReader::new(f);
+            IpcStreamReader::new(reader)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_count)
+                .finish()
+        }
+        Either::B(buf) => {
+            let cursor = Cursor::new(buf.as_ref());
+            IpcStreamReader::new(cursor)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_count)
+                .finish()
+        }
+    };
+    let df = result.map_err(JsPolarsErr::from)?;
+    Ok(JsDataFrame::new(df))
+}
+
 #[napi(object)]
 pub struct ReadAvroOptions {
     pub columns: Option<Vec<String>>,
@@ -1426,6 +1463,38 @@ impl JsDataFrame {
         Ok(())
     }
     #[napi(catch_unwind)]
+    pub fn write_ipc_stream(
+        &mut self,
+        path_or_buffer: JsUnknown,
+        compression: Wrap<Option<IpcCompression>>,
+        env: Env,
+    ) -> napi::Result<()> {
+        let compression = compression.0;
+
+        match path_or_buffer.get_type()? {
+            ValueType::String => {
+                let path: napi::JsString = unsafe { path_or_buffer.cast() };
+                let path = path.into_utf8()?.into_owned()?;
+                let f = std::fs::File::create(path).unwrap();
+                let f = BufWriter::new(f);
+                IpcStreamWriter::new(f)
+                    .with_compression(compression)
+                    .finish(&mut self.df)
+                    .map_err(JsPolarsErr::from)?;
+            }
+            ValueType::Object => {
+                let inner: napi::JsObject = unsafe { path_or_buffer.cast() };
+                let writeable = JsWriteStream { inner, env: &env };
+                IpcStreamWriter::new(writeable)
+                    .with_compression(compression)
+                    .finish(&mut self.df)
+                    .map_err(JsPolarsErr::from)?;
+            }
+            _ => panic!(),
+        };
+        Ok(())
+    }
+    #[napi(catch_unwind)]
     pub fn write_json(
         &mut self,
         path_or_buffer: JsUnknown,
