
Commit

tested open_append_stream; note: closing the stream does not allow open_append_stream to work
Seher Karakuzu authored and Seher Karakuzu committed Jul 5, 2024
1 parent e487b9b commit 78c46a1
Showing 1 changed file with 37 additions and 8 deletions.
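The note in the commit title is consistent with how the Arrow IPC stream format behaves: RecordBatchStreamWriter.close() writes an end-of-stream marker, and stream readers stop at that marker, so record batches appended to the file afterwards are never seen. A minimal sketch of that failure mode (not part of this commit; the file name and batch contents are made up):

import pyarrow
import pyarrow.fs
import pyarrow.ipc

path = "example.arrow"  # hypothetical local file
batch = pyarrow.record_batch([pyarrow.array([1, 2, 3])], names=["x"])

# Closing the writer (here via the context manager) writes the end-of-stream marker.
with pyarrow.ipc.new_stream(path, batch.schema) as writer:
    writer.write_batch(batch)

# Append a second batch as a raw IPC message after the end-of-stream marker.
with pyarrow.fs.LocalFileSystem().open_append_stream(path) as sink:
    sink.write(batch.serialize())

# The reader stops at the end-of-stream marker, so the appended batch is ignored.
with pyarrow.OSFile(path, "rb") as source:
    table = pyarrow.ipc.open_stream(source).read_all()
print(table.num_rows)  # 3, not 6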
45 changes: 37 additions & 8 deletions tiled/adapters/arrow.py
@@ -5,6 +5,7 @@
 import pandas
 import pyarrow
 import pyarrow.feather as feather
+import pyarrow.fs
 
 from ..structures.core import Spec, StructureFamily
 from ..structures.data_source import Asset, DataSource, Management
@@ -259,9 +260,20 @@ def append_partition(
         """
         uri = self._partition_paths[partition]
         print("HELL0 URI In APPEND", type(uri))
-        self.stream_writer.write_batch(data)
+        print("HELL0 URI In APPEND", uri)
+        # self.stream_writer.write_batch(data)
+        # self.stream_writer.close()
+        # stream_writer = pyarrow.ipc.new_stream(uri, data.schema)
+        # stream_writer.write_batch(data)
 
+        # with pyarrow.OSFile(str(uri), 'ab') as sink:
+        # with pyarrow.RecordBatchStreamWriter(uri, data.schema) as writer:
+        # writer.write_batch(data)
+        # writer.close()
 
+        pyarrow.fs.LocalFileSystem().open_append_stream(path=str(uri)).write(
+            data.serialize()
+        )
 
     def write_partition(
         self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int
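The new append_partition body above appends each incoming batch to the end of the partition file as a raw IPC message, relying on the schema having been written when the stream was first created and on no end-of-stream marker having been written yet. A standalone sketch of the same pattern (the file name and batches are hypothetical; this is not the adapter code itself):

import pyarrow
import pyarrow.fs
import pyarrow.ipc

path = "partition-0.arrow"  # hypothetical partition file
first = pyarrow.record_batch([pyarrow.array([1, 2, 3])], names=["x"])
later = pyarrow.record_batch([pyarrow.array([4, 5])], names=["x"])

# Create the stream and write the first batch; deliberately do not close the
# writer, so no end-of-stream marker lands in the file.
writer = pyarrow.ipc.new_stream(path, first.schema)
writer.write_batch(first)

# Later batches (with the same schema) are appended as raw IPC messages.
with pyarrow.fs.LocalFileSystem().open_append_stream(path) as sink:
    sink.write(later.serialize())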
@@ -283,10 +295,18 @@ def write_partition(
         schema = data.schema
 
         uri = self._partition_paths[partition]
-        if not hasattr(self, "stream_writer"):
-            self.stream_writer = pyarrow.ipc.new_stream(uri, schema)
+        # if not hasattr(self, "stream_writer"):
+        # self.stream_writer = pyarrow.ipc.new_stream(uri, schema)
+        # self.stream_writer.write_batch(data)
+
+        stream_writer = pyarrow.ipc.new_stream(uri, schema)
+        stream_writer.write_batch(data)
+        # stream_writer.close()
 
-        self.stream_writer.write_batch(data)
+        # with pyarrow.OSFile(str(uri), 'wb') as sink:
+        # with pyarrow.RecordBatchStreamWriter(sink, data.schema) as writer:
+        # writer.write_batch(data)
+        # writer.close()
 
     def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
         """
@@ -307,9 +327,18 @@ def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None
         if self.structure().npartitions != 1:
             raise NotImplementedError
         uri = self._partition_paths[0]
-        if not hasattr(self, "stream_writer"):
-            self.stream_writer = pyarrow.ipc.new_stream(uri, schema)
-        self.stream_writer.write_batch(data)
+        # if not hasattr(self, "stream_writer"):
+        # self.stream_writer = pyarrow.ipc.new_stream(uri, schema)
+        # self.stream_writer.write_batch(data)
+
+        stream_writer = pyarrow.ipc.new_stream(uri, schema)
+        stream_writer.write_batch(data)
+        # stream_writer.close()
+
+        # with pyarrow.OSFile(str(uri), 'wb') as sink:
+        # with pyarrow.RecordBatchStreamWriter(sink, data.schema) as writer:
+        # writer.write_batch(data)
+        # writer.close()
 
     def read(
         self, *args: Any, **kwargs: Any
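If the writer is left open as in the hunks above, the partition file remains a readable IPC stream even after appends: with no end-of-stream marker present, a reader consumes batches until it reaches end of file. A hypothetical read-back of the file from the earlier sketch (not part of this commit):

import pyarrow
import pyarrow.ipc

path = "partition-0.arrow"  # same hypothetical file as in the earlier sketch
with pyarrow.OSFile(path, "rb") as source:
    table = pyarrow.ipc.open_stream(source).read_all()
print(table.num_rows)  # 5: both the initial and the appended batch are read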
