Introduction
Anomaly detection in time-series data is an essential technique across many domains: IoT sensor fault detection, manufacturing quality control, financial fraud detection, and server monitoring. Detecting anomalies early can prevent costly failures and system outages.
This article systematically covers three anomaly detection methods for time-series data, with Python implementations and performance comparisons.
- Moving Average + Standard Deviation (Bollinger Band method) – the simplest statistical approach
- Exponential Moving Average (EMA) + Residual Analysis – an adaptive smoothing approach
- Kalman Filter Innovation Sequence – a state-space model-based approach
We progress from simple to model-based methods, making the strengths and trade-offs of each approach clear.
Types of Anomalies
Anomalies in time-series data fall into three broad categories.
Point Anomaly
A single data point deviates significantly from the rest of the data. Examples include sensor spikes and communication errors producing outlier values.
Contextual Anomaly
The value itself falls within the normal range, but is abnormal given its context (time of day, season, etc.). For example, observing summer-level temperatures in winter.
Collective Anomaly
Individual data points appear normal, but a sequence of points together forms an anomalous pattern. An example would be an unusual rhythm appearing briefly in an ECG signal.
This article focuses on point anomaly detection, the most fundamental and practically useful category.
Test Data Generation
To compare the three methods fairly, we generate a common test dataset. The base signal includes a sine wave and a linear trend, with the following injected anomalies:
- Point anomalies (spikes): sudden value jumps at known indices
- Level shift: a step change in the baseline at a known index
- Gradual drift: a slowly increasing offset starting at a known index
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(42)
n = 300 # Number of data points
# Base signal: sine wave + linear trend + noise
t = np.arange(n)
base_signal = 5 * np.sin(2 * np.pi * t / 100) + 0.02 * t
noise = np.random.normal(0, 0.5, n)
data = base_signal + noise
# Ground truth labels (0: normal, 1: anomaly)
labels = np.zeros(n, dtype=int)
# Inject point anomalies (spikes)
spike_indices = [50, 120, 200, 250]
for idx in spike_indices:
data[idx] += np.random.choice([-1, 1]) * np.random.uniform(8, 12)
labels[idx] = 1
# Inject level shift (index 160 to 180)
shift_start, shift_end = 160, 180
data[shift_start:shift_end] += 6.0
labels[shift_start:shift_end] = 1
# Inject gradual drift (index 220 to 240)
drift_start, drift_end = 220, 240
drift = np.linspace(0, 5, drift_end - drift_start)
data[drift_start:drift_end] += drift
labels[drift_start:drift_end] = 1
# Visualization
fig, axes = plt.subplots(2, 1, figsize=(14, 6), sharex=True)
axes[0].plot(t, data, "b-", linewidth=0.8, label="Observed")
axes[0].plot(t, base_signal, "g--", linewidth=0.8, alpha=0.7, label="True signal")
axes[0].scatter(t[labels == 1], data[labels == 1], c="red", s=20,
zorder=5, label="Anomaly (ground truth)")
axes[0].set_ylabel("Value")
axes[0].set_title("Test Data with Injected Anomalies")
axes[0].legend(loc="upper left", fontsize=9)
axes[0].grid(True, alpha=0.3)
axes[1].stem(t, labels, linefmt="r-", markerfmt="ro", basefmt="k-")
axes[1].set_ylabel("Label")
axes[1].set_xlabel("Time step")
axes[1].set_title("Ground Truth Labels")
axes[1].set_yticks([0, 1])
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
This dataset contains point anomalies (4 spikes), a level shift (20 steps), and gradual drift (20 steps), enabling a multi-faceted evaluation of each method’s detection capabilities.
Method 1: Moving Average + Standard Deviation (Bollinger Band Method)
Concept
The simplest anomaly detection approach computes local mean and standard deviation from past data, and flags points that fall outside that range. This is known as the Bollinger Band method in finance.
Mathematics
From the most recent \(W\) data points, we compute the rolling mean and rolling standard deviation.
\[ \mu_t = \frac{1}{W}\sum_{i=t-W+1}^{t} x_i \tag{1} \]\[ \sigma_t = \sqrt{\frac{1}{W}\sum_{i=t-W+1}^{t} (x_i - \mu_t)^2} \tag{2} \]The anomaly criterion is as follows.
\[ \text{anomaly if } |x_t - \mu_t| > k\sigma_t \tag{3} \]Here \(k\) is the threshold multiplier. Under a normal distribution assumption, \(k=3\) keeps approximately 99.7% of data within the normal range.
Parameters
- Window size \(W\): larger values produce stronger smoothing but slower response to changes
- Threshold multiplier \(k\): larger values reduce false positives but increase false negatives
Python Implementation
def detect_sma(data, window=30, k=3.0):
"""Anomaly detection using moving average + standard deviation"""
n = len(data)
anomalies = np.zeros(n, dtype=int)
scores = np.zeros(n)
for t in range(window, n):
segment = data[t - window:t]
mu = np.mean(segment)
sigma = np.std(segment)
scores[t] = abs(data[t] - mu) / sigma if sigma > 1e-10 else 0.0
if scores[t] > k:
anomalies[t] = 1
return anomalies, scores
Characteristics
- Pros: Simple to implement and interpret. Parameter meanings are clear
- Cons: Fixed window cannot adapt to signal dynamics. Detection is delayed by half the window size. Sustained changes like level shifts get absorbed into the window and become undetectable
For more on moving average filters, see Moving Average Filter Types and Comparison.
Method 2: Exponential Moving Average (EMA) + Residual Analysis
Concept
EMA provides adaptive smoothing that places greater weight on recent data. By analyzing the residuals between EMA predictions and actual observations, we can detect anomalies.
Mathematics
The EMA prediction is computed by the following recursive formula.
\[ \hat{x}_t = \alpha x_t + (1-\alpha)\hat{x}_{t-1} \tag{4} \]The residual (prediction error) between the previous prediction and the current observation serves as the anomaly score.
\[ e_t = x_t - \hat{x}_{t-1} \tag{5} \]We normalize by the exponentially weighted moving standard deviation (EWMSTD) and flag anomalies.
\[ \text{anomaly if } |e_t| > k \cdot \text{EWMSTD}_t \tag{6} \]Where EWMSTD is the exponentially weighted moving standard deviation of the residuals.
\[ \text{EWMSTD}_t^2 = \alpha \cdot e_t^2 + (1-\alpha) \cdot \text{EWMSTD}_{t-1}^2 \tag{7} \]Python Implementation
def detect_ema(data, alpha=0.3, k=3.0):
"""Anomaly detection using EMA + residual analysis"""
n = len(data)
anomalies = np.zeros(n, dtype=int)
scores = np.zeros(n)
ema_val = data[0]
ema_var = 0.0
for t in range(1, n):
# Compute residual
residual = data[t] - ema_val
ema_std = np.sqrt(ema_var)
# Anomaly score (evaluate with variance BEFORE incorporating current residual)
scores[t] = abs(residual) / ema_std if ema_std > 1e-10 else 0.0
if scores[t] > k:
anomalies[t] = 1
# Update exponentially weighted moving variance (after score computation)
ema_var = alpha * residual ** 2 + (1 - alpha) * ema_var
# Update EMA
ema_val = alpha * data[t] + (1 - alpha) * ema_val
return anomalies, scores
Characteristics
- Pros: Faster response than SMA due to heavier weighting on recent data. Memory efficient at \(O(1)\)
- Cons: Does not explicitly model signal dynamics (trends, etc.). Choosing \(\alpha\) requires experience
For a deeper dive into EMA’s mathematical properties, see Frequency Characteristics of the EMA Filter.
Method 3: Kalman Filter Anomaly Detection
This method models time-series data as a state-space system and leverages the statistical properties of the Kalman filter’s innovation sequence to detect anomalies. The key advantage over statistical methods is the ability to explicitly model signal dynamics (level and trend).
State-Space Model Setup
We describe the time series using two states: “level” and “trend (slope).”
For the state vector \(\mathbf{x}_k = [\ell_k, b_k]^T\) (level, trend), we define the following state-space model.
State transition matrix:
\[ A = \begin{bmatrix} 1 & 1 \\\ 0 & 1 \end{bmatrix} \tag{8} \]This models “level increases by the trend, and trend remains constant.”
Observation matrix:
\[ H = \begin{bmatrix} 1 & 0 \end{bmatrix} \tag{9} \]Only the level component is observed.
For the foundational theory, see Kalman Filter Theory and Python Implementation.
Innovation Sequence
The innovation (observation residual) from the Kalman filter’s prediction step is the difference between the predicted and actual observation.
\[ \mathbf{y}_k = \mathbf{z}_k - H\hat{\mathbf{x}}_{k|k-1} \tag{10} \]Under normal operating conditions, the innovation follows this distribution:
\[ \mathbf{y}_k \sim \mathcal{N}(\mathbf{0}, S_k) \tag{11} \]Where \(S_k\) is the innovation covariance matrix.
\[ S_k = HP_{k|k-1}H^T + R \tag{12} \]As long as the model correctly captures the signal dynamics, the innovation is a zero-mean white noise sequence. When an anomaly occurs, the statistical properties of the innovation change, which we can use for anomaly detection.
Normalized Innovation Squared (NIS)
We define the Normalized Innovation Squared (NIS) by normalizing the innovation with its covariance.
\[ \nu_k = \mathbf{y}_k^T S_k^{-1} \mathbf{y}_k \tag{13} \]Under normal conditions, NIS follows a chi-squared distribution with \(m\) degrees of freedom (where \(m\) is the observation dimension).
\[ \nu_k \sim \chi^2(m) \tag{14} \]For our 1D observation case (\(m=1\)), the anomaly criterion becomes:
\[ \text{anomaly if } \nu_k > \chi^2_{1-\alpha}(1) \tag{15} \]At significance level \(\alpha = 0.01\), the threshold is \(\chi^2_{0.99}(1) \approx 6.63\).
Python Implementation
from scipy.stats import chi2
def detect_kalman(data, q=0.01, r=1.0, sig_level=0.01):
"""Anomaly detection using Kalman filter innovation sequence"""
n = len(data)
anomalies = np.zeros(n, dtype=int)
nis_values = np.zeros(n)
innovations = np.zeros(n)
filtered = np.zeros(n)
# State-space model definition (level + trend)
A = np.array([[1.0, 1.0],
[0.0, 1.0]])
H = np.array([[1.0, 0.0]])
Q = q * np.eye(2)
R = np.array([[r]])
# Initialization
x = np.array([data[0], 0.0]) # [level, trend]
P = np.diag([1.0, 1.0])
# Chi-squared threshold (1 degree of freedom, significance level sig_level)
threshold = chi2.ppf(1 - sig_level, df=1)
for k in range(1, n):
# Prediction step
x_pred = A @ x
P_pred = A @ P @ A.T + Q
# Innovation
y = data[k] - H @ x_pred
S = H @ P_pred @ H.T + R
# NIS (Normalized Innovation Squared)
S_inv = 1.0 / S[0, 0]
nis = float(y[0] ** 2 * S_inv)
nis_values[k] = nis
innovations[k] = y[0]
# Anomaly decision
if nis > threshold:
anomalies[k] = 1
# Update step (Kalman gain)
K = P_pred @ H.T * S_inv
x = x_pred + K.flatten() * y[0]
P = (np.eye(2) - K @ H) @ P_pred
filtered[k] = x[0]
filtered[0] = data[0]
return anomalies, nis_values, innovations, filtered
When an anomaly is detected, the state is still updated normally via the Kalman gain. A more robust implementation could reject anomalous observations and maintain the state using the prediction alone.
Comparison Experiment
We apply all three methods to the same test data and compare their detection performance.
from sklearn.metrics import precision_score, recall_score, f1_score
# Apply each method
sma_anomalies, sma_scores = detect_sma(data, window=30, k=3.0)
ema_anomalies, ema_scores = detect_ema(data, alpha=0.3, k=3.0)
kf_anomalies, kf_nis, kf_innov, kf_filtered = detect_kalman(data, q=0.01, r=1.0, alpha=0.01)
# Compute evaluation metrics (exclude first 30 steps for SMA window)
eval_start = 30
y_true = labels[eval_start:]
results = {}
for name, preds in [("SMA+σ", sma_anomalies),
("EMA+σ", ema_anomalies),
("Kalman NIS", kf_anomalies)]:
y_pred = preds[eval_start:]
p = precision_score(y_true, y_pred, zero_division=0)
r = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
results[name] = {"Precision": p, "Recall": r, "F1": f1}
# Display results table
print(f"{'Method':<15} {'Precision':>10} {'Recall':>10} {'F1 Score':>10}")
print("-" * 50)
for name, metrics in results.items():
print(f"{name:<15} {metrics['Precision']:>10.3f} {metrics['Recall']:>10.3f} {metrics['F1']:>10.3f}")
Visualization of Detection Results
fig, axes = plt.subplots(4, 1, figsize=(14, 14), sharex=True)
# Original data
axes[0].plot(t, data, "b-", linewidth=0.8, label="Observed")
axes[0].scatter(t[labels == 1], data[labels == 1], c="red", s=25,
zorder=5, label="Ground truth")
axes[0].set_ylabel("Value")
axes[0].set_title("Test Data")
axes[0].legend(loc="upper left", fontsize=9)
axes[0].grid(True, alpha=0.3)
# SMA + σ
sma_det = sma_anomalies.astype(bool)
axes[1].plot(t, data, "b-", linewidth=0.8, alpha=0.5)
axes[1].scatter(t[sma_det], data[sma_det], c="orange", s=30,
zorder=5, marker="x", label="Detected")
axes[1].scatter(t[labels == 1], data[labels == 1], c="red", s=15,
zorder=4, alpha=0.5, label="Ground truth")
axes[1].set_ylabel("Value")
axes[1].set_title("SMA + σ (W=30, k=3)")
axes[1].legend(loc="upper left", fontsize=9)
axes[1].grid(True, alpha=0.3)
# EMA + σ
ema_det = ema_anomalies.astype(bool)
axes[2].plot(t, data, "b-", linewidth=0.8, alpha=0.5)
axes[2].scatter(t[ema_det], data[ema_det], c="orange", s=30,
zorder=5, marker="x", label="Detected")
axes[2].scatter(t[labels == 1], data[labels == 1], c="red", s=15,
zorder=4, alpha=0.5, label="Ground truth")
axes[2].set_ylabel("Value")
axes[2].set_title("EMA + σ (α=0.3, k=3)")
axes[2].legend(loc="upper left", fontsize=9)
axes[2].grid(True, alpha=0.3)
# Kalman NIS
kf_det = kf_anomalies.astype(bool)
axes[3].plot(t, data, "b-", linewidth=0.8, alpha=0.5)
axes[3].scatter(t[kf_det], data[kf_det], c="orange", s=30,
zorder=5, marker="x", label="Detected")
axes[3].scatter(t[labels == 1], data[labels == 1], c="red", s=15,
zorder=4, alpha=0.5, label="Ground truth")
axes[3].set_ylabel("Value")
axes[3].set_title("Kalman Filter NIS (q=0.01, r=1.0, α=0.01)")
axes[3].set_xlabel("Time step")
axes[3].legend(loc="upper left", fontsize=9)
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
Discussion
Running the code above prints each method’s Precision, Recall, and F1 Score. The test data is reproducible via np.random.seed(42), so you can verify the exact numbers. The results will vary with parameter choices. Below are the qualitative trends.
Each method exhibits distinct detection characteristics:
- SMA + σ: Effective for spike-type point anomalies, but the window-size delay means it only catches the leading edge of level shifts. Gradual changes like drift get absorbed into the window and are classified as normal
- EMA + σ: Responds faster than SMA, improving level shift detection. However, without explicit trend modeling, it remains vulnerable to trend changes
- Kalman NIS: By modeling trend as a state variable, it can detect the onset of level shifts and drift. However, once the model adapts to the new signal dynamics, it classifies the post-adaptation data as normal rather than flagging the entire anomalous region. Note that our ground truth labels the entire level-shift region as anomalous, but in a change-point detection framing, only the transition point would be labeled anomalous. This definitional difference makes the Kalman filter’s recall appear lower than it would under a change-point evaluation
Choosing the Right Method
A practical guide for selecting the appropriate method.
| Criterion | SMA + σ | EMA + σ | Kalman NIS |
|---|---|---|---|
| Implementation complexity | Low | Low | Medium |
| Parameter sensitivity | Moderate | Moderate | Moderate (depends on Q, R) |
| Model assumptions | None | None | Linear-Gaussian |
| Handles trends | No | Partially | Yes |
| Detection latency | \(W/2\) steps | 1 step | 1 step |
| Memory usage | \(O(W)\) | \(O(1)\) | \(O(d^2)\) (\(d\): state dim) |
| Best for | Simple threshold monitoring | Streaming data | Model-based systems |
Selection guidelines:
- Start with: SMA + σ. It is simple, interpretable, and performs well enough in many situations
- When real-time performance matters: EMA + σ. It has 1-step latency and \(O(1)\) computational cost
- When trends or level changes are present: Kalman filter NIS. It can explicitly model signal dynamics for more accurate anomaly detection
Summary
This article covered three anomaly detection methods for time-series data, with Python implementations and comparisons.
- Moving average + standard deviation is the simplest and most intuitive method, effective for spike-type point anomalies
- EMA + residual analysis provides adaptive smoothing with less detection latency than SMA, well suited for streaming applications
- Kalman filter NIS is a state-space model-based method that can detect anomalies involving trend changes
- Method selection depends on data characteristics and requirements (real-time needs, presence of trends, implementation constraints)
- In practice, combining multiple methods can improve detection reliability
Related Articles
- Kalman Filter Theory and Python Implementation - Covers the foundational theory and implementation underlying the Kalman filter method used in this article.
- Extended Kalman Filter (EKF) Theory and Python Implementation - Extends the Kalman filter for nonlinear time-series data.
- Frequency Characteristics of the EMA Filter - Detailed analysis of EMA’s mathematical properties and frequency response.
- Moving Average Filter Types and Comparison - Comparison of SMA, WMA, and EMA characteristics with Python implementations.
- ARIMA Time-Series Forecasting in Python - Covers ARIMA, a representative time-series modeling method.
- Fundamentals of Filtering Methods in Signal Processing - An overview of various filtering methods including the Kalman filter.
- RTS Smoother Theory and Python Implementation - A smoothing method useful for offline anomaly detection that uses data from all time steps.
References
- Basseville, M., & Nikiforov, I. V. (1993). Detection of Abrupt Changes: Theory and Application. Prentice Hall.
- Bar-Shalom, Y., Li, X. R., & Kirubarajan, T. (2004). Estimation with Applications to Tracking and Navigation. Wiley.
- SciPy Documentation: scipy.stats.chi2