Skip to content

Commit

Permalink
refactor: keep reference to cull task for memory leak debugging
Browse files (browse the repository at this point in the history)
  • Loading branch information
maartenbreddels committed Dec 8, 2023
1 parent 3a5c9f3 commit 76b4e26
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions solara/server/kernel_context.py
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
import asyncio
import concurrent.futures
import contextlib
import dataclasses
import enum
Expand Down Expand Up @@ -63,6 +64,7 @@ class VirtualKernelContext:
page_status: Dict[str, PageStatus] = dataclasses.field(default_factory=dict)
# only used for testing
_last_kernel_cull_task: "Optional[asyncio.Future[None]]" = None
_last_kernel_cull_future: "Optional[concurrent.futures.Future[None]]" = None
closed_event: threading.Event = dataclasses.field(default_factory=threading.Event)
lock: threading.RLock = dataclasses.field(default_factory=threading.RLock)

Expand All @@ -86,6 +88,11 @@ def close(self):
with self.lock:
for key in self.page_status:
self.page_status[key] = PageStatus.CLOSED
if self._last_kernel_cull_task:
self._last_kernel_cull_task.cancel()
if self._last_kernel_cull_future:
self._last_kernel_cull_future.cancel()

with self:
if self.app_object is not None:
if isinstance(self.app_object, reacton.core._RenderContext):
Expand Down Expand Up @@ -129,6 +136,8 @@ def state_save(self, state_directory: os.PathLike):
pickle.dump(state, f)

def page_connect(self, page_id: str):
if self.closed_event.is_set():
raise RuntimeError("Cannot connect a page to a closed kernel")
logger.info("Connect page %s for kernel %s", page_id, self.id)
with self.lock:
if page_id in self.page_status and self.page_status.get(page_id) == PageStatus.CLOSED:
Expand Down Expand Up @@ -182,20 +191,21 @@ async def kernel_cull():

has_connected_pages = PageStatus.CONNECTED in self.page_status.values()
if not has_connected_pages:
# when we have no connected pages, we will schedule a kernel cull
if self._last_kernel_cull_task:
self._last_kernel_cull_task.cancel()
with self.lock:
# when we have no connected pages, we will schedule a kernel cull
if self._last_kernel_cull_task:
self._last_kernel_cull_task.cancel()

async def create_task():
task = asyncio.create_task(kernel_cull())
# create a reference to the task so we can cancel it later
self._last_kernel_cull_task = task
try:
await task
except RuntimeError:
pass # event loop already closed, happens during testing
async def create_task():
task = asyncio.create_task(kernel_cull())
# create a reference to the task so we can cancel it later
self._last_kernel_cull_task = task
try:
await task
except RuntimeError:
pass # event loop already closed, happens during testing

asyncio.run_coroutine_threadsafe(create_task(), keep_alive_event_loop)
self._last_kernel_cull_future = asyncio.run_coroutine_threadsafe(create_task(), keep_alive_event_loop)
else:
future.set_result(None)
return future
Expand All @@ -207,7 +217,8 @@ def page_close(self, page_id: str):
different from a websocket/page disconnect, which we might want to recover from.
"""

if self.closed_event.is_set():
raise RuntimeError("Cannot connect a page to a closed kernel")
logger.info("page status: %s", self.page_status)
with self.lock:
if self.page_status[page_id] == PageStatus.CLOSED:
Expand Down

0 comments on commit 76b4e26

Please sign in to comment.