深層学習で時系列データの予測問題に取り組んでいたのですが、単に予測値と真値が近くなるようにするだけでなく、前の時間ステップでの真値からの変化傾向が一致することも重視したいと考え、損失関数をMSE (mean squared error)から自作の損失関数の実装に取組みました。
この実装の場合、予測値と真値以外の入力に使用する値が必要なため、model.compile()時にlossを指定するというやり方では実装することができません。
kerasのModelにはadd_loss() というメソッドがあり、これにより、やりたいことが実現できます。
入力の値に応じた自作損失関数の注意
損失関数で入力の値に応じた罰則項を導入する等して算出する場合、その計算値は、その入力に特有の値であることを注意しなくてはいけません。
この注意をしなくてはいけない場面の例として、バッチサイズの設定があります。
バッチサイズを1より大きくすることにより処理速度が速くなることが期待できますが、損失関数の計算時にはバッチ内のデータごとに算出されるlossの平均値を誤差逆伝搬します。
ここで、損失関数に入力の値に応じた罰則項を導入している場合、誤差逆伝搬時にバッチ内のデータ側の要因で変化するlossの平均を取ることになってしまいます。そのため、学習時にバッチサイズを1より大きくする際は、罰則項が平均化されることが正しいのかを検討する必要があります。
モデルクラスの作成
今回はテストのためこちらの簡単な2層のモデルで考えることにします。
クラス化する理由は様々なニューロン数などの条件で実験したいときを想定して再利用性を高めるためです。
import tensorflow as tf
class ModelCustomLoss(tf.keras.models.Model):
def __init__(self, *args, **kwargs):
super(ModelCustomLoss, self).__init__()
self.layer_dense_1 = tf.keras.layers.Dense(kwargs.get('n_dense'))
self.layer_dense_out = tf.keras.layers.Dense(1)
def call(self, inputs):
dense_1 = self.layer_dense_1(inputs[0])
out = self.layer_dense_out(dense_1)
return out
損失関数の定義
ここでは、値が一致していること以外に、前回の値からの変化傾向が一致していることも重視したいため、損失関数にはMSEに加え、変化傾向が一致しないことに罰則を与える形で損失関数を定義したいと思います。
not_trend_matchで予測した変化傾向と実際の変化傾向が一致しているかを判定しており、判定していない場合、誤差の絶対値の分だけ罰則を与えるというものになっています。
def custom_loss(y_true, y_pred, y_last, lambda_trend=1e-3):
error = y_true - y_pred
trend_true = y_true - y_last
trend_pred = y_pred - y_last
not_trend_match = tf.cast(tf.math.sign(trend_true)==tf.math.sign(trend_pred), tf.float32)
loss = tf.math.reduce_mean(error**2 + lambda_trend * tf.math.abs(trend_pred) * not_trend_match)
return loss
ここで、単にlossを
loss = tf.math.reduce_mean(error**2)
とすればMSEとなります。
add_loss()で損失関数の設定
モデルに損失関数を定義する方法及び呼び出し方法は以下の通りになります。
Modelクラス内のcallでadd_lossを呼び出すのですが、この時、custom_lossの引数となるyの真値、yの前の値が必要になります。model.fit()内で複数の入力を指定することでcall内でも参照できるようになります。
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
def custom_loss(y_true, y_pred, y_last, lambda_trend=1e-3):
error = y_true - y_pred
trend_true = y_true - y_last
trend_pred = y_pred - y_last
not_trend_match = tf.cast(tf.math.sign(trend_true)==tf.math.sign(trend_pred), tf.float32)
loss = tf.math.reduce_mean(error**2 + lambda_trend * tf.math.abs(trend_pred) * not_trend_match)
return loss
class ModelCustomLoss(tf.keras.models.Model):
def __init__(self, *args, **kwargs):
super(ModelCustomLoss, self).__init__()
self.layer_dense_1 = tf.keras.layers.Dense(kwargs.get('n_dense'))
self.layer_dense_out = tf.keras.layers.Dense(1)
def call(self, inputs):
dense_1 = self.layer_dense_1(inputs[0])
out = self.layer_dense_out(dense_1)
y = inputs[1]
y_last = inputs[2]
self.add_loss(custom_loss(y, out, y_last))
return out
model = ModelCustomLoss(n_dense=32)
model.compile(optimizer=Adam(learning_rate=1e-5))
# X, y, y_lastは学習データセット
model.fit(
[X, y, y_last],
y,
batch_size=1,
epochs=10,
validation_split=0.15
)
まとめ
少し複雑な損失関数を実装する方法についてまとめました。
重みに対する罰則ではなく、データセットごとに算出される罰則のため、バッチごとに平均化されて本当に良いのかを検討する必要があります。
また、今回実装するにあたって、add_lossをモデルのcall内で呼び出せず、「Keras “Output missing from loss dictionary”」というエラーが出てしまうという問題がありました。
この問題はGithubのissue内でも指摘されていたのですが、tensorflowのバージョンを2.5.0にアップグレードすることによって解決することができました。
コメント