diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index f220d28e5..3da2d3295 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -36,6 +36,7 @@ # interface_name => (module, classname) BACKENDS: Dict[str, Tuple[str, str]] = { + "zlg": ("can.interfaces.zlg", "ZlgBus"), "kvaser": ("can.interfaces.kvaser", "KvaserBus"), "socketcan": ("can.interfaces.socketcan", "SocketcanBus"), "serial": ("can.interfaces.serial.serial_can", "SerialBus"), diff --git a/can/interfaces/zlg/__init__.py b/can/interfaces/zlg/__init__.py new file mode 100644 index 000000000..6d52db7a9 --- /dev/null +++ b/can/interfaces/zlg/__init__.py @@ -0,0 +1,5 @@ +__all__ = [ + "ZlgBus", +] + +from can.interfaces.zlg.zlg import * diff --git a/can/interfaces/zlg/zlg.py b/can/interfaces/zlg/zlg.py new file mode 100644 index 000000000..118a2cb39 --- /dev/null +++ b/can/interfaces/zlg/zlg.py @@ -0,0 +1,325 @@ +import can +from . import zlgcan +import logging +from typing import Optional, Dict, List +from can import BusABC +import threading +from queue import Queue, Empty +from can.exceptions import ( + CanInitializationError, + CanInterfaceNotImplementedError, + CanOperationError, +) + + +class CanIso: + CAN_ISO = "0" + CAN_NO_ISO = "1" + + +class CanMode: + NORMAL = 0 + ONLY_LISTENING = 1 + + +class TransmitType: + NORMAL = 0 # Retry when fail + ONCE = 1 # NO Retry when fail + RECEIVE_OWN_MESSAGES = 2 # Retry when fail + ONCE_RECEIVE_OWN_MESSAGES = 3 # NO Retry when fail + + +MAX_RCV_NUM = 1000 + +log = logging.getLogger("can.zlg") + + +def raise_can_operation_error(ret: int, message): + if ret != zlgcan.ZCAN_STATUS_OK: + raise CanOperationError(message) + + +class ZlgBus(BusABC): + """ + The CAN Bus implemented for the Kvaser interface. + """ + + def __init__( + self, + channel: int, + dev_type: int, + fd: bool = False, + bitrate: int = 500000, + data_bitrate: int = 2000000, + receive_own_messages: bool = False, + filters: Optional[Dict] = None, + retry_when_send_fail: bool = False, + **kwargs, + ): + """ + :param int channel: + The Channel id to create this bus with. + + :param int dev_type: + The dev_type is type of Can device. + :param bool fd: + If CAN-FD frames should be supported. + + :param int bitrate: + Bitrate of channel in bit/s + + :param int data_bitrate: + Which bitrate to use for data phase in CAN FD. + Defaults to arbitration bitrate. + + :param bool driver_mode: + Silent or normal. + + :param bool receive_own_messages: + If messages transmitted should also be received back. + Only works if single_handle is also False. + If you want to receive messages from other applications on the same + computer, set this to True or set single_handle to True. + + :param list filters: + Filters to apply to the bus, it is dict, key include "is_extended/filter_start/filter_end". + + :param bool retry_when_send_fail: + If the packet fails to be sent, resend the packet automatically. + + """ + self.channel = channel + self.receive_own_messages = receive_own_messages + self.retry_when_send_fail = retry_when_send_fail + self.fd = fd + self.queue_recv = Queue() + self.queue_send = Queue() + self.zcanlib = zlgcan.ZCAN() + self.device_handle = self.zcanlib.OpenDevice( + dev_type, 0, 0 + ) # 第二个参数是设备序号,默认0,只用第一个周立功 + if self.device_handle == zlgcan.INVALID_DEVICE_HANDLE: + raise CanInitializationError( + f"fail to get device handle, device type {dev_type}" + ) from None + # set bitrate + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, str(channel) + "/canfd_abit_baud_rate", str(bitrate).encode("utf-8") + ) + raise_can_operation_error( + ret, f"fail to set data_bitrate {data_bitrate} for channel {channel}" + ) + if fd: + # set canfd + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, str(channel) + "/canfd_standard", CanIso.CAN_ISO.encode("utf-8") + ) + raise_can_operation_error(ret, f"fail to set canfd for channel {channel}") + # set data_bitrate for canfd + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, + str(channel) + "/canfd_dbit_baud_rate", + str(data_bitrate).encode("utf-8"), + ) + raise_can_operation_error( + ret, f"fail to set data_bitrate {data_bitrate} for channel {channel}" + ) + # merge can canfd + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, str(channel) + "/set_device_recv_merge", "1".encode("utf-8") + ) # 0-单通道接收,1合并接收 + raise_can_operation_error(ret, f"set_device_recv_merge failed for channel {channel}!") + chn_init_cfg = zlgcan.ZCAN_CHANNEL_INIT_CONFIG() + chn_init_cfg.can_type = zlgcan.ZCAN_TYPE_CANFD if fd else zlgcan.ZCAN_TYPE_CAN + chn_init_cfg.config.canfd.mode = CanMode.NORMAL + # init can channel + self.chn_handle = self.zcanlib.InitCAN(self.device_handle, channel, chn_init_cfg) + if self.chn_handle == 0 or self.chn_handle is None: + raise CanOperationError(f"init CAN-Channel {channel} failed!") + # set filter + if filters: + for filter in filters: + self.__set_filter(filter) + self.__ack_filters() + self._is_filtered = True + else: + self._is_filtered = False + # start can channel + ret = self.zcanlib.StartCAN(self.chn_handle) + raise_can_operation_error(ret, f"start CAN-Channel {channel} failed!") + # thread event + self.event_recv_send_batch_zlg = threading.Event() + # start thread for recv + threading.Thread( + None, target=self.__recv_send_batch_zlg, args=(self.event_recv_send_batch_zlg,) + ).start() + super().__init__( + channel=channel, + **kwargs, + ) + + def __recv_send_batch_zlg(self, event): + while not event.is_set(): + # 发送 + from ctypes import sizeof, memset + + send_size = self.queue_send.qsize() + msgs: List[can.Message] = [] + for _ in range(send_size): + msgs.append(self.queue_send.get()) + len_msgs = len(msgs) + DataObj = (zlgcan.ZCANDataObj * len_msgs)() + memset(DataObj, 0, sizeof(DataObj)) + for i in range(len_msgs): + msg = msgs[i] + data = self.__trans_data2zlg(msg.data, msg.dlc) + DataObj[i].dataType = 1 # can报文 + DataObj[i].chnl = self.channel + DataObj[i].zcanfddata.flag.frameType = 1 if msg.is_fd else 0 # 0-can,1-canfd + DataObj[i].zcanfddata.flag.txDelay = 0 # 不添加延迟 + DataObj[i].zcanfddata.flag.txEchoRequest = 1 # 发送回显请求,0-不回显,1-回显 + if self.retry_when_send_fail: + DataObj[i].zcanfddata.flag.transmitType = TransmitType.NORMAL + else: + DataObj[i].zcanfddata.flag.transmitType = TransmitType.ONCE + DataObj[i].zcanfddata.frame.eff = ( + 1 if msg.is_extended_id else 0 + ) # 0-标准帧,1-扩展帧 + DataObj[i].zcanfddata.frame.rtr = ( + 1 if msg.is_remote_frame else 0 + ) # 0-数据帧,1-远程帧 + DataObj[i].zcanfddata.frame.can_id = msg.arbitration_id + DataObj[i].zcanfddata.frame.len = msg.dlc + if msg.is_fd: + DataObj[i].zcanfddata.frame.brs = ( + 1 if msg.bitrate_switch else 0 + ) # BRS 加速标志位:0不加速,1加速 + for j in range(DataObj[i].zcanfddata.frame.len): + DataObj[i].zcanfddata.frame.data[j] = data[j] + ret = self.zcanlib.TransmitData(self.device_handle, DataObj, len_msgs) + log.debug(f"Tranmit Num: {ret}.") + # 接收 + rcv_num = self.zcanlib.GetReceiveNum(self.chn_handle, zlgcan.ZCAN_TYPE_MERGE) + if rcv_num: + read_cnt = MAX_RCV_NUM if rcv_num >= MAX_RCV_NUM else rcv_num + msgs_zlg, read_cnt = self.zcanlib.ReceiveData(self.device_handle, read_cnt) + for i in range(read_cnt): + msg_zlg = msgs_zlg[i] + if msg_zlg.dataType != 1: # 筛选出can报文 + continue + msg = can.Message( + is_fd=bool(msg_zlg.zcanfddata.flag.frameType), + timestamp=float(msg_zlg.zcanfddata.timestamp) / 1000000, + is_extended_id=bool(msg_zlg.zcanfddata.frame.eff), + arbitration_id=msg_zlg.zcanfddata.frame.can_id, + data=[ + msg_zlg.zcanfddata.frame.data[j] + for j in range(msg_zlg.zcanfddata.frame.len) + ], + dlc=msg_zlg.zcanfddata.frame.len, + channel=self.channel, + is_remote_frame=bool(msg_zlg.zcanfddata.frame.rtr), + is_rx=False if msg_zlg.zcanfddata.flag.txEchoed else True, + ) + self.queue_recv.put(msg) + event.wait(timeout=0.005) + + def add_filters(self, filters: Optional[Dict]): + if filters is None: + return + for filter in filters: + self.__set_filter(filter) + self.__ack_filters() + self._is_filtered = True + + def __set_filter(self, filter: Dict): + # 0 standard/1 extended + is_extended = filter["is_extended"] + filter_start = filter["filter_start"] + filter_end = filter["filter_end"] + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, str(self.channel) + "/filter_mode", str(is_extended).encode("utf-8") + ) # 扩展帧滤波 + raise_can_operation_error(ret, f"Set CH{self.channel} filter_mode failed!") + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, + str(self.channel) + "/filter_start", + hex(filter_start).encode("utf-8"), + ) + raise_can_operation_error(ret, f"Set CH{self.channel} filter_start failed!") + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, str(self.channel) + "/filter_end", hex(filter_end).encode("utf-8") + ) + raise_can_operation_error(ret, f"Set CH{self.channel} filter_end failed!") + + def __ack_filters(self): + ret = self.zcanlib.ZCAN_SetValue( + self.device_handle, str(self.channel) + "/filter_ack", "0".encode("utf-8") + ) + raise_can_operation_error(ret, f"Set CH{self.channel} filter_ack failed!") + + def flush_tx_buffer(self): + raise CanInterfaceNotImplementedError("flush_tx_buffer is not implemented") + + def _recv_internal(self, timeout=None): + try: + msg = self.queue_recv.get(block=True, timeout=timeout) + except Empty: + return None, self._is_filtered + else: + return msg, self._is_filtered + + @staticmethod + def __trans_data2zlg(data, dlc: int): + if isinstance(data, int): + data = data.to_bytes(length=dlc, byteorder="big") + elif isinstance(data, bytearray) or isinstance(data, bytes): + data = data + else: + data = list(data) + return data + + def send(self, msg: can.Message, timeout=None): + self.queue_send.put(msg) + + def shutdown(self): + self.event_recv_send_batch_zlg.set() + # Close CAN + ret = self.zcanlib.ResetCAN(self.chn_handle) + if ret == 1: + log.debug("Close CAN successfully.") + # Close Device + ret = self.zcanlib.CloseDevice(self.device_handle) + if ret == 1: + log.debug("Close Device success! ") + + @staticmethod + def _detect_available_configs(): + zcanlib = zlgcan.ZCAN() + handle = zcanlib.OpenDevice(zlgcan.ZCAN_USBCANFD_MINI, 0, 0) + if handle == zlgcan.INVALID_DEVICE_HANDLE: + return [] + info: str = str(zcanlib.GetDeviceInf(handle)) + zcanlib.CloseDevice(handle) + param: Dict = { + line_.split(":", 1)[0]: line_.split(":", 1)[1] for line_ in info.splitlines() + } + dev_type_name: str = param.get("Hardware Type", None) + if "USBCAN" not in dev_type_name: + return [] + dev_type_var_name: str = f'ZCAN_{dev_type_name.replace("-", "_")}' + dev_type: int = eval(f"zlgcan.{dev_type_var_name}") + chn_num = int(param.get("CAN Number", None)) + fd = "CANFD" in dev_type_name.upper() + serial = param.get("Serial", None) + return [ + dict( + interface="zlg", + dev_type_name=dev_type_name, + dev_type=dev_type, + channel=i, + fd=fd, + serial=serial, + ) + for i in range(chn_num) + ] diff --git a/can/interfaces/zlg/zlgcan.py b/can/interfaces/zlg/zlgcan.py new file mode 100644 index 000000000..7f51937e5 --- /dev/null +++ b/can/interfaces/zlg/zlgcan.py @@ -0,0 +1,511 @@ +# -*- coding:utf-8 -*- +# zlgcan.py +# +# ~~~~~~~~~~~~ +# +# ZLGCAN API +# +# ~~~~~~~~~~~~ +# +# ------------------------------------------------------------------ +# Author : guochuangjian +# Last change: 21.02.2019 +# +# Language: Python 2.7, 3.6 +# ------------------------------------------------------------------ +# +from ctypes import * +import platform +import threading + +import time + +ZCAN_DEVICE_TYPE = c_uint + +INVALID_DEVICE_HANDLE = 0 +INVALID_CHANNEL_HANDLE = 0 + +''' + Device Type +''' +ZCAN_PCI5121 = ZCAN_DEVICE_TYPE(1) +ZCAN_PCI9810 = ZCAN_DEVICE_TYPE(2) +ZCAN_USBCAN1 = ZCAN_DEVICE_TYPE(3) +ZCAN_USBCAN2 = ZCAN_DEVICE_TYPE(4) +ZCAN_PCI9820 = ZCAN_DEVICE_TYPE(5) +ZCAN_CAN232 = ZCAN_DEVICE_TYPE(6) +ZCAN_PCI5110 = ZCAN_DEVICE_TYPE(7) +ZCAN_CANLITE = ZCAN_DEVICE_TYPE(8) +ZCAN_ISA9620 = ZCAN_DEVICE_TYPE(9) +ZCAN_ISA5420 = ZCAN_DEVICE_TYPE(10) +ZCAN_PC104CAN = ZCAN_DEVICE_TYPE(11) +ZCAN_CANETUDP = ZCAN_DEVICE_TYPE(12) +ZCAN_CANETE = ZCAN_DEVICE_TYPE(12) +ZCAN_DNP9810 = ZCAN_DEVICE_TYPE(13) +ZCAN_PCI9840 = ZCAN_DEVICE_TYPE(14) +ZCAN_PC104CAN2 = ZCAN_DEVICE_TYPE(15) +ZCAN_PCI9820I = ZCAN_DEVICE_TYPE(16) +ZCAN_CANETTCP = ZCAN_DEVICE_TYPE(17) +ZCAN_PCIE_9220 = ZCAN_DEVICE_TYPE(18) +ZCAN_PCI5010U = ZCAN_DEVICE_TYPE(19) +ZCAN_USBCAN_E_U = ZCAN_DEVICE_TYPE(20) +ZCAN_USBCAN_2E_U = ZCAN_DEVICE_TYPE(21) +ZCAN_PCI5020U = ZCAN_DEVICE_TYPE(22) +ZCAN_EG20T_CAN = ZCAN_DEVICE_TYPE(23) +ZCAN_PCIE9221 = ZCAN_DEVICE_TYPE(24) +ZCAN_WIFICAN_TCP = ZCAN_DEVICE_TYPE(25) +ZCAN_WIFICAN_UDP = ZCAN_DEVICE_TYPE(26) +ZCAN_PCIe9120 = ZCAN_DEVICE_TYPE(27) +ZCAN_PCIe9110 = ZCAN_DEVICE_TYPE(28) +ZCAN_PCIe9140 = ZCAN_DEVICE_TYPE(29) +ZCAN_USBCAN_4E_U = ZCAN_DEVICE_TYPE(31) +ZCAN_CANDTU_200UR = ZCAN_DEVICE_TYPE(32) +ZCAN_CANDTU_MINI = ZCAN_DEVICE_TYPE(33) +ZCAN_USBCAN_8E_U = ZCAN_DEVICE_TYPE(34) +ZCAN_CANREPLAY = ZCAN_DEVICE_TYPE(35) +ZCAN_CANDTU_NET = ZCAN_DEVICE_TYPE(36) +ZCAN_CANDTU_100UR = ZCAN_DEVICE_TYPE(37) +ZCAN_PCIE_CANFD_100U = ZCAN_DEVICE_TYPE(38) +ZCAN_PCIE_CANFD_200U = ZCAN_DEVICE_TYPE(39) +ZCAN_PCIE_CANFD_400U = ZCAN_DEVICE_TYPE(40) +ZCAN_USBCANFD_200U = ZCAN_DEVICE_TYPE(41) +ZCAN_USBCANFD_100U = ZCAN_DEVICE_TYPE(42) +ZCAN_USBCANFD_MINI = ZCAN_DEVICE_TYPE(43) +ZCAN_CANFDCOM_100IE = ZCAN_DEVICE_TYPE(44) +ZCAN_CANSCOPE = ZCAN_DEVICE_TYPE(45) +ZCAN_CLOUD = ZCAN_DEVICE_TYPE(46) +ZCAN_CANDTU_NET_400 = ZCAN_DEVICE_TYPE(47) +ZCAN_VIRTUAL_DEVICE = ZCAN_DEVICE_TYPE(99) + +''' + Interface return status +''' +ZCAN_STATUS_ERR = 0 +ZCAN_STATUS_OK = 1 +ZCAN_STATUS_ONLINE = 2 +ZCAN_STATUS_OFFLINE = 3 +ZCAN_STATUS_UNSUPPORTED = 4 + +''' + CAN type +''' +ZCAN_TYPE_CAN = c_uint(0) +ZCAN_TYPE_CANFD = c_uint(1) +ZCAN_TYPE_MERGE = c_uint(2) + + +def input_thread(): + input() + + +''' + Device information +''' + + +class ZCAN_DEVICE_INFO(Structure): + _fields_ = [("hw_Version", c_ushort), + ("fw_Version", c_ushort), + ("dr_Version", c_ushort), + ("in_Version", c_ushort), + ("irq_Num", c_ushort), + ("can_Num", c_ubyte), + ("str_Serial_Num", c_ubyte * 20), + ("str_hw_Type", c_ubyte * 40), + ("reserved", c_ushort * 4)] + + def __str__(self): + return "Hardware Version:%s\nFirmware Version:%s\nDriver Interface:%s\nInterface Interface:%s\nInterrupt Number:%d\nCAN Number:%d\nSerial:%s\nHardware Type:%s\n" % ( \ + self.hw_version, self.fw_version, self.dr_version, self.in_version, self.irq_num, self.can_num, self.serial, self.hw_type) + + def _version(self, version): + return ("V%02x.%02x" if version // 0xFF >= 9 else "V%d.%02x") % (version // 0xFF, version & 0xFF) + + @property + def hw_version(self): + return self._version(self.hw_Version) + + @property + def fw_version(self): + return self._version(self.fw_Version) + + @property + def dr_version(self): + return self._version(self.dr_Version) + + @property + def in_version(self): + return self._version(self.in_Version) + + @property + def irq_num(self): + return self.irq_Num + + @property + def can_num(self): + return self.can_Num + + @property + def serial(self): + serial = '' + for c in self.str_Serial_Num: + if c > 0: + serial += chr(c) + else: + break + return serial + + @property + def hw_type(self): + hw_type = '' + for c in self.str_hw_Type: + if c > 0: + hw_type += chr(c) + else: + break + return hw_type + + +class _ZCAN_CHANNEL_CAN_INIT_CONFIG(Structure): + _fields_ = [("acc_code", c_uint), + ("acc_mask", c_uint), + ("reserved", c_uint), + ("filter", c_ubyte), + ("timing0", c_ubyte), + ("timing1", c_ubyte), + ("mode", c_ubyte)] + + +class _ZCAN_CHANNEL_CANFD_INIT_CONFIG(Structure): + _fields_ = [("acc_code", c_uint), + ("acc_mask", c_uint), + ("abit_timing", c_uint), + ("dbit_timing", c_uint), + ("brp", c_uint), + ("filter", c_ubyte), + ("mode", c_ubyte), + ("pad", c_ushort), + ("reserved", c_uint)] + + +class _ZCAN_CHANNEL_INIT_CONFIG(Union): + _fields_ = [("can", _ZCAN_CHANNEL_CAN_INIT_CONFIG), ("canfd", _ZCAN_CHANNEL_CANFD_INIT_CONFIG)] + + +class ZCAN_CHANNEL_INIT_CONFIG(Structure): + _fields_ = [("can_type", c_uint), + ("config", _ZCAN_CHANNEL_INIT_CONFIG)] + + +class ZCAN_CHANNEL_ERR_INFO(Structure): + _fields_ = [("error_code", c_uint), + ("passive_ErrData", c_ubyte * 3), + ("arLost_ErrData", c_ubyte)] + + +class ZCAN_CHANNEL_STATUS(Structure): + _fields_ = [("errInterrupt", c_ubyte), + ("regMode", c_ubyte), + ("regStatus", c_ubyte), + ("regALCapture", c_ubyte), + ("regECCapture", c_ubyte), + ("regEWLimit", c_ubyte), + ("regRECounter", c_ubyte), + ("regTECounter", c_ubyte), + ("Reserved", c_ubyte)] + + +class ZCAN_CAN_FRAME(Structure): + _fields_ = [("can_id", c_uint, 29), + ("err", c_uint, 1), + ("rtr", c_uint, 1), + ("eff", c_uint, 1), + ("can_dlc", c_ubyte), + ("__pad", c_ubyte), + ("__res0", c_ubyte), + ("__res1", c_ubyte), + ("data", c_ubyte * 8)] + + +class ZCAN_CANFD_FRAME(Structure): + _fields_ = [("can_id", c_uint, 29), + ("err", c_uint, 1), + ("rtr", c_uint, 1), + ("eff", c_uint, 1), + ("len", c_ubyte), + ("brs", c_ubyte, 1), + ("esi", c_ubyte, 1), + ("__res", c_ubyte, 6), + ("__res0", c_ubyte), + ("__res1", c_ubyte), + ("data", c_ubyte * 64)] + + +class ZCANdataFlag(Structure): + _pack_ = 1 + _fields_ = [("frameType", c_uint, 2), + ("txDelay", c_uint, 2), + ("transmitType", c_uint, 4), + ("txEchoRequest", c_uint, 1), + ("txEchoed", c_uint, 1), + ("reserved", c_uint, 22), + ] + + +class ZCANFDData(Structure): ##表示 CAN/CANFD 帧结构,目前仅作为 ZCANDataObj 结构的成员使用 + _pack_ = 1 + _fields_ = [("timestamp", c_uint64), + ("flag", ZCANdataFlag), + ("extraData", c_ubyte * 4), + ("frame", ZCAN_CANFD_FRAME), ] + + +class ZCANDataObj(Structure): + _pack_ = 1 + _fields_ = [("dataType", c_ubyte), + ("chnl", c_ubyte), + ("flag", c_ushort), + ("extraData", c_ubyte * 4), + ("zcanfddata", ZCANFDData), ##88个字节 + ("reserved", c_ubyte * 4), + ] + + +class ZCAN_Transmit_Data(Structure): + _fields_ = [("frame", ZCAN_CAN_FRAME), ("transmit_type", c_uint)] + + +class ZCAN_Receive_Data(Structure): + _fields_ = [("frame", ZCAN_CAN_FRAME), ("timestamp", c_ulonglong)] + + +class ZCAN_TransmitFD_Data(Structure): + _fields_ = [("frame", ZCAN_CANFD_FRAME), ("transmit_type", c_uint)] + + +class ZCAN_ReceiveFD_Data(Structure): + _fields_ = [("frame", ZCAN_CANFD_FRAME), ("timestamp", c_ulonglong)] + + +class ZCAN_AUTO_TRANSMIT_OBJ(Structure): + _fields_ = [("enable", c_ushort), + ("index", c_ushort), + ("interval", c_uint), + ("obj", ZCAN_Transmit_Data)] + + +class ZCANFD_AUTO_TRANSMIT_OBJ(Structure): + _fields_ = [("enable", c_ushort), + ("index", c_ushort), + ("interval", c_uint), + ("obj", ZCAN_TransmitFD_Data)] + + +class ZCANFD_AUTO_TRANSMIT_OBJ_PARAM(Structure): # auto_send delay + _fields_ = [("indix", c_ushort), + ("type", c_ushort), + ("value", c_uint)] + + +class IProperty(Structure): + _fields_ = [("SetValue", c_void_p), + ("GetValue", c_void_p), + ("GetPropertys", c_void_p)] + + +class ZCAN(object): + def __init__(self): + if platform.system() == "Windows": + self.__dll = windll.LoadLibrary("./zlgcan.dll")) + else: + print("No support now!") + if self.__dll == None: + print("DLL couldn't be loaded!") + + def OpenDevice(self, device_type, device_index, reserved): + try: + return self.__dll.ZCAN_OpenDevice(device_type, device_index, reserved) + except: + print("Exception on OpenDevice!") + raise + + def CloseDevice(self, device_handle): + try: + return self.__dll.ZCAN_CloseDevice(device_handle) + except: + print("Exception on CloseDevice!") + raise + + def GetDeviceInf(self, device_handle): + try: + info = ZCAN_DEVICE_INFO() + ret = self.__dll.ZCAN_GetDeviceInf(device_handle, byref(info)) + return info if ret == ZCAN_STATUS_OK else None + except: + print("Exception on ZCAN_GetDeviceInf") + raise + + def DeviceOnLine(self, device_handle): + try: + return self.__dll.ZCAN_IsDeviceOnLine(device_handle) + except: + print("Exception on ZCAN_ZCAN_IsDeviceOnLine!") + raise + + def InitCAN(self, device_handle, can_index, init_config): + try: + return self.__dll.ZCAN_InitCAN(device_handle, can_index, byref(init_config)) + except: + print("Exception on ZCAN_InitCAN!") + raise + + def StartCAN(self, chn_handle): + try: + return self.__dll.ZCAN_StartCAN(chn_handle) + except: + print("Exception on ZCAN_StartCAN!") + raise + + def ResetCAN(self, chn_handle): + try: + return self.__dll.ZCAN_ResetCAN(chn_handle) + except: + print("Exception on ZCAN_ResetCAN!") + raise + + def ClearBuffer(self, chn_handle): + try: + return self.__dll.ZCAN_ClearBuffer(chn_handle) + except: + print("Exception on ZCAN_ClearBuffer!") + raise + + def ReadChannelErrInfo(self, chn_handle): + try: + ErrInfo = ZCAN_CHANNEL_ERR_INFO() + ret = self.__dll.ZCAN_ReadChannelErrInfo(chn_handle, byref(ErrInfo)) + return ErrInfo if ret == ZCAN_STATUS_OK else None + except: + print("Exception on ZCAN_ReadChannelErrInfo!") + raise + + def ReadChannelStatus(self, chn_handle): + try: + status = ZCAN_CHANNEL_STATUS() + ret = self.__dll.ZCAN_ReadChannelStatus(chn_handle, byref(status)) + return status if ret == ZCAN_STATUS_OK else None + except: + print("Exception on ZCAN_ReadChannelStatus!") + raise + + def GetReceiveNum(self, chn_handle, can_type=ZCAN_TYPE_CAN): + try: + return self.__dll.ZCAN_GetReceiveNum(chn_handle, can_type) + except: + print("Exception on ZCAN_GetReceiveNum!") + raise + + def Transmit(self, chn_handle, std_msg, len): + try: + return self.__dll.ZCAN_Transmit(chn_handle, byref(std_msg), len) + except: + print("Exception on ZCAN_Transmit!") + raise + + def Receive(self, chn_handle, rcv_num, wait_time=c_int(-1)): + try: + rcv_can_msgs = (ZCAN_Receive_Data * rcv_num)() + ret = self.__dll.ZCAN_Receive(chn_handle, byref(rcv_can_msgs), rcv_num, wait_time) + return rcv_can_msgs, ret + except: + print("Exception on ZCAN_Receive!") + raise + + def TransmitFD(self, chn_handle, fd_msg, len): + try: + return self.__dll.ZCAN_TransmitFD(chn_handle, byref(fd_msg), len) + except: + print("Exception on ZCAN_TransmitFD!") + raise + + def TransmitData(self, device_handle, msg, len): + try: + return self.__dll.ZCAN_TransmitData(device_handle, byref(msg), len) + except: + print("Exception on ZCAN_TransmitData!") + raise + + def ReceiveFD(self, chn_handle, rcv_num, wait_time=c_int(-1)): + try: + rcv_canfd_msgs = (ZCAN_ReceiveFD_Data * rcv_num)() + ret = self.__dll.ZCAN_ReceiveFD(chn_handle, byref(rcv_canfd_msgs), rcv_num, wait_time) + return rcv_canfd_msgs, ret + except: + print("Exception on ZCAN_ReceiveFD!") + raise + + def ReceiveData(self, device_handle, rcv_num, wait_time=c_int(-1)): + try: + rcv_can_data_msgs = (ZCANDataObj * rcv_num)() + ret = self.__dll.ZCAN_ReceiveData(device_handle, byref(rcv_can_data_msgs), rcv_num, wait_time) + return rcv_can_data_msgs, ret + except: + print("Exception on ZCAN_ReceiveData!") + raise + + def GetIProperty(self, device_handle): + try: + self.__dll.GetIProperty.restype = POINTER(IProperty) + return self.__dll.GetIProperty(device_handle) + except: + print("Exception on ZCAN_GetIProperty!") + raise + + def SetValue(self, iproperty, path, value): + try: + func = CFUNCTYPE(c_uint, c_char_p, c_char_p)(iproperty.contents.SetValue) + return func(c_char_p(path.encode("utf-8")), c_char_p(value.encode("utf-8"))) + except: + print("Exception on IProperty SetValue") + raise + + def SetValue1(self, iproperty, path, value): ############################# + try: + func = CFUNCTYPE(c_uint, c_char_p, c_char_p)(iproperty.contents.SetValue) + return func(c_char_p(path.encode("utf-8")), c_void_p(value)) + except: + print("Exception on IProperty SetValue") + raise + + def GetValue(self, iproperty, path): + try: + func = CFUNCTYPE(c_char_p, c_char_p)(iproperty.contents.GetValue) + return func(c_char_p(path.encode("utf-8"))) + except: + print("Exception on IProperty GetValue") + raise + + def ReleaseIProperty(self, iproperty): + try: + return self.__dll.ReleaseIProperty(iproperty) + except: + print("Exception on ZCAN_ReleaseIProperty!") + raise + + def ZCAN_SetValue(self, device_handle, path, value): + try: + self.__dll.ZCAN_SetValue.argtypes = [c_void_p, c_char_p, c_void_p] + return self.__dll.ZCAN_SetValue(device_handle, path.encode("utf-8"), value) + except: + print("Exception on ZCAN_SetValue") + raise + + def ZCAN_GetValue(self, device_handle, path): + try: + self.__dll.ZCAN_GetValue.argtypes = [c_void_p, c_char_p] + self.__dll.ZCAN_GetValue.restype = c_void_p + return self.__dll.ZCAN_GetValue(device_handle, path.encode("utf-8")) + except: + print("Exception on ZCAN_GetValue") + raise +