-
#!/usr/bin/env python3
from textual.app import App, ComposeResult
from textual.containers import Vertical, Center, Middle
from textual.widgets import LoadingIndicator, Header, TabbedContent, Label, Markdown, TabPane, ProgressBar, Label, Digits, Static
from textual.geometry import Size
from textual import work
from decoding import decoding_speed
import os
SPEEDOMETER = """
# Speedometer
"""
OTHERS = """
# This is a tab for future expansion.
"""
class SpeedometerDigits(Static):
def compose(self) -> ComposeResult:
yield Digits(id="speedometerSpeed")
def get_content_width(self, container: Size, viewport: Size) -> int:
return super().get_content_width(container, viewport)
def get_content_height(self, container: Size, viewport: Size, width: int) -> int:
return super().get_content_height(container, viewport, width)
class SpeedometerApp(App):
BINDINGS = [
("T", "toggle_dark_mode", "Toggle dark mode"),
("Q", "request_quit", "Quit")
]
CSS_PATH = "speedometer.tcss"
TITLE = "Speedometer UI"
def compose(self) -> ComposeResult:
"""The widgets that this app is composed of?"""
yield Header()
with TabbedContent(initial="speedometer"):
with TabPane("Speedometer", id="speedometer"): # First tab
with Middle():
with Center(id="markdown_center"):
yield Markdown(SPEEDOMETER, id="speedometerDisplay") # Tab content
with Center(id="speedometerSpeed_center"):
yield SpeedometerDigits()
yield Center(Label("km/h", id="speedometerSpeedUnit"))
with Center(id="progress_center"):
yield Label("Speed: ")
yield ProgressBar(total = 40, show_eta = False)
with TabPane("Others", id="others"): # Second tab
with Vertical():
with Center():
yield Markdown(OTHERS, id="othersDisplay")
yield LoadingIndicator(id="loading")
def on_mount(self) -> None:
# Running an infinite loop with interval of 100ms
self.DecodingStatus = self.set_interval(0.001, self.get_speed, pause=False)
@work(exclusive=True, thread=True)
async def get_speed(self) -> None:
speed = float(decoding_speed())
self.query_one(Digits).update(str(speed))
self.query_one(ProgressBar).update(progress = speed)
#this is an ACTION method.
#It's associated with the action called toggle_dark_mode.
def action_toggle_dark_mode(self):
"""toggle dark mode"""
self.dark = not self.dark
#this is an ACTION method.
#It's associated with the action called request_quit.
def action_request_quit(self):
"""Action to quit the UI application"""
self.DecodingStatus.pause()
os.system('sudo ifconfig can0 down')
self.app.exit()
if _name_ == "_main_":
app = SpeedometerApp()
app.run()` This is my textual code and it importing a function from another file where it reads values from a CAN bus, decodes it and return the decoded value. I succesfully got my UI to display and update the values concurrently. The problem i am facing is that even after exiting the UI using ctrl+c the code does not stop running. I have seen some similar discussions on this page already and tried their solutions, but i still cant fix this issue. As i am new to textual and coding, i am sure my code has a lot of mistakes, hoping to get some help here. This is my decoding code #!/usr/bin/env python3
import os
import can
byte2 = 0.0
byte3 = 0.0
speed = 0.0
os.system('./can_setup.sh')
can_interface = 'can0'
bus = can.interface.Bus(channel=can_interface, bustype='socketcan') #socketcan native
initialize_msg = can.Message(arbitration_id=0x123, data=[0xa0, 0xb1, 0xc2, 0xd3, 0xe4, 0xf5, 0xf6, 0xff], is_extended_id=False)
print(initialize_msg)
try:
#sending an initialization message
os.system('cansend can0 123#1100002233445566')
print("Message sent on {}".format(can_interface))
except can.CanError:
print('message was not sent')
def decoding_speed():
try:
for msg in bus:
if msg.arbitration_id == 0x10FEF1C8:
byte2 = hex(msg.data[1])
byte3 = hex(msg.data[2])
byte2 = byte2.replace("0x", "")
#using little endian format
xSpeed = byte3 + byte2 #speed in hexadecimal
dSpeed = int(xSpeed, 16) #speed in decimal
speed = dSpeed/256 #speed in km/h
print(speed)
return "{:.2f}".format(speed)
except KeyboardInterrupt:
bus.shutdown()
os.system('sudo ifconfig can0 down')
print('\n\rKeyboard interrupt received. Exiting.')
exit() I am really lost on this issue, would appreciate any kind of help. |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 3 replies
-
That's a lot of (unformatted) code to wade through. But I'd recommend having another read through the docs on Thread Workers , specifically the warning about updating the UI directly and checking if a worker was cancelled. I'm not sure that will solve the issue here, but it would at least be a good starting point. |
Beta Was this translation helpful? Give feedback.
-
Only the main thread will receive a You can poll Note that you should avoid updating the UI from a thread. Be sure to read the guide on workers which covers this. |
Beta Was this translation helpful? Give feedback.
-
Thank you guys for your suggestions and advice. I have successfully came up with a way to fix my issue. I am going to post the solution here for anyone who might be facing the same issue as me. Firstly i read through the doc on workers and followed the example and avoided updating my UI directly from a thread by modifying my code as such: def on_mount(self) -> None:
# Running an infinite loop with interval of 100ms
self.DecodingStatus = self.set_interval(0.001, self.get_speed, pause=False)
@work(exclusive=True, thread=True)
async def get_speed(self) -> None:
speed = float(decoding_speed())
await self.call_from_thread(self.update_ui, speed)
def update_ui(self, speed):
speed_widget = self.query_one(Digits)
progressBar = self.query_one(ProgressBar)
speed_widget.update(str(speed))
progressBar.update(progress=speed) And i realized that the for loop in my decoding code was blocking the moment the CAN bus stopped sending over values causing my application to hang even after exiting the UI. To fix this, i modified my code as such: def decoding_speed():
try:
while True:
msg = bus.recv(0.01)
if msg is None:
break
if msg.arbitration_id == 0x10FEF1C8:
byte2 = hex(msg.data[1])
byte3 = hex(msg.data[2])
byte2 = byte2.replace("0x", "")
#using little endian format
xSpeed = byte3 + byte2 #speed in hexadecimal
dSpeed = int(xSpeed, 16) #speed in decimal
speed = dSpeed/256 #speed in km/h
return "{:.2f}".format(speed) This change made it such that there is a timeout of 0.01 second and if no values were sent over break out of the loop. Hope this provided some help to those facing similar issues. |
Beta Was this translation helpful? Give feedback.
Thank you guys for your suggestions and advice. I have successfully came up with a way to fix my issue. I am going to post the solution here for anyone who might be facing the same issue as me.
Firstly i read through the doc on workers and followed the example and avoided updating my UI directly from a thread by modifying my code as such: