利用Pytorch框架构建lstm时间序列预测模型
PyTorch和TensorFlow各有优势,选择哪个框架通常取决于具体的应用场景和个人偏好,PyTorch因其简洁性和动态计算图,更适合快速原型开发和学术研究,而TensorFlow凭借其丰富的功能和强大的生产部署能力,更适合工业界的大规模应用,以下链接存在利用TensorFlow构建的lstm模型探讨EMD数据泄露问题的时序预测模型:EMD-CNN-LSTM实现与分析,但在这篇公众号中将利用Pytorch框架进行lstm模型构建,参考两篇文章给读者带来两种框架不同之处的直观感受
代码实现
数据读取并划分数据集
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
plt.rcParams['font.sans-serif'] = 'SimHei' # 设置中文显示
plt.rcParams['axes.unicode_minus'] = False
df = pd.read_excel('df.xlsx',index_col=0, parse_dates=['数据时间'])
# 定义划分比例
train_ratio = 0.7
val_ratio = 0.1
test_ratio = 0.2
# 计算划分的索引
train_split = int(train_ratio * len(df))
val_split = int((train_ratio + val_ratio) * len(df))
# 划分数据集
train_set = df.iloc[:train_split]
val_set = df.iloc[train_split:val_split]
test_set = df.iloc[val_split:]
plt.figure(figsize=(15, 10))
plt.subplot(3,1,1)
plt.plot(train_set, color='c', alpha=0.3)
plt.title('train时序图')
plt.subplot(3,1,2)
plt.plot(val_set, color='b', alpha=0.3)
plt.title('val时序图')
plt.subplot(3,1,3)
plt.plot(test_set, color='r', alpha=0.3)
plt.title('test时序图')
plt.xticks(rotation=45)
plt.show()
读取Excel文件中的时间序列数据,然后将数据集分为训练集、验证集和测试集,最后绘制出这三部分数据的时间序列图
数据归一化
from sklearn.preprocessing import MinMaxScaler
def normalize_dataframe(train_set, val_set, test_set):
scaler = MinMaxScaler()
scaler.fit(train_set) # 在训练集上拟合归一化模型
train = pd.DataFrame(scaler.transform(train_set), columns=train_set.columns, index = train_set.index)
val = pd.DataFrame(scaler.transform(val_set), columns=val_set.columns, index = val_set.index)
test = pd.DataFrame(scaler.transform(test_set), columns=test_set.columns, index = test_set.index)
return train, val, test
train, val, test = normalize_dataframe(train_set, val_set, test_set)
构建滑动窗口
def prepare_data(data, win_size):
X = []
y = []
for i in range(len(data) - win_size):
temp_x = data[i:i + win_size]
temp_y = data[i + win_size]
X.append(temp_x)
y.append(temp_y)
X = np.asarray(X)
y = np.asarray(y)
X = np.expand_dims(X, axis=-1)
return X, y
win_size = 30
# 训练集
X_train, y_train= prepare_data(train['data'].values, win_size)
# 验证集
X_val, y_val= prepare_data(val['data'].values, win_size)
# 测试集
X_test, y_test = prepare_data(test['data'].values, win_size)
print("训练集形状:", X_train.shape, y_train.shape)
print("验证集形状:", X_val.shape, y_val.shape)
print("测试集形状:", X_test.shape, y_test.shape)
定义了函数将时间序列数据转换为用于训练LSTM模型的输入和标签,并分别处理训练集、验证集和测试集的数据,接下来对训练集数据形状进行解读:
- X_train 的形状是 (907, 30, 1),表示有 907 个训练样本,每个样本包含 30 个时间步的数据,并且每个时间步的数据维度是 1(也代表输入特征数为1)
- y_train 的形状是 (907,),表示有 907 个训练标签,每个标签是一个标量(对应于每个训练样本的下一个时间步的数据)
意味着将时间序列数据划分为包含 30 个时间步的窗口,共生成了 907 个这样的窗口,每个窗口对应一个下一个时间步的数据作为预测目标,其它数据集类似,到这里其实和TensorFlow框架数据预处理基本都一致,接下来就开始有一定变化了
数据定义
import torch
import torch.nn as nn # 构建神经网络的模块和层
import torch.optim as optim # 包含优化算法
from torch.utils.data import TensorDataset, DataLoader, Subset
#device 表示了一个用于存储和计算张量的设备,检查是否有可用的GPU,如果有则使用GPU,否则使用CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 检查是否有可用的 GPU
# 将NumPy数组转换为PyTorch张量
#将 numpy 数组 X_train_ts 转换为 PyTorch 的张量,并指定数据类型为 torch.float32,将张量放置在指定的设备上进行存储和计算
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
X_validation_tensor=torch.tensor(X_val, dtype=torch.float32).to(device)
y_validation_tensor= torch.tensor(y_val,dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
# 创建训练集、验证集和测试集数据集
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
validation_dataset = TensorDataset(X_validation_tensor, y_validation_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
# 定义批量大小
batch_size = 32 #批量大小,算力越强,可以设置越大,可自定义 ,常见的批量大小通常在32到256之间
# 创建数据加载器 shuffle=True 表示在每个 epoch 开始时将数据打乱,这里不进行打乱处理
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 打印训练数据形状
dataiter = iter(train_loader)
sample_x, sample_y = next(dataiter) # 修改这里,使用next方法手动获取一个批次的数据
print('Sample input shape: ', sample_x.shape)
print('Sample output shape: ', sample_y.shape)
将原始数据转换为PyTorch张量,并创建了用于训练和验证模型的数据加载器。通过定义批量大小和数据加载器,能够高效地批量处理数据,有助于加快模型训练过程,torch.Size([32, 30, 1])表示一个批次包含32个样本,每个样本是一个长度为30的单变量时间序列,torch.Size([32])表示一个批次的输出包含32个样本的标签,每个标签对应输入数据窗口之后的一个值
定义LSTM模型
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader, Subset
# 定义模型参数字典
model_params = {
'lstm': {
'input_size': X_train.shape[2], # 输入特征维度
'hidden_size': 256, # LSTM隐藏层维度
'num_layers': 1, # LSTM层数
'output_size': 1 # 输出维度
}
}
# 定义 LSTM 模型
class LSTMModel(nn.Module):
def __init__(self, params):
super(LSTMModel, self).__init__()
self.hidden_size = params['hidden_size']
self.num_layers = params['num_layers']
# 定义LSTM层
self.lstm = nn.LSTM(params['input_size'], params['hidden_size'], params['num_layers'], batch_first=True)
# 定义全连接层
self.fc1 = nn.Linear(params['hidden_size'], 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, 32)
self.fc4 = nn.Linear(32, 16)
self.fc5 = nn.Linear(16, params['output_size'])
self.relu = nn.ReLU() # 激活函数ReLU
def forward(self, x):
# 初始化隐藏状态和细胞状态
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
# LSTM前向传播
out, _ = self.lstm(x, (h0, c0))
out = self.relu(out[:, -1, :]) # 取最后一个时间步的输出,并应用ReLU激活函数
out = self.relu(self.fc1(out)) # 全连接层1
out = self.relu(self.fc2(out)) # 全连接层2
out = self.relu(self.fc3(out)) # 全连接层3
out = self.relu(self.fc4(out)) # 全连接层4
out = self.fc5(out) # 输出层
return out
# 初始化模型
lstm_model = LSTMModel(model_params['lstm']).to(device)
# 打印模型架构
print(lstm_model)
定义并初始化一个包含LSTM层和多层全连接层的神经网络模型,用于处理时间序列数据
模型训练
criterion = nn.MSELoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
# 训练模型
num_epochs = 150
train_losses = []
val_losses = []
for epoch in range(num_epochs):
lstm_model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
optimizer.zero_grad()
outputs = lstm_model(X_batch)
loss = criterion(outputs.squeeze(), y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
train_losses.append(train_loss)
lstm_model.eval()
val_loss = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
outputs = lstm_model(X_batch)
loss = criterion(outputs.squeeze(), y_batch)
val_loss += loss.item()
val_loss /= len(val_loader)
val_losses.append(val_loss)
print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')
# 绘制损失曲线
plt.figure()
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.legend()
plt.show()
使用PyTorch训练LSTM模型,通过均方误差损失函数和Adam优化器在150个epoch内迭代优化参数,绘制训练和验证损失曲线,在模型早停部分存在对该部分代码的一定注释
测试集预测
# 保存模型
torch.save(lstm_model.state_dict(), 'lstm_model.pth')
# 调用模型
lstm_model = LSTMModel(model_params['lstm']).to(device)
lstm_model.load_state_dict(torch.load('lstm_model.pth'))
lstm_model.eval()
# 在测试集上进行预测
predictions = []
lstm_model.eval()
with torch.no_grad():
for inputs, _ in test_loader:
outputs = lstm_model(inputs)
predictions.extend(outputs.cpu().numpy())
# 将预测结果转换为 NumPy 数组
predictions = np.array(predictions)
模型评价
from sklearn import metrics
mse = metrics.mean_squared_error(y_test, np.array([i for arr in predictions for i in arr]))
rmse = np.sqrt(mse)
mae = metrics.mean_absolute_error(y_test, np.array([i for arr in predictions for i in arr]))
from sklearn.metrics import r2_score
r2 = r2_score(y_test, np.array([i for arr in predictions for i in arr]))
print("均方误差 (MSE):", mse)
print("均方根误差 (RMSE):", rmse)
print("平均绝对误差 (MAE):", mae)
print("拟合优度:", r2)
预测可视化
# 反归一化
df_max = np.max(train_set)
df_min = np.min(train_set)
plt.figure(figsize=(15,4), dpi =300)
plt.subplot(2,1,1)
plt.plot(train_set, color = 'c', label = '训练集')
plt.plot(val_set, color = 'r', label = '验证集')
plt.plot(test_set, color = 'b', label = '测试集')
plt.plot(pd.date_range(start='2021-01-06', end='2021-08-31', freq='D')
,predictions*(df_max-df_min)+df_min, color = 'y', label = '测试集预测')
plt.legend()
plt.subplot(2,1,2)
plt.plot(test_set, color = 'b', label = '测试集')
plt.plot(pd.date_range(start='2021-01-06', end='2021-08-31', freq='D')
,predictions*(df_max-df_min)+df_min, color = 'y', label = '测试集预测')
plt.legend()
plt.show()
模型早停
# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
# 训练参数
num_epochs = 150 # 总的训练轮数
patience = 10 # 早停的容忍次数
min_delta = 1e-4 # 最小损失变化
save_path = 'best_lstm_model.pth' # 最佳模型保存路径
train_losses = [] # 存储每个 epoch 的训练损失
val_losses = [] # 存储每个 epoch 的验证损失
# 早停参数初始化
best_loss = float('inf') # 最佳验证损失初始值为无穷大
current_patience = 0 # 当前容忍次数
for epoch in range(num_epochs):
lstm_model.train() # 设置模型为训练模式
train_loss = 0 # 初始化训练损失
for X_batch, y_batch in train_loader:
optimizer.zero_grad() # 梯度清零
outputs = lstm_model(X_batch) # 前向传播
loss = criterion(outputs.squeeze(), y_batch) # 计算损失
loss.backward() # 反向传播
optimizer.step() # 更新参数
train_loss += loss.item() # 累加训练损失
train_loss /= len(train_loader) # 计算平均训练损失
train_losses.append(train_loss) # 记录训练损失
lstm_model.eval() # 设置模型为评估模式
val_loss = 0 # 初始化验证损失
with torch.no_grad(): # 禁用梯度计算
for X_batch, y_batch in val_loader:
outputs = lstm_model(X_batch) # 前向传播
loss = criterion(outputs.squeeze(), y_batch) # 计算损失
val_loss += loss.item() # 累加验证损失
val_loss /= len(val_loader) # 计算平均验证损失
val_losses.append(val_loss) # 记录验证损失
print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}') # 打印当前 epoch 的训练和验证损失
# 早停逻辑判断
if val_loss < best_loss - min_delta:
best_loss = val_loss # 更新最佳验证损失
current_patience = 0 # 重置容忍次数
torch.save(lstm_model.state_dict(), save_path) # 保存当前最佳模型
else:
current_patience += 1 # 增加容忍次数
if current_patience >= patience: # 如果超过容忍次数,停止训练
print(f"Early stopping at epoch {epoch+1}.")
break
# 绘制训练和验证损失曲线
plt.figure()
plt.plot(train_losses, label='Train Loss') # 绘制训练损失曲线
plt.plot(val_losses, label='Validation Loss') # 绘制验证损失曲线
plt.legend()
plt.show()
在模型训练时可添加模型早停,其意义在于防止模型过拟合并节省训练时间,早停是一种正则化技术,当验证损失不再改善时提前停止训练,以避免模型在训练集上过度拟合,从而在测试集上表现不佳,早停还可以减少不必要的计算,提高训练效率
- 设定最佳验证损失初始值:
- 在训练开始时,将最佳验证损失 (
best_loss
) 初始化为无穷大
- 在训练开始时,将最佳验证损失 (
- 训练过程中的验证:
- 在每个 epoch 结束时,计算当前的验证损失 (
val_loss
) - 将当前的验证损失与最佳验证损失进行比较
- 在每个 epoch 结束时,计算当前的验证损失 (
- 更新最佳验证损失和保存模型:
- 如果当前验证损失比最佳验证损失降低了一个阈值(
min_delta
),则更新最佳验证损失为当前验证损失,并重置容忍计数器 (current_patience
) - 保存当前模型的状态 (
state_dict
),作为目前的最佳模型
- 如果当前验证损失比最佳验证损失降低了一个阈值(
- 增加容忍次数:
- 如果当前验证损失没有显著改善,则增加容忍次数。
- 如果容忍次数达到预设的最大容忍次数 (
patience
),则停止训练,防止过拟合
通过这种方式,模型可以在验证损失不再显著改善时自动停止训练,从而避免过拟合,并且只需要保存最优模型,节省了存储空间和训练时间