Add arrow adapter (#755)
* rebased 1st commit

* rebased

* tried to implement streamed batch writing

* trial of appending for the arrow adapter

* create two instances of the arrow adapter, for the stream and random-access formats

* tested open_append_stream. note: closing the stream prevents open_append_stream from working

* only the file reader is allowed. note: unit tests are still to be implemented

* some more refactoring

* some more refactoring. still needs unit testing

* implemented some unit tests, to be continued ...

* refactor and fix test

* fixed changelog error

* addressed comments

* rebased the branch

* addressed comments

---------

Co-authored-by: Seher Karakuzu <[email protected]>
skarakuzu and Seher Karakuzu authored Aug 9, 2024
1 parent 15abbc9 commit b3e47d4
Showing 9 changed files with 442 additions and 4 deletions.
9 changes: 9 additions & 0 deletions .mypy.ini
@@ -16,6 +16,15 @@ disallow_incomplete_defs = True
 disallow_untyped_calls = True
 disallow_untyped_decorators = True
 
+[mypy-tiled._tests.adapters.*]
+ignore_errors = False
+ignore_missing_imports = False
+check_untyped_defs = True
+disallow_untyped_defs = True
+disallow_incomplete_defs = True
+disallow_untyped_calls = True
+disallow_untyped_decorators = False
+
 [mypy-tiled._tests.test_protocols]
 ignore_errors = False
 ignore_missing_imports = False
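For context, these settings mean any test module under `tiled/_tests/adapters/` must carry complete type annotations. A hypothetical sketch of the strictness level (function name and body are illustrative, not from the commit):

import pyarrow as pa


# With disallow_untyped_defs and disallow_incomplete_defs enabled,
# every def needs full parameter and return annotations, even in tests.
def test_roundtrip_schema() -> None:
    batch = pa.record_batch([pa.array([1, 2, 3])], names=["x"])
    assert batch.schema.names == ["x"]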
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -7,6 +7,7 @@ Write the date in place of the "Unreleased" in the case a new version is released.
 
 ### Added
 - Add method to `TableAdapter` which accepts a Python dictionary.
+- Added an `Arrow` adapter which supports reading/writing arrow tables via `RecordBatchFileReader`/`RecordBatchFileWriter`.
 
 ### Changed
 - Make `tiled.client` accept a Python dictionary when fed to `write_dataframe()`.
@@ -71,7 +72,6 @@ Write the date in place of the "Unreleased" in the case a new version is released.
 ## v0.1.0b1 (2024-05-25)
 
 ### Added
-
 - Support for `FullText` search on SQLite-backed catalogs
 
 ### Fixed
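The `RecordBatchFileReader`/`RecordBatchFileWriter` pair named in the changelog entry is pyarrow's random-access IPC file API. A minimal standalone sketch of that round trip (the file name is illustrative; this is plain pyarrow, not the adapter itself):

import pyarrow as pa

batch = pa.record_batch([pa.array([1, 2, 3])], names=["x"])

# pa.ipc.new_file returns a RecordBatchFileWriter.
with pa.OSFile("example.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, batch.schema) as writer:
        writer.write_batch(batch)

# pa.ipc.open_file returns a RecordBatchFileReader, which supports random
# access to individual batches as well as reading the whole file.
with pa.OSFile("example.arrow", "rb") as source:
    reader = pa.ipc.open_file(source)
    table = reader.read_all()

assert table == pa.Table.from_batches([batch])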
Empty file.
69 changes: 69 additions & 0 deletions tiled/_tests/adapters/test_arrow.py
@@ -0,0 +1,69 @@
import tempfile

import pyarrow as pa
import pytest

from tiled.adapters.arrow import ArrowAdapter
from tiled.structures.table import TableStructure

names = ["f0", "f1", "f2"]
data0 = [
    pa.array([1, 2, 3, 4, 5]),
    pa.array(["foo0", "bar0", "baz0", None, "goo0"]),
    pa.array([True, None, False, True, None]),
]
data1 = [
    pa.array([6, 7, 8, 9, 10, 11, 12]),
    pa.array(["foo1", "bar1", None, "baz1", "biz", None, "goo"]),
    pa.array([None, True, True, False, False, None, True]),
]
data2 = [pa.array([13, 14]), pa.array(["foo2", "baz2"]), pa.array([False, None])]

batch0 = pa.record_batch(data0, names=names)
batch1 = pa.record_batch(data1, names=names)
batch2 = pa.record_batch(data2, names=names)
data_uri = "file://localhost/" + tempfile.gettempdir()


@pytest.fixture
def adapter() -> ArrowAdapter:
    table = pa.Table.from_arrays(data0, names)
    structure = TableStructure.from_arrow_table(table, npartitions=3)
    assets = ArrowAdapter.init_storage(data_uri, structure=structure)
    return ArrowAdapter([asset.data_uri for asset in assets], structure=structure)


def test_attributes(adapter: ArrowAdapter) -> None:
    assert adapter.structure().columns == names
    assert adapter.structure().npartitions == 3


def test_write_read(adapter: ArrowAdapter) -> None:
    # Test writing to a single partition and reading it back.
    adapter.write_partition(batch0, 0)
    assert pa.Table.from_arrays(data0, names) == pa.Table.from_pandas(
        adapter.read_partition(0)
    )

    adapter.write_partition([batch0, batch1], 1)
    assert pa.Table.from_batches([batch0, batch1]) == pa.Table.from_pandas(
        adapter.read_partition(1)
    )

    adapter.write_partition([batch0, batch1, batch2], 2)
    assert pa.Table.from_batches([batch0, batch1, batch2]) == pa.Table.from_pandas(
        adapter.read_partition(2)
    )

    # Test writing to all partitions and reading the whole table back.
    adapter.write_partition([batch0, batch1, batch2], 0)
    adapter.write_partition([batch2, batch0, batch1], 1)
    adapter.write_partition([batch1, batch2, batch0], 2)

    assert pa.Table.from_pandas(adapter.read()) == pa.Table.from_batches(
        [batch0, batch1, batch2, batch2, batch0, batch1, batch1, batch2, batch0]
    )

    # Test that adapter.write() raises NotImplementedError when there is
    # more than one partition.
    with pytest.raises(NotImplementedError):
        adapter.write(batch0)
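Distilled from the fixture and the final assertion above, a hedged sketch of the single-partition case, which is the one situation where `adapter.write()` itself appears usable. It reuses `data0`/`batch0`/`names`/`data_uri` from the test module and assumes, as the `from_pandas()` calls imply, that `read()` returns a pandas DataFrame:

# Sketch, not from the commit: with exactly one partition, write() is assumed
# to be permitted (the test shows it raises NotImplementedError only when
# npartitions > 1).
table = pa.Table.from_arrays(data0, names)
structure = TableStructure.from_arrow_table(table, npartitions=1)
assets = ArrowAdapter.init_storage(data_uri, structure=structure)
adapter = ArrowAdapter([asset.data_uri for asset in assets], structure=structure)

adapter.write(batch0)
df = adapter.read()  # a pandas DataFrame, per the from_pandas() calls above
assert pa.Table.from_pandas(df) == pa.Table.from_arrays(data0, names)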
4 changes: 2 additions & 2 deletions tiled/_tests/test_writing.py
@@ -30,7 +30,7 @@
 from ..structures.data_source import DataSource
 from ..structures.sparse import COOStructure
 from ..structures.table import TableStructure
-from ..utils import patch_mimetypes
+from ..utils import APACHE_ARROW_FILE_MIME_TYPE, patch_mimetypes
 from ..validation_registration import ValidationRegistry
 from .utils import fail_with_status_code
 
@@ -537,7 +537,7 @@ def test_write_with_specified_mimetype(tree):
     df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
     structure = TableStructure.from_pandas(df)
 
-    for mimetype in [PARQUET_MIMETYPE, "text/csv"]:
+    for mimetype in [PARQUET_MIMETYPE, "text/csv", APACHE_ARROW_FILE_MIME_TYPE]:
         x = client.new(
             "table",
             [
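For reference, `APACHE_ARROW_FILE_MIME_TYPE` presumably resolves to `application/vnd.apache.arrow.file`, the media type for Arrow's random-access IPC file format. A hedged sketch of what a table serialized under that mimetype looks like (plain pyarrow, illustrative rather than Tiled's actual write path):

import pandas
import pyarrow as pa

df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
table = pa.Table.from_pandas(df)

# Serialize to an in-memory buffer in the Arrow IPC file format.
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, table.schema) as writer:
    writer.write_table(table)

# Read it back; the pandas index is restored from schema metadata.
roundtripped = pa.ipc.open_file(sink.getvalue()).read_pandas()
assert roundtripped.equals(df)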