Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support creating multiple ssh connections #1790

Merged
merged 46 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
4750f68
BROKEN - preliminary version of `FractalSSHCollection`
tcompa Sep 19, 2024
5c1209e
Improve `FractalSSHCollection` implementation
tcompa Sep 20, 2024
c798ff3
Improve `FractalSSHCollection`
tcompa Sep 20, 2024
648298f
Add test_FractalSSHCollection
tcompa Sep 20, 2024
efa2bd7
Rename method
tcompa Sep 20, 2024
653158d
Add `fractal_ssh_collection` fixture
tcompa Sep 20, 2024
fcabee6
Add `FractalSSHCollection.contains` method
tcompa Sep 20, 2024
7150d97
Add test_run_command_through_collection
tcompa Sep 20, 2024
6bc68f8
Add `FractalSSHCollection.size` property
tcompa Sep 20, 2024
d0a472d
Move from `app.state.fractal_ssh` to `app.state.fractal_ssh_collection`
tcompa Sep 20, 2024
2f6fc8f
Use new `app.state.fractal_ssh_collection` in API
tcompa Sep 20, 2024
c1a5c0a
Adapt scripts/client
tcompa Sep 20, 2024
79cbef7
Adapt `app` fixture
tcompa Sep 20, 2024
d52ecab
Typo
tcompa Sep 20, 2024
761b964
Update `test_unit_lifespan.py`
tcompa Sep 20, 2024
d528c11
Update FractalSSHCollection unit test
tcompa Sep 20, 2024
15872b7
Update tests for SSH task collection and job execution
tcompa Sep 20, 2024
76f1ac1
Merge branch 'main' into 1782-support-creationg-of-multiple-ssh-conne…
tcompa Sep 20, 2024
5f7bff2
Update CHANGELOG [skip ci]
tcompa Sep 20, 2024
11140da
Fix call args/kwargs
tcompa Sep 20, 2024
00fa11c
Update `test_task_collection_ssh_from_pypi
tcompa Sep 20, 2024
d2ad7b8
Simplify SSH fixtures
tcompa Sep 20, 2024
5142411
Add more logging to `FractalSSHCollection.close_all` method
tcompa Sep 20, 2024
a8bbd50
Move from `FractalSSHCollection.pop` to `remove` method
tcompa Sep 20, 2024
16afe8c
Fix wrong type of Path in test
tcompa Sep 20, 2024
1967b54
Remove comment [skip ci]
tcompa Sep 20, 2024
4a419ee
Rename FractalSSHCollection into List
tcompa Sep 20, 2024
cce1476
BROKEN - count_threads fixture init
tcompa Sep 20, 2024
8318eca
Use context manager in test
tcompa Sep 20, 2024
920b10b
Stop/join threads more often in Slurm executor
tcompa Sep 20, 2024
777c0cf
Introduce `_stop_and_join_wait_thread` in SSH executor
tcompa Sep 20, 2024
97c6d91
Improve thread-counting fixture
tcompa Sep 20, 2024
a3bcd08
Fix use of stop/join method in SSH executors
tcompa Sep 20, 2024
456c8b4
Reduce fixture grace time
tcompa Sep 20, 2024
073f8fb
Improve `check_threads` fixture
tcompa Sep 23, 2024
efd9261
Improve thread cleanup in FractalSlurmSSHExecutor
tcompa Sep 23, 2024
6c19394
Make `FractalSSH.close` more aggressive (ref #1792)
tcompa Sep 23, 2024
fc38dd7
Merge pull request #1793 from fractal-analytics-platform/fix-threads-…
tcompa Sep 23, 2024
5d8d3e3
Fix test_check_connection_failure
tcompa Sep 23, 2024
bedc8dd
Improve `check_threads`
tcompa Sep 23, 2024
4dda7d2
Update CHANGELOG [skip ci]
tcompa Sep 23, 2024
626b4a6
Add logs to SSH Slurm executor
tcompa Sep 23, 2024
4bb8c63
Improve testing of SSH executor and error handling
tcompa Sep 23, 2024
01f8c04
Remove obsolete get_ssh_connection function
tcompa Sep 23, 2024
b0e8027
Always set `look_for_keys=False` for SSH connection (close #1796)
tcompa Sep 23, 2024
e05be73
Update CHANGELOG
tcompa Sep 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
**Note**: Numbers like (\#1234) point to closed Pull Requests on the fractal-server repository.

# 2.5.2

* App:
* Replace `fractal_ssh` attribute with `fractal_ssh_list`, in `app.state` (\#1790).
* Move creation of SSH connections from app startup to endpoints (\#1790).
* Internal
* Introduce `FractalSSHList`, in view of support for multiple SSH/Slurm service users (\#1790).
* Make `FractalSSH.close()` more aggressively close `Transport` attribute (\#1790).
* Set `look_for_keys=False` for paramiko/fabric connection (\#1790).
* Testing:
* Add fixture to always test that threads do not accumulate during tests (\#1790).

# 2.5.1

Expand Down
14 changes: 13 additions & 1 deletion fractal_server/app/routes/api/v2/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ async def apply_workflow(
await db.merge(job)
await db.commit()

# User appropriate FractalSSH object
if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
ssh_credentials = dict(
user=settings.FRACTAL_SLURM_SSH_USER,
host=settings.FRACTAL_SLURM_SSH_HOST,
key_path=settings.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH,
)
fractal_ssh_list = request.app.state.fractal_ssh_list
fractal_ssh = fractal_ssh_list.get(**ssh_credentials)
else:
fractal_ssh = None

background_tasks.add_task(
submit_workflow,
workflow_id=workflow.id,
Expand All @@ -246,7 +258,7 @@ async def apply_workflow(
worker_init=job.worker_init,
slurm_user=user.slurm_user,
user_cache_dir=user.cache_dir,
fractal_ssh=request.app.state.fractal_ssh,
fractal_ssh=fractal_ssh,
)
request.app.state.jobsV2.append(job.id)
logger.info(
Expand Down
11 changes: 10 additions & 1 deletion fractal_server/app/routes/api/v2/task_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,20 @@ async def collect_tasks_pip(
db.add(state)
await db.commit()

# User appropriate FractalSSH object
ssh_credentials = dict(
user=settings.FRACTAL_SLURM_SSH_USER,
host=settings.FRACTAL_SLURM_SSH_HOST,
key_path=settings.FRACTAL_SLURM_SSH_PRIVATE_KEY_PATH,
)
fractal_ssh_list = request.app.state.fractal_ssh_list
fractal_ssh = fractal_ssh_list.get(**ssh_credentials)

background_tasks.add_task(
background_collect_pip_ssh,
state.id,
task_pkg,
request.app.state.fractal_ssh,
fractal_ssh,
)

response.status_code = status.HTTP_201_CREATED
Expand Down
28 changes: 24 additions & 4 deletions fractal_server/app/runner/executors/slurm/ssh/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,34 @@ def __init__(
settings = Inject(get_settings)
self.python_remote = settings.FRACTAL_SLURM_WORKER_PYTHON
if self.python_remote is None:
self._stop_and_join_wait_thread()
raise ValueError("FRACTAL_SLURM_WORKER_PYTHON is not set. Exit.")

# Initialize connection and perform handshake
self.fractal_ssh = fractal_ssh
logger.warning(self.fractal_ssh)
self.handshake()
try:
self.handshake()
except Exception as e:
logger.warning(
"Stop/join waiting thread and then "
f"re-raise original error {str(e)}"
)
self._stop_and_join_wait_thread()
raise e

# Set/validate parameters for SLURM submission scripts
self.slurm_account = slurm_account
self.common_script_lines = common_script_lines or []
self._validate_common_script_lines()
try:
self._validate_common_script_lines()
except Exception as e:
logger.warning(
"Stop/join waiting thread and then "
f"re-raise original error {str(e)}"
)
self._stop_and_join_wait_thread()
raise e

# Set/initialize some more options
self.keep_pickle_files = keep_pickle_files
Expand Down Expand Up @@ -1385,6 +1402,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self.fractal_ssh.run_command(cmd=scancel_command)
logger.debug("Executor shutdown: end")

def _stop_and_join_wait_thread(self):
self.wait_thread.stop()
self.wait_thread.join()

def __exit__(self, *args, **kwargs):
"""
See
Expand All @@ -1393,8 +1414,7 @@ def __exit__(self, *args, **kwargs):
logger.debug(
"[FractalSlurmSSHExecutor.__exit__] Stop and join `wait_thread`"
)
self.wait_thread.stop()
self.wait_thread.join()
self._stop_and_join_wait_thread()
logger.debug("[FractalSlurmSSHExecutor.__exit__] End")

def run_squeue(self, job_ids):
Expand Down
8 changes: 6 additions & 2 deletions fractal_server/app/runner/executors/slurm/sudo/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def __init__(
for line in self.common_script_lines
if line.startswith("#SBATCH --account=")
)
self._stop_and_join_wait_thread()
raise RuntimeError(
"Invalid line in `FractalSlurmExecutor.common_script_lines`: "
f"'{invalid_line}'.\n"
Expand Down Expand Up @@ -1287,6 +1288,10 @@ def shutdown(self, wait=True, *, cancel_futures=False):

logger.debug("Executor shutdown: end")

def _stop_and_join_wait_thread(self):
self.wait_thread.stop()
self.wait_thread.join()

def __exit__(self, *args, **kwargs):
"""
See
Expand All @@ -1295,6 +1300,5 @@ def __exit__(self, *args, **kwargs):
logger.debug(
"[FractalSlurmExecutor.__exit__] Stop and join `wait_thread`"
)
self.wait_thread.stop()
self.wait_thread.join()
self._stop_and_join_wait_thread()
logger.debug("[FractalSlurmExecutor.__exit__] End")
22 changes: 12 additions & 10 deletions fractal_server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,34 @@ async def lifespan(app: FastAPI):
settings = Inject(get_settings)

if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
from fractal_server.ssh._fabric import get_ssh_connection
from fractal_server.ssh._fabric import FractalSSH

connection = get_ssh_connection()
app.state.fractal_ssh = FractalSSH(connection=connection)
from fractal_server.ssh._fabric import FractalSSHList

app.state.fractal_ssh_list = FractalSSHList()

logger.info(
f"Created SSH connection "
f"({app.state.fractal_ssh.is_connected=})."
"Added empty FractalSSHList to app.state "
f"(id={id(app.state.fractal_ssh_list)})."
)
else:
app.state.fractal_ssh = None
app.state.fractal_ssh_list = None

config_uvicorn_loggers()
logger.info("End application startup")
reset_logger_handlers(logger)

yield

logger = get_logger("fractal_server.lifespan")
logger.info("Start application shutdown")

if settings.FRACTAL_RUNNER_BACKEND == "slurm_ssh":
logger.info(
f"Closing SSH connection "
f"(current: {app.state.fractal_ssh.is_connected=})."
"Close FractalSSH connections "
f"(current size: {app.state.fractal_ssh_list.size})."
)

app.state.fractal_ssh.close()
app.state.fractal_ssh_list.close_all()

logger.info(
f"Current worker with pid {os.getpid()} is shutting down. "
Expand Down
Loading
Loading