From d834a58887158fa46e4a1f57dedb0f2aef7ebc4d Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Tue, 2 Jul 2024 10:05:10 -0600 Subject: [PATCH] Add chunk_size check, code clean up, test cases --- py/server/deephaven/_table_reader.py | 125 ++++++++++++++++++++----- py/server/deephaven/table.py | 51 +++++----- py/server/deephaven/table_listener.py | 34 +++---- py/server/tests/test_table_iterator.py | 10 +- 4 files changed, 149 insertions(+), 71 deletions(-) diff --git a/py/server/deephaven/_table_reader.py b/py/server/deephaven/_table_reader.py index b9ff7c0905c..9cab3f2e5a5 100644 --- a/py/server/deephaven/_table_reader.py +++ b/py/server/deephaven/_table_reader.py @@ -2,8 +2,8 @@ # Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending # """This module supports reading the data in a Deephaven table in a chunked manner.""" - -from typing import Union, Sequence, Generator, Dict, Optional, Any +from collections import namedtuple +from typing import Union, Sequence, Generator, Dict, Optional, Any, Tuple import jpy import numpy as np @@ -22,33 +22,13 @@ def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column] else: cols = to_sequence(cols) col_defs = [col for col in table.columns if col.name in cols] + if len(col_defs) != len(cols): + raise ValueError(f"Invalid column names: {set(cols) - {col.name for col in col_defs}}") return col_defs -def _table_reader_rows(table: Table, cols: Optional[Union[str, Sequence[str]]]) -> Generator[Dict[str, Any], None, None]: - """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names - to scalar values of the column data type. - - Args: - table (Table): The table to read. - cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. - - Returns: - A generator that yields a dictionary of column names to a value. - """ - col_defs = _col_defs(table, cols) - - for chunk_dict in _table_reader_chunks(table, cols=cols, row_set=table.j_table.getRowSet(), chunk_size=4096, - prev=False, to_numpy=False): - chunk_size = len(chunk_dict[col_defs[0].name]) - for i in range(chunk_size): - col_dict = {} - for col_def in col_defs: - col_dict[col_def.name] = chunk_dict[col_def.name][i] - yield col_dict - -def _table_reader_chunks(table: Table, *, cols: Optional[Union[str, Sequence[str]]], row_set: jpy.JType, chunk_size: Optional[int] = 4096, - prev: bool = False, to_numpy: bool = True) -> Generator[Dict[str, np.ndarray], None, None]: +def _table_reader_chunk_dict(table: Table, *, cols: Optional[Union[str, Sequence[str]]] = None, row_set: jpy.JType, chunk_size: int = 4096, + prev: bool = False, to_numpy: bool = True) -> Generator[Dict[str, Union[np.ndarray | jpy.JType]], None, None]: """ A generator that reads the chunks of rows over the given row set of a table into a dictionary. The dictionary is a map of column names to numpy arrays. @@ -62,7 +42,13 @@ def _table_reader_chunks(table: Table, *, cols: Optional[Union[str, Sequence[str Returns: A generator that yields a dictionary of column names to numpy arrays. + + Raises: + ValueError """ + if chunk_size < 0: + raise ValueError("chunk_size must not be negative.") + col_defs = _col_defs(table, cols) row_sequence_iterator = row_set.getRowSequenceIterator() @@ -83,3 +69,90 @@ def _table_reader_chunks(table: Table, *, cols: Optional[Union[str, Sequence[str j_reader_context.close() row_sequence_iterator.close() +def _table_reader_dict(table: Table, cols: Optional[Union[str, Sequence[str]]] = None) -> Generator[Dict[str, Any], None, None]: + """ A generator that reads one row at a time from a table into a dictionary. The dictionary is a map of column names + to scalar values of the column data type. + + Args: + table (Table): The table to read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + + Returns: + A generator that yields a dictionary of column names to a value. + + Raises: + ValueError + """ + col_defs = _col_defs(table, cols) + + for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(), chunk_size=4096, + prev=False, to_numpy=False): + chunk_size = len(chunk_dict[col_defs[0].name]) + for i in range(chunk_size): + col_dict = {} + for col_def in col_defs: + col_dict[col_def.name] = chunk_dict[col_def.name][i] + yield col_dict + +def _table_reader_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven') -> Generator[Tuple[Any, ...], None, None]: + """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made + up of fields with their names being the column names and their values being of the column data types. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed + after use by either (1) by setting it to None, (2) using the del statement, or (3) calling the close() method on it. + + Args: + table (Table): The table to read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None. + tuple_name (str): The name of the named tuple. Default is 'Deephaven'. + + Returns: + A generator that yields a named tuple for each row in the table + + Raises: + ValueError + """ + named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False) + + for row in _table_reader_dict(table, cols): + yield named_tuple_class(**row) + + +def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 4096, + tuple_name: str = 'Deephaven')-> Generator[Tuple[np.ndarray, ...], None, None]: + """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named + tuple is made up of fields with their names being the column names and their values being numpy arrays of the + column data types. + + If the table is refreshing and no update graph locks are currently being held, the generator will try to acquire + the shared lock of the update graph before reading the table data. This provides a consistent view of the data. + The side effect of this is that the table will not be able to refresh while the table is being iterated on. + Additionally, the generator internally maintains a fill context. The auto acquired shared lock and the fill + context will be released after the generator is destroyed. That can happen implicitly when the generator + is used in a for-loop. When the generator is not used in a for-loop, to prevent resource leaks, it must be closed + after use by either (1) by setting it to None, (2) using the del statement, or (3) calling the close() method on it. + + Args: + table (Table): The table to read. + cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. + chunk_size (int): The number of rows to read at a time. Default is 4096. + tuple_name (str): The name of the named tuple. Default is 'Deephaven'. + + Returns: + A generator that yields a named tuple for each row in the table. + + Raises: + ValueError + """ + named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False) + + for chunk_dict in _table_reader_chunk_dict(table, cols=cols, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, + prev=False, to_numpy=True): + yield named_tuple_class(**chunk_dict) + + diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py index dc615a53e09..5d939441c00 100644 --- a/py/server/deephaven/table.py +++ b/py/server/deephaven/table.py @@ -9,7 +9,6 @@ import contextlib import inspect -from collections import namedtuple from enum import Enum from enum import auto from typing import Any, Optional, Callable, Dict, Generator, Tuple @@ -522,12 +521,14 @@ def iter_dict(self, cols: Optional[Union[str, Sequence[str]]] = None) -> Generat Returns: A generator that yields a dictionary of column names to scalar values. - """ - from deephaven._table_reader import _table_reader_rows # to prevent circular import - return _table_reader_rows(self, cols) + Raises: + ValueError + """ + from deephaven._table_reader import _table_reader_dict # to prevent circular import + return _table_reader_dict(self, cols) - def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, name: str = 'Deephaven') -> Generator[Tuple[Any], None, None]: + def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, tuple_name: str = 'Deephaven') -> Generator[Tuple[Any, ...], None, None]: """ Returns a generator that reads one row at a time from the table into a named tuple. The named tuple is made up of fields with their names being the column names and their values being of the column data types. @@ -541,18 +542,18 @@ def iter_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, name: str Args: cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. Default is None. - name (str): The name of the named tuple. Default is 'Deephaven'. + tuple_name (str): The name of the named tuple. Default is 'Deephaven'. Returns: - A generator that yields a named tuple or regular tuple for each row in the table - """ - from deephaven._table_reader import _table_reader_rows # to prevent circular import - named_tuple_class = namedtuple(name, cols or [col.name for col in self.columns], rename=False) + A generator that yields a named tuple for each row in the table - for row in _table_reader_rows(self, cols): - yield named_tuple_class(**row) + Raises: + ValueError + """ + from deephaven._table_reader import _table_reader_tuple # to prevent circular import + return _table_reader_tuple(self, cols, tuple_name = tuple_name) - def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: Optional[int] = 4096)-> Generator[Dict[str, np.ndarray], None, None]: + def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 4096)-> Generator[Dict[str, np.ndarray], None, None]: """ Returns a generator that reads one chunk of rows at a time from the table into a dictionary. The dictionary is a map of column names to numpy arrays of the column data type. @@ -566,7 +567,7 @@ def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, c Args: cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. - chunk_size (int, Optional): The number of rows to read at a time. Default is 4096. + chunk_size (int): The number of rows to read at a time. Default is 4096. Returns: A generator that yields a dictionary of column names to numpy arrays. @@ -574,13 +575,13 @@ def iter_chunk_dict(self, cols: Optional[Union[str, Sequence[str]]] = None, *, c Raises ValueError """ - from deephaven._table_reader import _table_reader_chunks # to prevent circular import + from deephaven._table_reader import _table_reader_chunk_dict # to prevent circular import - return _table_reader_chunks(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size, + return _table_reader_chunk_dict(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size, prev=False, to_numpy=True) - def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: Optional[int] = 4096, - name: str = 'Deephaven')-> Generator[Tuple[np.ndarray], None, None]: + def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *, chunk_size: int = 4096, + tuple_name: str = 'Deephaven')-> Generator[Tuple[np.ndarray, ...], None, None]: """ Returns a generator that reads one chunk of rows at a time from the table into a named tuple. The named tuple is made up of fields with their names being the column names and their values being numpy arrays of the column data types. @@ -595,21 +596,17 @@ def iter_chunk_tuple(self, cols: Optional[Union[str, Sequence[str]]] = None, *, Args: cols (Optional[Union[str, Sequence[str]]]): The columns to read. If None, all columns are read. - chunk_size (int, Optional): The number of rows to read at a time. Default is 4096. - name (str): The name of the named tuple. Default is 'Deephaven'. + chunk_size (int): The number of rows to read at a time. Default is 4096. + tuple_name (str): The name of the named tuple. Default is 'Deephaven'. Returns: - A generator that yields a dictionary of column names to numpy arrays. + A generator that yields a named tuple for each row in the table. Raises: ValueError """ - from deephaven._table_reader import _table_reader_chunks # to prevent circular import - named_tuple_class = namedtuple(name, cols or [col.name for col in self.columns], rename=False) - - for chunk in _table_reader_chunks(self, cols=cols, row_set=self.j_table.getRowSet(), chunk_size=chunk_size, - prev=False, to_numpy=True): - yield named_tuple_class(**chunk) + from deephaven._table_reader import _table_reader_chunk_tuple # to prevent circular import + return _table_reader_chunk_tuple(self, cols=cols, chunk_size=chunk_size) def has_columns(self, cols: Union[str, Sequence[str]]): """Whether this table contains a column for each of the provided names, return False if any of the columns is diff --git a/py/server/deephaven/table_listener.py b/py/server/deephaven/table_listener.py index 2973d4019ed..a6b5bed4dd0 100644 --- a/py/server/deephaven/table_listener.py +++ b/py/server/deephaven/table_listener.py @@ -16,7 +16,7 @@ from deephaven._wrapper import JObjectWrapper from deephaven.jcompat import to_sequence from deephaven.table import Table -from deephaven._table_reader import _table_reader_chunks +from deephaven._table_reader import _table_reader_chunk_dict from deephaven.update_graph import UpdateGraph _JPythonReplayListenerAdapter = jpy.get_type("io.deephaven.integrations.python.PythonReplayListenerAdapter") @@ -51,8 +51,8 @@ def added(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: try: j_row_set = self.j_table_update.added.asRowSet() return next( - _table_reader_chunks(table=self.table, cols=cols, row_set= j_row_set, - prev=False, chunk_size=j_row_set.size(), to_numpy=True)) + _table_reader_chunk_dict(table=self.table, cols=cols, row_set= j_row_set, + chunk_size=j_row_set.size(), prev=False, to_numpy=True)) except StopIteration: return {} @@ -71,8 +71,8 @@ def added_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> G if not self.j_table_update.added: return (_ for _ in ()) - return _table_reader_chunks(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), - prev=False, chunk_size=chunk_size, to_numpy=True) + return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.added.asRowSet(), + chunk_size=chunk_size, prev=False, to_numpy=True) def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of @@ -90,8 +90,8 @@ def removed(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray try: j_row_set = self.j_table_update.removed.asRowSet() return next( - _table_reader_chunks(table=self.table, cols=cols, row_set=j_row_set, - chunk_size=j_row_set.size(), prev=True, to_numpy=True)) + _table_reader_chunk_dict(table=self.table, cols=cols, row_set=j_row_set, + chunk_size=j_row_set.size(), prev=True, to_numpy=True)) except StopIteration: return {} @@ -110,8 +110,8 @@ def removed_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) -> if not self.j_table_update.removed: return (_ for _ in ()) - return _table_reader_chunks(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), - chunk_size=chunk_size, prev=True, to_numpy=True) + return _table_reader_chunk_dict(table=self.table, cols=cols, row_set=self.j_table_update.removed.asRowSet(), + chunk_size=chunk_size, prev=True, to_numpy=True) def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of the current values of @@ -129,8 +129,8 @@ def modified(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarra try: j_row_set = self.j_table_update.modified.asRowSet() return next( - _table_reader_chunks(self.table, cols=cols, row_set=j_row_set, - chunk_size=j_row_set.size(), prev=False, to_numpy=True)) + _table_reader_chunk_dict(self.table, cols=cols, row_set=j_row_set, + chunk_size=j_row_set.size(), prev=False, to_numpy=True)) except StopIteration: return {} @@ -149,8 +149,8 @@ def modified_chunks(self, chunk_size: int, cols: Union[str, List[str]] = None) - if not self.j_table_update.modified: return (_ for _ in ()) - return _table_reader_chunks(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=chunk_size, prev=False, to_numpy=True) + return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + chunk_size=chunk_size, prev=False, to_numpy=True) def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.ndarray]: """Returns a dict with each key being a column name and each value being a NumPy array of the previous values of @@ -168,8 +168,8 @@ def modified_prev(self, cols: Union[str, List[str]] = None) -> Dict[str, numpy.n try: j_row_set = self.j_table_update.modified.asRowSet() return next( - _table_reader_chunks(self.table, cols=cols, row_set=j_row_set, - chunk_size=j_row_set.size(), prev=True, to_numpy=True)) + _table_reader_chunk_dict(self.table, cols=cols, row_set=j_row_set, + chunk_size=j_row_set.size(), prev=True, to_numpy=True)) except StopIteration: return {} @@ -188,8 +188,8 @@ def modified_prev_chunks(self, chunk_size: int, cols: Union[str, List[str]] = No if not self.j_table_update.modified: return (_ for _ in ()) - return _table_reader_chunks(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), - chunk_size=chunk_size, prev=True, to_numpy=True) + return _table_reader_chunk_dict(self.table, cols=cols, row_set=self.j_table_update.modified.asRowSet(), + chunk_size=chunk_size, prev=True, to_numpy=True) @property def shifted(self): diff --git a/py/server/tests/test_table_iterator.py b/py/server/tests/test_table_iterator.py index 36bac4e3d57..e346e98c20b 100644 --- a/py/server/tests/test_table_iterator.py +++ b/py/server/tests/test_table_iterator.py @@ -369,13 +369,21 @@ def test_iteration_tuple_unpack(self): for a, b, c, d, e, f in test_table.iter_tuple(): ... - def test_iteration_tuple_name_error(self): + def test_iteration_errors(self): test_table = time_table("PT00:00:00.001").update(["from = i%11"]) with self.assertRaises(ValueError) as cm: for t in test_table.iter_tuple(): pass self.assertIn("'from'", str(cm.exception)) + with self.assertRaises(ValueError) as cm: + for t in test_table.iter_tuple(cols=["from_"]): + pass + self.assertIn("'from_'", str(cm.exception)) + + with self.assertRaises(ValueError) as cm: + for t in test_table.iter_chunk_tuple(chunk_size=-1): + pass if __name__ == '__main__': unittest.main()