Skip to content

Commit

Permalink
Add chunk_size check, code clean up, test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Jul 2, 2024
1 parent f298bc7 commit d834a58
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 71 deletions.
125 changes: 99 additions & 26 deletions py/server/deephaven/_table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
#
"""This module supports reading the data in a Deephaven table in a chunked manner."""

from typing import Union, Sequence, Generator, Dict, Optional, Any
from collections import namedtuple
from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple

import jpy
import numpy as np
Expand All @@ -22,33 +22,13 @@ def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]
else:
cols = to_sequence(cols)
col_defs = [col for col in table.columns if col.name in cols]
if len(col_defs) != len(cols):
raise ValueError(f"Invalid column names: {set(cols) - {col.name for col in col_defs}}")

return col_defs

def _table_reader_rows(table: Table, cols: Optional[Union[str, Sequence[str]]]) -> Generator[Dict[str, Any], None, None]:
""" A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names
to scalar values of the column data type.
Args:
table (Table): The table to read.
cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
Returns:
A generator that yields a dictionary of column names to a value.
"""
col_defs = _col_defs(table, cols)

for chunk_dict in _table_reader_chunks(table, cols=cols, row_set=table.j_table.getRowSet(), chunk_size=4096,
prev=False, to_numpy=False):
chunk_size = len(chunk_dict[col_defs[0].name])
for i in range(chunk_size):
col_dict = {}
for col_def in col_defs:
col_dict[col_def.name] = chunk_dict[col_def.name][i]
yield col_dict

def _table_reader_chunks(table: Table, *, cols: Optional[Union[str, Sequence[str]]], row_set: jpy.JType, chunk_size: Optional[int] = 4096,
prev: bool = False, to_numpy: bool = True) -> Generator[Dict[str, np.ndarray], None, None]:
def _table_reader_chunk_dict(table: Table, *, cols: Optional[Union[str, Sequence[str]]] = None, row_set: jpy.JType, chunk_size: int = 4096,
prev: bool = False, to_numpy: bool = True) -> Generator[Dict[str, Union[np.ndarray | jpy.JType]], None, None]:
""" A generator that reads the chunks of rows over the given row set of a table into a dictionary. The dictionary is
a map of column names to numpy arrays.
Expand All @@ -62,7 +42,13 @@ def _table_reader_chunks(table: Table, *, cols: Optional[Union[str, Sequence[str
Returns:
A generator that yields a dictionary of column names to numpy arrays.
Raises:
ValueError
"""
if chunk_size < 0:
raise ValueError("chunk_size must not be negative.")

col_defs = _col_defs(table, cols)

row_sequence_iterator = row_set.getRowSequenceIterator()
Expand All @@ -83,3 +69,90 @@ def _table_reader_chunks(table: Table, *, cols: Optional[Union[str, Sequence[str
j_reader_context.close()
row_sequence_iterator.close()

def _table_reader_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None) -> Generator[Dict[str, Any], None, None]:
    """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names
    to scalar values of the column data type.

    Args:
        table (Table): The table to read.
        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.

    Returns:
        A generator that yields a dictionary of column names to a value.

    Raises:
        ValueError
    """
    col_defs = _col_defs(table, cols)

    for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(), chunk_size=4096,
                                               prev=False, to_numpy=False):
        # Every column in the chunk holds the same number of rows; size the
        # per-row iteration off the first requested column.
        rows_in_chunk = len(chunk_dict[col_defs[0].name])
        for i in range(rows_in_chunk):
            # Idiomatic dict comprehension instead of a manual build-and-assign loop.
            yield {col_def.name: chunk_dict[col_def.name][i] for col_def in col_defs}

def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven') -> Generator[Tuple[Any, ...], None, None]:
    """ Returns a generator that reads one row at a time from the table into a named tuple. Each field of the
    named tuple is named after a column and carries that column's scalar value for the row.

    If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
    the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
    The side effect of this is that the table will not be able to refresh while the table is being iterated on.
    Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill
    context will be released after the generator is destroyed. That can happen implicitly when the generator
    is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
    after use by either (1) by setting it to None, (2) using the del statement, or (3) calling the close() method on it.

    Args:
        table (Table): The table to read.
        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None.
        tuple_name (str): The name of the named tuple. Default is 'Deephaven'.

    Returns:
        A generator that yields a named tuple for each row in the table

    Raises:
        ValueError
    """
    # When no columns were requested, the tuple fields cover every column in the table.
    field_names = cols if cols else [col.name for col in table.columns]
    row_tuple_cls = namedtuple(tuple_name, field_names, rename=False)

    for row_dict in _table_reader_dict(table, cols):
        yield row_tuple_cls(**row_dict)


def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 4096,
                          tuple_name: str = 'Deephaven')-> Generator[Tuple[np.ndarray, ...], None, None]:
    """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. Each field of
    the named tuple is named after a column and carries a numpy array of that column's values for the chunk.

    If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire
    the shared lock of the update graph before reading the table data. This provides a consistent view of the data.
    The side effect of this is that the table will not be able to refresh while the table is being iterated on.
    Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill
    context will be released after the generator is destroyed. That can happen implicitly when the generator
    is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed
    after use by either (1) by setting it to None, (2) using the del statement, or (3) calling the close() method on it.

    Args:
        table (Table): The table to read.
        cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
        chunk_size (int): The number of rows to read at a time. Default is 4096.
        tuple_name (str): The name of the named tuple. Default is 'Deephaven'.

    Returns:
        A generator that yields a named tuple for each row in the table.

    Raises:
        ValueError
    """
    # When no columns were requested, the tuple fields cover every column in the table.
    field_names = cols if cols else [col.name for col in table.columns]
    chunk_tuple_cls = namedtuple(tuple_name, field_names, rename=False)

    chunk_stream = _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(),
                                            chunk_size=chunk_size, prev=False, to_numpy=True)
    for chunk_dict in chunk_stream:
        yield chunk_tuple_cls(**chunk_dict)


51 changes: 24 additions & 27 deletions py/server/deephaven/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import contextlib
import inspect
from collections import namedtuple
from enum import Enum
from enum import auto
from typing import Any, Optional, Callable, Dict, Generator, Tuple
Expand Down Expand Up @@ -522,12 +521,14 @@ def iter_dict(self, cols: Optional[Union[str, Sequence[str]]] = None) -> Generat
Returns:
A generator that yields a dictionary of column names to scalar values.
"""
from deephaven._table_reader import _table_reader_rows # to prevent circular import
return _table_reader_rows(self, cols)
Raises:
ValueError
"""
from deephaven._table_reader import _table_reader_dict # to prevent circular import
return _table_reader_dict(self, cols)

def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, name: str = 'Deephaven') -> Generator[Tuple[Any], None, None]:
def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven') -> Generator[Tuple[Any, ...], None, None]:
""" Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made
up of fields with their names being the column names and their values being of the column data types.
Expand All @@ -541,18 +542,18 @@ def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, name: str
Args:
cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None.
name (str): The name of the named tuple. Default is 'Deephaven'.
tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
Returns:
A generator that yields a named tuple or regular tuple for each row in the table
"""
from deephaven._table_reader import _table_reader_rows # to prevent circular import
named_tuple_class = namedtuple(name, cols or [col.name for col in self.columns], rename=False)
A generator that yields a named tuple for each row in the table
for row in _table_reader_rows(self, cols):
yield named_tuple_class(**row)
Raises:
ValueError
"""
from deephaven._table_reader import _table_reader_tuple # to prevent circular import
return _table_reader_tuple(self, cols, tuple_name = tuple_name)

def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: Optional[int] = 4096)-> Generator[Dict[str, np.ndarray], None, None]:
def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 4096)-> Generator[Dict[str, np.ndarray], None, None]:
""" Returns a generator that reads one chunk of rows at a time from the table into a dictionary. The dictionary
is a map of column names to numpy arrays of the column data type.
Expand All @@ -566,21 +567,21 @@ def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, c
Args:
cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
chunk_size (int, Optional): The number of rows to read at a time. Default is 4096.
chunk_size (int): The number of rows to read at a time. Default is 4096.
Returns:
A generator that yields a dictionary of column names to numpy arrays.
Raises
ValueError
"""
from deephaven._table_reader import _table_reader_chunks # to prevent circular import
from deephaven._table_reader import _table_reader_chunk_dict # to prevent circular import

return _table_reader_chunks(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size,
return _table_reader_chunk_dict(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size,
prev=False, to_numpy=True)

def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: Optional[int] = 4096,
name: str = 'Deephaven')-> Generator[Tuple[np.ndarray], None, None]:
def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 4096,
tuple_name: str = 'Deephaven')-> Generator[Tuple[np.ndarray, ...], None, None]:
""" Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named
tuple is made up of fields with their names being the column names and their values being numpy arrays of the
column data types.
Expand All @@ -595,21 +596,17 @@ def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *,
Args:
cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read.
chunk_size (int, Optional): The number of rows to read at a time. Default is 4096.
name (str): The name of the named tuple. Default is 'Deephaven'.
chunk_size (int): The number of rows to read at a time. Default is 4096.
tuple_name (str): The name of the named tuple. Default is 'Deephaven'.
Returns:
A generator that yields a dictionary of column names to numpy arrays.
A generator that yields a named tuple for each row in the table.
Raises:
ValueError
"""
from deephaven._table_reader import _table_reader_chunks # to prevent circular import
named_tuple_class = namedtuple(name, cols or [col.name for col in self.columns], rename=False)

for chunk in _table_reader_chunks(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size,
prev=False, to_numpy=True):
yield named_tuple_class(**chunk)
from deephaven._table_reader import _table_reader_chunk_tuple # to prevent circular import
return _table_reader_chunk_tuple(self, cols=cols, chunk_size=chunk_size)

def has_columns(self, cols: Union[str, Sequence[str]]):
"""Whether this table contains a column for each of the provided names, return False if any of the columns is
Expand Down
34 changes: 17 additions & 17 deletions py/server/deephaven/table_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from deephaven._wrapper import JObjectWrapper
from deephaven.jcompat import to_sequence
from deephaven.table import Table
from deephaven._table_reader import _table_reader_chunks
from deephaven._table_reader import _table_reader_chunk_dict
from deephaven.update_graph import UpdateGraph

_JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter")
Expand Down Expand Up @@ -51,8 +51,8 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
try:
j_row_set = self.j_table_update.added.asRowSet()
return next(
_table_reader_chunks(table=self.table, cols=cols, row_set= j_row_set,
prev=False, chunk_size=j_row_set.size(), to_numpy=True))
_table_reader_chunk_dict(table=self.table, cols=cols, row_set= j_row_set,
chunk_size=j_row_set.size(), prev=False, to_numpy=True))
except StopIteration:
return {}

Expand All @@ -71,8 +71,8 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G
if not self.j_table_update.added:
return (_ for _ in ())

return _table_reader_chunks(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(),
prev=False, chunk_size=chunk_size, to_numpy=True)
return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(),
chunk_size=chunk_size, prev=False, to_numpy=True)

def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
"""Returns a dict with each key being a column name and each value being a NumPy array of
Expand All @@ -90,8 +90,8 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray
try:
j_row_set = self.j_table_update.removed.asRowSet()
return next(
_table_reader_chunks(table=self.table, cols=cols, row_set=j_row_set,
chunk_size=j_row_set.size(), prev=True, to_numpy=True))
_table_reader_chunk_dict(table=self.table, cols=cols, row_set=j_row_set,
chunk_size=j_row_set.size(), prev=True, to_numpy=True))
except StopIteration:
return {}

Expand All @@ -110,8 +110,8 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) ->
if not self.j_table_update.removed:
return (_ for _ in ())

return _table_reader_chunks(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(),
chunk_size=chunk_size, prev=True, to_numpy=True)
return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(),
chunk_size=chunk_size, prev=True, to_numpy=True)

def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
"""Returns a dict with each key being a column name and each value being a NumPy array of the current values of
Expand All @@ -129,8 +129,8 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra
try:
j_row_set = self.j_table_update.modified.asRowSet()
return next(
_table_reader_chunks(self.table, cols=cols, row_set=j_row_set,
chunk_size=j_row_set.size(), prev=False, to_numpy=True))
_table_reader_chunk_dict(self.table, cols=cols, row_set=j_row_set,
chunk_size=j_row_set.size(), prev=False, to_numpy=True))
except StopIteration:
return {}

Expand All @@ -149,8 +149,8 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -
if not self.j_table_update.modified:
return (_ for _ in ())

return _table_reader_chunks(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
chunk_size=chunk_size, prev=False, to_numpy=True)
return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
chunk_size=chunk_size, prev=False, to_numpy=True)

def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]:
"""Returns a dict with each key being a column name and each value being a NumPy array of the previous values of
Expand All @@ -168,8 +168,8 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n
try:
j_row_set = self.j_table_update.modified.asRowSet()
return next(
_table_reader_chunks(self.table, cols=cols, row_set=j_row_set,
chunk_size=j_row_set.size(), prev=True, to_numpy=True))
_table_reader_chunk_dict(self.table, cols=cols, row_set=j_row_set,
chunk_size=j_row_set.size(), prev=True, to_numpy=True))
except StopIteration:
return {}

Expand All @@ -188,8 +188,8 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No
if not self.j_table_update.modified:
return (_ for _ in ())

return _table_reader_chunks(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
chunk_size=chunk_size, prev=True, to_numpy=True)
return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(),
chunk_size=chunk_size, prev=True, to_numpy=True)

@property
def shifted(self):
Expand Down
Loading

0 comments on commit d834a58

Please sign in to comment.