Commit 2632b28

f

porteratzo committed Nov 29, 2023
1 parent 8767f24 commit 2632b28
Showing 2 changed files with 176 additions and 7 deletions.
41 changes: 40 additions & 1 deletion openfl-tutorials/experimental/Federeated_Pytorch_LLM_Horovod.py
@@ -7,7 +7,7 @@
from src.ptglue_inmemory import GlueMrpcFederatedDataLoader
import openfl.interface.workspace as workspace
import os
-import shutil
+import subprocess

WORKSPACE_PREFIX = os.path.join(os.path.expanduser("~"), ".local", "workspace")

@@ -22,12 +22,51 @@
#OPENFL_HOROVOD_DEMO_LOCALHOSTIP=STR with the IP address of the local node, e.g. "ip1"
#OPENFL_HOROVOD_DEMO_HOSTS=STR with the IP address of each node and its number of slots, e.g. "ip1:2,ip2:2"
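# A hypothetical two-node configuration (values are illustrative, not defaults):
#   export OPENFL_HOROVOD_DEMO_NP=4
#   export OPENFL_HOROVOD_DEMO_NICS=eth0
#   export OPENFL_HOROVOD_DEMO_LOCALHOSTIP=10.0.0.1
#   export OPENFL_HOROVOD_DEMO_HOSTS=10.0.0.1:2,10.0.0.2:2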

NP = os.environ.get('OPENFL_HOROVOD_DEMO_NP','4')
NETWORK_INTERFACES = os.environ.get('OPENFL_HOROVOD_DEMO_NICS','localhost')
LOCAL_HOST = os.environ.get('OPENFL_HOROVOD_DEMO_LOCALHOSTIP','localhost')
HOSTS = os.environ.get('OPENFL_HOROVOD_DEMO_HOSTS','localhost:4')

print('NP:', NP)
print('NETWORK_INTERFACES:', NETWORK_INTERFACES)
print('LOCAL_HOST:', LOCAL_HOST)
print('HOSTS:', HOSTS)

def propogate_workspace():
    # Copy the local workspace to every host in HOSTS other than this node.
    remote_hosts = [
        i.split(":")[0] for i in HOSTS.split(",") if i.split(":")[0] != LOCAL_HOST
    ]
    for rem_host in remote_hosts:
        command = [
            "scp",
            "-r",
            WORKSPACE_PREFIX,
            rem_host + ":" + WORKSPACE_PREFIX.replace("workspace", ""),
        ]
        # Print the command actually executed, so the log matches the scp call.
        print(command)
        result = subprocess.run(command, capture_output=True)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.decode())
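# For illustration: with the hypothetical values LOCAL_HOST="10.0.0.1" and
# HOSTS="10.0.0.1:2,10.0.0.2:2", the loop above runs roughly
#   scp -r ~/.local/workspace 10.0.0.2:~/.local/
# so passwordless SSH to each remote host is assumed.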

def main():
print(WORKSPACE_PREFIX)
log_level = "INFO"
log_file = None
workspace.create(WORKSPACE_PREFIX, "torch_llm")
os.chdir(WORKSPACE_PREFIX)
sys.path.append(WORKSPACE_PREFIX)
propogate_workspace()
fx.setup_logging(level=log_level, log_file=log_file)
num_collaborators = 1

142 changes: 136 additions & 6 deletions openfl-workspace/torch_llm/src/pt_model.py
@@ -39,6 +39,11 @@
LOCAL_HOST = os.environ.get('OPENFL_HOROVOD_DEMO_LOCALHOSTIP','localhost')
HOSTS = os.environ.get('OPENFL_HOROVOD_DEMO_HOSTS','localhost:4')

print('NP:', NP)
print('NETWORK_INTERFACES:', NETWORK_INTERFACES)
print('LOCAL_HOST:', LOCAL_HOST)
print('HOSTS:', HOSTS)


class LLMTaskRunner(PyTorchTaskRunner):
def __init__(
@@ -219,7 +224,6 @@ def validate(
"round_num": round_num,
"input_tensor_dict": None,
"use_tqdm": use_tqdm,
"use_horovod": True,
}
result = self.launch_horovod(data_path, state_path, out_path, horovod_kwags)
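        # launch_horovod (defined elsewhere in this file, outside this diff)
        # is assumed to shell out to the standard Horovod launcher, roughly:
        #   horovodrun -np $OPENFL_HOROVOD_DEMO_NP -H $OPENFL_HOROVOD_DEMO_HOSTS \
        #       python src/pt_model.py --data_path ... --func validate --kwargs '<json>'
        # where pt_model.py's main() dispatches to the named method on
        # InHorovodLLMTaskRunner (see the second file in this commit).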

@@ -264,14 +268,13 @@ def train_batches(
local_output_dict: Tensors to maintain in the local TensorDB
"""
self.rebuild_model(round_num, input_tensor_dict)
-        state_path, out_path, data_path = self.save_modelstate(col_name, round_num, 'validate', kwargs)
+        state_path, out_path, data_path = self.save_modelstate(col_name, round_num, 'train_batches', kwargs)
self.propogate_modelstate(state_path)
horovod_kwags = {
"col_name": col_name,
"round_num": round_num,
"input_tensor_dict": None,
"use_tqdm": use_tqdm,
"use_horovod": True,
}
result = self.launch_horovod(data_path, state_path, out_path, horovod_kwags)
if result.returncode != 0:
@@ -411,7 +414,80 @@ def save_native(
pt.save(pickle_dict, filepath)


-class InHorovodLLMTaskRunner(LLMTaskRunner):
+class InHorovodLLMTaskRunner(PyTorchTaskRunner):

def __init__(
self,
data_loader,
base_model_name="roberta-base",
device=None,
metric=None,
args=None,
**kwargs,
):
kwargs["data_loader"] = data_loader
super().__init__(device, **kwargs)
self.base_model_name = base_model_name
self.kwargs = kwargs
self.metric = load_metric("glue", "mrpc")
self._init_model()
self._init_optimizer()

def _init_model(self):
model = AutoModelForSequenceClassification.from_pretrained(
self.base_model_name, return_dict=True
)
peft_config = LoraConfig(
task_type=TaskType.SEQ_CLS,
inference_mode=False,
r=16,
lora_alpha=16,
lora_dropout=0.1,
bias="lora_only",
)
self.model = get_peft_model(model, peft_config)
self.model.to(self.device)
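        # Optional sanity check: PEFT models expose print_trainable_parameters(),
        # which reports how few parameters LoRA leaves trainable relative to
        # the full roberta-base backbone.
        # self.model.print_trainable_parameters()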

def _init_optimizer(self):
ALL_LAYERNORM_LAYERS = [nn.LayerNorm]
decay_parameters = get_parameter_names(self.model, ALL_LAYERNORM_LAYERS)
decay_parameters = [name for name in decay_parameters if "bias" not in name]
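        # decay_parameters now holds every parameter name except LayerNorm
        # weights and biases; those excluded parameters get weight_decay=0.0
        # in the second group below.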

optimizer_grouped_parameters = [
{
"params": [
p
for n, p in self.model.named_parameters()
if (n in decay_parameters and p.requires_grad)
],
"weight_decay": 0.01,
},
{
"params": [
p
for n, p in self.model.named_parameters()
if (n not in decay_parameters and p.requires_grad)
],
"weight_decay": 0.0,
},
]
self.optimizer = AdamW(optimizer_grouped_parameters, lr=0.001)
self.lr_scheduler = get_scheduler(
name="linear",
optimizer=self.optimizer,
num_warmup_steps=0,
num_training_steps=len(self.data_loader.train_set) * 5,
)
self.initialize_tensorkeys_for_functions()

def train(self):
return self.model.train()

def state_dict(self):
return get_peft_model_state_dict(self.model)

def load_state_dict(self, state_dict: Mapping[str, Any], strict: bool = True):
return set_peft_model_state_dict(self.model, state_dict)
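    # Because state_dict()/load_state_dict() are routed through the PEFT
    # helpers above, only the LoRA adapter weights are serialized and
    # exchanged; the frozen base model weights stay local.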

def load_state(self, kwargs):
checkpoint = torch.load(kwargs["state_path"])
@@ -515,6 +591,60 @@ def validate(

if hvd.rank() == 0:
torch.save({"output": result}, kwargs["out_path"])

    def train_epoch(self, batch_generator) -> np.ndarray:
        """Train a single epoch.

        Override this function in order to use custom training.

        Args:
            batch_generator: Train dataset batch generator. Yields dicts of
                tokenized model inputs of batch size `self.data_loader.batch_size`.

        Returns:
            np.ndarray: The mean training loss over the epoch.
        """
        losses = []
        for sample in batch_generator:
            self.model.zero_grad()
            output = self.model(**sample)
            loss = output.loss
            loss.backward()
            # Clip gradients after backward() and before the optimizer step.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
            self.optimizer.step()
            self.lr_scheduler.step()
            losses.append(loss.detach().cpu().numpy())
        loss = np.mean(losses)
        return np.array(loss)

    def save_native(
        self,
        filepath,
        model_state_dict_key="model_state_dict",
        optimizer_state_dict_key="optimizer_state_dict",
        **kwargs,
    ):
        """
        Save model and optimizer states in a pickled file specified by the
        filepath. model_/optimizer_state_dicts are stored in the keys provided.
        Uses pt.save().

        Args:
            filepath (string)                 : Path to pickle file to be
                created by pt.save().
            model_state_dict_key (string)     : key for model state dict
                in pickled file.
            optimizer_state_dict_key (string) : key for optimizer state
                dict in pickled file.
            kwargs                            : unused

        Returns:
            None
        """
        pickle_dict = {
            model_state_dict_key: get_peft_model_state_dict(self.model),
            optimizer_state_dict_key: self.optimizer.state_dict(),
        }
        pt.save(pickle_dict, filepath)


def get_args():
@@ -551,9 +681,9 @@ def get_args():
def main():
args = get_args()
data_loader = InHorovodGlueMrpcFederatedDataLoader(
-        data_path=args.data_path, batch_size=args.batch_size, use_horovod=True
+        data_path=args.data_path, batch_size=args.batch_size
)
-    taskrunner = InHorovodLLMTaskRunner(data_loader, use_horovod=True)
+    taskrunner = InHorovodLLMTaskRunner(data_loader)
func = getattr(taskrunner, args.func)
kwargs = json.loads(args.kwargs)
kwargs.update(
