自然言語処理編 - LSTM モデル学習 その弐
前の記事では、モデル学習用データとテスト用データの準備を行った。 学習用データとは、訓練データと、教師データを指す。 学習用データは、文章を形態素解析し、得られた単語と品詞を、 一意の数値に置き換えたものを保持している。 それぞれを、単語 ID と品詞 ID とする。 テスト用データは、モデル学習用と同様だが、データ数が 1/10 程度となる。 この記事では、準備したデータで、モデル学習と推論をしてみる。
LSTM モデルを生成する
ここで作成する LSTM モデルは、以下のような構成としている。 このモデルは、単語 ID を入力すると、品詞 ID を出力する。
flowchart LR subgraph LstmModel[単語に品詞を付ける LSTM モデル] i1[...] -- メモリセル i - 1 --> lstm[LSTM] i1 -- 品詞 ID i - 1 --> lstm x[単語 ID i] --> embd embd[単語埋め込み層] --> lstm lstm --> w[線形変換層] w --> y[品詞 ID i] o1[...] lstm -- メモリセル i --> o1 lstm -- 品詞 ID i --> o1 end
実際に LSTM モデルクラスを実装する。 以下の設定は、本記事では利用しないが、次回以降の記事で利用するための設定となる。
- Embedding コンストラクタに
padding_idx=0
を設定し、訓練データのパディング用 ID を指定する。 - LSTM コンストラクタに
batch_first=True
を設定し、入力を(バッチサイズ、系列長、次元数)形式にする。 - LSTM コンストラクタに
num_layers
を設定し、多層 LSTM に対応する。
torch.nn.Embedding については、理解を助けるための記事 「PyTorch の埋め込みクラス(torch.nn.Embedding)の概略」 を掲載した。
pos_tagger_lstm.py
import torch
class PosTaggerLSTM(torch.nn.Module):
"""品詞タグ付け用 LSTM モデル
Attributes:
embedding (torch.nn.Embedding): 単語埋め込み層:単語IDをベクトル表現に変換
lstm (torch.nn.LSTM): LSTM 層
linear (torch.nn.Linear): 線形変換層
"""
def __init__(self, vocab_size: int, num_pos_tags: int, embedding_dim: int, num_layers: int = 1) -> None:
"""コンストラクタ
Args:
vocab_size (int): 単語の種類数 (語彙サイズ)
num_pos_tags (int): 品詞の種類数
embedding_dim (int): 単語ベクトルの次元数、LSTM の隠れ層の次元数
num_layers (int): LSTM の層数
"""
super(PosTaggerLSTM, self).__init__()
# 単語埋め込み層:単語IDをベクトル表現に変換
self.embedding = torch.nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
# LSTM 層
self.lstm = torch.nn.LSTM(embedding_dim, embedding_dim, batch_first=True, num_layers=num_layers)
# 線形変換層:LSTM の出力を品詞数に変換
self.linear = torch.nn.Linear(embedding_dim, num_pos_tags)
def forward(self, input_sequence: torch.Tensor) -> torch.Tensor:
"""品詞予測を行う
Args:
input_sequence (torch.Tensor): 入力系列 (単語IDのバッチ)
Returns:
torch.Tensor: 出力系列 (各単語の品詞予測)
"""
# 単語埋め込み層
embedded_sequence = self.embedding(input_sequence)
# LSTM 層
lstm_output, _ = self.lstm(embedded_sequence)
# 線形変換層
output = self.linear(lstm_output)
return output
モデルの学習を行う
実際にモデル学習を行うソースを以下に示す。 このモデル学習ソースは、基本的な構成としているため、 このままでは、大規模データの学習などには向かない。
このソースは、エポックは、10 で、1000 項目毎に途中経過をログ表示する。 1 エポック毎に、学習途中のモデルをファイルに保存する。 学習が完了すると、lstm-simple-epoch01.model から lstm-simple-epoch10.model までのモデルファイルが生成される。 筆者の環境では、学習に 10 分程度要した。
train_simple.py
import bz2
import numpy as np
import time
import torch
from pos_tagger_lstm import PosTaggerLSTM
# 処理済みの単語を重複なく保持する入出力ファイル名
WORD_FILE = 'lstm-word.txt.bz2'
# 処理済みの品詞を重複なく保持する入出力ファイル名
POS_FILE = 'lstm-pos.txt.bz2'
# 学習用訓練データ出力ファイル名
X_TRAIN_FILE = 'lstm-simple-x-train.npy'
# 学習用教師データ出力ファイル名
Y_TRAIN_FILE = 'lstm-simple-y-train.npy'
# 学習率
LEARNING_RATE = 0.01
# エポック数
EPOCHS = 10
def main():
"""LSTM モデル学習を行う"""
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 単語 ID 辞書を読み込む
with bz2.open(WORD_FILE, 'rt', encoding='utf-8') as f:
word_ids = {line.split('\t')[0]: int(line.split('\t')[1]) for line in f}
# 品詞 ID 辞書を読み込む
with bz2.open(POS_FILE, 'rt', encoding='utf-8') as f:
pos_ids = {line.split('\t')[0]: int(line.split('\t')[1]) for line in f}
# 訓練データファイルを読み込む
x_train = np.load(X_TRAIN_FILE, allow_pickle=True)
# 教師データファイルを読み込む
y_train = np.load(Y_TRAIN_FILE, allow_pickle=True)
# モデルに、単語 ID 数と、品詞 ID 数を指定する
model = PosTaggerLSTM(len(word_ids) + 1, len(pos_ids), 100).to(device)
# 最適化アルゴリズムに確率的勾配降下法を設定する
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
# 損失関数に交差エントロピー損失を設定する
criterion = torch.nn.CrossEntropyLoss()
for epoch in range(EPOCHS):
epoch_start_time = time.time()
# 損失の合計値
loss_sum = 0.0
# 損失値(1000 件毎)
loss_1k = 0.0
for i in range(len(x_train)):
# 訓練データをテンソル値に変換する
x = torch.tensor(x_train[i], dtype=torch.long).to(device)
# 訓練する
output = model(x)
# 教師データをテンソル値に変換する
y = torch.tensor(y_train[i], dtype=torch.long).to(device)
# 損失値(予測結果と正解データとの間の誤差)を計算する
loss = criterion(output, y)
if i % 1000 == 0:
if i >= 1000:
print(f'{i}, Loss: {loss_1k / 1000:.4f}')
loss_1k = loss.item()
else:
loss_1k += loss.item()
# 勾配を初期化する
optimizer.zero_grad()
# 勾配を計算する
loss.backward()
# 予測結果と正解データとの誤差 (損失) を小さくするように、モデルのパラメータを調整する
optimizer.step()
# 損失の合計値を更新する
loss_sum += loss.item()
outfile = f'lstm-simple-epoch{epoch + 1:02d}.model'
# モデルを保存する
torch.save(model.state_dict(), outfile)
elapsed_time = time.time() - epoch_start_time
print(f'Epoch: {epoch + 1}, Loss: {loss_sum / len(x_train):.4f}, Time: {elapsed_time:.2f} s')
if __name__ == '__main__':
start_time = time.time()
main()
print(f'経過時間: {time.time() - start_time:.2f} s')
以下のような実行結果となった。 以下は、エポックの結果のみのログとなる。
Epoch: 1, Loss: 0.6146, Time: 49.05 s
Epoch: 2, Loss: 0.2516, Time: 51.35 s
Epoch: 3, Loss: 0.1913, Time: 54.34 s
Epoch: 4, Loss: 0.1596, Time: 50.23 s
Epoch: 5, Loss: 0.1384, Time: 53.11 s
Epoch: 6, Loss: 0.1230, Time: 50.67 s
Epoch: 7, Loss: 0.1112, Time: 52.36 s
Epoch: 8, Loss: 0.1017, Time: 49.35 s
Epoch: 9, Loss: 0.0939, Time: 52.73 s
Epoch: 10, Loss: 0.0873, Time: 50.18 s
経過時間: 518.06 s
推論をする
学習済みのモデルを利用して、推論をする。 推論には、テスト用データを利用する。
infer_simple.py
import bz2
import numpy as np
import time
import torch
from pos_tagger_lstm import PosTaggerLSTM
# 処理済みの単語を重複なく保持する入出力ファイル名
WORD_FILE = 'lstm-word.txt.bz2'
# 処理済みの品詞を重複なく保持する入出力ファイル名
POS_FILE = 'lstm-pos.txt.bz2'
# テスト用訓練データ出力ファイル名
X_TEST_FILE = 'lstm-simple-x-test.npy'
# テスト用教師データ出力ファイル名
Y_TEST_FILE = 'lstm-simple-y-test.npy'
# モデルファイル名
MODEL_FILE = 'lstm-simple-epoch10.model'
def main():
"""
LSTM モデル推論を行う
"""
print('Loading datas...')
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 単語 ID 辞書を読み込む
with bz2.open(WORD_FILE, 'rt', encoding='utf-8') as f:
word_ids = {line.split('\t')[0]: int(line.split('\t')[1]) for line in f}
# 品詞 ID 辞書を読み込む
with bz2.open(POS_FILE, 'rt', encoding='utf-8') as f:
pos_ids = {line.split('\t')[0]: int(line.split('\t')[1]) for line in f}
# テスト用訓練データファイルを読み込む
x_test = np.load(X_TEST_FILE, allow_pickle=True)
# テスト用教師データファイルを読み込む
y_test = np.load(Y_TEST_FILE, allow_pickle=True)
# モデルに、単語 ID 数と、品詞 ID 数を指定する
model = PosTaggerLSTM(len(word_ids) + 1, len(pos_ids), 100).to(device)
# 学習済みモデルデータを詠み込む
model.load_state_dict(torch.load(MODEL_FILE, weights_only=True, map_location=device))
# 処理した単語合計数
data_count = 0
# 推論モードに設定する
model.eval()
print('Start inference...')
with torch.no_grad():
start_time = time.time()
# 正答数
ok = 0
for i in range(len(x_test)):
data_count += len(x_test[i])
# 訓練データをテンソルに変換する
x = [x_test[i]]
x = torch.tensor(x, dtype=torch.long).to(device)
# 教師データをテンソルに変換する
y = torch.tensor(y_test[i], dtype=torch.long).to(device)
# 推論する
output = model(x)
# 推論結果から、回答を取り出す
ans = torch.argmax(output[0], dim=1)
# 正答数をカウントする
ok += torch.sum(ans == y).item()
print(f'OK: {ok}, Total: {data_count}, Accuracy: {ok / data_count:.4f}')
elapsed_time = time.time() - start_time
print(f'経過時間: {elapsed_time:.2f} s ({data_count / elapsed_time:.0f} times/s)')
if __name__ == '__main__':
main()
以下のような実行結果となった。 OK は、正答数、Total は、合計試験数、Accuracy は、正答率となる。
Loading datas...
Start inference...
OK: 422984, Total: 440767, Accuracy: 0.9597
経過時間: 6.88 s (64062 times/s)
Process finished with exit code 0
まとめ
以上で基本的なモデル学習を体験できた。 やはり、モデル学習を行うと、CPU での処理には限界を感じてくる。 試しに、Colaboratory 上の T4 GPU ランタイム環境でモデル学習を実行してみたが、10 分以上かかった。 遅い原因は、訓練データを一件ずつ処理しているためと思われる。 次の記事では、モデル学習部分の効率化を体験していこうと思う。