Anomaly Detection
This section covers statistical, density-based, and deep-learning techniques for anomaly detection, with a Python implementation of each and utilities for evaluating the resulting detectors.
Statistical Methods
1. Z-Score Method
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
class ZScoreDetector:
    """Flag samples whose absolute z-score exceeds a threshold in any feature.

    The mean and standard deviation are estimated per feature (column) by
    ``fit``. Two-dimensional input is interpreted as ``(samples, features)``;
    one-dimensional input is treated as samples of a single feature.
    """

    def __init__(self, threshold=3.0):
        """Initialize Z-score detector.

        Args:
            threshold: Absolute z-score above which a value counts as
                anomalous.
        """
        self.threshold = threshold
        self.mean = None  # per-feature mean, set by fit()
        self.std = None  # per-feature standard deviation, set by fit()

    def fit(self, X):
        """Estimate the per-feature mean and standard deviation from ``X``.

        Args:
            X: Array-like training data.

        Returns:
            The detector itself, so calls can be chained.
        """
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)
        return self

    def _z_scores(self, X):
        """Return absolute z-scores of ``X`` under the fitted statistics.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        if self.mean is None or self.std is None:
            raise RuntimeError(
                "ZScoreDetector is not fitted; call fit(X) before use."
            )
        deviation = np.abs(np.asarray(X, dtype=float) - self.mean)
        # A zero-variance feature divides by zero. Silence the warning and
        # resolve the result explicitly below instead of leaking NaN.
        with np.errstate(divide='ignore', invalid='ignore'):
            ratio = deviation / self.std
        # 0/0 (value equals a constant feature) means "no deviation" -> 0.
        # x/0 with x > 0 stays +inf, so any departure from a constant
        # feature is still flagged. NaNs coming from the data are preserved.
        return np.where(np.isnan(ratio) & (deviation == 0), 0.0, ratio)

    def predict(self, X):
        """Predict anomalies.

        Args:
            X: Array-like data to score.

        Returns:
            Boolean array with one entry per sample; ``True`` marks a sample
            with at least one feature whose absolute z-score exceeds
            ``threshold``.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        exceeds = self._z_scores(X) > self.threshold
        if exceeds.ndim < 2:
            # Single-feature (1-D) data: there is no feature axis to reduce.
            return exceeds
        return np.any(exceeds, axis=1)

    def plot_scores(self, X):
        """Plot the absolute z-scores of ``X`` with the threshold line.

        Args:
            X: Array-like data to score.

        Returns:
            The matplotlib figure containing the plot.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        z_scores = self._z_scores(X)
        plt.figure(figsize=(10, 6))
        plt.plot(z_scores, 'b.')
        plt.axhline(
            y=self.threshold,
            color='r',
            linestyle='--',
            label=f'Threshold ({self.threshold})'
        )
        plt.xlabel('Sample')
        plt.ylabel('Z-score')
        plt.title('Z-score Analysis')
        plt.legend()
        return plt.gcf()
2. Interquartile Range (IQR) Method
class IQRDetector:
    """Flag samples that fall outside the Tukey fences in any feature.

    The fences are ``q1 - k * IQR`` and ``q3 + k * IQR``, where ``q1`` and
    ``q3`` are the per-feature 25th and 75th percentiles learned by ``fit``.
    Two-dimensional input is interpreted as ``(samples, features)``;
    one-dimensional input is treated as samples of a single feature.
    """

    def __init__(self, k=1.5):
        """Initialize IQR detector.

        Args:
            k: Multiplier applied to the interquartile range when placing
                the lower and upper fences.
        """
        self.k = k
        self.q1 = None  # per-feature 25th percentile, set by fit()
        self.q3 = None  # per-feature 75th percentile, set by fit()

    def fit(self, X):
        """Estimate the per-feature first and third quartiles from ``X``.

        Args:
            X: Array-like training data.

        Returns:
            The detector itself, so calls can be chained.
        """
        self.q1 = np.percentile(X, 25, axis=0)
        self.q3 = np.percentile(X, 75, axis=0)
        return self

    def _bounds(self):
        """Return the ``(lower, upper)`` fences from the fitted quartiles.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        if self.q1 is None or self.q3 is None:
            raise RuntimeError(
                "IQRDetector is not fitted; call fit(X) before use."
            )
        iqr = self.q3 - self.q1
        return self.q1 - self.k * iqr, self.q3 + self.k * iqr

    def predict(self, X):
        """Predict anomalies.

        Args:
            X: Array-like data to check against the fences.

        Returns:
            Boolean array with one entry per sample; ``True`` marks a sample
            with at least one feature outside its fences.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        lower_bound, upper_bound = self._bounds()
        X = np.asarray(X)
        outside = (X < lower_bound) | (X > upper_bound)
        if outside.ndim < 2:
            # Single-feature (1-D) data: there is no feature axis to reduce.
            return outside
        return np.any(outside, axis=1)

    def plot_bounds(self, X):
        """Plot each feature of ``X`` in its own subplot with both fences.

        Args:
            X: Array-like data; 1-D input is plotted as a single feature.

        Returns:
            The matplotlib figure containing the subplots.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        lower_bound, upper_bound = self._bounds()
        # Normalise shapes so lists and single-feature data can be indexed
        # by feature just like a (samples, features) array.
        X = np.asarray(X)
        if X.ndim == 1:
            X = X.reshape(-1, 1)
        lower_bound = np.atleast_1d(lower_bound)
        upper_bound = np.atleast_1d(upper_bound)

        n_features = X.shape[1]
        plt.figure(figsize=(10, 6))
        for i in range(n_features):
            plt.subplot(n_features, 1, i + 1)
            plt.plot(X[:, i], 'b.')
            for bound in (lower_bound[i], upper_bound[i]):
                plt.axhline(y=bound, color='r', linestyle='--')
            plt.ylabel(f'Feature {i+1}')
        # Label the x axis once, on the last (bottom) subplot.
        plt.xlabel('Sample')
        plt.tight_layout()
        return plt.gcf()
Density-Based and Isolation-Based Methods
1. Local Outlier Factor (LOF)
from sklearn.neighbors import LocalOutlierFactor
class LOFDetector:
    """Anomaly detector wrapping scikit-learn's ``LocalOutlierFactor``.

    ``LocalOutlierFactor`` has two mutually exclusive modes, selected with
    the ``novelty`` keyword (forwarded through ``**kwargs``):

    * ``novelty=False`` (default): outlier detection on the training data
      only. Use ``fit_predict``; the estimator exposes no ``predict`` or
      ``score_samples`` for new data.
    * ``novelty=True``: fit on clean data with ``fit``, then call
      ``predict`` / ``score_samples`` on unseen data.
    """

    def __init__(
        self,
        n_neighbors=20,
        contamination='auto',
        **kwargs
    ):
        """Initialize LOF detector.

        Args:
            n_neighbors: Number of neighbours used by the LOF estimator.
            contamination: Expected proportion of outliers, or ``'auto'``.
            **kwargs: Extra keyword arguments forwarded to
                ``LocalOutlierFactor`` (for example ``novelty=True``).
        """
        self.model = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=contamination,
            **kwargs
        )

    def _is_novelty(self):
        """Return True when the wrapped estimator runs in novelty mode."""
        return bool(getattr(self.model, 'novelty', False))

    def fit(self, X):
        """Fit the detector on ``X``.

        Returns:
            The detector itself, so calls can be chained.
        """
        self.model.fit(X)
        return self

    def fit_predict(self, X):
        """Fit on ``X`` and flag the anomalies among those same samples.

        Returns:
            Boolean array, ``True`` for samples labelled as outliers.
        """
        if self._is_novelty():
            # The estimator hides fit_predict() in novelty mode, so label
            # the training samples from the fitted LOF values and offset.
            self.model.fit(X)
            return self.model.negative_outlier_factor_ < self.model.offset_
        return self.model.fit_predict(X) == -1

    def predict(self, X):
        """Predict anomalies on new data (novelty mode only).

        Returns:
            Boolean array, ``True`` for samples labelled as outliers.

        Raises:
            AttributeError: If the detector was created without
                ``novelty=True``; the underlying estimator cannot label
                unseen data in that mode.
        """
        if not self._is_novelty():
            raise AttributeError(
                "LOFDetector.predict requires novelty=True; "
                "use fit_predict(X) to label the training data instead."
            )
        return self.model.predict(X) == -1

    def score_samples(self, X=None):
        """Get anomaly scores (higher means more anomalous).

        Args:
            X: Data to score. It is only used in novelty mode. Otherwise,
                or when ``X`` is None, the scores of the training samples
                are returned and ``X`` is ignored.

        Returns:
            Array of LOF values with the sign flipped to be positive.
        """
        if X is not None and self._is_novelty():
            return -self.model.score_samples(X)
        return -self.model.negative_outlier_factor_

    def plot_scores(self, X=None):
        """Plot the anomaly scores returned by ``score_samples(X)``.

        Returns:
            The matplotlib figure containing the plot.
        """
        scores = self.score_samples(X)
        plt.figure(figsize=(10, 6))
        plt.plot(scores, 'b.')
        plt.xlabel('Sample')
        plt.ylabel('Anomaly Score')
        plt.title('LOF Anomaly Scores')
        return plt.gcf()
2. Isolation Forest
from sklearn.ensemble import IsolationForest
class IForestDetector:
    """Anomaly detector backed by scikit-learn's ``IsolationForest``."""

    def __init__(
        self,
        n_estimators=100,
        contamination='auto',
        **kwargs
    ):
        """Create the underlying Isolation Forest.

        Args:
            n_estimators: Number of trees in the forest.
            contamination: Expected proportion of outliers, or ``'auto'``.
            **kwargs: Extra keyword arguments forwarded to
                ``IsolationForest``.
        """
        forest_params = dict(
            n_estimators=n_estimators,
            contamination=contamination,
            **kwargs
        )
        self.model = IsolationForest(**forest_params)

    def fit(self, X):
        """Train the forest on ``X`` and return the detector for chaining."""
        self.model.fit(X)
        return self

    def predict(self, X):
        """Return a boolean mask that is True where the forest predicts -1."""
        labels = self.model.predict(X)
        return labels == -1

    def score_samples(self, X):
        """Return the forest's ``score_samples`` output with the sign flipped."""
        raw_scores = self.model.score_samples(X)
        return -raw_scores

    def plot_scores(self, X):
        """Scatter the anomaly scores of ``X`` by sample index.

        Returns:
            The matplotlib figure containing the plot.
        """
        anomaly_scores = self.score_samples(X)
        figure = plt.figure(figsize=(10, 6))
        plt.plot(anomaly_scores, 'b.')
        plt.xlabel('Sample')
        plt.ylabel('Anomaly Score')
        plt.title('Isolation Forest Anomaly Scores')
        return figure
Deep Learning Methods
1. Autoencoder
import tensorflow as tf
from tensorflow.keras import layers, models
class AutoencoderDetector:
    """Flag samples that a dense autoencoder reconstructs poorly.

    A sample is anomalous when its mean squared reconstruction error is
    greater than ``threshold``. The decoder ends in a sigmoid layer, so
    reconstructions lie in (0, 1); inputs should be scaled to that range.
    """

    def __init__(
        self,
        input_dim,
        encoding_dim,
        hidden_layers=None,
        threshold=None,
        threshold_percentile=95
    ):
        """Initialize autoencoder detector.

        Args:
            input_dim: Number of input features.
            encoding_dim: Width of the bottleneck layer.
            hidden_layers: Optional sequence of layer widths placed between
                the input and the bottleneck; the decoder mirrors them in
                reverse order.
            threshold: Reconstruction-error cut-off. When None it is
                derived from the training data during ``fit``.
            threshold_percentile: Percentile of the training reconstruction
                errors used as the cut-off when ``threshold`` is None.
        """
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        self.hidden_layers = list(hidden_layers) if hidden_layers else []
        self.threshold = threshold
        self.threshold_percentile = threshold_percentile
        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build the encoder/decoder pair and chain them into one model."""
        # Encoder: input -> hidden layers -> bottleneck
        encoder = models.Sequential()
        encoder.add(layers.Input(shape=(self.input_dim,)))
        for units in self.hidden_layers:
            encoder.add(layers.Dense(units, activation='relu'))
        encoder.add(layers.Dense(self.encoding_dim, activation='relu'))

        # Decoder: bottleneck -> mirrored hidden layers -> reconstruction
        decoder = models.Sequential()
        decoder.add(layers.Input(shape=(self.encoding_dim,)))
        for units in reversed(self.hidden_layers):
            decoder.add(layers.Dense(units, activation='relu'))
        decoder.add(layers.Dense(self.input_dim, activation='sigmoid'))

        return models.Sequential([encoder, decoder])

    def _reconstruction_errors(self, X):
        """Return the per-sample mean squared reconstruction error of ``X``."""
        reconstructed = self.model.predict(X)
        return np.mean(np.square(X - reconstructed), axis=1)

    def fit(self, X, **kwargs):
        """Train the autoencoder to reproduce ``X``.

        The model is compiled with the Adam optimizer and MSE loss on every
        call. If no threshold is set, it is then calibrated to the
        ``threshold_percentile``-th percentile of the training errors; an
        existing threshold is left untouched.

        Args:
            X: Training data, used as both input and target.
            **kwargs: Forwarded to the Keras ``fit`` call (epochs,
                batch_size, ...).

        Returns:
            The Keras training history.
        """
        self.model.compile(
            optimizer='adam',
            loss='mse'
        )
        history = self.model.fit(
            X, X,
            **kwargs
        )
        if self.threshold is None:
            errors = self._reconstruction_errors(X)
            self.threshold = np.percentile(errors, self.threshold_percentile)
        return history

    def predict(self, X):
        """Predict anomalies.

        Returns:
            Boolean array, ``True`` where the reconstruction error exceeds
            ``threshold``.

        Raises:
            RuntimeError: If no threshold is available (none was supplied
                and ``fit`` has not been called).
        """
        if self.threshold is None:
            raise RuntimeError(
                "No threshold set; call fit(X) or pass threshold= "
                "before predicting."
            )
        return self._reconstruction_errors(X) > self.threshold

    def plot_reconstruction_error(self, X):
        """Plot per-sample reconstruction error and, if set, the threshold.

        Returns:
            The matplotlib figure containing the plot.
        """
        errors = self._reconstruction_errors(X)
        plt.figure(figsize=(10, 6))
        plt.plot(errors, 'b.')
        plt.xlabel('Sample')
        plt.ylabel('Reconstruction Error')
        plt.title('Autoencoder Reconstruction Error')
        if self.threshold is not None:
            plt.axhline(
                y=self.threshold,
                color='r',
                linestyle='--',
                label='Threshold'
            )
            # Only draw a legend when the threshold line gives it an entry;
            # otherwise matplotlib warns that there is nothing to show.
            plt.legend()
        return plt.gcf()
Model Evaluation
1. Evaluation Metrics
from sklearn.metrics import (
precision_score,
recall_score,
f1_score,
roc_curve,
auc
)
def evaluate_detector(y_true, y_pred, scores=None):
    """Compute classification metrics for an anomaly detector.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        scores: Optional continuous anomaly scores; when provided, the area
            under the ROC curve is added to the result.

    Returns:
        Dict with ``'precision'``, ``'recall'`` and ``'f1'``, plus ``'auc'``
        when ``scores`` is given.
    """
    label_metrics = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1', f1_score),
    )
    results = {
        name: metric_fn(y_true, y_pred)
        for name, metric_fn in label_metrics
    }
    if scores is None:
        return results

    false_pos_rate, true_pos_rate, _thresholds = roc_curve(y_true, scores)
    results['auc'] = auc(false_pos_rate, true_pos_rate)
    return results
def plot_roc_curve(y_true, scores):
    """Draw the ROC curve for ``scores`` against ``y_true``.

    The curve is labelled with its AUC and drawn alongside the diagonal
    chance line.

    Returns:
        The matplotlib figure containing the plot.
    """
    false_pos_rate, true_pos_rate, _thresholds = roc_curve(y_true, scores)
    area = auc(false_pos_rate, true_pos_rate)
    curve_label = f'ROC curve (AUC = {area:.2f})'

    figure = plt.figure(figsize=(8, 6))
    plt.plot(
        false_pos_rate,
        true_pos_rate,
        color='darkorange',
        lw=2,
        label=curve_label
    )
    # Diagonal reference: performance of a random classifier.
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic')
    plt.legend(loc='lower right')
    return figure
Best Practices
1. Data Preparation
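- Scale features appropriately (e.g., to [0, 1] for the sigmoid-output autoencoder; standardize for distance-based methods such as LOF)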
- Handle missing values
- Remove known anomalies from the training data so the model learns only normal behavior
- Consider dimensionality reduction
2. Algorithm Selection
- Consider data characteristics
- Evaluate computational requirements
- Test multiple methods
- Validate results
3. Parameter Tuning
- Set appropriate thresholds
- Consider contamination level
- Adjust model parameters
- Monitor performance
4. Model Evaluation
- Use multiple metrics
- Consider business impact
- Validate with domain experts
- Monitor false positives