Adding cloud options to scan_parquet (#173)
* Adding cloud options to scan_parquet

* Removing extra function

* Removing extra comma

* Removing duplicated option
Bidek56 authored Mar 21, 2024
1 parent 65581cd commit 6793fe5
Showing 5 changed files with 129 additions and 23 deletions.
67 changes: 67 additions & 0 deletions Cargo.toml
@@ -93,6 +93,73 @@ features = [
"cov",
"group_by_list",
"sql",
"binary_encoding",
"rolling_window",
"json",
"dynamic_group_by",
"zip_with",
"simd",
"lazy",
"strings",
"temporal",
"random",
"object",
"fmt",
"performant",
"dtype-full",
"rows",
"round_series",
"is_unique",
"is_in",
"is_first_distinct",
"asof_join",
"cross_join",
"dot_product",
"concat_str",
"row_hash",
"reinterpret",
"mode",
"extract_jsonpath",
"cum_agg",
"rolling_window",
"repeat_by",
"interpolate",
"ewma",
"rank",
"propagate_nans",
"diff",
"pct_change",
"moment",
"diagonal_concat",
"abs",
"dot_diagram",
"dataframe_arithmetic",
"json",
"string_encoding",
"product",
"ndarray",
"unique_counts",
"log",
"serde-lazy",
"partition_by",
"pivot",
"semi_anti_join",
"parquet",
"to_dummies",
"ipc",
"avro",
"list_eval",
"arg_where",
"timezones",
"peaks",
"string_pad",
"cov",
"group_by_list",
"http",
"cloud",
"aws",
"gcp",
"azure"
]
git = "https://github.com/pola-rs/polars.git"
rev = "3cf4897e679b056d17a235d48867035265d43cdc"
2 changes: 1 addition & 1 deletion __tests__/io.test.ts
@@ -271,7 +271,7 @@ describe("parquet", () => {
});

test("scan:options", () => {
const df = pl.scanParquet(parquetpath, { numRows: 4 }).collectSync();
const df = pl.scanParquet(parquetpath, { nRows: 4 }).collectSync();
expect(df.shape).toEqual({ height: 4, width: 4 });
});
});
Binary file added bun.lockb
59 changes: 40 additions & 19 deletions polars/io.ts
@@ -476,33 +476,54 @@ interface RowCount {
}

interface ScanParquetOptions {
columns?: string[] | number[];
numRows?: number;
nRows?: number;
cache?: boolean;
parallel?: "auto" | "columns" | "row_groups" | "none";
rowCount?: RowCount;
cache?: boolean;
rechunk?: boolean;
hive_partitioning?: boolean;
lowMemory?: boolean;
useStatistics?: boolean;
hivePartitioning?: boolean;
cloudOptions?: Map<string, string>;
retries?: number;
}

/**
 * __Lazily read from a local or cloud-hosted parquet file (or files).__
 * ___
 * This function allows the query optimizer to push down predicates and projections to
 * the scan level, typically increasing performance and reducing memory overhead.
* @param source - Path(s) to a file. If a single path is given, it can be a globbing pattern.
 * @param options.nRows - Stop reading from the parquet file after reading `nRows` rows.
 * @param options.rowIndexName - If not null, this will insert a row index column with the given name into the DataFrame.
 * @param options.rowIndexOffset - Offset to start the row index column (only used if the name is set).
 * @param options.parallel - Direction of parallelism: 'auto', 'columns', 'row_groups' or 'none'. 'auto' will try to determine the optimal direction.
 * @param options.useStatistics - Use statistics in the parquet file to determine if pages can be skipped from reading.
 * @param options.hivePartitioning - Infer statistics and schema from Hive-partitioned URLs and use them to prune reads.
 * @param options.rechunk - In case of reading multiple files via a glob pattern, rechunk the final DataFrame into contiguous memory chunks.
 * @param options.lowMemory - Reduce memory pressure at the expense of performance.
 * @param options.cache - Cache the result after reading.
 * @param options.cloudOptions - Options that indicate how to connect to a cloud provider.
 *     The cloud providers currently supported are AWS, GCP, and Azure. See the supported keys here:
 *     - aws: https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html
 *     - gcp: https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html
 *     - azure: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html
 *     If `cloudOptions` is not provided, Polars will try to infer the information from environment variables.
 * @param options.retries - Number of retries if accessing a cloud instance fails.
*/
export function scanParquet(path: string, options: ScanParquetOptions = {}) {
const pliOptions: any = {};

pliOptions.nRows = options?.numRows;
pliOptions.rowCount = options?.rowCount;
pliOptions.parallel = options?.parallel ?? "auto";
return _LazyDataFrame(pli.scanParquet(path, pliOptions));
export function scanParquet(source: string, options: ScanParquetOptions = {}) {
const defaultOptions = { parallel: "auto" };
const pliOptions = { ...defaultOptions, ...options };
return _LazyDataFrame(pli.scanParquet(source, pliOptions));
}

export interface ReadIPCOptions {
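For context, a minimal usage sketch of the new `cloudOptions` and `retries` options from the TypeScript side. The S3 path and credential values are hypothetical placeholders; the key names follow the `object_store` config-key pages linked in the doc comment above.

```ts
import pl from "nodejs-polars";

// Hypothetical credentials, passed as the Map<string, string> declared
// in ScanParquetOptions; key names follow object_store's AmazonS3ConfigKey.
const cloudOptions = new Map<string, string>([
  ["aws_region", "us-east-1"],
  ["aws_access_key_id", "<key-id>"],
  ["aws_secret_access_key", "<secret>"],
]);

// Predicate and projection pushdown still apply before any data is read.
const df = pl
  .scanParquet("s3://my-bucket/data/*.parquet", { cloudOptions, retries: 2 })
  .select("a", "b")
  .collectSync();
```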
24 changes: 21 additions & 3 deletions src/lazy/dataframe.rs
@@ -709,11 +709,11 @@ pub struct ScanParquetOptions {
pub parallel: Wrap<ParallelStrategy>,
pub row_count: Option<JsRowCount>,
pub rechunk: Option<bool>,
pub row_count_name: Option<String>,
pub row_count_offset: Option<u32>,
pub low_memory: Option<bool>,
pub use_statistics: Option<bool>,
pub hive_partitioning: Option<bool>,
pub cloud_options: Option<HashMap::<String, String>>,
pub retries: Option<i64>,
}

#[napi(catch_unwind)]
@@ -725,7 +725,25 @@ pub fn scan_parquet(path: String, options: ScanParquetOptions) -> napi::Result<J
let rechunk = options.rechunk.unwrap_or(false);
let low_memory = options.low_memory.unwrap_or(false);
let use_statistics = options.use_statistics.unwrap_or(false);
let cloud_options = Some(CloudOptions::default());

let mut cloud_options: Option<CloudOptions> = if let Some(o) = options.cloud_options {
let co: Vec<(String, String)> = o.into_iter().map(|kv: (String, String)| kv).collect();
Some(CloudOptions::from_untyped_config(&path, co).map_err(JsPolarsErr::from)?)
} else {
None
};

let retries = options.retries.unwrap_or_else(|| 2) as usize;
if retries > 0 {
cloud_options =
cloud_options
.or_else(|| Some(CloudOptions::default()))
.map(|mut options| {
options.max_retries = retries;
options
});
}

let hive_partitioning: bool = options.hive_partitioning.unwrap_or(false);
let args = ScanArgsParquet {
n_rows,
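The branch above means `retries` (default 2) is honoured even when no `cloudOptions` are passed: a default `CloudOptions` is created and its `max_retries` is overridden. From the user's perspective that looks roughly like the sketch below; the GCS path is a hypothetical example and credentials are expected to come from the environment.

```ts
import pl from "nodejs-polars";

// No cloudOptions given: the binding falls back to CloudOptions::default(),
// letting Polars infer credentials from environment variables, while
// retries becomes max_retries on the Rust side.
const lf = pl.scanParquet("gs://my-bucket/events.parquet", { retries: 5 });
console.log(lf.collectSync().shape);
```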
