diff --git a/tiled/adapters/arrow.py b/tiled/adapters/arrow.py index c8738e470..4143f488d 100644 --- a/tiled/adapters/arrow.py +++ b/tiled/adapters/arrow.py @@ -17,56 +17,6 @@ from .type_alliases import JSON -def read_csv( - data_uri: str, - structure: Optional[TableStructure] = None, - metadata: Optional[JSON] = None, - specs: Optional[List[Spec]] = None, - access_policy: Optional[AccessPolicy] = None, - **kwargs: Any, -) -> TableAdapter: - """ - Read a CSV. - - Internally, this uses dask.dataframe.read_csv. - It forward all parameters to that function. See - https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.read_csv - - Examples - -------- - - >>> read_csv("myfiles.*.csv") - >>> read_csv("s3://bucket/myfiles.*.csv") - - Parameters - ---------- - data_uri : - structure : - metadata : - specs : - access_policy : - kwargs : - - Returns - ------- - """ - filepath = path_from_uri(data_uri) - ddf = dask.dataframe.read_csv(filepath, **kwargs) - # TODO Pass structure through rather than just re-creating it - # in from_dask_dataframe. - return DataFrameAdapter.from_dask_dataframe( - ddf, metadata=metadata, specs=specs, access_policy=access_policy - ) - - -read_csv.__doc__ = """ -This wraps dask.dataframe.read_csv. Original docstring: - -""" + ( - dask.dataframe.read_csv.__doc__ or "" -) - - class ArrowAdapter: """ """ @@ -111,26 +61,6 @@ def metadata(self) -> JSON: """ return self._metadata - @property - def dataframe_adapter(self) -> TableAdapter: - """ - - Returns - ------- - - """ - partitions = [] - for path in self._partition_paths: - if not Path(path).exists(): - partition = None - else: - # partition = dask.dataframe.read_csv(path) - # with pyarrow.ipc.open_file(path) as reader: - with pyarrow.ipc.open_stream(path) as reader: - partition = reader - partitions.append(partition) - return DataFrameAdapter(partitions, self._structure) - @classmethod def init_storage(cls, data_uri: str, structure: TableStructure) -> List[Asset]: """ @@ -157,6 +87,163 @@ def init_storage(cls, data_uri: str, structure: TableStructure) -> List[Asset]: ] return assets + def structure(self) -> TableStructure: + """ + + Returns + ------- + + """ + return self._structure + + # def get(self, key: str) -> Union[ArrayAdapter, None]: + # """ + # + # Parameters + # ---------- + # key : + # + # Returns + # ------- + # + # """ + # if key not in self.structure().columns: + # return None + # return ArrayAdapter.from_array(self.read([key])[key].values) + + # def generate_data_sources( + # self, + # mimetype: str, + # dict_or_none: Callable[[TableStructure], Dict[str, str]], + # item: Union[str, Path], + # is_directory: bool, + # ) -> List[DataSource]: + # """ + # + # Parameters + # ---------- + # mimetype : + # dict_or_none : + # item : + # is_directory : + # + # Returns + # ------- + # + # """ + # return [ + # DataSource( + # structure_family=self.dataframe_adapter.structure_family, + # mimetype=mimetype, + # structure=dict_or_none(self.dataframe_adapter.structure()), + # parameters={}, + # management=Management.external, + # assets=[ + # Asset( + # data_uri=ensure_uri(item), + # is_directory=is_directory, + # parameter="data_uris", # <-- PLURAL! 
+ # num=0, # <-- denoting that the Adapter expects a list, and this is the first element + # ) + # ], + # ) + # ] + # + @classmethod + def from_single_file( + cls, + data_uri: str, + structure: Optional[TableStructure] = None, + metadata: Optional[JSON] = None, + specs: Optional[List[Spec]] = None, + access_policy: Optional[AccessPolicy] = None, + ) -> "ArrowAdapter": + """ + + Parameters + ---------- + data_uri : + structure : + metadata : + specs : + access_policy : + + Returns + ------- + + """ + return cls( + [data_uri], + structure=structure, + metadata=metadata, + specs=specs, + access_policy=access_policy, + ) + + # def __getitem__(self, key: str) -> ArrayAdapter: + # """ + # + # Parameters + # ---------- + # key : + # + # Returns + # ------- + # + # """ + # # Must compute to determine shape. + # return ArrayAdapter.from_array(self.read([key])[key].values) + + # def items(self) -> Iterator[Tuple[str, ArrayAdapter]]: + # """ + # + # Returns + # ------- + # + # """ + # yield from ( + # (key, ArrayAdapter.from_array(self.read([key])[key].values)) + # for key in self._structure.columns + # ) + + +class ArrowAdapterStream(ArrowAdapter): + def __init__( + self, + data_uris: List[str], + structure: Optional[TableStructure] = None, + metadata: Optional[JSON] = None, + specs: Optional[List[Spec]] = None, + access_policy: Optional[AccessPolicy] = None, + ) -> None: + super().__init__( + data_uris=data_uris, + structure=structure, + metadata=metadata, + specs=specs, + access_policy=access_policy, + ) + + @property + def dataframe_adapter(self) -> TableAdapter: + """ + + Returns + ------- + + """ + partitions = [] + for path in self._partition_paths: + if not Path(path).exists(): + partition = None + else: + # partition = dask.dataframe.read_csv(path) + # with pyarrow.ipc.open_file(path) as reader: + with pyarrow.ipc.open_stream(path) as reader: + partition = reader + partitions.append(partition) + return DataFrameAdapter(partitions, self._structure) + def append_partition( self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int ) -> None: @@ -172,9 +259,7 @@ def append_partition( """ uri = self._partition_paths[partition] - # feather.write_feather(data, uri) print("HELL0 URI In APPEND", type(uri)) - # with pyarrow.OSFile('/tmp/test_csv_adapter/partition-0.arrow', mode='ab') as sink: self.stream_writer.write_batch(data) # self.stream_writer.close() @@ -192,9 +277,14 @@ def write_partition( ------- """ + if isinstance(data, list): + schema = data[0].schema + else: + schema = data.schema + uri = self._partition_paths[partition] if not hasattr(self, "stream_writer"): - self.stream_writer = pyarrow.ipc.new_stream(uri, data.schema) + self.stream_writer = pyarrow.ipc.new_stream(uri, schema) self.stream_writer.write_batch(data) @@ -209,12 +299,17 @@ def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None ------- """ + if isinstance(data, list): + schema = data[0].schema + else: + schema = data.schema + if self.structure().npartitions != 1: raise NotImplementedError uri = self._partition_paths[0] if not hasattr(self, "stream_writer"): - self.stream_writer = pyarrow.ipc.new_stream(uri, data.schema) - data.to_csv(uri, index=False) + self.stream_writer = pyarrow.ipc.new_stream(uri, schema) + self.stream_writer.write_batch(data) def read( self, *args: Any, **kwargs: Any @@ -246,121 +341,141 @@ def read_partition(self, *args: Any, **kwargs: Any) -> pandas.DataFrame: """ return self.dataframe_adapter.read_partition(*args, **kwargs) - def structure(self) -> 
TableStructure: + +class ArrowAdapterRandomAccess(ArrowAdapter): + def __init__( + self, + data_uris: List[str], + structure: Optional[TableStructure] = None, + metadata: Optional[JSON] = None, + specs: Optional[List[Spec]] = None, + access_policy: Optional[AccessPolicy] = None, + ) -> None: + super().__init__( + data_uris=data_uris, + structure=structure, + metadata=metadata, + specs=specs, + access_policy=access_policy, + ) + + @property + def dataframe_adapter(self) -> TableAdapter: """ Returns ------- """ - return self._structure + partitions = [] + for path in self._partition_paths: + if not Path(path).exists(): + partition = None + else: + # partition = dask.dataframe.read_csv(path) + with pyarrow.ipc.open_file(path) as reader: + # with pyarrow.ipc.open_stream(path) as reader: + partition = reader + partitions.append(partition) + return DataFrameAdapter(partitions, self._structure) - def get(self, key: str) -> Union[ArrayAdapter, None]: + def append_partition( + self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int + ) -> None: """ Parameters ---------- - key : + data : + partition : Returns ------- """ - if key not in self.structure().columns: - return None - return ArrayAdapter.from_array(self.read([key])[key].values) + uri = self._partition_paths[partition] + print("HELL0 URI In APPEND", type(uri)) + self.file_writer.write_batch(data) + # self.file_writer.close() - def generate_data_sources( - self, - mimetype: str, - dict_or_none: Callable[[TableStructure], Dict[str, str]], - item: Union[str, Path], - is_directory: bool, - ) -> List[DataSource]: + def write_partition( + self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame], partition: int + ) -> None: """ Parameters ---------- - mimetype : - dict_or_none : - item : - is_directory : + data : + partition : Returns ------- """ - return [ - DataSource( - structure_family=self.dataframe_adapter.structure_family, - mimetype=mimetype, - structure=dict_or_none(self.dataframe_adapter.structure()), - parameters={}, - management=Management.external, - assets=[ - Asset( - data_uri=ensure_uri(item), - is_directory=is_directory, - parameter="data_uris", # <-- PLURAL! 
-                        num=0,  # <-- denoting that the Adapter expects a list, and this is the first element
-                    )
-                ],
-            )
-        ]
+        if isinstance(data, list):
+            schema = data[0].schema
+        else:
+            schema = data.schema

-    @classmethod
-    def from_single_file(
-        cls,
-        data_uri: str,
-        structure: Optional[TableStructure] = None,
-        metadata: Optional[JSON] = None,
-        specs: Optional[List[Spec]] = None,
-        access_policy: Optional[AccessPolicy] = None,
-    ) -> "ArrowAdapter":
+        uri = self._partition_paths[partition]
+        if not hasattr(self, "file_writer"):
+            self.file_writer = pyarrow.ipc.new_file(uri, schema)
+
+        self.file_writer.write_batch(data)
+        # self.file_writer.close()
+
+    def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None:
         """

         Parameters
         ----------
-        data_uri :
-        structure :
-        metadata :
-        specs :
-        access_policy :
+        data :

         Returns
         -------

         """
-        return cls(
-            [data_uri],
-            structure=structure,
-            metadata=metadata,
-            specs=specs,
-            access_policy=access_policy,
-        )
+        if isinstance(data, list):
+            schema = data[0].schema
+        else:
+            schema = data.schema
+
+        if self.structure().npartitions != 1:
+            raise NotImplementedError
+        uri = self._partition_paths[0]
+        if not hasattr(self, "file_writer"):
+            self.file_writer = pyarrow.ipc.new_file(uri, schema)
+        self.file_writer.write_batch(data)
+        # self.file_writer.close()

-    def __getitem__(self, key: str) -> ArrayAdapter:
+    def read(
+        self, *args: Any, **kwargs: Any
+    ) -> Union[pandas.DataFrame, dask.dataframe.DataFrame]:
         """

         Parameters
         ----------
-        key :
+        args :
+        kwargs :

         Returns
         -------

         """
-        # Must compute to determine shape.
-        return ArrayAdapter.from_array(self.read([key])[key].values)
+        self.file_writer.close()
+        return self.dataframe_adapter.read(*args, **kwargs)

-    def items(self) -> Iterator[Tuple[str, ArrayAdapter]]:
+    def read_partition(self, *args: Any, **kwargs: Any) -> pandas.DataFrame:
         """
+
+        Parameters
+        ----------
+        args :
+        kwargs :

         Returns
         -------

         """
-        yield from (
-            (key, ArrayAdapter.from_array(self.read([key])[key].values))
-            for key in self._structure.columns
-        )
+        self.file_writer.close()
+        return self.dataframe_adapter.read_partition(*args, **kwargs)
diff --git a/tiled/adapters/table.py b/tiled/adapters/table.py
index 2b8ce5d60..1917917a5 100644
--- a/tiled/adapters/table.py
+++ b/tiled/adapters/table.py
@@ -197,6 +197,10 @@ def read(
         return pyarrow.concat_tables(
             [partition.read_all() for partition in self._partitions]
         )
+        if isinstance(self._partitions[0], pyarrow.ipc.RecordBatchFileReader):
+            return pyarrow.concat_tables(
+                [partition.read_all() for partition in self._partitions]
+            )

         df = pandas.concat(self._partitions, axis=0)
         if fields is not None:
@@ -230,9 +234,11 @@ def read_partition(
         if isinstance(df, dask.dataframe.DataFrame):
             return df.compute()
         # if isinstance(df, pyarrow.ipc.RecordBatchFileReader):
         if isinstance(df, pyarrow.ipc.RecordBatchStreamReader):
            batches = [b for b in df]
            return batches[batch]
+        if isinstance(df, pyarrow.ipc.RecordBatchFileReader):
+            return df.get_batch(batch)

         return partition
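
Note (illustrative, not part of the patch): ArrowAdapterStream targets the Arrow IPC streaming layout via pyarrow.ipc.new_stream/open_stream, while ArrowAdapterRandomAccess targets the random-access file layout via pyarrow.ipc.new_file/open_file, which only becomes readable after the writer is closed and its footer is written. A minimal standalone sketch of those underlying pyarrow calls, using hypothetical local file names:

import pyarrow
import pyarrow.ipc

batch = pyarrow.RecordBatch.from_pydict({"x": [1, 2, 3], "y": ["a", "b", "c"]})

# Streaming layout (ArrowAdapterStream): batches are appended as they arrive
# and read back sequentially with open_stream.
with pyarrow.ipc.new_stream("stream-partition-0.arrow", batch.schema) as writer:
    writer.write_batch(batch)
    writer.write_batch(batch)
with pyarrow.OSFile("stream-partition-0.arrow", "rb") as source:
    table = pyarrow.ipc.open_stream(source).read_all()  # concatenates all batches

# Random-access layout (ArrowAdapterRandomAccess): the writer must be closed so
# the footer is written; individual batches can then be fetched by index.
with pyarrow.ipc.new_file("file-partition-0.arrow", batch.schema) as writer:
    writer.write_batch(batch)
    writer.write_batch(batch)
with pyarrow.OSFile("file-partition-0.arrow", "rb") as source:
    reader = pyarrow.ipc.open_file(source)
    first = reader.get_batch(0)  # random access by batch index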