Skip to content

Commit

Permalink
feat!: Add TableDefinition wrapper for python (#5892)
Browse files Browse the repository at this point in the history
* `deephaven.table.TableDefinition`: new python wrapper for
`io.deephaven.engine.table.TableDefinition`
* `deephaven.column.ColumnDefinition`: new python wrapper for
`io.deephaven.engine.table.ColumnDefinition`
* `deephaven.table.TableDefinitionLike`: new type alias to allow for
consistent public APIs and conversions into
`deephaven.table.TableDefinition`
* `deephaven.column.Column`: deprecated for removal
* `deephaven.jcompat.j_table_definition`: deprecated for removal
* `deephaven.stream.kafka.consumer.json_spec`: `col_defs` specified as
`List[Tuple[str, DType]]` deprecated for removal

Fixes #4822

BREAKING CHANGE: `deephaven.column.InputColumn` no longer inherits from
`deephaven.column.Column`; as such, it no longer exposes Column's
attributes. This is unlikely to affect users as InputColumn is really a
structure meant to support `new_table`.
  • Loading branch information
devinrsmith committed Aug 15, 2024
1 parent 9ca4332 commit 07786ef
Show file tree
Hide file tree
Showing 30 changed files with 854 additions and 338 deletions.
20 changes: 10 additions & 10 deletions py/server/deephaven/_table_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import numpy as np
from deephaven import update_graph

from deephaven.column import Column
from deephaven.column import ColumnDefinition
from deephaven.jcompat import to_sequence
from deephaven.numpy import _column_to_numpy_array
from deephaven.table import Table
Expand All @@ -18,7 +18,7 @@

T = TypeVar('T')

def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]:
def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[ColumnDefinition]:
if not cols:
col_defs = table.columns
else:
Expand All @@ -31,7 +31,7 @@ def _col_defs(table: Table, cols: Union[str, Sequence[str]]) -> Sequence[Column]


def _table_reader_all(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
emitter: Callable[[Sequence[Column], jpy.JType], T], row_set: jpy.JType,
emitter: Callable[[Sequence[ColumnDefinition], jpy.JType], T], row_set: jpy.JType,
prev: bool = False) -> T:
""" Reads all the rows in the given row set of a table. The emitter converts the Java data into a desired Python
object.
Expand Down Expand Up @@ -103,7 +103,7 @@ def _table_reader_all_dict(table: Table, cols: Optional[Union[str, Sequence[str]


def _table_reader_chunk(table: Table, cols: Optional[Union[str, Sequence[str]]] = None, *,
emitter: Callable[[Sequence[Column], jpy.JType], Iterable[T]], row_set: jpy.JType,
emitter: Callable[[Sequence[ColumnDefinition], jpy.JType], Iterable[T]], row_set: jpy.JType,
chunk_size: int = 2048, prev: bool = False) \
-> Generator[T, None, None]:
""" Returns a generator that reads one chunk of rows at a time from the table. The emitter converts the Java chunk
Expand Down Expand Up @@ -178,7 +178,7 @@ def _table_reader_chunk_dict(table: Table, cols: Optional[Union[str, Sequence[st
Raises:
ValueError
"""
def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Generator[Dict[str, np.ndarray], None, None]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Generator[Dict[str, np.ndarray], None, None]:
yield {col_def.name: _column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)}

return _table_reader_chunk(table, cols, emitter=_emitter, row_set=row_set, chunk_size=chunk_size, prev=prev)
Expand Down Expand Up @@ -210,9 +210,9 @@ def _table_reader_chunk_tuple(table: Table, cols: Optional[Union[str, Sequence[s
Raises:
ValueError
"""
named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
named_tuple_class = namedtuple(tuple_name, cols or table.column_names, rename=False)

def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Generator[Tuple[np.ndarray], None, None]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Generator[Tuple[np.ndarray], None, None]:
yield named_tuple_class._make([_column_to_numpy_array(col_def, j_array[i]) for i, col_def in enumerate(col_defs)])

return _table_reader_chunk(table, cols, emitter=_emitter, row_set=table.j_table.getRowSet(), chunk_size=chunk_size, prev=False)
Expand Down Expand Up @@ -242,7 +242,7 @@ def _table_reader_row_dict(table: Table, cols: Optional[Union[str, Sequence[str]
Raises:
ValueError
"""
def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Iterable[Dict[str, Any]]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Iterable[Dict[str, Any]]:
make_dict = lambda values: {col_def.name: value for col_def, value in zip(col_defs, values)}
mvs = [memoryview(j_array[i]) if col_def.data_type.is_primitive else j_array[i] for i, col_def in enumerate(col_defs)]
return map(make_dict, zip(*mvs))
Expand Down Expand Up @@ -275,9 +275,9 @@ def _table_reader_row_tuple(table: Table, cols: Optional[Union[str, Sequence[str
Raises:
ValueError
"""
named_tuple_class = namedtuple(tuple_name, cols or [col.name for col in table.columns], rename=False)
named_tuple_class = namedtuple(tuple_name, cols or table.column_names, rename=False)

def _emitter(col_defs: Sequence[Column], j_array: jpy.JType) -> Iterable[Tuple[Any, ...]]:
def _emitter(col_defs: Sequence[ColumnDefinition], j_array: jpy.JType) -> Iterable[Tuple[Any, ...]]:
mvs = [memoryview(j_array[i]) if col_def.data_type.is_primitive else j_array[i] for i, col_def in enumerate(col_defs)]
return map(named_tuple_class._make, zip(*mvs))

Expand Down
184 changes: 145 additions & 39 deletions py/server/deephaven/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@

""" This module implements the Column class and functions that work with Columns. """

from dataclasses import dataclass, field
from enum import Enum
from typing import Sequence, Any
from functools import cached_property
from typing import Sequence, Any, Optional
from warnings import warn

import jpy

import deephaven.dtypes as dtypes
from deephaven import DHError
from deephaven.dtypes import DType
from deephaven.dtypes import _instant_array
from deephaven.dtypes import DType, _instant_array, from_jtype
from deephaven._wrapper import JObjectWrapper

_JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader")
_JColumn = jpy.get_type("io.deephaven.qst.column.Column")
Expand All @@ -32,46 +33,151 @@ def __repr__(self):
return self.name


@dataclass
class Column:
""" A Column object represents a column definition in a Deephaven Table. """
name: str
data_type: DType
component_type: DType = None
column_type: ColumnType = ColumnType.NORMAL
class ColumnDefinition(JObjectWrapper):
"""A Deephaven column definition."""

@property
def j_column_header(self):
return _JColumnHeader.of(self.name, self.data_type.qst_type)
j_object_type = _JColumnDefinition

def __init__(self, j_column_definition: jpy.JType):
self.j_column_definition = j_column_definition

@property
def j_column_definition(self):
if hasattr(self.data_type.j_type, 'jclass'):
j_data_type = self.data_type.j_type.jclass
else:
j_data_type = self.data_type.qst_type.clazz()
j_component_type = self.component_type.qst_type.clazz() if self.component_type else None
j_column_type = self.column_type.value
return _JColumnDefinition.fromGenericType(self.name, j_data_type, j_component_type, j_column_type)


@dataclass
class InputColumn(Column):
""" An InputColumn represents a user defined column with some input data. """
input_data: Any = field(default=None)

def __post_init__(self):
def j_object(self) -> jpy.JType:
return self.j_column_definition

@cached_property
def name(self) -> str:
    """The column name, read from the wrapped Java column definition.

    Declared as a cached_property, so the Java getter is invoked on first access only.
    """
    j_definition = self.j_column_definition
    return j_definition.getName()

@cached_property
def data_type(self) -> DType:
    """The column data type.

    The Java data type reported by the wrapped column definition is converted with from_jtype; declared as a
    cached_property, so the conversion runs on first access only.
    """
    j_data_type = self.j_column_definition.getDataType()
    return from_jtype(j_data_type)

@cached_property
def component_type(self) -> Optional[DType]:
    """The column component type, if any.

    The Java component type reported by the wrapped column definition is converted with from_jtype; declared as a
    cached_property, so the conversion runs on first access only.
    """
    j_component_type = self.j_column_definition.getComponentType()
    return from_jtype(j_component_type)

@cached_property
def column_type(self) -> ColumnType:
    """The column type.

    The Java column type reported by the wrapped column definition is looked up as a ColumnType member by value;
    declared as a cached_property, so the lookup runs on first access only.
    """
    j_column_type = self.j_column_definition.getColumnType()
    return ColumnType(j_column_type)


class Column(ColumnDefinition):
    """A column definition in a Deephaven Table.

    Deprecated for removal next release, prefer col_def.
    """

    def __init__(
        self,
        name: str,
        data_type: DType,
        component_type: DType = None,
        column_type: ColumnType = ColumnType.NORMAL,
    ):
        """Deprecated for removal next release, prefer col_def.

        Emits a DeprecationWarning, then builds the definition through col_def and wraps the resulting Java column
        definition.

        Args:
            name (str): the column name
            data_type (DType): the column data type
            component_type (DType): the column component type, None by default
            column_type (ColumnType): the column type, ColumnType.NORMAL by default
        """
        # stacklevel=2 so the warning is reported against the code constructing the Column, not this __init__.
        warn(
            "Column is deprecated for removal next release, prefer col_def",
            DeprecationWarning,
            stacklevel=2,
        )
        definition = col_def(name, data_type, component_type, column_type)
        super().__init__(definition.j_column_definition)


class InputColumn:
"""An InputColumn represents a user defined column with some input data."""

def __init__(
self,
name: str = None,
data_type: DType = None,
component_type: DType = None,
column_type: ColumnType = ColumnType.NORMAL,
input_data: Any = None,
):
"""Creates an InputColumn.
Args:
name (str): the column name
data_type (DType): the column data type
component_type (Optional[DType]): the column component type, None by default
column_type (ColumnType): the column type, NORMAL by default
input_data: Any: the input data, by default is None
Returns:
a new InputColumn
Raises:
DHError
"""
try:
if self.input_data is None:
self.j_column = _JColumn.empty(self.j_column_header)
else:
if self.data_type.is_primitive:
self.j_column = _JColumn.ofUnsafe(self.name, dtypes.array(self.data_type, self.input_data,
remap=dtypes.null_remap(self.data_type)))
else:
self.j_column = _JColumn.of(self.j_column_header, dtypes.array(self.data_type, self.input_data))
self._column_definition = col_def(
name, data_type, component_type, column_type
)
self.j_column = self._to_j_column(input_data)
except Exception as e:
raise DHError(e, f"failed to create an InputColumn ({self.name}).") from e
raise DHError(e, f"failed to create an InputColumn ({name}).") from e

def _to_j_column(self, input_data: Any = None) -> jpy.JType:
    """Builds the Java column for this InputColumn's definition from the given input data.

    Three cases are handled:
      * no input data: an empty Java column is created from a header carrying the column name and QST type;
      * primitive data type: the data is converted with dtypes.array, passing dtypes.null_remap(data_type) as the
        remap argument, and handed to the unsafe factory together with the column name;
      * any other data type: the data is converted with dtypes.array and paired with a column header.

    Args:
        input_data (Any): the input data, None by default

    Returns:
        the Java column object
    """
    definition = self._column_definition

    if input_data is None:
        header = _JColumnHeader.of(definition.name, definition.data_type.qst_type)
        return _JColumn.empty(header)

    if definition.data_type.is_primitive:
        name = definition.name
        dtype = definition.data_type
        j_array = dtypes.array(dtype, input_data, remap=dtypes.null_remap(dtype))
        return _JColumn.ofUnsafe(name, j_array)

    header = _JColumnHeader.of(definition.name, definition.data_type.qst_type)
    return _JColumn.of(header, dtypes.array(definition.data_type, input_data))


def col_def(
    name: str,
    data_type: DType,
    component_type: Optional[DType] = None,
    column_type: ColumnType = ColumnType.NORMAL,
) -> ColumnDefinition:
    """Builds a ColumnDefinition from a column name and Deephaven types.

    Args:
        name (str): the column name
        data_type (DType): the column data type
        component_type (Optional[DType]): the column component type, None by default
        column_type (ColumnType): the column type, ColumnType.NORMAL by default

    Returns:
        a new ColumnDefinition

    Raises:
        DHError
    """
    try:
        # Use the class exposed on the DType's j_type when it has one; otherwise resolve it through the QST type.
        j_type = data_type.j_type
        if hasattr(j_type, "jclass"):
            j_data_class = j_type.jclass
        else:
            j_data_class = data_type.qst_type.clazz()

        j_component_class = None
        if component_type:
            j_component_class = component_type.qst_type.clazz()

        j_definition = _JColumnDefinition.fromGenericType(
            name, j_data_class, j_component_class, column_type.value
        )
        return ColumnDefinition(j_definition)
    except Exception as e:
        raise DHError(e, f"failed to create a ColumnDefinition ({name}).") from e


def bool_col(name: str, data: Sequence) -> InputColumn:
Expand Down
13 changes: 4 additions & 9 deletions py/server/deephaven/experimental/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,8 @@

from deephaven import DHError
from deephaven._wrapper import JObjectWrapper
from deephaven.column import Column
from deephaven.dtypes import DType
from deephaven.experimental import s3

from deephaven.jcompat import j_table_definition

from deephaven.table import Table
from deephaven.table import Table, TableDefinition, TableDefinitionLike

_JIcebergInstructions = jpy.get_type("io.deephaven.iceberg.util.IcebergInstructions")
_JIcebergCatalogAdapter = jpy.get_type("io.deephaven.iceberg.util.IcebergCatalogAdapter")
Expand All @@ -39,14 +34,14 @@ class IcebergInstructions(JObjectWrapper):
j_object_type = _JIcebergInstructions

def __init__(self,
table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None,
table_definition: Optional[TableDefinitionLike] = None,
data_instructions: Optional[s3.S3Instructions] = None,
column_renames: Optional[Dict[str, str]] = None):
"""
Initializes the instructions using the provided parameters.
Args:
table_definition (Optional[Union[Dict[str, DType], List[Column], None]]): the table definition; if omitted,
table_definition (Optional[TableDefinitionLike]): the table definition; if omitted,
the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table
will have that definition. This is useful for specifying a subset of the Iceberg schema columns.
data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when
Expand All @@ -62,7 +57,7 @@ def __init__(self,
builder = self.j_object_type.builder()

if table_definition is not None:
builder.tableDefinition(j_table_definition(table_definition))
builder.tableDefinition(TableDefinition(table_definition).j_table_definition)

if data_instructions is not None:
builder.dataInstructions(data_instructions.j_object)
Expand Down
Loading

0 comments on commit 07786ef

Please sign in to comment.