Skip to content

Commit

Permalink
Improved error handling during startup
Browse files Browse the repository at this point in the history
  • Loading branch information
ars-ka0s committed Mar 16, 2024
1 parent 3ff178a commit c0682eb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
57 changes: 35 additions & 22 deletions eesdr_owrx_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys

from eesdr_tci import tci
from eesdr_tci.Listener import Listener
from eesdr_tci.listener import Listener
from eesdr_tci.tci import TciCommandSendAction, TciStreamType

def eprint(*args, **kwargs):
Expand Down Expand Up @@ -56,7 +56,7 @@ async def handle_control(self, reader, writer):
self.keystore[k] = iv
if self.verbose:
print('New values', self.keystore, flush=True)
await self.ks_handlers[k]()
self.ks_handlers[k]()
except ValueError:
continue
except Exception as e:
Expand Down Expand Up @@ -101,14 +101,14 @@ def __init__(self):
self.iq_packets = None
self.shutdown = False

async def update_rate(self):
await self.tci_listener.send(tci.COMMANDS['IQ_SAMPLERATE'].prepare_string(
def update_rate(self):
self.tci_listener.send_nowait(tci.COMMANDS['IQ_SAMPLERATE'].prepare_string(
TciCommandSendAction.WRITE,
params=[self.keystore['samp_rate']]
))

async def update_center(self):
await self.tci_listener.send(tci.COMMANDS['DDS'].prepare_string(
def update_center(self):
self.tci_listener.send_nowait(tci.COMMANDS['DDS'].prepare_string(
TciCommandSendAction.WRITE,
rx=self.args.receiver,
params=[self.keystore['center_freq']]
Expand All @@ -118,7 +118,7 @@ async def tci_check_response(self, command, rx, subrx, param):
del subrx
if command == 'IQ_SAMPLERATE' and param != self.keystore['samp_rate']:
eprint('IQ_SAMPLERATE received that does not match desired command')
if command == 'DDS' and int(rx) == self.args.receiver and param != self.keystore['center_freq']:
if command == 'DDS' and rx == self.args.receiver and param != self.keystore['center_freq']:
eprint('DDS received that does not match desired center frequency')

async def tci_receive_data(self, packet):
Expand All @@ -127,6 +127,7 @@ async def tci_receive_data(self, packet):
async def tci_interface(self):
print(f'Opening TCI connection to {self.args.device}', flush=True)
self.tci_listener = Listener(f'ws://{self.args.device}')

await self.tci_listener.start()
await self.tci_listener.ready()

Expand All @@ -140,40 +141,49 @@ async def tci_interface(self):
self.tci_ready.set()

while not self.shutdown:
done, _ = await asyncio.wait([asyncio.create_task(self.demand_iq.wait()),
self.tci_listener._launch_task], return_when=asyncio.FIRST_COMPLETED)
if self.tci_listener._launch_task in done:
print(f'TCI client closed prematurely: {self.tci_listener._launch_task.exception()}', flush=True)
return
try:
await self.demand_iq.wait()
[task.result() for task in done]
except asyncio.exceptions.CancelledError:
return

if self.args.verbose:
print('IQ demand start', flush=True)
if self.args.startstop:
await self.tci_listener.send(tci.COMMANDS['START'].prepare_string(
self.tci_listener.send_nowait(tci.COMMANDS['START'].prepare_string(
TciCommandSendAction.WRITE
))
await self.tci_listener.send(tci.COMMANDS['RX_ENABLE'].prepare_string(
self.tci_listener.send_nowait(tci.COMMANDS['RX_ENABLE'].prepare_string(
TciCommandSendAction.WRITE,
rx=self.args.receiver,
params=[True]
))
await self.update_rate()
await self.update_center()
await self.tci_listener.send(tci.COMMANDS['IQ_START'].prepare_string(
self.update_rate()
self.update_center()
self.tci_listener.send_nowait(tci.COMMANDS['IQ_START'].prepare_string(
TciCommandSendAction.WRITE,
rx=self.args.receiver
))
while self.demand_iq.is_set():
try:
await asyncio.sleep(0.05)
if self.tci_listener._launch_task.done():
print(f'TCI client closed prematurely: {self.tci_listener._launch_task.exception()}', flush=True)
return
except asyncio.exceptions.CancelledError:
break
if self.args.verbose:
print('IQ demand stop', flush=True)
await self.tci_listener.send(tci.COMMANDS['IQ_STOP'].prepare_string(
self.tci_listener.send_nowait(tci.COMMANDS['IQ_STOP'].prepare_string(
TciCommandSendAction.WRITE,
rx=self.args.receiver
))
if self.args.startstop:
await self.tci_listener.send(tci.COMMANDS['STOP'].prepare_string(
self.tci_listener.send_nowait(tci.COMMANDS['STOP'].prepare_string(
TciCommandSendAction.WRITE
))
while self.iq_packets.qsize():
Expand Down Expand Up @@ -245,16 +255,19 @@ async def start(self):
signal.signal(signal.SIGINT, self.cleanup)

self.tci_ready = asyncio.Event()
self.tasks += [asyncio.create_task(self.tci_interface())]
await self.tci_ready.wait()
ready_task = asyncio.create_task(self.tci_ready.wait())
tci_task = asyncio.create_task(self.tci_interface())
done, _ = await asyncio.wait([ready_task, tci_task], return_when = asyncio.FIRST_COMPLETED)
if tci_task in done:
print(f'Error during TCI client start: {tci_task.exception()}', flush=True)
return
self.tasks += [tci_task]
self.tasks += [ControlServer(self.args.control, self.args.verbose, self.keystore, self.ks_handlers).start()]
self.tasks += [IqServer(self.args.port, self.args.verbose, self.demand_iq, self.iq_packets).start()]

for task in self.tasks:
try:
await task
except asyncio.exceptions.CancelledError:
continue
await asyncio.wait(self.tasks, return_when=asyncio.FIRST_COMPLETED)
self.cleanup()
await asyncio.wait(self.tasks)

if self.args.verbose:
print('All tasks complete', flush=True)
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ build-backend = "setuptools.build_meta"

[project]
name = "eesdr-owrx-connector"
version = "0.0.1"
version = "0.0.2"
dependencies = [
"eesdr-tci"
"eesdr-tci >= 0.1"
]
authors = [
{ name="Matthew R. McDougal", email="[email protected]"}
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
eesdr_tci # TCI interface
eesdr_tci >= 0.1 # TCI interface

0 comments on commit c0682eb

Please sign in to comment.