Clustering
Understanding and implementing clustering algorithms in machine learning
Clustering is an unsupervised learning technique that groups similar data points together based on their features.
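All of the examples below assume a two-dimensional NumPy array X of shape (n_samples, n_features). As a minimal sketch (the variable names and parameter values are illustrative), synthetic data can be generated with scikit-learn's make_blobs:

from sklearn.datasets import make_blobs

# 500 two-dimensional points drawn around 4 centers; true_labels is kept for
# the supervised evaluation metrics shown later
X, true_labels = make_blobs(n_samples=500, centers=4, cluster_std=1.0, random_state=42)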
K-Means Clustering
Basic Implementation
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import numpy as np
def kmeans_clustering(X, n_clusters=5):
    # Initialize and fit KMeans
    kmeans = KMeans(
        n_clusters=n_clusters,
        random_state=42
    )
    clusters = kmeans.fit_predict(X)
    return kmeans, clusters

def plot_kmeans_clusters(X, kmeans):
    plt.figure(figsize=(10, 6))
    plt.scatter(
        X[:, 0], X[:, 1],
        c=kmeans.labels_,
        cmap='viridis'
    )
    plt.scatter(
        kmeans.cluster_centers_[:, 0],
        kmeans.cluster_centers_[:, 1],
        marker='x',
        color='red',
        s=200,
        label='Centroids'
    )
    plt.title('K-Means Clustering')
    plt.legend()
    return plt
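A quick usage sketch, assuming the toy data generated above:

kmeans, clusters = kmeans_clustering(X, n_clusters=4)
plot_kmeans_clusters(X, kmeans)
plt.show()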
Elbow Method
def find_optimal_k(X, max_k=10):
    inertias = []
    for k in range(1, max_k + 1):
        kmeans = KMeans(n_clusters=k, random_state=42)
        kmeans.fit(X)
        inertias.append(kmeans.inertia_)
    # Plot elbow curve
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, max_k + 1), inertias, 'bx-')
    plt.xlabel('k')
    plt.ylabel('Inertia')
    plt.title('Elbow Method for Optimal k')
    return plt
Silhouette Analysis
from sklearn.metrics import silhouette_score, silhouette_samples
def silhouette_analysis(X, n_clusters):
    # Compute silhouette scores
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    cluster_labels = kmeans.fit_predict(X)
    silhouette_avg = silhouette_score(X, cluster_labels)
    sample_silhouette_values = silhouette_samples(X, cluster_labels)
    # Plot silhouette analysis
    plt.figure(figsize=(10, 6))
    y_lower = 10
    for i in range(n_clusters):
        ith_cluster_values = sample_silhouette_values[cluster_labels == i]
        ith_cluster_values.sort()
        size_cluster_i = ith_cluster_values.shape[0]
        y_upper = y_lower + size_cluster_i
        plt.fill_betweenx(
            np.arange(y_lower, y_upper),
            0, ith_cluster_values,
            alpha=0.7
        )
        y_lower = y_upper + 10
    plt.title(f'Silhouette Analysis (avg score: {silhouette_avg:.3f})')
    plt.xlabel('Silhouette Coefficient')
    plt.ylabel('Cluster')
    return plt
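The two diagnostics are typically read together: the elbow curve suggests values of k where inertia stops dropping sharply, and the average silhouette score (higher is better, at most 1) helps confirm the choice. A usage sketch with illustrative candidate values:

find_optimal_k(X, max_k=10)
plt.show()
# Candidate values of k read off the elbow curve (illustrative)
for k in [3, 4, 5]:
    silhouette_analysis(X, n_clusters=k)
    plt.show()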
Hierarchical Clustering
Agglomerative Clustering
from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram, linkage
def hierarchical_clustering(X, n_clusters=5):
    # Perform hierarchical clustering
    clustering = AgglomerativeClustering(n_clusters=n_clusters)
    clusters = clustering.fit_predict(X)
    return clustering, clusters

def plot_dendrogram(X):
    # Create linkage matrix
    linkage_matrix = linkage(X, method='ward')
    # Plot dendrogram
    plt.figure(figsize=(12, 8))
    dendrogram(linkage_matrix)
    plt.title('Hierarchical Clustering Dendrogram')
    plt.xlabel('Sample Index')
    plt.ylabel('Distance')
    return plt
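AgglomerativeClustering uses Ward linkage by default, which matches the method='ward' linkage used for the dendrogram. A short usage sketch; the cut at 4 clusters is illustrative:

plot_dendrogram(X)
plt.show()
clustering, clusters = hierarchical_clustering(X, n_clusters=4)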
DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
from sklearn.cluster import DBSCAN
from sklearn.neighbors import NearestNeighbors
def dbscan_clustering(X, eps=0.5, min_samples=5):
    # Perform DBSCAN clustering
    dbscan = DBSCAN(eps=eps, min_samples=min_samples)
    clusters = dbscan.fit_predict(X)
    return dbscan, clusters

def find_optimal_eps(X, n_neighbors=5):
    # Calculate distances to nearest neighbors
    neigh = NearestNeighbors(n_neighbors=n_neighbors)
    neigh.fit(X)
    distances, _ = neigh.kneighbors(X)
    # Plot k-distance graph
    plt.figure(figsize=(10, 6))
    plt.plot(np.sort(distances[:, -1]))
    plt.xlabel('Points')
    plt.ylabel(f'Distance to {n_neighbors}th nearest neighbor')
    plt.title('K-distance Graph')
    return plt
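eps is usually read off the "knee" of the k-distance curve, the point where the sorted distances start rising steeply, and min_samples is often set to the value used for n_neighbors. A usage sketch with an illustrative eps:

find_optimal_eps(X, n_neighbors=5)
plt.show()
dbscan, clusters = dbscan_clustering(X, eps=0.7, min_samples=5)  # eps taken from the knee (illustrative)
n_noise = np.sum(clusters == -1)  # DBSCAN marks noise points with the label -1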
Gaussian Mixture Models (GMM)
from sklearn.mixture import GaussianMixture
def gmm_clustering(X, n_components=5):
    # Fit Gaussian Mixture Model
    gmm = GaussianMixture(
        n_components=n_components,
        random_state=42
    )
    clusters = gmm.fit_predict(X)
    return gmm, clusters

def plot_gmm_contours(X, gmm):
    # Create mesh grid
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(
        np.linspace(x_min, x_max, 100),
        np.linspace(y_min, y_max, 100)
    )
    # Get posterior probabilities for every grid point
    Z = gmm.predict_proba(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape((100, 100, gmm.n_components))
    # Filled contours of the posterior probability of the first component,
    # with points colored by their predicted component
    plt.figure(figsize=(10, 6))
    plt.contourf(xx, yy, Z[:, :, 0])
    plt.scatter(X[:, 0], X[:, 1], c=gmm.predict(X))
    plt.title('GMM Clustering with Probability Contours')
    return plt
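Because a GMM defines a likelihood, the number of components can be selected with an information criterion such as BIC (lower is better). A minimal sketch using GaussianMixture.bic; the search range is illustrative:

bic_scores = []
for n in range(1, 11):
    gmm = GaussianMixture(n_components=n, random_state=42).fit(X)
    bic_scores.append(gmm.bic(X))
best_n = int(np.argmin(bic_scores)) + 1  # component count with the lowest BIC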
Evaluation Metrics
from sklearn.metrics import (
    calinski_harabasz_score,
    davies_bouldin_score,
    adjusted_rand_score
)

def evaluate_clustering(X, labels, true_labels=None):
    metrics = {
        'calinski_harabasz': calinski_harabasz_score(X, labels),
        'davies_bouldin': davies_bouldin_score(X, labels)
    }
    if true_labels is not None:
        metrics['adjusted_rand'] = adjusted_rand_score(
            true_labels,
            labels
        )
    return metrics
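Higher Calinski-Harabasz and lower Davies-Bouldin values indicate better-separated clusters; the adjusted Rand index is only available when ground-truth labels exist. A usage sketch, reusing the labels from make_blobs above:

kmeans, labels = kmeans_clustering(X, n_clusters=4)
print(evaluate_clustering(X, labels, true_labels=true_labels))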
Feature Preprocessing
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

def preprocess_for_clustering(X):
    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    # Reduce dimensionality if needed
    if X.shape[1] > 50:
        pca = PCA(n_components=0.95)
        X_reduced = pca.fit_transform(X_scaled)
        return X_reduced, scaler, pca
    return X_scaled, scaler, None
Visualization Tools
import pandas as pd

def plot_cluster_distribution(labels):
    plt.figure(figsize=(10, 6))
    pd.Series(labels).value_counts().plot(kind='bar')
    plt.title('Cluster Size Distribution')
    plt.xlabel('Cluster')
    plt.ylabel('Number of Samples')
    return plt

def plot_feature_distribution_by_cluster(X, labels, feature_names):
    n_features = X.shape[1]
    n_clusters = len(np.unique(labels))
    fig, axes = plt.subplots(
        n_features, 1,
        figsize=(12, 4 * n_features)
    )
    for i, (ax, feature) in enumerate(zip(axes, feature_names)):
        for cluster in range(n_clusters):
            ax.hist(
                X[labels == cluster, i],
                alpha=0.5,
                label=f'Cluster {cluster}'
            )
        ax.set_title(f'Distribution of {feature} by Cluster')
        ax.legend()
    plt.tight_layout()
    return fig
Best Practices
- Data Preparation
from sklearn.ensemble import IsolationForest
from sklearn.impute import SimpleImputer

def prepare_data_for_clustering(X):
    # Handle missing values
    imputer = SimpleImputer(strategy='mean')
    X = imputer.fit_transform(X)
    # Remove outliers
    isolation_forest = IsolationForest(random_state=42)
    mask = isolation_forest.fit_predict(X) == 1
    X_clean = X[mask]
    # Scale features
    X_scaled = StandardScaler().fit_transform(X_clean)
    return X_scaled
- Cluster Validation
def validate_clustering(X, clustering_methods):
    results = {}
    for name, method in clustering_methods.items():
        labels = method.fit_predict(X)
        scores = evaluate_clustering(X, labels)
        results[name] = scores
    return pd.DataFrame(results)
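A usage sketch, assuming the methods are supplied as a dict of scikit-learn estimators (the names and parameters are illustrative):

methods = {
    'kmeans': KMeans(n_clusters=4, random_state=42),
    'agglomerative': AgglomerativeClustering(n_clusters=4),
    'gmm': GaussianMixture(n_components=4, random_state=42)
}
print(validate_clustering(X, methods))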
- Ensemble Clustering
def create_clustering_ensemble(X, base_clusterers):
    # Get predictions from each clusterer
    predictions = []
    for clusterer in base_clusterers:
        pred = clusterer.fit_predict(X)
        predictions.append(pred)
    # Combine predictions (majority voting)
    # Note: the vote is only meaningful if labels refer to the same clusters
    # across clusterers, and bincount requires non-negative labels
    # (e.g. no DBSCAN noise label of -1).
    ensemble_pred = np.apply_along_axis(
        lambda x: np.bincount(x).argmax(),
        axis=0,
        arr=np.array(predictions)
    )
    return ensemble_pred
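A usage sketch with K-Means runs from different seeds (illustrative); the simple per-point majority vote assumes the base clusterers assign comparable, non-negative labels, which is a strong assumption in practice:

base_clusterers = [
    KMeans(n_clusters=4, random_state=seed) for seed in (0, 1, 2)
]
ensemble_labels = create_clustering_ensemble(X, base_clusterers)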