Probabilistic Graphical Models

Probabilistic Graphical Models (PGMs) provide a framework for modeling complex systems by representing probability distributions over sets of random variables using graphs.

Theoretical Foundations

1. Bayesian Networks (Directed Graphical Models)

  1. Probability Factorization

    • The joint probability factorizes as: P(X_1, ..., X_n) = \prod_{i=1}^n P(X_i | Pa(X_i))
    • where Pa(X_i) denotes the set of parents of node X_i
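    For example, in the Rain-Sprinkler-Grass network built later in this section, the joint distribution factorizes as P(R, S, G) = P(R) P(S | R) P(G | R, S).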
  2. Conditional Independence

    • A variable is conditionally independent of all other variables given its Markov blanket
    • The Markov blanket of a node consists of its parents, its children, and its children's other parents (see the sketch below)
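
A minimal sketch of collecting a node's Markov blanket (assuming the graph is a networkx DiGraph, as in the implementation section below):

import networkx as nx

def markov_blanket(graph, node):
    """Parents, children, and the children's other parents of `node`"""
    parents = set(graph.predecessors(node))
    children = set(graph.successors(node))
    co_parents = {p for c in children for p in graph.predecessors(c)}
    return (parents | children | co_parents) - {node}

g = nx.DiGraph([('Rain', 'Sprinkler'), ('Rain', 'Grass_Wet'),
                ('Sprinkler', 'Grass_Wet')])
print(markov_blanket(g, 'Sprinkler'))  # {'Rain', 'Grass_Wet'}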

2. Markov Random Fields (Undirected Graphical Models)

  1. Gibbs Distribution

    P(X = x) = \frac{1}{Z} \prod_{c \in C} \phi_c(x_c)

    where:

    • Z is the partition function (the normalizing constant)
    • \phi_c are the potential functions defined on cliques
    • C is the set of maximal cliques
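    For a pairwise MRF such as the Ising model implemented below, the grid graph is triangle-free, so its maximal cliques are simply its edges and the product runs over edge potentials \phi_{ij}(x_i, x_j).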
  2. Hammersley-Clifford Theorem

    • Any strictly positive distribution that satisfies the local Markov properties with respect to a graph can be represented as a Gibbs distribution that factorizes over the graph's cliques

Implementation Examples

1. Simple Bayesian Network

import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

class BayesianNetwork:
    def __init__(self):
        self.graph = nx.DiGraph()
        self.cpds = {}  # Conditional Probability Distributions
        
    def add_node(self, node_name, cpd):
        """Add node with its conditional probability distribution"""
        self.graph.add_node(node_name)
        self.cpds[node_name] = cpd
        
    def add_edge(self, parent, child):
        """Add directed edge from parent to child"""
        self.graph.add_edge(parent, child)
        
    def get_parents(self, node):
        """Get parents of a node"""
        return list(self.graph.predecessors(node))
    
    def sample(self, node, parent_values=None):
        """Sample from a node's distribution given its parents' values.
        `parent_values` is a dict whose keys match the CPD's keyword
        argument names."""
        cpd = self.cpds[node]
        if parent_values is None:
            return cpd()
        return cpd(**parent_values)

# Example: Weather Network
def rain_cpd():
    """Prior probability of rain"""
    return np.random.choice([0, 1], p=[0.7, 0.3])

def sprinkler_cpd(rain):
    """Conditional probability of sprinkler given rain"""
    if rain:
        return np.random.choice([0, 1], p=[0.9, 0.1])
    return np.random.choice([0, 1], p=[0.4, 0.6])

def grass_wet_cpd(rain, sprinkler):
    """Conditional probability of wet grass given rain and sprinkler"""
    if rain and sprinkler:
        return np.random.choice([0, 1], p=[0.1, 0.9])
    elif rain:
        return np.random.choice([0, 1], p=[0.2, 0.8])
    elif sprinkler:
        return np.random.choice([0, 1], p=[0.3, 0.7])
    return np.random.choice([0, 1], p=[0.9, 0.1])

# Create and visualize network
bn = BayesianNetwork()
bn.add_node('Rain', rain_cpd)
bn.add_node('Sprinkler', sprinkler_cpd)
bn.add_node('Grass_Wet', grass_wet_cpd)
bn.add_edge('Rain', 'Sprinkler')
bn.add_edge('Rain', 'Grass_Wet')
bn.add_edge('Sprinkler', 'Grass_Wet')
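
# Visualize the DAG structure (uses the matplotlib import above)
nx.draw(bn.graph, with_labels=True, node_color='lightblue')
plt.show()

To draw a full joint sample, visit the nodes in topological order and feed each node's sampled parents into its CPD. A minimal sketch, assuming (as in the CPDs above) that each CPD's keyword arguments are the lowercased parent names:

def ancestral_sample(bn):
    """Draw one joint sample by ancestral sampling"""
    sample = {}
    for node in nx.topological_sort(bn.graph):
        parents = bn.get_parents(node)
        if parents:
            kwargs = {p.lower(): sample[p] for p in parents}
            sample[node] = bn.sample(node, kwargs)
        else:
            sample[node] = bn.sample(node)
    return sample

print(ancestral_sample(bn))  # e.g. {'Rain': 0, 'Sprinkler': 1, 'Grass_Wet': 1}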

2. Markov Random Field

class MarkovRandomField:
    def __init__(self):
        self.graph = nx.Graph()
        self.potentials = {}
        
    def add_node(self, node):
        self.graph.add_node(node)
        
    def add_edge(self, node1, node2, potential_func):
        """Add edge with associated potential function"""
        self.graph.add_edge(node1, node2)
        self.potentials[(node1, node2)] = potential_func
        
    def get_neighbors(self, node):
        """Get neighbors of a node"""
        return list(self.graph.neighbors(node))
    
    def compute_energy(self, configuration):
        """Compute the energy of a configuration (the negative log of
        the unnormalized probability)"""
        energy = 0.0
        for edge in self.graph.edges():
            # networkx may return the edge in either orientation
            potential = (self.potentials.get(edge)
                         or self.potentials.get(edge[::-1]))
            energy += -np.log(potential(
                configuration[edge[0]],
                configuration[edge[1]]
            ))
        return energy

# Example: Ising Model
def ising_potential(s1, s2, beta=1.0):
    """Edge potential for the Ising model; s1 and s2 are spins in {-1, +1}"""
    return np.exp(beta * s1 * s2)

# Create 2D Ising model
def create_2d_ising_model(n, beta=1.0):
    mrf = MarkovRandomField()
    
    # Add nodes
    for i in range(n):
        for j in range(n):
            mrf.add_node((i, j))
    
    # Add horizontal edges
    for i in range(n):
        for j in range(n-1):
            mrf.add_edge((i,j), (i,j+1), 
                        lambda s1, s2: ising_potential(s1, s2, beta))
    
    # Add vertical edges
    for i in range(n-1):
        for j in range(n):
            mrf.add_edge((i,j), (i+1,j), 
                        lambda s1, s2: ising_potential(s1, s2, beta))
    
    return mrf
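
As a quick usage sketch (an example of my own): build a 3x3 grid at beta = 0.5 and score a random spin configuration. Since each edge potential is exp(beta * s1 * s2), the energy reduces to -beta times the sum of neighboring spin products:

rng = np.random.default_rng(0)
ising = create_2d_ising_model(3, beta=0.5)

# Random spin assignment in {-1, +1} for each lattice site
config = {node: rng.choice([-1, 1]) for node in ising.graph.nodes()}
print(ising.compute_energy(config))  # aligned neighbors lower the energy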

Inference Methods

1. Variable Elimination

def variable_elimination(factors, query_var, evidence):
    """Variable elimination (a high-level sketch: the factor operations
    are filled in after this block, while get_elimination_order,
    normalize, multiply_all_factors, and evidence handling are
    assumed helpers)"""
    def multiply_factors(f1, f2):
        """Pointwise product of two factors (full version sketched below)"""
        variables = list(set(f1.variables + f2.variables))
        new_factor = Factor(variables)
        # Implementation details...
        return new_factor
    
    def sum_out_variable(factor, variable):
        """Sum a variable out of a factor (full version sketched below)"""
        remaining_vars = [v for v in factor.variables if v != variable]
        new_factor = Factor(remaining_vars)
        # Implementation details...
        return new_factor
    
    # Choose an elimination order (e.g. via a min-degree heuristic)
    elimination_order = get_elimination_order(factors, query_var)
    
    # Eliminate variables one by one
    for var in elimination_order:
        # Multiply all factors containing var
        relevant_factors = [f for f in factors if var in f.variables]
        if not relevant_factors:
            continue
            
        product = relevant_factors[0]
        for f in relevant_factors[1:]:
            product = multiply_factors(product, f)
            
        # Sum out the variable
        new_factor = sum_out_variable(product, var)
        
        # Replace the consumed factors with the new one
        factors = [f for f in factors if f not in relevant_factors]
        factors.append(new_factor)
    
    # Multiply the remaining factors and normalize over the query variable
    return normalize(multiply_all_factors(factors))
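
The factor operations above are left abstract. A minimal discrete Factor class that fills in the two operations, a sketch of my own assuming binary variables and a dense table keyed by assignment tuples, could look like this:

import itertools

class Factor:
    """A discrete factor: a table mapping joint assignments of its
    variables (binary here, for brevity) to non-negative values."""
    def __init__(self, variables, table=None):
        self.variables = list(variables)
        if table is None:
            # Default to an all-ones table (the identity for products)
            table = {assignment: 1.0
                     for assignment in itertools.product([0, 1], repeat=len(self.variables))}
        self.table = table

    def value(self, assignment_dict):
        """Look up the factor value for a dict of variable assignments"""
        return self.table[tuple(assignment_dict[v] for v in self.variables)]

def multiply_factors(f1, f2):
    """Pointwise product over the union of the two factors' variables"""
    variables = list(dict.fromkeys(f1.variables + f2.variables))
    table = {}
    for assignment in itertools.product([0, 1], repeat=len(variables)):
        adict = dict(zip(variables, assignment))
        table[assignment] = f1.value(adict) * f2.value(adict)
    return Factor(variables, table)

def sum_out_variable(factor, variable):
    """Marginalize a variable out of a factor"""
    remaining = [v for v in factor.variables if v != variable]
    table = {}
    for assignment, value in factor.table.items():
        adict = dict(zip(factor.variables, assignment))
        key = tuple(adict[v] for v in remaining)
        table[key] = table.get(key, 0.0) + value
    return Factor(remaining, table)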

2. Belief Propagation

class BeliefPropagation:
    def __init__(self, mrf):
        """(Loopy) belief propagation over a MarkovRandomField
        with binary variables"""
        self.mrf = mrf
        self.graph = mrf.graph
        self.messages = {}
        self.beliefs = {}
        
    def init_messages(self):
        """Initialize all messages to uniform distributions"""
        for edge in self.graph.edges():
            self.messages[edge] = np.ones(2) / 2
            self.messages[edge[::-1]] = np.ones(2) / 2
            
    def get_potential(self, i, j):
        """Fetch the edge potential, whichever orientation it was stored under"""
        if (i, j) in self.mrf.potentials:
            return self.mrf.potentials[(i, j)]
        func = self.mrf.potentials[(j, i)]
        return lambda x_i, x_j: func(x_j, x_i)
        
    def update_message(self, i, j):
        """Update the message from node i to node j:
        m_ij(x_j) = sum_{x_i} phi(x_i, x_j) * prod_{n != j} m_ni(x_i)
        """
        neighbors = list(self.graph.neighbors(i))
        neighbors.remove(j)
        potential = self.get_potential(i, j)
        
        msg = np.zeros(2)
        for x_j in [0, 1]:
            for x_i in [0, 1]:
                # Product of incoming messages, excluding the one from j
                product = 1.0
                for n in neighbors:
                    product *= self.messages[(n, i)][x_i]
                
                # Sum over x_i, weighted by the edge potential
                msg[x_j] += potential(x_i, x_j) * product
                
        # Normalize the message
        msg /= np.sum(msg)
        self.messages[(i, j)] = msg
        
    def run_belief_propagation(self, max_iter=100, tol=1e-6):
        """Iterate message updates until convergence
        (exact on trees, approximate on loopy graphs)"""
        self.init_messages()
        
        for iteration in range(max_iter):
            old_messages = self.messages.copy()
            
            # Update all messages in both directions
            for edge in self.graph.edges():
                self.update_message(*edge)
                self.update_message(*edge[::-1])
            
            # Check convergence
            diff = max(np.max(np.abs(old_messages[e] - self.messages[e]))
                      for e in self.messages)
            if diff < tol:
                break
                
        # Compute final beliefs
        self.compute_beliefs()
        
    def compute_beliefs(self):
        """Compute a marginal belief for every node"""
        for node in self.graph.nodes():
            belief = np.ones(2)
            for neighbor in self.graph.neighbors(node):
                belief *= self.messages[(neighbor, node)]
            belief /= np.sum(belief)
            self.beliefs[node] = belief
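
A quick usage sketch (an example of my own, reusing the MarkovRandomField class above): run BP on a three-node chain with an agreement-favoring potential and read off the node marginals:

def agree(x1, x2):
    """Potential that rewards equal binary states"""
    return 2.0 if x1 == x2 else 1.0

chain = MarkovRandomField()
for node in ['A', 'B', 'C']:
    chain.add_node(node)
chain.add_edge('A', 'B', agree)
chain.add_edge('B', 'C', agree)

bp = BeliefPropagation(chain)
bp.run_belief_propagation()
print(bp.beliefs)  # uniform marginals here, by symmetry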

Learning Methods

1. Maximum Likelihood Estimation

def mle_parameters(data, structure):
    """Maximum likelihood estimation of CPD parameters.

    `data` is assumed to map each node name to a 1-D integer array of
    observations; `structure` exposes nodes() and get_parents().
    """
    parameters = {}
    
    for node in structure.nodes():
        parents = structure.get_parents(node)
        if not parents:
            # Estimate the prior as empirical frequencies
            counts = np.bincount(data[node])
            parameters[node] = counts / len(data[node])
        else:
            # Estimate conditional probabilities per parent configuration
            parent_configs = get_parent_configurations(data, parents)
            for config in parent_configs:
                mask = get_config_mask(data, parents, config)
                counts = np.bincount(data[node][mask])
                # Normalize by the number of matching samples,
                # not the total mask length
                parameters[(node, config)] = counts / mask.sum()
    
    return parameters
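
The two helpers above are left undefined in the original; hedged sketches, assuming `data` is a dict of aligned 1-D integer arrays, could look like:

import itertools

def get_parent_configurations(data, parents):
    """All combinations of the values each parent takes in the data"""
    value_sets = [np.unique(data[p]) for p in parents]
    return list(itertools.product(*value_sets))

def get_config_mask(data, parents, config):
    """Boolean mask of samples where the parents take the given values"""
    n_samples = len(data[parents[0]])
    mask = np.ones(n_samples, dtype=bool)
    for parent, value in zip(parents, config):
        mask &= (data[parent] == value)
    return mask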

2. Structure Learning

def score_structure(data, structure, score_type='bic'):
    """Score a network structure (log_likelihood and count_parameters
    are assumed helpers)"""
    def compute_bic(data, structure):
        """Compute the BIC score: log-likelihood minus a complexity penalty"""
        ll = log_likelihood(data, structure)
        n_params = count_parameters(structure)
        n_samples = len(data)
        return ll - 0.5 * n_params * np.log(n_samples)
    
    if score_type == 'bic':
        return compute_bic(data, structure)
    # Add other scoring functions as needed
    
def structure_learning(data, score_type='bic'):
    """Greedy hill-climbing structure search; initialize_structure and
    apply_operation are assumed helpers, and a sketch of
    get_valid_operations follows this block"""
    current_structure = initialize_structure()
    current_score = score_structure(data, current_structure, score_type)
    
    while True:
        # Try possible edge additions/deletions/reversals
        operations = get_valid_operations(current_structure)
        best_operation = None
        best_score = current_score
        
        for op in operations:
            new_structure = apply_operation(current_structure, op)
            score = score_structure(data, new_structure, score_type)
            
            if score > best_score:
                best_score = score
                best_operation = op
        
        if best_operation is None:
            break
            
        current_structure = apply_operation(
            current_structure, best_operation)
        current_score = best_score
    
    return current_structure
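
As an illustration of the search moves (a sketch of my own; the original leaves these helpers abstract), candidate operations can be enumerated as single-edge additions, deletions, and reversals that keep the graph acyclic, assuming `structure` exposes a networkx DiGraph as `structure.graph`:

def get_valid_operations(structure):
    """Enumerate acyclicity-preserving single-edge edits"""
    g = structure.graph
    ops = []
    for u in g.nodes():
        for v in g.nodes():
            if u == v:
                continue
            if g.has_edge(u, v):
                ops.append(('delete', u, v))
                # A reversal is valid only if it introduces no cycle
                h = g.copy()
                h.remove_edge(u, v)
                h.add_edge(v, u)
                if nx.is_directed_acyclic_graph(h):
                    ops.append(('reverse', u, v))
            else:
                h = g.copy()
                h.add_edge(u, v)
                if nx.is_directed_acyclic_graph(h):
                    ops.append(('add', u, v))
    return ops

apply_operation would then pattern-match these tuples onto a copy of the graph.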

Advanced Topics

1. Dynamic Bayesian Networks

class DynamicBayesianNetwork:
    def __init__(self, transition_model, emission_model, initial_dist=None):
        """transition_model[i, j] = P(state_t = j | state_{t-1} = i);
        emission_model[j, k] = P(obs = k | state = j);
        initial_dist defaults to uniform over states."""
        self.transition_model = transition_model
        self.emission_model = emission_model
        n_states = transition_model.shape[0]
        if initial_dist is None:
            initial_dist = np.ones(n_states) / n_states
        self.initial_dist = initial_dist
        
    def forward_algorithm(self, observations):
        """Forward algorithm for filtering: alpha[t, j] is the joint
        probability of the observations up to time t and state j at t."""
        T = len(observations)
        n_states = self.transition_model.shape[0]
        alpha = np.zeros((T, n_states))
        
        # Initialize with the prior weighted by the first observation
        alpha[0] = self.initial_dist * self.emission_model[:, observations[0]]
        
        # Forward pass
        for t in range(1, T):
            for j in range(n_states):
                alpha[t, j] = self.emission_model[j, observations[t]] * \
                             np.sum(alpha[t-1] * self.transition_model[:, j])
                
        return alpha
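
A toy usage sketch (numbers of my own): a two-state chain observed through a noisy binary sensor, filtered over a short observation sequence:

# transition[i, j] = P(state_t = j | state_{t-1} = i)
transition = np.array([[0.9, 0.1],
                       [0.2, 0.8]])
# emission[j, k] = P(obs = k | state = j)
emission = np.array([[0.8, 0.2],
                     [0.3, 0.7]])

dbn = DynamicBayesianNetwork(transition, emission)
alpha = dbn.forward_algorithm([0, 0, 1, 1])

# Filtered state distribution at the final time step
print(alpha[-1] / alpha[-1].sum())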

2. Factor Graphs

class FactorGraph:
    def __init__(self):
        self.var_nodes = set()
        self.factor_nodes = set()
        self.edges = set()
        
    def add_variable(self, var):
        self.var_nodes.add(var)
        
    def add_factor(self, factor, connected_vars):
        self.factor_nodes.add(factor)
        for var in connected_vars:
            self.edges.add((factor, var))
            
    def get_neighboring_factors(self, var):
        return {f for f, v in self.edges if v == var}
        
    def get_neighboring_variables(self, factor):
        return {v for f, v in self.edges if f == factor}
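
For example (an illustrative sketch in which factor nodes are just string labels), the sprinkler network's factorization P(R) P(S|R) P(G|R,S) maps onto a factor graph as:

fg = FactorGraph()
for var in ['R', 'S', 'G']:
    fg.add_variable(var)

fg.add_factor('P(R)', ['R'])
fg.add_factor('P(S|R)', ['R', 'S'])
fg.add_factor('P(G|R,S)', ['R', 'S', 'G'])

print(fg.get_neighboring_factors('R'))           # {'P(R)', 'P(S|R)', 'P(G|R,S)'}
print(fg.get_neighboring_variables('P(G|R,S)'))  # {'R', 'S', 'G'}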

Best Practices

  1. Model Selection

    • Choose a graph structure that reflects the domain's dependencies
    • Incorporate domain knowledge where available
    • Balance model complexity against the available data
    • Validate independence assumptions
  2. Inference

    • Select an inference method suited to the graph (exact on trees, approximate on loopy graphs)
    • Consider computational constraints
    • Guard numerical stability (e.g. compute in log space)
    • Fall back to approximations when exact inference is intractable
  3. Learning

    • Handle missing data
    • Use regularization
    • Cross-validate parameters
    • Monitor convergence
  4. Evaluation

    • Use multiple metrics
    • Test predictive performance
    • Validate causal relationships
    • Consider interpretability