From f6c88ae9959748180c6c07392e2b56080fc9a249 Mon Sep 17 00:00:00 2001
From: Philip Sampaio
Date: Wed, 30 Mar 2022 21:45:42 -0300
Subject: [PATCH] Read and write dataframes from/to NDJSON files (#145)

NDJSON is a useful way to store multiple JSON documents in a single file,
separating them by new lines.

Closes https://github.com/elixir-nx/explorer/issues/139
---
 lib/explorer/backend/data_frame.ex        |  7 +++
 lib/explorer/data_frame.ex                | 38 +++++++++++
 lib/explorer/polars_backend/data_frame.ex | 14 +++++
 lib/explorer/polars_backend/native.ex     | 13 ++++
 native/explorer/src/dataframe.rs          | 27 ++++++++
 native/explorer/src/lib.rs                |  2 +
 test/explorer/data_frame_test.exs         | 77 +++++++++++++++++++++++
 7 files changed, 178 insertions(+)

diff --git a/lib/explorer/backend/data_frame.ex b/lib/explorer/backend/data_frame.ex
index 268d1cb06..716930133 100644
--- a/lib/explorer/backend/data_frame.ex
+++ b/lib/explorer/backend/data_frame.ex
@@ -35,6 +35,13 @@ defmodule Explorer.Backend.DataFrame do
   @callback read_ipc(filename :: String.t()) :: result(df)
   @callback write_ipc(df, filename :: String.t()) :: result(String.t())
 
+  @callback read_ndjson(
+              filename :: String.t(),
+              infer_schema_length :: integer(),
+              with_batch_size :: integer()
+            ) :: result(df)
+  @callback write_ndjson(df, filename :: String.t()) :: result(String.t())
+
   # Conversion
 
   @callback from_columns(map() | Keyword.t()) :: df
diff --git a/lib/explorer/data_frame.ex b/lib/explorer/data_frame.ex
index bac0f76f7..d416051fa 100644
--- a/lib/explorer/data_frame.ex
+++ b/lib/explorer/data_frame.ex
@@ -168,6 +168,44 @@ defmodule Explorer.DataFrame do
     end
   end
 
+  @doc """
+  Reads a file of JSON objects or lists separated by new lines.
+
+  ## Options
+
+    * `with_batch_size` - Sets the batch size for reading rows.
+      This value may have a significant impact on performance, so adjust it to your needs (default: `1000`).
+
+    * `infer_schema_length` - Maximum number of rows read for schema inference.
+      Setting this to `nil` will do a full table scan and will be slow (default: `1000`).
+  """
+  @spec read_ndjson(filename :: String.t(), opts :: Keyword.t()) ::
+          {:ok, DataFrame.t()} | {:error, term()}
+  def read_ndjson(filename, opts \\ []) do
+    opts =
+      keyword!(opts,
+        with_batch_size: 1000,
+        infer_schema_length: @default_infer_schema_length
+      )
+
+    backend = backend_from_options!(opts)
+
+    backend.read_ndjson(
+      filename,
+      opts[:infer_schema_length],
+      opts[:with_batch_size]
+    )
+  end
+
+  @doc """
+  Writes a dataframe to an NDJSON file.
+  """
+  @spec write_ndjson(df :: DataFrame.t(), filename :: String.t()) ::
+          {:ok, String.t()} | {:error, term()}
+  def write_ndjson(df, filename) do
+    apply_impl(df, :write_ndjson, [filename])
+  end
+
   @doc """
   Creates a new dataframe from a map or keyword of lists or series.
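A minimal usage sketch of the public API introduced above (the file path `people.ndjson` is illustrative, not part of the patch):

    # Read newline-delimited JSON, optionally tuning schema inference and batching.
    {:ok, df} = Explorer.DataFrame.read_ndjson("people.ndjson")
    {:ok, df} = Explorer.DataFrame.read_ndjson("people.ndjson", infer_schema_length: 100, with_batch_size: 500)

    # Write a dataframe back out as NDJSON; returns the filename on success.
    {:ok, "people.ndjson"} = Explorer.DataFrame.write_ndjson(df, "people.ndjson")

`read_ndjson/2` resolves its options through `backend_from_options!/1`, so the default Polars backend below handles the actual I/O.
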
diff --git a/lib/explorer/polars_backend/data_frame.ex b/lib/explorer/polars_backend/data_frame.ex
index 144dda425..d60cf8681 100644
--- a/lib/explorer/polars_backend/data_frame.ex
+++ b/lib/explorer/polars_backend/data_frame.ex
@@ -91,6 +91,20 @@ defmodule Explorer.PolarsBackend.DataFrame do
     end
   end
 
+  @impl true
+  def read_ndjson(filename, infer_schema_length, with_batch_size) do
+    with {:ok, df} <- Native.df_read_ndjson(filename, infer_schema_length, with_batch_size) do
+      {:ok, Shared.to_dataframe(df)}
+    end
+  end
+
+  @impl true
+  def write_ndjson(%DataFrame{data: df}, filename) do
+    with {:ok, _} <- Native.df_write_ndjson(df, filename) do
+      {:ok, filename}
+    end
+  end
+
   @impl true
   def to_binary(%DataFrame{} = df, header?, delimiter) do
     <<delimiter::utf8>> = delimiter
diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex
index 0309e9b65..681f760db 100644
--- a/lib/explorer/polars_backend/native.ex
+++ b/lib/explorer/polars_backend/native.ex
@@ -45,6 +45,19 @@ defmodule Explorer.PolarsBackend.Native do
       ),
       do: err()
 
+  def df_read_ndjson(
+        _filename,
+        _infer_schema_length,
+        _with_batch_size
+      ),
+      do: err()
+
+  def df_write_ndjson(
+        _df,
+        _filename
+      ),
+      do: err()
+
   def df_as_str(_df), do: err()
   def df_clone(_df), do: err()
   def df_column(_df, _name), do: err()
diff --git a/native/explorer/src/dataframe.rs b/native/explorer/src/dataframe.rs
index 621f1222d..6b94caf37 100644
--- a/native/explorer/src/dataframe.rs
+++ b/native/explorer/src/dataframe.rs
@@ -3,6 +3,7 @@
 use rustler::{Term, TermType};
 use std::collections::HashMap;
 use std::fs::File;
+use std::io::BufReader;
 use std::result::Result;
 
 use crate::series::{to_ex_series_collection, to_series_collection};
@@ -162,6 +163,32 @@ pub fn df_write_ipc(data: ExDataFrame, filename: &str) -> Result<(), ExplorerError> {
     })
 }
 
+#[rustler::nif]
+pub fn df_read_ndjson(
+    filename: &str,
+    infer_schema_length: Option<usize>,
+    with_batch_size: usize,
+) -> Result<ExDataFrame, ExplorerError> {
+    let file = File::open(filename)?;
+    let buf_reader = BufReader::new(file);
+    let df = JsonReader::new(buf_reader)
+        .with_json_format(JsonFormat::JsonLines)
+        .with_batch_size(with_batch_size)
+        .infer_schema_len(infer_schema_length)
+        .finish()?;
+
+    Ok(ExDataFrame::new(df))
+}
+
+#[rustler::nif]
+pub fn df_write_ndjson(data: ExDataFrame, filename: &str) -> Result<(), ExplorerError> {
+    df_read!(data, df, {
+        let file = File::create(filename).expect("could not create file");
+        JsonWriter::new(file).finish(&mut df.clone())?;
+        Ok(())
+    })
+}
+
 #[rustler::nif]
 pub fn df_as_str(data: ExDataFrame) -> Result<String, ExplorerError> {
     df_read!(data, df, { Ok(format!("{:?}", df)) })
diff --git a/native/explorer/src/lib.rs b/native/explorer/src/lib.rs
index 083b0366c..229fcf88a 100644
--- a/native/explorer/src/lib.rs
+++ b/native/explorer/src/lib.rs
@@ -57,6 +57,8 @@ rustler::init!(
         df_read_csv,
         df_read_ipc,
         df_read_parquet,
+        df_read_ndjson,
+        df_write_ndjson,
         df_select,
         df_select_at_idx,
         df_set_column_names,
diff --git a/test/explorer/data_frame_test.exs b/test/explorer/data_frame_test.exs
index 2ecbc1c4e..c4d6d0851 100644
--- a/test/explorer/data_frame_test.exs
+++ b/test/explorer/data_frame_test.exs
@@ -313,6 +313,83 @@ defmodule Explorer.DataFrameTest do
     end
   end
 
+  describe "read_ndjson/2" do
+    @tag :tmp_dir
+    test "reads from file with default options", %{tmp_dir: tmp_dir} do
+      ndjson_path = write_ndjson(tmp_dir)
+
+      assert {:ok, df} = DF.read_ndjson(ndjson_path)
+
+      assert DF.names(df) == ~w[a b c d]
+      assert DF.dtypes(df) == [:integer, :float, :boolean, :string]
+
+      assert take_five(df["a"]) ==
+        [1, -10, 2, 1, 7]
+      assert take_five(df["b"]) == [2.0, -3.5, 0.6, 2.0, -3.5]
+      assert take_five(df["c"]) == [false, true, false, false, true]
+      assert take_five(df["d"]) == ["4", "4", "text", "4", "4"]
+
+      assert {:error, _message} = DF.read_ndjson(Path.join(tmp_dir, "idontexist.ndjson"))
+    end
+
+    @tag :tmp_dir
+    test "reads from file with options", %{tmp_dir: tmp_dir} do
+      ndjson_path = write_ndjson(tmp_dir)
+
+      assert {:ok, df} = DF.read_ndjson(ndjson_path, infer_schema_length: 3, with_batch_size: 3)
+
+      assert DF.names(df) == ~w[a b c d]
+      assert DF.dtypes(df) == [:integer, :float, :boolean, :string]
+    end
+
+    defp write_ndjson(tmp_dir) do
+      ndjson_path = Path.join(tmp_dir, "test.ndjson")
+
+      contents = """
+      {"a":1, "b":2.0, "c":false, "d":"4"}
+      {"a":-10, "b":-3.5, "c":true, "d":"4"}
+      {"a":2, "b":0.6, "c":false, "d":"text"}
+      {"a":1, "b":2.0, "c":false, "d":"4"}
+      {"a":7, "b":-3.5, "c":true, "d":"4"}
+      {"a":1, "b":0.6, "c":false, "d":"text"}
+      {"a":1, "b":2.0, "c":false, "d":"4"}
+      {"a":5, "b":-3.5, "c":true, "d":"4"}
+      {"a":1, "b":0.6, "c":false, "d":"text"}
+      {"a":1, "b":2.0, "c":false, "d":"4"}
+      {"a":1, "b":-3.5, "c":true, "d":"4"}
+      {"a":100000000000000, "b":0.6, "c":false, "d":"text"}
+      """
+
+      :ok = File.write!(ndjson_path, contents)
+      ndjson_path
+    end
+
+    defp take_five(series) do
+      series |> Series.to_list() |> Enum.take(5)
+    end
+  end
+
+  describe "write_ndjson" do
+    @tag :tmp_dir
+    test "writes to a file", %{tmp_dir: tmp_dir} do
+      df =
+        DF.from_columns(
+          a: [1, -10, 2, 1, 7, 1, 1, 5, 1, 1, 1, 100_000_000_000_000],
+          b: [2.0, -3.5, 0.6, 2.0, -3.5, 0.6, 2.0, -3.5, 0.6, 2.0, -3.5, 0.6],
+          c: [false, true, false, false, true, false, false, true, false, false, true, false],
+          d: ["4", "4", "text", "4", "4", "text", "4", "4", "text", "4", "4", "text"]
+        )
+
+      ndjson_path = Path.join(tmp_dir, "test-write.ndjson")
+
+      assert {:ok, ^ndjson_path} = DF.write_ndjson(df, ndjson_path)
+      assert {:ok, ndjson_df} = DF.read_ndjson(ndjson_path)
+
+      assert DF.names(df) == DF.names(ndjson_df)
+      assert DF.dtypes(df) == DF.dtypes(ndjson_df)
+      assert DF.to_map(df) == DF.to_map(ndjson_df)
+    end
+  end
+
   describe "table/1" do
     test "prints what we expect" do
       df = Datasets.iris()