随着这几年深度学习的出现,人工智能也得到了更好的发展, 不知不觉已进入我们的生活,并且一点一点地影响着我们.之前待过一家公司里面主要是做ai算法项目.虽然负责的工程这块,几十种算法模型都有nlp和cv算法工程师开发.我们只是包装这些算法模型成一个个对外的服务,随着耳濡目染,慢慢地会去研究下平时这些算法是怎么训练的
1. 网络结构
卷积神经网络一般包括卷积层,池化层和全连接层,这些层通常我们叫做隐藏层
1.1卷积层
如图一个5x5的矩阵 通过一个3x3的卷积核的得到一个3x3的矩阵(为什么做卷积呢,我们把这个矩阵想象成一个rgb的图, 那么图片上每个色素值附近的值都是相近或相等的,那么我们完全可以只取某一部分特征,没必要使用全部提取,那样参数会很多成百上千亿的参数,而且参数过多也容易过拟合)
1.2池化层
池化层(pooling)的作用主要是降低维度,通过对卷积后的结果进行降采样来降低维度,分为最大池化和平均池化两类。
1.2.1 最大池化
最大池化顾名思义,降采样的时候采用最大值的方式采样,如图所示,其中池化核的大小为22,步长也为22
1.2.2 平均池化
平均池化就是用局部的平均值作为采样的值,还是上面的数据,平均池化后的结果为:
1.3全连接层
全连接层就是把卷积层和池化层的输出展开成一维形式,在后面接上与普通网络结构相同的回归网络或者分类网络,一般接在池化层后面,如图所示;
2.文本分类实战
在深度学习领域已经有多种框架本次才有谷歌的tensorflow来实现,在tensorflow有三个比较重要的概念
下图就是本次模型的计算图
首先通过embedding
然后做卷积操作 和池化
再是全连接层 和做dropout 和选择激活函数relu
后面通过 softmax 和 argmax 得到分类结果
…
再来一个详细图
3. 代码实战
# coding: utf-8 import tensorflow as tf class TCNNConfig(object): """CNN配置参数""" embedding_dim = 64 # 词向量维度 seq_length = 600 # 序列长度 num_classes = 10 # 类别数 num_filters = 256 # 卷积核数目 kernel_size = 5 # 卷积核尺寸 vocab_size = 5000 # 词汇表达小 hidden_dim = 128 # 全连接层神经元 dropout_keep_prob = 0.5 # dropout保留比例 learning_rate = 1e-3 # 学习率 batch_size = 64 # 每批训练大小 num_epochs = 1 # 总迭代轮次 print_per_batch = 100 # 每多少轮输出一次结果 save_per_batch = 10 # 每多少轮存入tensorboard class TextCNN(object): """文本分类,CNN模型""" def __init__(self, config): self.config = config # 三个待输入的数据 self.input_x = tf.placeholder(tf.int32, [None, self.config.seq_length], name='input_x') self.input_y = tf.placeholder(tf.float32, [None, self.config.num_classes], name='input_y') self.keep_prob = tf.placeholder(tf.float32, name='keep_prob') self.cnn() def cnn(self): """CNN模型""" # 词向量映射 with tf.device('/cpu:0'): embedding = tf.get_variable('embedding', [self.config.vocab_size, self.config.embedding_dim]) embedding_inputs = tf.nn.embedding_lookup(embedding, self.input_x) with tf.name_scope("cnn"): # CNN layer conv = tf.layers.conv1d(embedding_inputs, self.config.num_filters, self.config.kernel_size, name='conv') # global max pooling layer gmp = tf.reduce_max(conv, reduction_indices=[1], name='gmp') with tf.name_scope("score"): # 全连接层,后面接dropout以及relu激活 fc = tf.layers.dense(gmp, self.config.hidden_dim, name='fc1') fc = tf.contrib.layers.dropout(fc, self.keep_prob) fc = tf.nn.relu(fc) # 分类器 logits shape shape=(?, 10) self.logits = tf.layers.dense(fc, self.config.num_classes, name='fc2') # tf.nn.softmax 把logits 的数字变成总和等于1 tf.argmax取最大值的下标 准确率最高的 self.y_pred_cls_min = tf.nn.softmax(self.logits) # 预测类别 self.y_pred_cls = tf.argmax(self.y_pred_cls_min, 1) # 预测类别 with tf.name_scope("optimize"): # 损失函数,交叉熵 cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=self.logits, labels=self.input_y) self.loss = tf.reduce_mean(cross_entropy) # 优化器 self.optim = tf.train.AdamOptimizer(learning_rate=self.config.learning_rate).minimize(self.loss) with tf.name_scope("accuracy"): # 准确率 correct_pred = tf.equal(tf.argmax(self.input_y, 1), self.y_pred_cls) self.acc = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
#!/usr/bin/python # -*- coding: utf-8 -*- from __future__ import print_function import os import sys import time from datetime import timedelta import numpy as np import tensorflow as tf from sklearn import metrics from cnn_model import TCNNConfig, TextCNN from data.cnews_loader import read_vocab, read_category, batch_iter, process_file, build_vocab base_dir = 'data/cnews' train_dir = os.path.join(base_dir, 'cnews.train.txt') test_dir = os.path.join(base_dir, 'cnews.test.txt') val_dir = os.path.join(base_dir, 'cnews.val.txt') vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt') save_dir = 'checkpoints/textcnn' save_path = os.path.join(save_dir, 'best_validation') # 最佳验证结果保存路径 def get_time_dif(start_time): """获取已使用时间""" end_time = time.time() time_dif = end_time - start_time return timedelta(seconds=int(round(time_dif))) def feed_data(x_batch, y_batch, keep_prob): feed_dict = { model.input_x: x_batch, model.input_y: y_batch, model.keep_prob: keep_prob } return feed_dict def evaluate(sess, x_, y_): """评估在某一数据上的准确率和损失""" data_len = len(x_) batch_eval = batch_iter(x_, y_, 128) total_loss = 0.0 total_acc = 0.0 for x_batch, y_batch in batch_eval: batch_len = len(x_batch) feed_dict = feed_data(x_batch, y_batch, 1.0) loss, acc = sess.run([model.loss, model.acc], feed_dict=feed_dict) total_loss += loss * batch_len total_acc += acc * batch_len return total_loss / data_len, total_acc / data_len def train(): print("Configuring TensorBoard and Saver...") # 配置 Tensorboard,重新训练时,请将tensorboard文件夹删除,不然图会覆盖 tensorboard_dir = 'tensorboard/textcnn' if not os.path.exists(tensorboard_dir): os.makedirs(tensorboard_dir) tf.summary.scalar("loss", model.loss) tf.summary.scalar("accuracy", model.acc) merged_summary = tf.summary.merge_all() writer = tf.summary.FileWriter(tensorboard_dir) # 配置 Saver saver = tf.train.Saver() if not os.path.exists(save_dir): os.makedirs(save_dir) print("Loading training and validation data...") # 载入训练集与验证集 start_time = time.time() x_train, y_train = process_file(train_dir, word_to_id, cat_to_id, config.seq_length) x_val, y_val = process_file(val_dir, word_to_id, cat_to_id, config.seq_length) time_dif = get_time_dif(start_time) print("Time usage:", time_dif) # 创建session session = tf.Session() session.run(tf.global_variables_initializer()) writer.add_graph(session.graph) print('Training and evaluating...') start_time = time.time() total_batch = 0 # 总批次 best_acc_val = 0.0 # 最佳验证集准确率 last_improved = 0 # 记录上一次提升批次 require_improvement = 1000 # 如果超过1000轮未提升,提前结束训练 flag = False for epoch in range(config.num_epochs): print('Epoch:', epoch + 1) batch_train = batch_iter(x_train, y_train, config.batch_size) for x_batch, y_batch in batch_train: feed_dict = feed_data(x_batch, y_batch, config.dropout_keep_prob) if total_batch % config.save_per_batch == 0: # 每多少轮次将训练结果写入tensorboard scalar s = session.run(merged_summary, feed_dict=feed_dict) writer.add_summary(s, total_batch) if total_batch % config.print_per_batch == 0: # 每多少轮次输出在训练集和验证集上的性能 feed_dict[model.keep_prob] = 1.0 loss_train, acc_train = session.run([model.loss, model.acc], feed_dict=feed_dict) loss_val, acc_val = evaluate(session, x_val, y_val) # todo if acc_val > best_acc_val: # 保存最好结果 best_acc_val = acc_val last_improved = total_batch saver.save(sess=session, save_path=save_path) improved_str = '*' else: improved_str = '' time_dif = get_time_dif(start_time) msg = 'Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%},' \ + ' Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}' print(msg.format(total_batch, loss_train, acc_train, loss_val, acc_val, time_dif, improved_str)) feed_dict[model.keep_prob] = config.dropout_keep_prob session.run(model.optim, feed_dict=feed_dict) # 运行优化 total_batch += 1 if total_batch - last_improved > require_improvement: # 验证集正确率长期不提升,提前结束训练 print("No optimization for a long time, auto-stopping...") flag = True break # 跳出循环 if flag: # 同上 break def test(): print("Loading test data...") start_time = time.time() x_test, y_test = process_file(test_dir, word_to_id, cat_to_id, config.seq_length) session = tf.Session() session.run(tf.global_variables_initializer()) saver = tf.train.Saver() saver.restore(sess=session, save_path=save_path) # 读取保存的模型 print('Testing...') loss_test, acc_test = evaluate(session, x_test, y_test) msg = 'Test Loss: {0:>6.2}, Test Acc: {1:>7.2%}' print(msg.format(loss_test, acc_test)) batch_size = 128 data_len = len(x_test) num_batch = int((data_len - 1) / batch_size) + 1 y_test_cls = np.argmax(y_test, 1) y_pred_cls = np.zeros(shape=len(x_test), dtype=np.int32) # 保存预测结果 for i in range(num_batch): # 逐批次处理 start_id = i * batch_size end_id = min((i + 1) * batch_size, data_len) feed_dict = { model.input_x: x_test[start_id:end_id], model.keep_prob: 1.0 } y_pred_cls[start_id:end_id] = session.run(model.y_pred_cls, feed_dict=feed_dict) # 评估 print("Precision, Recall and F1-Score...") print(metrics.classification_report(y_test_cls, y_pred_cls, target_names=categories)) # 混淆矩阵 print("Confusion Matrix...") cm = metrics.confusion_matrix(y_test_cls, y_pred_cls) print(cm) time_dif = get_time_dif(start_time) print("Time usage:", time_dif) if __name__ == '__main__': # if len(sys.argv) != 2 or sys.argv[1] not in ['train', 'test']: # raise ValueError("""usage: python run_cnn.py [train / test]""") print('Configuring CNN model...') config = TCNNConfig() if not os.path.exists(vocab_dir): # 如果不存在词汇表,重建 build_vocab(train_dir, vocab_dir, config.vocab_size) categories, cat_to_id = read_category() words, word_to_id = read_vocab(vocab_dir) config.vocab_size = len(words) model = TextCNN(config) train() # if sys.argv[1] == 'train': # train() # else: # test()
# coding: utf-8 import sys from collections import Counter import numpy as np #import tensorflow.keras as kr import keras as kr if sys.version_info[0] > 2: is_py3 = True else: reload(sys) sys.setdefaultencoding("utf-8") is_py3 = False def native_word(word, encoding='utf-8'): """如果在python2下面使用python3训练的模型,可考虑调用此函数转化一下字符编码""" if not is_py3: return word.encode(encoding) else: return word def native_content(content): if not is_py3: return content.decode('utf-8') else: return content def open_file(filename, mode='r'): """ 常用文件操作,可在python2和python3间切换. mode: 'r' or 'w' for read or write """ if is_py3: return open(filename, mode, encoding='utf-8', errors='ignore') else: return open(filename, mode) def read_file(filename): """读取文件数据""" contents, labels = [], [] with open_file(filename) as f: for line in f: try: label, content = line.strip().split('\t') if content: contents.append(list(native_content(content))) labels.append(native_content(label)) except: pass return contents, labels def build_vocab(train_dir, vocab_dir, vocab_size=5000): """根据训练集构建词汇表,存储""" data_train, _ = read_file(train_dir) all_data = [] for content in data_train: all_data.extend(content) counter = Counter(all_data) count_pairs = counter.most_common(vocab_size - 1) words, _ = list(zip(*count_pairs)) # 添加一个 <PAD> 来将所有文本pad为同一长度 words = ['<PAD>'] + list(words) open_file(vocab_dir, mode='w').write('\n'.join(words) + '\n') def read_vocab(vocab_dir): """读取词汇表""" # words = open_file(vocab_dir).read().strip().split('\n') with open_file(vocab_dir) as fp: # 如果是py2 则每个值都转化为unicode words = [native_content(_.strip()) for _ in fp.readlines()] word_to_id = dict(zip(words, range(len(words)))) return words, word_to_id def read_category(): """读取分类目录,固定""" categories = ['体育', '财经', '房产', '家居', '教育', '科技', '时尚', '时政', '游戏', '娱乐'] categories = [native_content(x) for x in categories] cat_to_id = dict(zip(categories, range(len(categories)))) return categories, cat_to_id def to_words(content, words): """将id表示的内容转换为文字""" return ''.join(words[x] for x in content) def process_file(filename, word_to_id, cat_to_id, max_length=600): """将文件转换为id表示""" contents, labels = read_file(filename) data_id, label_id = [], [] for i in range(len(contents)): data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id]) label_id.append(cat_to_id[labels[i]]) # 使用keras提供的pad_sequences来将文本pad为固定长度 x_pad = kr.preprocessing.sequence.pad_sequences(data_id, max_length) y_pad = kr.utils.to_categorical(label_id, num_classes=len(cat_to_id)) # 将标签转换为one-hot表示 return x_pad, y_pad def batch_iter(x, y, batch_size=64): """生成批次数据""" data_len = len(x) num_batch = int((data_len - 1) / batch_size) + 1 indices = np.random.permutation(np.arange(data_len)) x_shuffle = x[indices] y_shuffle = y[indices] for i in range(num_batch): start_id = i * batch_size end_id = min((i + 1) * batch_size, data_len) yield x_shuffle[start_id:end_id], y_shuffle[start_id:end_id]
# coding: utf-8 from __future__ import print_function import os import tensorflow as tf #import tensorflow.contrib.keras as kr import keras as kr from cnn_model import TCNNConfig, TextCNN from data.cnews_loader import read_category, read_vocab try: bool(type(unicode)) except NameError: unicode = str base_dir = 'data/cnews' vocab_dir = os.path.join(base_dir, 'cnews.vocab.txt') save_dir = 'checkpoints/textcnn' save_path = os.path.join(save_dir, 'best_validation') # 最佳验证结果保存路径 class CnnModel: def __init__(self): self.config = TCNNConfig() self.categories, self.cat_to_id = read_category() self.words, self.word_to_id = read_vocab(vocab_dir) self.config.vocab_size = len(self.words) self.model = TextCNN(self.config) self.session = tf.Session() self.session.run(tf.global_variables_initializer()) saver = tf.train.Saver() saver.restore(sess=self.session, save_path=save_path) # 读取保存的模型 def predict(self, message): # 支持不论在python2还是python3下训练的模型都可以在2或者3的环境下运行 content = unicode(message) data = [self.word_to_id[x] for x in content if x in self.word_to_id] feed_dict = { self.model.input_x: kr.preprocessing.sequence.pad_sequences([data], self.config.seq_length), self.model.keep_prob: 1.0 } # y_pred_cls = self.session.run(self.model.y_pred_cls, feed_dict=feed_dict) y_pred_cls, y_pred_cls_min = self.session.run([self.model.y_pred_cls, self.model.y_pred_cls_min], feed_dict=feed_dict) print(y_pred_cls_min) print(y_pred_cls) return self.categories[y_pred_cls[0]] if __name__ == '__main__': cnn_model = CnnModel() test_demo = ['vivo手机超感光微云台双主摄,蔡司联合影像系统,高通骁龙888芯片,120Hz高刷新率,55W闪充', '詹姆斯:100%健康比排位重要,提出附加赛想法的人该被解雇'] for i in test_demo: print(cnn_model.predict(i))