Stacked Autoencoder
Part 1 Load the required packages
```python
import os

import torch
from torch import nn, optim
import torchvision
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
```
Part 2 Load the MNIST dataset
```python
def get_mnist_loader(batch_size=100, shuffle=True):
    """
    :return: train_loader, test_loader
    """
    train_dataset = MNIST(root='../data', train=True,
                          transform=torchvision.transforms.ToTensor(), download=True)
    test_dataset = MNIST(root='../data', train=False,
                         transform=torchvision.transforms.ToTensor(), download=True)
    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=shuffle)
    test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=shuffle)
    return train_loader, test_loader
```
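As a quick sanity check (a minimal sketch, not part of the original training code), one batch from the loader is a `(batch, 1, 28, 28)` tensor with pixel values in `[0, 1]` thanks to `ToTensor()`; the training code below flattens each image to a 784-dimensional vector:

```python
train_loader, test_loader = get_mnist_loader(batch_size=100)
images, labels = next(iter(train_loader))
print(images.shape)                            # torch.Size([100, 1, 28, 28])
print(images.view(images.size(0), -1).shape)   # torch.Size([100, 784])
```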
Part 3 The autoencoder layer
```python
class AutoEncoderLayer(nn.Module):
    """
    A single fully-connected layer of the stacked autoencoder.
    When SelfTraining is enabled, the layer behaves like the simplest
    possible autoencoder (encode then decode) so it can be pretrained
    on its own.
    """
    def __init__(self, input_dim=None, output_dim=None, SelfTraining=False):
        super(AutoEncoderLayer, self).__init__()
        self.in_features = input_dim
        self.out_features = output_dim
        # Flags whether the layer is in layer-wise pretraining mode
        # or part of whole-network training.
        self.is_training_self = SelfTraining
        self.encoder = nn.Sequential(
            nn.Linear(self.in_features, self.out_features, bias=True),
            nn.Sigmoid()  # Sigmoid activation is used throughout
        )
        self.decoder = nn.Sequential(
            # The decoder does not reuse the transposed encoder weights;
            # it is a separate Linear layer, also with Sigmoid activation.
            nn.Linear(self.out_features, self.in_features, bias=True),
            nn.Sigmoid()
        )

    def forward(self, x):
        out = self.encoder(x)
        # Key point: in pretraining mode the layer returns its own
        # reconstruction; inside the stacked network it returns the encoding.
        if self.is_training_self:
            return self.decoder(out)
        else:
            return out

    def lock_grad(self):
        for param in self.parameters():
            param.requires_grad = False

    def acquire_grad(self):
        for param in self.parameters():
            param.requires_grad = True

    @property
    def input_dim(self):
        return self.in_features

    @property
    def output_dim(self):
        return self.out_features

    @property
    def is_training_layer(self):
        return self.is_training_self

    @is_training_layer.setter
    def is_training_layer(self, other: bool):
        self.is_training_self = other
```
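To make the two forward modes concrete, here is a minimal usage sketch (assuming the imports above; the random input is illustrative only):

```python
layer = AutoEncoderLayer(784, 256, SelfTraining=True)
x = torch.rand(100, 784)           # a batch of 100 flattened 28x28 images

recon = layer(x)                   # pretraining mode: encode + decode
print(recon.shape)                 # torch.Size([100, 784])

layer.is_training_layer = False    # switch to stacked mode
code = layer(x)                    # encoding only
print(code.shape)                  # torch.Size([100, 256])
```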
Part 4 The stacked autoencoder
```python
class StackedAutoEncoder(nn.Module):
    """
    Construct the whole network from layers_list.
    The architecture of a stacked autoencoder is typically symmetric
    about the middle hidden layer.
    """
    def __init__(self, layers_list=None):
        super(StackedAutoEncoder, self).__init__()
        self.layers_list = layers_list
        self.initialize()
        self.encoder_1 = self.layers_list[0]
        self.encoder_2 = self.layers_list[1]
        self.encoder_3 = self.layers_list[2]
        self.encoder_4 = self.layers_list[3]

    def initialize(self):
        for layer in self.layers_list:
            # Switch every layer from pretraining mode to encoding mode.
            layer.is_training_layer = False

    def forward(self, x):
        out = x
        out = self.encoder_1(out)
        out = self.encoder_2(out)
        out = self.encoder_3(out)
        out = self.encoder_4(out)
        return out
```
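A design note: assigning each `AutoEncoderLayer` to an attribute (`self.encoder_1`, ...) registers it as a submodule, so `model.parameters()` later covers all four layers; keeping them only in the plain Python list `layers_list` would not register them. A minimal shape check, assuming the 784-256-64-256-784 stack built in the main script below:

```python
sae = StackedAutoEncoder(layers_list=[
    AutoEncoderLayer(784, 256), AutoEncoderLayer(256, 64),
    AutoEncoderLayer(64, 256), AutoEncoderLayer(256, 784),
])
x = torch.rand(8, 784)
print(sae(x).shape)                               # torch.Size([8, 784])
print(sum(p.numel() for p in sae.parameters()))   # all four layers are registered
```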
Part 5 Hyperparameters
```python
num_train_layer_epochs = 20   # epochs for pretraining each layer
num_train_whole_epochs = 50   # epochs for fine-tuning the whole network
batch_size = 100
shuffle = True
```
Part 6 Layer-wise pretraining
```python
def train_layers(layers_list=None, layer=None, epoch=None, validate=True):
    """
    Greedy layer-wise pretraining: while training layer i, layers 0..i-1
    are frozen.
    """
    if torch.cuda.is_available():
        for model in layers_list:
            model.cuda()
    train_loader, test_loader = get_mnist_loader(batch_size=batch_size, shuffle=True)
    optimizer = optim.SGD(layers_list[layer].parameters(), lr=0.001)
    criterion = nn.BCELoss()

    # train
    for epoch_index in range(epoch):
        sum_loss = 0.
        # Freeze the parameters of every layer before the current one
        # (layer 0 has no predecessors).
        if layer != 0:
            for index in range(layer):
                layers_list[index].lock_grad()
                # Besides freezing the parameters, switch the frozen layers
                # to encoding mode so they return codes, not reconstructions.
                layers_list[index].is_training_layer = False
        for batch_index, (train_data, _) in enumerate(train_loader):
            # Build the input; note that moving a Tensor to the GPU returns
            # a new tensor, unlike moving a model.
            if torch.cuda.is_available():
                train_data = train_data.cuda()
            out = train_data.view(train_data.size(0), -1)
            # Forward pass through the frozen layers before the current one.
            if layer != 0:
                for lay in range(layer):
                    out = layers_list[lay](out)
            # Train the current layer; in pretraining mode it is a complete
            # autoencoder, so the target is its own input.
            pred = layers_list[layer](out)

            optimizer.zero_grad()
            loss = criterion(pred, out)
            sum_loss += loss.item()  # .item() so the graph is not retained
            loss.backward()
            optimizer.step()
            if (batch_index + 1) % 10 == 0:
                print("Train Layer: {}, Epoch: {}/{}, Iter: {}/{}, Loss: {:.4f}".format(
                    layer, (epoch_index + 1), epoch, (batch_index + 1), len(train_loader), loss
                ))

    if validate:
        pass  # per-layer validation is left unimplemented
```
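One way to convince yourself that the freezing logic behaves as intended is a quick check like the following (a hypothetical snippet using the classes above, with a shortened two-layer stack):

```python
layers = [AutoEncoderLayer(784, 256, SelfTraining=True),
          AutoEncoderLayer(256, 64, SelfTraining=True)]
layers[0].lock_grad()
layers[0].is_training_layer = False

print(all(not p.requires_grad for p in layers[0].parameters()))  # True: frozen
print(all(p.requires_grad for p in layers[1].parameters()))      # True: trainable

x = torch.rand(4, 784)
code = layers[0](x)      # the frozen layer now returns the 256-d encoding
print(code.shape)        # torch.Size([4, 256])
```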
Part 7 Train the whole network
```python
def train_whole(model=None, epoch=50, validate=True):
    print(">> start training whole model")
    if torch.cuda.is_available():
        model.cuda()
    # Unfreeze the parameters that were locked during layer-wise pretraining.
    for param in model.parameters():
        param.requires_grad = True

    train_loader, test_loader = get_mnist_loader(batch_size=batch_size, shuffle=shuffle)
    optimizer = optim.SGD(model.parameters(), lr=0.001)
    # criterion = nn.BCELoss()
    criterion = nn.MSELoss()

    # Save one batch of original test images for visual comparison.
    test_data, _ = next(iter(test_loader))
    torchvision.utils.save_image(test_data, './test_images/real_test_images.png')

    # train
    for epoch_index in range(epoch):
        sum_loss = 0.
        for batch_index, (train_data, _) in enumerate(train_loader):
            if torch.cuda.is_available():
                train_data = train_data.cuda()
            x = train_data.view(train_data.size(0), -1)
            out = model(x)

            optimizer.zero_grad()
            loss = criterion(out, x)
            sum_loss += loss.item()
            loss.backward()
            optimizer.step()
            if (batch_index + 1) % 10 == 0:
                print("Train Whole, Epoch: {}/{}, Iter: {}/{}, Loss: {:.4f}".format(
                    (epoch_index + 1), epoch, (batch_index + 1), len(train_loader), loss
                ))
            if batch_index == len(train_loader) - 1:
                # Save the reconstructions of the last batch of each epoch.
                torchvision.utils.save_image(out.view(batch_size, 1, 28, 28),
                                             "./test_images/out_{}_{}.png".format(epoch_index, batch_index))

        # Validate once per epoch on the saved test batch.
        if validate:
            if torch.cuda.is_available():
                test_data = test_data.cuda()
            x = test_data.view(test_data.size(0), -1)
            out = model(x)
            loss = criterion(out, x)
            print("Test Epoch: {}/{}, test Loss: {:.4f}".format(epoch_index + 1, epoch, loss))
            image_tensor = out.view(batch_size, 1, 28, 28)
            torchvision.utils.save_image(image_tensor,
                                         './test_images/test_image_epoch_{}.png'.format(epoch_index))
    print("<< end training whole model")
```
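The validation above reuses a single fixed test batch. If a fuller picture is wanted, an evaluation pass over the whole test set could look like this sketch (the `evaluate` helper is not part of the original code):

```python
def evaluate(model, test_loader, criterion):
    """Average per-sample reconstruction loss over the whole test set."""
    model.eval()
    total, n = 0., 0
    with torch.no_grad():
        for data, _ in test_loader:
            if torch.cuda.is_available():
                data = data.cuda()
            x = data.view(data.size(0), -1)
            total += criterion(model(x), x).item() * data.size(0)
            n += data.size(0)
    model.train()
    return total / n
```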
Part 8 Training
```python
if __name__ == '__main__':
    if not os.path.exists('test_images'):
        os.mkdir('test_images')
    if not os.path.exists('models'):
        os.mkdir('models')

    # Build the 784-256-64-256-784 stack; every layer starts in pretraining mode.
    encoder_1 = AutoEncoderLayer(784, 256, SelfTraining=True)
    encoder_2 = AutoEncoderLayer(256, 64, SelfTraining=True)
    decoder_3 = AutoEncoderLayer(64, 256, SelfTraining=True)
    decoder_4 = AutoEncoderLayer(256, 784, SelfTraining=True)
    layers_list = [encoder_1, encoder_2, decoder_3, decoder_4]

    # Pretrain each layer in order.
    for level in range(len(layers_list)):
        print("current level =", level)
        train_layers(layers_list=layers_list, layer=level,
                     epoch=num_train_layer_epochs, validate=True)

    # Fine-tune the whole network.
    SAE_model = StackedAutoEncoder(layers_list=layers_list)
    train_whole(model=SAE_model, epoch=num_train_whole_epochs, validate=True)

    # Save the model; refer: https://pytorch.org/docs/master/notes/serialization.html
    torch.save(SAE_model, './models/sae_model.pt')
```
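Once training finishes, the saved model can be reloaded and used, for example to compress images down to the 64-dimensional bottleneck. A sketch, assuming the file saved above and that the class definitions are importable:

```python
# On newer PyTorch versions, loading a fully pickled model may require
# torch.load(..., weights_only=False).
model = torch.load('./models/sae_model.pt')
model.eval()

_, test_loader = get_mnist_loader(batch_size=1)
img, _ = next(iter(test_loader))
x = img.view(1, -1)
with torch.no_grad():
    code = model.encoder_2(model.encoder_1(x))  # 784 -> 256 -> 64 bottleneck
    recon = model(x)                            # full 784-d reconstruction
print(code.shape, recon.shape)  # torch.Size([1, 64]) torch.Size([1, 784])
```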