diff --git a/scripts/kgpy b/scripts/kgpy index 68aab51..fc2d93b 100644 --- a/scripts/kgpy +++ b/scripts/kgpy @@ -154,7 +154,7 @@ def create_sys_cmd_functions(): for x in filter(lambda n: n.startswith("sys_cmd_"), dir(m)): fn = getattr(m,x) name = _get_name(fn.__doc__) - registry[name] = fn + registry[name] = fn return registry @@ -236,39 +236,46 @@ def run_file(klong_loop, klong, fname, verbose=False): run_in_klong_loop(klong_loop, klong, f.read(), verbose=verbose) -def start_io_loop(ioloop): - asyncio.set_event_loop(ioloop) - ioloop.run_forever() +klong_stop_event = asyncio.Event() +io_stop_event = asyncio.Event() -def start_klong_loop(klong_loop): - asyncio.set_event_loop(klong_loop) - klong_loop.run_forever() +def start_loop(loop, stop_event): + asyncio.set_event_loop(loop) + loop.run_until_complete(stop_event.wait()) -def setup_async_loop(start_loop_func, debug: bool = False) -> asyncio.AbstractEventLoop: +def setup_async_loop(start_loop_func, debug=False, slow_callback_duration=2) -> asyncio.AbstractEventLoop: loop = asyncio.new_event_loop() + loop.slow_callback_duration = slow_callback_duration if debug: loop.set_debug(True) thread = threading.Thread(target=start_loop_func, args=(loop,), daemon=True) thread.start() - return loop + return loop, thread -def cleanup_async_loop(loop: Optional[asyncio.AbstractEventLoop] = None, debug: bool = False) -> None: - if loop is None: - loop = asyncio.get_event_loop() +def cleanup_async_loop(loop: asyncio.AbstractEventLoop, loop_thread, stop_event, debug=True) -> None: - while len(asyncio.all_tasks(loop=loop)) > 0: - if debug: - print(asyncio.all_tasks(loop=loop)) - time.sleep(0.1) + if loop.is_closed(): + return - for task in asyncio.all_tasks(loop=loop): - loop.call_soon_threadsafe(task.cancel) + loop.call_soon_threadsafe(stop_event.set) + loop_thread.join() - loop.call_soon_threadsafe(loop.stop) - loop.close() + pending_tasks = asyncio.all_tasks(loop=loop) + if len(pending_tasks) > 0: + print(f"cancelling {len(pending_tasks)} pending tasks...") + for task in pending_tasks: + loop.call_soon_threadsafe(task.cancel) + # wait for all tasks to be cancelled but we can't use run_until_complete because the loop is already running + while len(asyncio.all_tasks(loop=loop)) > 0: + time.sleep(0) + + loop.stop() + + if not loop.is_closed(): + loop.close() if __name__ == "__main__": @@ -300,9 +307,8 @@ if __name__ == "__main__": klong = KlongInterpreter() - io_loop = loop = setup_async_loop(start_io_loop, debug=True) - klong_loop = loop = setup_async_loop(start_klong_loop, debug=True) - klong_loop.slow_callback_duration = 2 # Set threshold to 2 seconds + io_loop, io_loop_thread = setup_async_loop(lambda loop: start_loop(loop, io_stop_event), debug=True, slow_callback_duration=2) + klong_loop, klong_loop_thread = setup_async_loop(lambda loop: start_loop(loop, klong_stop_event), debug=True, slow_callback_duration=2) console_loop = asyncio.new_event_loop() asyncio.set_event_loop(console_loop) @@ -328,7 +334,7 @@ if __name__ == "__main__": continue print(x) klong(x) - + if args.filename: if args.verbose: print(f"Running: {args.filename}") @@ -350,5 +356,5 @@ if __name__ == "__main__": shutdown_event.trigger() - cleanup_async_loop(loop=klong_loop, debug=args.debug) - cleanup_async_loop(loop=io_loop, debug=args.debug) + cleanup_async_loop(io_loop, io_loop_thread, stop_event=io_stop_event, debug=args.debug) + cleanup_async_loop(klong_loop, klong_loop_thread, stop_event=klong_stop_event, debug=args.debug)