
Commit

Fix
michaelbenayoun committed Feb 26, 2024
1 parent 2109b3a commit 5ac76e6
Showing 7 changed files with 58 additions and 23 deletions.
16 changes: 15 additions & 1 deletion optimum/commands/neuron/cache.py
@@ -14,6 +14,7 @@
# limitations under the License.
"""Defines the command line related to dealing with the Neuron cache repo."""

from pathlib import Path
from typing import TYPE_CHECKING

from ...neuron.utils import get_hub_cached_entries, synchronize_hub_cache
@@ -23,6 +24,7 @@
create_custom_cache_repo,
set_custom_cache_repo_name_in_hf_home,
)
from ...neuron.utils.require_utils import requires_torch_neuronx
from ...neuron.utils.runner import ExampleRunner
from ...utils import logging
from ..base import BaseOptimumCLICommand, CommandInfo
@@ -165,9 +167,21 @@ class SynchronizeRepoCommand(BaseOptimumCLICommand):
@staticmethod
def parse_args(parser: "ArgumentParser"):
parser.add_argument("--repo_id", type=str, default=None, help="The name of the repo to use as remote cache.")
parser.add_argument(
"--cache_dir", type=str, default=None, help="The cache directory that contains the compilation files"
)

@requires_torch_neuronx
def run(self):
synchronize_hub_cache(self.args.repo_id)
from libneuronxla.neuron_cc_cache import CacheUrl

if self.args.cache_dir is not None:
if not Path(self.args.cache_dir).is_dir():
raise ValueError(f"The {self.args.cache_dir} directory does not exist.")
cache_url = CacheUrl(self.args.cache_dir, url_type="fs")
else:
cache_url = None
synchronize_hub_cache(cache_url=cache_url, cache_repo_id=self.args.repo_id)


class LookupRepoCommand(BaseOptimumCLICommand):
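For reference, a hedged usage sketch of the extended command. The repository id and cache directory below are placeholders, and the `optimum-cli neuron cache synchronize` invocation path is an assumption (only the flags themselves are defined in this diff):

    optimum-cli neuron cache synchronize --repo_id my-org/neuron-cache --cache_dir /var/tmp/neuron-compile-cache

When --cache_dir is omitted, cache_url stays None and synchronize_hub_cache presumably falls back to the default Neuron compiler cache location.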
21 changes: 8 additions & 13 deletions optimum/neuron/accelerate/state.py
@@ -267,23 +267,11 @@ def __init__(
os.environ.get("ACCELERATE_USE_NEURONX_DISTRIBUTED_TP", "false") == "true"
or os.environ.get("ACCELERATE_USE_NEURONX_DISTRIBUTED_PP", "false") == "true"
):
if not is_neuronx_distributed_available():
raise RuntimeError(
"Model parallelism requires the neuronx_distributed package. You can install it by "
"running: python -m pip install neuronx_distributed --extra-index-url "
"https://pip.repos.neuron.amazonaws.com"
)
if mp_plugin is None:
raise ValueError(
"Could not initialize `neuronx_distributed` model parallelism because no "
"`ModelParallelismPlugin` was provided."
"Could not initialize model parallelism because no `ModelParallelismPlugin` was provided."
)
if mp_plugin.should_parallelize:
if not parallel_state.model_parallel_is_initialized():
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=mp_plugin.tensor_parallel_size,
pipeline_model_parallel_size=mp_plugin.pipeline_parallel_size,
)
self.distributed_type = NeuronDistributedType.MODEL_PARALLELISM
else:
logger.warning(
@@ -293,6 +281,13 @@
self.mp_plugin = mp_plugin
else:
self.mp_plugin = ModelParallelismPlugin()

if torch.distributed.is_initialized() and not parallel_state.model_parallel_is_initialized():
parallel_state.initialize_model_parallel(
tensor_model_parallel_size=self.mp_plugin.tensor_parallel_size,
pipeline_model_parallel_size=self.mp_plugin.pipeline_parallel_size,
)

if os.environ.get("ACCELERATE_USE_FSDP", "false") == "true":
self.distributed_type = NeuronDistributedType.XLA_FSDP
if self._mixed_precision != "no":
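In short, the model parallel state is now initialized whenever torch.distributed is up, even when no model parallelism was requested, since both parallel sizes default to 1 in a fresh ModelParallelismPlugin. A minimal sketch of that behavior, assuming `parallel_state` is neuronx_distributed.parallel_layers.parallel_state and the process group is already initialized:

    import torch
    from neuronx_distributed.parallel_layers import parallel_state

    # Mirrors the relocated logic above: with no ModelParallelismPlugin configured,
    # tensor and pipeline parallel sizes both default to 1, so the parallel state is
    # still initialized and downstream rank/size queries keep working.
    if torch.distributed.is_initialized() and not parallel_state.model_parallel_is_initialized():
        parallel_state.initialize_model_parallel(
            tensor_model_parallel_size=1,
            pipeline_model_parallel_size=1,
        )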
4 changes: 3 additions & 1 deletion optimum/neuron/distributed/base.py
@@ -487,6 +487,7 @@ def parallelize(
from neuronx_distributed.pipeline import NxDPPModel

tp_size = get_tensor_model_parallel_size()
pp_size = get_pipeline_model_parallel_size()

sequence_parallel_enabled = sequence_parallel_enabled and tp_size > 1

@@ -501,6 +502,8 @@
parameter_to_name = {p: n for n, p in name_to_parameter.items()}

def should_parallelize_layer_predicate_func(layer):
if pp_size == 1:
return True
for p in layer.parameters():
if p not in parameter_to_name:
return True
@@ -558,7 +561,6 @@ def should_parallelize_layer_predicate_func(layer):
if is_main_worker():
logger.info("Load and initialization of the weights done.")

pp_size = get_pipeline_model_parallel_size()
if pp_size > 1:
if not cls.supports_pipeline_parallelism():
raise NotImplementedError("{cls} does not support pipeline parallelism.")
13 changes: 11 additions & 2 deletions optimum/neuron/trainers.py
@@ -75,14 +75,15 @@
from .utils.cache_utils import (
get_hf_hub_cache_repos,
get_model_name_or_path,
get_neuron_cache_path,
get_neuronxcc_version,
get_num_neuron_cores_used,
has_write_access_to_repo,
)
from .utils.hub_neuronx_cache import ModelCacheEntry, hub_neuronx_cache, patch_neuron_cc_wrapper, synchronize_hub_cache
from .utils.misc import is_main_worker
from .utils.patching import patch_everywhere
from .utils.require_utils import requires_neuronx_distributed
from .utils.require_utils import requires_neuronx_distributed, requires_torch_neuronx
from .utils.training_utils import (
TRANSFORMERS_MIN_VERSION_USE_ACCELERATE,
get_model_param_count,
@@ -269,12 +270,20 @@ def create_accelerator_and_postprocess(self):
ds_plugin.deepspeed_config = ds_plugin.hf_ds_config.config
ds_plugin.hf_ds_config.trainer_config_process(self.args)

@requires_torch_neuronx
def synchronize_hub_cache(self):
from libneuronxla.neuron_cc_cache import CacheUrl

repo_id = get_hf_hub_cache_repos()[0]
if xm.get_ordinal() == 0:
has_write_access = has_write_access_to_repo(repo_id)
if has_write_access:
synchronize_hub_cache(repo_id)
cache_path = get_neuron_cache_path()
if cache_path is not None:
cache_url = CacheUrl(cache_path.as_posix(), url_type="fs")
else:
cache_url = None
synchronize_hub_cache(cache_url=cache_url, cache_repo_id=repo_id)
xm.rendezvous("Hub cache synchronization done")

def _wrap_model(self, model, training=True, dataloader=None):
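A hedged usage sketch of the trainer-side path: the method defined above lets rank 0 push the local Neuron compiler cache to the first Hub cache repo it has write access to. The model, args, and dataset are placeholders, and a Neuron/XLA environment is assumed:

    from optimum.neuron import NeuronTrainer

    trainer = NeuronTrainer(model=model, args=training_args, train_dataset=train_dataset)
    trainer.train()

    # Resolves the local cache via get_neuron_cache_path() and pushes it to the repo
    # returned by get_hf_hub_cache_repos()[0], provided this process has write access.
    trainer.synchronize_hub_cache()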
8 changes: 5 additions & 3 deletions optimum/neuron/utils/hub_neuronx_cache.py
@@ -332,14 +332,16 @@ def patch_neuron_cc_wrapper():


@requires_torch_neuronx
def synchronize_hub_cache(cache_repo_id: Optional[str] = None):
def synchronize_hub_cache(cache_url: Optional[CacheUrl] = None, cache_repo_id: Optional[str] = None):
"""Synchronize the neuronx compiler cache with the optimum-neuron hub cache.
Args:
repo_id (`Optional[str]`, default to None):
cache_url (`Optional[CacheUrl]`, defaults to `None`):
The cache url to use for synchronization.
cache_repo_id (`Optional[str]`, defaults to `None`):
The id of the HuggingFace cache repository, in the form 'org|user/name'.
"""
hub_cache_proxy = _create_hub_compile_cache_proxy(cache_repo_id=cache_repo_id)
hub_cache_proxy = _create_hub_compile_cache_proxy(cache_url=cache_url, cache_repo_id=cache_repo_id)
hub_cache_proxy.synchronize()


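A minimal sketch of calling the new signature directly, assuming a Neuron environment where libneuronxla is available; the directory and repository id are placeholders:

    from libneuronxla.neuron_cc_cache import CacheUrl

    from optimum.neuron.utils.hub_neuronx_cache import synchronize_hub_cache

    # Push the entries of a local filesystem compiler cache to the Hub cache repo.
    cache_url = CacheUrl("/var/tmp/neuron-compile-cache", url_type="fs")
    synchronize_hub_cache(cache_url=cache_url, cache_repo_id="my-org/neuron-cache")

    # With cache_url=None (the default), the proxy presumably uses the default
    # Neuron compiler cache location, matching the previous behavior.
    synchronize_hub_cache(cache_repo_id="my-org/neuron-cache")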
16 changes: 13 additions & 3 deletions optimum/neuron/utils/training_utils.py
@@ -405,20 +405,27 @@ def get_model_param_count(model: Union[torch.nn.Module, "NxDPPModel"], trainable
named_parameters = model.named_parameters()
shared_parameters_across_pipeline_stages = {}

pp_rank = get_pipeline_model_parallel_rank()
if torch.distributed.is_initialized():
tp_size = get_tensor_model_parallel_size()
pp_size = get_pipeline_model_parallel_size()
pp_rank = get_pipeline_model_parallel_rank()
else:
tp_size = 1
pp_size = 1
pp_rank = 0

def numel(parameter_name, parameter) -> int:
should_count_param = shared_parameters_across_pipeline_stages.get(parameter_name, pp_rank) == pp_rank

num_elements = parameter.numel()
if getattr(parameter, "tensor_model_parallel", False):
num_elements *= get_tensor_model_parallel_size()
num_elements *= tp_size

return num_elements if should_count_param else 0

param_count = sum(numel(n, p) for n, p in named_parameters if not trainable_only or p.requires_grad)

if get_pipeline_model_parallel_size() > 1:
if pp_size > 1:
param_count = torch.tensor(param_count, dtype=torch.float32).to(xm.xla_device())
param_count = xm.all_reduce(xm.REDUCE_SUM, param_count, groups=get_pipeline_model_parallel_group(as_list=True))
param_count = int(param_count.detach().item())
@@ -436,6 +443,9 @@ def is_main_worker_for_metrics() -> bool:
get_tensor_model_parallel_rank,
)

if not torch.distributed.is_initialized():
return True

dp_rank = get_data_parallel_rank()
tp_rank = get_tensor_model_parallel_rank()
pp_rank = get_pipeline_model_parallel_rank()
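A small sketch of the non-distributed path this change enables: with torch.distributed left uninitialized, get_model_param_count falls back to tp_size = pp_size = 1 and pp_rank = 0 instead of querying the parallel state. A Neuron environment with the required packages installed is assumed:

    import torch

    from optimum.neuron.utils.training_utils import get_model_param_count

    model = torch.nn.Linear(16, 4)  # 16 * 4 weights + 4 biases = 68 parameters

    # No torch.distributed.init_process_group() call: the parallel sizes default to 1,
    # so this reduces to a plain parameter count.
    print(get_model_param_count(model))                       # expected: 68
    print(get_model_param_count(model, trainable_only=True))  # expected: 68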
3 changes: 3 additions & 0 deletions tests/distributed/test_model_parallelization.py
@@ -385,6 +385,9 @@ def _parallel_model_matches_original_model(
model = accelerator.patch_model_for_neuron(model)
with torch.no_grad():
if pp_size == 1:
# This is set to False by `accelerator.prepare`, which we want in the general case, but here let's
# enable the cache to test that the KV cache matches the original model.
model.config.use_cache = True
model = model.eval()
model_outputs = model(**xla_inputs)
else:
