
Deep Learning (8): Learning Restricted Boltzmann Machines (RBM) - To Be Continued

Learning Restricted Boltzmann Machines (RBM)

Original article

Author: hjimce

1. Related Theory

"受限波尔兹曼"这名字听起来就霸气,算法如其名,也挺难的。之所以难,是因为我们大部分人都没学过概率图模型,其实RBM是条件随机场的变体,所以如果学习这个算法,建议先把CRF给熟悉了,那么学起来就会轻松很多。

To be continued...

2. Fundamentals
3. Algorithm Overview
4. Source Code Implementation

#coding=utf-8
import timeit

try:
    import PIL.Image as Image
except ImportError:
    import Image

import numpy

import theano
import theano.tensor as T
import os

from theano.tensor.shared_randomstreams import RandomStreams

from utils import tile_raster_images
from logistic_sgd import load_data

# RBM network definition; mainly parameter initialisation
class RBM(object):
    def __init__(self, input=None, n_visible=784, n_hidden=500, W=None,
                 hbias=None, vbias=None, numpy_rng=None, theano_rng=None):

        self.n_visible = n_visible
        self.n_hidden = n_hidden

        if numpy_rng is None:
            numpy_rng = numpy.random.RandomState(1234)
        if theano_rng is None:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))
        # W is initialised uniformly in +/- 4*sqrt(6./(n_visible+n_hidden))
        if W is None:
            initial_W = numpy.asarray(
                numpy_rng.uniform(
                    low=-4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    high=4 * numpy.sqrt(6. / (n_hidden + n_visible)),
                    size=(n_visible, n_hidden)
                ),
                dtype=theano.config.floatX
            )
            # wrap W in a Theano shared variable so it can live on the GPU
            # and be updated in place during training
            W = theano.shared(value=initial_W, name='W', borrow=True)
        # hidden-layer bias, initialised to 0
        if hbias is None:
            hbias = theano.shared(value=numpy.zeros(n_hidden, dtype=theano.config.floatX),
                                  name='hbias', borrow=True)
        # visible-layer bias, initialised to 0
        if vbias is None:
            vbias = theano.shared(value=numpy.zeros(n_visible, dtype=theano.config.floatX),
                                  name='vbias', borrow=True)

        # network input
        self.input = input
        if not input:
            self.input = T.matrix('input')

        self.W = W
        self.hbias = hbias
        self.vbias = vbias
        self.theano_rng = theano_rng
        # keep all parameters in one list for convenient access later
        self.params = [self.W, self.hbias, self.vbias]

    # free energy of a visible configuration:
    # F(v) = -v.vbias - sum_j log(1 + exp((v.W + hbias)_j))
    def free_energy(self, v_sample):
        ''' Function to compute the free energy '''
        wx_b = T.dot(v_sample, self.W) + self.hbias
        vbias_term = T.dot(v_sample, self.vbias)
        hidden_term = T.sum(T.log(1 + T.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term

    # upward pass, visible -> hidden: compute p(h=1|v)
    def propup(self, vis):
        pre_sigmoid_activation = T.dot(vis, self.W) + self.hbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    # sample the hidden-layer states given the visible-layer states
    def sample_h_given_v(self, v0_sample):
        # probability of each hidden unit being on, i.e. p(h=1|v)
        pre_sigmoid_h1, h1_mean = self.propup(v0_sample)
        # sample each unit from a Bernoulli distribution with that probability,
        # like flipping a biased coin: n is the number of trials (one per unit),
        # p is the probability of drawing 1, so binomial returns a 0/1 state
        h1_sample = self.theano_rng.binomial(size=h1_mean.shape, n=1, p=h1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_h1, h1_mean, h1_sample]

    # downward pass, hidden -> visible: compute p(v=1|h)
    def propdown(self, hid):
        pre_sigmoid_activation = T.dot(hid, self.W.T) + self.vbias
        return [pre_sigmoid_activation, T.nnet.sigmoid(pre_sigmoid_activation)]

    # sample the visible-layer states given the hidden-layer states
    def sample_v_given_h(self, h0_sample):
        pre_sigmoid_v1, v1_mean = self.propdown(h0_sample)
        v1_sample = self.theano_rng.binomial(size=v1_mean.shape,
                                             n=1, p=v1_mean,
                                             dtype=theano.config.floatX)
        return [pre_sigmoid_v1, v1_mean, v1_sample]

    # one Gibbs step starting from the hidden layer: hidden -> visible -> hidden
    def gibbs_hvh(self, h0_sample):
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h0_sample)
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v1_sample)
        return [pre_sigmoid_v1, v1_mean, v1_sample,
                pre_sigmoid_h1, h1_mean, h1_sample]

    # one Gibbs step starting from the visible layer: visible -> hidden -> visible
    def gibbs_vhv(self, v0_sample):
        pre_sigmoid_h1, h1_mean, h1_sample = self.sample_h_given_v(v0_sample)
        pre_sigmoid_v1, v1_mean, v1_sample = self.sample_v_given_h(h1_sample)
        return [pre_sigmoid_h1, h1_mean, h1_sample,
                pre_sigmoid_v1, v1_mean, v1_sample]

    # k sets the number of Gibbs sampling steps (one round trip
    # hidden -> visible -> hidden counts as one step)
    def get_cost_updates(self, lr=0.1, persistent=None, k=1):

        # given the input data, first compute the hidden-layer probabilities
        # p(h=1|x) and draw a sample of hidden states from them
        pre_sigmoid_ph, ph_mean, ph_sample = self.sample_h_given_v(self.input)

        # decide how to initialize the persistent chain:
        # for CD, we use the newly generated hidden sample
        # for PCD, we initialize from the old state of the chain
        if persistent is None:
            chain_start = ph_sample
        else:
            chain_start = persistent
        # run the Gibbs chain for k steps
        (
            [
                pre_sigmoid_nvs,
                nv_means,
                nv_samples,
                pre_sigmoid_nhs,
                nh_means,
                nh_samples
            ],
            updates
        ) = theano.scan(self.gibbs_hvh,
                        outputs_info=[None, None, None, None, None, chain_start],
                        n_steps=k)
        # take the visible sample at the end of the chain
        chain_end = nv_samples[-1]
        # build the cost
        cost = T.mean(self.free_energy(self.input)) - T.mean(
            self.free_energy(chain_end))
        # compute the gradients and build the gradient-descent updates
        gparams = T.grad(cost, self.params, consider_constant=[chain_end])
        for gparam, param in zip(gparams, self.params):
            updates[param] = param - gparam * T.cast(lr, dtype=theano.config.floatX)

        if persistent:
            # Note that this works only if persistent is a shared variable
            updates[persistent] = nh_samples[-1]
            # pseudo-likelihood is a better proxy for PCD
            monitoring_cost = self.get_pseudo_likelihood_cost(updates)
        else:
            # reconstruction cross-entropy is a better proxy for CD
            monitoring_cost = self.get_reconstruction_cost(updates, pre_sigmoid_nvs[-1])

        return monitoring_cost, updates
    # end-snippet-4
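    # Added note (standard CD/PCD reasoning, not part of the original post): the exact
    # gradient of the negative log-likelihood of an RBM is
    #     d(-log p(v)) / d(theta) = dF(v)/d(theta) - E_model[ dF(v')/d(theta) ],
    # where the second term is an expectation under the model distribution.  CD-k and
    # PCD approximate that intractable expectation with the sample `chain_end` obtained
    # after k Gibbs steps.  That is why `cost` above is mean(F(input)) - mean(F(chain_end)),
    # and why `consider_constant=[chain_end]` is passed to T.grad: the chain sample is
    # treated as a constant so Theano does not differentiate through the Gibbs chain.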

    def get_pseudo_likelihood_cost(self, updates):
        """Stochastic approximation to the pseudo-likelihood"""

        # index of bit i in expression p(x_i | x_{\i})
        bit_i_idx = theano.shared(value=0, name='bit_i_idx')

        # binarize the input image by rounding to nearest integer
        xi = T.round(self.input)

        # calculate free energy for the given bit configuration
        fe_xi = self.free_energy(xi)

        # flip bit x_i of matrix xi and preserve all other bits x_{\i}
        # Equivalent to xi[:,bit_i_idx] = 1-xi[:, bit_i_idx], but assigns
        # the result to xi_flip, instead of working in place on xi.
        xi_flip = T.set_subtensor(xi[:, bit_i_idx], 1 - xi[:, bit_i_idx])

        # calculate free energy with bit flipped
        fe_xi_flip = self.free_energy(xi_flip)

        # equivalent to e^(-FE(x_i)) / (e^(-FE(x_i)) + e^(-FE(x_{\i})))
        cost = T.mean(self.n_visible * T.log(T.nnet.sigmoid(fe_xi_flip -
                                                            fe_xi)))

        # increment bit_i_idx % number as part of updates
        updates[bit_i_idx] = (bit_i_idx + 1) % self.n_visible

        return cost

    def get_reconstruction_cost(self, updates, pre_sigmoid_nv):
        """Approximation to the reconstruction error

        Note that this function requires the pre-sigmoid activation as
        input.  To understand why, you need to understand a bit about
        how Theano works. Whenever you compile a Theano function, the
        computational graph that you pass as input gets optimized for
        speed and stability.  This is done by replacing several parts
        of the subgraphs with others.  One such optimization expresses
        terms of the form log(sigmoid(x)) in terms of softplus.  We
        need this optimization for the cross-entropy since the sigmoid
        of numbers larger than 30. (or even less than that) turns to
        1. and numbers smaller than -30. turn to 0., which in turn
        forces Theano to compute log(0), so we get either -inf or NaN
        as cost. If the value is expressed in terms of softplus we do
        not get this undesirable behaviour. This optimization usually
        works fine, but here we have a special case. The sigmoid is
        applied inside the scan op, while the log is outside.
        Therefore Theano will only see log(scan(..)) instead of
        log(sigmoid(..)) and will not apply the wanted optimization.
        We cannot replace the sigmoid inside scan with something else
        either, because this only needs to be done on the last step.
        Therefore the easiest and most efficient way is to also return
        the pre-sigmoid activation as an output of scan, and apply
        both the log and sigmoid outside scan, so that Theano can
        catch and optimize the expression.

        """

        cross_entropy = T.mean(
            T.sum(
                self.input * T.log(T.nnet.sigmoid(pre_sigmoid_nv)) +
                (1 - self.input) * T.log(1 - T.nnet.sigmoid(pre_sigmoid_nv)),
                axis=1
            )
        )

        return cross_entropy

# Train an RBM on MNIST with persistent CD, plot the learned filters after each
# epoch, and finally draw samples from the trained model
def test_rbm(learning_rate=0.1, training_epochs=15,
             dataset='mnist.pkl.gz', batch_size=20,
             n_chains=20, n_samples=10, output_folder='rbm_plots',
             n_hidden=500):

    datasets = load_data(dataset)

    train_set_x, train_set_y = datasets[0]
    test_set_x, test_set_y = datasets[2]

    # number of training mini-batches
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] / batch_size

    index = T.lscalar()    # index to a [mini]batch
    x = T.matrix('x')  # the data is presented as rasterized images

    rng = numpy.random.RandomState(123)
    theano_rng = RandomStreams(rng.randint(2 ** 30))

    # shared variable holding the state (hidden layer) of the persistent chain
    persistent_chain = theano.shared(numpy.zeros((batch_size, n_hidden),
                                                 dtype=theano.config.floatX),
                                     borrow=True)

    # build the network: 28*28 visible units, n_hidden (500) hidden units
    rbm = RBM(input=x, n_visible=28 * 28, n_hidden=n_hidden,
              numpy_rng=rng, theano_rng=theano_rng)

    # get the cost and the updates corresponding to one step of CD-15
    # (with a persistent chain, i.e. PCD-15)
    cost, updates = rbm.get_cost_updates(lr=learning_rate,
                                         persistent=persistent_chain, k=15)

    #################################
    #     Training the RBM          #
    #################################
    if not os.path.isdir(output_folder):
        os.makedirs(output_folder)
    os.chdir(output_folder)

    # start-snippet-5
    # it is ok for a theano function to have no output
    # the purpose of train_rbm is solely to update the RBM parameters
    train_rbm = theano.function(
        [index],
        cost,
        updates=updates,
        givens={x: train_set_x[index * batch_size: (index + 1) * batch_size]},
        name='train_rbm'
    )

    plotting_time = 0.
    start_time = timeit.default_timer()

    # go through training epochs
    for epoch in xrange(training_epochs):

        # go through the training set
        mean_cost = []
        for batch_index in xrange(n_train_batches):
            mean_cost += [train_rbm(batch_index)]

        print 'Training epoch %d, cost is ' % epoch, numpy.mean(mean_cost)

        # Plot filters after each training epoch
        plotting_start = timeit.default_timer()
        # Construct image from the weight matrix
        image = Image.fromarray(
            tile_raster_images(
                X=rbm.W.get_value(borrow=True).T,
                img_shape=(28, 28),
                tile_shape=(10, 10),
                tile_spacing=(1, 1)
            )
        )
        image.save('filters_at_epoch_%i.png' % epoch)
        plotting_stop = timeit.default_timer()
        plotting_time += (plotting_stop - plotting_start)

    end_time = timeit.default_timer()

    pretraining_time = (end_time - start_time) - plotting_time

    print ('Training took %f minutes' % (pretraining_time / 60.))
    # end-snippet-5 start-snippet-6
    #################################
    #     Sampling from the RBM     #
    #################################
    # find out the number of test samples
    number_of_test_samples = test_set_x.get_value(borrow=True).shape[0]

    # pick random test examples, with which to initialize the persistent chain
    test_idx = rng.randint(number_of_test_samples - n_chains)
    persistent_vis_chain = theano.shared(
        numpy.asarray(
            test_set_x.get_value(borrow=True)[test_idx:test_idx + n_chains],
            dtype=theano.config.floatX
        )
    )
    # end-snippet-6 start-snippet-7
    plot_every = 1000
    # define one step of Gibbs sampling (mf = mean-field); define a
    # function that does `plot_every` steps before returning the
    # sample for plotting
    (
        [
            presig_hids,
            hid_mfs,
            hid_samples,
            presig_vis,
            vis_mfs,
            vis_samples
        ],
        updates
    ) = theano.scan(
        rbm.gibbs_vhv,
        outputs_info=[None, None, None, None, None, persistent_vis_chain],
        n_steps=plot_every
    )

    # add to updates the shared variable that takes care of our persistent
    # chain.
    updates.update({persistent_vis_chain: vis_samples[-1]})
    # construct the function that implements our persistent chain.
    # we generate the "mean field" activations for plotting and the actual
    # samples for reinitializing the state of our persistent chain
    sample_fn = theano.function(
        [],
        [
            vis_mfs[-1],
            vis_samples[-1]
        ],
        updates=updates,
        name='sample_fn'
    )

    # create a space to store the image for plotting (we need to leave
    # room for the tile_spacing as well)
    image_data = numpy.zeros(
        (29 * n_samples + 1, 29 * n_chains - 1),
        dtype='uint8'
    )
    for idx in xrange(n_samples):
        # generate `plot_every` intermediate samples that we discard,
        # because successive samples in the chain are too correlated
        vis_mf, vis_sample = sample_fn()
        print ' ... plotting sample ', idx
        image_data[29 * idx:29 * idx + 28, :] = tile_raster_images(
            X=vis_mf,
            img_shape=(28, 28),
            tile_shape=(1, n_chains),
            tile_spacing=(1, 1)
        )

    # construct image
    image = Image.fromarray(image_data)
    image.save('samples.png')
    # end-snippet-7
    os.chdir('../')


if __name__ == '__main__':
    test_rbm()
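As a quick sanity check of the class above, here is a minimal usage sketch (not part of the original post): it trains the RBM with plain CD-1 (persistent=None) on a small random binary dataset, assuming only that Theano is installed and that the RBM class defined above is in scope. The data, shapes and hyperparameters are illustrative only; for a real experiment use test_rbm() with MNIST as shown above.

# Minimal usage sketch (illustrative only): CD-1 training of the RBM class above
# on random binary data.  Assumes Theano is installed and RBM is already defined.
import numpy
import theano
import theano.tensor as T

rng = numpy.random.RandomState(0)
# 100 random binary "images" of size 784, purely for illustration
toy_data = numpy.asarray(rng.binomial(n=1, p=0.5, size=(100, 784)),
                         dtype=theano.config.floatX)
shared_data = theano.shared(toy_data, borrow=True)

x = T.matrix('x')
rbm = RBM(input=x, n_visible=784, n_hidden=50, numpy_rng=rng)

# persistent=None selects plain CD-k (here k=1) instead of PCD
cost, updates = rbm.get_cost_updates(lr=0.1, persistent=None, k=1)

index = T.lscalar('index')
batch_size = 20
train = theano.function(
    [index], cost, updates=updates,
    givens={x: shared_data[index * batch_size:(index + 1) * batch_size]})

for epoch in range(3):
    batch_costs = [train(i) for i in range(toy_data.shape[0] // batch_size)]
    print('epoch %d, mean cost %f' % (epoch, numpy.mean(batch_costs)))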