1. 序列模型

1.1 自回归模型

(1)自回归模型:对于一个包含T个’时间’节点的输入序列,若预测其中的第t个数据,则依赖于该节点前面的观察数据

image-20240806194905905

  • 基于此,对于整个序列的估计值,可以表示为:

image-20240806195042014

  • 然而这对于长序列则计算量过大。我们可以使用该节点前面的τ个样本建模,控制模型参数的数量。
    • 如果序列可以按这种方式计算,则认为其满足马尔科夫条件。
    • 当τ=1(根据前一个节点推测后一个节点)时,序列估计可以写成如下形式。

image-20240806201637200

Tips:称为自回归的原因是输入与输出预测同一类型的数据。

(2)隐变量自回归模型 通过一个隐藏(latent):的变量推测Xt的值。而该隐藏变量来自于上一状态的隐变量以及当前Xt-1节点的值。(RNN, Recurrent Neural Network)的思想)

image-20240806171721063

1.2 训练

如下将演示如何根据正弦函数的样本点,建立τ=4的自回归模型

  • 第一步:模拟正弦函数的数据,x轴从0到1000
1
2
3
4
5
6
7
8
9
%matplotlib inline
import torch
from torch import nn
from d2l import torch as d2l

T = 1000  # 总共产生1000个点
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,)) #添加一点噪音
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))

image-20240806205930245

  • 第二步:生成特征与标签数据(前4个样本作为输入,第5个样本作为预测)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
tau = 4
features = torch.zeros((T - tau, tau)) #tau列
for i in range(tau): #逐列填充,每次错开一个元素
    features[:, i] = x[i: T - tau + i]
labels = x[tau:].reshape((-1, 1))

features[:4,:], labels[:4]
# (tensor([[-0.1026, -0.2982,  0.1424,  0.0798],
#          [-0.2982,  0.1424,  0.0798,  0.1033],
#          [ 0.1424,  0.0798,  0.1033,  0.0814],
#          [ 0.0798,  0.1033,  0.0814, -0.3063]]),
#  tensor([[ 0.1033],
#          [ 0.0814],
#          [-0.3063],
#          [ 0.1354]]))

batch_size, n_train = 16, 600
# 只有前n_train个样本用于训练
# 批量数据迭代
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),
                            batch_size, is_train=True)
  • 第三步:建立模型
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 初始化网络权重的函数
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.xavier_uniform_(m.weight)

# 一个简单的多层感知机
def get_net():
    net = nn.Sequential(nn.Linear(4, 10),
                        nn.ReLU(),
                        nn.Linear(10, 1))
    net.apply(init_weights)
    return net

# 平方损失,不做聚合操作
loss = nn.MSELoss(reduction='none')
  • 第四步:训练模型
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def train(net, train_iter, loss, epochs, lr):
    trainer = torch.optim.Adam(net.parameters(), lr) #优化算法
    for epoch in range(epochs):
        for X, y in train_iter:
            trainer.zero_grad()
            l = loss(net(X), y)
            l.sum().backward()
            trainer.step()
        print(f'epoch {epoch + 1}, '
              f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')

net = get_net()
train(net, train_iter, loss, 5, 0.01)

1.3 预测

  • 预测方式1:特征数据全部来自已知数据
1
2
3
4
5
6
onestep_preds = net(features)
d2l.plot([time, time[tau:]],
         [x.detach().numpy(), onestep_preds.detach().numpy()], 'time',
         'x', legend=['data', '1-step preds'], xlim=[1, 1000],
         figsize=(6, 3))
# 下图左
  • 预测方式2:前604个样本来训练集的已知数据。再后面预测时,每次将新预测的样本作为输入预测下一个输出。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
multistep_preds = torch.zeros(T) #初始化全0
multistep_preds[: n_train + tau] = x[: n_train + tau] #前面的训练集数据已知,不做预测
for i in range(n_train + tau, T):
    # 将新预测的结果加入特征中,作为下一次预测的输入
    multistep_preds[i] = net(
        multistep_preds[i - tau:i].reshape((1, -1)))

d2l.plot([time, time[tau:], time[n_train + tau:]],
         [x.detach().numpy(), onestep_preds.detach().numpy(),
          multistep_preds[n_train + tau:].detach().numpy()], 'time',
         'x', legend=['data', '1-step preds', 'multistep preds'],
         xlim=[1, 1000], figsize=(6, 3))
# 下图右

image-20240806211506715

  • 如上可以看出:
    • 在单步预测(输入均为实际观测数据)时,模型效果不错;
    • 而在预测多步(将最近的预测作为下一步输入)时,即更远的预测时,模型效果不尽如人意。
skforecast:一款解决时序预测的神库-CSDN博客

2 文本预处理

一篇文章可以视为一串单词的序列,需要进行必要的预处理操作步骤:

(1)将文本作为字符串进行加载;

(2)将字符串拆分为词元(单词或字符);

(3)建立一个词表,将词元映射到数字索引;

(4)将文本转换为数字索引序列。

2.1 读取数据集

  • 示例数据:时光机器(The Time Machine)小说
  • 如下操作,按行读取全部小说文本。结果为list,其中每个元素表示一行的文本。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#@save
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():  #@save
    """将时间机器数据集加载到文本行的列表中"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine() # list
print(f'# 文本总行数: {len(lines)}')
# 文本总行数: 3221
print(lines[0])
# the time machine by h g wells
print(lines[10])
# twinkled and his usually pale face was flushed and animated the

2.2 词元化

  • 词元(token),通常指一个单词或字符
  • 如下操作将每一行以词元为单位,拆分为一个list,结果返回一个list of list
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def tokenize(lines, token='word'):  #@save
    """将文本行拆分为单词或字符词元"""
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('错误:未知词元类型:' + token)

tokens = tokenize(lines) #list of list, 内部list的元素即为词元
print(tokens[0])
# ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']

2.3 词表

  • 词元的类型是字符串,而模型需要的是数字;
  • 词表(Vocabulary):类似于Python中的字典,将输入的词元转换为数字索引;
  • 语料库(Corpus):对所有文本中唯一词元的统计结果,按频率降序排。
    • 对于低频率出现的词元,可设置一定标准的阈值过滤;
    • 对于语料库中不存在,或者已过滤的词元,将被映射到未知词元’<unk>’, 其数字索引记为0
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Vocab:  #@save
    """文本词表"""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # 按出现频率排序
        counter = count_corpus(tokens)
        # counter.items() e.g. [('word1', 5), ('word2', 3), ('word3', 8)] 
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                   reverse=True)
        # 未知词元的索引为0
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)
    
	# 如果传入单个词元,则返回其索引;如果传入词元列表,则返回对应的索引列表。
    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    # 根据输入的索引或索引列表返回对应的词元。
    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

    @property
    def unk(self):  # 未知词元的索引为0
        return 0

    @property
    def token_freqs(self):
        return self._token_freqs

def count_corpus(tokens):  #@save
    """统计词元的频率"""
    # 这里的tokens是1D列表或2D列表
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # 将词元list of list列表展平成一个列表
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)

vocab = Vocab(tokens)
list(vocab.token_to_idx.items())[:5]
# [('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4)]
vocab.token_freqs[:5]
# [('the', 2261), ('i', 1267), ('and', 1245), ('of', 1155), ('a', 816)]

2.4 整合所有功能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def load_corpus_time_machine(max_tokens=-1):  #@save
    """返回时光机器数据集的词元索引列表和词表"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char') #词元为字符
    vocab = Vocab(tokens)
    # 因为时光机器数据集中的每个文本行不一定是一个句子或一个段落,
    # 所以将所有文本行展平到一个列表中
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

# corpus 全部文本的词元
corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)
# (170580, 28)
corpus[:5]
# [3, 9, 2, 1, 3]
vocab.token_freqs[:5]
# [(' ', 29927), ('e', 17838), ('t', 13515), ('a', 11704), ('i', 10138)]

3. 语言模型和数据集

3.1 自然语言统计

  • 单个词元统计(一元语法)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import random
import torch
from d2l import torch as d2l

tokens = d2l.tokenize(d2l.read_time_machine())
# 将list of list转为list
corpus = [token for line in tokens for token in line]

vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]
# [('the', 2261), ('i', 1267), ('and', 1245)]
  • 两个连续词元统计(二元语法)
1
2
3
4
5
6
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_tokens[:3]
# [('the', 'time'), ('time', 'machine'), ('machine', 'by')]
bigram_vocab = d2l.Vocab(bigram_tokens)
bigram_vocab.token_freqs[:3]
# [(('of', 'the'), 309), (('in', 'the'), 169), (('i', 'had'), 130)]
  • 三个连续词元统计(三元语法)
1
2
3
4
5
6
7
trigram_tokens = [triple for triple in zip(
    corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
trigram_vocab.token_freqs[:3]
# [(('the', 'time', 'traveller'), 59),
#  (('the', 'time', 'machine'), 30),
#  (('the', 'medical', 'man'), 24)]
  • 根据下图的频率分布可视化,可以看出:
    • 三者均不同程度上遵循齐普夫定律,呈现较为显著的衰减
    • 少数高频词占了全部语料库的大多数,大部分可能形式的n元组很少出现
1
2
3
4
5
6
freqs = [freq for token, freq in vocab.token_freqs]
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
         ylabel='frequency: n(x)', xscale='log', yscale='log',
         legend=['unigram', 'bigram', 'trigram'])

../_images/output_language-models-and-dataset_789d14_66_0.svg

3.2 读取长序列数据

如前所述,在处理长序列时,通常仅考虑待预测数据前的若干节点的观测数据。

  • batch_size:每个小批量同时处理的子序列样本数目;

  • num_steps:每个子序列中预定义的时间步数。

  • 在小批量采样时,由如下两种方式(子序列的时间步数都不重叠)

  • 随机偏移量:从一个随机起始点开始截取序列,增加每个epoch迭代的随机性

(1)随机抽样

  • 每个批量样本之间的起始时间步数无顺序关系
    • e.g. 第一个小批量的第一个序列与第二个小批量的第一个序列无相邻的顺序关系
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def seq_data_iter_random(corpus, batch_size, num_steps):  #@save
    """使用随机抽样生成一个小批量子序列"""
    # 从随机偏移量开始对序列进行分区(随机丢弃开头的几个位置数据),随机范围包括num_steps-1
    corpus = corpus[random.randint(0, num_steps - 1):]
    # 计算可以生成多少个不重叠的子序列(减去1,是因为我们需要考虑标签)
    num_subseqs = (len(corpus) - 1) // num_steps
    # 每个子序列的起始索引
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # 打乱初始索引
    random.shuffle(initial_indices)

    def data(pos):
        # 返回从pos位置开始的长度为num_steps的序列
        return corpus[pos: pos + num_steps]

    #根据每个小批量包含的子序列数,计算有多少个批量
    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        # 在这里,每个批量内所有子序列的起始索引
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)
  • 示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
random_iter = seq_data_iter_random(list(range(35)), batch_size=2, num_steps=5)

next(iter(random_iter))
# (tensor([[24, 25, 26, 27, 28],
#          [14, 15, 16, 17, 18]]),
#  tensor([[25, 26, 27, 28, 29],
#          [15, 16, 17, 18, 19]]))
next(iter(random_iter))
# (tensor([[ 9, 10, 11, 12, 13],
#          [ 4,  5,  6,  7,  8]]),
#  tensor([[10, 11, 12, 13, 14],
#          [ 5,  6,  7,  8,  9]]))

(2)顺序分区

  • 保证两个相邻的小批量中的子序列在原始序列上也是相邻的。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save
    """使用顺序分区生成一个小批量子序列"""
    # 从随机偏移量开始划分序列
    offset = random.randint(0, num_steps)
    # 确保可以被整除的tokens词元数量
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    # 所有作为特征的输入序列
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    # 所有的标签值
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    # 行数表示每个批量包含的子序列数
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y
  • 示例
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
iter_seq = seq_data_iter_sequential(list(range(35)), batch_size=2, num_steps=5)

next(iter(iter_seq))
# (tensor([[ 2,  3,  4,  5,  6],
#          [18, 19, 20, 21, 22]]),
#  tensor([[ 3,  4,  5,  6,  7],
#          [19, 20, 21, 22, 23]]))

next(iter(iter_seq))
# (tensor([[ 7,  8,  9, 10, 11],
#          [23, 24, 25, 26, 27]]),
#  tensor([[ 8,  9, 10, 11, 12],
#          [24, 25, 26, 27, 28]]))

Tips:无论上述哪一种方式,生成的序列样本数据都是不重叠的。

整合上述迭代方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 定义一个类,生成数据迭代器
class SeqDataLoader:  #@save
    """加载序列数据的迭代器"""
    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
        if use_random_iter:
            self.data_iter_fn = d2l.seq_data_iter_random
        else:
            self.data_iter_fn = d2l.seq_data_iter_sequential
        # 返回所有文档的词元索引列表,以及词表vocab
        self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
        self.batch_size, self.num_steps = batch_size, num_steps

    def __iter__(self):
        return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
    
# 综合词表与数据迭代器
def load_data_time_machine(batch_size, num_steps,  #@save
                           use_random_iter=False, max_tokens=10000):
    """返回时光机器数据集的迭代器和词表"""
    data_iter = SeqDataLoader(
        batch_size, num_steps, use_random_iter, max_tokens)
    return data_iter, data_iter.vocab

4. 循环神经网络

  • 循环神经网络是具有隐状态的神经网络;
  • 隐状态与隐藏层的概念截然不同
    • 隐藏层是指在从输入到输出的路径上,隐藏的层;
    • 隐状态可以理解为RNN中的记忆单元,它保存了序列中先前时间步的信息,并传递给后续的时间步。

4.1 无隐状态的神经网络

以简单的单隐藏层MLP为例:

  • 输入X,隐藏层输出H,隐藏层权重参数Wxh,偏置参数b
  • 输出层O,权重参数Whq,偏置参数b

image-20240807135451800

4.2 有隐状态的神经网络

对于一个特定时间步长的序列:

  • Xt步的输出(Ot,表示对于Xt+1的预测),取决于当前时间序列的隐状态Ht;
  • 而当前的隐状态Ht由当前时间步的输入Xt与前一个时间步的隐状态Ht-1共同计算得出。

image-20240807140351095

  • 对于输入的小批量数据,由n个长度为T的序列样本组成。若样本特征长度为d时,则每次训练输入数据为 Xt:n×d
  • 基于Xt的输入,参与Ht计算的权重参数为Wxh;基于Ht-1隐状态,参与Ht计算的权重参数为Whh,此外还有偏置;
  • Ht隐状态,最终计算Ot的权重参数为Whq,以及偏置。

image-20240807143437717

(1)如上可以看出,有隐状态的神经网络从公式上来看,与单隐藏层的神经网络非常类似。只是多了一项Wxh参数的计算过程。

(2)在同一批量的不同时间节点迭代时,仍然是上述这些模型参数。即模型参数的开销不会随着时间步的增加而增加。

  • 如下,演示同时对特定一个批量内多个子序列的第i个词元的隐状态计算:
    • 输入的批量包含3条子序列,每条子序列中单个词元的特征长度为1;
    • 隐状态的神经元个数设置为4。
1
2
3
4
5
6
7
import torch
from d2l import torch as d2l

X, W_xh = torch.normal(0, 1, (3, 1)), torch.normal(0, 1, (1, 4))
H, W_hh = torch.normal(0, 1, (3, 4)), torch.normal(0, 1, (4, 4))

torch.matmul(X, W_xh) + torch.matmul(H, W_hh)

4.3 基于RNN的字符级语言模型

  • 在字符级语言模型中,文本词元为字符而不是单词;
  • 如下可以理解为小批量大小为1,文本序列为’machine'

image-20240807145613774

4.4 困惑度

  • 语言模型大部分情况下可以理解为分类问题,可以利用交叉熵计算模型输入与标签之间的差异;
  • 模型性能(损失)可根据一个序列中所有词元(n)的平均交叉熵损失来衡量;
  • 实际建模时,自然语言处理的科学家更喜欢使用困惑度(Perplexity)指标。本质上只是对上述进行exp指数运算
  • 该指标可以理解为对下一个词元的实际选择数的调和平均数。
    • 困惑度越低,说明模型的预测效果越好。最好的情况为1,即完美地估计了标签词元;
    • 如果困惑度为 kk,那么可以理解为模型预测下一个词时的候选词数量大致为 k。

image-20240807151122573

5. RNN的从零实现

从零开始基于RNN实现字符级语言模型

  • 读取数据集
    • batch_size表示每个批量同时读取/处理多少条子序列
    • num_steps表示每条子序列的长度
1
2
3
4
5
6
7
8
9
%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

5.1 独热编码

  • 每个词元经转换后表示为一个数字索引,然后经独热编码表示为特征向量;
  • 若词表的唯一词元有N个(len(vocab)),则词元索引范围是 0~N-1,其特征向量长度为N
1
2
F.one_hot(torch.tensor([0, 2]), len(vocab)).shape
# torch.Size([2, 28])
  • 特征编码前,小批量输入形状为二维张量(批量大小,时间步数/序列长度)
  • 编码后则为3维张量,需要再调整下维度的顺序,方便后续操作。(时间步数/序列长度,批量大小,词表大小/特征长度)
1
2
3
4
5
6
# 批量大小为2,序列长度为5
X = torch.arange(10).reshape((2, 5))

#X.T转置操作,将序列长度维数放在前面之后,再进行独热编码
F.one_hot(X.T, 28).shape 
# torch.Size([5, 2, 28])

5.2 初始化模型参数

  • RNN模型参数可参考4.2部分介绍,主要分为隐状态参数与输出层参数
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def get_params(vocab_size, num_hiddens, device):
    # 输入与输出的词元的特征向量长度相同
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    # 隐藏层参数
    W_xh = normal((num_inputs, num_hiddens))
    W_hh = normal((num_hiddens, num_hiddens))
    b_h = torch.zeros(num_hiddens, device=device)
    # 输出层参数,num_outputs等于词元类别数
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # 附加梯度
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params

5.3 RNN模型

  • 初始化隐状态,形状为(批量大小,隐藏单元数)
1
2
3
def init_rnn_state(batch_size, num_hiddens, device):
    return (torch.zeros((batch_size, num_hiddens), device=device), )
# 这里返回元组,因为后面章节的隐状态会有多个变量(LSTM)
  • 定义前向传播函数,返回一轮batch的预测结果(批量大小×序列长度,词表大小),以及更新的隐状态
    • 输入inputs为上述5.1所介绍的三维张量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def rnn(inputs, state, params):
    # inputs的形状:(时间步数量,批量大小,词表大小)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    # 逐时间步迭代:X的形状为(批量大小,词表大小)
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        # Y形状:(批量大小,词表大小)
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,) 
#纵向叠加,增加行数,列数不变,维度不变
  • 定义模型的类
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
class RNNModelScratch: #@save
    """从零开始实现的循环神经网络模型"""
    def __init__(self, vocab_size, num_hiddens, device,
                 get_params, init_state, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state, self.forward_fn = init_state, forward_fn

    def __call__(self, X, state):
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        return self.init_state(batch_size, self.num_hiddens, device)
  • 示例输出
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
state = net.begin_state(X.shape[0], d2l.try_gpu())

# 批量包含的序列数为2,序列长度为5
X = torch.arange(10).reshape((2, 5))

Y, new_state = net(X.to(d2l.try_gpu()), state)
Y.shape
# torch.Size([10, 28])
new_state[0].shape #批量内每个子序列最后一个时间步的隐状态
# torch.Size([2, 512])

5.4 预测

  • prefix:包含若干词元的初始文本
  • num_preds:往后预测多少个词元
  • 在预测过程中,首先逐个遍历给定的初始词元,但不做预测,仅用于更新隐状态。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
def predict_ch8(prefix, num_preds, net, vocab, device):  #@save
    """在prefix后面生成新字符"""
    # batch_size=1表示单批量逐个预测
    state = net.begin_state(batch_size=1, device=device)
    # 首先将prefix的第一个词元加入到outputs中
    outputs = [vocab[prefix[0]]]
    # 取outputs里最新的一个词元,作为预测下一个词元的输入
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    for y in prefix[1:]:  # 预热期
        _, state = net(get_input(), state)
        outputs.append(vocab[y]) #传入真实值
    for _ in range(num_preds):  # 预测num_preds步
        y, state = net(get_input(), state)
        outputs.append(int(y.argmax(dim=1).reshape(1))) #传入预测值
    return ''.join([vocab.idx_to_token[i] for i in outputs])
  • 示例
1
2
predict_ch8('time traveller ', 10, net, vocab, d2l.try_gpu())
# 'time traveller xejnnnnnnn'

5.5 梯度剪裁

  • 对于长度为T的序列,训练时会执行T次矩阵乘法,来进行反向传播、更新梯度。
  • 这对于较长的序列,可能会导致梯度爆炸,模型无法收敛。
  • 此时,可以通过梯度剪裁,将参数的梯度的范数设置一个上限θ(不改变方向)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def grad_clipping(net, theta):  #@save
    """裁剪梯度"""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    #若范数长度大于θ,就设置其值为θ
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

5.6 训练

  • 在3.2小节中,学习了两种小批量序列样本迭代方法:(1)顺序分区;(2)随机抽样
  • 对于顺序分区,相邻两个batch iteration中,对应的第i个子序列的位序也是相邻的。
    • 隐状态仅需要在刚开始时初始化一次。在后面的多轮小批量训练时,可以继承。
    • 为减少计算量,在处理每个批量数据前,对隐状态参数梯度分离。
  • 对于随机抽样,相邻两个batch iteration的序列样本无确定关系(更常用些)
    • 隐状态在每个batch iteration时,都需要随机初始化(其权重参数是持续更新的)。
  • 如下为训练一个epoch的代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#@save
def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    """训练网络一个迭代周期(定义见第8章)"""
    state, timer = None, d2l.Timer()
    metric = d2l.Accumulator(2)  # 训练损失之和,词元数量
    for X, Y in train_iter:
        if state is None or use_random_iter:
            # 在第一次迭代或使用随机抽样时初始化state
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            if isinstance(net, nn.Module) and not isinstance(state, tuple):
                # state对于nn.GRU是个张量
                state.detach_()
            else:
                # state对于nn.LSTM或对于我们从零开始实现的模型是个张量
                for s in state:
                    s.detach_()
        y = Y.T.reshape(-1)
        X, y = X.to(device), y.to(device)
        y_hat, state = net(X, state)
        l = loss(y_hat, y.long()).mean() # y转为长整型(64位整数)
        if isinstance(updater, torch.optim.Optimizer):
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            # 因为已经调用了mean函数
            updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
  • 如下为训练的最终形式
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#@save
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
              use_random_iter=False):
    """训练模型(定义见第8章)"""
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
                            legend=['train'], xlim=[10, num_epochs])
    # 初始化
    if isinstance(net, nn.Module):
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
    predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
    # 训练和预测
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_ch8(
            net, train_iter, loss, updater, device, use_random_iter)
        if (epoch + 1) % 10 == 0:
            print(predict('time traveller'))
            animator.add(epoch + 1, [ppl])
    print(f'困惑度 {ppl:.1f}, {speed:.1f} 词元/秒 {str(device)}')
    print(predict('time traveller'))
    print(predict('traveller'))
  • 实际训练
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
## 训练--顺序分区(下图左)
num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())
# 困惑度 1.0, 38769.2 词元/秒 cuda:0
# time travelleryou can show black is white by argument said filby
# travelleryou can show black is white by argument said filby

## 训练--随机抽样(下图右)
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu(),
          use_random_iter=True)
# 困惑度 1.5, 37930.8 词元/秒 cuda:0
# time travellerit s against reason said filbywas allaing the time
# travellerit s against reason said filbywas allaing the time

image-20240807190952553

6. RNN的简洁实现

  • 准备数据
1
2
3
4
5
6
7
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

6.1 定义模型

  • 基于torch的nn.RNN,定义一个具有256个隐藏单元的单隐藏层,其不涉及输出层的计算
1
2
num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
  • 初始化隐状态,形状为(隐藏层数,批量大小,隐藏单元数)
1
2
3
state = torch.zeros((1, batch_size, num_hiddens))
state.shape
# torch.Size([1, 32, 256])
  • 模拟计算,更新隐状态
    • 如下的Y表示,所有批量的子序列的最后一层的隐状态(一般后面需要再接MLP预测Ot输出)
    • state_new表示所有批量的子序列的最后一步的隐状态
1
2
3
4
5
6
7
X = torch.rand(size=(num_steps, batch_size, len(vocab)))
Y, state_new = rnn_layer(X, state)
Y.shape, state_new.shape
# (torch.Size([35, 32, 256]), torch.Size([1, 32, 256]))
# 35为序列长度
# 32为批量大小
# 256 隐藏层单元数
  • 定义一个完整的RNNModel类
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#@save
class RNNModel(nn.Module):
    """循环神经网络模型"""
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.num_hiddens = self.rnn.hidden_size
        # 如果RNN是双向的(之后将介绍),num_directions应该是2,否则应该是1
        if not self.rnn.bidirectional:
            self.num_directions = 1
            self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
        else:
            self.num_directions = 2
            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

    def forward(self, inputs, state):
        X = F.one_hot(inputs.T.long(), self.vocab_size)
        X = X.to(torch.float32)
        Y, state = self.rnn(X, state)
        # 全连接层首先将Y的形状改为(时间步数*批量大小,隐藏单元数)
        # 它的输出形状是(时间步数*批量大小,词表大小)。
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, device, batch_size=1):
        if not isinstance(self.rnn, nn.LSTM):
            # nn.GRU以张量作为隐状态
            return  torch.zeros((self.num_directions * self.rnn.num_layers,
                                 batch_size, self.num_hiddens),
                                device=device)
        else:
            # nn.LSTM以元组作为隐状态
            return (torch.zeros((
                self.num_directions * self.rnn.num_layers,
                batch_size, self.num_hiddens), device=device),
                    torch.zeros((
                        self.num_directions * self.rnn.num_layers,
                        batch_size, self.num_hiddens), device=device))

6.2 训练与预测

  • 训练函数仍参考5.6小节
  • 实例化模型
1
2
3
4
5
device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
d2l.predict_ch8('time traveller', 10, net, vocab, device)
# 'time travellerpppwllllll'
  • 训练模型
1
2
3
4
5
num_epochs, lr = 500, 1
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)
# perplexity 1.3, 255236.1 tokens/sec on cuda:0
# time traveller but now you be in aly has we mave the gratienttan
# travellerit s ala to be accupted is an absolute procimind a