您的位置:首页 > 其它

1TensorFlow实现自编码器-1.4 TensorFlow实现降噪自动编码器--运行会话,训练模型

2017-10-27 11:13 686 查看
import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Xvaier均匀初始化
# fan_in是输入节点的数量,fan_out是输出节点的数量。
def xavier_init(fan_in, fan_out, constant=1):
low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
high = constant * np.sqrt(6.0 / (fan_in + fan_out))
return tf.random_uniform((fan_in, fan_out), minval=low, maxval=high, dtype=tf.float32)

# 加性高斯噪声的自动编码
class AdditiveGaussianNoiseAutoencoder(object):
# 在初始的数据中加入高斯噪声。在实现降噪自编码器的时候,
# 只是在输入加进去的时候,在输入上加上高斯噪声就行
# 其他的部分和基本自编码器一样
# n_input:输入变量数;n_hidden:隐含层节点数;transfer_function:隐含层激活函数;optimizer:优化器;scale:高斯噪声系数;
# Class内的scale声明成一个占位符placeholder,参数初始化则使用了接下来定义的_initialize_weights函数。我们只用了一个隐含层。
def __init__(self, n_input, n_hidden, transfer_function=tf.nn.softplus, optimizer=tf.train.AdamOptimizer(),scale=0.1):
self.n_input = n_input
self.n_hidden = n_hidden
# n_input,n_hidden都是输入和隐藏层的维度
self.transfer = transfer_function
# self.scale = tf.placeholder(tf.float32)
self.training_scale = scale
# scale 就是一个标量
network_weights = self._initialize_weights()
self.weights = network_weights

# model
with tf.name_scope("RawInput"):
self.x = tf.placeholder(tf.float32, [None, self.n_input])
# none不给定具体的值,它由输入的数目来决定

with tf.name_scope("NoiseAdder"):
self.scale = tf.placeholder(tf.float32)
self.noise_x = self.x + self.scale * tf.random_normal((n_input,))

with tf.name_scope("Encoder"):
self.hidden = self.transfer(tf.add(tf.matmul(self.noise_x,self.weights['w1']), self.weights['b1']))
# 在输入的时候,在输入的数据上加上一些高斯噪声,
# tf.random_normal((n_input,)) 默认给的是一个均值为0,标准差是1的正态分布的随机数。

with tf.name_scope("Reconstruction"):
self.reconstruction = tf.add(tf.matmul(self.hidden, self.weights['w2']), self.weights['b2'])

# x:一维的数量为n_input的placeholder;
# 建立一个能提取特征的隐含层:
# 先对输入x添加高斯噪声,即self.x + scale * tf.random_normal((n_input,));
#  再用tf.matmul()让被噪声污染的信号与隐藏层的权重相乘,再用tf.add()添加偏置;
# 最后使用transfer()对加权汇总结果进行激活函数处理。

# 经过隐藏层后,在输出层进行数据复原和重建操作,即建立reconstruction层,这时候就不需要激活函数了,
# 直接将隐含层的输出self.hidden乘以输出层的权重w2再加上输出层的偏置b2。

# cost
with tf.name_scope("Loss"):
self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(self.reconstruction, self.x), 2.0))

with tf.name_scope("Train"):
self.optimizer = optimizer.minimize(self.cost)

init = tf.global_variables_initializer()
self.sess = tf.Session()
self.sess.run(init)
print("begin ti run session...")

def _initialize_weights(self):
all_weights = dict()
all_weights['w1'] = tf.Variable(xavier_init(self.n_input, self.n_hidden),name='weight1')
# 在初始化的时候,就只是第一个权重矩阵是赋予一些随机值,其他的都是赋予全0矩阵
all_weights['b1'] = tf.Variable(tf.zeros([self.n_hidden], dtype=tf.float32),name='bias1')
all_weights['w2'] = tf.Variable(tf.zeros([self.n_hidden, self.n_input], dtype=tf.float32),name='weight2')
all_weights['b2'] = tf.Variable(tf.zeros([self.n_input], dtype=tf.float32),name='bias2')
return all_weights
#变量初始化函数:创建一个dicts,包含所有Variable类型的变量w1,b1,w2,b2
#隐藏层:w1用前面写的xavier_init函数初始化,传入输入节点数和隐含节点数,即可得到一个比较适合softplus等激活函数的初始状态,b1初始化为0

#在一个批次上训练模型
def partial_fit(self,X):
cost,opt = self.sess.run((self.cost,self.optimizer),feed_dict={self.x:X,self.scale:self.training_scale})
return cost

#在给定的样本集合上计算损失(用于测试阶段)
def calc_total_cost(self,X):
return self.sess.run(self.cost,feed_dict={self.x:X,self.scale:self.training_scale})

#返回自编码器隐含层的输出结果,获得抽象后的高阶特征表示
def transform(self,X):
return self.sess.run(self.hidden,feed_dict={self.x:X,self.scale:self.training_scale})

#将隐藏层的高阶特征作为输入,将其重建为原始输入数据
def generate(self,hidden=None):
if hidden == None:
hidden = np.random.normal(size=self.weights['b1'])
return self.sess.run(self.reconstruction,feed_dict={self.hidden:hidden})

#整体运行一遍复原过程,包括提取高阶特征以及重建原始数据,输入原始数据,输出复原后的数据
def reconstruction(self,X):
return self.sess.run(self.reconstruction,feed_dict={self.x:X,self.scale:self.training_scale})

#获取隐含层的权重
def getWeights(self):
return self.sess.run(self.weights['w1'])

#获取隐含层的偏置
def getBiases(self):
return self.sess.run(self.weights['b1'])

print('产生AdditiveGaussianNoiseAutoencoder类对象实例')
#产生一个AdditiveGaussianNoiseAutoencoder类的对象实例,调用tf.summary.FileWriter把计算图写入文件,使用TensorBoard查看
AGN_AutoEncoder = AdditiveGaussianNoiseAutoencoder(n_input=784,n_hidden=200,transfer_function=tf.nn.softplus,
optimizer=tf.train.AdamOptimizer(learning_rate=0.01),scale=0.01)

print('把计算图写入事件文件,在TensorBoard里面查看')
writer = tf.summary.FileWriter(logdir='logs',graph=AGN_AutoEncoder.sess.graph)
writer.close()

#读取数据集
mnist = input_data.read_data_sets('MNIST_data/',one_hot=True)
# 考虑多类情况。非onehot,标签是类似0  1  2   3...n这样。而onehot标签则是顾名思义,一个长度为n的数组,只有一个元素是1.0,其他元素是0.0。
# 例如在n为4的情况下,标签2对应的onehot标签就是 0.0  0.0   1.0  0.0使用onehot的直接原因是现在多分类cnn网络的输出通常是softmax层,
# 而它的输出是一个概率分布,从而要求输入的标签也以概率分布的形式出现,进而算交叉熵之类。

#使用sklearn.preprocess的数据标准化操作(0均值标准差为1)预处理数据
#首先在训练集上估计均值与方差,然后将其作用到训练集和测试集
#StandardScaler:尺度标准化
def standard_scale(X_train,X_test):
preprocessor = prep.StandardScaler().fit(X_train)
X_train = preprocessor.transform(X_train)
X_test = preprocessor.transform(X_test)
return X_train,X_test
#获取随机block数据的函数:取一个0到len(data) -  batch_size的随机整数
#以这个随机整数为起始索引,抽出一个batch_size的批次的样本

def get_random_block_from_data(data,batch_size):
start_index = np.random.randint(0,len(data) - batch_size)
return  data[start_index:(start_index + batch_size)]

#使用标准化操作变换数据集,
X_train,X_test = standard_scale(mnist.train.images,mnist.test.images)

#定义训练参数
n_samples = int(mnist.train.num_examples)#训练样本的总数
training_epochs = 20 #最大训练轮数,每n_samples/batch_size个批次为1轮
batch_size = 128     #每个批次的样本数量
display_step = 1     #输出训练结果的间隔

#下面开始训练过程,在每一轮epoch训练开始时,将平均损失avg_cost设为0
#计算总共需要的batch数量(样本总数/batch_size),这里使用的是有放回抽样,
#所以不能保证每个样本被抽到并参与训练

for epoch in range(training_epochs):
avg_cost = 0
total_batch = int(n_samples / batch_size)
#在每个batch的循环中,随机抽取一个batch的数据,使用成员函数partial_fit训练这个batch的数据,
#计算cost,累积到当前回合的平均cost中
for i in range(total_batch):
batch_xs = get_random_block_from_data(X_train,batch_size)
cost = AGN_AutoEncoder.partial_fit(batch_xs)
avg_cost += cost / batch_size
avg_cost /= total_batch

if epoch % display_step == 0:
print("epoch : %04d,cost = %.9f" % (epoch+1,avg_cost))

print("Total cost:",str(AGN_AutoEncoder.calc_total_cost(X_test)))


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐