Clustering

Understanding and implementing clustering algorithms in machine learning

Clustering is an unsupervised learning technique that groups similar data points together based on their features.
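
Most of the snippets below operate on a numeric feature matrix X, and the plotting helpers assume two features. A minimal way to build a toy dataset for experimentation, using scikit-learn's make_blobs (the parameter values are purely illustrative):

from sklearn.datasets import make_blobs

# Toy 2-D dataset with four well-separated groups
X, y_true = make_blobs(n_samples=500, centers=4, cluster_std=0.8, random_state=42)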

K-Means Clustering

Basic Implementation

from sklearn.cluster import KMeans
import numpy as np
import matplotlib.pyplot as plt

def kmeans_clustering(X, n_clusters=5):
    # Initialize and fit KMeans
    kmeans = KMeans(
        n_clusters=n_clusters,
        random_state=42
    )
    clusters = kmeans.fit_predict(X)
    
    return kmeans, clusters

def plot_kmeans_clusters(X, kmeans):
    plt.figure(figsize=(10, 6))
    plt.scatter(
        X[:, 0], X[:, 1],
        c=kmeans.labels_,
        cmap='viridis'
    )
    plt.scatter(
        kmeans.cluster_centers_[:, 0],
        kmeans.cluster_centers_[:, 1],
        marker='x',
        color='red',
        s=200,
        label='Centroids'
    )
    plt.title('K-Means Clustering')
    plt.legend()
    return plt
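
Example usage, assuming the toy X generated with make_blobs above:

kmeans, clusters = kmeans_clustering(X, n_clusters=4)
plot_kmeans_clusters(X, kmeans)
plt.show()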

Elbow Method

def find_optimal_k(X, max_k=10):
    inertias = []
    
    for k in range(1, max_k + 1):
        kmeans = KMeans(n_clusters=k, random_state=42)
        kmeans.fit(X)
        inertias.append(kmeans.inertia_)
    
    # Plot elbow curve
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, max_k + 1), inertias, 'bx-')
    plt.xlabel('k')
    plt.ylabel('Inertia')
    plt.title('Elbow Method for Optimal k')
    return plt
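
The "elbow" is the value of k beyond which adding more clusters no longer produces a large drop in inertia; that k is a reasonable starting point. For example:

find_optimal_k(X, max_k=10)
plt.show()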

Silhouette Analysis

from sklearn.metrics import silhouette_score, silhouette_samples

def silhouette_analysis(X, n_clusters):
    # Compute silhouette scores
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    cluster_labels = kmeans.fit_predict(X)
    silhouette_avg = silhouette_score(X, cluster_labels)
    sample_silhouette_values = silhouette_samples(X, cluster_labels)
    
    # Plot silhouette analysis
    plt.figure(figsize=(10, 6))
    y_lower = 10
    
    for i in range(n_clusters):
        ith_cluster_values = sample_silhouette_values[cluster_labels == i]
        ith_cluster_values.sort()
        
        size_cluster_i = ith_cluster_values.shape[0]
        y_upper = y_lower + size_cluster_i
        
        plt.fill_betweenx(
            np.arange(y_lower, y_upper),
            0, ith_cluster_values,
            alpha=0.7
        )
        
        y_lower = y_upper + 10
    
    plt.title(f'Silhouette Analysis (avg score: {silhouette_avg:.3f})')
    plt.xlabel('Silhouette Coefficient')
    plt.ylabel('Cluster')
    
    return plt
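
Silhouette values range from -1 to 1: values near 1 mean a point sits well inside its cluster, values near 0 mean it lies between clusters, and negative values suggest misassignment. A quick sketch for comparing candidate values of k by their average score:

for k in range(2, 8):
    labels = KMeans(n_clusters=k, random_state=42).fit_predict(X)
    print(f'k={k}: average silhouette = {silhouette_score(X, labels):.3f}')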

Hierarchical Clustering

Agglomerative Clustering

from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram, linkage

def hierarchical_clustering(X, n_clusters=5):
    # Perform hierarchical clustering
    clustering = AgglomerativeClustering(n_clusters=n_clusters)
    clusters = clustering.fit_predict(X)
    
    return clustering, clusters

def plot_dendrogram(X):
    # Create linkage matrix
    linkage_matrix = linkage(X, method='ward')
    
    # Plot dendrogram
    plt.figure(figsize=(12, 8))
    dendrogram(linkage_matrix)
    plt.title('Hierarchical Clustering Dendrogram')
    plt.xlabel('Sample Index')
    plt.ylabel('Distance')
    return plt
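
Ward linkage (used above) merges the pair of clusters that least increases within-cluster variance; 'complete', 'average', and 'single' are common alternatives. One way to check how faithfully a dendrogram preserves the original pairwise distances is the cophenetic correlation coefficient, sketched here:

from scipy.cluster.hierarchy import cophenet
from scipy.spatial.distance import pdist

def cophenetic_correlation(X, method='ward'):
    # Correlation between original pairwise distances and dendrogram distances
    linkage_matrix = linkage(X, method=method)
    coeff, _ = cophenet(linkage_matrix, pdist(X))
    return coeff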

DBSCAN (Density-Based Spatial Clustering of Applications with Noise)

from sklearn.cluster import DBSCAN
from sklearn.neighbors import NearestNeighbors

def dbscan_clustering(X, eps=0.5, min_samples=5):
    # Perform DBSCAN clustering
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    clusters = dbscan.fit_predict(X)
    
    return dbscan, clusters

def find_optimal_eps(X, n_neighbors=5):
    # Calculate distances to nearest neighbors
    neigh = NearestNeighbors(n_neighbors=n_neighbors)
    neigh.fit(X)
    distances, _ = neigh.kneighbors(X)
    
    # Plot k-distance graph
    plt.figure(figsize=(10, 6))
    plt.plot(np.sort(distances[:, -1]))
    plt.xlabel('Points')
    plt.ylabel(f'Distance to {n_neighbors}th nearest neighbor')
    plt.title('K-distance Graph')
    return plt
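
In the k-distance plot, a reasonable eps lies roughly where the sorted distances bend sharply upward. A usage sketch (the eps value here is illustrative, not a recommendation):

find_optimal_eps(X, n_neighbors=5)
plt.show()

dbscan, clusters = dbscan_clustering(X, eps=0.5, min_samples=5)
n_noise = np.sum(clusters == -1)  # DBSCAN labels noise points as -1
n_found = len(set(clusters)) - (1 if -1 in clusters else 0)
print(f'Clusters found: {n_found}, noise points: {n_noise}')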

Gaussian Mixture Models (GMM)

from sklearn.mixture import GaussianMixture

def gmm_clustering(X, n_components=5):
    # Fit Gaussian Mixture Model
    gmm = GaussianMixture(
        n_components=n_components,
        random_state=42
    )
    clusters = gmm.fit_predict(X)
    
    return gmm, clusters

def plot_gmm_contours(X, gmm):
    # Create mesh grid
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(
        np.linspace(x_min, x_max, 100),
        np.linspace(y_min, y_max, 100)
    )
    
    # Get probabilities
    Z = gmm.predict_proba(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape((100, 100, gmm.n_components))
    
    # Plot the probability surface of the first mixture component
    plt.figure(figsize=(10, 6))
    plt.contourf(xx, yy, Z[:, :, 0])
    plt.scatter(X[:, 0], X[:, 1], c=gmm.predict(X))
    plt.title('GMM Clustering with Probability Contours')
    return plt
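
Unlike K-Means, a GMM produces soft assignments via predict_proba, and the number of components can be chosen with an information criterion. A minimal sketch using BIC (lower is better):

def select_gmm_components(X, max_components=10):
    # Fit GMMs with increasing component counts and return the lowest-BIC choice
    bics = []
    for n in range(1, max_components + 1):
        gmm = GaussianMixture(n_components=n, random_state=42).fit(X)
        bics.append(gmm.bic(X))
    return int(np.argmin(bics)) + 1, bics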

Evaluation Metrics

from sklearn.metrics import (
    calinski_harabasz_score,
    davies_bouldin_score,
    adjusted_rand_score
)

def evaluate_clustering(X, labels, true_labels=None):
    metrics = {
        'calinski_harabasz': calinski_harabasz_score(X, labels),
        'davies_bouldin': davies_bouldin_score(X, labels)
    }
    
    if true_labels is not None:
        metrics['adjusted_rand'] = adjusted_rand_score(
            true_labels,
            labels
        )
    
    return metrics
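
Calinski-Harabasz is better when higher, Davies-Bouldin when lower, and the adjusted Rand index only applies when ground-truth labels are available. Example usage with the toy data from make_blobs:

_, labels = kmeans_clustering(X, n_clusters=4)
print(evaluate_clustering(X, labels, true_labels=y_true))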

Feature Preprocessing

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def preprocess_for_clustering(X):
    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Reduce dimensionality if needed
    if X.shape[1] > 50:
        pca = PCA(n_components=0.95)
        X_reduced = pca.fit_transform(X_scaled)
        return X_reduced, scaler, pca
    
    return X_scaled, scaler, None
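
Distance-based algorithms such as K-Means and DBSCAN are sensitive to feature scale, so scaling (and optionally PCA) should happen before clustering. For example:

X_prepared, scaler, pca = preprocess_for_clustering(X)
kmeans, clusters = kmeans_clustering(X_prepared, n_clusters=4)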

Visualization Tools

import pandas as pd

def plot_cluster_distribution(labels):
    plt.figure(figsize=(10, 6))
    pd.Series(labels).value_counts().plot(kind='bar')
    plt.title('Cluster Size Distribution')
    plt.xlabel('Cluster')
    plt.ylabel('Number of Samples')
    return plt

def plot_feature_distribution_by_cluster(X, labels, feature_names):
    n_features = X.shape[1]
    # Use the actual label values so non-contiguous labels (e.g. DBSCAN's -1) are handled
    cluster_ids = np.unique(labels)
    
    fig, axes = plt.subplots(
        n_features, 1,
        figsize=(12, 4 * n_features),
        squeeze=False
    )
    
    for i, (ax, feature) in enumerate(zip(axes.ravel(), feature_names)):
        for cluster in cluster_ids:
            ax.hist(
                X[labels == cluster, i],
                alpha=0.5,
                label=f'Cluster {cluster}'
            )
        ax.set_title(f'Distribution of {feature} by Cluster')
        ax.legend()
    
    plt.tight_layout()
    return fig

Best Practices

  1. Data Preparation
from sklearn.impute import SimpleImputer
from sklearn.ensemble import IsolationForest

def prepare_data_for_clustering(X):
    # Handle missing values
    imputer = SimpleImputer(strategy='mean')
    X = imputer.fit_transform(X)
    
    # Remove outliers
    isolation_forest = IsolationForest(random_state=42)
    mask = isolation_forest.fit_predict(X) == 1
    X_clean = X[mask]
    
    # Scale features
    X_scaled = StandardScaler().fit_transform(X_clean)
    
    return X_scaled
  2. Cluster Validation
def validate_clustering(X, clustering_methods):
    results = {}
    
    for name, method in clustering_methods.items():
        labels = method.fit_predict(X)
        scores = evaluate_clustering(X, labels)
        results[name] = scores
    
    return pd.DataFrame(results)
  3. Ensemble Clustering
def create_clustering_ensemble(X, base_clusterers):
    # Get predictions from each clusterer
    predictions = []
    
    for clusterer in base_clusterers:
        pred = clusterer.fit_predict(X)
        predictions.append(pred)
    
    # Combine predictions by per-sample majority vote
    # (assumes non-negative label IDs that are roughly comparable across clusterers)
    ensemble_pred = np.apply_along_axis(
        lambda x: np.bincount(x).argmax(),
        axis=0,
        arr=np.array(predictions)
    )
    
    return ensemble_pred
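
A usage sketch combining several of the clusterers defined above (parameter values are illustrative, and the majority vote assumes label IDs are comparable across clusterers):

base_clusterers = [
    KMeans(n_clusters=4, random_state=42),
    AgglomerativeClustering(n_clusters=4),
    GaussianMixture(n_components=4, random_state=42),
]
ensemble_labels = create_clustering_ensemble(X, base_clusterers)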