Bias and Fairness
Techniques for understanding, detecting, and mitigating bias in machine learning systems.
Bias Detection
Demographic Analysis
import pandas as pd
import numpy as np
from sklearn.metrics import confusion_matrix

def compute_demographic_metrics(predictions, labels, protected_attributes):
    metrics = {}
    for attribute in protected_attributes:
        groups = np.unique(protected_attributes[attribute])
        group_metrics = {}
        for group in groups:
            mask = protected_attributes[attribute] == group
            group_preds = predictions[mask]
            group_labels = labels[mask]
            # Calculate metrics; fixing the label set guarantees a 2x2 matrix
            # even when a group contains only one class
            tn, fp, fn, tp = confusion_matrix(
                group_labels,
                group_preds,
                labels=[0, 1]
            ).ravel()
            group_metrics[group] = {
                'accuracy': (tp + tn) / (tp + tn + fp + fn),
                'precision': tp / (tp + fp) if (tp + fp) > 0 else 0,
                'recall': tp / (tp + fn) if (tp + fn) > 0 else 0,
                'false_positive_rate': fp / (fp + tn) if (fp + tn) > 0 else 0,
                'false_negative_rate': fn / (fn + tp) if (fn + tp) > 0 else 0
            }
        metrics[attribute] = group_metrics
    return metrics
def calculate_disparate_impact(predictions, protected_attribute):
    groups = np.unique(protected_attribute)
    # Calculate positive prediction rates for each group
    positive_rates = {}
    for group in groups:
        mask = protected_attribute == group
        group_preds = predictions[mask]
        positive_rates[group] = np.mean(group_preds)
    # Calculate disparate impact ratio
    min_rate = min(positive_rates.values())
    max_rate = max(positive_rates.values())
    return min_rate / max_rate if max_rate > 0 else 1.0
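A minimal usage sketch with illustrative toy arrays (not from a real dataset). A disparate impact ratio below roughly 0.8 is commonly flagged under the "four-fifths rule":

    # Hypothetical toy data for illustration only
    preds = np.array([1, 0, 1, 1, 0, 1, 0, 0])
    labels = np.array([1, 0, 1, 0, 0, 1, 1, 0])
    gender = np.array(['F', 'F', 'F', 'F', 'M', 'M', 'M', 'M'])

    metrics = compute_demographic_metrics(preds, labels, {'gender': gender})
    di_ratio = calculate_disparate_impact(preds, gender)
    print(metrics['gender'])
    print(di_ratio)  # a ratio below 0.8 is a common red flag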
Fairness Metrics
class FairnessMetrics:
    def __init__(self, predictions, labels, protected_attributes):
        self.predictions = predictions
        self.labels = labels
        self.protected_attributes = protected_attributes

    def demographic_parity(self):
        """Measures if predictions are independent of protected attribute"""
        group_predictions = {}
        for group in np.unique(self.protected_attributes):
            mask = self.protected_attributes == group
            group_predictions[group] = np.mean(self.predictions[mask])
        return max(group_predictions.values()) - min(group_predictions.values())

    def equal_opportunity(self):
        """Measures if true positive rates are equal across groups"""
        group_tpr = {}
        for group in np.unique(self.protected_attributes):
            mask = (self.protected_attributes == group) & (self.labels == 1)
            group_tpr[group] = np.mean(self.predictions[mask])
        return max(group_tpr.values()) - min(group_tpr.values())

    def equalized_odds(self):
        """Measures if true positive and false positive rates are equal"""
        group_rates = {}
        for group in np.unique(self.protected_attributes):
            mask = self.protected_attributes == group
            # True positive rate
            tpr_mask = mask & (self.labels == 1)
            tpr = np.mean(self.predictions[tpr_mask])
            # False positive rate
            fpr_mask = mask & (self.labels == 0)
            fpr = np.mean(self.predictions[fpr_mask])
            group_rates[group] = {'tpr': tpr, 'fpr': fpr}
        tpr_diff = max(r['tpr'] for r in group_rates.values()) - \
                   min(r['tpr'] for r in group_rates.values())
        fpr_diff = max(r['fpr'] for r in group_rates.values()) - \
                   min(r['fpr'] for r in group_rates.values())
        return max(tpr_diff, fpr_diff)
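Using the same toy arrays as above, each metric reads as the gap between the best- and worst-treated group, where 0 means parity:

    fairness = FairnessMetrics(preds, labels, gender)
    print(fairness.demographic_parity())  # gap in positive prediction rates
    print(fairness.equal_opportunity())   # gap in true positive rates
    print(fairness.equalized_odds())      # larger of the TPR and FPR gaps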
Bias Mitigation
Pre-processing Techniques
class Reweighting:
    def __init__(self, protected_attribute):
        self.protected_attribute = protected_attribute
        self.weights = None

    def fit(self, X, y):
        groups = np.unique(self.protected_attribute)
        total_samples = len(y)
        self.weights = np.ones(total_samples)
        for group in groups:
            mask = self.protected_attribute == group
            group_size = np.sum(mask)
            # Calculate weight for group
            weight = total_samples / (len(groups) * group_size)
            self.weights[mask] = weight
        return self

    def transform(self, X, y):
        return X, y, self.weights
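A minimal sketch of how the weights might be used downstream, assuming a feature matrix X, labels y, and a protected attribute array such as gender from the earlier example; LogisticRegression is only one example of an estimator that accepts sample_weight:

    from sklearn.linear_model import LogisticRegression

    reweighter = Reweighting(protected_attribute=gender)
    X_train, y_train, weights = reweighter.fit(X, y).transform(X, y)

    clf = LogisticRegression(max_iter=1000)
    clf.fit(X_train, y_train, sample_weight=weights)  # underrepresented groups weighted up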
class DataAugmentation:
    def __init__(self, protected_attribute):
        self.protected_attribute = protected_attribute

    def generate_synthetic_samples(self, X, y, group_mask, n_samples):
        from sklearn.neighbors import KNeighborsRegressor
        group_X = X[group_mask]
        group_y = y[group_mask]
        if n_samples <= 0 or len(group_X) == 0:
            return np.array([]), np.array([])
        # Train KNN on group data to label the synthetic points
        knn = KNeighborsRegressor(n_neighbors=min(5, len(group_X)))
        knn.fit(group_X, group_y)
        # Resample existing group points (with replacement) as synthetic inputs
        synthetic_X = group_X[np.random.choice(len(group_X), n_samples)]
        synthetic_y = knn.predict(synthetic_X)
        return synthetic_X, synthetic_y

    def fit_transform(self, X, y):
        groups = np.unique(self.protected_attribute)
        # Find minority groups
        group_sizes = {
            group: np.sum(self.protected_attribute == group)
            for group in groups
        }
        max_size = max(group_sizes.values())
        augmented_X = X.copy()
        augmented_y = y.copy()
        augmented_protected = self.protected_attribute.copy()
        # Generate synthetic samples for minority groups until they match the largest group
        for group in groups:
            if group_sizes[group] < max_size:
                group_mask = self.protected_attribute == group
                synthetic_X, synthetic_y = self.generate_synthetic_samples(
                    X, y, group_mask, max_size - group_sizes[group]
                )
                if len(synthetic_X) > 0:
                    augmented_X = np.vstack([augmented_X, synthetic_X])
                    augmented_y = np.concatenate([augmented_y, synthetic_y])
                    augmented_protected = np.concatenate([
                        augmented_protected,
                        np.full(len(synthetic_X), group)
                    ])
        return augmented_X, augmented_y, augmented_protected
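Usage follows the same fit/transform pattern; X and y again stand in for a hypothetical feature matrix and label vector:

    augmenter = DataAugmentation(protected_attribute=gender)
    X_aug, y_aug, protected_aug = augmenter.fit_transform(X, y)
    print(len(X), '->', len(X_aug))  # minority groups upsampled toward the largest group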
In-processing Techniques
import torch
import torch.nn as nn
import torch.nn.functional as F

class AdversarialDebiasing(nn.Module):
    def __init__(self, predictor, adversary_hidden_dims=[64, 32], adversary_weight=1.0):
        super().__init__()
        self.predictor = predictor
        self.adversary_weight = adversary_weight
        # Adversary tries to recover the protected attribute from the predictor's output
        self.adversary = nn.Sequential(
            nn.Linear(predictor.output_dim, adversary_hidden_dims[0]),
            nn.ReLU(),
            nn.Linear(adversary_hidden_dims[0], adversary_hidden_dims[1]),
            nn.ReLU(),
            nn.Linear(adversary_hidden_dims[1], 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Predictor forward pass
        pred_output = self.predictor(x)
        # Adversary forward pass (kept attached so the predictor can be penalized)
        adv_output = self.adversary(pred_output)
        return pred_output, adv_output

    def train_step(self, x, y, protected_attribute, optimizer_p, optimizer_a):
        # Train predictor: fit the task while making the adversary's job harder
        optimizer_p.zero_grad()
        pred_output, adv_output = self.forward(x)
        pred_loss = F.binary_cross_entropy(pred_output, y)
        adv_loss = F.binary_cross_entropy(adv_output, protected_attribute)
        (pred_loss - self.adversary_weight * adv_loss).backward()
        optimizer_p.step()
        # Train adversary on detached predictor outputs
        optimizer_a.zero_grad()
        with torch.no_grad():
            pred_output = self.predictor(x)
        adv_output = self.adversary(pred_output)
        adv_loss = F.binary_cross_entropy(adv_output, protected_attribute)
        adv_loss.backward()
        optimizer_a.step()
        return pred_loss.item(), adv_loss.item()
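A training-loop sketch under stated assumptions: MyPredictor and train_loader are placeholders, the predictor outputs probabilities, and the protected attribute arrives as a float tensor of shape (batch, 1):

    predictor = MyPredictor()  # hypothetical nn.Module exposing .output_dim
    model = AdversarialDebiasing(predictor, adversary_weight=1.0)
    optimizer_p = torch.optim.Adam(model.predictor.parameters(), lr=1e-3)
    optimizer_a = torch.optim.Adam(model.adversary.parameters(), lr=1e-3)

    for epoch in range(10):
        for x, y, a in train_loader:  # a = protected attribute tensor
            pred_loss, adv_loss = model.train_step(x, y, a, optimizer_p, optimizer_a)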
Post-processing Techniques
class ThresholdOptimizer:
    def __init__(self, protected_attribute):
        self.protected_attribute = protected_attribute
        self.thresholds = {}

    def find_optimal_threshold(self, scores, labels):
        thresholds = np.linspace(0, 1, 100)
        best_threshold = 0.5
        min_diff = float('inf')
        # Target the group's base rate so positive prediction rates track prevalence
        target_rate = np.mean(labels)
        for threshold in thresholds:
            predictions = (scores >= threshold).astype(int)
            # Minimize difference between the positive prediction rate and the base rate
            diff = abs(np.mean(predictions) - target_rate)
            if diff < min_diff:
                min_diff = diff
                best_threshold = threshold
        return best_threshold

    def fit(self, scores, labels):
        groups = np.unique(self.protected_attribute)
        for group in groups:
            mask = self.protected_attribute == group
            group_scores = scores[mask]
            group_labels = labels[mask]
            self.thresholds[group] = self.find_optimal_threshold(
                group_scores,
                group_labels
            )
        return self

    def predict(self, scores):
        predictions = np.zeros_like(scores, dtype=int)
        for group in self.thresholds:
            mask = self.protected_attribute == group
            predictions[mask] = (
                scores[mask] >= self.thresholds[group]
            ).astype(int)
        return predictions
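A short usage sketch, assuming scores holds the model's predicted probabilities for the rows described by gender and labels:

    threshold_opt = ThresholdOptimizer(protected_attribute=gender)
    threshold_opt.fit(scores, labels)
    adjusted_preds = threshold_opt.predict(scores)  # per-group thresholds applied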
Model Evaluation
from sklearn.metrics import accuracy_score, precision_score, recall_score

def evaluate_model_fairness(model, X, y, protected_attributes):
    # Basic performance metrics
    predictions = model.predict(X)
    performance_metrics = {
        'accuracy': accuracy_score(y, predictions),
        'precision': precision_score(y, predictions),
        'recall': recall_score(y, predictions)
    }
    # Fairness metrics, computed per protected attribute
    fairness_metrics = {}
    for attribute, values in protected_attributes.items():
        fairness = FairnessMetrics(predictions, y, values)
        fairness_metrics[attribute] = {
            'demographic_parity': fairness.demographic_parity(),
            'equal_opportunity': fairness.equal_opportunity(),
            'equalized_odds': fairness.equalized_odds()
        }
    # Demographic analysis
    demographic_metrics = compute_demographic_metrics(
        predictions,
        y,
        protected_attributes
    )
    return {
        'performance': performance_metrics,
        'fairness': fairness_metrics,
        'demographic': demographic_metrics
    }
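Called with a fitted classifier and a dict mapping attribute names to arrays; the variable names below are placeholders:

    results = evaluate_model_fairness(
        clf,
        X_test,
        y_test,
        {'gender': gender_test, 'age_group': age_group_test}
    )
    print(results['fairness']['gender'])        # parity/odds gaps for gender
    print(results['demographic']['age_group'])  # per-group confusion-matrix metrics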
Best Practices
- Data Collection and Preprocessing
def prepare_fair_dataset(data, protected_columns):
    # Check for representation
    representation_stats = {}
    for col in protected_columns:
        value_counts = data[col].value_counts(normalize=True)
        representation_stats[col] = value_counts
        # Check for severe imbalance
        if value_counts.min() < 0.1:
            print(f"Warning: Severe underrepresentation in {col}")
    # Check how strongly other features correlate with the protected columns;
    # encode categorical protected columns so they appear in the correlation matrix
    encoded = data.copy()
    for col in protected_columns:
        encoded[col] = pd.factorize(encoded[col])[0]
    correlations = encoded.corr(numeric_only=True)
    protected_correlations = correlations[protected_columns]
    return {
        'representation': representation_stats,
        'correlations': protected_correlations
    }
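For example, on a pandas DataFrame (column names are illustrative):

    audit = prepare_fair_dataset(df, protected_columns=['gender', 'ethnicity'])
    print(audit['representation']['gender'])  # group shares; warnings printed below 10%
    print(audit['correlations'])              # feature correlations with protected columns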
- Model Selection
def select_fair_model(dataset_size, protected_attributes, target_fairness_metric):
    if target_fairness_metric == 'demographic_parity':
        return AdversarialDebiasing
    elif dataset_size < 10000:
        return ThresholdOptimizer
    else:
        return Reweighting
- Monitoring and Reporting
def generate_fairness_report(model, data, protected_attributes, period='2023-Q1'):
    # Evaluate model
    metrics = evaluate_model_fairness(
        model,
        data['X'],
        data['y'],
        protected_attributes
    )
    # Generate visualizations (the plot_* helpers are assumed to be defined elsewhere)
    plots = {
        'demographic_distribution': plot_demographic_distribution(
            data,
            protected_attributes
        ),
        'fairness_metrics': plot_fairness_metrics(metrics['fairness']),
        'performance_by_group': plot_performance_by_group(
            metrics['demographic']
        )
    }
    return {
        'period': period,
        'metrics': metrics,
        'plots': plots,
        'recommendations': generate_fairness_recommendations(metrics)
    }
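The report builder above assumes generate_fairness_recommendations exists; a minimal sketch of what it might look like, with the 0.1 gap threshold chosen purely for illustration:

    def generate_fairness_recommendations(metrics, parity_gap_threshold=0.1):
        recommendations = []
        for attribute, values in metrics['fairness'].items():
            if values['demographic_parity'] > parity_gap_threshold:
                recommendations.append(
                    f"Large demographic parity gap for '{attribute}'; "
                    "consider reweighting or threshold optimization."
                )
            if values['equalized_odds'] > parity_gap_threshold:
                recommendations.append(
                    f"Unequal error rates across '{attribute}' groups; "
                    "consider adversarial debiasing or per-group thresholds."
                )
        if not recommendations:
            recommendations.append("No fairness gaps above the configured thresholds.")
        return recommendations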