-
Notifications
You must be signed in to change notification settings - Fork 3
/
main_threading.py
135 lines (114 loc) · 4.21 KB
/
main_threading.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
import odrive
from odrive.enums import *
from fibre.protocol import ChannelBrokenException
from usb.core import USBError
from UDPComms import Subscriber, Publisher, timeout
import time
import os
import threading
import time
if os.geteuid() != 0:
exit("You need to have root privileges to run this script.\nPlease try again, this time using 'sudo'. Exiting.")
cmd = Subscriber(8830, timeout = 0.3)
telemetry = Publisher(8810)
# motor 0 is right
# motor 1 is left
odrives = [ ['middle' , "207B37883548", [1, 1]],
['front', "207D37A33548", [-1, 1]],
['back' , "207B37813548", [-1, 1]]]
def clear_errors(odv):
if odv.axis0.error:
print("axis 0", odv.axis0.error)
odv.axis0.error = 0
if odv.axis1.error:
print("axis 1", odv.axis1.error)
odv.axis1.error = 0
if odv.axis0.motor.error:
print("motor 0", odv.axis0.motor.error)
odv.axis0.motor.error = 0
if odv.axis1.motor.error:
print("motor 1", odv.axis1.motor.error)
odv.axis1.motor.error = 0
if odv.axis0.encoder.error:
print("encoder 0", odv.axis0.encoder.error)
odv.axis0.encoder.error = 0
if odv.axis1.encoder.error:
print("encoder 1", odv.axis1.encoder.error)
odv.axis1.encoder.error = 0
def send_state(odv, state):
try:
odv.axis0.requested_state = state
except:
pass
try:
odv.axis1.requested_state = state
except:
pass
def get_data(odv):
return [odv.vbus_voltage,
odv.axis0.motor.current_control.Iq_measured,
odv.axis1.motor.current_control.Iq_measured]
def atomic_print(s):
print(str(s)+'\n',end='')
def run_odrive(name, serial_number, d):
atomic_print("looking for "+name+" odrive")
odv = odrive.find_any(serial_number=serial_number)
atomic_print("found " +name+ " odrive")
send_state(odv, AXIS_STATE_IDLE)
try:
while True:
# read from USB Block
lostConnection = True
try:
UDPLock.acquire()
msg = cmd.get()
atomic_print(msg)
lostConnection = False
except timeout:
lostConnection = True
msg = {'t':0, 'f':0}
finally:
UDPLock.release()
# Write to Odrives block
try:
clear_errors(odv)
if lostConnection:
# if msg['f'] == 0 and msg['t'] == 0:
atomic_print("Timeout sending safe")
send_state(odv, AXIS_STATE_IDLE)
else:
send_state(odv, AXIS_STATE_CLOSED_LOOP_CONTROL)
# axis 0 (right) is always same sign
# axis 1 (left) is always opposite sign
odv.axis0.controller.vel_setpoint = d[0]*( \
msg['f'] + msg['t'])
odv.axis1.controller.vel_setpoint = d[1]*( \
msg['f'] - msg['t'])
odv.axis0.watchdog_feed()
odv.axis1.watchdog_feed()
tele[name] = get_data(odv)
except (USBError, ChannelBrokenException) as e:
atomic_print("Lost contact with "+name+" odrive!")
odv = odrive.find_any(serial_number=serial_number)
atomic_print("refound " + name + " odrive")
finally:
atomic_print("Exiting and Sending safe command")
send_state(odv, AXIS_STATE_IDLE)
odv.axis0.controller.vel_setpoint = 0
odv.axis1.controller.vel_setpoint = 0
if __name__ == "__main__":
UDPLock = threading.Lock()
threads = []
tele = {}
for odv in odrives:
atomic_print("starting "+str(odv))
thread = threading.Thread(target=run_odrive, args=odv, daemon=True)
thread.start()
threads.append(thread)
# if any thread shuts down (which it shouldn't) we exit the program
# which exits all other threads to
while all(t.is_alive() for t in threads):
time.sleep(1)
atomic_print(str(list( o[0] + str(t.is_alive()) for t,o in zip(threads,odrives))))
telemetry.send(tele)