Skip to content

Commit

Permalink
improve loop shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
briangu committed Sep 25, 2023
1 parent aeb4fe1 commit 3b107fd
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions scripts/kgpy
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def create_sys_cmd_functions():
for x in filter(lambda n: n.startswith("sys_cmd_"), dir(m)):
fn = getattr(m,x)
name = _get_name(fn.__doc__)
registry[name] = fn
registry[name] = fn

return registry

Expand Down Expand Up @@ -236,39 +236,46 @@ def run_file(klong_loop, klong, fname, verbose=False):
run_in_klong_loop(klong_loop, klong, f.read(), verbose=verbose)


def start_io_loop(ioloop):
asyncio.set_event_loop(ioloop)
ioloop.run_forever()
klong_stop_event = asyncio.Event()
io_stop_event = asyncio.Event()


def start_klong_loop(klong_loop):
asyncio.set_event_loop(klong_loop)
klong_loop.run_forever()
def start_loop(loop, stop_event):
asyncio.set_event_loop(loop)
loop.run_until_complete(stop_event.wait())


def setup_async_loop(start_loop_func, debug: bool = False) -> asyncio.AbstractEventLoop:
def setup_async_loop(start_loop_func, debug=False, slow_callback_duration=2) -> asyncio.AbstractEventLoop:
loop = asyncio.new_event_loop()
loop.slow_callback_duration = slow_callback_duration
if debug:
loop.set_debug(True)
thread = threading.Thread(target=start_loop_func, args=(loop,), daemon=True)
thread.start()
return loop
return loop, thread


def cleanup_async_loop(loop: Optional[asyncio.AbstractEventLoop] = None, debug: bool = False) -> None:
if loop is None:
loop = asyncio.get_event_loop()
def cleanup_async_loop(loop: asyncio.AbstractEventLoop, loop_thread, stop_event, debug=True) -> None:

while len(asyncio.all_tasks(loop=loop)) > 0:
if debug:
print(asyncio.all_tasks(loop=loop))
time.sleep(0.1)
if loop.is_closed():
return

for task in asyncio.all_tasks(loop=loop):
loop.call_soon_threadsafe(task.cancel)
loop.call_soon_threadsafe(stop_event.set)
loop_thread.join()

loop.call_soon_threadsafe(loop.stop)
loop.close()
pending_tasks = asyncio.all_tasks(loop=loop)
if len(pending_tasks) > 0:
print(f"cancelling {len(pending_tasks)} pending tasks...")
for task in pending_tasks:
loop.call_soon_threadsafe(task.cancel)
# wait for all tasks to be cancelled but we can't use run_until_complete because the loop is already running
while len(asyncio.all_tasks(loop=loop)) > 0:
time.sleep(0)

loop.stop()

if not loop.is_closed():
loop.close()


if __name__ == "__main__":
Expand Down Expand Up @@ -300,9 +307,8 @@ if __name__ == "__main__":

klong = KlongInterpreter()

io_loop = loop = setup_async_loop(start_io_loop, debug=True)
klong_loop = loop = setup_async_loop(start_klong_loop, debug=True)
klong_loop.slow_callback_duration = 2 # Set threshold to 2 seconds
io_loop, io_loop_thread = setup_async_loop(lambda loop: start_loop(loop, io_stop_event), debug=True, slow_callback_duration=2)
klong_loop, klong_loop_thread = setup_async_loop(lambda loop: start_loop(loop, klong_stop_event), debug=True, slow_callback_duration=2)

console_loop = asyncio.new_event_loop()
asyncio.set_event_loop(console_loop)
Expand All @@ -328,7 +334,7 @@ if __name__ == "__main__":
continue
print(x)
klong(x)

if args.filename:
if args.verbose:
print(f"Running: {args.filename}")
Expand All @@ -350,5 +356,5 @@ if __name__ == "__main__":

shutdown_event.trigger()

cleanup_async_loop(loop=klong_loop, debug=args.debug)
cleanup_async_loop(loop=io_loop, debug=args.debug)
cleanup_async_loop(io_loop, io_loop_thread, stop_event=io_stop_event, debug=args.debug)
cleanup_async_loop(klong_loop, klong_loop_thread, stop_event=klong_stop_event, debug=args.debug)

0 comments on commit 3b107fd

Please sign in to comment.