Association Rule Learning

This section covers association rule learning algorithms and their implementation in machine learning.

Apriori Algorithm

1. Basic Implementation

import numpy as np
import pandas as pd
from itertools import combinations

class Apriori:
    """Apriori algorithm for frequent-itemset mining and association rules.

    Frequent itemsets are found level-wise: size-k candidates are joined
    from frequent (k-1)-itemsets and filtered by `min_support`. Rules are
    then derived from every frequent itemset of size >= 2 and kept when
    their confidence reaches `min_confidence`.
    """

    def __init__(
        self,
        min_support=0.1,
        min_confidence=0.5
    ):
        """Initialize Apriori algorithm.

        Args:
            min_support: Minimum fraction of transactions (0-1] an itemset
                must occur in to count as frequent.
            min_confidence: Minimum confidence (0-1] for a rule to be kept.
        """
        self.min_support = min_support
        self.min_confidence = min_confidence
        self.frequent_itemsets = {}  # frozenset(items) -> support fraction
        self.rules = []              # rule dicts; see _generate_rules

    def _create_itemsets(self, transactions):
        """Return the set of all 1-itemsets appearing in `transactions`."""
        items = set()
        for transaction in transactions:
            for item in transaction:
                items.add(frozenset([item]))
        return items

    def _calculate_support(self, itemset, transactions):
        """Return the fraction of transactions that contain `itemset`."""
        count = sum(1 for transaction in transactions
                    if itemset.issubset(transaction))
        return count / len(transactions)

    def _generate_candidates(self, prev_itemsets, k):
        """Join frequent (k-1)-itemsets into size-k candidate itemsets."""
        candidates = set()
        for a in prev_itemsets:
            for b in prev_itemsets:
                union = a.union(b)
                # Only unions that grow by exactly one item have size k.
                if len(union) == k:
                    candidates.add(union)
        return candidates

    def fit(self, transactions):
        """Find frequent itemsets and generate association rules.

        Args:
            transactions: Iterable of item collections. Each transaction is
                converted to a set, so duplicate items count once.

        Returns:
            self, for chaining.

        Raises:
            ValueError: If `transactions` is empty.
        """
        transactions = [set(t) for t in transactions]
        if not transactions:
            # The original raised a bare ZeroDivisionError here.
            raise ValueError("transactions must not be empty")

        # Reset state so repeated fit() calls do not accumulate results
        # from a previous dataset.
        self.frequent_itemsets = {}
        self.rules = []

        # Frequent 1-itemsets.
        items = self._create_itemsets(transactions)
        frequent_k = set()
        for item in items:
            support = self._calculate_support(item, transactions)
            if support >= self.min_support:
                frequent_k.add(item)
                self.frequent_itemsets[item] = support

        # Level-wise expansion; stops once no size-k itemset is frequent.
        k = 1
        while frequent_k:
            k += 1
            candidates = self._generate_candidates(frequent_k, k)
            frequent_k = set()
            for candidate in candidates:
                support = self._calculate_support(candidate, transactions)
                if support >= self.min_support:
                    frequent_k.add(candidate)
                    self.frequent_itemsets[candidate] = support

        self._generate_rules()
        return self

    def _generate_rules(self):
        """Derive rules (antecedent -> consequent) from frequent itemsets.

        Every non-empty proper subset of each frequent itemset is tried as
        an antecedent; downward closure guarantees both antecedent and
        consequent are themselves present in `frequent_itemsets`.
        """
        for itemset in self.frequent_itemsets:
            if len(itemset) < 2:
                continue

            for size in range(1, len(itemset)):
                for antecedent in combinations(itemset, size):
                    antecedent = frozenset(antecedent)
                    consequent = itemset - antecedent

                    confidence = (self.frequent_itemsets[itemset]
                                  / self.frequent_itemsets[antecedent])
                    if confidence < self.min_confidence:
                        continue

                    # lift > 1: antecedent and consequent co-occur more
                    # often than expected under independence.
                    lift = confidence / self.frequent_itemsets[consequent]
                    self.rules.append({
                        'antecedent': antecedent,
                        'consequent': consequent,
                        'support': self.frequent_itemsets[itemset],
                        'confidence': confidence,
                        'lift': lift
                    })

    def get_rules(self, sort_by='lift', ascending=False):
        """Return the mined rules as a sorted DataFrame.

        Args:
            sort_by: Column to sort by ('lift', 'confidence', 'support').
            ascending: Sort direction; defaults to best-first.

        Returns:
            DataFrame with list-valued 'antecedent'/'consequent' columns,
            or an empty DataFrame when no rules were mined.
        """
        if not self.rules:
            return pd.DataFrame()

        df = pd.DataFrame(self.rules)
        df['antecedent'] = df['antecedent'].apply(list)
        df['consequent'] = df['consequent'].apply(list)

        return df.sort_values(sort_by, ascending=ascending)

2. Rule Visualization

import matplotlib.pyplot as plt
import networkx as nx

def plot_rules_network(rules_df, min_lift=1.0):
    """Draw association rules as a directed item graph.

    Each antecedent item points at each consequent item of every rule whose
    lift is at least `min_lift`; edge colour encodes lift.

    Args:
        rules_df: DataFrame with 'antecedent', 'consequent', 'lift' and
            'confidence' columns (e.g. from Apriori.get_rules()).
        min_lift: Rules below this lift are excluded from the graph.

    Returns:
        The current matplotlib Figure containing the plot.
    """
    graph = nx.DiGraph()

    # One edge per (antecedent item, consequent item) pair of each kept rule.
    for _, rule in rules_df.iterrows():
        if rule['lift'] < min_lift:
            continue
        for source in rule['antecedent']:
            for target in rule['consequent']:
                graph.add_edge(
                    source,
                    target,
                    weight=rule['lift'],
                    confidence=rule['confidence']
                )

    plt.figure(figsize=(12, 8))
    layout = nx.spring_layout(graph)

    # Nodes first, then lift-coloured edges, then labels.
    nx.draw_networkx_nodes(
        graph, layout,
        node_color='lightblue',
        node_size=1000
    )

    edge_list = graph.edges()
    lifts = [graph[u][v]['weight'] for u, v in edge_list]
    nx.draw_networkx_edges(
        graph, layout,
        edge_color=lifts,
        edge_cmap=plt.cm.Reds,
        width=2
    )

    nx.draw_networkx_labels(graph, layout)

    plt.title('Association Rules Network')
    plt.axis('off')
    return plt.gcf()

def plot_rules_matrix(rules_df, metric='lift'):
    """Plot association rules as an item-by-item heatmap.

    Cell (i, j) holds `metric` of a rule whose antecedent contains item i
    and whose consequent contains item j (rules sharing a cell overwrite
    each other in DataFrame row order).

    Args:
        rules_df: DataFrame with 'antecedent', 'consequent' and a `metric`
            column. Items are assumed sortable (e.g. strings).
        metric: Rule metric to display ('lift', 'confidence', 'support').

    Returns:
        The current matplotlib Figure containing the plot.
    """
    # Collect every item mentioned in any rule. Sort so the axis order
    # (and therefore the rendered figure) is deterministic across runs;
    # the original used raw set order, which varies.
    items = set()
    for _, rule in rules_df.iterrows():
        items.update(rule['antecedent'])
        items.update(rule['consequent'])
    items = sorted(items)

    # Precomputed index map replaces repeated O(n) list.index() lookups.
    index = {item: i for i, item in enumerate(items)}

    matrix = np.zeros((len(items), len(items)))
    for _, rule in rules_df.iterrows():
        for ant in rule['antecedent']:
            for cons in rule['consequent']:
                matrix[index[ant], index[cons]] = rule[metric]

    # Plot
    plt.figure(figsize=(10, 8))
    plt.imshow(matrix, cmap='YlOrRd')
    plt.colorbar(label=metric.capitalize())

    plt.xticks(
        range(len(items)),
        items,
        rotation=45,
        ha='right'
    )
    plt.yticks(range(len(items)), items)

    plt.title(f'Association Rules Matrix ({metric})')
    plt.tight_layout()
    return plt.gcf()

FP-Growth Algorithm

1. FP-Tree Implementation

class FPNode:
    """Node in an FP-tree: one item plus its count along a prefix path."""

    def __init__(self, item, count=1, parent=None):
        """Initialize FP-tree node.

        Args:
            item: The item this node represents (None for the root).
            count: Number of transactions sharing this prefix path.
            parent: Parent FPNode (None for the root).
        """
        self.item = item
        self.count = count
        self.parent = parent
        self.children = {}  # item -> child FPNode
        self.next = None    # next node holding the same item (header chain)

    def __repr__(self):
        """Concise representation for debugging."""
        return f"FPNode(item={self.item!r}, count={self.count})"

class FPTree:
    """Prefix tree over ordered transactions with per-item header chains."""

    def __init__(self):
        """Create an empty tree with a sentinel (item=None) root."""
        self.root = FPNode(None)
        self.headers = {}  # item -> first FPNode of that item's chain

    def _update_header(self, node):
        """Append `node` to the linked chain of nodes carrying its item."""
        head = self.headers.get(node.item)
        if head is None:
            self.headers[node.item] = node
            return
        while head.next is not None:
            head = head.next
        head.next = node

    def add_transaction(self, transaction, count=1):
        """Insert an ordered transaction, bumping counts along its path.

        Args:
            transaction: Items in a fixed (frequency) order.
            count: Weight to add along the path (default one transaction).
        """
        node = self.root
        for item in transaction:
            child = node.children.get(item)
            if child is not None:
                # Path already exists: just accumulate the count.
                child.count += count
            else:
                # New branch: create the node and register it in the header.
                child = FPNode(item, count, node)
                node.children[item] = child
                self._update_header(child)
            node = child

class FPGrowth:
    """FP-Growth algorithm for frequent-pattern mining and rules.

    Builds an FP-tree over support-filtered, frequency-ordered
    transactions, mines it recursively via conditional pattern bases,
    then derives association rules from the mined patterns.
    """

    def __init__(
        self,
        min_support=0.1,
        min_confidence=0.5
    ):
        """Initialize FP-Growth algorithm.

        Args:
            min_support: Minimum fraction of transactions (0-1] a pattern
                must occur in to count as frequent.
            min_confidence: Minimum confidence (0-1] for a rule to be kept.
        """
        self.min_support = min_support
        self.min_confidence = min_confidence
        self.frequent_patterns = {}  # frozenset(items) -> support fraction
        self.rules = []

    def _create_header_table(self, transactions):
        """Count item frequencies and drop items below minimum support."""
        header_table = {}
        for transaction in transactions:
            for item in transaction:
                header_table[item] = header_table.get(item, 0) + 1

        min_count = self.min_support * len(transactions)
        return {
            k: v for k, v in header_table.items()
            if v >= min_count
        }

    def _build_fptree(self, transactions, header_table):
        """Build the FP-tree from support-filtered, ordered transactions."""
        # Deterministic order: descending frequency, ties broken by item.
        sorted_items = sorted(
            header_table.items(),
            key=lambda x: (-x[1], x[0])
        )
        item_order = {item: i for i, (item, _) in enumerate(sorted_items)}

        tree = FPTree()
        for transaction in transactions:
            filtered = [
                item for item in transaction
                if item in header_table
            ]
            filtered.sort(key=lambda item: item_order[item])
            if filtered:
                tree.add_transaction(filtered)
        return tree

    def _mine_patterns(self, tree, suffix, n_transactions):
        """Recursively mine frequent patterns from `tree`.

        For each item in the tree's header table, record the pattern
        (suffix + item), collect the item's prefix paths (conditional
        pattern base), and recurse on the conditional FP-tree.

        NOTE: the original code called this method without defining it;
        this is the missing implementation.
        """
        min_count = self.min_support * n_transactions

        for item, head in tree.headers.items():
            # Walk the header chain: total the item's count and gather
            # each occurrence's prefix path with its count.
            item_count = 0
            conditional_base = []  # (path root->leaf, count)
            node = head
            while node is not None:
                item_count += node.count
                path = []
                parent = node.parent
                while parent is not None and parent.item is not None:
                    path.append(parent.item)
                    parent = parent.parent
                if path:
                    conditional_base.append((path[::-1], node.count))
                node = node.next

            if item_count < min_count:
                continue

            pattern = frozenset(suffix | {item})
            self.frequent_patterns[pattern] = item_count / n_transactions

            # Count items in the conditional base; keep only frequent ones.
            base_counts = {}
            for path, count in conditional_base:
                for base_item in path:
                    base_counts[base_item] = base_counts.get(base_item, 0) + count
            base_counts = {
                k: v for k, v in base_counts.items() if v >= min_count
            }
            if not base_counts:
                continue

            # Build the conditional FP-tree and recurse with grown suffix.
            order = sorted(base_counts, key=lambda x: (-base_counts[x], x))
            rank = {it: i for i, it in enumerate(order)}
            cond_tree = FPTree()
            for path, count in conditional_base:
                kept = sorted(
                    (p for p in path if p in base_counts),
                    key=lambda p: rank[p]
                )
                if kept:
                    cond_tree.add_transaction(kept, count)
            self._mine_patterns(cond_tree, pattern, n_transactions)

    def _generate_rules(self):
        """Derive rules (antecedent -> consequent) from frequent patterns.

        Downward closure guarantees every non-empty proper subset of a
        frequent pattern is itself frequent, so support lookups succeed.

        NOTE: the original code called this method without defining it;
        this is the missing implementation.
        """
        for pattern, support in self.frequent_patterns.items():
            if len(pattern) < 2:
                continue
            for size in range(1, len(pattern)):
                for antecedent in combinations(pattern, size):
                    antecedent = frozenset(antecedent)
                    consequent = pattern - antecedent

                    confidence = support / self.frequent_patterns[antecedent]
                    if confidence < self.min_confidence:
                        continue

                    lift = confidence / self.frequent_patterns[consequent]
                    self.rules.append({
                        'antecedent': antecedent,
                        'consequent': consequent,
                        'support': support,
                        'confidence': confidence,
                        'lift': lift
                    })

    def fit(self, transactions):
        """Find frequent patterns and generate association rules.

        Args:
            transactions: Iterable of item collections.

        Returns:
            self, for chaining.

        Raises:
            ValueError: If `transactions` is empty.
        """
        transactions = [list(t) for t in transactions]
        if not transactions:
            raise ValueError("transactions must not be empty")

        # Reset state so repeated fit() calls do not accumulate.
        self.frequent_patterns = {}
        self.rules = []

        header_table = self._create_header_table(transactions)
        tree = self._build_fptree(transactions, header_table)

        self._mine_patterns(tree, frozenset(), len(transactions))
        self._generate_rules()

        return self

Model Evaluation

1. Rule Metrics

def evaluate_rules(rules_df):
    """Summarize a rules DataFrame with aggregate quality metrics.

    Args:
        rules_df: DataFrame with 'antecedent', 'consequent', 'support',
            'confidence' and 'lift' columns.

    Returns:
        Dict with the rule count, mean support/confidence/lift, and a
        {rule_length: count} breakdown (length = antecedent + consequent).
    """
    # Total number of items per rule, across both sides.
    rule_lengths = rules_df.apply(
        lambda row: len(row['antecedent']) + len(row['consequent']),
        axis=1
    )

    return {
        'total_rules': len(rules_df),
        'avg_support': rules_df['support'].mean(),
        'avg_confidence': rules_df['confidence'].mean(),
        'avg_lift': rules_df['lift'].mean(),
        'rules_by_length': rule_lengths.value_counts().to_dict()
    }

def plot_rule_metrics(rules_df):
    """Plot histograms of support, confidence, and lift side by side.

    Args:
        rules_df: DataFrame with 'support', 'confidence' and 'lift' columns.

    Returns:
        The matplotlib Figure containing the three histograms.
    """
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

    # Same 20-bin histogram for each metric, one axis per column.
    for ax, column in zip(axes, ('support', 'confidence', 'lift')):
        ax.hist(rules_df[column], bins=20)
        ax.set_title(f'{column.capitalize()} Distribution')
        ax.set_xlabel(column.capitalize())

    plt.tight_layout()
    return fig

Best Practices

1. Data Preparation

  • Clean transaction data
  • Handle missing values
  • Consider item encoding
  • Validate data format

2. Algorithm Selection

  • Consider dataset size
  • Evaluate memory requirements
  • Test multiple methods
  • Validate results

3. Parameter Tuning

  • Set appropriate support
  • Adjust confidence threshold
  • Consider lift values
  • Monitor rule quality

4. Rule Evaluation

  • Use multiple metrics
  • Filter meaningful rules
  • Consider business context
  • Validate with experts