Tensorflow cluster 异步分布式集群使用说明
2017-04-16 20:24
323 查看
Tensorflow cluster 异步分布式集群使用说明
Tensorflow,cluster,异步训练一.Tensorflow 集群基本介绍
1.基本概念
Tensorflow的分布式并行基于gRPC通信框架,其中包括一个master负责创建Session.还有多个worker负责执行计算图中的任务。我们需要先创建一个Tensorflow Cluster对象,它包含了一组task(每个task一般是一台单独的机器)用来分布式地执行Tensorflow的计算图。一个Cluster可以切分为多个job,一个job是指一类特定的任务,比如parameter server(PS)、worker、每一个job里可以包含多个task。我们需要为每一个task创建一个server,然后连接到cluster上,通常每个task会执行在不同的机器上,当然也可以一台机器上执行多个task(控制不同的GPU)。2.运行细节
A:在运行多GPU分布式并行程序之前,需要通过nvidia-smi命令查看GPU显存是否为0.或者占用很小,如果占用较大,分布式并行则无法成功,处于等待其它task状态之下。
B:在单机情况下执行多GPU分布式并行程序,需要同时打开多个terminal窗口。然后在不同terminal窗口按传入参数服务器,工作服务器的顺序分别执行不同的shell脚本传入对应参数。
C:shell脚本传入的参数为
tf.train.ClusterSpec,具体示例如下(集群时需要替换localhost为具体主机的IP地址):
CUDA_VISIBLE_DEVICES=0 python dis_func.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224,localhost:2226 --job_name=ps --task_index=0
3.函数拟合代码解
备注:以下代码来源于http://blog.csdn.net/guotong1988/article/details/53909844
import numpy as np import tensorflow as tf # Define parameters FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.') tf.app.flags.DEFINE_integer('steps_to_validate', 1000, 'Steps to validate and print loss') # For distributed tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") #Hyperparameters learning_rate = FLAGS.learning_rate steps_to_validate = FLAGS.steps_to_validate def main(_): ps_hosts = FLAGS.ps_hosts.split(",") #解析flags并通过tf.train.ClusterSpec配置Tensorflow集群 worker_hosts = FLAGS.worker_hosts.split(",") cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) #通过ClusterSpec以及当前任务创建Server server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index) #参数服务器只需要管理Tensorflow中的变量,不需要执行训练的过程。server.join()会一直停#在这条语句上 if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): global_step = tf.Variable(0, name='global_step', trainable=False) #define our model input = tf.placeholder("float") label = tf.placeholder("float") weight = tf.get_variable("weight", [1], tf.float32, initializer=tf.random_normal_initializer()) biase = tf.get_variable("biase", [1], tf.float32, initializer=tf.random_normal_initializer()) pred = tf.multiply(input, weight) + biase #计算loss loss_value = loss(label, pred) train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss_value, global_step=global_step) init_op = tf.initialize_all_variables() saver = tf.train.Saver() #tf.train.Supervisor管理训练深度学习模型的通用功能 sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), init_op=init_op, saver=saver, global_step=global_step, save_model_secs=60) with sv.managed_session(server.target) as sess: step = 0 while step < 1000000: train_x = np.random.randn(1) train_y = 2 * train_x + np.random.randn(1) * 0.33 + 10 _, loss_v, step = sess.run([train_op, loss_value,global_step], feed_dict={input:train_x, label:train_y}) if step % steps_to_validate == 0: w,b = sess.run([weight,biase]) print("step: %d, weight: %f, biase: %f, loss: %f" %(step, w, b, loss_v)) sv.stop() def loss(label, pred): return tf.square(label - pred) if __name__ == "__main__": tf.app.run()
4.分布式代码框架
#coding=utf-8 import numpy as np import tensorflow as tf #Define parameters FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.') tf.app.flags.DEFINE_integer('steps_to_validate', 1000, 'Steps to validate and print loss') # For distributed tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") #Hyperparameters learning_rate = FLAGS.learning_rate steps_to_validate = FLAGS.steps_to_validate def main(_): ps_hosts = FLAGS.ps_hosts.split(",") #解析flags并通过tf.train.ClusterSpec配置Tensorflow集群 worker_hosts = FLAGS.worker_hosts.split(",") cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) #通过ClusterSpec以及当前任务创建Server server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index) #参数服务器只需要管理Tensorflow中的变量,不需要执行训练的过程。server.join()会一直停 #在这条语句上 if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)): global_step = tf.Variable(0, name='global_step', trainable=False) #define our model #在以下定义我们的模型即可 #计算loss loss_value = loss(label, pred) train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss_value, global_step=global_step) init_op = tf.initialize_all_variables() saver = tf.train.Saver() #tf.train.Supervisor管理训练深度学习模型的通用功能 sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), init_op=init_op, saver=saver, global_step=global_step, save_model_secs=60) with sv.managed_session(server.target) as sess: step = 0 while step < 1000000: #传入参数 _, loss_v = sess.run([train_op, loss_value], feed_dict={input:train_x, label:train_y}) sv.stop() def loss(label, pred): # define our loss function** return tf.square(label - pred) if __name__ == "__main__": tf.app.run()
5.shell脚本(以下示例为单机多GPU情况下的使用,具体为4块GPU,第一块PS,第二,三,四为WORKER )
1.由于共在四个设备(GPUS)上运行分布式程序,故有四个shell脚本文件,具体如下所示:function1.sh
function2.sh
function3.sh
function4.sh
2.function1.sh如下:
CUDA_VISIBLE_DEVICES=0 python dis_func.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224,localhost:2226 --job_name=ps --task_index=0
3.function2.sh如下:
CUDA_VISIBLE_DEVICES=1 python dis_func.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224,localhost:2226 --job_name=worker --task_index=0
4.function3.sh如下:
CUDA_VISIBLE_DEVICES=2 python dis_func.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224,localhost:2226 --job_name=worker --task_index=1
4.function3.sh如下:
CUDA_VISIBLE_DEVICES=3 python dis_func.py --ps_hosts=localhost:2222 --worker_hosts=localhost:2223,localhost:2224,localhost:2226 --job_name=worker --task_index=2
最后打开四个terminal终端,分别在每个终端中按1.2.3.4的顺序分别运行这四个shell脚本文件
# in terminal 1
bash function1.sh
# in terminal 2
bash function2.sh
# in terminal 3
bash function3.sh
# in terminal 4
bash function4.sh
相关文章推荐
- 分布式缓存集群方案特性使用场景(Memcache/Redis(Twemproxy/Codis/Redis-cluster))优缺点对比及选型
- 异步套接字基础:select函数以及FD_ZERO、FD_SET、FD_CLR、FD_ISSET使用说明[转]
- 异步套接字基础:select函数以及FD_ZERO、FD_SET、FD_CLR、FD_ISSET使用说明
- 使用mod_cluster进行apache httpd server和jboss eap 6.1集群配置
- 同步和异步有何异同,在什么情况下分别使用他们?举例说明。
- OpenStack Swift集群与Keystone的整合使用说明
- Hadoop中的集群配置和使用技巧——分布式计算开源框架Hadoop入门实践(二)
- OpenStack Swift集群与Keystone的整合使用说明
- 宝贝鱼(CshBBrain)集群配置使用说明
- 支付宝接口使用文档说明 支付宝异步通知(notify_url)与return_url.
- 异步socket函数使用说明
- 异步套接字基础:select函数以及FD_ZERO、FD_SET、FD_CLR、FD_ISSET使用说明
- 支付宝接口使用文档说明 支付宝异步通知(notify_url)与return_url
- OpenStack Swift集群与Keystone的整合使用说明
- 异步套接字基础:select函数以及FD_ZERO、FD_SET、FD_CLR、FD_ISSET使用说明
- 支付宝接口使用文档说明 支付宝异步通知(notify_url)与return_url
- linux 集群+oracle10.2 Cluster 在使用过程当中出现ORA-12519报错
- 王家林 第六讲Hadoop图文训练课程:使用HDFS命令行工具操作Hadoop分布式集群初体验
- JBoss 系列二:使用Apache httpd(mod_cluster)和JBoss构架高可用集群环境
- 支付宝接口使用文档说明 支付宝异步通知(notify_url)与return_url