-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Comparison of Distributed Training Implementation
Framework | Glossary | Communication | Model Parallism |
---|---|---|---|
Caffe2 | data_parallel_model | Gloo | No |
TensorFlow | device | grpc | Yes |
Paddle | pserver/trainer | TCP | No |
Caffe2 uses a "storage op" to store variables at remote side. There are two storage op implementations including: "File store", "Redis store", under caffe2/distributed/
.These ops use "Gloo" as it's remote communication library.
Caffe2 defines it's distributed training procedures at caffe2/python/data_parallel_model.py
To run a training job parallel, we need to call: The remote storage coordinator is called "rendezvous". Sample code is at here
if num_shards > 1:
# Create rendezvous for distributed computation
store_handler = "store_handler"
if args.redis_host is not None:
# Use Redis for rendezvous if Redis host is specified
workspace.RunOperatorOnce(
core.CreateOperator(
"RedisStoreHandlerCreate", [], [store_handler],
host=args.redis_host,
port=args.redis_port,
prefix=args.run_id,
)
)
else:
# Use filesystem for rendezvous otherwise
workspace.RunOperatorOnce(
core.CreateOperator(
"FileStoreHandlerCreate", [], [store_handler],
path=args.file_store_path,
)
)
rendezvous = dict(
kv_handler=store_handler,
shard_id=shard_id,
num_shards=num_shards,
engine="GLOO",
exit_nets=None)
And then create the data_parallel_model
:
# Create parallelized model
data_parallel_model.Parallelize(
train_model,
input_builder_fun=add_image_input,
forward_pass_builder_fun=create_resnet50_model_ops,
optimizer_builder_fun=add_optimizer,
devices=gpus,
rendezvous=rendezvous,
optimize_gradient_memory=True,
cpu_device=args.use_cpu,
)
In TensorFlow, parameter server is described as a "device", which is returned by function replica_device_setter
at python/training/device_setter.py
. There are two types of devices: ps and worker. Users should put "Variables" on "ps" and put calculations on "workers":
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
And we define these devices by calling tf.train.ClusterSpec
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
To do distributed training, paddle needs to start sevaral pserver processes on different nodes, and then start trainers on another group of nodes, we tell pservers and trainers the distributed job configurations by specifing arguments to these processes, for pserver:
./pserver --use_gpu=False --port=7164 --ports_num=1 --num_of_gradient_servers=2 ...
For trainers:
paddle.init(use_gpu=False, port=7164, ports_num=1, is_local=False, num_of_gradient_servers=2)