Bias and Fairness

Techniques for understanding, detecting, and mitigating bias in machine learning systems.

Bias Detection

Demographic Analysis

import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix

def compute_demographic_metrics(predictions, labels, protected_attributes):
    metrics = {}
    
    for attribute in protected_attributes:
        groups = np.unique(protected_attributes[attribute])
        group_metrics = {}
        
        for group in groups:
            mask = protected_attributes[attribute] == group
            group_preds = predictions[mask]
            group_labels = labels[mask]
            
            # 2x2 confusion matrix; passing labels explicitly keeps
            # ravel() returning four values even if a group has one class
            tn, fp, fn, tp = confusion_matrix(
                group_labels,
                group_preds,
                labels=[0, 1]
            ).ravel()
            
            group_metrics[group] = {
                'accuracy': (tp + tn) / (tp + tn + fp + fn),
                'precision': tp / (tp + fp) if (tp + fp) > 0 else 0,
                'recall': tp / (tp + fn) if (tp + fn) > 0 else 0,
                'false_positive_rate': fp / (fp + tn) if (fp + tn) > 0 else 0,
                'false_negative_rate': fn / (fn + tp) if (fn + tp) > 0 else 0
            }
        
        metrics[attribute] = group_metrics
    
    return metrics

def calculate_disparate_impact(predictions, protected_attribute):
    """Ratio of lowest to highest positive prediction rate across groups.

    Values below 0.8 are commonly flagged under the four-fifths rule.
    """
    groups = np.unique(protected_attribute)
    
    # Calculate positive prediction rates for each group
    positive_rates = {}
    for group in groups:
        mask = protected_attribute == group
        group_preds = predictions[mask]
        positive_rates[group] = np.mean(group_preds)
    
    # Calculate disparate impact ratio
    min_rate = min(positive_rates.values())
    max_rate = max(positive_rates.values())
    
    return min_rate / max_rate if max_rate > 0 else 1.0
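
A minimal usage sketch of the two detection helpers above; the arrays below are illustrative stand-ins for real model outputs:

predictions = np.array([1, 0, 1, 1, 0, 1, 0, 0])
labels = np.array([1, 0, 1, 0, 0, 1, 1, 0])
protected = {'gender': np.array(['F', 'F', 'F', 'F', 'M', 'M', 'M', 'M'])}

group_metrics = compute_demographic_metrics(predictions, labels, protected)
di_ratio = calculate_disparate_impact(predictions, protected['gender'])

# Flag ratios below 0.8 per the four-fifths rule of thumb
if di_ratio < 0.8:
    print(f"Potential disparate impact: ratio = {di_ratio:.2f}")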

Fairness Metrics

class FairnessMetrics:
    def __init__(self, predictions, labels, protected_attributes):
        self.predictions = predictions
        self.labels = labels
        self.protected_attributes = protected_attributes
    
    def demographic_parity(self):
        """Measures if predictions are independent of protected attribute"""
        group_predictions = {}
        
        for group in np.unique(self.protected_attributes):
            mask = self.protected_attributes == group
            group_predictions[group] = np.mean(self.predictions[mask])
        
        return max(group_predictions.values()) - min(group_predictions.values())
    
    def equal_opportunity(self):
        """Measures if true positive rates are equal across groups"""
        group_tpr = {}
        
        for group in np.unique(self.protected_attributes):
            mask = (self.protected_attributes == group) & (self.labels == 1)
            if mask.any():  # skip groups with no positive labels
                group_tpr[group] = np.mean(self.predictions[mask])
        
        return max(group_tpr.values()) - min(group_tpr.values())
    
    def equalized_odds(self):
        """Measures if true positive and false positive rates are equal"""
        group_rates = {}
        
        for group in np.unique(self.protected_attributes):
            mask = self.protected_attributes == group
            
            # True positive rate (mean prediction among actual positives)
            tpr_mask = mask & (self.labels == 1)
            tpr = np.mean(self.predictions[tpr_mask]) if tpr_mask.any() else 0.0
            
            # False positive rate (mean prediction among actual negatives)
            fpr_mask = mask & (self.labels == 0)
            fpr = np.mean(self.predictions[fpr_mask]) if fpr_mask.any() else 0.0
            
            group_rates[group] = {'tpr': tpr, 'fpr': fpr}
        
        tpr_diff = max(r['tpr'] for r in group_rates.values()) - \
                   min(r['tpr'] for r in group_rates.values())
        fpr_diff = max(r['fpr'] for r in group_rates.values()) - \
                   min(r['fpr'] for r in group_rates.values())
        
        return max(tpr_diff, fpr_diff)
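
A short usage sketch; note that FairnessMetrics takes a single protected-attribute array, not a dict (values are illustrative):

predictions = np.array([1, 0, 1, 1, 0, 1, 0, 0])
labels = np.array([1, 0, 1, 0, 0, 1, 1, 0])
group = np.array(['F', 'F', 'F', 'F', 'M', 'M', 'M', 'M'])

fm = FairnessMetrics(predictions, labels, group)
print(f"Demographic parity gap: {fm.demographic_parity():.2f}")
print(f"Equal opportunity gap: {fm.equal_opportunity():.2f}")
print(f"Equalized odds gap: {fm.equalized_odds():.2f}")

# Each gap lies in [0, 1]; 0 means the criterion is satisfied exactly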

Bias Mitigation

Pre-processing Techniques

class Reweighting:
    def __init__(self, protected_attribute):
        self.protected_attribute = protected_attribute
        self.weights = None
    
    def fit(self, X, y):
        groups = np.unique(self.protected_attribute)
        total_samples = len(y)
        self.weights = np.ones(total_samples)
        
        for group in groups:
            mask = self.protected_attribute == group
            group_size = np.sum(mask)
            
            # Inverse-frequency weight: smaller groups get larger weights
            # so every group contributes equally to the training loss
            weight = total_samples / (len(groups) * group_size)
            self.weights[mask] = weight
        
        return self
    
    def transform(self, X, y):
        return X, y, self.weights
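
The returned weights plug into any estimator that accepts sample_weight; a sketch with scikit-learn's LogisticRegression (X_train, y_train, and group are illustrative placeholders):

from sklearn.linear_model import LogisticRegression

reweighter = Reweighting(protected_attribute=group)
X_train, y_train, weights = reweighter.fit(X_train, y_train).transform(X_train, y_train)

clf = LogisticRegression()
clf.fit(X_train, y_train, sample_weight=weights)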

class DataAugmentation:
    def __init__(self, protected_attribute):
        self.protected_attribute = protected_attribute
    
    def generate_synthetic_samples(self, X, y, group_mask, n_samples):
        from sklearn.neighbors import KNeighborsRegressor
        
        if n_samples <= 0:
            return np.array([]), np.array([])
        
        # Fit KNN on the group's data to assign labels to resampled points
        group_X = X[group_mask]
        group_y = y[group_mask]
        knn = KNeighborsRegressor(n_neighbors=min(5, len(group_X)))
        knn.fit(group_X, group_y)
        
        # Resample group rows with replacement; KNN smooths their labels
        synthetic_X = group_X[np.random.choice(len(group_X), n_samples)]
        synthetic_y = knn.predict(synthetic_X)
        
        return synthetic_X, synthetic_y
    
    def fit_transform(self, X, y):
        groups = np.unique(self.protected_attribute)
        
        # Find minority groups
        group_sizes = {
            group: np.sum(self.protected_attribute == group)
            for group in groups
        }
        max_size = max(group_sizes.values())
        
        augmented_X = X.copy()
        augmented_y = y.copy()
        augmented_protected = self.protected_attribute.copy()
        
        # Oversample each minority group up to the majority group's size
        for group in groups:
            if group_sizes[group] < max_size:
                group_mask = self.protected_attribute == group
                synthetic_X, synthetic_y = self.generate_synthetic_samples(
                    X, y, group_mask,
                    n_samples=max_size - group_sizes[group]
                )
                
                if len(synthetic_X) > 0:
                    augmented_X = np.vstack([augmented_X, synthetic_X])
                    augmented_y = np.concatenate([augmented_y, synthetic_y])
                    augmented_protected = np.concatenate([
                        augmented_protected,
                        np.full(len(synthetic_X), group)
                    ])
        
        return augmented_X, augmented_y, augmented_protected
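
A usage sketch with synthetic data (sizes are illustrative); the returned protected array must be used for any downstream fairness checks:

rng = np.random.default_rng(0)
X = rng.normal(size=(100, 4))
y = rng.integers(0, 2, size=100).astype(float)
group = np.array(['A'] * 80 + ['B'] * 20)

augmenter = DataAugmentation(protected_attribute=group)
X_aug, y_aug, group_aug = augmenter.fit_transform(X, y)
print(X_aug.shape)  # (160, 4): group B oversampled from 20 to 80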

In-processing Techniques

import torch
import torch.nn as nn
import torch.nn.functional as F

class AdversarialDebiasing(nn.Module):
    def __init__(self, predictor, adversary_hidden_dims=(64, 32),
                 adversary_weight=1.0):
        super().__init__()
        self.predictor = predictor
        self.adversary_weight = adversary_weight
        # The adversary tries to recover the protected attribute
        # from the predictor's outputs
        self.adversary = nn.Sequential(
            nn.Linear(predictor.output_dim, adversary_hidden_dims[0]),
            nn.ReLU(),
            nn.Linear(adversary_hidden_dims[0], adversary_hidden_dims[1]),
            nn.ReLU(),
            nn.Linear(adversary_hidden_dims[1], 1)
        )
        
    def forward(self, x):
        # Predictor emits probabilities; adversary emits logits
        pred_output = self.predictor(x)
        adv_logits = self.adversary(pred_output)
        return pred_output, adv_logits
    
    def train_step(self, x, y, protected_attribute, optimizer_p, optimizer_a):
        # Predictor step: fit the labels while fooling the adversary.
        # Gradients must flow through the adversary here (no detach),
        # otherwise the debiasing term has no effect on the predictor.
        optimizer_p.zero_grad()
        pred_output, adv_logits = self.forward(x)
        pred_loss = F.binary_cross_entropy(pred_output, y)
        adv_loss = F.binary_cross_entropy_with_logits(
            adv_logits, protected_attribute
        )
        (pred_loss - self.adversary_weight * adv_loss).backward()
        optimizer_p.step()
        
        # Adversary step: recover the protected attribute from the
        # detached predictions so only the adversary's weights update
        optimizer_a.zero_grad()
        adv_logits = self.adversary(self.predictor(x).detach())
        adv_loss = F.binary_cross_entropy_with_logits(
            adv_logits, protected_attribute
        )
        adv_loss.backward()
        optimizer_a.step()
        
        return pred_loss.item(), adv_loss.item()
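
A training-loop sketch, assuming predictor is an nn.Module that emits probabilities and exposes an output_dim attribute (all names and data below are illustrative):

predictor = nn.Sequential(nn.Linear(10, 1), nn.Sigmoid())
predictor.output_dim = 1  # assumed attribute read by the adversary

model = AdversarialDebiasing(predictor, adversary_weight=0.5)
optimizer_p = torch.optim.Adam(model.predictor.parameters(), lr=1e-3)
optimizer_a = torch.optim.Adam(model.adversary.parameters(), lr=1e-3)

x = torch.randn(32, 10)
y = torch.randint(0, 2, (32, 1)).float()
a = torch.randint(0, 2, (32, 1)).float()  # protected attribute

for step in range(100):
    pred_loss, adv_loss = model.train_step(x, y, a, optimizer_p, optimizer_a)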

Post-processing Techniques

class ThresholdOptimizer:
    def __init__(self, protected_attribute):
        self.protected_attribute = protected_attribute
        self.thresholds = {}
    
    def find_optimal_threshold(self, scores, labels, group):
        """Pick the threshold whose selection rate best matches the group's base rate."""
        thresholds = np.linspace(0, 1, 100)
        best_threshold = 0.5
        min_diff = float('inf')
        target_rate = np.mean(labels)  # group's base positive rate
        
        for threshold in thresholds:
            predictions = (scores >= threshold).astype(int)
            
            # Minimize the gap between the selection rate and base rate
            diff = abs(np.mean(predictions) - target_rate)
            
            if diff < min_diff:
                min_diff = diff
                best_threshold = threshold
        
        return best_threshold
    
    def fit(self, scores, labels):
        groups = np.unique(self.protected_attribute)
        
        for group in groups:
            mask = self.protected_attribute == group
            group_scores = scores[mask]
            group_labels = labels[mask]
            
            self.thresholds[group] = self.find_optimal_threshold(
                group_scores,
                group_labels,
                group
            )
        
        return self
    
    def predict(self, scores):
        predictions = np.zeros_like(scores, dtype=int)
        
        for group in self.thresholds:
            mask = self.protected_attribute == group
            predictions[mask] = (
                scores[mask] >= self.thresholds[group]
            ).astype(int)
        
        return predictions
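
A usage sketch with illustrative validation scores; a threshold is fit per group, then applied with the same protected-attribute alignment at predict time:

scores = np.array([0.9, 0.6, 0.4, 0.8, 0.7, 0.3, 0.5, 0.2])
labels = np.array([1, 1, 0, 1, 1, 0, 1, 0])
group = np.array(['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B'])

threshold_opt = ThresholdOptimizer(protected_attribute=group)
threshold_opt.fit(scores, labels)
print(threshold_opt.thresholds)       # per-group thresholds
print(threshold_opt.predict(scores))  # group-specific thresholding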

Model Evaluation

from sklearn.metrics import accuracy_score, precision_score, recall_score

def evaluate_model_fairness(model, X, y, protected_attributes):
    # Basic performance metrics
    predictions = model.predict(X)
    performance_metrics = {
        'accuracy': accuracy_score(y, predictions),
        'precision': precision_score(y, predictions),
        'recall': recall_score(y, predictions)
    }
    
    # Fairness metrics, computed per attribute because FairnessMetrics
    # expects a single protected-attribute array rather than a dict
    fairness_metrics = {}
    for attribute, values in protected_attributes.items():
        fairness = FairnessMetrics(predictions, y, values)
        fairness_metrics[attribute] = {
            'demographic_parity': fairness.demographic_parity(),
            'equal_opportunity': fairness.equal_opportunity(),
            'equalized_odds': fairness.equalized_odds()
        }
    
    # Demographic analysis
    demographic_metrics = compute_demographic_metrics(
        predictions,
        y,
        protected_attributes
    )
    
    return {
        'performance': performance_metrics,
        'fairness': fairness_metrics,
        'demographic': demographic_metrics
    }
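
Putting the pieces together (the model and data splits are illustrative placeholders):

from sklearn.linear_model import LogisticRegression

model = LogisticRegression().fit(X_train, y_train)
report = evaluate_model_fairness(
    model, X_test, y_test,
    protected_attributes={'gender': gender_test}
)
print(report['fairness']['gender']['demographic_parity'])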

Best Practices

  1. Data Collection and Preprocessing
def prepare_fair_dataset(data, protected_columns):
    # Check for representation
    representation_stats = {}
    for col in protected_columns:
        value_counts = data[col].value_counts(normalize=True)
        representation_stats[col] = value_counts
        
        # Check for severe imbalance
        if value_counts.min() < 0.1:
            print(f"Warning: Severe underrepresentation in {col}")
    
    # Check correlations with protected columns (assumes these columns
    # are numerically encoded; corr(numeric_only=True) skips the rest)
    correlations = data.corr(numeric_only=True)
    protected_correlations = correlations[protected_columns]
    
    return {
        'representation': representation_stats,
        'correlations': protected_correlations
    }
  2. Model Selection
def select_fair_model(dataset_size, protected_attributes, target_fairness_metric):
    # Illustrative heuristic: adversarial training targets demographic
    # parity directly; threshold tuning suits smaller datasets; otherwise
    # reweighting is a cheap default
    if target_fairness_metric == 'demographic_parity':
        return AdversarialDebiasing
    elif dataset_size < 10000:
        return ThresholdOptimizer
    else:
        return Reweighting
  3. Monitoring and Reporting
def generate_fairness_report(model, data, protected_attributes, period='2023-Q1'):
    # Evaluate model
    metrics = evaluate_model_fairness(
        model,
        data['X'],
        data['y'],
        protected_attributes
    )
    
    # Generate visualizations (plot_* helpers assumed to be defined elsewhere)
    plots = {
        'demographic_distribution': plot_demographic_distribution(
            data,
            protected_attributes
        ),
        'fairness_metrics': plot_fairness_metrics(metrics['fairness']),
        'performance_by_group': plot_performance_by_group(
            metrics['demographic']
        )
    }
    
    return {
        'period': period,
        'metrics': metrics,
        'plots': plots,
        'recommendations': generate_fairness_recommendations(metrics)
    }