Adding ldf sinkParquet (#171)
Bidek56 authored Feb 27, 2024
1 parent 8ecf9bd commit ea4ddc5
Showing 5 changed files with 189 additions and 13 deletions.
34 changes: 30 additions & 4 deletions __tests__/lazyframe.test.ts
@@ -1242,7 +1242,7 @@ describe("lazyframe", () => {
ldf.sinkCSV("./test.csv");
const newDF: pl.DataFrame = pl.readCSV("./test.csv");
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.csv");
});
test("sinkCSV:noHeader", async () => {
@@ -1255,7 +1255,7 @@ describe("lazyframe", () => {
ldf.sinkCSV("./test.csv", { includeHeader: false });
const newDF: pl.DataFrame = pl.readCSV("./test.csv", { hasHeader: false });
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("column_1").toString()).toEqual(actualDf.toString());
expect(newDF.sort("column_1")).toFrameEqual(actualDf);
fs.rmSync("./test.csv");
});
test("sinkCSV:separator", async () => {
@@ -1268,7 +1268,7 @@ describe("lazyframe", () => {
ldf.sinkCSV("./test.csv", { separator: "|" });
const newDF: pl.DataFrame = pl.readCSV("./test.csv", { sep: "|" });
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.csv");
});
test("sinkCSV:nullValue", async () => {
@@ -1283,7 +1283,33 @@ describe("lazyframe", () => {
const actualDf: pl.DataFrame = await (await ldf.collect()).withColumn(
pl.col("bar").fillNull("BOOM"),
);
expect(newDF.sort("foo").toString()).toEqual(actualDf.toString());
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.csv");
});
test("sinkParquet:path", async () => {
const ldf = pl
.DataFrame([
pl.Series("foo", [1, 2, 3], pl.Int64),
pl.Series("bar", ["a", "b", "c"]),
])
.lazy();
ldf.sinkParquet("./test.parquet");
const newDF: pl.DataFrame = pl.readParquet("./test.parquet");
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.parquet");
});
test("sinkParquet:compression:gzip", async () => {
const ldf = pl
.DataFrame([
pl.Series("foo", [1, 2, 3], pl.Int64),
pl.Series("bar", ["a", "b", "c"]),
])
.lazy();
ldf.sinkParquet("./test.parquet", { compression: "gzip" });
const newDF: pl.DataFrame = pl.readParquet("./test.parquet");
const actualDf: pl.DataFrame = await ldf.collect();
expect(newDF.sort("foo")).toFrameEqual(actualDf);
fs.rmSync("./test.parquet");
});
});
66 changes: 58 additions & 8 deletions polars/lazy/dataframe.ts
@@ -11,7 +11,12 @@ import {
} from "../utils";
import { _LazyGroupBy, LazyGroupBy } from "./groupby";
import { Deserialize, GroupByOps, Serialize } from "../shared_traits";
- import { LazyOptions, LazyJoinOptions, SinkCsvOptions } from "../types";
+ import {
+   LazyOptions,
+   LazyJoinOptions,
+   SinkCsvOptions,
+   SinkParquetOptions,
+ } from "../types";
import { Series } from "../series";

const inspect = Symbol.for("nodejs.util.inspect.custom");
@@ -513,7 +518,52 @@ export interface LazyDataFrame extends Serialize, GroupByOps<LazyGroupBy> {
>>> lf.sinkCsv("out.csv")
*/

- sinkCSV(dest: string, options?: SinkCsvOptions): void;
+ sinkCSV(path: string, options?: SinkCsvOptions): void;

/***
*
* Evaluate the query in streaming mode and write to a Parquet file.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
This allows streaming results that are larger than RAM to be written to disk.
Parameters
----------
@param path - File path to which the file should be written.
@param compression - Compression codec; one of {'lz4', 'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'zstd'}.
Choose "zstd" (the default) for good compression performance.
Choose "lz4" for fast compression/decompression.
Choose "snappy" for more backwards compatibility guarantees
when you deal with older parquet readers.
@param compressionLevel - The level of compression to use. Higher compression means smaller files on disk.
- "gzip" : min-level: 0, max-level: 10.
- "brotli" : min-level: 0, max-level: 11.
- "zstd" : min-level: 1, max-level: 22.
@param statistics - Write statistics to the parquet headers. This requires extra compute. Default -> false
@param rowGroupSize - Size of the row groups in number of rows.
If None (default), the chunks of the `DataFrame` are
used. Writing in smaller chunks may reduce memory pressure and improve
writing speeds.
@param dataPagesizeLimit - Size limit of individual data pages.
If not set, defaults to 1024 * 1024 bytes.
@param maintainOrder - Maintain the order in which data is processed. Default -> true
Setting this to `false` will be slightly faster.
@param typeCoercion - Do type coercion optimization. Default -> true
@param predicatePushdown - Do predicate pushdown optimization. Default -> true
@param projectionPushdown - Do projection pushdown optimization. Default -> true
@param simplifyExpression - Run simplify expressions optimization. Default -> true
@param slicePushdown - Do slice pushdown optimization. Default -> true
@param noOptimization - Turn off (certain) optimizations. Default -> false
Examples
--------
>>> const lf = pl.scanCSV("/path/to/my_larger_than_ram_file.csv")
>>> lf.sinkParquet("out.parquet")
*/
sinkParquet(path: string, options?: SinkParquetOptions): void;
}
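
For orientation, a minimal usage sketch of the new method (illustrative, outside the committed diff): it assumes the package's default export as pl, that pl.scanCSV is available, and that the file paths shown exist; with no options given, the wrapper defaults compression to "zstd".

import pl from "nodejs-polars";

// Stream a potentially larger-than-RAM CSV straight into a Parquet file
// without collecting it into memory first.
const lf = pl.scanCSV("./my_larger_than_ram_file.csv");
lf.sinkParquet("./out.parquet");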

const prepareGroupbyInputs = (by) => {
@@ -956,13 +1006,13 @@ export const _LazyDataFrame = (_ldf: any): LazyDataFrame => {
withRowCount(name = "row_nr") {
return _LazyDataFrame(_ldf.withRowCount(name));
},
- sinkCSV(dest?, options = {}) {
+ sinkCSV(path, options: SinkCsvOptions = {}) {
options.maintainOrder = options.maintainOrder ?? false;
- if (typeof dest === "string") {
-   _ldf.sinkCsv(dest, options);
- } else {
-   throw new TypeError("Expected a string destination");
- }
+ _ldf.sinkCsv(path, options);
},
sinkParquet(path: string, options: SinkParquetOptions = {}) {
options.compression = options.compression ?? "zstd";
_ldf.sinkParquet(path, options);
},
};
};
20 changes: 19 additions & 1 deletion polars/types.ts
@@ -47,7 +47,7 @@ export interface WriteCsvOptions {
sep?: string;
}
/**
- * Options for {@link LazyDataFrame.sinkCSV}
+ * Options for @see {@link LazyDataFrame.sinkCSV}
* @category Options
*/
export interface SinkCsvOptions {
@@ -65,6 +65,24 @@ export interface SinkCsvOptions {
nullValue?: string;
maintainOrder?: boolean;
}
/**
* Options for @see {@link LazyDataFrame.sinkParquet}
* @category Options
*/
export interface SinkParquetOptions {
compression?: string;
compressionLevel?: number;
statistics?: boolean;
rowGroupSize?: number;
dataPagesizeLimit?: number;
maintainOrder?: boolean;
typeCoercion?: boolean;
predicatePushdown?: boolean;
projectionPushdown?: boolean;
simplifyExpression?: boolean;
slicePushdown?: boolean;
noOptimization?: boolean;
}
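
As a rough illustration (not part of the diff), an options object matching this interface might be passed as follows. The values are arbitrary examples; rowGroupSize is kept small because the native struct stores it as an i16, and the paths are hypothetical.

import pl from "nodejs-polars";

// Illustrative options; every field is optional.
const options = {
  compression: "zstd",    // one of: lz4, uncompressed, snappy, gzip, lzo, brotli, zstd
  compressionLevel: 10,   // zstd levels are documented as 1-22
  statistics: true,       // write column statistics (extra compute)
  rowGroupSize: 10_000,   // rows per row group
  maintainOrder: true,
};

pl.scanCSV("./my_larger_than_ram_file.csv").sinkParquet("./out.parquet", options);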
/**
* Options for {@link DataFrame.writeJSON}
* @category Options
59 changes: 59 additions & 0 deletions src/conversion.rs
@@ -586,6 +586,22 @@ pub struct SinkCsvOptions {
pub maintain_order: bool,
}

#[napi(object)]
pub struct SinkParquetOptions {
pub compression: Option<String>,
pub compression_level: Option<i32>,
pub statistics: Option<bool>,
pub row_group_size: Option<i16>,
pub data_pagesize_limit: Option<i64>,
pub maintain_order: Option<bool>,
pub type_coercion: Option<bool>,
pub predicate_pushdown: Option<bool>,
pub projection_pushdown: Option<bool>,
pub simplify_expression: Option<bool>,
pub slice_pushdown: Option<bool>,
pub no_optimization: Option<bool>,
}

#[napi(object)]
pub struct Shape {
pub height: i64,
@@ -1197,3 +1213,46 @@
{
container.into_iter().map(|s| s.as_ref().into()).collect()
}

pub(crate) fn parse_parquet_compression(
compression: String,
compression_level: Option<i32>,
) -> JsResult<ParquetCompression> {
let parsed = match compression.as_ref() {
"uncompressed" => ParquetCompression::Uncompressed,
"snappy" => ParquetCompression::Snappy,
"gzip" => ParquetCompression::Gzip(
compression_level
.map(|lvl| {
GzipLevel::try_new(lvl as u8)
.map_err(|e| napi::Error::from_reason(format!("{e:?}")))
})
.transpose()?,
),
"lzo" => ParquetCompression::Lzo,
"brotli" => ParquetCompression::Brotli(
compression_level
.map(|lvl| {
BrotliLevel::try_new(lvl as u32)
.map_err(|e| napi::Error::from_reason(format!("{e:?}")))
})
.transpose()?,
),
"lz4" => ParquetCompression::Lz4Raw,
"zstd" => ParquetCompression::Zstd(
compression_level
.map(|lvl| {
ZstdLevel::try_new(lvl)
.map_err(|e| napi::Error::from_reason(format!("{e:?}")))
})
.transpose()?,
),
e => {
return Err(napi::Error::from_reason(format!(
"parquet `compression` must be one of {{'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'lz4', 'zstd'}}, got {e}",
)))
}
};
Ok(parsed)
}
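
From the TypeScript side, an unsupported compression string ends up in this function's error branch. A sketch of what that looks like, assuming the napi binding surfaces the error as a thrown exception from the synchronous sinkParquet call; the output path is illustrative.

import pl from "nodejs-polars";

const lf = pl.DataFrame({ foo: [1, 2, 3] }).lazy();

try {
  // "bzip2" is not in the accepted set, so parse_parquet_compression rejects it.
  lf.sinkParquet("./out.parquet", { compression: "bzip2" });
} catch (err) {
  // Expected message: parquet `compression` must be one of
  // {'uncompressed', 'snappy', 'gzip', 'lzo', 'brotli', 'lz4', 'zstd'}, got bzip2
  console.error(err);
}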
23 changes: 23 additions & 0 deletions src/lazy/dataframe.rs
@@ -590,6 +590,29 @@ impl JsLazyFrame {
let _ = ldf.sink_csv(path_buf, options).map_err(JsPolarsErr::from);
Ok(())
}

#[napi(catch_unwind)]
pub fn sink_parquet(&self, path: String, options: SinkParquetOptions) -> napi::Result<()> {
let compression_str = options.compression.unwrap_or("zstd".to_string());
let compression = parse_parquet_compression(compression_str, options.compression_level)?;
let statistics = options.statistics.unwrap_or(false);
let row_group_size = options.row_group_size.map(|i| i as usize);
let data_pagesize_limit = options.data_pagesize_limit.map(|i| i as usize);
let maintain_order = options.maintain_order.unwrap_or(true);

let options = ParquetWriteOptions {
compression,
statistics,
row_group_size,
data_pagesize_limit,
maintain_order,
};

let path_buf: PathBuf = PathBuf::from(path);
let ldf = self.ldf.clone().with_comm_subplan_elim(false);
let _ = ldf.sink_parquet(path_buf, options).map_err(JsPolarsErr::from);
Ok(())
}
}

#[napi(object)]