forked from mattdy/flightpi
-
Notifications
You must be signed in to change notification settings - Fork 1
/
ArduinoThread.py
78 lines (59 loc) · 2.28 KB
/
ArduinoThread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""
ArduinoThread
Accepts details of a flight to display, and outputs them to an Arduino for LED display
Matt Dyson
07/06/18
Part of FlightPi - http://github.com/mattdy/flightpi
"""
import threading
import logging
import time
import serial
from FlightColours import FlightColours
# Module-level logger; ArduinoThread.run() uses it to report start-up and shutdown.
log = logging.getLogger('root')
class ArduinoThread(threading.Thread):
    """Thread that owns the serial link to the Arduino driving the LED display.

    ``run()`` opens the serial port at ``address`` and keeps it open until
    ``stop()`` is called; ``processFlight()`` writes newline-terminated
    commands to that port.

    Commands written by this class:
        ``C``            clear the display
        ``D<track>``     the flight's track value
        ``A<c><alt>``    altitude, where <c> is ``C`` (vertical rate above the
                         threshold), ``D`` (below the negative threshold) or
                         ``L`` otherwise (presumably "level")
        ``L<colours>``   livery looked up in ``FlightColours.col`` by the first
                         three characters of the callsign; ``LNNN`` blanks the
                         LEDs when no livery is known
    """

    # Command that clears the display.
    _CLEAR = b"C\n"
    # Vertical-rate magnitude above which a flight counts as climbing/descending.
    _CLIMB_THRESHOLD = 250

    def __init__(self, address="/dev/ttyUSB0"):
        """Prepare the thread; the serial port is not opened until ``run()``.

        Args:
            address: Path of the serial device the Arduino is attached to.
        """
        threading.Thread.__init__(self)
        self.stopping = False
        self.display = None   # flight currently shown, or None when cleared
        self.address = address
        self.device = None    # serial port handle, set by run()

    def processFlight(self, flight):
        """Send the given flight's details to the LED display, or clear it.

        Args:
            flight: Mapping with 'callsign', 'verticalRate', 'track' and
                'altitude' entries (any value may be None), or None to clear
                the display.

        If the serial port is not open (``run()`` has not opened it yet, or
        the thread has shut down) the call is a no-op and ``self.display`` is
        left untouched, so the next call after the port opens starts cleanly.
        """
        # Snapshot the handle: run() assigns self.device on the worker thread.
        device = self.device
        if device is None:
            log.debug("ArduinoThread has no open serial device; flight not displayed")
            return

        if flight is None:
            device.write(self._CLEAR)
            device.flush()
            self.display = None
            return

        callsign = flight['callsign']
        if self.display is not None and self.display['callsign'] != callsign:
            # We're getting a new flight, so clear the current display
            device.write(self._CLEAR)
            device.flush()
        self.display = flight

        climb = "L"
        rate = flight['verticalRate']
        if rate is not None:
            rate = int(rate)
            if rate > self._CLIMB_THRESHOLD:
                climb = "C"
            elif rate < -self._CLIMB_THRESHOLD:
                climb = "D"

        if flight['track'] is not None:
            device.write(("D%s\n" % flight['track']).encode())

        if flight['altitude'] is not None:
            device.write(("A%s%s\n" % (climb, flight['altitude'])).encode())

        # A missing callsign is treated like an unknown livery rather than
        # raising TypeError on the slice.
        if callsign is not None and callsign[:3] in FlightColours.col:
            device.write(("L%s\n" % FlightColours.col[callsign[:3]]).encode())
        else:
            # Blank the LEDs if we don't have a specified livery
            device.write(b"LNNN\n")
        device.flush()

    def stop(self):
        """Ask ``run()`` to clear the display, close the port and return."""
        self.stopping = True

    def run(self):
        """Open the serial port and hold it open until ``stop()`` is called.

        On a normal stop the display is cleared before the port is closed.
        The port is closed even if waiting or the final write raises.
        """
        log.info("ArduinoThread starting")
        device = serial.Serial(self.address, 9600)
        self.device = device
        try:
            while not self.stopping:
                time.sleep(1)
            device.write(self._CLEAR)
            device.flush()
        finally:
            # Drop the shared reference first so processFlight() becomes a
            # no-op instead of writing to a closed port.
            self.device = None
            device.close()
        log.info("ArduinoThread shut down")