tensorflow学习笔记(十九):分布式Tensorflow
2017-02-27 15:03
295 查看
分布式Tensorflow
最近在学习怎么分布式Tensorflow训练深度学习模型,看官网教程看的云里雾里,最终结合着其它资料,终于对分布式Tensorflow有了些初步了解.gRPC (google remote procedure call)
分布式Tensorflow底层的通信是gRPC
gRPC首先是一个RPC,即
远程过程调用,通俗的解释是:假设你在本机上执行一段代码
num=add(a,b),它调用了一个过程
call,然后返回了一个值
num,你感觉这段代码只是在本机上执行的, 但实际情况是,本机上的
add方法是将参数打包发送给服务器,然后服务器运行服务器端的
add方法,返回的结果再将数据打包返回给客户端.
Cluster.Job.Task
Job是Task的集合.Cluster是Job的集合
为什么要分成Cluster,Job,和Task呢?
首先,我们介绍一下Task:Task就是主机上的一个进程,在大多数情况下,一个机器上只运行一个Task.
为什么
Job是
Task的集合呢? 在分布式深度学习框架中,我们一般把
Job划分为
Parameter和
Worker,
Parameter Job是管理参数的存储和更新工作.
Worker Job是来运行
ops.如果参数的数量太大,一台机器处理不了,这就要需要多个
Tasks.
Cluster是
Jobs的集合:
Cluster(集群),就是我们用的集群系统了
如何创建集群
从上面的描述我们可以知道,组成Cluster的基本单位是
Task(动态上理解,主机上的一个进程,从静态的角度理解,
Task就是我们写的代码).我们只需编写
Task代码,然后将代码运行在不同的主机上,这样就构成了
Cluster(集群)
如何编写Task
代码
首先,Task需要知道集群上都有哪些主机,以及它们都监听什么端口.
tf.train.ClusterSpec()就是用来描述这个.
tf.train.ClusterSpec({ "worker": [ "worker_task0.example.com:2222",# /job:worker/task:0 运行的主机 "worker_task1.example.com:2222",# /job:worker/task:1 运行的主机 "worker_task2.example.com:2222"# /job:worker/task:3 运行的主机 ], "ps": [ "ps_task0.example.com:2222", # /job:ps/task:0 运行的主机 "ps_task1.example.com:2222" # /job:ps/task:0 运行的主机 ]})
这个
ClusterSec告诉我们,我们这个
Cluster(集群)有两个
Job(worker.ps),
worker中有三个
Task(即,有三个
Task执行
Tensorflow op操作)
然后,将
ClusterSpec当作参数传入到
tf.train.Server()中,同时指定此
Task的
Job_name和
task_index.
#jobName和taskIndex是函数运行时,通过命令行传递的参数 server = tf.train.Server(cluster, job_name=jobName, task_index=taskIndex)
下面代码描述的是,一个
cluster中有一个
Job,叫做(
worker), 这个
job有两个
task,这两个
task是运行在两个主机上的
#在主机(10.1.1.1)上,实际是运行以下代码 cluster = tf.train.ClusterSpec({"worker": ["10.1.1.1:2222", "10.1.1.2:3333"]}) server = tf.train.Server(cluster, job_name="local", task_index=0) #在主机(10.1.1.2)上,实际运行以下代码 cluster = tf.train.ClusterSpec({"worker": ["10.1.1.1:2222", "10.1.1.2:3333"]}) server = tf.train.Server(cluster, job_name="local", task_index=1)
tf.trian.Server干了些什么呢?
首先,一个
tf.train.Server包含了: 本地设备(GPUs,CPUs)的集合,可以连接到到其它
task的
ip:port(存储在
cluster中), 还有一个
session target用来执行分布操作.还有最重要的一点就是,它创建了一个服务器,监听
port端口,如果有数据传过来,他就会在本地执行(启动
session target,调用本地设备执行运算),然后结果返回给调用者.
我们继续来写我们的
task代码:在你的
model中指定分布式设备
with tf.device("/job:ps/task:0"): weights_1 = tf.Variable(...) biases_1 = tf.Variable(...) with tf.device("/job:ps/task:1"): weights_2 = tf.Variable(...) biases_2 = tf.Variable(...) with tf.device("/job:worker/task:0"): #映射到主机(10.1.1.1)上去执行 input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) with tf.device("/job:worker/task:1"): #映射到主机(10.1.1.2)上去执行 input, labels = ... layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1) logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2) # ... train_op = ... with tf.Session("grpc://10.1.1.2:3333") as sess:#在主机(10.1.1.2)上执行run for _ in range(10000): sess.run(train_op)
with tf.Session("grpc://..")是指定
gprc://..为
master,
master将
op分发给对应的
task
写分布式程序时,我们需要关注一下问题:
(1) 使用
In-graph replication还是
Between-graph replication
In-graph replication:一个
client(显示调用tf::Session的进程),将里面的
参数和
ops指定给对应的
job去完成.数据分发只由一个
client完成.
Between-graph replication:下面的代码就是这种形式,有很多独立的
client,各个
client构建了相同的
graph(包含参数,通过使用
tf.train.replica_device_setter,将这些参数映射到
ps_server上.)
(2)
同步训练,还是
异步训练
Synchronous training:在这种方式中,每个
graph的副本读取相同的
parameter的值,并行的计算
gradients,然后将所有计算完的
gradients放在一起处理.
Tensorlfow提供了函数(
tf.train.SyncReplicasOptimizer)来处理这个问题(在
Between-graph replication情况下),在
In-graph replication将所有的
gradients平均就可以了
Asynchronous training:自己计算完
gradient就去更新
paramenter,不同
replica之间不会去协调进度
(3)
一个完整的例子,来自官网链接:
import tensorflow as tf # Flags for defining the tf.train.ClusterSpec tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs") # Flags for defining the tf.train.Server tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") FLAGS = tf.app.flags.FLAGS
由于是相同的代码运行在不同的主机上,所以要传入
job_name和
task_index加以区分,而
ps_hosts和
worker_hosts对于所有主机来说,都是一样的,用来描述集群的
def main(_): ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") # Create a cluster from the parameter server and worker hosts. cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) # Create and start a server for the local task. server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index) if FLAGS.job_name == "ps": server.join()
我们都知道,服务器进程如果执行完的话,服务器就会关闭.为了是我们的
ps_server能够一直处于监听状态,我们需要使用
server.join().这时,进程就会
block在这里.至于为什么
ps_server刚创建就
join呢:原因是因为下面的代码会将
参数指定给
ps_server保管,所以
ps_server静静的监听就好了.
elif FLAGS.job_name == "worker": # Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
tf.train.replica_device_setter(ps_tasks=0, ps_device='/job:ps', worker_device='/job:worker', merge_devices=True, cluster=None, ps_ops=None)),返回值可以被
tf.device使用,指明下面代码中
variable和
ops放置的设备.
example:
# To build a cluster with two ps jobs on hosts ps0 and ps1, and 3 worker # jobs on hosts worker0, worker1 and worker2. cluster_spec = { "ps": ["ps0:2222", "ps1:2222"], "worker": ["worker0:2222", "worker1:2222", "worker2:2222"]} with tf.device(tf.replica_device_setter(cluster=cluster_spec)): # Build your graph v1 = tf.Variable(...) # assigned to /job:ps/task:0 v2 = tf.Variable(...) # assigned to /job:ps/task:1 v3 = tf.Variable(...) # assigned to /job:ps/task:0 # Run compute
这个例子是没有指定参数worker_device
和ps_device
的,你可以手动指定
继续代码注释,下面就是,模型的定义了# Build model...variables and ops loss = ... global_step = tf.Variable(0) train_op = tf.train.AdagradOptimizer(0.01).minimize( loss, global_step=global_step) saver = tf.train.Saver() summary_op = tf.merge_all_summaries() init_op = tf.initialize_all_variables() # Create a "supervisor", which oversees the training process. sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir="/tmp/train_logs", init_op=init_op, summary_op=summary_op, saver=saver, global_step=global_step, save_model_secs=600) # The supervisor takes care of session initialization, restoring from # a checkpoint, and closing when done or an error occurs. with sv.managed_session(server.target) as sess: # Loop until the supervisor shuts down or 1000000 steps have completed. step = 0 while not sv.should_stop() and step < 1000000: # Run a training step asynchronously. # See `tf.train.SyncReplicasOptimizer` for additional details on how to # perform *synchronous* training. _, step = sess.run([train_op, global_step]) # Ask for all the services to stop. sv.stop()
考虑一个场景(
Between-graph),我们有一个
parameter server(存放着参数的副本),有好几个
worker server(分别保存着相同的
graph的副本).更通俗的说,我们有10台电脑,其中一台作为
parameter server,其余九台作为
worker server.因为同一个程序在10台电脑上同时运行(不同电脑,
job_name,
task_index不同),所以每个
worker server上都有我们建立的
graph的副本(
replica).这时我们可以使用
Supervisor帮助我们管理各个
process.
Supervisor的
is_chief参数很重要,它指明用哪个
task进行参数的初始化工作.
sv.managed_session(server.target)创建一个被
sv管理的
session
if __name__ == "__main__": tf.app.run()
To start the trainer with two parameter servers and two workers, use the following command line (assuming the script is called trainer.py):
# On ps0.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=0 # On ps1.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=ps --task_index=1 # On worker0.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=0 # On worker1.example.com: $ python trainer.py \ --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \ --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \ --job_name=worker --task_index=1
可以看出,我们只需要写一个程序,在不同的主机上,传入不同的参数使其运行
下篇将要介绍如何分布式计算深度学习模型
参考博客:
[1] http://weibo.com/ttarticle/p/show?id=2309403987407065210809
[2] http://weibo.com/ttarticle/p/show?id=2309403988813608274928
[3] http://blog.csdn.net/luodongri/article/details/52596780
相关文章推荐
- tensorflow学习笔记(十九):分布式Tensorflow
- TensorFlow学习笔记(十九) 基本算术运算和Reduction归约计算
- 天下国家之事,败于小人者十一,败于君子者十九(转)
- 旅游指南之十九----大理
- .NET基础示例系列之十九:Dundas For ASP.NET
- 数学之美 系列十九 - 马尔可夫链的扩展 贝叶斯网络 (Bayesian Networks)
- AgoBot 僵尸网络研究笔记(十九)
- PC Camera 开发日志(十九)------- 各种流行皮肤控件库
- Visual Studio 2008 每日提示(十九)
- 通过100个单词掌握英语语法(十九)do/does
- 软考网络规划设计师学习笔记连载之十九(3.1网络服务器)
- 数据库安全性之网络传输安全性.十九
- Linux Bash Shell学习(十九):String I/O——read
- WCF RIA 服务 (十九)-- 共享代码 2
- 【连载】【FPGA黑金开发板】Verilog HDL那些事儿--串口发送|接收 封装(十九)
- shell实例(十九) ---nl,bc命令
- 《当程序员的那些狗日日子》(十九)新的天空下
- 200佳优秀的国外创意网站推荐(系列十九)
- Dave Python 练习十九 -- 网络客户端编程
- 【iOS-Cocos2d游戏开发之十九】游戏数据存储的四种常用方式; 推荐