Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid race conditions when installing multiple dependencies from the same git repository. #9658

Merged
33 changes: 30 additions & 3 deletions src/poetry/installation/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import threading

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
from pathlib import Path
Expand Down Expand Up @@ -144,6 +145,7 @@ def execute(self, operations: list[Operation]) -> int:
for _, group in groups:
tasks = []
serial_operations = []
serial_git_operations = defaultdict(list)
for operation in group:
if self._shutdown:
break
Expand All @@ -158,11 +160,36 @@ def execute(self, operations: list[Operation]) -> int:
operation.package.develop
and operation.package.source_type in {"directory", "git"}
)
if not operation.skipped and is_parallel_unsafe:
# Skipped operations are safe to execute in parallel
if operation.skipped:
is_parallel_unsafe = False

if is_parallel_unsafe:
serial_operations.append(operation)
continue
elif operation.package.source_type == "git":
# Git operations on the same repository should be executed serially
serial_git_operations[operation.package.source_url].append(
operation
)
else:
tasks.append(
self._executor.submit(self._execute_operation, operation)
)
gustavgransbo marked this conversation as resolved.
Show resolved Hide resolved

tasks.append(self._executor.submit(self._execute_operation, operation))
def _serialize(
repository_serial_operations: list[Operation],
) -> None:
for operation in repository_serial_operations:
self._execute_operation(operation)

# For each git repository, execute all operations serially
for repository_git_operations in serial_git_operations.values():
tasks.append(
self._executor.submit(
_serialize,
repository_serial_operations=repository_git_operations,
)
)

try:
wait(tasks)
Expand Down
43 changes: 43 additions & 0 deletions tests/installation/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,49 @@ def test_executor_should_append_subdirectory_for_git(
assert archive_arg == tmp_venv.path / "src/demo/subdirectories/two"


def test_executor_should_install_multiple_packages_from_same_git_repository(
mocker: MockerFixture,
tmp_venv: VirtualEnv,
pool: RepositoryPool,
config: Config,
artifact_cache: ArtifactCache,
io: BufferedIO,
wheel: Path,
) -> None:
package_a = Package(
"package_a",
"0.1.2",
source_type="git",
source_reference="master",
source_resolved_reference="123456",
source_url="https://github.com/demo/subdirectories.git",
source_subdirectory="package_a",
)
package_b = Package(
"package_b",
"0.1.2",
source_type="git",
source_reference="master",
source_resolved_reference="123456",
source_url="https://github.com/demo/subdirectories.git",
source_subdirectory="package_b",
)

chef = Chef(artifact_cache, tmp_venv, Factory.create_pool(config))
chef.set_directory_wheel(wheel)
spy = mocker.spy(chef, "prepare")

executor = Executor(tmp_venv, pool, config, io)
executor._chef = chef
executor.execute([Install(package_a), Install(package_b)])

archive_arg = spy.call_args_list[0][0][0]
assert archive_arg == tmp_venv.path / "src/demo/subdirectories/package_a"

archive_arg = spy.call_args_list[1][0][0]
assert archive_arg == tmp_venv.path / "src/demo/subdirectories/package_b"


def test_executor_should_write_pep610_url_references_for_git_with_subdirectories(
tmp_venv: VirtualEnv,
pool: RepositoryPool,
Expand Down
Loading