Recommender Systems

Understanding and implementing various recommendation algorithms

Recommender systems help users discover relevant items by predicting how they would rate or interact with content they have not yet seen.

Collaborative Filtering

User-Based Collaborative Filtering

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class UserBasedCF:
    def __init__(self, n_neighbors=5):
        self.n_neighbors = n_neighbors
        
    def fit(self, ratings_matrix):
        self.ratings = ratings_matrix
        self.similarity_matrix = cosine_similarity(ratings_matrix)
        return self
    
    def predict(self, user_id, item_id):
        # Find the most similar users, excluding the target user itself
        # (self-similarity is always maximal and would dominate the neighborhood)
        user_similarities = self.similarity_matrix[user_id].copy()
        user_similarities[user_id] = -np.inf
        similar_users = np.argsort(user_similarities)[-self.n_neighbors:]
        
        # Get ratings from similar users
        similar_ratings = self.ratings[similar_users, item_id]
        weights = user_similarities[similar_users]
        
        # Calculate weighted average
        if np.sum(weights) == 0:
            return 0
        return np.sum(similar_ratings * weights) / np.sum(weights)
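
A minimal usage sketch, assuming a small dense ratings matrix where rows are users, columns are items, and 0 marks an unrated entry (all values below are illustrative):

# Toy 4-user x 5-item ratings matrix; 0 means "not rated"
ratings = np.array([
    [5, 3, 0, 1, 4],
    [4, 0, 0, 1, 4],
    [1, 1, 0, 5, 4],
    [1, 0, 5, 4, 0],
])

model = UserBasedCF(n_neighbors=2).fit(ratings)
print(model.predict(user_id=0, item_id=2))  # predicted rating for the unrated item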

Item-Based Collaborative Filtering

class ItemBasedCF:
    def __init__(self, n_neighbors=5):
        self.n_neighbors = n_neighbors
        
    def fit(self, ratings_matrix):
        self.ratings = ratings_matrix
        self.similarity_matrix = cosine_similarity(ratings_matrix.T)
        return self
    
    def predict(self, user_id, item_id):
        # Find the most similar items, excluding the target item itself
        item_similarities = self.similarity_matrix[item_id].copy()
        item_similarities[item_id] = -np.inf
        similar_items = np.argsort(item_similarities)[-self.n_neighbors:]
        
        # Get user's ratings for similar items
        user_ratings = self.ratings[user_id, similar_items]
        weights = item_similarities[similar_items]
        
        # Calculate weighted average
        if np.sum(weights) == 0:
            return 0
        return np.sum(user_ratings * weights) / np.sum(weights)
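
Item-based CF mirrors the user-based version but averages over item-item similarities, which in practice tend to be more stable than user-user similarities and can be precomputed offline; this is why item-based approaches often scale better when a system has far more users than items. On the toy matrix above, ItemBasedCF(n_neighbors=2).fit(ratings).predict(0, 2) produces the item-based estimate for the same unrated entry.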

Matrix Factorization

Funk SVD (SGD Matrix Factorization)

Despite the name, this is not a true singular value decomposition: it learns user and item latent factors with stochastic gradient descent over the observed ratings only, minimizing squared error with L2 regularization.

class SVDRecommender:
    def __init__(self, n_factors=100, n_epochs=20, lr=0.01, reg=0.1):
        self.n_factors = n_factors
        self.n_epochs = n_epochs
        self.lr = lr
        self.reg = reg
        
    def fit(self, ratings_matrix):
        n_users, n_items = ratings_matrix.shape
        
        # Initialize latent factors
        self.user_factors = np.random.normal(
            0, 0.1,
            (n_users, self.n_factors)
        )
        self.item_factors = np.random.normal(
            0, 0.1,
            (n_items, self.n_factors)
        )
        
        # Train model
        for epoch in range(self.n_epochs):
            for u in range(n_users):
                for i in range(n_items):
                    if ratings_matrix[u, i] > 0:
                        # Compute error
                        prediction = self.user_factors[u].dot(
                            self.item_factors[i]
                        )
                        error = ratings_matrix[u, i] - prediction
                        
                        # Update factors
                        self.user_factors[u] += self.lr * (
                            error * self.item_factors[i] - 
                            self.reg * self.user_factors[u]
                        )
                        self.item_factors[i] += self.lr * (
                            error * self.user_factors[u] - 
                            self.reg * self.item_factors[i]
                        )
        
        return self
    
    def predict(self, user_id, item_id):
        return self.user_factors[user_id].dot(self.item_factors[item_id])
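
A short training sketch on the toy matrix from above; the hyperparameters are illustrative, not tuned:

# Fewer factors suit a tiny matrix; the defaults target larger data
svd = SVDRecommender(n_factors=8, n_epochs=50, lr=0.01, reg=0.05)
svd.fit(ratings)
print(svd.predict(user_id=0, item_id=2))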

Alternating Least Squares (ALS)

class ALSRecommender:
    def __init__(self, n_factors=100, n_iterations=20, reg=0.1):
        self.n_factors = n_factors
        self.n_iterations = n_iterations
        self.reg = reg
        
    def fit(self, ratings_matrix):
        n_users, n_items = ratings_matrix.shape
        
        # Initialize factors
        self.user_factors = np.random.normal(
            0, 0.1,
            (n_users, self.n_factors)
        )
        self.item_factors = np.random.normal(
            0, 0.1,
            (n_items, self.n_factors)
        )
        
        # Alternate between updating user and item factors
        for _ in range(self.n_iterations):
            # Update user factors
            for u in range(n_users):
                rated_items = ratings_matrix[u].nonzero()[0]
                if len(rated_items) > 0:
                    A = self.item_factors[rated_items]
                    b = ratings_matrix[u, rated_items]
                    A_reg = np.vstack([A, np.sqrt(self.reg) * np.eye(self.n_factors)])
                    b_reg = np.concatenate([b, np.zeros(self.n_factors)])
                    self.user_factors[u] = np.linalg.lstsq(A_reg, b_reg, rcond=None)[0]
            
            # Update item factors
            for i in range(n_items):
                rated_users = ratings_matrix[:, i].nonzero()[0]
                if len(rated_users) > 0:
                    A = self.user_factors[rated_users]
                    b = ratings_matrix[rated_users, i]
                    A_reg = np.vstack([A, np.sqrt(self.reg) * np.eye(self.n_factors)])
                    b_reg = np.concatenate([b, np.zeros(self.n_factors)])
                    self.item_factors[i] = np.linalg.lstsq(A_reg, b_reg, rcond=None)[0]
        
        return self
    
    def predict(self, user_id, item_id):
        return self.user_factors[user_id].dot(self.item_factors[item_id])
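
The same toy matrix works for ALS as well (hyperparameters again illustrative). Because each sweep solves an exact ridge-regression problem per user and per item, ALS typically needs far fewer iterations than SGD:

als = ALSRecommender(n_factors=4, n_iterations=10, reg=0.1)
als.fit(ratings.astype(float))
print(als.predict(user_id=0, item_id=2))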

Content-Based Filtering

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import linear_kernel

class ContentBasedRecommender:
    def __init__(self):
        self.tfidf = TfidfVectorizer(
            stop_words='english',
            max_features=5000
        )
        
    def fit(self, item_descriptions):
        # Create TF-IDF matrix
        self.tfidf_matrix = self.tfidf.fit_transform(item_descriptions)
        
        # Compute similarity matrix
        self.similarity_matrix = linear_kernel(
            self.tfidf_matrix,
            self.tfidf_matrix
        )
        
        return self
    
    def recommend(self, item_id, n_recommendations=5):
        # Get similarity scores, excluding the item itself
        item_similarities = self.similarity_matrix[item_id].copy()
        item_similarities[item_id] = -np.inf
        
        # Top similar items, most similar first
        similar_items = np.argsort(item_similarities)[::-1][:n_recommendations]
        
        return similar_items
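
A usage sketch with a toy catalogue; the descriptions are illustrative, and the model sees item text only, never ratings:

# One description per item, aligned with the columns of the ratings matrix
descriptions = [
    "action sci-fi space adventure",
    "romantic comedy set in Paris",
    "space opera with alien battles",
    "documentary about deep sea life",
    "sci-fi thriller about a rogue AI",
]

cb = ContentBasedRecommender().fit(descriptions)
print(cb.recommend(item_id=0, n_recommendations=2))  # items most similar to item 0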

Hybrid Recommender Systems

class HybridRecommender:
    def __init__(
        self,
        collaborative_weight=0.7,
        content_weight=0.3
    ):
        self.cf_weight = collaborative_weight
        self.cb_weight = content_weight
        self.cf_model = SVDRecommender()
        self.cb_model = ContentBasedRecommender()
        
    def fit(self, ratings_matrix, item_descriptions):
        self.ratings = ratings_matrix
        
        # Train collaborative filtering model
        self.cf_model.fit(ratings_matrix)
        
        # Train content-based model
        self.cb_model.fit(item_descriptions)
        
        return self
    
    def _content_predict(self, user_id, item_id):
        # Score the item by the user's own ratings on content-similar items;
        # the content model alone has no notion of a per-user rating
        sims = self.cb_model.similarity_matrix[item_id]
        rated = self.ratings[user_id].nonzero()[0]
        if len(rated) == 0 or np.sum(sims[rated]) == 0:
            return 0
        return np.sum(self.ratings[user_id, rated] * sims[rated]) / np.sum(sims[rated])
    
    def predict(self, user_id, item_id):
        # Get predictions from both models
        cf_pred = self.cf_model.predict(user_id, item_id)
        cb_pred = self._content_predict(user_id, item_id)
        
        # Combine predictions with fixed weights
        return (
            self.cf_weight * cf_pred +
            self.cb_weight * cb_pred
        )
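
A combined sketch reusing the toy ratings and descriptions from the earlier examples; the weights are illustrative, and the only structural requirement is one description per item column:

hybrid = HybridRecommender(collaborative_weight=0.6, content_weight=0.4)
hybrid.fit(ratings, descriptions)
print(hybrid.predict(user_id=0, item_id=2))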

Evaluation Metrics

from sklearn.metrics import mean_squared_error, mean_absolute_error, ndcg_score

def calculate_recommendation_metrics(y_true, y_pred):
    # Root Mean Square Error
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    
    # Mean Absolute Error
    mae = mean_absolute_error(y_true, y_pred)
    
    # Normalized Discounted Cumulative Gain
    ndcg = ndcg_score(y_true.reshape(1, -1), y_pred.reshape(1, -1))
    
    return {
        'rmse': rmse,
        'mae': mae,
        'ndcg': ndcg
    }

def precision_at_k(actual, predicted, k=5):
    actual_set = set(actual)
    predicted_set = set(predicted[:k])
    return len(actual_set & predicted_set) / k

def recall_at_k(actual, predicted, k=5):
    actual_set = set(actual)
    if not actual_set:
        return 0.0
    predicted_set = set(predicted[:k])
    return len(actual_set & predicted_set) / len(actual_set)
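
A quick check of the ranking metrics on hand-made lists (values are illustrative):

actual = [1, 3, 7]           # items the user actually interacted with
predicted = [3, 9, 1, 5, 2]  # ranked recommendations

print(precision_at_k(actual, predicted, k=5))  # 2 hits in the top 5 -> 0.4
print(recall_at_k(actual, predicted, k=5))     # 2 of 3 relevant items found -> 0.67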

Cold Start Handling

class ColdStartHandler:
    def __init__(self, n_recommendations=5):
        self.n_recommendations = n_recommendations
        
    def handle_new_user(self, ratings_matrix):
        # No history for the user yet: fall back to the most frequently rated items
        item_popularity = np.sum(ratings_matrix > 0, axis=0)
        popular_items = np.argsort(item_popularity)[-self.n_recommendations:]
        
        return popular_items
    
    def handle_new_item(self, item_features, existing_items):
        # Find similar items based on features
        similarity = cosine_similarity(
            item_features.reshape(1, -1),
            existing_items
        )
        similar_items = np.argsort(similarity[0])[-self.n_recommendations:]
        
        return similar_items
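
A sketch of both cold-start paths; the feature vectors are illustrative stand-ins (e.g. genre one-hots):

handler = ColdStartHandler(n_recommendations=3)

# New user: fall back to the most frequently rated items
print(handler.handle_new_user(ratings))

# New item: seed its neighborhood with feature-similar existing items
existing_items = np.array([[1, 0, 1], [0, 1, 0], [1, 1, 0]])
new_item = np.array([1, 0, 0])
print(handler.handle_new_item(new_item, existing_items))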

Best Practices

  1. Data Preprocessing
def preprocess_ratings(ratings_df):
    # Create user-item matrix (NaN marks unrated items)
    ratings_matrix = ratings_df.pivot(
        index='user_id',
        columns='item_id',
        values='rating'
    )
    
    # Center each user's ratings on their mean over rated items only,
    # then fill unrated entries with 0 (i.e. the user's mean after centering)
    ratings_mean = ratings_matrix.mean(axis=1)
    ratings_norm = ratings_matrix.sub(ratings_mean, axis=0).fillna(0)
    
    return ratings_norm, ratings_mean
  2. Model Selection
def select_recommender(
    n_users,
    n_items,
    sparsity,
    has_content_features=False
):
    if sparsity > 0.99:
        if has_content_features:
            return ContentBasedRecommender()
        else:
            return ItemBasedCF(n_neighbors=10)
    elif n_users * n_items > 1e6:
        return ALSRecommender()
    else:
        return SVDRecommender()
  3. Performance Optimization
def optimize_recommendations(model, ratings_matrix, batch_size=1000):
    # Calling model.predict() once per user-item pair is the bottleneck.
    # For factor models (SVDRecommender, ALSRecommender), a whole batch of
    # users reduces to a single matrix product against all item factors.
    n_users, n_items = ratings_matrix.shape
    predictions = np.zeros((n_users, n_items))
    
    # Process users in batches to bound memory usage
    for i in range(0, n_users, batch_size):
        batch = slice(i, min(i + batch_size, n_users))
        predictions[batch] = model.user_factors[batch] @ model.item_factors.T
    
    return predictions
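
Putting the pieces together, a minimal end-to-end sketch; the column names (user_id, item_id, rating) and the DataFrame contents are assumptions for illustration:

import pandas as pd

df = pd.DataFrame({
    'user_id': [0, 0, 1, 1, 2],
    'item_id': [0, 1, 0, 2, 1],
    'rating':  [5, 3, 4, 2, 1],
})

ratings_norm, ratings_mean = preprocess_ratings(df)

# Sparsity = fraction of the user-item grid with no rating
n_users, n_items = ratings_norm.shape
sparsity = 1.0 - len(df) / (n_users * n_items)

model = select_recommender(n_users, n_items, sparsity)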