Three Implementations of Recurrent Neural Network (RNN) Algorithms (From Scratch, Theano, Keras)
2016-12-30 23:00
Preface
Main Text
RNN From Scratch
RNN Using Theano
RNN Using Keras
Postscript
"From the simple to the complex, and back to the simple again!"
Preface
After a period of study I now have a basic grasp of how RNNs work and how to implement them, so here I list three different RNN implementations for reference.
RNN theory is covered in many places online, and I won't repeat it here since I couldn't explain it better anyway. Instead, let me first recommend an RNN tutorial that explains it very well; after working through its four posts you should be able to implement an RNN yourself.
Main Text
RNN From Scratch
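The model implemented below is the vanilla RNN from the tutorial recommended above. At each time step $t$ it takes a word index $x_t$, updates the hidden state, and emits a distribution over the vocabulary:

$$ s_t = \tanh(U x_t + W s_{t-1}), \qquad o_t = \mathrm{softmax}(V s_t) $$

Since $x_t$ is a one-hot vector, the product $U x_t$ amounts to selecting column $x_t$ of $U$, which is exactly what the indexing U[:, x[t]] in the code does.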
import nltk
import csv
import itertools
import numpy as np
from utils import *
import operator
from datetime import datetime
import sys

class RNNNumpy:

    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        # Assign instance variables
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate
        # Randomly initialize the network parameters
        self.U = np.random.uniform(-np.sqrt(1./word_dim), np.sqrt(1./word_dim), (hidden_dim, word_dim))
        self.V = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (word_dim, hidden_dim))
        self.W = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (hidden_dim, hidden_dim))

    def forward_propagation(self, x):
        # The total number of time steps
        T = len(x)
        # During forward propagation we save all hidden states in s because we need them later.
        # We add one additional element for the initial hidden state, which we set to 0.
        s = np.zeros((T + 1, self.hidden_dim))
        s[-1] = np.zeros(self.hidden_dim)
        # The outputs at each time step. Again, we save them for later.
        o = np.zeros((T, self.word_dim))
        # For each time step...
        for t in np.arange(T):
            # Note that we are indexing U by x[t]. This is the same as multiplying U with a one-hot vector.
            s[t] = np.tanh(self.U[:, x[t]] + self.W.dot(s[t-1]))
            o[t] = softmax(self.V.dot(s[t]))
        return [o, s]

    def predict(self, x):
        # Perform forward propagation and return the index of the highest score
        o, s = self.forward_propagation(x)
        return np.argmax(o, axis=1)

    def calculate_total_loss(self, x, y):
        L = 0
        # For each sentence...
        for i in np.arange(len(y)):
            o, s = self.forward_propagation(x[i])
            # We only care about our prediction of the "correct" words
            correct_word_predictions = o[np.arange(len(y[i])), y[i]]
            # Add to the loss based on how off we were
            L += -1 * np.sum(np.log(correct_word_predictions))
        return L

    def calculate_loss(self, x, y):
        # Divide the total loss by the number of training words
        N = np.sum([len(y_i) for y_i in y])
        return self.calculate_total_loss(x, y) / N

    def bptt(self, x, y):
        T = len(y)
        # Perform forward propagation
        o, s = self.forward_propagation(x)
        # We accumulate the gradients in these variables
        dLdU = np.zeros(self.U.shape)
        dLdV = np.zeros(self.V.shape)
        dLdW = np.zeros(self.W.shape)
        delta_o = o
        delta_o[np.arange(len(y)), y] -= 1.
        # For each output backwards...
        for t in np.arange(T)[::-1]:
            dLdV += np.outer(delta_o[t], s[t].T)
            # Initial delta calculation
            delta_t = self.V.T.dot(delta_o[t]) * (1 - (s[t] ** 2))
            # Backpropagation through time (for at most self.bptt_truncate steps)
            for bptt_step in np.arange(max(0, t - self.bptt_truncate), t + 1)[::-1]:
                # print "Backpropagation step t=%d bptt step=%d " % (t, bptt_step)
                dLdW += np.outer(delta_t, s[bptt_step-1])
                dLdU[:, x[bptt_step]] += delta_t
                # Update delta for next step
                delta_t = self.W.T.dot(delta_t) * (1 - s[bptt_step-1] ** 2)
        return [dLdU, dLdV, dLdW]

    def gradient_check(self, x, y, h=0.001, error_threshold=0.01):
        # Calculate the gradients using backpropagation. We want to check if these are correct.
        bptt_gradients = self.bptt(x, y)
        # List of all parameters we want to check.
        model_parameters = ['U', 'V', 'W']
        # Gradient check for each parameter
        for pidx, pname in enumerate(model_parameters):
            # Get the actual parameter value from the model, e.g. model.W
            parameter = operator.attrgetter(pname)(self)
            print "Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape))
            # Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
            it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
            while not it.finished:
                ix = it.multi_index
                # Save the original value so we can reset it later
                original_value = parameter[ix]
                # Estimate the gradient using (f(x+h) - f(x-h)) / (2*h)
                parameter[ix] = original_value + h
                gradplus = self.calculate_total_loss([x], [y])
                parameter[ix] = original_value - h
                gradminus = self.calculate_total_loss([x], [y])
                estimated_gradient = (gradplus - gradminus) / (2 * h)
                # Reset parameter to original value
                parameter[ix] = original_value
                # The gradient for this parameter calculated using backpropagation
                backprop_gradient = bptt_gradients[pidx][ix]
                # Calculate the relative error: (|x - y| / (|x| + |y|))
                relative_error = np.abs(backprop_gradient - estimated_gradient) / (np.abs(backprop_gradient) + np.abs(estimated_gradient))
                # If the error is too large, fail the gradient check
                if relative_error > error_threshold:
                    print "Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix)
                    print "+h Loss: %f" % gradplus
                    print "-h Loss: %f" % gradminus
                    print "Estimated_gradient: %f" % estimated_gradient
                    print "Backpropagation gradient: %f" % backprop_gradient
                    print "Relative Error: %f" % relative_error
                    return
                it.iternext()
            print "Gradient check for parameter %s passed." % (pname)

    # Performs one step of SGD.
    def sgd_step(self, x, y, learning_rate):
        # Calculate the gradients
        dLdU, dLdV, dLdW = self.bptt(x, y)
        # Change parameters according to gradients and learning rate
        self.U -= learning_rate * dLdU
        self.V -= learning_rate * dLdV
        self.W -= learning_rate * dLdW

    # Outer SGD loop
    # - X_train: the training data set
    # - y_train: the training data labels
    # - learning_rate: initial learning rate for SGD
    # - nepoch: number of times to iterate through the complete dataset
    # - evaluate_loss_after: evaluate the loss after this many epochs
    def train_with_sgd(self, X_train, y_train, learning_rate=0.005, nepoch=100, evaluate_loss_after=5):
        # We keep track of the losses so we can plot them later
        losses = []
        num_examples_seen = 0
        for epoch in range(nepoch):
            # Optionally evaluate the loss
            if (epoch % evaluate_loss_after == 0):
                loss = self.calculate_loss(X_train, y_train)
                losses.append((num_examples_seen, loss))
                time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
                print "%s: Loss after num_examples_seen=%d epoch=%d: %f" % (time, num_examples_seen, epoch, loss)
                # Adjust the learning rate if the loss increases
                if (len(losses) > 1 and losses[-1][1] > losses[-2][1]):
                    learning_rate = learning_rate * 0.5
                    print "Setting learning rate to %f" % learning_rate
                sys.stdout.flush()
                # ADDED: save the model parameters
                save_model_parameters_numpy("./data/rnn-numpy-%d-%d-%s.npz" % (self.hidden_dim, self.word_dim, time), self)
            # For each training example...
            for i in range(len(y_train)):
                # One SGD step
                self.sgd_step(X_train[i], y_train[i], learning_rate)
                num_examples_seen += 1
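The class pulls softmax and save_model_parameters_numpy from a local utils module that isn't shown in this post. Below is a minimal sketch of what those helpers might look like (my assumption, modeled on the tutorial linked above), followed by a hypothetical usage example; it assumes the class definition above has been run and that X_train / y_train are lists of word-index sequences, with y shifted one step ahead of x.

def softmax(x):
    # Numerically stable softmax over a vector of scores (assumed helper)
    e = np.exp(x - np.max(x))
    return e / np.sum(e)

def save_model_parameters_numpy(outfile, model):
    # Persist U, V, W so a training run can be resumed later (assumed helper)
    np.savez(outfile, U=model.U, V=model.V, W=model.W)

# Hypothetical usage; X_train and y_train are not defined in this post
vocabulary_size = 8000
np.random.seed(10)
model = RNNNumpy(vocabulary_size)
o, s = model.forward_propagation(X_train[0])  # o has shape (len(x), vocabulary_size)
# Gradient-check a tiny model only: the numerical check is O(number of parameters)
small = RNNNumpy(100, hidden_dim=10, bptt_truncate=1000)
small.gradient_check([0, 1, 2, 3], [1, 2, 3, 4])
model.train_with_sgd(X_train[:100], y_train[:100], nepoch=10, evaluate_loss_after=1)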
For more code, see github.
RNN Using Theano
import numpy as np
import theano as theano
import theano.tensor as T
from utils import *
import operator
from datetime import datetime
import sys

class RNNTheano:

    def __init__(self, word_dim, hidden_dim=100, bptt_truncate=4):
        # Assign instance variables
        self.word_dim = word_dim
        self.hidden_dim = hidden_dim
        self.bptt_truncate = bptt_truncate
        # Randomly initialize the network parameters
        U = np.random.uniform(-np.sqrt(1./word_dim), np.sqrt(1./word_dim), (hidden_dim, word_dim))
        V = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (word_dim, hidden_dim))
        W = np.random.uniform(-np.sqrt(1./hidden_dim), np.sqrt(1./hidden_dim), (hidden_dim, hidden_dim))
        # Theano: create shared variables
        self.U = theano.shared(name='U', value=U.astype(theano.config.floatX))
        self.V = theano.shared(name='V', value=V.astype(theano.config.floatX))
        self.W = theano.shared(name='W', value=W.astype(theano.config.floatX))
        # We store the Theano graph here
        self.theano = {}
        self.__theano_build__()

    def __theano_build__(self):
        U, V, W = self.U, self.V, self.W
        x = T.ivector('x')
        y = T.ivector('y')

        def forward_prop_step(x_t, s_t_prev, U, V, W):
            s_t = T.tanh(U[:, x_t] + W.dot(s_t_prev))
            o_t = T.nnet.softmax(V.dot(s_t))
            return [o_t[0], s_t]

        [o, s], updates = theano.scan(
            forward_prop_step,
            sequences=x,
            outputs_info=[None, dict(initial=T.zeros(self.hidden_dim))],
            non_sequences=[U, V, W],
            truncate_gradient=self.bptt_truncate,
            strict=True)
        prediction = T.argmax(o, axis=1)
        o_error = T.sum(T.nnet.categorical_crossentropy(o, y))
        # Gradients
        dU = T.grad(o_error, U)
        dV = T.grad(o_error, V)
        dW = T.grad(o_error, W)
        # Assign functions
        self.forward_propagation = theano.function([x], o)
        self.predict = theano.function([x], prediction)
        self.ce_error = theano.function([x, y], o_error)
        self.bptt = theano.function([x, y], [dU, dV, dW])
        # SGD
        learning_rate = T.scalar('learning_rate')
        self.sgd_step = theano.function([x, y, learning_rate], [],
            updates=[(self.U, self.U - learning_rate * dU),
                     (self.V, self.V - learning_rate * dV),
                     (self.W, self.W - learning_rate * dW)])

    def calculate_total_loss(self, X, Y):
        return np.sum([self.ce_error(x, y) for x, y in zip(X, Y)])

    def calculate_loss(self, X, Y):
        # Divide the total loss by the number of words
        num_words = np.sum([len(y) for y in Y])
        return self.calculate_total_loss(X, Y) / float(num_words)

    def train_with_sgd(self, X_train, y_train, learning_rate=0.005, nepoch=1, evaluate_loss_after=5):
        # We keep track of the losses so we can plot them later
        losses = []
        num_examples_seen = 0
        for epoch in range(nepoch):
            # Optionally evaluate the loss
            if (epoch % evaluate_loss_after == 0):
                loss = self.calculate_loss(X_train, y_train)
                losses.append((num_examples_seen, loss))
                time = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
                print "%s: Loss after num_examples_seen=%d epoch=%d: %f" % (time, num_examples_seen, epoch, loss)
                # Adjust the learning rate if the loss increases
                if (len(losses) > 1 and losses[-1][1] > losses[-2][1]):
                    learning_rate = learning_rate * 0.5
                    print "Setting learning rate to %f" % learning_rate
                sys.stdout.flush()
                # ADDED: save the model parameters
                save_model_parameters_theano("./data/rnn-theano-%d-%d-%s.npz" % (self.hidden_dim, self.word_dim, time), self)
            # For each training example...
            for i in range(len(y_train)):
                # One SGD step
                self.sgd_step(X_train[i], y_train[i], learning_rate)
                num_examples_seen += 1

def gradient_check_theano(model, x, y, h=0.001, error_threshold=0.01):
    # Overwrite the bptt_truncate attribute: we need to backpropagate all the way
    # through time to get the correct gradient
    model.bptt_truncate = 1000
    # Calculate the gradients using backprop
    bptt_gradients = model.bptt(x, y)
    # List of all parameters we want to check.
    model_parameters = ['U', 'V', 'W']
    # Gradient check for each parameter
    for pidx, pname in enumerate(model_parameters):
        # Get the actual parameter value from the model, e.g. model.W
        parameter_T = operator.attrgetter(pname)(model)
        parameter = parameter_T.get_value()
        print "Performing gradient check for parameter %s with size %d." % (pname, np.prod(parameter.shape))
        # Iterate over each element of the parameter matrix, e.g. (0,0), (0,1), ...
        it = np.nditer(parameter, flags=['multi_index'], op_flags=['readwrite'])
        while not it.finished:
            ix = it.multi_index
            # Save the original value so we can reset it later
            original_value = parameter[ix]
            # Estimate the gradient using (f(x+h) - f(x-h)) / (2*h)
            parameter[ix] = original_value + h
            parameter_T.set_value(parameter)
            gradplus = model.calculate_total_loss([x], [y])
            parameter[ix] = original_value - h
            parameter_T.set_value(parameter)
            gradminus = model.calculate_total_loss([x], [y])
            estimated_gradient = (gradplus - gradminus) / (2 * h)
            parameter[ix] = original_value
            parameter_T.set_value(parameter)
            # The gradient for this parameter calculated using backpropagation
            backprop_gradient = bptt_gradients[pidx][ix]
            # Calculate the relative error: (|x - y| / (|x| + |y|))
            relative_error = np.abs(backprop_gradient - estimated_gradient) / (np.abs(backprop_gradient) + np.abs(estimated_gradient))
            # If the error is too large, fail the gradient check
            if relative_error > error_threshold:
                print "Gradient Check ERROR: parameter=%s ix=%s" % (pname, ix)
                print "+h Loss: %f" % gradplus
                print "-h Loss: %f" % gradminus
                print "Estimated_gradient: %f" % estimated_gradient
                print "Backpropagation gradient: %f" % backprop_gradient
                print "Relative Error: %f" % relative_error
                return
            it.iternext()
        print "Gradient check for parameter %s passed." % (pname)
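Usage mirrors the NumPy version; the key difference is that Theano compiles the whole update graph once in __theano_build__, so each sgd_step call runs as a single compiled function. A hypothetical sketch (X_train / y_train are again assumed word-index sequences, and save_model_parameters_theano comes from the same unshown utils module):

vocabulary_size = 8000
np.random.seed(10)
model = RNNTheano(vocabulary_size)
# One compiled SGD step on one sentence
model.sgd_step(X_train[10], y_train[10], 0.005)
model.train_with_sgd(X_train, y_train, nepoch=50, evaluate_loss_after=1)
# Verify the symbolic gradients on a tiny model; the check raises
# bptt_truncate to 1000 so the gradient is exact, not truncated
gradient_check_theano(RNNTheano(100, hidden_dim=10), [0, 1, 2, 3], [1, 2, 3, 4])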
For more code, see github.
Also: a GRU version of the Theano code is available on github.
RNN Using Keras
from __future__ import print_function
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM
from keras.optimizers import RMSprop
from keras.utils.data_utils import get_file
import numpy as np
import random
import sys

class RNNKeras:

    def __init__(self, sentenceLen, vector_size, output_size, hidden_dim=100):
        # Assign instance variables
        self.sentenceLen = sentenceLen
        self.vector_size = vector_size
        self.output_size = output_size
        self.hidden_dim = hidden_dim
        self.__model_build__()

    def __model_build__(self):
        # A single LSTM layer followed by a softmax over the vocabulary
        self.model = Sequential()
        self.model.add(LSTM(self.output_size, input_shape=(self.sentenceLen, self.vector_size)))
        self.model.add(Dense(self.vector_size))
        self.model.add(Activation('softmax'))
        optimizer = RMSprop(lr=0.01)
        self.model.compile(loss='categorical_crossentropy', optimizer=optimizer)

    def train_model(self, X, y, batchSize=128, nepoch=1):
        self.model.fit(X, y, batch_size=batchSize, nb_epoch=nepoch)

    def predict(self, x):
        return self.model.predict(x, verbose=0)[0]
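A hypothetical usage sketch, assuming a character-level setup where each sample is a window of sentenceLen one-hot vectors of dimension vector_size (all data shapes and names below are my own illustration, not from the post; note also that hidden_dim is stored but not used by the model itself):

# Toy shapes: 1000 windows of 40 characters over a 57-character vocabulary
num_samples, sentence_len, vector_size = 1000, 40, 57
X = np.zeros((num_samples, sentence_len, vector_size), dtype=np.float32)
y = np.zeros((num_samples, vector_size), dtype=np.float32)
# ... fill X and y from a corpus, one-hot encoding each character ...
rnn = RNNKeras(sentence_len, vector_size, output_size=128)
rnn.train_model(X, y, batchSize=128, nepoch=10)
probs = rnn.predict(X[:1])  # probability distribution over the vector_size characters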
For more code, see github.
Postscript
In recent years, deep learning research and applications have grown ever hotter, and with the rise of CNNs and RNNs, fewer and fewer people study DBNs and SAEs. Still, to use neural networks well it is worth understanding DBNs and SAEs; I also need to find time to study CNNs, and when I do I will revise this post and add more explanatory text. One more piece of advice: don't start out with heavily wrapped libraries like Keras. First learn the underlying principles and formulas of RNNs, and you will grasp them far more thoroughly. Besides, these wrapped libraries are not a cure-all: once a model gets complex, some functionality simply cannot be expressed through such high-level abstractions, and you have to implement it yourself in theano or tensorflow.