Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Ensure recovery from BrokenPipeError #563

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions python_packages/jupyter_lsp/jupyter_lsp/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import string
import subprocess
from copy import copy
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone

from tornado.ioloop import IOLoop
from tornado.queues import Queue
Expand Down Expand Up @@ -49,8 +49,12 @@ class LanguageServerSession(LoggingConfigurable):
status = UseEnum(SessionStatus, default_value=SessionStatus.NOT_STARTED)
last_handler_message_at = Instance(datetime, allow_none=True)
last_server_message_at = Instance(datetime, allow_none=True)
allow_server_failure_not_more_often_than = Instance(
timedelta, allow_none=False, default_value=timedelta(minutes=20)
)

_tasks = None
_last_failure = None

_skip_serialize = ["argv", "debug_argv"]

Expand Down Expand Up @@ -169,7 +173,12 @@ async def _read_lsp(self):
await self.reader.read()

async def _write_lsp(self):
await self.writer.write()
task = self.writer.write()
results = await asyncio.gather(task, return_exceptions=True)
for result in results:
if isinstance(result, BrokenPipeError):
self._handle_server_failure(result)
return results

async def _broadcast_from_lsp(self):
"""loop for reading messages from the queue of messages from the language
Expand All @@ -179,3 +188,34 @@ async def _broadcast_from_lsp(self):
self.last_server_message_at = self.now()
await self.parent.on_server_message(message, self)
self.from_lsp.task_done()

def _handle_server_failure(self, error):
description: str
action: str
now = datetime.now()

allowed = self.allow_server_failure_not_more_often_than
if self._last_failure and now - self._last_failure > allowed:
delta = now - self._last_failure
description = (
f"giving up as the previous failure was {delta} ago"
f" which is less than te minimum allowed interval ({allowed})"
)
action = "raise"
else:
action = "restart"
description = "restarting session..."

text = (
f"Encountered {self.language_server} language server failure;"
f" {description}"
f" (exception: {error})"
f" (faulty process: {self.process})"
)
self.log.warning(text)

if action == "raise":
raise
elif action == "restart":
self.stop()
self.initialize()
7 changes: 6 additions & 1 deletion python_packages/jupyter_lsp/jupyter_lsp/stdio.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ async def _read_content(
if len(raw) != length: # pragma: no cover
self.log.warning(
f"Readout and content-length mismatch: {len(raw)} vs {length};"
f"remaining empties: {max_empties}; remaining parts: {max_parts}"
f" remaining empties: {max_empties}; remaining parts: {max_parts}"
)

return raw
Expand Down Expand Up @@ -191,7 +191,12 @@ async def write(self) -> None:
body = message.encode("utf-8")
response = "Content-Length: {}\r\n\r\n{}".format(len(body), message)
await convert_yielded(self._write_one(response.encode("utf-8")))
except BrokenPipeError:
self.queue.task_done()
# propagate broken pipe errors
raise
except Exception: # pragma: no cover
# catch other (hopefully mild) exceptions
self.log.exception("%s couldn't write message: %s", self, response)
finally:
self.queue.task_done()
Expand Down
19 changes: 19 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,25 @@ def jsonrpc_init_msg():
)


@fixture
def did_open_message():
    """Serialized JSON-RPC ``textDocument/didOpen`` message for this very file.

    The document is declared as Python, version 0, with empty text.
    """
    text_document = {
        "uri": pathlib.Path(__file__).as_uri(),
        "languageId": "python",
        "version": 0,
        "text": "",
    }
    payload = {
        "id": 0,
        "jsonrpc": "2.0",
        "method": "textDocument/didOpen",
        "params": {"textDocument": text_document},
    }
    return json.dumps(payload)


@fixture
def app():
    """Provide a newly constructed ``MockServerApp``."""
    mock_app = MockServerApp()
    return mock_app
Expand Down
49 changes: 49 additions & 0 deletions python_packages/jupyter_lsp/jupyter_lsp/tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import asyncio
import logging
import subprocess

import pytest

from ..handlers import LanguageServerWebSocketHandler
from ..schema import SERVERS_RESPONSE
from ..session import LanguageServerSession


def assert_status_set(handler, expected_statuses, language_server=None):
Expand Down Expand Up @@ -100,3 +104,48 @@ async def test_ping(handlers):
assert ws_handler._ping_sent is True

ws_handler.on_close()


@pytest.mark.asyncio
async def test_broken_pipe(handlers, jsonrpc_init_msg, did_open_message, caplog):
    """If the pipe breaks (server dies), can we recover by restarting the server?"""
    a_server = "pyls"

    # use real handler in this test rather than a mock
    # -> testing broken pipe requires that here
    handler, ws_handler = handlers
    manager = handler.manager

    manager.initialize()

    assert_status_set(handler, {"not_started"}, a_server)

    ws_handler.open(a_server)

    await ws_handler.on_message(jsonrpc_init_msg)
    assert_status_set(handler, {"started"}, a_server)

    session: LanguageServerSession = manager.sessions[a_server]
    process: subprocess.Popen = session.process
    process.kill()

    with caplog.at_level(logging.WARNING):
        # an attempt to write should raise BrokenPipeError
        await ws_handler.on_message(did_open_message)
        await asyncio.sleep(1)

    # which should be caught
    assert "Encountered pyls language server failure" in caplog.text
    assert "exception: [Errno 32] Broken pipe" in caplog.text

    # and the server should get restarted
    assert "restarting session..." in caplog.text

    assert_status_set(handler, {"started"}, a_server)

    # captured records accumulate for the whole test; drop the failure
    # warnings asserted above so the check below only sees new output
    caplog.clear()

    with caplog.at_level(logging.WARNING):
        # we should be able to send a message now
        await ws_handler.on_message(did_open_message)
    assert caplog.text == ""

    ws_handler.on_close()