利用Pytorch框架构建lstm时间序列预测模型

技术

PyTorch和TensorFlow各有优势,选择哪个框架通常取决于具体的应用场景和个人偏好,PyTorch因其简洁性和动态计算图,更适合快速原型开发和学术研究,而TensorFlow凭借其丰富的功能和强大的生产部署能力,更适合工业界的大规模应用,以下链接存在利用TensorFlow构建的lstm模型探讨EMD数据泄露问题的时序预测模型:EMD-CNN-LSTM实现与分析,但在这篇公众号中将利用Pytorch框架进行lstm模型构建,参考两篇文章给读者带来两种框架不同之处的直观感受

代码实现

数据读取并划分数据集


          
import numpy as np
          
import matplotlib.pyplot as plt
          
import pandas as pd
          
plt.rcParams['font.sans-serif'] = 'SimHei' # 设置中文显示
          
plt.rcParams['axes.unicode_minus'] = False
          

          
df = pd.read_excel('df.xlsx',index_col=0, parse_dates=['数据时间'])
          

          
# 定义划分比例
          
train_ratio = 0.7
          
val_ratio = 0.1
          
test_ratio = 0.2
          

          
# 计算划分的索引
          
train_split = int(train_ratio * len(df))
          
val_split = int((train_ratio + val_ratio) * len(df))
          

          
# 划分数据集
          
train_set = df.iloc[:train_split]
          
val_set = df.iloc[train_split:val_split]
          
test_set = df.iloc[val_split:]
          

          
plt.figure(figsize=(15, 10))
          
plt.subplot(3,1,1)
          
plt.plot(train_set, color='c',  alpha=0.3)
          
plt.title('train时序图')
          

          
plt.subplot(3,1,2)
          
plt.plot(val_set, color='b',  alpha=0.3)
          
plt.title('val时序图')
          

          
plt.subplot(3,1,3)
          
plt.plot(test_set, color='r',  alpha=0.3)
          
plt.title('test时序图')
          
plt.xticks(rotation=45)
          
plt.show()
      

picture.image

读取Excel文件中的时间序列数据,然后将数据集分为训练集、验证集和测试集,最后绘制出这三部分数据的时间序列图

数据归一化


          
from sklearn.preprocessing import MinMaxScaler
          

          
def normalize_dataframe(train_set, val_set, test_set):
          
    scaler = MinMaxScaler()
          
    scaler.fit(train_set)  # 在训练集上拟合归一化模型
          
    
          
    train = pd.DataFrame(scaler.transform(train_set), columns=train_set.columns, index = train_set.index)
          
    val = pd.DataFrame(scaler.transform(val_set), columns=val_set.columns, index = val_set.index)
          
    test = pd.DataFrame(scaler.transform(test_set), columns=test_set.columns, index = test_set.index)
          
    return train, val, test
          

          
train, val, test = normalize_dataframe(train_set, val_set, test_set)
      

构建滑动窗口


          
def prepare_data(data, win_size):
          
    X = []  
          
    y = []  
          

          
    for i in range(len(data) - win_size):
          
        temp_x = data[i:i + win_size] 
          
        temp_y = data[i + win_size]    
          
        X.append(temp_x)
          
        y.append(temp_y)
          

          
    X = np.asarray(X)
          
    y = np.asarray(y)
          
    X = np.expand_dims(X, axis=-1)
          
    return X, y
          

          
win_size = 30
          

          
# 训练集
          
X_train, y_train= prepare_data(train['data'].values, win_size)
          

          
# 验证集
          
X_val, y_val= prepare_data(val['data'].values, win_size)
          

          
# 测试集
          
X_test, y_test = prepare_data(test['data'].values, win_size)
          

          
print("训练集形状:", X_train.shape, y_train.shape)
          
print("验证集形状:", X_val.shape, y_val.shape)
          
print("测试集形状:", X_test.shape, y_test.shape)
      

picture.image

定义了函数将时间序列数据转换为用于训练LSTM模型的输入和标签,并分别处理训练集、验证集和测试集的数据,接下来对训练集数据形状进行解读:

  • X_train 的形状是 (907, 30, 1),表示有 907 个训练样本,每个样本包含 30 个时间步的数据,并且每个时间步的数据维度是 1(也代表输入特征数为1 )
  • y_train 的形状是 (907,),表示有 907 个训练标签,每个标签是一个标量(对应于每个训练样本的下一个时间步的数据)

意味着将时间序列数据划分为包含 30 个时间步的窗口,共生成了 907 个这样的窗口,每个窗口对应一个下一个时间步的数据作为预测目标,其它数据集类似,到这里其实和TensorFlow框架数据预处理基本都一致,接下来就开始有一定变化了

数据定义


          
import torch
          
import torch.nn as nn # 构建神经网络的模块和层
          
import torch.optim as optim # 包含优化算法
          
from torch.utils.data import TensorDataset, DataLoader, Subset
          

          
#device 表示了一个用于存储和计算张量的设备,检查是否有可用的GPU,如果有则使用GPU,否则使用CPU
          
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 检查是否有可用的 GPU
          
# 将NumPy数组转换为PyTorch张量
          
#将 numpy 数组 X_train_ts 转换为 PyTorch 的张量,并指定数据类型为 torch.float32,将张量放置在指定的设备上进行存储和计算
          
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
          
y_train_tensor = torch.tensor(y_train, dtype=torch.float32).to(device)
          

          
X_validation_tensor=torch.tensor(X_val, dtype=torch.float32).to(device)
          
y_validation_tensor= torch.tensor(y_val,dtype=torch.float32).to(device)
          

          
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
          
y_test_tensor = torch.tensor(y_test, dtype=torch.float32).to(device)
          

          
# 创建训练集、验证集和测试集数据集
          
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
          
validation_dataset = TensorDataset(X_validation_tensor, y_validation_tensor)
          
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
          

          
# 定义批量大小
          
batch_size = 32 #批量大小,算力越强,可以设置越大,可自定义 ,常见的批量大小通常在32到256之间
          

          
# 创建数据加载器 shuffle=True 表示在每个 epoch 开始时将数据打乱,这里不进行打乱处理
          
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
          
val_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)
          
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
          

          
# 打印训练数据形状
          
dataiter = iter(train_loader)
          
sample_x, sample_y = next(dataiter)  # 修改这里,使用next方法手动获取一个批次的数据
          
print('Sample input shape: ', sample_x.shape)
          
print('Sample output shape: ', sample_y.shape)
      

picture.image

将原始数据转换为PyTorch张量,并创建了用于训练和验证模型的数据加载器。通过定义批量大小和数据加载器,能够高效地批量处理数据,有助于加快模型训练过程,torch.Size([32, 30, 1])表示一个批次包含32个样本,每个样本是一个长度为30的单变量时间序列,torch.Size([32])表示一个批次的输出包含32个样本的标签,每个标签对应输入数据窗口之后的一个值

定义LSTM模型


          
import torch
          
import torch.nn as nn
          
import torch.optim as optim
          
from torch.utils.data import TensorDataset, DataLoader, Subset
          

          
# 定义模型参数字典
          
model_params = {
          
    'lstm': {
          
        'input_size': X_train.shape[2],  # 输入特征维度
          
        'hidden_size': 256,              # LSTM隐藏层维度
          
        'num_layers': 1,                 # LSTM层数
          
        'output_size': 1                 # 输出维度
          
    }
          
}
          

          
# 定义 LSTM 模型
          
class LSTMModel(nn.Module):
          
    def __init__(self, params):
          
        super(LSTMModel, self).__init__()
          
        self.hidden_size = params['hidden_size']
          
        self.num_layers = params['num_layers']
          
        # 定义LSTM层
          
        self.lstm = nn.LSTM(params['input_size'], params['hidden_size'], params['num_layers'], batch_first=True)
          
        # 定义全连接层
          
        self.fc1 = nn.Linear(params['hidden_size'], 128)
          
        self.fc2 = nn.Linear(128, 64)
          
        self.fc3 = nn.Linear(64, 32)
          
        self.fc4 = nn.Linear(32, 16)
          
        self.fc5 = nn.Linear(16, params['output_size'])
          
        self.relu = nn.ReLU()  # 激活函数ReLU
          

          
    def forward(self, x):
          
        # 初始化隐藏状态和细胞状态
          
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
          
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
          
        
          
        # LSTM前向传播
          
        out, _ = self.lstm(x, (h0, c0))
          
        out = self.relu(out[:, -1, :])  # 取最后一个时间步的输出,并应用ReLU激活函数
          
        out = self.relu(self.fc1(out))  # 全连接层1
          
        out = self.relu(self.fc2(out))  # 全连接层2
          
        out = self.relu(self.fc3(out))  # 全连接层3
          
        out = self.relu(self.fc4(out))  # 全连接层4
          
        out = self.fc5(out)  # 输出层
          
        return out
          
    
          
# 初始化模型
          
lstm_model = LSTMModel(model_params['lstm']).to(device)
          

          
# 打印模型架构
          
print(lstm_model)
      

picture.image

定义并初始化一个包含LSTM层和多层全连接层的神经网络模型,用于处理时间序列数据

模型训练


          
criterion = nn.MSELoss()
          
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
          

          
# 训练模型
          
num_epochs = 150
          
train_losses = []
          
val_losses = []
          

          
for epoch in range(num_epochs):
          
    lstm_model.train()
          
    train_loss = 0
          
    for X_batch, y_batch in train_loader:
          
        optimizer.zero_grad()
          
        outputs = lstm_model(X_batch)
          
        loss = criterion(outputs.squeeze(), y_batch)
          
        loss.backward()
          
        optimizer.step()
          
        train_loss += loss.item()
          
    
          
    train_loss /= len(train_loader)
          
    train_losses.append(train_loss)
          
    
          
    lstm_model.eval()
          
    val_loss = 0
          
    with torch.no_grad():
          
        for X_batch, y_batch in val_loader:
          
            outputs = lstm_model(X_batch)
          
            loss = criterion(outputs.squeeze(), y_batch)
          
            val_loss += loss.item()
          
    
          
    val_loss /= len(val_loader)
          
    val_losses.append(val_loss)
          
    
          
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')
          

          
# 绘制损失曲线
          
plt.figure()
          
plt.plot(train_losses, label='Train Loss')
          
plt.plot(val_losses, label='Validation Loss')
          
plt.legend()
          
plt.show()
      

picture.image

使用PyTorch训练LSTM模型,通过均方误差损失函数和Adam优化器在150个epoch内迭代优化参数,绘制训练和验证损失曲线,在模型早停部分存在对该部分代码的一定注释

测试集预测


          
# 保存模型
          
torch.save(lstm_model.state_dict(), 'lstm_model.pth')
          
# 调用模型
          
lstm_model = LSTMModel(model_params['lstm']).to(device)
          
lstm_model.load_state_dict(torch.load('lstm_model.pth'))
          
lstm_model.eval()
          
# 在测试集上进行预测
          
predictions = []
          
lstm_model.eval()
          
with torch.no_grad():
          
    for inputs, _ in test_loader:
          
        outputs = lstm_model(inputs)
          
        predictions.extend(outputs.cpu().numpy())
          

          
# 将预测结果转换为 NumPy 数组
          
predictions = np.array(predictions)
      

模型评价


          
from sklearn import metrics
          
mse = metrics.mean_squared_error(y_test, np.array([i for arr in predictions for i in arr]))
          
rmse = np.sqrt(mse)
          
mae = metrics.mean_absolute_error(y_test, np.array([i for arr in predictions for i in arr]))
          
from sklearn.metrics import r2_score
          
r2 = r2_score(y_test, np.array([i for arr in predictions for i in arr]))
          
print("均方误差 (MSE):", mse)
          
print("均方根误差 (RMSE):", rmse)
          
print("平均绝对误差 (MAE):", mae)
          
print("拟合优度:", r2)
      

picture.image

预测可视化


          
# 反归一化    
          
df_max = np.max(train_set)
          
df_min = np.min(train_set)
          

          
plt.figure(figsize=(15,4), dpi =300)
          
plt.subplot(2,1,1)
          
plt.plot(train_set, color = 'c', label = '训练集')
          
plt.plot(val_set, color = 'r', label = '验证集')
          
plt.plot(test_set, color = 'b', label = '测试集')
          
plt.plot(pd.date_range(start='2021-01-06', end='2021-08-31', freq='D')
          
         ,predictions*(df_max-df_min)+df_min, color = 'y', label = '测试集预测')
          

          
plt.legend()
          

          
plt.subplot(2,1,2)
          
plt.plot(test_set, color = 'b', label = '测试集')
          
plt.plot(pd.date_range(start='2021-01-06', end='2021-08-31', freq='D')
          
         ,predictions*(df_max-df_min)+df_min, color = 'y', label = '测试集预测')
          

          
plt.legend()
          
plt.show()
      

picture.image

模型早停


          
# 定义损失函数和优化器
          
criterion = nn.MSELoss()
          
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)
          

          
# 训练参数
          
num_epochs = 150  # 总的训练轮数
          
patience = 10  # 早停的容忍次数
          
min_delta = 1e-4  # 最小损失变化
          
save_path = 'best_lstm_model.pth'  # 最佳模型保存路径
          
train_losses = []  # 存储每个 epoch 的训练损失
          
val_losses = []  # 存储每个 epoch 的验证损失
          

          
# 早停参数初始化
          
best_loss = float('inf')  # 最佳验证损失初始值为无穷大
          
current_patience = 0  # 当前容忍次数
          

          
for epoch in range(num_epochs):
          
    lstm_model.train()  # 设置模型为训练模式
          
    train_loss = 0  # 初始化训练损失
          
    for X_batch, y_batch in train_loader:
          
        optimizer.zero_grad()  # 梯度清零
          
        outputs = lstm_model(X_batch)  # 前向传播
          
        loss = criterion(outputs.squeeze(), y_batch)  # 计算损失
          
        loss.backward()  # 反向传播
          
        optimizer.step()  # 更新参数
          
        train_loss += loss.item()  # 累加训练损失
          
    
          
    train_loss /= len(train_loader)  # 计算平均训练损失
          
    train_losses.append(train_loss)  # 记录训练损失
          
    
          
    lstm_model.eval()  # 设置模型为评估模式
          
    val_loss = 0  # 初始化验证损失
          
    with torch.no_grad():  # 禁用梯度计算
          
        for X_batch, y_batch in val_loader:
          
            outputs = lstm_model(X_batch)  # 前向传播
          
            loss = criterion(outputs.squeeze(), y_batch)  # 计算损失
          
            val_loss += loss.item()  # 累加验证损失
          
    
          
    val_loss /= len(val_loader)  # 计算平均验证损失
          
    val_losses.append(val_loss)  # 记录验证损失
          
    
          
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.8f}, Val Loss: {val_loss:.8f}')  # 打印当前 epoch 的训练和验证损失
          
    
          
    # 早停逻辑判断
          
    if val_loss < best_loss - min_delta:
          
        best_loss = val_loss  # 更新最佳验证损失
          
        current_patience = 0  # 重置容忍次数
          
        torch.save(lstm_model.state_dict(), save_path)  # 保存当前最佳模型
          
    else:
          
        current_patience += 1  # 增加容忍次数
          
        if current_patience >= patience:  # 如果超过容忍次数,停止训练
          
            print(f"Early stopping at epoch {epoch+1}.")
          
            break
          

          
# 绘制训练和验证损失曲线
          
plt.figure()
          
plt.plot(train_losses, label='Train Loss')  # 绘制训练损失曲线
          
plt.plot(val_losses, label='Validation Loss')  # 绘制验证损失曲线
          
plt.legend()
          
plt.show()
      

picture.image

在模型训练时可添加模型早停,其意义在于防止模型过拟合并节省训练时间,早停是一种正则化技术,当验证损失不再改善时提前停止训练,以避免模型在训练集上过度拟合,从而在测试集上表现不佳,早停还可以减少不必要的计算,提高训练效率

  1. 设定最佳验证损失初始值:
  • 在训练开始时,将最佳验证损失 ( best\_loss ) 初始化为无穷大
  • 训练过程中的验证:
  • 在每个 epoch 结束时,计算当前的验证损失 ( val\_loss )
  • 将当前的验证损失与最佳验证损失进行比较
  • 更新最佳验证损失和保存模型:
  • 如果当前验证损失比最佳验证损失降低了一个阈值( min\_delta ),则更新最佳验证损失为当前验证损失,并重置容忍计数器 ( current\_patience )
  • 保存当前模型的状态 ( state\_dict ),作为目前的最佳模型
  • 增加容忍次数:
  • 如果当前验证损失没有显著改善,则增加容忍次数。
  • 如果容忍次数达到预设的最大容忍次数 ( patience ),则停止训练,防止过拟合

通过这种方式,模型可以在验证损失不再显著改善时自动停止训练,从而避免过拟合,并且只需要保存最优模型,节省了存储空间和训练时间

往期推荐

时间序列预测:CNN-BiLSTM模型实践

灰狼优化算法(GWO):从理论到深度学习中的实践应用

梯度提升集成:LightGBM与XGBoost组合预测

基于LSTM模型的多输入多输出单步时间序列预测

使用LSTM模型预测多特征变量的时间序列

TCN时间序列卷积神经网络

基于VMD分解的VMD-CNN-LSTM时间序列预测模型实现

基于VMD分解的VMD-LSTM时间序列预测模型实现,大力提升预测精度!

长短期记忆网络LSTM在时序数据预测演示

如果你对类似于这样的文章感兴趣。

欢迎关注、点赞、转发~

0
0
0
0
关于作者

文章

0

获赞

0

收藏

0

相关资源
融合开放,新一代边缘云网络平台 | 第 11 期边缘云主题Meetup
《融合开放,新一代边缘云网络平台 》李冰|火山引擎边缘云网络产品负责人
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论