diff --git a/eesdr_owrx_connector/connector.py b/eesdr_owrx_connector/connector.py index 0fe3aea..0326957 100644 --- a/eesdr_owrx_connector/connector.py +++ b/eesdr_owrx_connector/connector.py @@ -4,7 +4,7 @@ import sys from eesdr_tci import tci -from eesdr_tci.Listener import Listener +from eesdr_tci.listener import Listener from eesdr_tci.tci import TciCommandSendAction, TciStreamType def eprint(*args, **kwargs): @@ -56,7 +56,7 @@ async def handle_control(self, reader, writer): self.keystore[k] = iv if self.verbose: print('New values', self.keystore, flush=True) - await self.ks_handlers[k]() + self.ks_handlers[k]() except ValueError: continue except Exception as e: @@ -101,14 +101,14 @@ def __init__(self): self.iq_packets = None self.shutdown = False - async def update_rate(self): - await self.tci_listener.send(tci.COMMANDS['IQ_SAMPLERATE'].prepare_string( + def update_rate(self): + self.tci_listener.send_nowait(tci.COMMANDS['IQ_SAMPLERATE'].prepare_string( TciCommandSendAction.WRITE, params=[self.keystore['samp_rate']] )) - async def update_center(self): - await self.tci_listener.send(tci.COMMANDS['DDS'].prepare_string( + def update_center(self): + self.tci_listener.send_nowait(tci.COMMANDS['DDS'].prepare_string( TciCommandSendAction.WRITE, rx=self.args.receiver, params=[self.keystore['center_freq']] @@ -118,7 +118,7 @@ async def tci_check_response(self, command, rx, subrx, param): del subrx if command == 'IQ_SAMPLERATE' and param != self.keystore['samp_rate']: eprint('IQ_SAMPLERATE received that does not match desired command') - if command == 'DDS' and int(rx) == self.args.receiver and param != self.keystore['center_freq']: + if command == 'DDS' and rx == self.args.receiver and param != self.keystore['center_freq']: eprint('DDS received that does not match desired center frequency') async def tci_receive_data(self, packet): @@ -127,6 +127,7 @@ async def tci_receive_data(self, packet): async def tci_interface(self): print(f'Opening TCI connection to {self.args.device}', flush=True) self.tci_listener = Listener(f'ws://{self.args.device}') + await self.tci_listener.start() await self.tci_listener.ready() @@ -140,40 +141,49 @@ async def tci_interface(self): self.tci_ready.set() while not self.shutdown: + done, _ = await asyncio.wait([asyncio.create_task(self.demand_iq.wait()), + self.tci_listener._launch_task], return_when=asyncio.FIRST_COMPLETED) + if self.tci_listener._launch_task in done: + print(f'TCI client closed prematurely: {self.tci_listener._launch_task.exception()}', flush=True) + return try: - await self.demand_iq.wait() + [task.result() for task in done] except asyncio.exceptions.CancelledError: return + if self.args.verbose: print('IQ demand start', flush=True) if self.args.startstop: - await self.tci_listener.send(tci.COMMANDS['START'].prepare_string( + self.tci_listener.send_nowait(tci.COMMANDS['START'].prepare_string( TciCommandSendAction.WRITE )) - await self.tci_listener.send(tci.COMMANDS['RX_ENABLE'].prepare_string( + self.tci_listener.send_nowait(tci.COMMANDS['RX_ENABLE'].prepare_string( TciCommandSendAction.WRITE, rx=self.args.receiver, params=[True] )) - await self.update_rate() - await self.update_center() - await self.tci_listener.send(tci.COMMANDS['IQ_START'].prepare_string( + self.update_rate() + self.update_center() + self.tci_listener.send_nowait(tci.COMMANDS['IQ_START'].prepare_string( TciCommandSendAction.WRITE, rx=self.args.receiver )) while self.demand_iq.is_set(): try: await asyncio.sleep(0.05) + if self.tci_listener._launch_task.done(): + print(f'TCI client closed prematurely: {self.tci_listener._launch_task.exception()}', flush=True) + return except asyncio.exceptions.CancelledError: break if self.args.verbose: print('IQ demand stop', flush=True) - await self.tci_listener.send(tci.COMMANDS['IQ_STOP'].prepare_string( + self.tci_listener.send_nowait(tci.COMMANDS['IQ_STOP'].prepare_string( TciCommandSendAction.WRITE, rx=self.args.receiver )) if self.args.startstop: - await self.tci_listener.send(tci.COMMANDS['STOP'].prepare_string( + self.tci_listener.send_nowait(tci.COMMANDS['STOP'].prepare_string( TciCommandSendAction.WRITE )) while self.iq_packets.qsize(): @@ -245,16 +255,19 @@ async def start(self): signal.signal(signal.SIGINT, self.cleanup) self.tci_ready = asyncio.Event() - self.tasks += [asyncio.create_task(self.tci_interface())] - await self.tci_ready.wait() + ready_task = asyncio.create_task(self.tci_ready.wait()) + tci_task = asyncio.create_task(self.tci_interface()) + done, _ = await asyncio.wait([ready_task, tci_task], return_when = asyncio.FIRST_COMPLETED) + if tci_task in done: + print(f'Error during TCI client start: {tci_task.exception()}', flush=True) + return + self.tasks += [tci_task] self.tasks += [ControlServer(self.args.control, self.args.verbose, self.keystore, self.ks_handlers).start()] self.tasks += [IqServer(self.args.port, self.args.verbose, self.demand_iq, self.iq_packets).start()] - for task in self.tasks: - try: - await task - except asyncio.exceptions.CancelledError: - continue + await asyncio.wait(self.tasks, return_when=asyncio.FIRST_COMPLETED) + self.cleanup() + await asyncio.wait(self.tasks) if self.args.verbose: print('All tasks complete', flush=True) diff --git a/pyproject.toml b/pyproject.toml index 7bae36d..f3f6ad3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,9 +4,9 @@ build-backend = "setuptools.build_meta" [project] name = "eesdr-owrx-connector" -version = "0.0.1" +version = "0.0.2" dependencies = [ - "eesdr-tci" + "eesdr-tci >= 0.1" ] authors = [ { name="Matthew R. McDougal", email="ka0s@arrl.net"} diff --git a/requirements.txt b/requirements.txt index 3143e8e..a45531b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -eesdr_tci # TCI interface +eesdr_tci >= 0.1 # TCI interface