はじめに
時系列データの異常検知(Anomaly Detection)は、IoTセンサの故障検出、製造ラインの品質管理、金融市場の不正取引検知、サーバ監視など幅広い分野で不可欠な技術です。異常を早期に検知することで、大規模な障害やコストの発生を未然に防ぐことができます。
本記事では、時系列データに対する3つの異常検知手法を体系的に解説し、Pythonで実装・比較します。
- 移動平均 + 標準偏差(ボリンジャーバンド法) — 最もシンプルな統計的手法
- 指数移動平均(EMA)+ 残差分析 — 適応的な平滑化による手法
- カルマンフィルタのイノベーション系列 — 状態空間モデルに基づく手法
シンプルな手法からモデルベースの手法へと段階的に進み、それぞれの特徴と使い分けを明確にします。
異常の分類
時系列データにおける異常は、大きく3種類に分類されます。
点異常(Point Anomaly)
単一のデータ点が他のデータから大きく逸脱するケースです。センサのスパイクノイズや、通信エラーによる異常値がこれに該当します。
文脈異常(Contextual Anomaly)
値そのものは正常範囲内ですが、文脈(時間帯、季節など)を考慮すると異常となるケースです。例えば、冬季に夏季と同じ気温が観測された場合がこれに当たります。
集団異常(Collective Anomaly)
個々のデータ点は正常でも、一連のデータ点のパターンが全体として異常となるケースです。例えば、心電図で短時間の間に通常とは異なるリズムが連続する場合です。
本記事では、最も基本的かつ実用的な点異常の検知に焦点を当てます。
テストデータの生成
3つの手法を公平に比較するため、共通のテストデータを生成します。ベース信号にサイン波とトレンドを含め、以下の異常を注入します。
- 点異常(スパイク):既知のインデックスに突発的な値を追加
- レベルシフト:ある時点から定常値がステップ状に変化
- 緩やかなドリフト:ある時点から徐々に値が増加
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
n = 300 # データ点数
# ベース信号:サイン波 + 線形トレンド + ノイズ
t = np.arange(n)
base_signal = 5 * np.sin(2 * np.pi * t / 100) + 0.02 * t
noise = np.random.normal(0, 0.5, n)
data = base_signal + noise
# 真のラベル(0: 正常, 1: 異常)
labels = np.zeros(n, dtype=int)
# 点異常(スパイク)の注入
spike_indices = [50, 120, 200, 250]
for idx in spike_indices:
data[idx] += np.random.choice([-1, 1]) * np.random.uniform(8, 12)
labels[idx] = 1
# レベルシフトの注入(インデックス160から180)
shift_start, shift_end = 160, 180
data[shift_start:shift_end] += 6.0
labels[shift_start:shift_end] = 1
# 緩やかなドリフトの注入(インデックス220から240)
drift_start, drift_end = 220, 240
drift = np.linspace(0, 5, drift_end - drift_start)
data[drift_start:drift_end] += drift
labels[drift_start:drift_end] = 1
# データの可視化
fig, axes = plt.subplots(2, 1, figsize=(14, 6), sharex=True)
axes[0].plot(t, data, "b-", linewidth=0.8, label="Observed")
axes[0].plot(t, base_signal, "g--", linewidth=0.8, alpha=0.7, label="True signal")
axes[0].scatter(t[labels == 1], data[labels == 1], c="red", s=20,
zorder=5, label="Anomaly (ground truth)")
axes[0].set_ylabel("Value")
axes[0].set_title("Test Data with Injected Anomalies")
axes[0].legend(loc="upper left", fontsize=9)
axes[0].grid(True, alpha=0.3)
axes[1].stem(t, labels, linefmt="r-", markerfmt="ro", basefmt="k-")
axes[1].set_ylabel("Label")
axes[1].set_xlabel("Time step")
axes[1].set_title("Ground Truth Labels")
axes[1].set_yticks([0, 1])
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
このデータには、点異常(4箇所のスパイク)、レベルシフト(20ステップ)、緩やかなドリフト(20ステップ)が含まれており、各手法の検知性能を多面的に評価できます。
手法1: 移動平均 + 標準偏差(ボリンジャーバンド法)
コンセプト
最もシンプルな異常検知手法は、過去のデータから局所的な平均と標準偏差を計算し、現在の値がその範囲を超えたときに異常と判定するものです。金融分野ではボリンジャーバンドとして知られています。
数理
直近 \(W\) 個のデータ点から、ローリング平均とローリング標準偏差を計算します。
\[ \mu_t = \frac{1}{W}\sum_{i=t-W+1}^{t} x_i \tag{1} \]\[ \sigma_t = \sqrt{\frac{1}{W}\sum_{i=t-W+1}^{t} (x_i - \mu_t)^2} \tag{2} \]異常判定の基準は以下のとおりです。
\[ \text{anomaly if } |x_t - \mu_t| > k\sigma_t \tag{3} \]ここで \(k\) はしきい値の倍率パラメータです。正規分布を仮定すると、\(k=3\) で約99.7%のデータが正常範囲に収まります。
パラメータ
- 窓幅 \(W\): 大きいほど平滑化が強くなるが、変化への追従が遅れる
- しきい値倍率 \(k\): 大きいほど異常判定が厳しくなる(偽陽性が減るが、偽陰性が増える)
Python実装
def detect_sma(data, window=30, k=3.0):
"""移動平均 + 標準偏差による異常検知"""
n = len(data)
anomalies = np.zeros(n, dtype=int)
scores = np.zeros(n)
for t in range(window, n):
segment = data[t - window:t]
mu = np.mean(segment)
sigma = np.std(segment)
scores[t] = abs(data[t] - mu) / sigma if sigma > 1e-10 else 0.0
if scores[t] > k:
anomalies[t] = 1
return anomalies, scores
特性
- 利点: 実装が簡単で直感的。パラメータの意味が明確
- 欠点: 固定窓幅のため信号のダイナミクスに適応できない。窓幅の半分だけ検知が遅延する。レベルシフトなどの持続的な変化には窓に取り込まれると検知できなくなる
移動平均フィルタの詳細は移動平均フィルタの種類と比較を参照してください。
手法2: 指数移動平均(EMA)+ 残差分析
コンセプト
EMAを使うと、最新のデータにより大きな重みを置いた適応的な平滑化が可能です。EMAによる予測値と実際の観測値の残差を分析することで、異常を検知します。
数理
EMAによる予測値は以下の再帰式で計算されます。
\[ \hat{x}_t = \alpha x_t + (1-\alpha)\hat{x}_{t-1} \tag{4} \]1ステップ前の予測値と現在の観測値の残差(予測誤差)を異常スコアとします。
\[ e_t = x_t - \hat{x}_{t-1} \tag{5} \]残差の指数加重移動標準偏差(EWMSTD)を使って正規化し、異常を判定します。
\[ \text{anomaly if } |e_t| > k \cdot \text{EWMSTD}_t \tag{6} \]ここで EWMSTD は残差の指数加重移動標準偏差です。
\[ \text{EWMSTD}_t^2 = \alpha \cdot e_t^2 + (1-\alpha) \cdot \text{EWMSTD}_{t-1}^2 \tag{7} \]Python実装
def detect_ema(data, alpha=0.3, k=3.0):
"""EMA + 残差分析による異常検知"""
n = len(data)
anomalies = np.zeros(n, dtype=int)
scores = np.zeros(n)
ema_val = data[0]
ema_var = 0.0
for t in range(1, n):
# 残差の計算
residual = data[t] - ema_val
ema_std = np.sqrt(ema_var)
# 異常スコア(現在の残差を取り込む前の分散で評価)
scores[t] = abs(residual) / ema_std if ema_std > 1e-10 else 0.0
if scores[t] > k:
anomalies[t] = 1
# 指数加重移動分散の更新(スコア計算後に更新)
ema_var = alpha * residual ** 2 + (1 - alpha) * ema_var
# EMAの更新
ema_val = alpha * data[t] + (1 - alpha) * ema_val
return anomalies, scores
特性
- 利点: 最新データにより強く追従するため、SMAより遅延が小さい。メモリ効率が良い(\(O(1)\))
- 欠点: 信号のダイナミクス(トレンド成分など)を明示的にモデル化していない。\(\alpha\) の選択に経験が必要
EMAの数学的性質の詳細は指数移動平均(EMA)フィルタの周波数特性を参照してください。
手法3: カルマンフィルタによる異常検知
この手法は、時系列データを状態空間モデルとして記述し、カルマンフィルタのイノベーション系列の統計的性質を利用して異常を検知します。信号のダイナミクス(レベルとトレンド)を明示的にモデル化できる点が統計的手法との大きな違いです。
状態空間モデルの設定
時系列データを「レベル」と「トレンド(傾き)」の2つの状態で記述します。
状態ベクトル \(\mathbf{x}_k = [\ell_k, b_k]^T\)(レベル、トレンド)に対して、以下の状態空間モデルを定義します。
状態遷移行列:
\[ A = \begin{bmatrix} 1 & 1 \\\ 0 & 1 \end{bmatrix} \tag{8} \]これは「レベルはトレンド分だけ増加し、トレンドは一定」というモデルです。
観測行列:
\[ H = \begin{bmatrix} 1 & 0 \end{bmatrix} \tag{9} \]観測されるのはレベル成分のみです。
カルマンフィルタの基本理論についてはカルマンフィルタの理論とPython実装で詳しく解説しています。
イノベーション系列(Innovation Sequence)
カルマンフィルタの予測ステップで得られるイノベーション(観測残差)は、モデルが予測した値と実際の観測値の差です。
\[ \mathbf{y}_k = \mathbf{z}_k - H\hat{\mathbf{x}}_{k|k-1} \tag{10} \]正常な動作条件下では、イノベーションは以下の分布に従います。
\[ \mathbf{y}_k \sim \mathcal{N}(\mathbf{0}, S_k) \tag{11} \]ここで \(S_k\) はイノベーション共分散行列です。
\[ S_k = HP_{k|k-1}H^T + R \tag{12} \]モデルが正しく信号のダイナミクスを捉えている限り、イノベーションはゼロ平均の白色ノイズ系列になります。異常が発生すると、イノベーションの統計的性質が変化するため、これを異常検知に利用できます。
正規化イノベーション二乗(NIS: Normalized Innovation Squared)
イノベーションをその共分散で正規化した正規化イノベーション二乗(NIS)を定義します。
\[ \nu_k = \mathbf{y}_k^T S_k^{-1} \mathbf{y}_k \tag{13} \]正常条件下では、NISは自由度 \(m\)(観測の次元)のカイ二乗分布に従います。
\[ \nu_k \sim \chi^2(m) \tag{14} \]本記事の1次元観測(\(m=1\))の場合、異常判定は以下のようになります。
\[ \text{anomaly if } \nu_k > \chi^2_{1-\alpha}(1) \tag{15} \]有意水準 \(\alpha = 0.01\) では \(\chi^2_{0.99}(1) \approx 6.63\) がしきい値となります。
Python実装
from scipy.stats import chi2
def detect_kalman(data, q=0.01, r=1.0, sig_level=0.01):
"""カルマンフィルタのイノベーション系列による異常検知"""
n = len(data)
anomalies = np.zeros(n, dtype=int)
nis_values = np.zeros(n)
innovations = np.zeros(n)
filtered = np.zeros(n)
# 状態空間モデルの定義(レベル + トレンド)
A = np.array([[1.0, 1.0],
[0.0, 1.0]])
H = np.array([[1.0, 0.0]])
Q = q * np.eye(2)
R = np.array([[r]])
# 初期化
x = np.array([data[0], 0.0]) # [レベル, トレンド]
P = np.diag([1.0, 1.0])
# カイ二乗しきい値(自由度1, 有意水準sig_level)
threshold = chi2.ppf(1 - sig_level, df=1)
for k in range(1, n):
# 予測ステップ
x_pred = A @ x
P_pred = A @ P @ A.T + Q
# イノベーション
y = data[k] - H @ x_pred
S = H @ P_pred @ H.T + R
# NIS(正規化イノベーション二乗)
S_inv = 1.0 / S[0, 0]
nis = float(y[0] ** 2 * S_inv)
nis_values[k] = nis
innovations[k] = y[0]
# 異常判定
if nis > threshold:
anomalies[k] = 1
# 更新ステップ(カルマンゲイン)
K = P_pred @ H.T * S_inv
x = x_pred + K.flatten() * y[0]
P = (np.eye(2) - K @ H) @ P_pred
filtered[k] = x[0]
filtered[0] = data[0]
return anomalies, nis_values, innovations, filtered
異常が検知された時点ではカルマンゲインによって通常どおり状態が更新されます。より頑健な実装では、異常と判定された観測を棄却し、予測値のみで状態を維持するアプローチも可能です。
比較実験
3つの手法を同一のテストデータに適用し、検知性能を比較します。
from sklearn.metrics import precision_score, recall_score, f1_score
# 各手法の適用
sma_anomalies, sma_scores = detect_sma(data, window=30, k=3.0)
ema_anomalies, ema_scores = detect_ema(data, alpha=0.3, k=3.0)
kf_anomalies, kf_nis, kf_innov, kf_filtered = detect_kalman(data, q=0.01, r=1.0, alpha=0.01)
# 評価指標の計算(最初の30ステップはSMAの窓幅分を除外)
eval_start = 30
y_true = labels[eval_start:]
results = {}
for name, preds in [("SMA+σ", sma_anomalies),
("EMA+σ", ema_anomalies),
("Kalman NIS", kf_anomalies)]:
y_pred = preds[eval_start:]
p = precision_score(y_true, y_pred, zero_division=0)
r = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
results[name] = {"Precision": p, "Recall": r, "F1": f1}
# 結果のテーブル表示
print(f"{'Method':<15} {'Precision':>10} {'Recall':>10} {'F1 Score':>10}")
print("-" * 50)
for name, metrics in results.items():
print(f"{name:<15} {metrics['Precision']:>10.3f} {metrics['Recall']:>10.3f} {metrics['F1']:>10.3f}")
検知結果の可視化
fig, axes = plt.subplots(4, 1, figsize=(14, 14), sharex=True)
# 元データ
axes[0].plot(t, data, "b-", linewidth=0.8, label="Observed")
axes[0].scatter(t[labels == 1], data[labels == 1], c="red", s=25,
zorder=5, label="Ground truth")
axes[0].set_ylabel("Value")
axes[0].set_title("Test Data")
axes[0].legend(loc="upper left", fontsize=9)
axes[0].grid(True, alpha=0.3)
# SMA + σ
sma_det = sma_anomalies.astype(bool)
axes[1].plot(t, data, "b-", linewidth=0.8, alpha=0.5)
axes[1].scatter(t[sma_det], data[sma_det], c="orange", s=30,
zorder=5, marker="x", label="Detected")
axes[1].scatter(t[labels == 1], data[labels == 1], c="red", s=15,
zorder=4, alpha=0.5, label="Ground truth")
axes[1].set_ylabel("Value")
axes[1].set_title("SMA + σ (W=30, k=3)")
axes[1].legend(loc="upper left", fontsize=9)
axes[1].grid(True, alpha=0.3)
# EMA + σ
ema_det = ema_anomalies.astype(bool)
axes[2].plot(t, data, "b-", linewidth=0.8, alpha=0.5)
axes[2].scatter(t[ema_det], data[ema_det], c="orange", s=30,
zorder=5, marker="x", label="Detected")
axes[2].scatter(t[labels == 1], data[labels == 1], c="red", s=15,
zorder=4, alpha=0.5, label="Ground truth")
axes[2].set_ylabel("Value")
axes[2].set_title("EMA + σ (α=0.3, k=3)")
axes[2].legend(loc="upper left", fontsize=9)
axes[2].grid(True, alpha=0.3)
# Kalman NIS
kf_det = kf_anomalies.astype(bool)
axes[3].plot(t, data, "b-", linewidth=0.8, alpha=0.5)
axes[3].scatter(t[kf_det], data[kf_det], c="orange", s=30,
zorder=5, marker="x", label="Detected")
axes[3].scatter(t[labels == 1], data[labels == 1], c="red", s=15,
zorder=4, alpha=0.5, label="Ground truth")
axes[3].set_ylabel("Value")
axes[3].set_title("Kalman Filter NIS (q=0.01, r=1.0, α=0.01)")
axes[3].set_xlabel("Time step")
axes[3].legend(loc="upper left", fontsize=9)
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
結果の考察
上記コードを実行すると各手法の Precision / Recall / F1 Score が表示されます。テストデータは np.random.seed(42) で固定されているため、再現可能です。以下は代表的な傾向です(パラメータ調整で変動します)。
各手法の検知傾向には以下の特徴があります。
- SMA + σ: 点異常(スパイク)の検知には有効ですが、窓幅分の遅延があり、レベルシフトの先頭部分しか検知できません。ドリフトのような緩やかな変化は窓に取り込まれて正常と判定されやすくなります
- EMA + σ: SMAよりも反応が速く、レベルシフトの検知が改善されます。しかしトレンド成分を明示的にモデル化していないため、トレンドの変化に弱い面があります
- Kalman NIS: トレンドを状態としてモデル化しているため、レベルシフトやドリフトの開始点を検知しやすくなります。一方、モデルが信号のダイナミクスに適応した後は正常と判定するため、持続的な異常の全区間を検知するわけではありません。なお、本実験ではレベルシフトの全区間を異常としてラベル付けしていますが、変化点検知(Change Point Detection)の観点では遷移点のみを異常とする定義もあります。この定義の違いにより、カルマンフィルタのRecallは見かけ上低くなる点に注意してください
手法の選び方
各手法の適用に関する実践的なガイドです。
| 評価基準 | SMA + σ | EMA + σ | Kalman NIS |
|---|---|---|---|
| 実装の複雑さ | 低い | 低い | 中程度 |
| パラメータ感度 | 中程度 | 中程度 | 中程度(Q, Rの設定に依存) |
| モデルの前提条件 | なし | なし | 線形ガウス |
| トレンドへの対応 | できない | 部分的 | できる |
| 検知遅延 | \(W/2\) ステップ | 1ステップ | 1ステップ |
| メモリ使用量 | \(O(W)\) | \(O(1)\) | \(O(d^2)\)(\(d\): 状態次元) |
| 適した用途 | シンプルな閾値監視 | ストリーミングデータ | モデルベースのシステム |
選択の指針:
- まず試すべき手法: SMA + σ。シンプルで解釈しやすく、多くの場面で十分な性能を発揮します
- リアルタイム性が重要な場合: EMA + σ。1ステップ遅延で計算コストも \(O(1)\) です
- トレンドやレベル変化を伴う場合: カルマンフィルタNIS。信号のダイナミクスを明示的にモデル化できるため、より正確な異常判定が可能です
まとめ
本記事では、時系列データの異常検知に対する3つの手法を解説し、Pythonで実装・比較しました。
- 移動平均 + 標準偏差は最もシンプルで直感的な手法であり、スパイク型の点異常に効果的
- EMA + 残差分析は適応的な平滑化により、SMAよりも検知遅延が小さく、ストリーミング処理に適している
- カルマンフィルタのNISは状態空間モデルに基づく手法で、トレンド変化を含む異常を検知できる
- 手法の選択はデータの特性と要件(リアルタイム性、トレンドの有無、実装の制約)に依存する
- 実際の応用では、複数の手法を組み合わせることで検知の信頼性を向上させることができる
関連記事
- カルマンフィルタの理論とPython実装 - 本記事のカルマンフィルタ法の基礎となる理論と実装を解説しています。
- 拡張カルマンフィルタ(EKF)の理論とPython実装 - 非線形な時系列データに対応する拡張カルマンフィルタを解説しています。
- 指数移動平均(EMA)フィルタの周波数特性 - EMAの数学的性質と周波数応答の詳細を解説しています。
- 移動平均フィルタの種類と比較 - SMA・WMA・EMAの特性比較とPython実装を解説しています。
- ARIMAモデルによる時系列予測のPython実装 - 時系列モデリングの代表的手法であるARIMAを解説しています。
- 信号処理におけるフィルタリング手法の基礎 - カルマンフィルタを含む各種フィルタリング手法の概要を解説しています。
- カルマンスムーザ(RTS Smoother)の理論とPython実装 - オフライン異常検知に有用な、全時刻データを使った平滑化手法を解説しています。
参考文献
- Basseville, M., & Nikiforov, I. V. (1993). Detection of Abrupt Changes: Theory and Application. Prentice Hall.
- Bar-Shalom, Y., Li, X. R., & Kirubarajan, T. (2004). Estimation with Applications to Tracking and Navigation. Wiley.
- SciPy Documentation: scipy.stats.chi2