Skip to content

Commit

Permalink
Read and write dataframes from/to NDJSON files (#145)
Browse files Browse the repository at this point in the history
NDJSON is a useful way to store multiple JSON documents in a single
file, separating them by new lines.

Closes #139
  • Loading branch information
philss authored Mar 31, 2022
1 parent 4f0f29b commit f6c88ae
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 0 deletions.
7 changes: 7 additions & 0 deletions lib/explorer/backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ defmodule Explorer.Backend.DataFrame do
@callback read_ipc(filename :: String.t()) :: result(df)
@callback write_ipc(df, filename :: String.t()) :: result(String.t())

# Reads a newline-delimited JSON file into a dataframe.
# `infer_schema_length` may be `nil`: the public `read_ndjson/2` docs allow it
# (full scan for schema inference) and the native reader takes an optional length.
@callback read_ndjson(
            filename :: String.t(),
            infer_schema_length :: integer() | nil,
            with_batch_size :: integer()
          ) :: result(df)

# Writes a dataframe to `filename` as newline-delimited JSON.
@callback write_ndjson(df, filename :: String.t()) :: result(String.t())

# Conversion

@callback from_columns(map() | Keyword.t()) :: df
Expand Down
38 changes: 38 additions & 0 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,44 @@ defmodule Explorer.DataFrame do
end
end

@doc """
Reads a newline-delimited JSON (NDJSON) file into a dataframe.

Every line of the file holds one JSON object or list.

## Options

  * `with_batch_size` - Number of rows read per batch. This value can have a
    significant impact on performance, so tune it for your workload
    (default: `1000`).

  * `infer_schema_length` - Maximum number of rows read for schema inference.
    Passing `nil` scans the whole file, which is slow (default: `1000`).
"""
@spec read_ndjson(filename :: String.t(), opts :: Keyword.t()) ::
        {:ok, DataFrame.t()} | {:error, term()}
def read_ndjson(filename, opts \\ []) do
  validated =
    keyword!(opts,
      with_batch_size: 1000,
      infer_schema_length: @default_infer_schema_length
    )

  impl = backend_from_options!(validated)
  schema_rows = Keyword.get(validated, :infer_schema_length)
  batch_size = Keyword.get(validated, :with_batch_size)

  impl.read_ndjson(filename, schema_rows, batch_size)
end

@doc """
Writes the dataframe `df` to `filename` in NDJSON format.

Returns `{:ok, path}` on success or `{:error, reason}` otherwise.
"""
@spec write_ndjson(df :: DataFrame.t(), filename :: String.t()) ::
        {:ok, String.t()} | {:error, term()}
def write_ndjson(df, filename), do: apply_impl(df, :write_ndjson, [filename])

@doc """
Creates a new dataframe from a map or keyword of lists or series.
Expand Down
14 changes: 14 additions & 0 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ defmodule Explorer.PolarsBackend.DataFrame do
end
end

# Reads an NDJSON file through the native reader and wraps a successful result
# with `Shared.to_dataframe/1`; any other result is passed back unchanged.
@impl true
def read_ndjson(filename, infer_schema_length, with_batch_size) do
  case Native.df_read_ndjson(filename, infer_schema_length, with_batch_size) do
    {:ok, native_df} -> {:ok, Shared.to_dataframe(native_df)}
    other -> other
  end
end

# Writes the native dataframe to `filename` and echoes the filename back on
# success; any other result from the native call is passed back unchanged.
@impl true
def write_ndjson(%DataFrame{data: native_df}, filename) do
  case Native.df_write_ndjson(native_df, filename) do
    {:ok, _} -> {:ok, filename}
    other -> other
  end
end

@impl true
def to_binary(%DataFrame{} = df, header?, delimiter) do
<<delimiter::utf8>> = delimiter
Expand Down
13 changes: 13 additions & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ defmodule Explorer.PolarsBackend.Native do
),
do: err()

# Elixir-side placeholder for the `df_read_ndjson` NIF implemented in
# native/explorer/src/dataframe.rs.
# NOTE(review): `err()` presumably reports that the NIF is not loaded — confirm.
def df_read_ndjson(_filename, _infer_schema_length, _with_batch_size), do: err()

# Elixir-side placeholder for the `df_write_ndjson` NIF implemented in
# native/explorer/src/dataframe.rs.
# NOTE(review): `err()` presumably reports that the NIF is not loaded — confirm.
def df_write_ndjson(_df, _filename), do: err()

def df_as_str(_df), do: err()
def df_clone(_df), do: err()
def df_column(_df, _name), do: err()
Expand Down
27 changes: 27 additions & 0 deletions native/explorer/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use rustler::{Term, TermType};

use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::result::Result;

use crate::series::{to_ex_series_collection, to_series_collection};
Expand Down Expand Up @@ -162,6 +163,32 @@ pub fn df_write_ipc(data: ExDataFrame, filename: &str) -> Result<(), ExplorerErr
})
}

/// Reads a JSON-lines file into a dataframe.
///
/// * `filename` - path of the file to open.
/// * `infer_schema_length` - forwarded to the reader's `infer_schema_len`;
///   `None` is passed through unchanged.
/// * `with_batch_size` - forwarded to the reader's `with_batch_size`.
///
/// Errors from opening the file or from the reader are propagated with `?`.
#[rustler::nif]
pub fn df_read_ndjson(
    filename: &str,
    infer_schema_length: Option<usize>,
    with_batch_size: usize,
) -> Result<ExDataFrame, ExplorerError> {
    let reader = BufReader::new(File::open(filename)?);

    let frame = JsonReader::new(reader)
        .with_json_format(JsonFormat::JsonLines)
        .with_batch_size(with_batch_size)
        .infer_schema_len(infer_schema_length)
        .finish()?;

    Ok(ExDataFrame::new(frame))
}

/// Writes the dataframe to `filename` using `JsonWriter`.
///
/// A failure to create the file is returned to the caller as an
/// `ExplorerError` (the same `?` conversion `df_read_ndjson` relies on for
/// `File::open`) instead of panicking inside the NIF.
#[rustler::nif]
pub fn df_write_ndjson(data: ExDataFrame, filename: &str) -> Result<(), ExplorerError> {
    df_read!(data, df, {
        // Propagate I/O errors rather than aborting the NIF call with a panic.
        let file = File::create(filename)?;
        // `finish` needs a mutable frame, so the writer is given a clone.
        JsonWriter::new(file).finish(&mut df.clone())?;
        Ok(())
    })
}

#[rustler::nif]
pub fn df_as_str(data: ExDataFrame) -> Result<String, ExplorerError> {
df_read!(data, df, { Ok(format!("{:?}", df)) })
Expand Down
2 changes: 2 additions & 0 deletions native/explorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ rustler::init!(
df_read_csv,
df_read_ipc,
df_read_parquet,
df_read_ndjson,
df_write_ndjson,
df_select,
df_select_at_idx,
df_set_column_names,
Expand Down
77 changes: 77 additions & 0 deletions test/explorer/data_frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,83 @@ defmodule Explorer.DataFrameTest do
end
end

describe "read_ndjson/2" do
  # Happy path with default options, plus the error tuple for a missing file.
  @tag :tmp_dir
  test "reads from file with default options", %{tmp_dir: dir} do
    fixture = write_ndjson(dir)
    missing = Path.join(dir, "idontexist.ndjson")

    assert {:ok, frame} = DF.read_ndjson(fixture)

    assert DF.names(frame) == ~w[a b c d]
    assert DF.dtypes(frame) == [:integer, :float, :boolean, :string]

    assert take_five(frame["a"]) == [1, -10, 2, 1, 7]
    assert take_five(frame["b"]) == [2.0, -3.5, 0.6, 2.0, -3.5]
    assert take_five(frame["c"]) == [false, true, false, false, true]
    assert take_five(frame["d"]) == ["4", "4", "text", "4", "4"]

    assert {:error, _message} = DF.read_ndjson(missing)
  end

  # Same fixture, read with a small inference window and batch size.
  @tag :tmp_dir
  test "reads from file with options", %{tmp_dir: dir} do
    fixture = write_ndjson(dir)
    read_opts = [infer_schema_length: 3, with_batch_size: 3]

    assert {:ok, frame} = DF.read_ndjson(fixture, read_opts)

    assert DF.names(frame) == ~w[a b c d]
    assert DF.dtypes(frame) == [:integer, :float, :boolean, :string]
  end

  # Writes a twelve-row NDJSON fixture into `tmp_dir` and returns its path.
  defp write_ndjson(tmp_dir) do
    target = Path.join(tmp_dir, "test.ndjson")

    rows = """
    {"a":1, "b":2.0, "c":false, "d":"4"}
    {"a":-10, "b":-3.5, "c":true, "d":"4"}
    {"a":2, "b":0.6, "c":false, "d":"text"}
    {"a":1, "b":2.0, "c":false, "d":"4"}
    {"a":7, "b":-3.5, "c":true, "d":"4"}
    {"a":1, "b":0.6, "c":false, "d":"text"}
    {"a":1, "b":2.0, "c":false, "d":"4"}
    {"a":5, "b":-3.5, "c":true, "d":"4"}
    {"a":1, "b":0.6, "c":false, "d":"text"}
    {"a":1, "b":2.0, "c":false, "d":"4"}
    {"a":1, "b":-3.5, "c":true, "d":"4"}
    {"a":100000000000000, "b":0.6, "c":false, "d":"text"}
    """

    :ok = File.write!(target, rows)
    target
  end

  # First five values of a series as a plain list.
  defp take_five(series) do
    Enum.take(Series.to_list(series), 5)
  end
end

# Named "fun/arity" to match the sibling describes ("read_ndjson/2", "table/1").
describe "write_ndjson/2" do
  # Round-trip: write a dataframe to NDJSON, read it back, and compare names,
  # dtypes and values against the original.
  @tag :tmp_dir
  test "writes to a file", %{tmp_dir: tmp_dir} do
    df =
      DF.from_columns(
        a: [1, -10, 2, 1, 7, 1, 1, 5, 1, 1, 1, 100_000_000_000_000],
        b: [2.0, -3.5, 0.6, 2.0, -3.5, 0.6, 2.0, -3.5, 0.6, 2.0, -3.5, 0.6],
        c: [false, true, false, false, true, false, false, true, false, false, true, false],
        d: ["4", "4", "text", "4", "4", "text", "4", "4", "text", "4", "4", "text"]
      )

    ndjson_path = Path.join(tmp_dir, "test-write.ndjson")

    # The write call is expected to echo back the exact path it was given.
    assert {:ok, ^ndjson_path} = DF.write_ndjson(df, ndjson_path)
    assert {:ok, ndjson_df} = DF.read_ndjson(ndjson_path)

    assert DF.names(df) == DF.names(ndjson_df)
    assert DF.dtypes(df) == DF.dtypes(ndjson_df)
    assert DF.to_map(df) == DF.to_map(ndjson_df)
  end
end

describe "table/1" do
test "prints what we expect" do
df = Datasets.iris()
Expand Down

0 comments on commit f6c88ae

Please sign in to comment.