# Data Leakage

Understanding, detecting, and preventing data leakage in machine learning projects

## Understanding Data Leakage
Data leakage occurs when information from outside the training dataset influences the model training process, leading to overly optimistic performance metrics but poor real-world performance.
## Types of Data Leakage

- **Target Leakage**
  - Using future information to predict the past
  - Including features that are derived from, or are proxies for, the target
  - Using data that only becomes available after the event being predicted
- **Train-Test Contamination** (see the sketch after this list)
  - Information sharing between the train and test sets
  - Preprocessing fitted on the full dataset before splitting
  - Feature selection performed on the entire dataset
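A minimal sketch of the second pattern, contrasting a scaler fitted before the split (leaky) with one fitted on the training split only; the dataset and column names here are invented for illustration:

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Toy data; the column names are illustrative only
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "feature": rng.normal(size=1000),
    "target": rng.integers(0, 2, size=1000),
})

# Leaky: the scaler is fitted on all rows, so test-set statistics
# influence the training features
leaky_scaled = StandardScaler().fit_transform(df[["feature"]])

# Safe: split first, then fit the scaler on the training split only
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)
scaler = StandardScaler().fit(train_df[["feature"]])
train_scaled = scaler.transform(train_df[["feature"]])
test_scaled = scaler.transform(test_df[["feature"]])  # reuses training statistics only
```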
## Detection Methods

### 1. Temporal Analysis

```python
import pandas as pd


def check_temporal_leakage(df, date_column, feature_columns):
    """Check for temporal dependencies in features."""
    leakage_report = {}

    # Sort by date so shifted values line up chronologically
    df_sorted = df.sort_values(date_column)

    for feature in feature_columns:
        # Correlation between each value and the next (future) value
        future_corr = df_sorted[feature].corr(df_sorted[feature].shift(-1))
        leakage_report[feature] = {
            'future_correlation': future_corr,
            'risk_level': 'High' if abs(future_corr) > 0.7 else 'Low',
        }

    return pd.DataFrame(leakage_report).T
```
### 2. Feature Correlation Analysis

```python
def analyze_feature_correlations(df, target_column, threshold=0.9):
    """Identify features with suspiciously high correlation to the target."""
    # Correlation matrix over numeric columns
    corr_matrix = df.corr()

    # Absolute correlation with the target, excluding the target itself
    target_corr = corr_matrix[target_column].drop(target_column).abs()
    suspicious_features = target_corr[target_corr > threshold]

    return suspicious_features
```
### 3. Train-Test Contamination Check

```python
import numpy as np
from sklearn.neighbors import NearestNeighbors


def check_train_test_contamination(X_train, X_test):
    """Check for data contamination between train and test sets."""
    # Exact duplicates: test rows that also appear verbatim in the training set
    train_rows = set(map(tuple, np.asarray(X_train)))
    exact_duplicates = sum(tuple(row) in train_rows for row in np.asarray(X_test))

    # Near-duplicates: test rows whose nearest training neighbour is almost identical
    nn = NearestNeighbors(n_neighbors=1)
    nn.fit(X_train)
    distances, _ = nn.kneighbors(X_test)

    return {
        'exact_duplicates': exact_duplicates,
        'near_duplicates': int((distances < 1e-6).sum()),
        'min_distance': float(distances.min()),
    }
```
## Prevention Strategies

### 1. Proper Cross-Validation

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit


def time_aware_cv(X, y, date_column, n_splits=5):
    """Implement time-aware cross-validation."""
    # Sort by date so each validation fold contains only later data
    sorted_indices = np.argsort(X[date_column].values)
    X_sorted = X.iloc[sorted_indices]
    y_sorted = y.iloc[sorted_indices]

    # TimeSeriesSplit trains on the past and validates on the future
    tscv = TimeSeriesSplit(n_splits=n_splits)

    # The returned indices are positions into the sorted data,
    # so the sorted data is returned alongside the splits
    splits = list(tscv.split(X_sorted))
    return X_sorted, y_sorted, splits
```
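A short usage sketch, assuming `time_aware_cv` from above is in scope; the toy data, column names, and random-forest estimator are illustrative choices, not part of the recipe itself:

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score

# Toy time-indexed data; column names are placeholders
rng = np.random.default_rng(0)
X = pd.DataFrame({
    "date": pd.date_range("2023-01-01", periods=500, freq="D"),
    "feature_a": rng.normal(size=500),
    "feature_b": rng.normal(size=500),
})
y = pd.Series(rng.integers(0, 2, size=500))

X_sorted, y_sorted, splits = time_aware_cv(X, y, date_column="date", n_splits=5)

for train_idx, test_idx in splits:
    # The date column is only used for ordering, so drop it before fitting
    X_tr = X_sorted.iloc[train_idx].drop(columns=["date"])
    X_te = X_sorted.iloc[test_idx].drop(columns=["date"])
    model = RandomForestClassifier(random_state=0).fit(X_tr, y_sorted.iloc[train_idx])
    score = roc_auc_score(y_sorted.iloc[test_idx], model.predict_proba(X_te)[:, 1])
    print(f"fold AUC: {score:.3f}")  # each fold validates on strictly later dates
```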
### 2. Feature Engineering Pipeline

```python
from sklearn.base import BaseEstimator, TransformerMixin


class SafeFeatureEngineering(BaseEstimator, TransformerMixin):
    """Safe feature engineering without leakage."""

    def __init__(self):
        self.feature_stats = {}

    def fit(self, X, y=None):
        """Calculate feature statistics only on training data."""
        for column in X.columns:
            self.feature_stats[column] = {
                'mean': X[column].mean(),
                'std': X[column].std(),
                'median': X[column].median(),
            }
        return self

    def transform(self, X):
        """Transform using the statistics computed during fit."""
        X_transformed = X.copy()
        for column in X.columns:
            # Standardize with the stored training-set statistics
            X_transformed[column] = (
                X[column] - self.feature_stats[column]['mean']
            ) / self.feature_stats[column]['std']
        return X_transformed
```
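One way to guarantee the statistics are recomputed per fold is to wrap the transformer in a scikit-learn `Pipeline`; `cross_val_score` then fits the whole pipeline on each training fold only. A minimal sketch on invented toy data:

```python
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline

# Toy data; column names are illustrative only
rng = np.random.default_rng(0)
X = pd.DataFrame({"f1": rng.normal(size=300), "f2": rng.normal(loc=5, size=300)})
y = pd.Series(rng.integers(0, 2, size=300))

pipeline = Pipeline([
    ("features", SafeFeatureEngineering()),  # re-fitted on each training fold
    ("model", LogisticRegression()),
])

# The validation fold never influences the normalization statistics,
# because the transformer is fitted inside each fold's training portion
scores = cross_val_score(pipeline, X, y, cv=5, scoring="roc_auc")
print(scores)
```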
### 3. Target Encoding

```python
def safe_target_encoding(train_df, test_df, feature_col, target_col):
    """Implement target encoding without using test-set labels."""
    # Encoding map computed from training data only
    encoding_map = train_df.groupby(feature_col)[target_col].mean()
    global_mean = train_df[target_col].mean()

    # Apply the training-derived encoding to both sets; categories unseen
    # in training fall back to the training global mean
    train_encoded = train_df[feature_col].map(encoding_map)
    test_encoded = test_df[feature_col].map(encoding_map).fillna(global_mean)

    return train_encoded, test_encoded
```
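One caveat: even with the map fitted on training data only, each training row still contributes to its own category's mean, which can leak label information for rare categories. A common refinement, sketched below with a hypothetical helper name, is to encode the training rows out-of-fold:

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold


def out_of_fold_target_encoding(train_df, feature_col, target_col, n_splits=5):
    """Encode each training row using a map fitted on the other folds only."""
    encoded = pd.Series(np.nan, index=train_df.index)
    global_mean = train_df[target_col].mean()

    kf = KFold(n_splits=n_splits, shuffle=True, random_state=0)
    for fit_idx, enc_idx in kf.split(train_df):
        # Map built without the rows being encoded, so no row sees its own label
        fold_map = train_df.iloc[fit_idx].groupby(feature_col)[target_col].mean()
        encoded.iloc[enc_idx] = (
            train_df.iloc[enc_idx][feature_col].map(fold_map).fillna(global_mean).values
        )
    return encoded
```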
## Monitoring and Validation

### 1. Performance Monitoring

```python
from sklearn.metrics import roc_auc_score


def monitor_performance_drift(model, X_train, X_test, y_train, y_test):
    """Monitor performance drift as a potential leakage indicator."""
    # Predicted probabilities for the positive class
    train_pred = model.predict_proba(X_train)[:, 1]
    test_pred = model.predict_proba(X_test)[:, 1]

    train_auc = roc_auc_score(y_train, train_pred)
    test_auc = roc_auc_score(y_test, test_pred)

    # A large train-test gap can indicate overfitting or leakage
    return {
        'train_auc': train_auc,
        'test_auc': test_auc,
        'auc_diff': train_auc - test_auc,
        'drift_warning': train_auc - test_auc > 0.1,
    }
```
### 2. Feature Distribution Analysis

```python
import pandas as pd
from scipy import stats


def analyze_feature_distributions(X_train, X_test):
    """Analyze feature distributions for potential leakage."""
    distribution_tests = {}

    for column in X_train.columns:
        # Kolmogorov-Smirnov test for a difference between train and test samples
        ks_stat, p_value = stats.ks_2samp(X_train[column], X_test[column])
        distribution_tests[column] = {
            'ks_statistic': ks_stat,
            'p_value': p_value,
            'significant_diff': p_value < 0.05,
        }

    return pd.DataFrame(distribution_tests).T
```
## Best Practices

### 1. Data Splitting

- Split the data before any preprocessing
- Maintain temporal order when the data is time-dependent
- Use a validation strategy that matches how the model will be used
- Avoid look-ahead bias (see the sketch after this list)
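A minimal sketch of a time-ordered split, assuming the data has a date column (all names here are illustrative):

```python
import numpy as np
import pandas as pd

# Toy time-ordered data; column names are illustrative only
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "date": pd.date_range("2023-01-01", periods=365, freq="D"),
    "feature": rng.normal(size=365),
    "target": rng.integers(0, 2, size=365),
}).sort_values("date")

# Temporal split: everything before the cutoff is training, everything after is test.
# Shuffling here would let the model train on the future it is later evaluated on.
cutoff = df["date"].iloc[int(len(df) * 0.8)]
train_df = df[df["date"] <= cutoff]
test_df = df[df["date"] > cutoff]

# Any preprocessing (scaling, encoding, imputation) is then fitted on train_df only
```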
### 2. Feature Engineering

- Process each fold independently
- Use only information available at prediction time (see the rolling-feature sketch after this list)
- Validate feature creation logic
- Document assumptions
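A short sketch of a feature built from past information only, using a shifted rolling window so no row sees its own or future values (the column names are illustrative):

```python
import numpy as np
import pandas as pd

# Toy daily series; column names are illustrative only
rng = np.random.default_rng(0)
df = pd.DataFrame({
    "date": pd.date_range("2023-01-01", periods=100, freq="D"),
    "sales": rng.poisson(20, size=100),
}).sort_values("date")

# Leaky: a centered rolling mean includes the current and future values
df["sales_mean_leaky"] = df["sales"].rolling(window=7, center=True).mean()

# Safe: shift by one so each row only sees strictly earlier observations
df["sales_mean_past_7d"] = df["sales"].shift(1).rolling(window=7).mean()
```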
### 3. Model Validation

- Use multiple validation methods
- Treat unrealistically good scores as a red flag (see the check after this list)
- Monitor feature importance for a single dominant feature
- Test on genuinely fresh data
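A small sanity check along these lines, on invented data and with arbitrary example thresholds: suspiciously high cross-validated scores or a single dominant feature importance are worth investigating before trusting the model.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Toy data; in practice X and y come from your project
rng = np.random.default_rng(0)
X = pd.DataFrame(rng.normal(size=(500, 5)), columns=[f"f{i}" for i in range(5)])
y = pd.Series(rng.integers(0, 2, size=500))

model = RandomForestClassifier(random_state=0)
scores = cross_val_score(model, X, y, cv=5, scoring="roc_auc")
if scores.mean() > 0.99:
    print("Near-perfect CV score: check the features for leakage")

# Fit once to look for a single feature that dominates the importances
model.fit(X, y)
importances = pd.Series(model.feature_importances_, index=X.columns)
if importances.max() > 0.8:
    print(f"Feature '{importances.idxmax()}' dominates: possible target leakage")
```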
### 4. Common Pitfalls

- Preprocessing before splitting
- Using future information in features
- Leaking target information into features
- Using a cross-validation scheme that ignores the data's temporal or group structure