Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Gpib events #175

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
171 changes: 164 additions & 7 deletions pyvisa-py/gpib.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from pyvisa import constants, logger, attributes

from .sessions import Session, UnknownAttribute
from .sessions import Session, UnknownAttribute, EventData

try:
import gpib
Expand All @@ -25,6 +25,7 @@
def _patch_Gpib():
if not hasattr(Gpib, "close"):
_old_del = Gpib.__del__

def _inner(self):
_old_del(self)
self._own = False
Expand Down Expand Up @@ -57,6 +58,8 @@ def _find_listeners():
# TODO: Check board indices other than 0.
BOARD = 0
# TODO: Check secondary addresses.


@Session.register(constants.InterfaceType.gpib, 'INSTR')
class GPIBSession(Session):
"""A GPIB Session that uses linux-gpib to do the low level communication.
Expand All @@ -82,10 +85,20 @@ def after_parsing(self):
timeout = 13
send_eoi = 1
eos_mode = 0
self.interface = Gpib(name=minor, pad=pad, sad=sad, timeout=timeout, send_eoi=send_eoi, eos_mode=eos_mode)
self.controller = Gpib(name=minor) # this is the bus controller device
self.interface = Gpib(name=minor, pad=pad, sad=sad,
timeout=timeout, send_eoi=send_eoi, eos_mode=eos_mode)
self.controller = Gpib(name=minor) # this is the bus controller device
# force timeout setting to interface
self.set_attribute(constants.VI_ATTR_TMO_VALUE, attributes.AttributesByID[constants.VI_ATTR_TMO_VALUE].default)
self.set_attribute(constants.VI_ATTR_TMO_VALUE,
attributes.AttributesByID[constants.VI_ATTR_TMO_VALUE].default)

# prepare set of allowed events
self.valid_event_types = {constants.VI_EVENT_IO_COMPLETION,
constants.VI_EVENT_SERVICE_REQ}

self.enabled_queue_events = set()

self.event_queue = []

def _get_timeout(self, attribute):
if self.interface:
Expand Down Expand Up @@ -150,9 +163,9 @@ def read(self, count):
"""

# 0x2000 = 8192 = END
checker = lambda current: self.interface.ibsta() & 8192
def checker(current): return self.interface.ibsta() & 8192

reader = lambda: self.interface.read(count)
def reader(): return self.interface.read(count)

return self._read(reader, count, checker, False, None, False, gpib.GpibError)

Expand All @@ -171,7 +184,7 @@ def write(self, data):

try:
self.interface.write(data)
count = self.interface.ibcnt() # number of bytes transmitted
count = self.interface.ibcnt() # number of bytes transmitted

return count, StatusCode.success

Expand Down Expand Up @@ -378,3 +391,147 @@ def read_stb(self):
return self.interface.serial_poll(), StatusCode.success
except gpib.GpibError:
return 0, StatusCode.error_system_error

def disable_event(self, event_type, mechanism):
"""Disables notification of the specified event type(s) via the specified mechanism(s).

Corresponds to viDisableEvent function of the VISA library.

:param event_type: Logical event identifier.
:param mechanism: Specifies event handling mechanisms to be disabled.
(Constants.VI_QUEUE, .VI_HNDLR, .VI_SUSPEND_HNDLR, .VI_ALL_MECH)
:return: return value of the library call.
:rtype: :class:`pyvisa.constants.StatusCode`
"""

if event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if mechanism in (constants.VI_QUEUE, constants.VI_ALL_MECH):
if event_type not in self.enabled_queue_events:
return StatusCode.success_event_already_disabled

self.enabled_queue_events.remove(event_type)
return StatusCode.success

return StatusCode.error_invalid_mechanism

def discard_events(self, event_type, mechanism):
"""Discards event occurrences for specified event types and mechanisms in a session.

Corresponds to viDiscardEvents function of the VISA library.

:param event_type: Logical event identifier.
:param mechanism: Specifies event handling mechanisms to be discarded.
(Constants.VI_QUEUE, .VI_SUSPEND_HNDLR, .VI_ALL_MECH)
:return: return value of the library call.
:rtype: :class:`pyvisa.constants.StatusCode`
"""
if event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if mechanism in (constants.VI_QUEUE, constants.VI_ALL_MECH):
self.event_queue = [(t, a) for t, a in self.event_queue if not (
event_type == constants.VI_ALL_ENABLED_EVENTS or t == event_type)]
return StatusCode.success

return StatusCode.error_invalid_mechanism

def enable_event(self, event_type, mechanism, context=None):
"""Enable event occurrences for specified event types and mechanisms in a session.

Corresponds to viEnableEvent function of the VISA library.

:param event_type: Logical event identifier.
:param mechanism: Specifies event handling mechanisms to be enabled.
(Constants.VI_QUEUE, .VI_HNDLR, .VI_SUSPEND_HNDLR)
:param context:
:return: return value of the library call.
:rtype: :class:`pyvisa.constants.StatusCode`
"""

if event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if mechanism in (constants.VI_QUEUE, constants.VI_ALL_MECH):
# enable GPIB autopoll
try:
self.controller.config(7, 1)
except gpib.GpibError:
return StatusCode.error_invalid_setup

if event_type in self.enabled_queue_events:
return StatusCode.success_event_already_enabled
else:
self.enabled_queue_events.add(event_type)
return StatusCode.success

# mechanisms which are not implemented: constants.VI_SUSPEND_HNDLR, constants.VI_ALL_MECH
return StatusCode.error_invalid_mechanism

def wait_on_event(self, in_event_type, timeout):
"""Waits for an occurrence of the specified event for a given session.

Corresponds to viWaitOnEvent function of the VISA library.

:param in_event_type: Logical identifier of the event(s) to wait for.
:param timeout: Absolute time period in time units that the resource shall wait for a specified event to
occur before returning the time elapsed error. The time unit is in milliseconds.
:return: - Logical identifier of the event actually received
- An object specifying the unique occurrence of an event
- return value of the library call.
:rtype: - eventtype
- EventData
- :class:`pyvisa.constants.StatusCode`
"""

if in_event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if in_event_type not in self.enabled_queue_events:
return StatusCode.error_not_enabled

# if the event queue is empty, wait for more events
if not self.event_queue:
old_timeout = self.timeout
self.timeout = timeout

event_mask = 0

if in_event_type in (constants.VI_EVENT_IO_COMPLETION, constants.VI_ALL_ENABLED_EVENTS):
event_mask |= 0x100 # CMPL

if in_event_type in (constants.VI_EVENT_SERVICE_REQ, constants.VI_ALL_ENABLED_EVENTS):
event_mask |= gpib.RQS

if timeout != 0:
event_mask |= gpib.TIMO

self.interface.wait(event_mask)
sta = self.interface.ibsta()

self.timeout = old_timeout

if 0x100 & event_mask & sta:
# TODO: implement all event attributes
# VI_ATTR_EVENT_TYPE: VI_EVENT_IO_COMPLETION,
# VI_ATTR_STATUS: return code of the asynchronous IO operation that has completed,
# VI_ATTR_JOB_ID: job ID of the asynchronous operation that has completed,
# VI_ATTR_BUFFER: the address of the buffer that was used in the asynchronous operation,
# VI_ATTR_RET_COUNT/VI_ATTR_RET_COUNT_32/VI_ATTR_RET_COUNT_64: number of elements that were asynchronously transferred,
# VI_ATTR_OPER_NAME: name of the operation generating the event
self.event_queue.append(
(constants.VI_EVENT_IO_COMPLETION,
EventData({constants.VI_ATTR_EVENT_TYPE: constants.VI_EVENT_IO_COMPLETION})))

if gpib.RQS & event_mask & sta:
self.event_queue.append(
(constants.VI_EVENT_SERVICE_REQ,
EventData({constants.VI_ATTR_EVENT_TYPE: constants.VI_EVENT_SERVICE_REQ})))

try:
out_event_type, event_data = self.event_queue.pop()
return out_event_type, event_data, StatusCode.success
except IndexError:
return in_event_type, None, StatusCode.error_timeout

Loading