PyTorch实战:基于BERT的NER

命名实体识别(NER)是 NLP 中传统的序列标注任务,随着近年来预训练语言模型的兴起,很多 NLP 的传统任务都演化为了“基于预训练模型的微调”这样的范式。所以,本节我们将使用 BERT+BiLSTM+CRF 的框架来实现中文命名实体识别的任务,我们使用 CLUENER2020 的数据进行实验,我们需要使用 transformers 包中的 BERT 模型。

数据预处理

下载的数据已经划分好了训练、验证与测试集了,但是其格式需要再处理成 “文本字符序列[TAB]标签序列” 这样的格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def preprocess(input_dir, output_dir):
text_lst = []
label_lst = []
for file in glob.glob(input_dir+'/*.json'):
with open(file, encoding='utf-8') as fin:
for sline in fin:
json_dict = json.loads(sline.strip())
char_lst = list(json_dict['text'])
label_entities = json_dict.get('label', None)
labels = ['O'] * len(char_lst)
if label_entities is not None:
for key, value in label_entities.items():
for sub_name, sub_index in value.items():
for start_index, end_index in sub_index:
assert ''.join(char_lst[start_index:end_index + 1]) == sub_name
if start_index == end_index:
labels[start_index] = 'S-' + key
else:
labels[start_index] = 'B-' + key
labels[start_index + 1:end_index + 1] = ['I-' + key] * (len(sub_name) - 1)
text_lst.append(char_lst)
label_lst.append(labels)
fout = open(output_dir+'/'+os.path.split(file)[-1][:-5]+'.tsv', 'w', encoding='utf-8')
for i in range(len(text_lst)):
fout.write("%s\t%s\n" % (text_lst[i], label_lst[i]))
fout.close()
# 单独保存label标签ID
if 'train' in file:
label2id = {}
for labels in label_lst:
for label in labels:
if label not in label2id:
label2id[label] = len(label2id)
fout = open(os.path.join(output_dir, 'label2id.json'), 'w', encoding='utf-8')
fout.write("%s\n" % json.dumps(label2id, ensure_ascii=False))
fout.close()

由于我们使用的是 BERT+ 的模式,所以我们不需要构建词汇表,而是直接使用预训练模型本身对应的词汇表;我们只需要自己构建和保存好 label 的编号映射。

加载数据

我们首先来构造 Dataset,其中使用 BERT 的分词器来进行文本序列 ID 化(由于我们是基于字符建模,所以不需要进行分词),同时需要按照 BERT 要求的 fine-tuning 数据格式,在文本序列的开头增加一个 [CLS] token。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import numpy as np
from transformers import BertTokenizer
from torch.utils.data import Dataset

class NERDataset(Dataset):
def __init__(self, text_lst, label_lst, label2id, bert_tokenizer):
self.charseq_labelindex = []
self.labels = []
# 变成单个字的列表,开头加上[CLS]
for text in text_lst:
words = ['[CLS]'] + [token for token in text]
token_start_idxs = 1 + np.cumsum([0] + [1]*(len(words)-1))
self.charseq_labelindex.append((bert_tokenizer.convert_tokens_to_ids(words), token_start_idxs))
for tag in label_lst:
label_id = [label2id.get(t) for t in tag]
self.labels.append(label_id)

def __len__(self):
return len(self.labels)

def __getitem__(self, idx):
return self.charseq_labelindex[idx], self.labels[idx]

我们构造的 Dataset 数据结构为:[(char_seq, label_index), label_seq],其中 label_index 标记了真实序列中存在标签的位置([CLS] token 没有标签)。通过 DataLoader 来实现批量数据的生成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import torch
from torch.utils.data import DataLoader

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

def create_loader(corpus_file, label2id, bert_tokenizer, batch_size, word_pad_idx=0, label_pad_idx=-1):
text_lst = []
label_lst = []
with open(corpus_file, encoding='utf-8') as fin:
for sline in fin:
text, label = sline.strip().split('\t')
text_lst.append(eval(text))
label_lst.append(eval(label))

dataset = NERDataset(text_lst, label_lst, label2id, bert_tokenizer)

def collate_fn(batch_input):
batch_size = len(batch_input) # 输入数据结构: [(char_seq, label_index), label_seq]
max_len = max([len(s[0][0]) for s in batch_input])
max_label_len = 0

# 组建批量的文本序列,使用0进行padding,并生成标签位置标记序列
batch_data = word_pad_idx * np.ones((batch_size, max_len))
batch_label_starts = []
for j in range(batch_size):
cur_len = len(batch_input[j][0][0])
batch_data[j][:cur_len] = batch_input[j][0][0]
# 标记有NER标签的元素,对应位置为1,无标签位置为0([CLS]位置、padding位置)
label_start_idx = batch_input[j][0][1]
label_starts = np.zeros(max_len)
label_starts[[idx for idx in label_start_idx if idx < max_len]] = 1
batch_label_starts.append(label_starts)
max_label_len = max(int(sum(label_starts)), max_label_len)

# 组建批量的label序列,使用-1进行padding
batch_labels = label_pad_idx * np.ones((batch_size, max_label_len))
for j in range(batch_size):
cur_tags_len = len(batch_input[j][1])
batch_labels[j][:cur_tags_len] = batch_input[j][1]

batch_data = torch.tensor(batch_data, dtype=torch.long).to(device)
batch_label_starts = torch.tensor(batch_label_starts, dtype=torch.long).to(device)
batch_labels = torch.tensor(batch_labels, dtype=torch.long).to(device)

return batch_data, batch_label_starts, batch_labels

return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True,
collate_fn=collate_fn, drop_last=True)

最终返回三组数据构成的迭代器,包括:采取批量数据中最长序列的长度进行补齐后的字符 ID 序列,相应的补齐后的标签序列,真实标签序列的标记序列。

定义网络

使用 BERT+ 的框架,可以简单地认为使用 BERT 网络替换 embedding 层,我们先给出代码,再做详细解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from transformers.models.bert import BertPreTrainedModel,BertModel
from torch.nn.utils.rnn import pad_sequence
from torchcrf import CRF
from torch import nn

class BertNER(BertPreTrainedModel):
def __init__(self, config, lstm_embedding_size, lstm_dropout_prob):
super(BertNER, self).__init__(config)
self.bert = BertModel(config)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.bilstm = nn.LSTM(
input_size=lstm_embedding_size,
hidden_size=config.hidden_size // 2,
batch_first=True,
num_layers=2,
dropout=lstm_dropout_prob,
bidirectional=True
)
self.classifier = nn.Linear(config.hidden_size, config.num_labels)
self.crf = CRF(config.num_labels, batch_first=True)

self.init_weights()

def forward(self, input_data, token_type_ids=None, attention_mask=None, labels=None,
position_ids=None, inputs_embeds=None, head_mask=None):
input_ids, input_token_starts = input_data
outputs = self.bert(input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds)
sequence_output = outputs[0]

# 去除[CLS]标签等位置,获得与label对齐的pre_label表示
origin_sequence_output = [layer[starts.nonzero().squeeze(1)]
for layer, starts in zip(sequence_output, input_token_starts)]
# 将sequence_output的pred_label维度padding到最大长度
padded_sequence_output = pad_sequence(origin_sequence_output, batch_first=True)
# dropout pred_label的一部分feature
padded_sequence_output = self.dropout(padded_sequence_output)
lstm_output, _ = self.bilstm(padded_sequence_output)
# 得到判别值
logits = self.classifier(lstm_output)
outputs = (logits,)
if labels is not None:
loss_mask = labels.gt(-1)
loss = self.crf(logits, labels, loss_mask) * (-1)
outputs = (loss,) + outputs

return outputs

transformers 包里封装很多基于 Transformer 实现的预训练语言模型(如 BERT、RoBERTa、XLNet 等),其中的核心三大组件包括 Configuration 配置类、Tokenizer 分词类、Model 模型类。Model 类封装了预训练模型的计算图过程,遵循着相同的范式,如根据 token ids 进行 embedding matrix 映射,紧接着多个self-attention 层做编码,最后一层 task-specific 做预测。我们这里需要使用 BertModel,其构造函数和接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class BertModel(BertPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.config = config
self.embeddings = BertEmbeddings(config)
self.encoder = BertEncoder(config)
self.pooler = BertPooler(config)
self.init_weights()

def forward(self, input_ids=None, attention_mask=None,token_type_ids=None,
position_ids=None, head_mask=None, inputs_embeds=None,
encoder_hidden_states=None, encoder_attention_mask=None,
output_attentions=None, output_hidden_states=None,):
...
# sequence_output, pooled_output, (hidden_states), (attentions)
return outputs

我们重点解释一下接口的参数含义:

  • input_ids: 带特殊标记([CLS]、[SEP])的token ids序列,其形状为 [batch_size, seq_len]
  • inputs_embeds: 和 input_ids 参数二选一。inputs_embeds 代表给定了输入 tokens 对应的token embeddings,比如用 word2vec 作为 token embeddings
  • attention_mask: 可选,形状和 input_ids 一致。当对 encoder 端的序列做 self-attention 时,默认全为1;decoder 端序列做 self-attention 时,默认为类似下三角矩阵的形式
  • token_type_ids: 可选,形状和 input_ids 一致,单语句输入时,取值全为 0;在“语句对”的输入中,前一句为全 0,后一句全 1。
  • head_mask: 想用哪些 head,就为 1 或者 None,不想用的 head 就为 0。形状为 [num_heads] 或者 [num_hidden_layers x num_heads],即:可以每层每个 head 单独设置 mask
  • position_ids: 可选,位置 id,默认就是 0~seq_len
  • encoder_hidden_states/encoder_attention_mask:decoder 端对 encoder 端做 cross-attention 时使用,此时 K 和 V 即通过 encoder_hidden_states 得到

另外,从 BertModel 的构造函数中也可以看出,BertEmbedding 可以获取序列的 embedding;BertEncoder 可以完成对序列的编码,获取 sequence token-level encoding;BertPooler 对 [CLS] 对应的 hidden state 进行非线性变换得到 sequence-level encoding。最终返回 sequence token-level encoding 和 sequence-level encoding。

网络定义中还需要注意的是 config 参数是 BERT 自带的配置,下载公开的中文预训练BERT模型,解压模型包请修改配置文件的名字为 “config.json”,并与模型保持在同一文件夹下。模型对应的词典也可以下载参考,尽管当前任务并不需要。另外,CRF 的安装命令是:pip install pytorch-crf 别装错了。

开始训练

我们照例把训练相关的超参数,定义到一个配置类里:

1
2
3
4
5
6
7
8
9
10
11
12
class Config(object):
def __init__(self, label_file):
self.batch_size = 64
self.epoch_num = 5
self.clip_grad = 5 # 进行梯度剪裁时的最大梯度范数
self.patience = 0.0002 # 训练精度提升的最小阈值
self.patience_num = 10 # 未达到提升阈值的次数超过该值则提前终止训练
self.learning_rate = 3e-5
self.bert_model = './model/bert-chn/'
self.model_dir = './model/ner/'
with open(label_file, encoding='utf-8') as fin:
self.label2id = json.loads(fin.readline().strip())

训练过程的实现和其他模型基本一致,这里通过精度提升的变化来控制训练,即精度不再提升(或提升很有限)则提前终止训练:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import logging
from tqdm import tqdm

def train_epoch(train_loader, model, optimizer, scheduler, epoch, config):
model.train()
train_losses = 0
for idx, batch_samples in enumerate(tqdm(train_loader)):
batch_data, batch_token_starts, batch_labels = batch_samples
batch_masks = batch_data.gt(0)
loss = model((batch_data, batch_token_starts),
token_type_ids=None, attention_mask=batch_masks, labels=batch_labels)[0]
train_losses += loss.item()
model.zero_grad()
loss.backward()
# 进行梯度剪裁
nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=config.clip_grad)
optimizer.step()
scheduler.step()
train_loss = float(train_losses) / len(train_loader)
logging.info("Epoch: {}, train loss: {}".format(epoch, train_loss))


def train(train_loader, dev_loader, model, optimizer, scheduler, config):
best_val_f1 = 0.0
patience_counter = 0
for epoch in range(1, config.epoch_num + 1):
train_epoch(train_loader, model, optimizer, scheduler, epoch, config)
val_metrics = evaluate(dev_loader, model, config, mode='dev')
val_f1 = val_metrics['f1']
logging.info("Epoch: {}, dev loss: {}, f1 score: {}".format(epoch, val_metrics['loss'], val_f1))
improve_f1 = val_f1 - best_val_f1
if improve_f1 > 1e-5:
best_val_f1 = val_f1
model.save_pretrained(config.model_dir)
logging.info("--------Save best model!--------")
if improve_f1 < config.patience:
patience_counter += 1
else:
patience_counter = 0
else:
patience_counter += 1
# 多次精度提升有限后提前终止训练
if (patience_counter >= config.patience_num and epoch > config.min_epoch_num) or epoch == config.epoch_num:
logging.info("Best val f1: {}".format(best_val_f1))
break
logging.info("Training Finished!")

由于篇幅有限,我们不再给出训练过程中涉及的 evaluate 方法的具体实现,大家可以参考Github上的实现。最后,我们把训练的流程串起来,调用 BertNER 的时候需要显式地传递一个 num_labels 的参数(也可以通过在 BERT 的配置文件里增加一个字段),该参数会自动增加到 config 的参数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from transformers.optimization import get_cosine_schedule_with_warmup, AdamW

def run():
config = Config('./data/NER/output/label2id.json')

bert_tokenizer = BertTokenizer.from_pretrained(config.bert_model, do_lower_case=True)
train_loader = create_loader('./data/NER/output/train.tsv', config.label2id,
bert_tokenizer, config.batch_size)
eval_loader = create_loader('./data/NER/output/dev.tsv', config.label2id,
bert_tokenizer, config.batch_size)

model = BertNER.from_pretrained(config.bert_model, num_labels=len(config.label2id),
lstm_embedding_size=768, lstm_dropout_prob=0.5)
model.to(device)
param_optimizer = list(model.classifier.named_parameters())
optimizer_grouped_parameters = [{'params': [p for n, p in param_optimizer]}]
optimizer = AdamW(optimizer_grouped_parameters, lr=config.learning_rate, correct_bias=False)
train_data_size = 13436
train_steps_per_epoch = train_data_size // config.batch_size
scheduler = get_cosine_schedule_with_warmup(optimizer,
num_warmup_steps=(config.epoch_num // 10) * train_steps_per_epoch,
num_training_steps=config.epoch_num * train_steps_per_epoch)

train(train_loader, eval_loader, model, optimizer, scheduler, config)

我们使用 AdamW(Adam + weight decay) 作为优化器,AdamW 是对传统的 Adam + L2 regularization 的改进。我们使用cosine schedule with warmup 调整学习率。我们将 warmup steps 设置为总训练轮次的十分之一。因此,学习率会在前十分之一的训练轮次线性递增到设置的学习率数值,在之后余弦下降。训练采取了简化操作,即只 Fine-Tune 最上面的分类层(classifier)。

基于BERT的其他任务

在上面的实现中可以总结 BERT+ 的范式为 使用 BertModel 的输出替换任务基本框架中的 embedding 层,所以在其他的 NLP 任务上使用 BERT 应该说是可以举一反三的。不过,transformers 的 Model 组件中还做了一些灵活的扩展以用于下游任务,例如在预训练好的 Base 模型基础上,添加 task-specific heads。其代码级接口通常命名为,XXXForSequenceClassification 等,其中 XXX 是模型的名称(如Bert), 结尾是下游任务的类型(SequenceClassification)。

  1. BertForTokenClassification做命名实体识别
    1
    2
    3
    4
    5
    6
    7
    8
    from transformers.models.bert import BertForTokenClassification

    model = BertForTokenClassification.from_pretrained(bert_model_dir, num_labels=config.num_labels)
    out = model(input_ids=None, attention_mask=None,
    token_type_ids=None, position_ids=None,
    head_mask=None, inputs_embeds=None, labels=None,
    output_attentions=None, output_hidden_states=None)
    # (loss), scores, (hidden_states), (attentions)
  2. BertForSequenceClassification做文本分类
    1
    2
    3
    4
    5
    6
    7
    8
    from transformers.models.bert import BertForSequenceClassification

    model = BertForSequenceClassification.from_pretrained(bert_model_dir, num_labels=config.num_labels)
    out = model(input_ids=None, attention_mask=None,
    token_type_ids=None, position_ids=None,
    head_mask=None, inputs_embeds=None, labels=None,
    output_attentions=None, output_hidden_states=None)
    # (loss), logits, (hidden_states), (attentions)
  3. BertForMultipleChoice做多项选择
    1
    2
    3
    4
    5
    6
    7
    8
    from transformers.models.bert import BertForMultipleChoice

    model = BertForMultipleChoice.from_pretrained(bert_model_dir, num_labels=config.num_labels)
    out = model(input_ids=None, attention_mask=None,
    token_type_ids=None, position_ids=None, head_mask=None,
    inputs_embeds=None, labels=None, output_attentions=None,
    output_hidden_states=None, return_dict=None)
    # (loss), logits, (hidden_states), (attentions)
  4. BertForQuestionAnswering做SQuAD任务
    1
    2
    3
    4
    5
    6
    7
    8
    9
    from transformers.models.bert import BertForQuestionAnswering

    model = BertForQuestionAnswering.from_pretrained(bert_model_dir, num_labels=config.num_labels)
    out = model(input_ids=None, attention_mask=None,
    token_type_ids=None, position_ids=None,
    head_mask=None, inputs_embeds=None,
    start_positions=None, end_positions=None,
    output_attentions=None, output_hidden_states=None)
    # (loss), start_logits, end_logits, (hidden_states), (attentions)

直接使用它们会更加快速地构建 BERT+ 模型以完成特定的 NLP 任务,输出中括号括住的部分表示可选,其中 loss 只有在训练过程中传入了 labels 才会返回。