文章目录
- 任务说明
- 实现思路
- 优化方向
任务说明
任务要求与任务1一样:
从论文标题、摘要作者等信息,判断该论文是否属于医学领域的文献。
可以将任务看作是一个文本二分类任务。机器需要根据对论文摘要等信息的理解,将论文划分为医学领域的文献和非医学领域的文献两个类别之一。
实现思路
使用预训练的大语言模型进行建模,在这里使用的是BERT。具体步骤如下:
- 数据预处理:首先,对文本数据进行预处理,包括文本清洗(如去除特殊字符、标点符号)、分词等操作。可以使用常见的NLP工具包(如NLTK或spaCy)来辅助进行预处理。
- 构建训练所需的dataset:构建Dataset类时,需要定义三个方法__init__,getitem, len,其中__init__方法完成类初始化,__getitem__要求返回返回内容和label,__len__方法返回数据长度
- 构造Dataloader:在其中完成对句子进行编码、填充、组装batch等动作:
- 定义预测模型利用预训练的BERT模型来解决文本二分类任务,我们将使用BERT模型编码中的[CLS]向量来完成二分类任务
[CLS]就是classification的意思,可以理解为用于下游的分类任务。
本任务的baseline如下:
#import 相关库
#导入前置依赖
import os
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
# 用于加载bert模型的分词器
from transformers import AutoTokenizer
# 用于加载bert模型
from transformers import BertModel
from pathlib import Pathbatch_size = 8
# 文本的最大长度
text_max_length = 128
# 总训练的epochs数,我只是随便定义了个数
epochs = 100
# 学习率
lr = 3e-5
# 取多少训练集的数据作为验证集
validation_ratio = 0.1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')# 每多少步,打印一次loss
log_per_step = 50# 数据集所在位置
dataset_dir = Path("")
os.makedirs(dataset_dir) if not os.path.exists(dataset_dir) else ''# 模型存储路径
model_dir = Path("./model/bert_checkpoints")
# 如果模型目录不存在,则创建一个
os.makedirs(model_dir) if not os.path.exists(model_dir) else ''print("Device:", device)# 读取数据集,进行数据处理pd_train_data = pd.read_csv('train.csv')
pd_train_data['title'] = pd_train_data['title'].fillna('')
pd_train_data['abstract'] = pd_train_data['abstract'].fillna('')test_data = pd.read_csv('testB.csv')
test_data['title'] = test_data['title'].fillna('')
test_data['abstract'] = test_data['abstract'].fillna('')
pd_train_data['text'] = pd_train_data['title'].fillna('') + ' ' + pd_train_data['author'].fillna('') + ' ' + pd_train_data['abstract'].fillna('')+ ' ' + pd_train_data['Keywords'].fillna('')
test_data['text'] = test_data['title'].fillna('') + ' ' + test_data['author'].fillna('') + ' ' + test_data['abstract'].fillna('')+ ' ' + pd_train_data['Keywords'].fillna('')
test_data['Keywords'] = test_data['title'].fillna('')# 从训练集中随机采样测试集
validation_data = pd_train_data.sample(frac=validation_ratio)
train_data = pd_train_data[~pd_train_data.index.isin(validation_data.index)]# 构建Dataset
class MyDataset(Dataset):def __init__(self, mode='train'):super(MyDataset, self).__init__()self.mode = mode# 拿到对应的数据if mode == 'train':self.dataset = train_dataelif mode == 'validation':self.dataset = validation_dataelif mode == 'test':# 如果是测试模式,则返回内容和uuid。拿uuid做target主要是方便后面写入结果。self.dataset = test_dataelse:raise Exception("Unknown mode {}".format(mode))def __getitem__(self, index):# 取第index条data = self.dataset.iloc[index]# 取其内容text = data['text']# 根据状态返回内容if self.mode == 'test':# 如果是test,将uuid做为targetlabel = data['uuid']else:label = data['label']# 返回内容和labelreturn text, labeldef __len__(self):return len(self.dataset)
train_dataset = MyDataset('train')
validation_dataset = MyDataset('validation')
train_dataset.__getitem__(0)#获取Bert预训练模型
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#接着构造我们的Dataloader。
#我们需要定义一下collate_fn,在其中完成对句子进行编码、填充、组装batch等动作:
def collate_fn(batch):"""将一个batch的文本句子转成tensor,并组成batch。:param batch: 一个batch的句子,例如: [('推文', target), ('推文', target), ...]:return: 处理后的结果,例如:src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}target:[1, 1, 0, ...]"""text, label = zip(*batch)text, label = list(text), list(label)# src是要送给bert的,所以不需要特殊处理,直接用tokenizer的结果即可# padding='max_length' 不够长度的进行填充# truncation=True 长度过长的进行裁剪src = tokenizer(text, padding='max_length', max_length=text_max_length, return_tensors='pt', truncation=True)return src, torch.LongTensor(label)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
inputs, targets = next(iter(train_loader))
print("inputs:", inputs)
print("targets:", targets)#定义预测模型,该模型由bert模型加上最后的预测层组成
class MyModel(nn.Module):def __init__(self):super(MyModel, self).__init__()# 加载bert模型self.bert = BertModel.from_pretrained('bert-base-uncased', mirror='tuna')# 最后的预测层self.predictor = nn.Sequential(nn.Linear(768, 256),nn.ReLU(),nn.Linear(256, 1),nn.Sigmoid())def forward(self, src):""":param src: 分词后的推文数据"""# 将src直接序列解包传入bert,因为bert和tokenizer是一套的,所以可以这么做。# 得到encoder的输出,用最前面[CLS]的输出作为最终线性层的输入outputs = self.bert(**src).last_hidden_state[:, 0, :]# 使用线性层来做最终的预测return self.predictor(outputs)
model = MyModel()
model = model.to(device)#定义出损失函数和优化器。这里使用Binary Cross Entropy:
criteria = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)# 由于inputs是字典类型的,定义一个辅助函数帮助to(device)
def to_device(dict_tensors):result_tensors = {}for key, value in dict_tensors.items():result_tensors[key] = value.to(device)return result_tensors#定义一个验证方法,获取到验证集的精准率和loss
def validate():model.eval()total_loss = 0.total_correct = 0for inputs, targets in validation_loader:inputs, targets = to_device(inputs), targets.to(device)outputs = model(inputs)loss = criteria(outputs.view(-1), targets.float())total_loss += float(loss)correct_num = (((outputs >= 0.5).float() * 1).flatten() == targets).sum()total_correct += correct_numreturn total_correct / len(validation_dataset), total_loss / len(validation_dataset)# 首先将模型调成训练模式
model.train()# 清空一下cuda缓存
if torch.cuda.is_available():torch.cuda.empty_cache()# 定义几个变量,帮助打印loss
total_loss = 0.
# 记录步数
step = 0# 记录在验证集上最好的准确率
best_accuracy = 0# 开始训练
for epoch in range(epochs):model.train()for i, (inputs, targets) in enumerate(train_loader):# 从batch中拿到训练数据inputs, targets = to_device(inputs), targets.to(device)# 传入模型进行前向传递outputs = model(inputs)# 计算损失loss = criteria(outputs.view(-1), targets.float())loss.backward()optimizer.step()optimizer.zero_grad()total_loss += float(loss)step += 1if step % log_per_step == 0:print("Epoch {}/{}, Step: {}/{}, total loss:{:.4f}".format(epoch+1, epochs, i, len(train_loader), total_loss))total_loss = 0del inputs, targets# 一个epoch后,使用过验证集进行验证accuracy, validation_loss = validate()print("Epoch {}, accuracy: {:.4f}, validation loss: {:.4f}".format(epoch+1, accuracy, validation_loss))torch.save(model, model_dir / f"model_{epoch}.pt")# 保存最好的模型if accuracy > best_accuracy:torch.save(model, model_dir / f"model_best.pt")best_accuracy = accuracy#加载最好的模型,然后进行测试集的预测
model = torch.load(model_dir / f"model_best.pt")
model = model.eval()test_dataset = MyDataset('test')
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)results = []
for inputs, ids in test_loader:outputs = model(inputs.to(device))outputs = (outputs >= 0.5).int().flatten().tolist()ids = ids.tolist()results = results + [(id, result) for result, id in zip(outputs, ids)]
test_label = [pair[1] for pair in results]
test_data['label'] = test_label
test_data['Keywords'] = test_data['title'].fillna('')
test_data[['uuid', 'Keywords', 'label']].to_csv('submit_task4.csv', index=None)
优化方向
- 换模型:不同的模型的效果是不同的,可以多尝试不同的模型,然后再选择一个最优的。
- 调参优化:如果模型效果不理想,可以尝试调整超参数以获得更好的性能。