Skip to content

Commit

Permalink
Merge pull request #326 from eosti/main
Browse files Browse the repository at this point in the history
Add TCPIP instrument discovery
  • Loading branch information
MatthieuDartiailh committed Sep 15, 2022
2 parents bd68b55 + 5a72ad2 commit 76bda99
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
PyVISA-py Changelog
===================

0.5.4 (Unreleased)
------------------
- Implement list_resources for TCPIP instruments

0.5.3 (12-05-2022)
------------------
- fix tcp/ip connections dropping from inside Docker containers after 5 minute idling #285
Expand Down
4 changes: 3 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ Requirements
- Python (tested with 3.6+)
- PyVISA 1.11+

Optionally
Optionally:

- PySerial (to interface with Serial instruments)
- PyUSB (to interface with USB instruments)
- linux-gpib (to interface with gpib instruments, only on linux)
- gpib-ctypes (to interface with GPIB instruments on Windows and Linux)
- psutil (to discover TCPIP devices across multiple interfaces)


Installation
Expand Down
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
# All configuration values have a default; values that are commented out
# serve to show the default.

import datetime
import os
import sys
import datetime

if sys.version_info >= (3, 8):
from importlib.metadata import version as get_version
Expand Down
6 changes: 5 additions & 1 deletion docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ Pyvisa-py relies on :py:mod:`socket` module in the Python Standard Library to
interact with the instrument which you do not need to install any extra library
to access those resources.

To discover devices on all interfaces, please install `psutil`_. Otherwise,
discovery will only occur on the default network interface.


Serial resources: ASRL INSTR
----------------------------
Expand Down Expand Up @@ -97,4 +100,5 @@ form GitHub_::
.. _`LibreVISA`: http://www.librevisa.org/
.. _`issue tracker`: https://github.com/pyvisa/pyvisa-py/issues
.. _`linux-gpib`: http://linux-gpib.sourceforge.net/
.. _`gpib-ctypes`: https://pypi.org/project/gpib-ctypes/
.. _`gpib-ctypes`: https://pypi.org/project/gpib-ctypes/
.. _`psutil`: https://pypi.org/project/psutil/
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ dynamic=["version"]
gpib-ctypes = ["gpib-ctypes>=0.3.0"]
serial = ["pyserial>=3.0"]
usb = ["pyusb"]
psutil = ["psutil"]


[project.urls]
homepage = "https://github.com/pyvisa/pyvisa-py"
Expand Down
58 changes: 58 additions & 0 deletions pyvisa_py/protocols/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,52 @@ def dummy():
self.reply_handler(reply, fromaddr)
return replies

def send_call(self, proc, args, pack_func):
if pack_func is None and args is not None:
raise TypeError("non-null args with null pack_func")
self.start_call(proc)
if pack_func:
pack_func(args)
call = self.packer.get_buf()
try:
_sendto(self.sock, call, (self.host, self.port))
except OSError as exc:
raise RPCError("unable to send broadcast") from exc

def recv_call(self, unpack_func):
BUFSIZE = 8192 # Max UDP buffer size (for reply)
replies = []
if unpack_func is None:

def dummy():
pass

unpack_func = dummy
while 1:
r, w, x = [self.sock], [], []
if select:
if self.timeout is None:
r, w, x = select.select(r, w, x)
else:
r, w, x = select.select(r, w, x, self.timeout)
if self.sock not in r:
break
try:
reply, fromaddr = self.sock.recvfrom(BUFSIZE)
except OSError as exc:
raise RPCError("unable to recieve broadcast") from exc
u = self.unpacker
u.reset(reply)
xid, verf = u.unpack_replyheader()
if xid != self.lastxid:
continue
reply = unpack_func()
self.unpacker.done()
replies.append((reply, fromaddr))
if self.reply_handler:
self.reply_handler(reply, fromaddr)
return replies


# Port mapper interface

Expand Down Expand Up @@ -729,6 +775,18 @@ def get_port(self, mapping):
self.unpacker.unpack_uint,
)

def send_port(self, mapping):
return self.send_call(
PortMapperVersion.get_port,
mapping,
self.packer.pack_mapping,
)

def recv_port(self, mapping):
return self.recv_call(
self.unpacker.unpack_uint,
)

def dump(self):
return self.make_call(
PortMapperVersion.dump, None, None, self.unpacker.unpack_pmaplist
Expand Down
54 changes: 52 additions & 2 deletions pyvisa_py/tcpip.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@
:license: MIT, see LICENSE for more details.
"""
import ipaddress
import random
import select
import socket
import time
from typing import Any, List, Optional, Tuple

# Let psutil be optional dependency
try:
import psutil # type: ignore
except ImportError:
psutil = None

from pyvisa import attributes, constants, errors, rname
from pyvisa.constants import ResourceAttribute, StatusCode

Expand Down Expand Up @@ -64,8 +71,51 @@ class TCPIPInstrSession(Session):

@staticmethod
def list_resources() -> List[str]:
# TODO: is there a way to get this?
return []
broadcast_addr = []
if psutil is not None:
# Get broadcast address for each interface
for interface, snics in psutil.net_if_addrs().items():
for snic in snics:
if snic.family is socket.AF_INET:
addr = snic.address
mask = snic.netmask
network = ipaddress.IPv4Network(addr + "/" + mask, strict=False)
broadcast_addr.append(str(network.broadcast_address))
else:
# If psutil unavailable fallback to default interface
broadcast_addr.append("255.255.255.255")

pmap_list = [rpc.BroadcastUDPPortMapperClient(ip) for ip in broadcast_addr]
for pmap in list(pmap_list):
pmap.set_timeout(0)
try:
pmap.send_port(
(vxi11.DEVICE_CORE_PROG, vxi11.DEVICE_CORE_VERS, rpc.IPPROTO_TCP, 0)
)
except rpc.RPCError:
pmap_list.remove(pmap)

# Timeout for responses
time.sleep(1)

all_res = []
for pmap in pmap_list:
try:
resp = pmap.recv_port(
(vxi11.DEVICE_CORE_PROG, vxi11.DEVICE_CORE_VERS, rpc.IPPROTO_TCP, 0)
)
except rpc.RPCError:
pass
else:
res = [r[1][0] for r in resp if r[0] > 0]
res = sorted(
res, key=lambda ip: tuple(int(part) for part in ip.split("."))
)
# TODO: Detect GPIB over TCPIP
res = ["TCPIP::{}::INSTR".format(host) for host in res]
all_res.extend(res)

return all_res

def after_parsing(self) -> None:
# TODO: board_number not handled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
"""
import pytest
from pyvisa.testsuite.keysight_assisted_tests import copy_func, require_virtual_instr
from pyvisa.rname import ResourceName
from pyvisa.testsuite.keysight_assisted_tests import (
RESOURCE_ADDRESSES,
copy_func,
require_virtual_instr,
)
from pyvisa.testsuite.keysight_assisted_tests.test_resource_manager import (
TestResourceManager as BaseTestResourceManager,
TestResourceParsing as BaseTestResourceParsing,
Expand All @@ -14,9 +19,15 @@
class TestPyResourceManager(BaseTestResourceManager):
""" """

test_list_resource = pytest.mark.xfail(
copy_func(BaseTestResourceManager.test_list_resource)
)
def test_list_resource(self):
"""Test listing the available resources.
The bot supports only TCPIP and of those resources we expect to be able
to list only INSTR resources not SOCKET.
"""
# Default settings
resources = self.rm.list_resources()
for v in (v for v in RESOURCE_ADDRESSES.values() if v.endswith("INSTR")):
assert str(ResourceName.from_string(v)) in resources

test_last_status = pytest.mark.xfail(
copy_func(BaseTestResourceManager.test_last_status)
Expand Down

0 comments on commit 76bda99

Please sign in to comment.