Anomaly Detection

This section covers several anomaly detection techniques (statistical, density-based, and deep-learning-based) together with reference Python implementations and evaluation helpers.

Statistical Methods

1. Z-Score Method

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

class ZScoreDetector:
    """Flag samples in which any feature lies too many standard deviations from its mean.

    The per-feature mean and standard deviation are learned in ``fit`` and
    reused by ``predict`` and ``plot_scores``.
    """

    def __init__(self, threshold=3.0):
        """Initialize Z-score detector.

        Args:
            threshold: Absolute z-score above which a feature value marks
                its sample as anomalous.
        """
        self.threshold = threshold
        self.mean = None
        self.std = None

    def fit(self, X):
        """Fit detector.

        Stores the column-wise (``axis=0``) mean and standard deviation of X.

        Args:
            X: Training data; ``predict`` reduces over ``axis=1``, so a 2-D
                (samples, features) layout is expected.

        Returns:
            The detector itself, to allow call chaining.
        """
        self.mean = np.mean(X, axis=0)
        self.std = np.std(X, axis=0)
        return self

    def _z_scores(self, X):
        """Return absolute z-scores of X without dividing by a zero spread.

        For a feature whose fitted standard deviation is 0, the score is 0
        when the value equals the fitted mean and ``inf`` when it deviates.
        This yields the same flags as a raw division (NaN never exceeds the
        threshold, ``inf`` always does) but produces no NaN values and no
        divide-by-zero warnings.
        """
        values = np.asarray(X, dtype=float)
        center = np.asarray(self.mean, dtype=float)
        deviation = np.abs(values - center)

        spread = np.broadcast_to(
            np.asarray(self.std, dtype=float), deviation.shape
        )
        has_spread = spread != 0

        scores = np.zeros(deviation.shape, dtype=float)
        # Only divide where the spread is non-zero; other entries stay 0.
        np.divide(deviation, spread, out=scores, where=has_spread)
        # Any deviation from a constant feature is infinitely unusual.
        scores[~has_spread & (deviation > 0)] = np.inf
        return scores

    def predict(self, X):
        """Predict anomalies.

        Args:
            X: Data with the same feature layout as the data passed to ``fit``.

        Returns:
            Boolean array with one entry per row of X; True where at least
            one feature's absolute z-score exceeds ``self.threshold``.
        """
        return np.any(self._z_scores(X) > self.threshold, axis=1)

    def plot_scores(self, X):
        """Plot Z-scores.

        Draws the absolute z-scores of X as blue dots together with a dashed
        red line at the threshold.

        Returns:
            The matplotlib figure that was drawn on.
        """
        z_scores = self._z_scores(X)

        plt.figure(figsize=(10, 6))
        plt.plot(z_scores, 'b.')
        plt.axhline(
            y=self.threshold,
            color='r',
            linestyle='--',
            label=f'Threshold ({self.threshold})'
        )
        plt.xlabel('Sample')
        plt.ylabel('Z-score')
        plt.title('Z-score Analysis')
        plt.legend()
        return plt.gcf()

2. Interquartile Range (IQR) Method

class IQRDetector:
    """Flag samples that fall outside the interquartile-range fences.

    For every feature the fences are ``q1 - k * iqr`` and ``q3 + k * iqr``,
    where ``iqr = q3 - q1`` and the quartiles are learned in ``fit``.
    """

    def __init__(self, k=1.5):
        """Initialize IQR detector.

        Args:
            k: Multiplier applied to the interquartile range when placing
                the lower and upper fences.
        """
        self.q1 = None
        self.q3 = None
        self.k = k

    def fit(self, X):
        """Fit detector by storing the column-wise 25th and 75th percentiles.

        Returns:
            The detector itself, to allow call chaining.
        """
        self.q1 = np.percentile(X, 25, axis=0)
        self.q3 = np.percentile(X, 75, axis=0)
        return self

    def _fences(self):
        """Return the (lower, upper) fences derived from the fitted quartiles."""
        spread = self.q3 - self.q1
        margin = self.k * spread
        return self.q1 - margin, self.q3 + margin

    def predict(self, X):
        """Predict anomalies.

        Returns:
            Boolean array with one entry per row of X; True where any
            feature lies strictly below the lower fence or strictly above
            the upper fence.
        """
        low, high = self._fences()
        outside = (X < low) | (X > high)
        return np.any(outside, axis=1)

    def plot_bounds(self, X):
        """Plot bounds and data.

        One subplot is stacked per feature, showing the values as blue dots
        and both fences as dashed red lines.

        Returns:
            The matplotlib figure that was drawn on.
        """
        fences = self._fences()
        n_features = X.shape[1]

        plt.figure(figsize=(10, 6))
        for position in range(1, n_features + 1):
            column = position - 1
            plt.subplot(n_features, 1, position)
            plt.plot(X[:, column], 'b.')
            for fence in fences:
                plt.axhline(y=fence[column], color='r', linestyle='--')
            plt.ylabel(f'Feature {position}')

        plt.xlabel('Sample')
        plt.tight_layout()
        return plt.gcf()

Density-Based Methods

1. Local Outlier Factor (LOF)

from sklearn.neighbors import LocalOutlierFactor

class LOFDetector:
    """Thin wrapper around scikit-learn's ``LocalOutlierFactor``.

    The wrapped estimator works in one of two modes, selected through the
    ``novelty`` keyword forwarded via ``**kwargs`` (see the scikit-learn
    documentation for ``LocalOutlierFactor``):

    * outlier detection (``novelty=False``, the scikit-learn default): use
      ``fit_predict``; scores exist only for the training data.
    * novelty detection (``novelty=True``): use ``fit`` followed by
      ``predict`` / ``score_samples`` on new data.
    """

    def __init__(
        self,
        n_neighbors=20,
        contamination='auto',
        **kwargs
    ):
        """Initialize LOF detector.

        Args:
            n_neighbors: Forwarded to ``LocalOutlierFactor``.
            contamination: Forwarded to ``LocalOutlierFactor``.
            **kwargs: Any further ``LocalOutlierFactor`` options, e.g.
                ``novelty=True``.
        """
        self.model = LocalOutlierFactor(
            n_neighbors=n_neighbors,
            contamination=contamination,
            **kwargs
        )

    def fit(self, X):
        """Fit the wrapped model on X.

        This is the only way to fit a ``novelty=True`` model through this
        wrapper, because scikit-learn does not expose ``fit_predict`` in
        that mode.

        Returns:
            The detector itself, to allow call chaining.
        """
        self.model.fit(X)
        return self

    def fit_predict(self, X):
        """Fit and predict anomalies.

        Returns:
            Boolean array, True where the model labelled the sample ``-1``.
        """
        return self.model.fit_predict(X) == -1

    def predict(self, X):
        """Predict anomalies.

        Delegates to the wrapped model's ``predict``; per the scikit-learn
        documentation this is only available when the model was created
        with ``novelty=True``.

        Returns:
            Boolean array, True where the model labelled the sample ``-1``.
        """
        return self.model.predict(X) == -1

    def score_samples(self, X):
        """Get anomaly scores (larger means more anomalous).

        In novelty mode the scores are computed for X. Otherwise the model
        can only report the factors of the data it was fitted on, so X is
        ignored and the training-set scores are returned.
        """
        if getattr(self.model, 'novelty', False):
            # Negate so that larger values mean "more anomalous" in both modes.
            return -self.model.score_samples(X)
        return -self.model.negative_outlier_factor_

    def plot_scores(self, X):
        """Plot anomaly scores.

        Returns:
            The matplotlib figure that was drawn on.
        """
        scores = self.score_samples(X)

        plt.figure(figsize=(10, 6))
        plt.plot(scores, 'b.')
        plt.xlabel('Sample')
        plt.ylabel('Anomaly Score')
        plt.title('LOF Anomaly Scores')
        return plt.gcf()

2. Isolation Forest

from sklearn.ensemble import IsolationForest

class IForestDetector:
    """Thin wrapper around scikit-learn's ``IsolationForest``.

    Exposes boolean anomaly flags and scores whose sign is flipped relative
    to the wrapped model's ``score_samples``.
    """

    def __init__(self, n_estimators=100, contamination='auto', **kwargs):
        """Initialize Isolation Forest detector.

        Args:
            n_estimators: Forwarded to ``IsolationForest``.
            contamination: Forwarded to ``IsolationForest``.
            **kwargs: Any further ``IsolationForest`` options.
        """
        self.model = IsolationForest(
            n_estimators=n_estimators, contamination=contamination, **kwargs
        )

    def fit(self, X):
        """Fit the wrapped model on X and return the detector for chaining."""
        self.model.fit(X)
        return self

    def predict(self, X):
        """Predict anomalies.

        Returns:
            Boolean array, True where the model labelled the sample ``-1``.
        """
        labels = self.model.predict(X)
        return labels == -1

    def score_samples(self, X):
        """Get anomaly scores: the negated output of the model's ``score_samples``."""
        raw_scores = self.model.score_samples(X)
        return -raw_scores

    def plot_scores(self, X):
        """Plot anomaly scores as blue dots and return the figure."""
        anomaly_scores = self.score_samples(X)

        plt.figure(figsize=(10, 6))
        plt.plot(anomaly_scores, 'b.')
        plt.title('Isolation Forest Anomaly Scores')
        plt.xlabel('Sample')
        plt.ylabel('Anomaly Score')
        return plt.gcf()

Deep Learning Methods

1. Autoencoder

import tensorflow as tf
from tensorflow.keras import layers, models

class AutoencoderDetector:
    """Detect anomalies through the reconstruction error of a dense autoencoder.

    A sample is flagged when its mean squared reconstruction error exceeds
    ``threshold``. If no threshold is supplied, ``fit`` sets it to the 95th
    percentile of the errors on the training data.
    """

    def __init__(
        self,
        input_dim,
        encoding_dim,
        hidden_layers=None,
        threshold=None
    ):
        """Initialize autoencoder detector.

        Args:
            input_dim: Number of input features.
            encoding_dim: Width of the bottleneck layer.
            hidden_layers: Optional widths of the encoder's hidden layers;
                the decoder uses the same widths in reverse order.
            threshold: Optional fixed reconstruction-error threshold. When
                None it is derived from the training data in ``fit``.
        """
        self.input_dim = input_dim
        self.encoding_dim = encoding_dim
        self.hidden_layers = hidden_layers or []
        self.threshold = threshold

        self.model = self._build_autoencoder()

    def _build_autoencoder(self):
        """Build autoencoder model as an encoder/decoder pair of Sequential nets."""
        # Encoder
        encoder = models.Sequential()
        encoder.add(layers.Input(shape=(self.input_dim,)))

        for units in self.hidden_layers:
            encoder.add(layers.Dense(units, activation='relu'))

        encoder.add(layers.Dense(self.encoding_dim, activation='relu'))

        # Decoder
        decoder = models.Sequential()
        decoder.add(layers.Input(shape=(self.encoding_dim,)))

        for units in reversed(self.hidden_layers):
            decoder.add(layers.Dense(units, activation='relu'))

        # NOTE(review): the sigmoid output can only reproduce values in
        # [0, 1], so inputs are presumably expected to be scaled to that
        # range - confirm with callers.
        decoder.add(layers.Dense(self.input_dim, activation='sigmoid'))

        # Autoencoder
        autoencoder = models.Sequential([encoder, decoder])
        return autoencoder

    def _reconstruction_errors(self, X):
        """Return the per-sample mean squared error between X and its reconstruction."""
        reconstructed = self.model.predict(X)
        return np.mean(np.square(X - reconstructed), axis=1)

    def fit(self, X, **kwargs):
        """Fit detector.

        Compiles the model (Adam optimizer, MSE loss) and trains it to
        reproduce X. If no threshold has been set yet, it becomes the 95th
        percentile of the reconstruction errors on X.

        Args:
            X: Training data.
            **kwargs: Forwarded to the model's ``fit`` (e.g. ``epochs``).

        Returns:
            The history object returned by the model's ``fit``.
        """
        self.model.compile(
            optimizer='adam',
            loss='mse'
        )

        history = self.model.fit(
            X, X,
            **kwargs
        )

        if self.threshold is None:
            # Set threshold based on reconstruction error
            self.threshold = np.percentile(
                self._reconstruction_errors(X), 95
            )

        return history

    def predict(self, X):
        """Predict anomalies.

        Returns:
            Boolean array, True where the reconstruction error exceeds
            ``self.threshold``.

        Raises:
            ValueError: If no threshold is available, i.e. none was passed
                to the constructor and ``fit`` has not been called.
        """
        if self.threshold is None:
            raise ValueError(
                'No threshold set: call fit() first or pass threshold '
                'to the constructor.'
            )
        return self._reconstruction_errors(X) > self.threshold

    def plot_reconstruction_error(self, X):
        """Plot reconstruction error.

        Draws the per-sample errors as blue dots and, when a threshold is
        set, a dashed red line at that threshold.

        Returns:
            The matplotlib figure that was drawn on.
        """
        errors = self._reconstruction_errors(X)

        plt.figure(figsize=(10, 6))
        plt.plot(errors, 'b.')
        if self.threshold is not None:
            plt.axhline(
                y=self.threshold,
                color='r',
                linestyle='--',
                label='Threshold'
            )
            # The threshold line is the only labelled artist, so a legend
            # is only meaningful (and warning-free) when it was drawn.
            plt.legend()
        plt.xlabel('Sample')
        plt.ylabel('Reconstruction Error')
        plt.title('Autoencoder Reconstruction Error')
        return plt.gcf()

Model Evaluation

1. Evaluation Metrics

from sklearn.metrics import (
    precision_score,
    recall_score,
    f1_score,
    roc_curve,
    auc
)

def evaluate_detector(y_true, y_pred, scores=None):
    """Evaluate anomaly detector.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels.
        scores: Optional continuous anomaly scores; when given, the area
            under the ROC curve is reported as well.

    Returns:
        Dict with keys 'precision', 'recall' and 'f1', plus 'auc' when
        ``scores`` is provided.
    """
    scorers = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1', f1_score),
    )
    results = {}
    for label, scorer in scorers:
        results[label] = scorer(y_true, y_pred)

    if scores is None:
        return results

    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, scores)
    results['auc'] = auc(false_pos_rate, true_pos_rate)
    return results

def plot_roc_curve(y_true, scores):
    """Plot ROC curve.

    Draws the ROC curve for ``scores`` against ``y_true`` with its AUC in
    the legend, plus the dashed diagonal from (0, 0) to (1, 1).

    Returns:
        The matplotlib figure that was drawn on.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, scores)
    area = auc(false_pos_rate, true_pos_rate)
    curve_label = f'ROC curve (AUC = {area:.2f})'

    plt.figure(figsize=(8, 6))
    plt.plot(
        false_pos_rate,
        true_pos_rate,
        color='darkorange',
        lw=2,
        label=curve_label,
    )
    diagonal = [0, 1]
    plt.plot(diagonal, [0, 1], 'k--')

    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.title('Receiver Operating Characteristic')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.legend(loc='lower right')
    return plt.gcf()

Best Practices

1. Data Preparation

  • Scale features appropriately
  • Handle missing values
  • Remove known anomalies from the training data so the model learns normal behavior
  • Consider dimensionality reduction

2. Algorithm Selection

  • Consider data characteristics
  • Evaluate computational requirements
  • Test multiple methods
  • Validate results

3. Parameter Tuning

  • Set appropriate thresholds
  • Consider contamination level
  • Adjust model parameters
  • Monitor performance

4. Model Evaluation

  • Use multiple metrics
  • Consider business impact
  • Validate with domain experts
  • Monitor false positives