-
Notifications
You must be signed in to change notification settings - Fork 2
/
__init__.py
78 lines (62 loc) · 2.89 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#/usr/bin/env python
import socket
import logging
import netifaces
import pndcp
import pniodce
import threading
import struct
import uuid
logging.basicConfig(level=logging.INFO)
IFACE_UUID_PNIO_DEV = uuid.UUID('{dea00001-6c97-11d1-8271-00a02442df7d}')
IFACE_UUID_PNIO_CTRL = uuid.UUID('{dea00002-6c97-11d1-8271-00a02442df7d}')
OBJECT_UUID_PYTHON_IO = uuid.UUID('{dea00000-6c97-11d1-8271-00640001ffff}')
class Profinet():
def __init__(self, interface):
# Get information about the given interface
self._address = netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['addr']
self._netmask = netifaces.ifaddresses(interface)[netifaces.AF_INET][0]['netmask']
if len(netifaces.gateways()['default']) > 0 and netifaces.gateways()['default'][netifaces.AF_INET]:
self._gateway = netifaces.gateways()['default'][netifaces.AF_INET][0]
else:
self._gateway = self._address
self._mac = netifaces.ifaddresses(interface)[netifaces.AF_LINK][0]['addr']
# Listen for ProfiNET packets
logging.info('Listening for PROFINET packets on ' + interface)
self._pn_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x8892))
self._pn_socket.bind((interface, 0x8892))
# Create a DCP instance
self._dcp = pndcp.PnDcp(self._address, self._netmask, self._gateway)
self._pniodce = pniodce.PnIoDce(self._address)
def start(self):
threading.Thread(target = self._listen_pn).start()
threading.Thread(target = self._pniodce.listen).start()
def _listen_pn(self):
while True:
(packet, address) = self._pn_socket.recvfrom(4096)
logging.debug("incoming PN packet: " + " ".join(hex(c) for c in packet))
response = 0
s = struct.Struct('>6s6sHH')
(dst, src, type, frameId) = s.unpack(packet[0:16])
if frameId >= 0xfefc and frameId <= 0xfeff:
logging.debug('Dispatching packet to DCP')
response = self._dcp.process_packet(frameId, packet[16:])
elif frameId >= 0x8000 and frameId <= 0xBBFF:
loggin.debug('Dispatching packet to PNIO')
response = 0
else:
logging.info('Received unknown FrameID: ' + hex(frameId))
response = 0
if response:
s = struct.Struct('>6s6sHHH')
header = (
src, bytes([int(byte, 16) for byte in self._mac.split(':')]), # dst and src mac
0x8100, 0x0000, # 802.1q tag, prio 0
0x8892 # Type: PROFINET
)
response = s.pack(*header) + response
logging.debug("outgoing packet: " + " ".join(hex(c) for c in response))
self._pn_socket.send(response)
if __name__ == "__main__":
server = Profinet("vboxnet0")
server.start()