diff --git a/py/client/pydeephaven/_arrow_flight_service.py b/py/client/pydeephaven/_arrow_flight_service.py index 95200aacfd4..55875fac59e 100644 --- a/py/client/pydeephaven/_arrow_flight_service.py +++ b/py/client/pydeephaven/_arrow_flight_service.py @@ -21,7 +21,7 @@ def import_table(self, data: pa.Table) -> Table: try: if not isinstance(data, (pa.Table, pa.RecordBatch)): raise DHError("source data must be either a pa table or RecordBatch.") - ticket = self.session.next_export_ticket_no() + ticket = self.session.next_export_ticket_number() dh_fields = [] for f in data.schema: dh_fields.append(pa.field(name=f.name, type=f.type, metadata=map_arrow_type(f.type))) diff --git a/py/client/pydeephaven/_session_service.py b/py/client/pydeephaven/_session_service.py index 05fcee2f198..34a523f6dd5 100644 --- a/py/client/pydeephaven/_session_service.py +++ b/py/client/pydeephaven/_session_service.py @@ -39,8 +39,8 @@ def close(self): except Exception as e: raise DHError("failed to close the session.") from e - def release(self, ticket: Ticket): - """Releases an exported ticket.""" + def release(self, ticket: ExportTicket) -> None: + """Releases an export ticket.""" try: self.session.wrap_rpc( self._grpc_session_stub.Release, diff --git a/py/client/pydeephaven/experimental/plugin_client.py b/py/client/pydeephaven/experimental/plugin_client.py index 9646aef199d..de455b51c61 100644 --- a/py/client/pydeephaven/experimental/plugin_client.py +++ b/py/client/pydeephaven/experimental/plugin_client.py @@ -53,7 +53,8 @@ class Fetchable(ServerObject): """ def __init__(self, session, typed_ticket: ticket_pb2.TypedTicket): - super().__init__(type=typed_ticket.type, ticket=_ticket_from_proto(typed_ticket.ticket)) + export_ticket = ExportTicket(typed_ticket.ticket.ticket) + super().__init__(type=typed_ticket.type, ticket=export_ticket) self.session = session def fetch(self) -> Union[Table, PluginClient]: diff --git a/py/client/pydeephaven/session.py b/py/client/pydeephaven/session.py index 81ac0f5acb3..dc2d14ffbc2 100644 --- a/py/client/pydeephaven/session.py +++ b/py/client/pydeephaven/session.py @@ -335,11 +335,11 @@ def plugin_object_service(self) -> PluginObjService: def make_export_ticket(self, ticket_no=None) -> ExportTicket: if not ticket_no: - ticket_no = self.next_export_ticket_no() + ticket_no = self.next_export_ticket_number() ticket_bytes = ticket_no.to_bytes(4, byteorder='little', signed=True) return ExportTicket(ticket_bytes=b'e' + ticket_bytes) - def next_export_ticket_no(self) -> int: + def next_export_ticket_number(self) -> int: with self._r_lock: self._last_ticket += 1 if self._last_ticket == 2 ** 31 - 1: @@ -487,8 +487,8 @@ def close(self) -> None: self._last_ticket = 0 self._flight_client.close() - def release(self, ticket: Ticket) -> None: - """Releases a ticket. + def release(self, ticket: ExportTicket) -> None: + """Releases an export ticket. Args: ticket (Ticket): the ticket to release diff --git a/py/client/pydeephaven/ticket.py b/py/client/pydeephaven/ticket.py index 68fb3c1f673..1934f4ee6b6 100644 --- a/py/client/pydeephaven/ticket.py +++ b/py/client/pydeephaven/ticket.py @@ -114,6 +114,41 @@ def scope_ticket(cls, name: str) -> ScopeTicket: return cls(ticket_bytes=f's/{name}'.encode(encoding='ascii')) +class ApplicationTicket(Ticket): + """An ApplicationTicket is a ticket that references a field of an application on the server. """ + + def __init__(self, ticket_bytes: bytes): + """Initializes an ApplicationTicket object + + Args: + ticket_bytes (bytes): the raw bytes for the ticket + """ + if not ticket_bytes: + raise DHError('ApplicationTicket: ticket is None') + elif not ticket_bytes.startswith(b'a/'): + raise DHError(f'ApplicationTicket: ticket {ticket_bytes} is not an application ticket') + elif len(ticket_bytes.split(b'/')) != 3: + raise DHError(f'ApplicationTicket: ticket {ticket_bytes} is not in the correct format') + + self.app_id = ticket_bytes.split(b'/')[1].decode(encoding='ascii') + self.field = ticket_bytes.split(b'/')[2].decode(encoding='ascii') + + super().__init__(ticket_bytes) + + @classmethod + def app_ticket(cls, app_id: str, field: str) -> ApplicationTicket: + """Creates an application ticket that references a field of an application. + + Args: + app_id (str): the application id + field (str): the name of the application field + + Returns: + an ApplicationTicket object + """ + return cls(ticket_bytes=f'a/{app_id}/{field}'.encode(encoding='ascii')) + + def _ticket_from_proto(ticket: ticket_pb2.Ticket) -> Ticket: """Creates a Ticket object from a gRPC protobuf ticket object. @@ -133,6 +168,8 @@ def _ticket_from_proto(ticket: ticket_pb2.Ticket) -> Ticket: return ExportTicket(ticket_bytes) elif ticket_bytes.startswith(b's/'): return ScopeTicket(ticket_bytes) + elif ticket_bytes.startswith(b'a/'): + return ApplicationTicket(ticket_bytes) else: raise DHError(f'Unknown ticket type: {ticket_bytes}')