diff --git a/openfl-tutorials/experimental/Federeated_Pytorch_LLM_Horovod.py b/openfl-tutorials/experimental/Federeated_Pytorch_LLM_Horovod.py
index b418de1705..7f595cda99 100644
--- a/openfl-tutorials/experimental/Federeated_Pytorch_LLM_Horovod.py
+++ b/openfl-tutorials/experimental/Federeated_Pytorch_LLM_Horovod.py
@@ -7,7 +7,7 @@
 from src.ptglue_inmemory import GlueMrpcFederatedDataLoader
 import openfl.interface.workspace as workspace
 import os
-import shutil
+import subprocess
 
 WORKSPACE_PREFIX = os.path.join(os.path.expanduser("~"), ".local", "workspace")
 
@@ -22,12 +22,51 @@
 #OPENFL_HOROVOD_DEMO_LOCALHOSTIP=STR with the IP address of the local node eg. "ip1"
 #OPENFL_HOROVOD_DEMO_HOSTS=STR with the IP address of the each node and number of slots eg. "ip1:2,ip2,2"
+NP = os.environ.get('OPENFL_HOROVOD_DEMO_NP','4')
+NETWORK_INTERFACES = os.environ.get('OPENFL_HOROVOD_DEMO_NICS','localhost')
+LOCAL_HOST = os.environ.get('OPENFL_HOROVOD_DEMO_LOCALHOSTIP','localhost')
+HOSTS = os.environ.get('OPENFL_HOROVOD_DEMO_HOSTS','localhost:4')
+
+print('NP:', NP)
+print('NETWORK_INTERFACES:', NETWORK_INTERFACES)
+print('LOCAL_HOST:', LOCAL_HOST)
+print('HOSTS:', HOSTS)
+
+def propogate_workspace():
+    remote_hosts = [
+        i.split(":")[0] for i in HOSTS.split(",") if i.split(":")[0] != LOCAL_HOST
+    ]
+    for rem_host in remote_hosts:
+        result = subprocess.run(
+            [
+                "scp",
+                "-r",
+                WORKSPACE_PREFIX,
+                rem_host
+                + ":"
+                + WORKSPACE_PREFIX.replace('workspace',''),
+            ],
+            capture_output=True,
+        )
+        print([
+            "scp",
+            "-r",
+            WORKSPACE_PREFIX,
+            rem_host
+            + ":"
+            + WORKSPACE_PREFIX,
+        ])
+        if result.returncode != 0:
+            raise RuntimeError(result.stderr)
 
 
 def main():
+    print(WORKSPACE_PREFIX)
     log_level = "INFO"
     log_file = None
     workspace.create(WORKSPACE_PREFIX, "torch_llm")
     os.chdir(WORKSPACE_PREFIX)
+    sys.path.append(WORKSPACE_PREFIX)
+    propogate_workspace()
     fx.setup_logging(level=log_level, log_file=log_file)
 
     num_collaborators = 1
diff --git a/openfl-workspace/torch_llm/src/pt_model.py b/openfl-workspace/torch_llm/src/pt_model.py
index 9991e7c967..38df8e4613 100644
--- a/openfl-workspace/torch_llm/src/pt_model.py
+++ b/openfl-workspace/torch_llm/src/pt_model.py
@@ -39,6 +39,11 @@
 LOCAL_HOST = os.environ.get('OPENFL_HOROVOD_DEMO_LOCALHOSTIP','localhost')
 HOSTS = os.environ.get('OPENFL_HOROVOD_DEMO_HOSTS','localhost:4')
 
+print('NP:', NP)
+print('NETWORK_INTERFACES:', NETWORK_INTERFACES)
+print('LOCAL_HOST:', LOCAL_HOST)
+print('HOSTS:', HOSTS)
+
 
 class LLMTaskRunner(PyTorchTaskRunner):
     def __init__(
@@ -219,7 +224,6 @@ def validate(
             "round_num": round_num,
             "input_tensor_dict": None,
             "use_tqdm": use_tqdm,
-            "use_horovod": True,
         }
         result = self.launch_horovod(data_path, state_path, out_path, horovod_kwags)
 
@@ -264,14 +268,13 @@ def train_batches(
             local_output_dict: Tensors to maintain in the local TensorDB
         """
         self.rebuild_model(round_num, input_tensor_dict)
-        state_path, out_path, data_path = self.save_modelstate(col_name, round_num, 'validate', kwargs)
+        state_path, out_path, data_path = self.save_modelstate(col_name, round_num, 'train_batches', kwargs)
         self.propogate_modelstate(state_path)
         horovod_kwags = {
             "col_name": col_name,
             "round_num": round_num,
             "input_tensor_dict": None,
             "use_tqdm": use_tqdm,
-            "use_horovod": True,
         }
         result = self.launch_horovod(data_path, state_path, out_path, horovod_kwags)
         if result.returncode != 0:
@@ -411,7 +414,80 @@ def save_native(
         pt.save(pickle_dict, filepath)
 
 
-class InHorovodLLMTaskRunner(LLMTaskRunner):
+class InHorovodLLMTaskRunner(PyTorchTaskRunner):
+
+    def __init__(
+        self,
+        data_loader,
+        base_model_name="roberta-base",
+        device=None,
+        metric=None,
+        args=None,
+        **kwargs,
+    ):
+        kwargs["data_loader"] = data_loader
+        super().__init__(device, **kwargs)
+        self.base_model_name = base_model_name
+        self.kwargs = kwargs
+        self.metric = load_metric("glue", "mrpc")
+        self._init_model()
+        self._init_optimizer()
+
+    def _init_model(self):
+        model = AutoModelForSequenceClassification.from_pretrained(
+            self.base_model_name, return_dict=True
+        )
+        peft_config = LoraConfig(
+            task_type=TaskType.SEQ_CLS,
+            inference_mode=False,
+            r=16,
+            lora_alpha=16,
+            lora_dropout=0.1,
+            bias="lora_only",
+        )
+        self.model = get_peft_model(model, peft_config)
+        self.model.to(self.device)
+
+    def _init_optimizer(self):
+        ALL_LAYERNORM_LAYERS = [nn.LayerNorm]
+        decay_parameters = get_parameter_names(self.model, ALL_LAYERNORM_LAYERS)
+        decay_parameters = [name for name in decay_parameters if "bias" not in name]
+
+        optimizer_grouped_parameters = [
+            {
+                "params": [
+                    p
+                    for n, p in self.model.named_parameters()
+                    if (n in decay_parameters and p.requires_grad)
+                ],
+                "weight_decay": 0.01,
+            },
+            {
+                "params": [
+                    p
+                    for n, p in self.model.named_parameters()
+                    if (n not in decay_parameters and p.requires_grad)
+                ],
+                "weight_decay": 0.0,
+            },
+        ]
+        self.optimizer = AdamW(optimizer_grouped_parameters, lr=0.001)
+        self.lr_scheduler = get_scheduler(
+            name="linear",
+            optimizer=self.optimizer,
+            num_warmup_steps=0,
+            num_training_steps=len(self.data_loader.train_set) * 5,
+        )
+        self.initialize_tensorkeys_for_functions()
+
+    def train(self):
+        return self.model.train()
+
+    def state_dict(self):
+        return get_peft_model_state_dict(self.model)
+
+    def load_state_dict(self, state_dict: Mapping[str, Any], strict: bool = True):
+        return set_peft_model_state_dict(self.model, state_dict)
 
     def load_state(self, kwargs):
         checkpoint = torch.load(kwargs["state_path"])
@@ -515,6 +591,60 @@ def validate(
 
         if hvd.rank() == 0:
             torch.save({"output": result}, kwargs["out_path"])
+
+    def train_epoch(self, batch_generator) -> Metric:
+        """Train single epoch.
+
+        Override this function in order to use custom training.
+
+        Args:
+            batch_generator: Train dataset batch generator. Yields (samples, targets) tuples of
+                size = `self.data_loader.batch_size`.
+        Returns:
+            Metric: An object containing name and np.ndarray value.
+        """
+        losses = []
+        for sample in batch_generator:
+            self.model.zero_grad()
+            output = self.model(**sample)
+            loss = output.loss
+            loss.backward(loss)
+            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
+            self.optimizer.step()
+            self.lr_scheduler.step()
+            losses.append(loss.detach().cpu().numpy())
+        loss = np.mean(losses)
+        return np.array(loss)
+
+    def save_native(
+        self,
+        filepath,
+        model_state_dict_key="model_state_dict",
+        optimizer_state_dict_key="optimizer_state_dict",
+        **kwargs,
+    ):
+        """
+        Save model and optimizer states in a pickled file specified by the \
+        filepath. model_/optimizer_state_dicts are stored in the keys provided. \
+        Uses pt.save().
+
+        Args:
+            filepath (string) : Path to pickle file to be
+                created by pt.save().
+            model_state_dict_key (string) : key for model state dict
+                in pickled file.
+            optimizer_state_dict_key (string) : key for optimizer state
+                dict in pickled file.
+            kwargs : unused
+
+        Returns:
+            None
+        """
+        pickle_dict = {
+            model_state_dict_key: get_peft_model_state_dict(self.model),
+            optimizer_state_dict_key: self.optimizer.state_dict(),
+        }
+        pt.save(pickle_dict, filepath)
 
 
 def get_args():
@@ -551,9 +681,9 @@ def get_args():
 def main():
     args = get_args()
     data_loader = InHorovodGlueMrpcFederatedDataLoader(
-        data_path=args.data_path, batch_size=args.batch_size, use_horovod=True
+        data_path=args.data_path, batch_size=args.batch_size
     )
-    taskrunner = InHorovodLLMTaskRunner(data_loader, use_horovod=True)
+    taskrunner = InHorovodLLMTaskRunner(data_loader)
     func = getattr(taskrunner, args.func)
     kwargs = json.loads(args.kwargs)
     kwargs.update(