From b24328863f0743de851e722c92a2a29d386d2ddb Mon Sep 17 00:00:00 2001
From: Nicholas Roberts
Date: Fri, 16 Aug 2024 14:02:23 +1000
Subject: [PATCH] Replicate py-polars API surface for streaming IPC formats
 (#249)

TLDR: Solves https://github.com/pola-rs/nodejs-polars/issues/109

The IPC Stream methods are more or less straight copies of the IPC File
(Feather) ones, swapping out the IpcReader and IpcWriter for their streaming
equivalents (IpcStreamReader, IpcStreamWriter); the API should be identical to
py-polars, with the exception of file-like objects as input for read_ipc and
read_ipc_stream (not much point in adding that until streaming IO is exposed
upstream).

I've left the docstrings basically untouched; let me know if you want those
tweaked (the `@param`s appear to have drifted over time).
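For reviewers, a quick sketch of the new surface, mirroring the tests below
(the file path is a placeholder; note that, as with readIPC, a path string is
only treated as a file when it carries an `.ipc` extension):

```ts
import pl from "nodejs-polars";

// Round-trip a DataFrame through the streaming IPC format in memory:
// with no destination, writeIPCStream returns a Buffer.
const df = pl.DataFrame({ a: [1, 2, 3], b: ["x", "y", "z"] });
const buf = df.writeIPCStream();
const roundTripped = pl.readIPCStream(buf);

// Or go through a file, with optional compression and a row limit on read.
df.writeIPCStream("./file.ipc", { compression: "lz4" });
const firstTwo = pl.readIPCStream("./file.ipc", { nRows: 2 });
```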
---
 Cargo.toml           |  1 +
 __tests__/io.test.ts | 36 +++++++++++++++++++++++
 polars/dataframe.ts  | 40 ++++++++++++++++++++++---
 polars/index.ts      |  2 ++
 polars/io.ts         | 34 ++++++++++++++++++++--
 src/dataframe.rs     | 69 ++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 176 insertions(+), 6 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 750b1edc6..021e8087f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -84,6 +84,7 @@ features = [
   "parquet",
   "to_dummies",
   "ipc",
+  "ipc_streaming",
   "avro",
   "list_eval",
   "arg_where",
diff --git a/__tests__/io.test.ts b/__tests__/io.test.ts
index 29fd8b4bf..7fdc9b0a1 100644
--- a/__tests__/io.test.ts
+++ b/__tests__/io.test.ts
@@ -334,6 +334,42 @@ describe("ipc", () => {
     expect(ipcDF).toFrameEqual(csvDF);
   });
 });
+describe("ipc stream", () => {
+  beforeEach(() => {
+    pl.readCSV(csvpath).writeIPCStream(ipcpath);
+  });
+  afterEach(() => {
+    fs.rmSync(ipcpath);
+  });
+
+  test("read", () => {
+    const df = pl.readIPCStream(ipcpath);
+    expect(df.shape).toEqual({ height: 27, width: 4 });
+  });
+  test("read/write:buffer", () => {
+    const buff = pl.readCSV(csvpath).writeIPCStream();
+    const df = pl.readIPCStream(buff);
+    expect(df.shape).toEqual({ height: 27, width: 4 });
+  });
+  test("read:compressed", () => {
+    const csvDF = pl.readCSV(csvpath);
+    csvDF.writeIPCStream(ipcpath, { compression: "lz4" });
+    const ipcDF = pl.readIPCStream(ipcpath);
+    expect(ipcDF).toFrameEqual(csvDF);
+  });
+
+  test("read:options", () => {
+    const df = pl.readIPCStream(ipcpath, { nRows: 4 });
+    expect(df.shape).toEqual({ height: 4, width: 4 });
+  });
+
+  test("writeIPCStream", () => {
+    const csvDF = pl.readCSV(csvpath);
+    csvDF.writeIPCStream(ipcpath);
+    const ipcDF = pl.readIPCStream(ipcpath);
+    expect(ipcDF).toFrameEqual(csvDF);
+  });
+});
 describe("avro", () => {
   beforeEach(() => {
diff --git a/polars/dataframe.ts b/polars/dataframe.ts
index e2b85dbb6..1ba053bac 100644
--- a/polars/dataframe.ts
+++ b/polars/dataframe.ts
@@ -141,17 +141,29 @@ interface WriteMethods {
     options?: { format: "lines" | "json" },
   ): void;
   /**
-   * Write to Arrow IPC binary stream, or a feather file.
-   * @param file File path to which the file should be written.
+   * Write to Arrow IPC feather file, either to a file path or to a write stream.
+   * @param destination File path to which the file should be written, or writable.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
   writeIPC(options?: WriteIPCOptions): Buffer;
   writeIPC(destination: string | Writable, options?: WriteIPCOptions): void;
+  /**
+   * Write to Arrow IPC stream file, either to a file path or to a write stream.
+   * @param destination File path to which the file should be written, or writable.
+   * @param options.compression Compression method *defaults to "uncompressed"*
+   * @category IO
+   */
+  writeIPCStream(options?: WriteIPCOptions): Buffer;
+  writeIPCStream(
+    destination: string | Writable,
+    options?: WriteIPCOptions,
+  ): void;
+
   /**
    * Write the DataFrame disk in parquet format.
-   * @param file File path to which the file should be written.
+   * @param destination File path to which the file should be written, or writable.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
@@ -163,7 +175,7 @@ interface WriteMethods {
   /**
    * Write the DataFrame disk in avro format.
-   * @param file File path to which the file should be written.
+   * @param destination File path to which the file should be written, or writable.
    * @param options.compression Compression method *defaults to "uncompressed"*
    * @category IO
    */
@@ -2511,6 +2523,26 @@ export const _DataFrame = (_df: any): DataFrame => {
     return Buffer.concat(buffers);
   },
+  writeIPCStream(dest?, options = { compression: "uncompressed" }) {
+    if (dest instanceof Writable || typeof dest === "string") {
+      return _df.writeIpcStream(dest, options.compression) as any;
+    }
+    const buffers: Buffer[] = [];
+    const writeStream = new Stream.Writable({
+      write(chunk, _encoding, callback) {
+        buffers.push(chunk);
+        callback(null);
+      },
+    });
+
+    _df.writeIpcStream(
+      writeStream,
+      dest?.compression ?? options?.compression,
+    );
+    writeStream.end("");
+
+    return Buffer.concat(buffers);
+  },
   toSeries: (index = 0) => _Series(_df.selectAtIdx(index) as any) as any,
   toStruct(name) {
     return _Series(_df.toStruct(name));
diff --git a/polars/index.ts b/polars/index.ts
index 31090053b..a0a6af301 100644
--- a/polars/index.ts
+++ b/polars/index.ts
@@ -44,6 +44,7 @@ export namespace pl {
   export import readRecords = io.readRecords;
   export import readCSV = io.readCSV;
   export import readIPC = io.readIPC;
+  export import readIPCStream = io.readIPCStream;
   export import readJSON = io.readJSON;
   export import readParquet = io.readParquet;
   export import readAvro = io.readAvro;
@@ -188,6 +189,7 @@ export import scanParquet = io.scanParquet;
 export import readRecords = io.readRecords;
 export import readCSV = io.readCSV;
 export import readIPC = io.readIPC;
+export import readIPCStream = io.readIPCStream;
 export import readJSON = io.readJSON;
 export import readParquet = io.readParquet;
 export import readAvro = io.readAvro;
diff --git a/polars/io.ts b/polars/io.ts
index d90f79b42..f25af201c 100644
--- a/polars/io.ts
+++ b/polars/io.ts
@@ -531,7 +531,7 @@ export interface ReadIPCOptions {
 }

 /**
- * __Read into a DataFrame from Arrow IPC (Feather v2) file.__
+ * __Read into a DataFrame from Arrow IPC file (Feather v2).__
  * ___
  * @param pathOrBody - path or buffer or string
  *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
  *   - body: String or buffer to be read as Arrow IPC
@@ -558,6 +558,36 @@ export function readIPC(pathOrBody, options = {}) {
   throw new Error("must supply either a path or body");
 }

+/**
+ * __Read into a DataFrame from Arrow IPC stream.__
+ * ___
+ * @param pathOrBody - path or buffer or string
+ *   - path: Path to a file or a file like string. Any valid filepath can be used. Example: `file.ipc`.
+ *   - body: String or buffer to be read as Arrow IPC
+ * @param options.columns Columns to select. Accepts a list of column names.
+ * @param options.nRows Stop reading from IPC stream after reading ``nRows``.
+ */
+export function readIPCStream(
+  pathOrBody: string | Buffer,
+  options?: Partial<ReadIPCOptions>,
+): DataFrame;
+export function readIPCStream(pathOrBody, options = {}) {
+  if (Buffer.isBuffer(pathOrBody)) {
+    return _DataFrame(pli.readIpcStream(pathOrBody, options));
+  }
+
+  if (typeof pathOrBody === "string") {
+    const inline = !isPath(pathOrBody, [".ipc"]);
+    if (inline) {
+      return _DataFrame(
+        pli.readIpcStream(Buffer.from(pathOrBody, "utf-8"), options),
+      );
+    }
+    return _DataFrame(pli.readIpcStream(pathOrBody, options));
+  }
+  throw new Error("must supply either a path or body");
+}
+
 export interface ScanIPCOptions {
   nRows: number;
   cache: boolean;
 }
@@ -565,7 +595,7 @@
 /**
- * __Lazily read from an Arrow IPC (Feather v2) file or multiple files via glob patterns.__
+ * __Lazily read from an Arrow IPC file (Feather v2) or multiple files via glob patterns.__
  * ___
  * @param path Path to a IPC file.
  * @param options.nRows Stop reading from IPC file after reading ``nRows``
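A note for reviewers on the string dispatch in readIPCStream above: as with
readIPC, a string argument is only treated as a file path when isPath accepts
it with an `.ipc` extension; any other string is decoded as utf-8 and parsed as
an inline IPC body. A minimal sketch of the resulting behaviour (the path is a
placeholder):

```ts
import pl from "nodejs-polars";

// A string ending in ".ipc" is opened as a file...
const fromPath = pl.readIPCStream("./file.ipc");

// ...while a Buffer (or any non-path string, decoded as utf-8)
// is parsed directly as an inline Arrow IPC stream body.
const fromBuffer = pl.readIPCStream(fromPath.writeIPCStream());
```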
diff --git a/src/dataframe.rs b/src/dataframe.rs
index f68650468..1472c258b 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -349,6 +349,43 @@ pub fn read_ipc(
     Ok(JsDataFrame::new(df))
 }

+#[napi(catch_unwind)]
+pub fn read_ipc_stream(
+    path_or_buffer: Either<String, Buffer>,
+    options: ReadIpcOptions,
+) -> napi::Result<JsDataFrame> {
+    let columns = options.columns;
+    let projection = options
+        .projection
+        .map(|projection| projection.into_iter().map(|p| p as usize).collect());
+    let row_count = options.row_count.map(|rc| rc.into());
+    let n_rows = options.n_rows.map(|nr| nr as usize);
+
+    let result = match path_or_buffer {
+        Either::A(path) => {
+            let f = File::open(&path)?;
+            let reader = BufReader::new(f);
+            IpcStreamReader::new(reader)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_count)
+                .finish()
+        }
+        Either::B(buf) => {
+            let cursor = Cursor::new(buf.as_ref());
+            IpcStreamReader::new(cursor)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_count)
+                .finish()
+        }
+    };
+    let df = result.map_err(JsPolarsErr::from)?;
+    Ok(JsDataFrame::new(df))
+}
+
 #[napi(object)]
 pub struct ReadAvroOptions {
     pub columns: Option<Vec<String>>,
@@ -1426,6 +1463,38 @@ impl JsDataFrame {
         Ok(())
     }
     #[napi(catch_unwind)]
+    pub fn write_ipc_stream(
+        &mut self,
+        path_or_buffer: JsUnknown,
+        compression: Wrap<Option<IpcCompression>>,
+        env: Env,
+    ) -> napi::Result<()> {
+        let compression = compression.0;
+
+        match path_or_buffer.get_type()? {
+            ValueType::String => {
+                let path: napi::JsString = unsafe { path_or_buffer.cast() };
+                let path = path.into_utf8()?.into_owned()?;
+                let f = std::fs::File::create(path).unwrap();
+                let f = BufWriter::new(f);
+                IpcStreamWriter::new(f)
+                    .with_compression(compression)
+                    .finish(&mut self.df)
+                    .map_err(JsPolarsErr::from)?;
+            }
+            ValueType::Object => {
+                let inner: napi::JsObject = unsafe { path_or_buffer.cast() };
+                let writeable = JsWriteStream { inner, env: &env };
+                IpcStreamWriter::new(writeable)
+                    .with_compression(compression)
+                    .finish(&mut self.df)
+                    .map_err(JsPolarsErr::from)?;
+            }
+            _ => panic!(),
+        };
+        Ok(())
+    }
+    #[napi(catch_unwind)]
     pub fn write_json(
         &mut self,
         path_or_buffer: JsUnknown,
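Worth highlighting from the `ValueType::Object` branch above: `writeIPCStream`
accepts any Node `Writable` as its destination, so the stream format can be
written into an existing sink rather than materialised as a file or Buffer
first. A minimal sketch, assuming the overloads added in this patch (the
output path is a placeholder):

```ts
import { createWriteStream } from "node:fs";
import pl from "nodejs-polars";

const df = pl.DataFrame({ id: [1, 2, 3] });

// Hand writeIPCStream an existing write stream; the caller owns the
// stream's lifecycle, so close it once the frame has been written.
const sink = createWriteStream("./out.ipc");
df.writeIPCStream(sink, { compression: "uncompressed" });
sink.end();
```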