From 70c3e5cf3d7d0d737776605f0154f23e51d54ed3 Mon Sep 17 00:00:00 2001 From: Michael Benayoun Date: Fri, 15 Sep 2023 15:00:19 +0200 Subject: [PATCH] [WIP] --- optimum/neuron/distributed/parallel_layers.py | 2 +- optimum/neuron/utils/runner.py | 4 +-- .../distributed/test_model_parallelization.py | 27 ++++++++++--------- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/optimum/neuron/distributed/parallel_layers.py b/optimum/neuron/distributed/parallel_layers.py index 6bb1734db..0938c06ed 100644 --- a/optimum/neuron/distributed/parallel_layers.py +++ b/optimum/neuron/distributed/parallel_layers.py @@ -605,7 +605,7 @@ def forward(self, input: torch.Tensor, target: torch.Tensor) -> torch.Tensor: # from torch_neuronx.xla_impl.ops import SimpleCrossEntropyLoss # output = SimpleCrossEntropyLoss.gen_override().forward(self, input, target) output = safe_parallel_cross_entropy( - input.clone(), + input, target, weight=self.weight, ignore_index=self.ignore_index, diff --git a/optimum/neuron/utils/runner.py b/optimum/neuron/utils/runner.py index 9034378d8..420103e96 100644 --- a/optimum/neuron/utils/runner.py +++ b/optimum/neuron/utils/runner.py @@ -90,8 +90,8 @@ class Precision(str, Enum): bf16 = "bf16" -def run_command_with_realtime_output(cmd: List[str]) -> Tuple[int, str]: - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) +def run_command_with_realtime_output(cmd: List[str], **popen_kwargs) -> Tuple[int, str]: + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **popen_kwargs) stdout = [] decoder = codecs.getincrementaldecoder("utf-8")() while True: diff --git a/tests/distributed/test_model_parallelization.py b/tests/distributed/test_model_parallelization.py index 89d9d7c73..bcdf81ede 100644 --- a/tests/distributed/test_model_parallelization.py +++ b/tests/distributed/test_model_parallelization.py @@ -45,6 +45,7 @@ from optimum.neuron.utils.cache_utils import get_num_neuron_cores, set_neuron_cache_path from optimum.neuron.utils.import_utils import is_neuronx_available +from optimum.neuron.utils.runner import run_command_with_realtime_output from ..test_utils import is_trainium_test @@ -207,14 +208,12 @@ def _test_model_parallel( cmd.insert(1, f"--rdzv_endpoint={rdzv_endpoint_host}:{rdzv_endpoint_port}") env["NEURON_RT_VISIBLE_CORES"] = f"0-{num_neuron_cores - 1}" - p_original = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) # When running tests in parallel, synchronization is done after both processes started. if not run_test_in_parallel: - stdout, _ = p_original.communicate() - stdout = stdout.decode("utf-8") - full_output = f"Original model standard output:\n{stdout}" - print(full_output) + _, stdout = run_command_with_realtime_output(cmd, env=env) + else: + p_original = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) # Parallel model. env = {"is_parallel": "true", **specialization_env, "NEURON_CC_FLAGS": neuron_cc_flags} @@ -222,18 +221,22 @@ def _test_model_parallel( # Updating the rendez-vous endpoint for the parallel model process. cmd[1] = f"--rdzv_endpoint={rdzv_endpoint_host}:{rdzv_endpoint_port + 1}" env["NEURON_RT_VISIBLE_CORES"] = f"{num_neuron_cores}-{2 * num_neuron_cores - 1}" - p_parallel = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) - if run_test_in_parallel: + p_parallel = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env) + stdout, _ = p_original.communicate() stdout = stdout.decode("utf-8") full_output = f"Original model standard output:\n{stdout}" print(full_output) - stdout, _ = p_parallel.communicate() - stdout = stdout.decode("utf-8") - full_output = f"Parallel model standard output:\n{stdout}" - print(full_output) + stdout, _ = p_parallel.communicate() + stdout = stdout.decode("utf-8") + full_output = f"Parallel model standard output:\n{stdout}" + print(full_output) + + else: + _, stdout = run_command_with_realtime_output(cmd, env=env) + temporary_dir = Path(tmpdirname) original_model_outputs = torch.load(temporary_dir / "original.bin") @@ -256,7 +259,7 @@ def test_model_parallel_from_config_without_lazy_load( self._test_model_parallel( num_neuron_cores=8, tp_size=2, - run_test_in_parallel=True, + run_test_in_parallel=False, model_class_name=model_class_name, model_name_or_path=model_name_or_path, from_config=True,