PyTorch in Practice: Text Classification

Download the single-label, multi-class dataset 20 Newsgroups, which contains newsgroup articles on 20 different topics. In this section we use the text classification task to walk through a deep learning experiment end to end: building and splitting the dataset, tuning hyperparameters on the training and validation sets, and finally generating a test report.

Data Preprocessing

The downloaded archive unpacks into 20 folders, one per class: the folder name is the class label, and each folder holds the news texts of that class, one article per text file. Preprocessing involves walking the files, producing each text's word sequence and class label, building the vocabulary, and splitting the data into training, validation, and test sets.

import os, pickle
from sklearn.model_selection import train_test_split

def preprocessing(max_size=50000, min_freq=1):
    vocab_dic = {}
    label2idx = {}
    datas = []
    labels = []
    for root, dirs, files in os.walk('./data/Classification/20news-18828/'):
        if len(files) > 0:
            category = os.path.split(root)[-1]
            label2idx[category] = label2idx.get(category, len(label2idx))
            for file in files:
                with open(os.path.join(root, file), encoding='utf-8', errors='ignore') as fin:
                    doc = []
                    for sline in fin:
                        word_lst = sline.strip().split(' ')
                        for word in word_lst:
                            if len(word) > 0:
                                word = word.lower()
                                vocab_dic[word] = vocab_dic.get(word, 0) + 1
                                doc.append(word)
                datas.append(doc)
                labels.append(label2idx[category])
    vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq],
                        key=lambda x: x[1], reverse=True)[:max_size]
    vocab_list.insert(0, ('<pad>', min_freq))
    vocab_list.insert(1, ('<unk>', min_freq))
    word2idx = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
    idx2word = {idx: word_count[0] for idx, word_count in enumerate(vocab_list)}

    with open('./data/Classification/vocab.pkl', 'wb') as fout:
        pickle.dump((word2idx, idx2word), fout)
    print("vocab size is: {}".format(len(word2idx)))
    with open('./data/Classification/label.pkl', 'wb') as fout:
        pickle.dump(label2idx, fout)
    print("class number is: {}".format(len(label2idx)))
    # Split into training, validation, and test sets
    X_train, X_test, Y_train, Y_test = train_test_split(datas, labels, test_size=0.1, shuffle=True)
    X_train, X_eval, Y_train, Y_eval = train_test_split(X_train, Y_train, test_size=0.1, shuffle=True)
    with open('./data/Classification/train-eval-corpus.pkl', 'wb') as fout:
        pickle.dump((X_train, Y_train, X_eval, Y_eval), fout)
    with open('./data/Classification/test-corpus.pkl', 'wb') as fout:
        pickle.dump((X_test, Y_test), fout)
    print("train-corpus size is: {}, eval-corpus size is: {}, test-corpus size is: {}".format(
        len(X_train), len(X_eval), len(X_test)))

We set max_size=50000 and min_freq=1, run the preprocessing step once on its own, and save the generated data to disk.
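For reference, a minimal driver for this step (assuming the directory layout above) is just:

preprocessing(max_size=50000, min_freq=1)  # writes vocab.pkl, label.pkl, and the train/eval/test pickles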

Data Loading

This time we implement reading and loading purely through a DataLoader. We also set a maximum article length and use a "truncate if too long, pad if too short" policy to bring every document to one fixed length:

import torch
import torch.utils.data as DataSet

# Run on GPU when available (device is referenced throughout this section)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def create_loader(word2idx, datas, labels, max_len, batch_size):
    data = []
    for doc in datas:
        doc_tokens = []
        for i in range(min(max_len, len(doc))):
            doc_tokens.append(word2idx.get(doc[i], word2idx['<unk>']))
        # Pad with index 0 (<pad>) up to max_len
        data.append(doc_tokens + [0] * max(0, max_len - len(doc_tokens)))

    dataset = DataSet.TensorDataset(torch.LongTensor(data).to(device),
                                    torch.LongTensor(labels).to(device))
    return DataSet.DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

Because preprocessing did not convert the documents into sequences of word indices, the loader takes the vocabulary, the texts, and the labels as three separate arguments and performs the word-to-index mapping at construction time; words missing from the vocabulary are mapped to '<unk>'. Unlike the language model, we return both data and labels (in one-to-one order).
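As a quick sanity check of the truncate-or-pad behavior, here is a toy run with a hypothetical four-word vocabulary (toy_word2idx is made up purely for illustration):

toy_word2idx = {'<pad>': 0, '<unk>': 1, 'hello': 2, 'world': 3}
loader = create_loader(toy_word2idx, [['hello', 'world', 'foo']], [0], max_len=5, batch_size=1)
batch_x, batch_y = next(iter(loader))
print(batch_x)        # tensor([[2, 3, 1, 0, 0]]): 'foo' maps to <unk>, then two <pad> slots
print(batch_x.shape)  # torch.Size([1, 5])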

Defining the Network

We define a convolutional neural network for text classification: an embedding layer maps each word index to a vector in a continuous space; several convolutional layers then extract textual features; each convolution output passes through a ReLU activation and max pooling for dimensionality reduction; finally, a fully connected layer performs the classification, taking as input the concatenation of the pooled features extracted by the different convolutions.

import torch.nn as nn

class TextCNN(nn.Module):
    def __init__(self, config):
        super(TextCNN, self).__init__()
        self.embedding = nn.Embedding(config.vocab_size, config.embed_size)
        # One Conv2d per filter size; each kernel spans the full embedding width
        self.convs = nn.ModuleList(
            [nn.Conv2d(1, config.num_filters, (k, config.embed_size)) for k in config.filter_sizes])
        self.dropout = nn.Dropout(config.dropout)
        self.fc = nn.Linear(config.num_filters * len(config.filter_sizes), config.num_classes)

    def conv_and_pool(self, x, conv):
        x = torch.relu(conv(x)).squeeze(3)             # [B, num_filters, seq_len-k+1]
        x = torch.max_pool1d(x, x.size(2)).squeeze(2)  # [B, num_filters]
        return x

    def forward(self, x):
        out = self.embedding(x)  # [B, seq_len, embed_size]
        out = out.unsqueeze(1)   # [B, 1, seq_len, embed_size] for Conv2d
        out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1)
        out = self.dropout(out)
        out = self.fc(out)
        return out

Since the convolutional layers all share essentially the same parameter structure, we build them in a loop with a list comprehension and store them in a new container, nn.ModuleList. Modules placed in a ModuleList are automatically registered on the enclosing network, and their parameters are automatically added to the network's parameters.
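A two-line illustration of that auto-registration (a throwaway example, not part of TextCNN):

m = nn.ModuleList([nn.Linear(4, 4) for _ in range(3)])
print(len(list(m.parameters())))  # 6: each Linear contributes a weight and a bias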

It is worth walking through the input and output shapes of each layer. The input x has shape [batch_size, seq_len]; after the embedding it becomes [batch_size, seq_len, embed_size]. Conv2d expects a rank-4 input, i.e. [B, C, H, W], so we insert a dimension on axis 1 to get [batch_size, 1, seq_len, embed_size] (with Conv1d this reshaping would be unnecessary). Each convolutional layer's output goes through the conv_and_pool function, which applies ReLU after the convolution. The convolution's output size follows the standard formula:

H_out = floor((H + 2*padding - kernel_height) / stride) + 1
W_out = floor((W + 2*padding - kernel_width) / stride) + 1

As the formula shows, the convolution's output size depends on the kernel size, padding, stride, and so on. In our case the kernel width equals the word-vector dimension, so the formula gives an output width of 1, and the whole tensor has shape [batch_size, num_filters, seq_len-k+1, 1]. Because our pooling operation max_pool1d is one-dimensional, we first squeeze away the last axis (of size 1) of the convolution output. After pooling the shape becomes [batch_size, num_filters, 1]; squeezing away the remaining size-1 axis leaves a tensor that can be viewed as the feature representation of the original input. The outputs of the several convolutional layers are then concatenated, passed through dropout, and fed to the fully connected layer for classification.
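This shape bookkeeping can be verified directly; the numbers below follow the configuration used later in this section (embed_size=128, num_filters=256, kernel height k=3, seq_len=300, batch_size=64):

conv = nn.Conv2d(1, 256, (3, 128))             # one of the convs in the ModuleList
x = torch.randn(64, 1, 300, 128)               # [batch_size, 1, seq_len, embed_size]
h = torch.relu(conv(x)).squeeze(3)             # [64, 256, 298], i.e. seq_len-k+1 = 298
h = torch.max_pool1d(h, h.size(2)).squeeze(2)  # [64, 256]
print(h.shape)  # torch.Size([64, 256])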

Training

Before launching training we need to set the relevant hyperparameters; it is convenient to gather them in a configuration class up front:

class Config(object):
    batch_size = 64
    max_len = 300
    num_epochs = 30

    vocab_size = 50002
    embed_size = 128
    filter_sizes = (2, 3, 4)
    num_filters = 256
    dropout = 0.5
    num_classes = 20
    learning_rate = 0.002
    require_improvement = 1000

    log_path = './log/TextCNN.log'
    save_path = './model/TextCNN.ckpt'

    # Recover the ordered class-name list from the saved label mapping
    with open('./data/Classification/label.pkl', 'rb') as fin:
        label2idx = pickle.load(fin)
    class_list = [''] * len(label2idx)
    for label, idx in label2idx.items():
        class_list[idx] = label

Then we begin the actual training. The basic steps match the language model from the previous section; the differences are that we now have separate training, validation, and test sets, and that every 100 batches we report the loss and accuracy on both the training and validation sets. We also use the tensorboardX module to record how the various metrics evolve during training, which makes visualization easy:

import time
import numpy as np
import torch.nn.functional as F
from datetime import timedelta
from sklearn import metrics
from tensorboardX import SummaryWriter

def train(config, model, train_loader, dev_loader, test_loader):
    start_time = time.time()
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

    total_batch = 0               # how many batches have been processed
    dev_best_loss = float('inf')
    last_improve = 0              # batch index of the last drop in validation loss
    flag = False                  # whether training has stalled for too long
    writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))
    for epoch in range(config.num_epochs):
        print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        for i, (trains, labels) in enumerate(train_loader):
            outputs = model(trains)
            model.zero_grad()
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()
            if total_batch % 100 == 0:
                true = labels.data.cpu()
                predic = torch.max(outputs.data, 1)[1].cpu()
                train_acc = metrics.accuracy_score(true, predic)
                dev_acc, dev_loss = evaluate(config, model, dev_loader)
                if dev_loss < dev_best_loss:
                    dev_best_loss = dev_loss
                    torch.save(model.state_dict(), config.save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ''
                time_dif = get_time_dif(start_time)
                msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, ' \
                      'Val Loss: {3:>5.2}, Val Acc: {4:>6.2%}, Time: {5} {6}'
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))
                writer.add_scalar("loss/train", loss.item(), total_batch)
                writer.add_scalar("loss/dev", dev_loss, total_batch)
                writer.add_scalar("acc/train", train_acc, total_batch)
                writer.add_scalar("acc/dev", dev_acc, total_batch)
                model.train()
            total_batch += 1
            if total_batch - last_improve > config.require_improvement:
                # Validation loss has not dropped for over 1000 batches; stop early
                print("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break
    writer.close()
    test(config, model, test_loader)
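Note that the loop calls get_time_dif, which is not defined elsewhere in this section; a minimal helper consistent with the timedelta import above would be:

def get_time_dif(start_time):
    # Elapsed wall-clock time, rounded to whole seconds
    return timedelta(seconds=int(round(time.time() - start_time)))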

Computing and logging loss and accuracy on the training set in real time is a must for any training loop; here is the code for computing the same metrics on the validation set:

def evaluate(config, model, data_iter, test=False):
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)
    with torch.no_grad():
        for texts, labels in data_iter:
            outputs = model(texts)
            loss = F.cross_entropy(outputs, labels)
            loss_total += loss.item()
            labels = labels.data.cpu().numpy()
            predic = torch.max(outputs.data, 1)[1].cpu().numpy()
            labels_all = np.append(labels_all, labels)
            predict_all = np.append(predict_all, predic)

    acc = metrics.accuracy_score(labels_all, predict_all)
    if test:
        report = metrics.classification_report(labels_all, predict_all,
                                               target_names=config.class_list, digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)

After training we measure the model's real performance on the test set. We use sklearn's metrics module to generate a test report, which gives a fairly comprehensive evaluation:

def test(config, model, test_iter):
    model.load_state_dict(torch.load(config.save_path))
    model.eval()
    start_time = time.time()
    test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)
    msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}'
    print(msg.format(test_loss, test_acc))
    print("Precision, Recall and F1-Score...")
    print(test_report)
    print("Confusion Matrix...")
    print(test_confusion)
    time_dif = get_time_dif(start_time)
    print("Time usage:", time_dif)

We also need a routine to initialize the network parameters. Research has shown that the best initialization for ReLU activations is the Kaiming initialization proposed by Kaiming He (for sigmoid or tanh activations the best choice is Xavier initialization):

def init_network(model, method='kaiming', exclude='embedding', seed=123):
    for name, w in model.named_parameters():
        if exclude not in name:  # leave the embedding layer at its default init
            if 'weight' in name:
                if method == 'xavier':
                    nn.init.xavier_normal_(w)
                elif method == 'kaiming':
                    nn.init.kaiming_normal_(w)
                else:
                    nn.init.normal_(w)
            elif 'bias' in name:
                nn.init.constant_(w, 0)
            else:
                pass

Finally, we tie data loading and training together in a run function:

def run():
    with open('./data/Classification/vocab.pkl', 'rb') as fin:
        word2idx, _ = pickle.load(fin)
    with open('./data/Classification/train-eval-corpus.pkl', 'rb') as fin:
        X_train, Y_train, X_eval, Y_eval = pickle.load(fin)
    with open('./data/Classification/test-corpus.pkl', 'rb') as fin:
        X_test, Y_test = pickle.load(fin)

    config = Config()
    model = TextCNN(config).to(device)
    init_network(model)

    train_loader = create_loader(word2idx, X_train, Y_train, config.max_len, config.batch_size)
    eval_loader = create_loader(word2idx, X_eval, Y_eval, config.max_len, config.batch_size)
    test_loader = create_loader(word2idx, X_test, Y_test, config.max_len, config.batch_size)

    train(config, model, train_loader, eval_loader, test_loader)
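With the preprocessing step already run, launching the whole experiment is then just:

if __name__ == '__main__':
    run()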

Other Networks

Besides convolutional networks, we can also use a recurrent network (LSTM). Here we cover three RNN-based text classification models: RNN, RCNN, and RNN+Attention.

class TextRNN(nn.Module):
    def __init__(self, config, hidden_size, num_layers, type='RNN', attn_hidden=None):
        super(TextRNN, self).__init__()
        self.embedding = nn.Embedding(config.vocab_size, config.embed_size)
        self.lstm = nn.LSTM(config.embed_size, hidden_size, num_layers, bidirectional=True,
                            batch_first=True, dropout=0.5)
        self.type = type
        if self.type == 'RNN':
            self.fc = nn.Linear(hidden_size * 2, config.num_classes)
        elif self.type == 'RCNN':
            self.maxpool = nn.MaxPool1d(config.max_len)
            self.fc = nn.Linear(hidden_size * 2 + config.embed_size, config.num_classes)
        elif self.type == 'Attention':
            self.tanh1 = nn.Tanh()
            self.w = nn.Parameter(torch.zeros(hidden_size * 2))
            self.fc1 = nn.Linear(hidden_size * 2, attn_hidden)
            self.fc = nn.Linear(attn_hidden, config.num_classes)

    def forward(self, x):
        embed = self.embedding(x)  # [B, seq_len, embed_size]
        out, _ = self.lstm(embed)  # [B, seq_len, hidden_size*2]
        if self.type == 'RNN':
            out = self.fc(out[:, -1, :])        # last time step's hidden state
        elif self.type == 'RCNN':
            out = torch.cat((embed, out), 2)    # concat embedding with LSTM output
            out = F.relu(out)
            out = out.permute(0, 2, 1)
            out = self.maxpool(out).squeeze(2)  # max over the time dimension
            out = self.fc(out)
        elif self.type == 'Attention':
            M = self.tanh1(out)
            alpha = F.softmax(torch.matmul(M, self.w), dim=1).unsqueeze(-1)  # [B, seq_len, 1]
            out = torch.sum(out * alpha, 1)     # attention-weighted average
            out = F.relu(out)
            out = self.fc1(out)
            out = self.fc(out)
        return out

Once again, let us describe the three architectures in terms of layer input/output shapes (a quick shape check follows the list):

  • Input shape: [batch_size, seq_len]
  • After embedding: [batch_size, seq_len, embed_size]
  • Bidirectional LSTM output: [batch_size, seq_len, hidden_size*2]
  • The three architectures:
    • RNN
      • Take the hidden state at the last time step: [batch_size, hidden_size*2]
    • RCNN
      • Concatenate the embedding layer with the LSTM output and apply a nonlinearity: [batch_size, seq_len, hidden_size*2+embed_size]
      • Pooling layer, taking the max over the seq_len positions: [batch_size, hidden_size*2+embed_size]
    • RNN+Attention
      • Initialize a learnable weight matrix W: [hidden_size*2, 1]
      • Apply a nonlinearity to the LSTM output, multiply by W, and normalize with softmax: [batch_size, seq_len, 1]
      • Multiply each time step's hidden state by its attention score and sum, giving the final weighted-average hidden state: [batch_size, hidden_size*2]
  • Fully connected layer: [batch_size, num_classes]
  • Softmax over the classes, then argmax for the predicted label: [batch_size]
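To make the shapes concrete, here is a small smoke test of the three variants (hidden_size=128, num_layers=2, and attn_hidden=64 are arbitrary illustrative choices):

config = Config()
for rnn_type, attn in [('RNN', None), ('RCNN', None), ('Attention', 64)]:
    model = TextRNN(config, hidden_size=128, num_layers=2, type=rnn_type, attn_hidden=attn).to(device)
    x = torch.randint(0, config.vocab_size, (4, config.max_len)).to(device)
    print(rnn_type, model(x).shape)  # torch.Size([4, 20]) for all three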