所有文章 > AI驱动 > 卷积神经网络和循环神经网络的代码实现
卷积神经网络和循环神经网络的代码实现

卷积神经网络和循环神经网络的代码实现

在之前的文章里面,我对CNN和RNN做了一定的介绍,并且在最后给出了一些直接调用模型、函数库来做识别的例子,并没有给出CNN和RNN究竟用代码该如何实现,这篇文章就来介绍一下简单CNN和RNN用TensorFlowPytorch的实现。之前那篇文章的链接在这,感兴趣的话可以点进去看看:

CNN

这里依然使用 MNIST手写数字数据集来进行展示,下面是一个使用CNN来对数据集进行分类的代码,新手可以根据你的实际需求进一步调整和扩展它:

import tensorflow as tf
from tensorflow.keras import datasets, layers, models

# 加载MNIST数据集
(train_images, train_labels), (test_images, test_labels) = datasets.mnist.load_data()

# 对数据进行归一化处理,将像素值缩放到0到1之间
train_images, test_images = train_images / 255.0, test_images / 255.0

# 扩展维度,使数据形状符合卷积神经网络输入要求(原本是 (60000, 28, 28) 变为 (60000, 28, 28, 1))
train_images = tf.expand_dims(train_images, -1)
test_images = tf.expand_dims(test_images, -1)

# 构建卷积神经网络模型
model = models.Sequential()
# 第一个卷积层,32个卷积核,卷积核大小3x3,激活函数用ReLU,输入形状为 (28, 28, 1)
model.add(layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)))
model.add(layers.MaxPooling2D((2, 2)))
# 第二个卷积层
model.add(layers.Conv2D(64, (3, 3), activation='relu'))
model.add(layers.MaxPooling2D((2, 2)))
# 将卷积层输出展平,以便连接全连接层
model.add(layers.Flatten())
# 全连接层,有128个神经元,激活函数用ReLU
model.add(layers.Dense(128, activation='relu'))
# 输出层,10个神经元对应10个数字类别,激活函数用softmax
model.add(layers.Dense(10, activation='softmax'))

# 编译模型,指定优化器、损失函数和评估指标
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])

# 训练模型
model.fit(train_images, train_labels, epochs=5, batch_size=64)

# 在测试集上评估模型
test_loss, test_acc = model.evaluate(test_images, test_labels)
print(f"测试准确率: {test_acc}")

这个简单的 CNN 包含了卷积层(用于提取图像特征)、池化层(降采样层,进行特征降维)以及全连接层(用于分类决策)等常见结构,通过在 MNIST 数据集上的训练和评估来展示其基本的工作流程。如果想用于其他图像相关任务,通常需要根据具体的图像数据特点(比如图像尺寸、类别数量等)来相应地修改网络结构和训练参数等内容。

下面来给大家讲讲这简单的CNN每一层的功能:

先说这卷积层吧,你可以把它想象成是一个能从图像里找特点的小能手。它里面有好多那种小小的 “工具”,就好比一个个小筛子一样,这些 “小筛子” 在图像上呀,就像扫地似的,这扫扫那扫扫,从图像的各个地方把那些有用的特点给找出来。

比如说图像里物体的边边,或者是那种纹理啥的,通过这些 “小筛子” 一弄,原来的图像就变成了另外一种有这些特点的图啦,而且一个卷积层里可有好多个这样的 “小筛子” 同时干活,这样就能从好多不同的角度把图像的各种特点都给找出来了。

再讲讲池化层,这个池化层的作用就是给前面找出来的那些特点降降量,为啥要降量呢?因为前面弄出来的那些特点信息太多啦,电脑处理起来就挺费劲的,而且太多了也容易出问题呀。

那它咋降量呢?有两种常见的办法,一种就是找最大的,啥意思呢?就是在图像的一小块地方里,挑出那个数值最大的,然后就用这个最大的代表这一小块地方,整个图像都这么弄一遍,图像就变小了,但是重要的特点还留着呢。还有一种办法就是算平均,就是把那一小块地方里的数值都加一块儿,然后除以个数,算出个平均数来代表这小块地方,反正不管用哪种办法吧,最后就是让那些特点信息变少了,电脑处理起来就轻松多啦。

最后就是全连接层,这个全连接层呢,就像是个做决定的大管家一样。它先把前面弄好的那些特点信息都整理到一块儿,变成一条长长的 “信息链”,然后呢,它里面有好多小 “工作人员”,每个 “工作人员” 都跟这 “信息链” 上的每个东西连着呢,它们按照自己的一套办法算一算,最后就能告诉你这个图像大概是属于哪一类的啦,比如说这图像里画的是个啥东西呀,是猫还是狗之类的,它就能给个判断出来。

这么一个简单的 CNN 呢,就是靠着这几个部分互相配合,就能完成处理图像的事了。

这里再给出一个Pytorch的实现,这些代码都可以直接使用:

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms

# 定义数据预处理操作,将图像转换为张量并归一化
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5,), (0.5,))
])

# 加载MNIST训练数据集
trainset = torchvision.datasets.MNIST(root='./data', train=True,
download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64,
shuffle=True, num_workers=2)

# 加载MNIST测试数据集
testset = torchvision.datasets.MNIST(root='./data', train=False,
download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=64,
shuffle=False, num_workers=2)

# 定义卷积神经网络模型结构
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 32, 3) # 输入通道1(灰度图像),32个卷积核,卷积核大小3x3
self.pool = nn.MaxPool2d(2, 2) # 最大池化层,池化核2x2
self.conv2 = nn.Conv2d(32, 64, 3) # 第二层卷积,输入通道32,64个卷积核,卷积核大小3x3
self.fc1 = nn.Linear(64 * 12 * 12, 128) # 全连接层,将卷积层输出展平后连接
self.fc2 = nn.Linear(128, 10) # 输出层,对应10个数字类别

def forward(self, x):
x = self.pool(torch.relu(self.conv1(x)))
x = self.pool(torch.relu(self.conv2(x)))
x = x.view(-1, 64 * 12 * 12)
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x


net = Net()

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)

# 训练网络
for epoch in range(5): # 训练轮数可以按需调整
running_loss = 0.0
for i, data in enumerate(trainloader, 0):
inputs, labels = data
optimizer.zero_grad()

outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()

running_loss += loss.item()
if i % 200 == 199:
print(f'[{epoch + 1}, {i + 1}] loss: {running_loss / 200}')
running_loss = 0.0

print('训练完成')

# 在测试集上测试网络
correct = 0
total = 0
with torch.no_grad():
for data in testloader:
images, labels = data
outputs = net(images)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()

print(f'测试集上的准确率: {100 * correct / total}%')

RNN

下面讲讲RNN的实现,老样子依旧先给出一份代码,然后我再根据这份代码来讲解RNN

import tensorflow as tf
import numpy as np

# 1. 准备模拟数据
# 设定时间序列的长度,也就是每个样本包含的数据点个数
time_steps = 10
# 批量大小,一次处理多少个样本
batch_size = 32
# 输入数据的维度,这里简单设为1(例如单变量的时间序列,像股票价格随时间变化这种)
input_dim = 1
# 输出数据的维度,同样设为1,比如预测下一个时间点的值
output_dim = 1

# 生成模拟的输入数据,形状为 (批量大小, 时间序列长度, 输入维度)
x = np.linspace(0, 20 * np.pi, time_steps * batch_size).reshape(batch_size, time_steps, input_dim)
# 对应的输出数据(这里简单以正弦函数为例生成目标输出,模拟预测下一个时间点的正弦值)
y = np.sin(x).reshape(batch_size, time_steps, output_dim)

# 2. 定义RNN的相关参数
# RNN单元中的隐藏单元数量,决定了模型的复杂度和对数据特征的学习能力
hidden_units = 32

# 3. 定义输入占位符(placeholder),这是TensorFlow中用于输入数据的一种机制
# 第一个None表示批量大小可以在运行时动态指定,后面的维度对应时间序列长度和输入维度
X = tf.placeholder(tf.float32, [None, time_steps, input_dim])
# 同样,为输出数据定义占位符
Y = tf.placeholder(tf.float32, [None, time_steps, output_dim])

# 4. 定义RNN的权重和偏置(这是模型学习的参数)
# 用于将输入转换到隐藏状态的权重,形状为 (输入维度, 隐藏单元数量)
weights_input_hidden = tf.Variable(tf.random_normal([input_dim, hidden_units]))
# 隐藏状态到隐藏状态的权重(因为RNN在每个时间步会用上一时刻的隐藏状态更新当前隐藏状态),形状为 (隐藏单元数量, 隐藏单元数量)
weights_hidden_hidden = tf.Variable(tf.random_normal([hidden_units, hidden_units]))
# 隐藏状态到输出的权重,形状为 (隐藏单元数量, 输出维度)
weights_hidden_output = tf.Variable(tf.random_normal([hidden_units, output_dim]))

# 输入到隐藏层的偏置,形状为 (隐藏单元数量,)
bias_input_hidden = tf.Variable(tf.zeros([hidden_units]))
# 隐藏层到隐藏层的偏置,形状为 (隐藏单元数量,)
bias_hidden_hidden = tf.Variable(tf.zeros([hidden_units]))
# 隐藏层到输出的偏置,形状为 (输出维度,)
bias_hidden_output = tf.Variable(tf.zeros([output_dim]))

# 5. 定义RNN的计算逻辑(这里手动实现简单的循环计算过程)
def rnn_cell(inputs, hidden_state):
"""
这是定义单个时间步的RNN单元计算逻辑的函数。

参数:
- inputs: 当前时间步的输入数据,形状为 (批量大小, 输入维度)
- hidden_state: 上一个时间步的隐藏状态,形状为 (批量大小, 隐藏单元数量)

返回值:
- new_hidden_state: 当前时间步更新后的隐藏状态,形状为 (批量大小, 隐藏单元数量)
"""
# 先将输入数据进行线性变换(乘以权重并加上偏置),为了和隐藏状态进行相加
input_transformed = tf.matmul(tf.reshape(inputs, [-1, input_dim]), weights_input_hidden) + bias_input_hidden
# 将变换后的输入重塑为合适的形状,方便后续计算
input_transformed = tf.reshape(input_transformed, [-1, hidden_units])
# 用上一时刻的隐藏状态进行线性变换
hidden_transformed = tf.matmul(hidden_state, weights_hidden_hidden) + bias_hidden_hidden
# 将变换后的输入和隐藏状态相加,得到新的隐藏状态(这就是RNN的核心更新步骤)
new_hidden_state = tf.tanh(input_transformed + hidden_transformed)
return new_hidden_state

# 初始化第一个时间步的隐藏状态为全0,形状为 (批量大小, 隐藏单元数量)
initial_hidden_state = tf.zeros([batch_size, hidden_units])

# 循环遍历每个时间步,逐步计算隐藏状态
all_hidden_states = []
current_hidden_state = initial_hidden_state
for step in range(time_steps):
"""
这里通过循环来模拟RNN在每个时间步的计算过程。
"""
# 获取当前时间步的输入数据
current_input = X[:, step, :]
# 使用rnn_cell函数计算当前时间步的隐藏状态更新
current_hidden_state = rnn_cell(current_input, current_hidden_state)
# 将当前时间步的隐藏状态保存下来,方便后续使用
all_hidden_states.append(current_hidden_state)

# 将所有时间步的隐藏状态堆叠起来,形状变为 (时间序列长度, 批量大小, 隐藏单元数量)
all_hidden_states = tf.stack(all_hidden_states, axis=0)

# 调整隐藏状态的顺序,使其符合常规的 (批量大小, 时间序列长度, 隐藏单元数量) 格式
all_hidden_states = tf.transpose(all_hidden_states, [1, 0, 2])

# 6. 计算输出(根据最终的隐藏状态得到预测的输出)
# 取最后一个时间步的隐藏状态(这里假设我们只关心最后时刻的输出,实际应用中可根据需求调整)
last_hidden_state = all_hidden_states[:, -1, :]
# 通过线性变换将隐藏状态转换为输出数据
outputs = tf.matmul(last_hidden_state, weights_hidden_output) + bias_hidden_output

# 7. 定义损失函数(这里使用均方误差,衡量预测值和真实值之间的差距)
loss = tf.reduce_mean(tf.square(outputs - Y))

# 8. 定义优化器(这里使用Adam优化器来更新模型的权重,让损失函数尽量变小)
optimizer = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)

# 9. 创建会话并进行训练
with tf.Session() as sess:
# 初始化所有变量
sess.run(tf.global_variables_initializer())
# 进行多轮训练
for epoch in range(100):
_, current_loss = sess.run([optimizer, loss], feed_dict={X: x, Y: y})
if epoch % 10 == 0:
print(f'Epoch {epoch}: Loss = {current_loss}')

# 训练完成后,可以用训练好的模型进行预测(这里简单示例,实际应用中可按需扩展)
prediction = sess.run(outputs, feed_dict={X: x})
print("预测结果示例:", prediction[0])

输入层

这就是数据进入 RNN 的 “大门” ,就好比你要往一个机器里放东西,那输入层就是放数据的地方。比如说我要是处理一句话,那就把这句话里的一个个字或者词按照顺序,一个一个送进去,这就是最开始数据进来的那一步。

隐藏层

隐藏层呢是 RNN 里很关键的部分。它里面有一个个小小的 “计算单元”,这些单元在每个时间步都会工作。啥是时间步呢?就是按顺序处理数据的时候,每处理一个小部分就是一个时间步,像处理一句话里的每一个字的时候,处理每个字那就是一个时间步。

在每个时间步里,这个隐藏层的计算单元会拿到当前输入的数据,然后还会参考上一个时间步自己算出来的一个 “状态”,把这俩东西结合起来,再经过一番计算,得出一个新的 “状态”,留着给下一个时间步接着用。就好像这个计算单元有个小 “记忆”,能记住之前算出来的东西,然后再接着往下算。

而且隐藏层里的这些计算单元会重复这样的操作,一个时间步接着一个时间步地算下去,直到把咱送进去的一整串数据,比如一整句话,都处理完为止。

输出层

等隐藏层把所有的数据都处理完了,就轮到输出层登场啦。输出层就是根据隐藏层最后给出的那个 “状态”,来得出一个结果。比如说我要是想判断一句话是啥意思,或者预测下一个字可能是啥,那输出层就会根据隐藏层最后的那个状态,给出它的 “答案”,就像告诉咱们它琢磨出来这句话大概表达什么,或者觉得下一个字可能是啥样的。

而现在模型库都已经内置实现好了的RNN,要使用的话可以直接调用就行。

import tensorflow as tf
import numpy as np

# 生成一些模拟的时间序列数据示例(这里简单生成正弦函数相关的数据当作示例)
time_steps = 20
batch_size = 32
input_dim = 1
output_dim = 1

# 生成输入数据
x = np.linspace(0, 20 * np.pi, time_steps * batch_size).reshape(batch_size, time_steps, input_dim)
y = np.sin(x).reshape(batch_size, time_steps, output_dim)

# 定义RNN模型
model = tf.keras.Sequential([
tf.keras.layers.SimpleRNN(units=32, return_sequences=True, input_shape=(time_steps, input_dim)),
tf.keras.layers.Dense(units=output_dim)
])

# 编译模型,指定优化器、损失函数等
model.compile(optimizer='adam', loss='mean_squared_error')

# 训练模型
model.fit(x, y, epochs=10, batch_size=batch_size)

# 使用模型进行预测(这里只是简单演示预测下一步,实际可以按需拓展)
new_x = np.linspace(20 * np.pi, 21 * np.pi, time_steps).reshape(1, time_steps, input_dim)
predicted_y = model.predict(new_x)

print(predicted_y

下面还有pytorch的调用:

import torch
import torch.nn as nn

# 超参数
input_size = 1
hidden_size = 32
num_layers = 1
output_size = 1
sequence_length = 20

# 生成模拟输入数据(类似上面的示例,简单用正弦函数相关模拟)
x = torch.linspace(0, 20 * torch.pi, sequence_length).unsqueeze(1).unsqueeze(0).repeat(1, 1, 1)
y = torch.sin(x).squeeze(-1)

# 定义RNN模型类
class RNN(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, output_size):
super(RNN, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)

def forward(self, x):
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
out, _ = self.rnn(x, h0)
out = self.fc(out[:, -1, :])
return out


rnn_model = RNN(input_size, hidden_size, num_layers, output_size)

# 定义损失函数和优化器
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(rnn_model.parameters())

# 训练模型
for epoch in range(100):
optimizer.zero_grad()
output = rnn_model(x)
loss = criterion(output, y)
loss.backward()
optimizer.step()
if epoch % 10 == 0:
print(f'Epoch {epoch}: Loss {loss.item()}')

# 进行简单预测(示例,可按需拓展)
new_x = torch.linspace(20 * torch.pi, 21 * torch.pi, sequence_length).unsqueeze(1).unsqueeze(0)
predicted_y = rnn_model(new_x)
print(predicted_y)

本文章转载微信公众号@Chal1ceAI

#你可能也喜欢这些API文章!