所有文章 > 日积月累 > NLP文本分类任务实战,附代码模板,手把手带你跑通
NLP文本分类任务实战,附代码模板,手把手带你跑通

NLP文本分类任务实战,附代码模板,手把手带你跑通

今天给大家带来NLP领域的文本分类实战,数据用的是真假新闻分类数据集,来自于kaggle,数据集以及代码我会放在后台,回复’新闻文本分类’就可以全部免费领取。这篇文章很长,希望大家能读到最后。

这次任务我打算用前两天刚发布的ModernBert来完成,并且和远古bert来做一个对比,ModernBert比起原版的Bert支持更长的文本输入,并且优化了模型架构,整体效果肯定要比Bert更好。

这里要说一点,如果你想用ModernBert,你的transformer版本至少要4.48以上,不过现在这个版本还在开发,还没有正式发布,在正式发布前你要升级的话可以用下面命令:

pip install git+https://github.com/huggingface/transformers.git

配置文件

首先我们先来看config文件,它的作用是用来集中管理各种训练配置。简单来说,这个类就像是一个工具箱,里面放着模型训练过程中需要的各种“工具”和“参数”。这些参数直接影响模型的性能和训练流程。

import torch

class CFG:
model_name = 'bert_models/ModernBert-base' # 模型路径
tokenizer_path = 'bert_models/ModernBert-base'
max_length = 1024 # 输入文本的最大长度
batch_size = 8 # 训练时的批量大小
learning_rate = 2e-5 # 学习率
step_size = 7 # 学习率调度器的步进大小
gamma = 0.1 # 学习率调度器的衰减因子
num_classes = 2 # 分类的类别数
pooling = 'attention' # 池化方法
epochs = 1 # 训练的轮数
grad_clip=False
device = 'cuda' if torch.cuda.is_available() else 'cpu'
train_path = 'dataset/train.csv'
valid_path = 'dataset/evaluation.csv'
target_cols = ['label']
test_path = 'dataset/test.csv'
best_model_path = 'output/best_model.pth'

比如,model_name 和 tokenizer_path 指向模型和分词器的位置,这里使用了前几天刚发布的 ModernBert-base。模型和分词器的路径需要一致,因为分词器的输出要和模型的输入匹配。

max_length 是输入文本的最大长度,比如1024。如果文本超过这个长度,模型就会截断多余的部分。这个值设置得太短可能会丢失重要信息,太长又会增加计算量。

batch_size 是每次训练时处理的样本数量,值为 8,意味着一次训练会同时处理 8 条数据。这个值和显存大小密切相关,显存大的设备可以设置更大的批量。

learning_rate 是学习率,决定模型在训练中参数更新的步伐。这里的值是 2e-5,是一个比较小的值,适合微调预训练模型,如果你的batch比较大,那么你的学习率也要相应调大,不然模型收敛会很慢。

step_size 和 gamma 是学习率调度器的两个参数。step_size 指每隔多少步或多少个epoch调整一次学习率(你可以将步数调整到和训练一个epoch所需要的步数一致),gamma 是调整的倍率,比如每隔7个epoch,学习率会乘以0.1`。

num_classes 表示有两个分类类别,比如我的这个例子中所用到的“真假新闻”。

grad_clip 是一个布尔值,决定是否对梯度进行裁剪。如果梯度爆炸或者训练不稳定,可以打开这个选项,不过我这里设备够了,所以它是关闭状态。

train_pathvalid_path 和 test_path 是数据集的路径,分别对应训练集、验证集和测试集。

数据处理

下面来看看代码,整体代码并不多,你甚至可以把它当作一个模板,以后有类似的任务就可以拿他们出来魔改,首先我们先来看一下数据长什么样:

这个数据格式并不是很好,ID列或者说索引列都没有做好名称设置,至于titletext则是新闻的标题和正文,label是标签列,1代表新闻是真新闻,0代表新闻是假新闻。

整个数据集将训练集划分为24353行,验证集和测试集都划分为8117行,不过里面有一些数据是坏的,正常读没法读出来,所以只能加上on_bad_lines='skip',这点在后面用的时候要注意。下面先来讲讲Dataprocess里面每一个函数的作用:

def collate_fn(batch):
"""
用于动态padding和batch数据的处理。
"""
inputs, labels = zip(*batch) # 分离输入和标签
# 获取 tokenizer 的 keys,例如 'input_ids', 'attention_mask'
input_keys = inputs[0].keys()
# 动态 padding 对每个 key 的张量进行对齐
batch_inputs = {
key: pad_sequence([inp[key].squeeze(0) for inp in inputs], batch_first=True, padding_value=0)
for key in input_keys
}
# 将标签堆叠
batch_labels = torch.stack(labels)

return batch_inputs, batch_labels

先来讲讲数据处理部分,这段代码主要是用来处理批量数据加载的问题,特别是处理那些输入长度不一的数据。我们通过这个叫 collate_fn 的函数来动态地“整理”这些数据,确保每次输入到模型里的数据格式都是一致的。

假设你的数据是个列表,每个元素是一个(输入, 标签)对。inputs,labels=zip(*batch) 这一行就是把它们分成两个单独的部分:inputs是所有输入数据labels是所有标签,这个相信大家都不陌生。假设你的batch站这样:

batch = [({'input_ids': [1, 2, 3]}, 0), ({'input_ids': [4, 5]}, 1)]

那么zip之后就长这样:

inputs = ({'input_ids': [1, 2, 3]}, {'input_ids': [4, 5]})
labels = (0, 1)

而很多时候,每个输入数据的长度可能不一样,比如这个例子中新闻标题+文本的长度。我们需要“补齐”它们,这样才能放进一个统一的张量里,这就是动态补齐,这里用的是 pad_sequence 方法。

  • • input_keys=inputs[0].keys()先获取输入字典的所有key,比如input_idsattention_mask
  • • 然后对每个key的数据单独处理,确保它们在维度上对齐。

假设输入有两句话:

第一句:[1, 2, 3]

第二句:[4, 5]

Padding之后就变成下面这样,对齐了维度:

[[1, 2, 3],
 [4, 5, 0]]

而标签呢没有长度上的不一致问题,自然不需要padding,只需要把它们按批次组合成一个张量供后续计算就行,不需要像输入那样动态补齐,直接用 torch.stack 把它们变成一个张量就好。

这个 collate_fn 的核心任务就是动态补齐数据,让模型可以接受批量数据输入。如果你输入的数据格式不对,或者有些必要的key缺失,那它就无法正常工作,所以在你使用之前要确保数据的完整性。

def prepare_input(cfg, text):
tokenizer = AutoTokenizer.from_pretrained(cfg.tokenizer_path)
inputs = tokenizer(text, return_tensors='pt', truncation=True, padding=True, max_length=cfg.max_length)
return inputs

这段代码的主要功能是把一段文本转化成模型可以处理的输入格式。简单来说,它是用来把“人话”变成“机器能懂的话”,而这个过程主要是靠分词器(tokenizer)来完成的。

首先看第一行,tokenizer = AutoTokenizer.from_pretrained(cfg.tokenizer_path),这里是从预训练的分词器中加载一个模型。我们通过 cfg.tokenizer_path 指定了分词器的位置,这里我在config文件中配置的是ModernBert-base和Bert-base的路径,这一步相当于拿到了一个“文本翻译工具”。

接着,inputs = tokenizer(...) 这一步是翻译的关键。我们把原始文本text丢给分词器,让它做几件事:

  1. 1. 分词:把整段文本拆成更小的单元,比如单词或子词。
  2. 2. 转化成 ID:每个单词或子词都对应一个数字ID,比如 “hello” 可能变成101。
  3. 3. 加特殊符号:像[CLS][SEP]这样的特殊符号会加到句子的开头或结尾,帮助模型理解句子的边界。
  4. 4. 截断:如果文本太长(超过了 cfg.max_length),会被裁剪掉,确保不会超出模型的输入限制。
  5. 5. 填充:如果文本太短,就在后面补零,补到 cfg.max_length 这么长,保证所有输入的长度一致。
  6. 6. 返回张量:分词结果会变成 PyTorch 的张量格式(return_tensors='pt'),这样就可以直接送到模型里。

最后,处理完的 inputs 就是一个包含多个键值对的字典,比如:

  • • input_ids:分词后的 ID。
  • • attention_mask:用来告诉模型哪些位置是填充的,哪些是有效内容。

举个例子,假设你的 cfg.max_length=10,文本是 "I love coding"。分词器处理后的结果可能是:

{
'input_ids':[[101,1045,2293,13639,102,0,0,0,0,0]],
'attention_mask':[[1,1,1,1,1,0,0,0,0,0]]
}

这里 101 和 102 是特殊符号[CLS]和 [SEP],中间是单词的 ID,最后几个零是补的。

文本内容明确,分词器路径有效,比如:

cfg = {'tokenizer_path': 'bert-base-uncased', 'max_length': 10}
text = "I love coding"

这会正常输出一个包含input_idsattention_mask的字典。

当然如果你的配置文件有问题,比如cfg.tokenizer_path指向了不存在的路径,或者cfg.max_length没有设置。这种情况下,代码可能会报错,比如:

OSError: Can't load tokenizer for path: ...

或者如果文本太长而没有设置 truncation=True模型会因为输入过长而无法处理,不过modernbert有8k的限制,比起bert的512长太多了,长度没啥问题。

class FakeNewsTrainDataset(Dataset):
def __init__(self, cfg, df):
self.cfg = cfg
self.titles = df['title'].values
self.texts = df['text'].values

self.contents = self.titles + self.texts
self.labels = df[cfg.target_cols].values

def __len__(self):
return len(self.labels)

def __getitem__(self, item):
inputs = prepare_input(self.cfg, self.contents[item])
label = torch.tensor(self.labels[item], dtype = torch.float)
return inputs, label

然后我们来看这个FakeNewsTrainDataset类,这是PyTorch里用来处理数据集的标准模板。一开始,__init__ 方法负责初始化数据集,类似于“开场准备”。

然后我们传入配置 cfg 和数据表 df,把标题(title)和正文(text)从数据表中提取出来,然后我们把标题和正文拼接在一起,存到self.contents,这样每条数据就是完整的新闻内容。同时把对应的标签列,存到self.labels

接着,__len__ 方法告诉我们这个数据集有多少条数据,就是标签的数量。比如如果有1000篇新闻,这里会返回1000。

最后,这个__getitem__ 方法是核心,用来根据索引取出一条数据。在我定义的这个类里面的具体操作是这样的:

  1. 1. 用刚才说到的prepare_input函数处理新闻内容,把原始文本转化为模型输入格式,模型能理解的格式。这里传入的内容是self.contents[item],也就是第item条新闻。
  2. 2. 把对应的标签转化成PyTorch张量,torch.tensor(self.labels[item], dtype=torch.float) 确保标签是浮点数格式,方便后续计算。

每次返回的就是一个 (inputs, label) 二元组,inputs模型的输入,label是真实的答案。

举个例子,假设数据表df是这样的:

df = pd.DataFrame({
'title':['Breaking News!','Tech Update'],
'text':['Aliens landed on Earth.','New AI model released.'],
'label':[0,1]
})
cfg = {'target_cols':'label'}

对于第 0 条数据,__getitem__(0) 会返回:

inputs = {'input_ids': [[101, 2924, 3246, ...]], 'attention_mask': [[1, 1, 1, ...]]}  # 经过分词和编码的新闻内容
label = tensor(0.0) # 假新闻标签

⚠️注意:在实际打比赛或者其他的测试任务里,在没有标签的情况下,这里应该再定义一个类:

class FakeNewsTestDataset(Dataset):
def __init__(self, cfg, df):
self.cfg = cfg
self.titles = df['title'].values
self.texts = df['text'].values

self.contents = self.titles + self.texts
# self.labels = df[cfg.target_cols].values # 关于label的都要删除掉

def __len__(self):
return len(self.labels)

def __getitem__(self, item):
inputs = prepare_input(self.cfg, self.contents[item])
# label = torch.tensor(self.labels[item], dtype = torch.float)
return inputs

池化

# 平均池化层
class MeanPooling(nn.Module):
def __init__(self):
super(MeanPooling, self).__init__()

def forward(self, last_hidden_state, attention_mask):
input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, 1)
sum_mask = input_mask_expanded.sum(1)
sum_mask = torch.clamp(sum_mask, min = 1e-9)
mean_embeddings = sum_embeddings/sum_mask
return mean_embeddings


# 最大池化层
class MaxPooling(nn.Module):
def __init__(self):
super(MaxPooling, self).__init__()

def forward(self, last_hidden_state, attention_mask):
input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
embeddings = last_hidden_state.clone()
embeddings[input_mask_expanded == 0] = -1e4
max_embeddings, _ = torch.max(embeddings, dim = 1)
return max_embeddings

我在poolings文件里头写了四种不同的池化方法,每种方法的作用是把输入的隐藏状态(last_hidden_state)压缩成一个固定大小的向量,用来表示整个序列的语义信息。池化的好处是将每个序列的变长特征统一成固定大小的向量,让模型更容易处理。

平均池化层的逻辑是对序列的每个位置求平均值。我们用 attention_mask 把无效的部分过滤掉,然后对有效位置的隐藏状态取平均。假如我们输入一个句子,它的 last_hidden_state 是一个 [4, 768] 的张量,表示有 4 个 token,每个 token 的向量大小是 768。通过平均池化后,输出会变成 [1, 768],即用每个 token 的特征向量的平均值表示整句话。

最大池化层稍微复杂一点。它取序列每个位置上的最大值。实现上,我们用 attention_mask 来扩展输入,然后把无效位置的值设成一个很小的数(-1e4),确保这些位置不会影响最大值计算。如果输入是 [4, 768] 的张量,最大池化会找出每一列的最大值作为结果。

# 最小池化层
class MinPooling(nn.Module):
def __init__(self):
super(MinPooling, self).__init__()

def forward(self, last_hidden_state, attention_mask):
input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
embeddings = last_hidden_state.clone()
embeddings[input_mask_expanded == 0] = 1e-4
min_embeddings, _ = torch.min(embeddings, dim = 1)
return min_embeddings

最小池化层的逻辑和最大池化类似,只是它找的是最小值。我们会把无效位置的值设成一个很大的数(1e4),然后取每列的最小值。这种方法通常在特定任务中才用,毕竟最小值并不总是能很好地表示序列的语义。

# 注意力池化层
class AttentionPooling(nn.Module):
def __init__(self, in_dim):
super().__init__()
self.attention = nn.Sequential(
nn.Linear(in_dim, in_dim),
nn.LayerNorm(in_dim),
nn.GELU(),
nn.Linear(in_dim, 1),
)

def forward(self, last_hidden_state, attention_mask):
w = self.attention(last_hidden_state).float()
w[attention_mask==0]=float('-inf')
w = torch.softmax(w,1)
attention_embeddings = torch.sum(w * last_hidden_state, dim=1)
return attention_embeddings

注意力池化层就复杂一点了。它引入了一个自定义的注意力机制,用来动态调整每个位置的重要性。我们先通过一个全连接网络为每个 token 生成权重 w,然后用 softmax 归一化权重,确保总和为 1。接着用这些权重加权隐藏状态,得到最终的序列表示。比如,如果输入句子中“关键字”的权重比较高,最终的表示会更侧重于这些关键字的特征。

举个例子,假如我们有一个句子“这是一条假新闻”。分词后有 6 个 token,它的隐藏状态是 [6, 768],注意力掩码是 [1, 1, 1, 1, 1, 1],表示每个位置都有效。对于平均池化,结果会是简单平均值;对于最大池化,可能更侧重于“假”和“新闻”的特征;对于注意力池化,如果权重主要集中在“假新闻”,结果就会特别强调这些词的信息。

不同的池化方法适合不同的任务。比如,平均池化简单直接,适合泛化任务;最大池化适合强调最强特征;注意力池化则更灵活,可以动态选择重点,但计算成本稍高。如果方法选得不对,比如在句子中重点词被弱化,模型效果可能会打折扣。

模型定义

这里我用的是自定义模型,有人可能会疑惑我为什么不直接用AutoModelForSequenceClassification,因为我这里是用的是自定义池化操作,在前面我也有提到,你甚至可以将这一份代码当作一份模板,随时可以取来用并且灵活修改,如果你的过程中需要用到什么自定义的操作,自己修改自己运行即可。

# 定义模型
class FakeNewsModel(nn.Module):
def __init__(self, CFG):
super().__init__()
self.CFG = CFG
self.tokenizer = AutoTokenizer.from_pretrained(CFG.model_name)
self.config = AutoConfig.from_pretrained(CFG.model_name, ouput_hidden_states=True)
self.model = AutoModel.from_pretrained(CFG.model_name, config=self.config)

# 自定义池化层
if CFG.pooling == 'mean':
self.pool = MeanPooling()
elif CFG.pooling == 'max':
self.pool = MaxPooling()
elif CFG.pooling == 'min':
self.pool = MinPooling()
elif CFG.pooling == 'attention':
self.pool = AttentionPooling(self.config.hidden_size)
else:
raise ValueError(f"Unsupported pooling type: {CFG.pooling}")

# 分类头部
self.fc = nn.Linear(self.model.config.hidden_size, CFG.num_classes)

def feature(self, inputs) -> torch.Tensor:
"""
提取隐藏层特征并进行池化。
"""
outputs = self.model(**inputs)
# 使用最后一层的 hidden state(即 last_hidden_state)
last_hidden_state = outputs.last_hidden_state # [batch_size, seq_len, hidden_size]
attention_mask = inputs['attention_mask']
feature = self.pool(last_hidden_state, attention_mask) # 进行池化
return feature

def forward(self, inputs) -> torch.Tensor:
"""
前向传播,输出分类 logits。
"""
feature = self.feature(inputs)
logits = self.fc(feature)
return logits

我的这个自定义模型基于PyTorch的nn.Module,并且用了预训练的语言模型,我用的是bert,你也可以改为其他的模型。接下来我们逐步拆解一下。

初始化部分首先加载了分词器、配置和预训练模型

if CFG.pooling =='mean':
self.pool =MeanPooling()
elif CFG.pooling =='max':
self.pool =MaxPooling()
elif CFG.pooling =='min':
self.pool =MinPooling()
elif CFG.pooling =='attention':
self.pool =AttentionPooling(self.config.hidden_size)
else:
raiseValueError(f"Unsupported pooling type: {CFG.pooling}")

接下来定义了一个自定义的池化层,这里我是设置了有4种不同的池化方式,如果你有其他的池化方式也可以加在Pooling.py文件里面:

  1. 1. 平均池化(mean),就是取每个序列位置的平均值,得到句子的整体表示。
  2. 2. 最大池化(max),取每个位置上的最大值。
  3. 3. 最小池化(min),取每个位置上的最小值。
  4. 4. 注意力池化(attention),用注意力机制动态地决定每个位置的重要性。

最后的分类层 self.fc 是一个全连接层,它的输入是模型提取出来的特征,输出的维度是分类类别数(比如真假新闻,2 类)。

feature函数是用来提取特征的。它先通过self.model获取每个token的隐藏状态(last_hidden_state),这是模型理解输入文本后的结果。然后结合注意力掩码attention_mask进行池化,把序列压缩成一个固定大小的特征向量。

forward函数是模型的前向传播逻辑。它调用feature提取特征,再通过分类层计算出分类结果logits,每个类别对应一个得分。

举个例子,你输入一个新闻文本

inputs = {'input_ids': [[101, 2003, 2023, 1037, 2173, 102]], 'attention_mask': [[1, 1, 1, 1, 1, 1]]}

这里的 input_ids 是分词后的 ID,attention_mask表示哪些位置有效。经过feature提取特征后,可能得到一个形状为 [batch_size, hidden_size] 的向量,比如[1, 768]。然后分类层输出形状为[1, 2],两个得分表示“假新闻”和“真新闻”的置信度。

模型训练

模型训练方面我没有一开始就初始化整个模型的参数,毕竟这也只是个简单的分类任务,预训练模型的性能加上微调就已经足够了。

def train(cfg, df1, df2):
# 加载数据
train_dataset = FakeNewsTrainDataset(cfg, df1)
valid_dataset = FakeNewsTrainDataset(cfg, df2)
train_loader = DataLoader(train_dataset, batch_size=cfg.batch_size, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=cfg.batch_size, shuffle=True, collate_fn=collate_fn)

# 加载模型
model = FakeNewsModel(cfg)
model.to(cfg.device)

# 加载优化器
optimizer = torch.optim.AdamW(model.parameters(), lr=cfg.learning_rate)

# 加载学习率调度器
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=cfg.step_size, gamma=cfg.gamma)

# 训练模型
best_val_loss = float('inf')
for epoch in range(cfg.epochs):
# Training
model.train()
train_loss = 0.0
correct = 0
total = 0

# tqdm 显示训练进度
loop = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{cfg.epochs}", leave=False)
for inputs, labels in loop:
inputs = {key: value.to(cfg.device) for key, value in inputs.items()}
labels = labels.to(cfg.device)

# 如果标签是 one-hot 编码,需要用argmax获取类索引
if len(labels.shape) > 1:
labels = labels.argmax(dim=1)
# 前向传播
outputs = model(inputs)
loss = F.cross_entropy(outputs, labels)
# 反向传播
loss.backward()
# 梯度裁剪(防止梯度爆炸)
if cfg.grad_clip:
torch.nn.utils.clip_grad_norm_(model.parameters(), cfg.grad_clip)

optimizer.step()
optimizer.zero_grad()

# 累计损失
train_loss += loss.item() * cfg.batch_size
# 计算准确率
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()

# 更新 tqdm 描述
loop.set_postfix(loss=loss.item(), acc=100.0 * correct / total)

# 学习率调度器步进
if scheduler:
scheduler.step()

# 训练集平均损失和准确率
train_loss /= len(train_loader.dataset)
train_acc = 100.0 * correct / total

# 验证模型
val_loss, val_acc = validate(cfg, model, valid_loader)

# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
torch.save(model.state_dict(), cfg.best_model_path)

# 打印每个 epoch 的结果
print(f"Epoch {epoch + 1}/{cfg.epochs}")
print(f" Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%")
print(f" Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")

我们一步步来拆解,让你明白每一部分都在干嘛。

首先,我们需要把数据准备好。这里有两个数据集,一个是训练集 df1,另一个是验证集 df2。每个数据集都通过 FakeNewsTrainDataset 类处理后,变成了适合加载器使用的对象train_dataset和 valid_dataset。然后我们用DataLoader把它们包装起来,这样就能批量读取数据。每次读取一批的大小是 cfg.batch_size(比如 8 或 16),而且训练数据会被随机打乱,这对模型的泛化能力很重要。

接下来是模型部分。我们用上面我们自定义的FakeNewsModel。然后,我们需要一个优化器来调整模型的参数,这里用的是AdamW,能帮助模型更快收敛。还加了一个学习率调度器 StepLR,它会在每隔一定的训练步长(step_size)后,把学习率按照衰减因子(gamma)降低,帮助模型更稳地学习。

训练过程是核心部分,分成多个轮次(epochs)。每一轮中,我们会把整个训练集跑一遍,同时在验证集上测试模型的表现。

在训练时,我们会让模型进入“训练模式”(model.train()),它会启用一些特定的机制,比如 Dropout(用来随机忽略部分神经元,防止过拟合),如果你不想dropout,可以在定义模型的时候,设置:

self.config.hidden_dropout = 0.
self.config.hidden_dropout_prob = 0.
self.config.attention_dropout = 0.
self.config.attention_probs_dropout_prob = 0.

然后,我们开始遍历训练数据,用tqdm显示进度条,这样我们能实时看到训练的进度。

每次处理一批数据,输入和标签都会被放到 GPU 或 CPU 上。如果标签是 one-hot 编码,我们先用 argmax 转成分类索引,这样计算交叉熵损失(F.cross_entropy)时不会报错。模型会通过输入数据生成输出,然后计算损失值。损失值越小,说明模型表现越好。

接着是反向传播,loss.backward() 会计算梯度,让模型知道每个参数该怎么调整。为了防止梯度值太大导致模型不稳定,我们可以用梯度裁剪(clip_grad_norm_)限制梯度的大小。最后一步是优化器更新参数,并清空累积的梯度。

在训练中,我们还会统计损失值和准确率。通过比较模型的预测值和真实标签,我们能知道预测对了多少个,占总数的百分比就是准确率。

当一轮训练结束后,我们会用验证集测试模型,看看它对没见过的数据表现如何。验证部分会用 validate 函数完成。这里的损失和准确率是我们评估模型的关键指标。

如果发现这一轮的验证损失比之前最低的还小,就说明模型变得更好了。我们把这个“最好”的模型存起来,文件名保存在 cfg.best_model_path

最后,每轮结束后都会打印一份报告,告诉我们这轮的训练损失和准确率,还有验证损失和准确率。这些信息能帮我们判断模型是否在正常学习,还是出现了过拟合(验证损失升高)。

用bert训练的效果如下(1epoch,512 max_len):

用modernbert的效果如下(1epoch,1024 max_len, 能更长,效果能更好):

这个训练过程非常常见,但里面有许多细节可以优化,比如选择不同的优化器、调整学习率调度器的参数、或者尝试更多轮次的训练来找到最好的模型等。

验证和测试

def validate(cfg, model, val_loader):
"""
验证模型性能。

参数:
- cfg: 配置对象
- model: PyTorch 模型
- val_loader: 验证数据加载器

返回:
- val_loss: 验证集平均损失
- val_acc: 验证集准确率
"""
model.eval()
val_loss = 0.0
correct = 0
total = 0

with torch.no_grad():
for inputs, labels in val_loader:
inputs = {key: value.to(cfg.device) for key, value in inputs.items()}
labels = labels.to(cfg.device)

if len(labels.shape) > 1:
labels = labels.argmax(dim=1)
outputs = model(inputs)
loss = F.cross_entropy(outputs, labels)

# 累计损失
val_loss += loss.item() * cfg.batch_size

# 计算准确率
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()

# 平均损失和准确率
val_loss /= len(val_loader.dataset)
val_acc = 100.0 * correct / total
return val_loss, val_acc

model = FakeNewsModel(CFG)
model.load_state_dict(torch.load(CFG.best_model_path))
model.to(CFG.device)
model.eval()

test_df = pd.read_csv(CFG.test_path, on_bad_lines='skip', sep=';').sample(100)
test_dataset = FakeNewsTrainDataset(CFG, test_df)
test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, collate_fn=collate_fn)

test_loss, test_acc = validate(CFG, model, test_loader)
print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")

这段代码的目的是验证模型的性能,看看它在验证数据集上的表现怎么样。我们主要关注两件事:平均损失和准确率。

代码的第一步是让模型进入“评估模式”(model.eval())。评估模式和训练模式不同,它会关闭一些训练时才需要的功能,比如 Dropout。这一步很重要,因为评估的时候,我们希望模型的行为尽可能稳定。

接着,torch.no_grad()会告诉PyTorch在这段代码里不用计算梯度。这是为了节省内存和计算资源,因为验证时我们不需要更新模型参数。

然后就是处理验证数据。val_loader是验证数据加载器,会一批一批地喂数据给模型。每一批包括输入 inputs 和标签 labels。这些数据会被送到 GPU 或 CPU 上,具体看你的设备配置。

如果标签是one-hot编码(比如[0, 1]代表第1类,[1, 0]代表第0类),我们用argmax把它转成类索引(比如 10)。这样模型输出的预测值可以直接和真实标签进行比较。

模型会根据输入生成输出 outputs,我们用交叉熵损失函数(F.cross_entropy)计算预测结果和真实标签之间的误差,这就是每一批数据的损失。

我们还要计算模型的准确率。outputs.max(1) 会找到每一行(也就是每个样本)预测值最大的那个类别,这就是模型的预测结果。接着,我们用 predicted.eq(labels) 比较预测和真实标签,看预测对了多少个。累加所有批次的数据后,用对的样本数除以总样本数,就得到了验证集的准确率。

最后,我们计算验证集的平均损失。验证集的总损失除以样本数量,就是平均损失。准确率则是用百分比表示,方便观察。

比如说,如果验证集有1000个样本,模型预测对了850个,那么准确率就是85%。如果平均损失值是0.2,说明模型的预测结果和真实标签之间的误差比较小。

这段代码会返回两个值,一个是验证集的平均损失(val_loss),另一个是验证集的准确率(val_acc)。我们可以用它们来判断模型在验证数据上的表现。如果验证损失和准确率都很理想,说明模型学到了好的特征。如果验证损失越来越高,准确率却没有提升,可能是模型过拟合了。

用bert的测试结果和modernbert的测试结果一样,都是100%的准确率。

本文章转载微信公众号@Chal1ceAI

#你可能也喜欢这些API文章!