目前一些模型API尚未迁移到TF20中。 eg: CRF,Seq2Seq等
如果退回TF10,有些伤。
倒不如转至Torch。
Pytorch的大部分思想和TF20大致相似。
至于安装,GPU我前面说过TF20。这里不赘述。
官档安装:https://pytorch.org/get-started/locally/#start-locally
本文几乎通篇以代码案例 和 注释标注 的方式解释API。(模型的训练效果不做考虑。只看语法)
你如果懂Tensorflow2.0(Stable),那么你看本文一定不费劲。
Torch和TF20 很像!!!
因此一些地方,我会列出 TF20 与 Torch的细节对比。
import torch from torch import nn, optim from torchvision import datasets, transforms from torch.utils.data import DataLoader
data_preprocess = transforms.Compose([ # 顶预定数据处理函数,类似map()里的函数句柄 transforms.Resize(28,28), # 变形 transforms.ToTensor(), # numpy 转 Tensor ]) trian_dataset = datasets.MNIST( # TF20在keras.datasets中,未归一化(0-255) '.', # 下载至当前目录, (图片0-1,已经被归一化了) train=True, # train=True, 代表直接给你切出 训练集 download=True, # True,若未下载,则先下载 transform=data_preprocess, # 指定数据预处理函数。第一行我们指定的 ) test_dataset = datasets.MNIST( '.', train=False, # False代表测试集 # 就说下这里, False代表 给你切出测试集 download=True, transform=data_preprocess, ) train = DataLoader( # 对应TF20中的 tf.data.Dataset对数据二次预处理(分批,乱序) trian_dataset, # 把上面第一次预处理的数据集 加载进来 batch_size=16, # mini-batch shuffle=True, # 乱序,增强模型泛化能力 ) test = DataLoader( test_dataset, batch_size=16, shuffle=True, )
# 模型定义部分 class MyModel(nn.Module): # TF20是 tk.models.Model def __init__(self): # TF20 也是 __init__() super().__init__() self.model = nn.Sequential( # tk.models.Sequential , 并且 TF里面 需要加一个 [] nn.Linear(28*28, 256), # tk.layers.Dense(256) nn.ReLU(), # tk.layers.Relu() nn.Linear(256, 128), # tk.layers.Dense(128) nn.ReLU(), nn.Linear(128, 10), # tk.layers.Dense(10) ) def forward(self, x): # TF20是 __call__() x = x.view( x.size(0), 28*28 ) # x.view ==> tf.reshape x.size ==> x.shape[0] y_predict = self.model(x) return y_predict # -------------------------------华丽分割线--------------------------------- # 模型训练部分 def main(): vis = visdom.Visdom() model = MyModel() loss_ = nn.CrossEntropyLoss() # 会将 y_predict自动加一层 softmax optimizer = optim.Adam(model.parameters()) # TF20: model.trainable_variables # visdom可视化 # 这步是初始化坐标点,下面loss会用这个直接更新 vis.line( [0], # x坐标 [0], # y坐标 win='loss', # 窗口名称 opts={'title': 'loss'}, # 窗口标题 ) for epoch in range(10): # epochs for step, (x, y_true) in enumerate(train): y_predict = model(x) loss = loss_(y_predict, y_true) optimizer.zero_grad() # 优化器清零 loss.backward() # 梯度计算 optimizer.step() # 梯度下降更新 tp.gradient(loss, variables)。 # 在上面的定义的基础上更新追加画点-连成线 vis.line( [loss.item()], [step], win='loss', update='append', # 追加画点,而不是更新覆盖 ) print(loss.item()) # .item() => 相当于 tensorflow 的 numpy() if epoch % 2 == 0: total_correct_samples = 0 # 用于记录(预测正确的样本的 总数量) total_samples = 0 # 用于记录(样本的 总数量) for x_test, y_test in test: y_pred = model(x_test) y_final_pred = y_pred.argmax(dim=1) # TF20的坐标轴参数是 axis # 每一批是 batch_size=16,我们要把它们都加在一起 total_correct_samples += torch.eq(y_final_pred, y_test).float().sum().item() # 这里提一下 eq() 和 equal() 的返回值的区别, 自己看,我们通常用 eq # print( torch.equal( torch.Tensor([[1,2,3]]), torch.Tensor([[4,5,6]] ) ) ) #结果: False # print( torch.eq( torch.Tensor([[1,2,3]]), torch.Tensor([[4,5,6]] ) ) ) #结果: tensor([[0, 0, 0]], dtype=torch.uint8) per_sample = x_test.size(0) # 再说一次, size(0) 相当于TF xx.shape[0] # 获取每批次样本数量, 虽然我们知道是 16 # 但是最后一个batch_size 可能不是16,所以要准确获取。 total_samples += per_sample acc = total_correct_samples / total_samples print(f'epoch: {epoch}, loss: {loss}, acc: {acc}') # 测试部分 vis.line( [acc], [step], win='acc', update='append', # 追加画点,而不是更新覆盖 ) x, label = iter(test).next() target_predict = model(x).argmax(dim=1) # 画出测试集图片 viz.images(x, nrow=16, win="test_x", opts={'title': "test_x"}) vis.text( # 显示预测标签文本 str(target_predict.detach().numpy() ), win = 'target_predict', opts = {"title": target_predict} ) vis.text( # 显示真值文本 str(label.detach().numpy() ), win = 'target_true', opts = {"target_true": target_predict} ) main()
安装 pip install visdom 运行 python -m visdom.server (第一次可能会有点慢) # 语法和Tensorboard很像 使用 import visdom 见上代码 vis.xxxxx
模块导入和数据预处理部分和案例1的 MNIST一模一样。
只要稍稍修改 datasets.MNIST ==> datasets.CIFAR10 即可, 简单的不忍直视~~
模型定义部分:
class MyModel(nn.Module): # 温馨提示, 这是 Mmodule, 不是model def __init__(self): """ 先注明一下: TF中输入图片形状为 (样本数, 高,宽,图片通道) PyTorch中输入图片形状为 (样本数, 图片通道,高,宽) """ super().__init__() self.conv = nn.Sequential( # 再强调一遍,没有 [] nn.Conv2d( in_channels=3, # 对应TF 图片通道数(或者上一层通道) out_channels=8, # 对应TF filters, 卷积核数量 kernel_size=3, # 卷积核大小 stride=1, # 步长, TF 是 strides, 特别注意 padding=0, # no padding, 默认 ), nn.ReLU(), nn.MaxPool2d( kernel_size=3, # 滑动窗口大小 stride=None, # 默认为None, 意为和 kernel_size相同大小 ), nn.Conv2d( in_channels=8, # 对应TF 图片通道数(或者上一层通道) out_channels=16, # 对应TF filters, 卷积核数量 kernel_size=3, # 卷积核大小 stride=1, # 步长, TF 是 strides, 特别注意 padding=0, # no padding, 默认 ), nn.ReLU(), nn.MaxPool2d( kernel_size=2, # 滑动窗口大小 stride=None, # 默认为None, 意为和 kernel_size相同大小 ), ) self.dense = nn.Sequential( nn.Linear(16*4*4, 128), # 对应TF Dense nn.Linear(128, 64), nn.Linear(64, 10), ) def forward(self, x): conv_output = self.conv(x) # (16, 16, 4.4) conv_output_reshape = conv_output.view(-1, 16*4*4) dense_output = self.dense(conv_output_reshape) return dense_output
模型训练(模型调用+模型训练的定义)
def main(): vis = visdom.Visdom() epochs = 100 device = torch.device('cuda') # 预定义 GPU 槽位(一会往里面塞 模型和数据。) model = MyModel().to(device) # 模型转为 GPU 计算 # CrossEntropyLoss 会自动把下面的 dense_output ,也就是y_predict 加一层 softmax loss_ = nn.CrossEntropyLoss().to(device) optimizer = optim.Adam( model.parameters() ) for epoch in range(epochs): for step, (x_train, y_train) in enumerate(train): x_train, y_train = x_train.to(device), y_train.to(device) dense_output = model(x_train) loss = loss_(dense_output, y_train) optimizer.zero_grad() # 上一个例子提到过,梯度清零 loss.backward() # 反向传播, 并将梯度累加到 optimizer中 optimizer.step() # 相当于做了 w = w - lr * 梯度 print(loss.item()) # item() 意思就是 tensor转numpy,TF中的 API是 xx.numpy() sample_correct_numbers = 0 sample_total_numbers = 0 with torch.no_grad(): # 测试部分不需要计算梯度,因此可以包裹在上下文中。 for x_test, y_test in test: x_test, y_test = x_test.to(device), y_test.to(device) # softmax 的 y_predict 与 y_test的 one-hot做交叉熵 y_predict = model(x_test).argmax(dim=1) sample_correct_numbers += torch.eq(y_predict, y_test).float().sum().item() sample_total_numbers += x_test.size(0) # 每批样本的总数加在一起 acc = sample_correct_numbers / sample_total_numbers print(acc) main()
上述结构说明:
1conv + (2+2+2+2)*2 + 1 fc = 18层 1conv + (3+4+6+3)*2 + 1 fc = 34层 1conv + (3+4+6+3)*3 + 1 fc = 50层 1conv + (3+4+23+3)*3 + 1 fc = 101层 1conv + (3+8+36+3)*3 + 1 fc = 152层
模块导入
import cv2 import torch from torch import nn, optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import visdom import torch.nn.functional as F
数据导入预处理
data_preprocess = transforms.Compose([ transforms.Resize(32,32), transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) ]) train_dataset = datasets.CIFAR10( '.', train=True, download=True, transform=data_preprocess, ) test_dataset = datasets.CIFAR10( '.', train=False, # False代表测试集 download=True, transform=data_preprocess, ) train = DataLoader( train_dataset, batch_size=16, shuffle=True, ) test = DataLoader( test_dataset, batch_size=16, shuffle=True, )
基础块定义(BasicBlock):
class BasicBlock(nn.Module): """单个残差块 2个卷积+2个BN""" def __init__(self, input_channel, output_channel, stride=1): super().__init__() self.major = nn.Sequential( # 第一个Conv的步长为指定步长,允许降采样,允许输出输出通道不一致 nn.Conv2d(input_channel,output_channel,kernel_size=3,stride=stride, padding=1), nn.BatchNorm2d(output_channel), nn.ReLU(inplace=True), # 第二个Conv的步长为定长1, 输入输出通道不变(缓冲输出) nn.Conv2d(output_channel, output_channel, kernel_size=3, stride=1, padding=1), nn.BatchNorm2d(output_channel), # 第二个Conv就不用ReLU了, 因为一会需要和 x加在一起,最后最一层大的Relu ) # 若输入通道==输出通道,且步长为1,意味着图片未被降采样,则残差网络课直接为普通网络 self.shortcut = nn.Sequential() # 若输入输出通道不匹配,这时需要将图片做同样的变换,才能加在一起。 if input_channel != output_channel or stride != 1: self.shortcut = nn.Sequential( nn.Conv2d( input_channel, output_channel, kernel_size=(1,1), stride = stride ), nn.BatchNorm2d(output_channel) ) def forward(self, x): major_out = self.major(x) # 主干网络的输出 shotcut_out = self.shortcut(x) # 残差网络的输出 # 上面这两个网络是平行的关系, 因为 它们的输出不是链式的, 而是 都是同样的 x。 # 拼接主干网络+残差网络,F 相当于TF20的 tf.nn 里面单独有各种 loss函数 return F.relu(major_out + shotcut_out) # 最后在拼接后的网络外面加一层relu
ResNet+ResBlock定义:
class ResNet(nn.Module): def __init__(self, layers): # layers用来接受,用户想要指定 ResNet的形状 super().__init__() self.conv1 = nn.Sequential( nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1), nn.BatchNorm2d(32), nn.ReLU(inplace=True), ) self.res_net = nn.Sequential( *self.ResBlock(32,64, layers[0],stride=2), # 16 *self.ResBlock(64,128, layers[1],stride=2), # 8 *self.ResBlock(128,256, layers[2],stride=2), # 4 *self.ResBlock(256,512, layers[3],stride=2), # 2 ) # 因为我们一会需要展平,里面填"通道*宽度*高度", "输出通道" self.dense = nn.Linear(512 * 2 * 2, 10) def forward(self, x): out = self.conv1(x) out = self.res_net(out) out = out.view(x.size(0), -1)# 卷积展平操作 , torch中没有flatten所以我们就得手工 out = self.dense(out) return out def ResBlock(self, input_channel, output_channel, block_nums=2, stride=2): # 自定义规定,第一个block缩小的(对应通道翻倍),其余block大小不变 # 通道翻倍,步长*2,特征减半 all_block = [BasicBlock(input_channel, output_channel,stride=stride)] for x in range(1,block_nums): all_block.append(BasicBlock(output_channel, output_channel,stride=1)) return all_block # resnet = ResNet(layers=[2,2,2,2]) # out = resnet(torch.randn(4,3,32,32)) # print(out.shape)
模型训练:
def main(): vis = visdom.Visdom() epochs = 5 device = torch.device('cuda') model = ResNet(layers=[2,2,2,2]).to(device) # 会自动把下面的 dense_output ,也就是y_predict 加一层 softmax,y_true做one-hot loss_ = nn.CrossEntropyLoss().to(device) optimizer = optim.Adam( model.parameters(), lr=0.0001) for epoch in range(epochs): total_loss = 0.0 for step, (x_train, y_train) in enumerate(train): x_train, y_train = x_train.to(device), y_train.to(device) dense_output = model(x_train) loss = loss_(dense_output, y_train) optimizer.zero_grad() # 上一个例子提到过,梯度清零 loss.backward() # 反向传播, 并将梯度累加到 optimizer中 optimizer.step() # 相当于做了 w = w - lr * 梯度 total_loss += loss.item() # item()就是 tensor转numpy, TF中的 API是 xx.numpy() if step % 50 == 49: print('epoch:',epoch, 'loss:', total_loss / step) sample_correct_numbers = 0 sample_total_numbers = 0 with torch.no_grad(): # 测试部分不需要计算梯度,因此可以包裹在上下文中。 for x_test, y_test in test: x_test, y_test = x_test.to(device), y_test.to(device) # softmax 的 y_predict 与 y_test的 one-hot做交叉熵 y_predict = model(x_test).argmax(dim=1) sample_correct_numbers += torch.eq(y_predict, y_test).float().sum().item() sample_total_numbers += x_test.size(0) # 每批样本的总数加在一起 acc = sample_correct_numbers / sample_total_numbers print(acc) torch.save(model, 'model.pkl') # 保存整个模型 main()
测试数据预处理(我随便在网上下载下来的 1 张图片):
# 这是Cifar-10数据的标准标签 label = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck'] plane = cv2.imread('plane.jpg') # 我用的opencv plane = cv2.cvtColor(plane, cv2.COLOR_BGR2RGB) # opencv读的数据格式是BGR,所以转为RGB plane = (plane - 127.5) / 127.5 # 二话不说,保持模型输入数据的概率分布,先做归一化 plane = cv2.resize(plane, (32,32)) # 图片缩小到32x32,和模型的输入保持一致 plane = torch.Tensor(plane) # 转换成 tensor plane = plane.view(1,32,32,3) # 增加一个维度 plane = plane.repeat(16,1,1,1) # 我就用一张图片,为了满足模型的形状16,我复制了16次 plane = plane.permute([0,3,1,2]) # 虽然torch也有 像TF那样的transpose,但是只能操作2D device = torch.device('cuda') # 先定义一个cuda设备对象 plane = plane.to(device) # 我们训练集用的cuda, 所以预测数据也要转为cuda
正式输入模型预测:
model = torch.load('model.pkl') # 读取出 我们训练到最后整个模型 # 说明一下,如果你的预测是另一个脚本中,class ResNet 的代码定义部分也要复制过来 out = model(plane) # 预测结果,形状为[16,10] 16个样本,10个预测概率, label_indexes = out.argmax(dim=1) # 取10个概率最大值的索引。 (1轴),形状为 [16,1] print(label_indexes) for i in label_indexes: # i为每个样本预测的最大概率值 的 索引位置。 print(label[i]) # 拿着预测标签的索引 去 真实标签中找到真实标签